diff options
author | Aldo Cortesi <aldo@nullcube.com> | 2012-10-05 10:30:32 +1300 |
---|---|---|
committer | Aldo Cortesi <aldo@nullcube.com> | 2012-10-05 10:30:32 +1300 |
commit | f5d5cc49887d3a54bc5edc7905b90e5912ae9e8a (patch) | |
tree | e47d8a110995ceb40d03887b62e78817c516518a /test/test_language.py | |
parent | d3e1f8a0149d41dfb2660442d663be4425d5770e (diff) | |
download | mitmproxy-f5d5cc49887d3a54bc5edc7905b90e5912ae9e8a.tar.gz mitmproxy-f5d5cc49887d3a54bc5edc7905b90e5912ae9e8a.tar.bz2 mitmproxy-f5d5cc49887d3a54bc5edc7905b90e5912ae9e8a.zip |
rparse.py -> language.py
Diffstat (limited to 'test/test_language.py')
-rw-r--r-- | test/test_language.py | 529 |
1 files changed, 529 insertions, 0 deletions
diff --git a/test/test_language.py b/test/test_language.py new file mode 100644 index 00000000..0668fba2 --- /dev/null +++ b/test/test_language.py @@ -0,0 +1,529 @@ +import os, cStringIO +from libpathod import language, utils +import tutils + +language.TESTING = True + + +class TestMisc: + def test_generators(self): + v = language.Value.parseString("'val'")[0] + g = v.get_generator({}) + assert g[:] == "val" + + def test_randomgenerator(self): + g = language.RandomGenerator("bytes", 100) + assert repr(g) + assert len(g[:10]) == 10 + assert len(g[1:10]) == 9 + assert len(g[:1000]) == 100 + assert len(g[1000:1001]) == 0 + assert g[0] + + def test_literalgenerator(self): + g = language.LiteralGenerator("one") + assert repr(g) + assert g == "one" + assert g[:] == "one" + assert g[1] == "n" + + def test_filegenerator(self): + with tutils.tmpdir() as t: + path = os.path.join(t, "foo") + f = open(path, "w") + f.write("x"*10000) + f.close() + g = language.FileGenerator(path) + assert len(g) == 10000 + assert g[0] == "x" + assert g[-1] == "x" + assert g[0:5] == "xxxxx" + assert repr(g) + + def test_valueliteral(self): + v = language.ValueLiteral("foo") + assert v.expr() + assert v.val == "foo" + + v = language.ValueLiteral(r"foo\n") + assert v.expr() + assert v.val == "foo\n" + assert repr(v) + + def test_valuenakedliteral(self): + v = language.ValueNakedLiteral("foo") + assert v.expr() + assert repr(v) + + def test_file_value(self): + v = language.Value.parseString("<'one two'")[0] + assert str(v) + assert v.path == "one two" + + v = language.Value.parseString("<path")[0] + assert v.path == "path" + + with tutils.tmpdir() as t: + p = os.path.join(t, "path") + f = open(p, "w") + f.write("x"*10000) + f.close() + + assert v.get_generator(dict(staticdir=t)) + + v = language.Value.parseString("<path2")[0] + tutils.raises(language.FileAccessDenied, v.get_generator, dict(staticdir=t)) + tutils.raises("access disabled", v.get_generator, dict()) + + v = language.Value.parseString("</outside")[0] + tutils.raises("outside", v.get_generator, dict(staticdir=t)) + + def test_generated_value(self): + v = language.Value.parseString("@10b")[0] + assert v.usize == 10 + assert v.unit == "b" + assert v.bytes() == 10 + v = language.Value.parseString("@10")[0] + assert v.unit == "b" + v = language.Value.parseString("@10k")[0] + assert v.bytes() == 10240 + v = language.Value.parseString("@10g")[0] + assert v.bytes() == 1024**3 * 10 + + v = language.Value.parseString("@10g,digits")[0] + assert v.datatype == "digits" + g = v.get_generator({}) + assert g[:100] + + v = language.Value.parseString("@10,digits")[0] + assert v.unit == "b" + assert v.datatype == "digits" + + def test_value(self): + assert language.Value.parseString("'val'")[0].val == "val" + assert language.Value.parseString('"val"')[0].val == "val" + assert language.Value.parseString('"\'val\'"')[0].val == "'val'" + + def test_path(self): + e = language.Path.expr() + assert e.parseString('"/foo"')[0].value.val == "/foo" + e = language.Path("/foo") + assert e.value.val == "/foo" + + def test_method(self): + e = language.Method.expr() + assert e.parseString("get")[0].value.val == "GET" + assert e.parseString("'foo'")[0].value.val == "foo" + assert e.parseString("'get'")[0].value.val == "get" + + def test_raw(self): + e = language.Raw.expr() + assert e.parseString("r")[0] + + def test_body(self): + e = language.Body.expr() + v = e.parseString("b'foo'")[0] + assert v.value.val == "foo" + + v = e.parseString("b@100")[0] + assert str(v.value) == "@100b,bytes" + + v = e.parseString("b@100g,digits", parseAll=True)[0] + assert v.value.datatype == "digits" + assert str(v.value) == "@100g,digits" + + def test_header(self): + e = language.Header.expr() + v = e.parseString("h'foo'='bar'")[0] + assert v.key.val == "foo" + assert v.value.val == "bar" + + def test_code(self): + e = language.Code.expr() + v = e.parseString("200")[0] + assert v.code == 200 + + v = e.parseString("404'msg'")[0] + assert v.code == 404 + assert v.msg.val == "msg" + + r = e.parseString("200'foo'")[0] + assert r.msg.val == "foo" + + r = e.parseString("200'\"foo\"'")[0] + assert r.msg.val == "\"foo\"" + + r = e.parseString('200"foo"')[0] + assert r.msg.val == "foo" + + r = e.parseString('404')[0] + assert r.msg.val == "Not Found" + + r = e.parseString('10')[0] + assert r.msg.val == "Unknown code" + + def test_internal_response(self): + d = cStringIO.StringIO() + s = language.PathodErrorResponse("foo") + s.serve(d) + + +class TestDisconnects: + def test_parse_response(self): + assert (0, "disconnect") in language.parse_response({}, "400:d0").actions + assert ("r", "disconnect") in language.parse_response({}, "400:dr").actions + + def test_at(self): + e = language.DisconnectAt.expr() + v = e.parseString("d0")[0] + assert isinstance(v, language.DisconnectAt) + assert v.value == 0 + + v = e.parseString("d100")[0] + assert v.value == 100 + + e = language.DisconnectAt.expr() + v = e.parseString("dr")[0] + assert v.value == "r" + + +class TestInject: + def test_parse_response(self): + a = language.parse_response({}, "400:ir,@100").actions[0] + assert a[0] == "r" + assert a[1] == "inject" + + a = language.parse_response({}, "400:ia,@100").actions[0] + assert a[0] == "a" + assert a[1] == "inject" + + def test_at(self): + e = language.InjectAt.expr() + v = e.parseString("i0,'foo'")[0] + assert v.value.val == "foo" + assert v.offset == 0 + assert isinstance(v, language.InjectAt) + + v = e.parseString("ir,'foo'")[0] + assert v.offset == "r" + + def test_serve(self): + s = cStringIO.StringIO() + r = language.parse_response({}, "400:i0,'foo'") + assert r.serve(s, None) + + +class TestShortcuts: + def test_parse_response(self): + assert language.parse_response({}, "400:c'foo'").headers[0][0][:] == "Content-Type" + assert language.parse_response({}, "400:l'foo'").headers[0][0][:] == "Location" + + +class TestPauses: + def test_parse_response(self): + e = language.PauseAt.expr() + v = e.parseString("p10,10")[0] + assert v.seconds == 10 + assert v.offset == 10 + + v = e.parseString("p10,f")[0] + assert v.seconds == "f" + + v = e.parseString("pr,f")[0] + assert v.offset == "r" + + v = e.parseString("pa,f")[0] + assert v.offset == "a" + + def test_request(self): + r = language.parse_response({}, '400:p10,10') + assert r.actions[0] == (10, "pause", 10) + + +class TestParseRequest: + def test_file(self): + p = tutils.test_data.path("data") + d = dict(staticdir=p) + r = language.parse_request(d, "+request") + assert r.path == "/foo" + + def test_err(self): + tutils.raises(language.ParseException, language.parse_request, {}, 'GET') + + def test_simple(self): + r = language.parse_request({}, 'GET:"/foo"') + assert r.method == "GET" + assert r.path == "/foo" + r = language.parse_request({}, 'GET:/foo') + assert r.path == "/foo" + r = language.parse_request({}, 'GET:@1k') + assert len(r.path) == 1024 + + def test_render(self): + s = cStringIO.StringIO() + r = language.parse_request({}, "GET:'/foo'") + assert r.serve(s, None, "foo.com") + + def test_str(self): + r = language.parse_request({}, 'GET:"/foo"') + assert str(r) + + def test_multiline(self): + l = """ + GET + "/foo" + ir,@1 + """ + r = language.parse_request({}, l) + assert r.method == "GET" + assert r.path == "/foo" + assert r.actions + + + l = """ + GET + + "/foo + + + + bar" + + ir,@1 + """ + r = language.parse_request({}, l) + assert r.method == "GET" + assert r.path.s.endswith("bar") + assert r.actions + + +class TestParseResponse: + def test_parse_err(self): + tutils.raises(language.ParseException, language.parse_response, {}, "400:msg,b:") + try: + language.parse_response({}, "400'msg':b:") + except language.ParseException, v: + assert v.marked() + assert str(v) + + def test_parse_header(self): + r = language.parse_response({}, '400:h"foo"="bar"') + assert utils.get_header("foo", r.headers) + + def test_parse_pause_before(self): + r = language.parse_response({}, "400:p0,10") + assert (0, "pause", 10) in r.actions + + def test_parse_pause_after(self): + r = language.parse_response({}, "400:pa,10") + assert ("a", "pause", 10) in r.actions + + def test_parse_pause_random(self): + r = language.parse_response({}, "400:pr,10") + assert ("r", "pause", 10) in r.actions + + def test_parse_stress(self): + r = language.parse_response({}, "400:b@100g") + assert r.length() + + +class TestWriteValues: + def test_send_chunk(self): + v = "foobarfoobar" + for bs in range(1, len(v)+2): + s = cStringIO.StringIO() + language.send_chunk(s, v, bs, 0, len(v)) + assert s.getvalue() == v + for start in range(len(v)): + for end in range(len(v)): + s = cStringIO.StringIO() + language.send_chunk(s, v, bs, start, end) + assert s.getvalue() == v[start:end] + + def test_write_values_inject(self): + tst = "foo" + + s = cStringIO.StringIO() + language.write_values(s, [tst], [(0, "inject", "aaa")], blocksize=5) + assert s.getvalue() == "aaafoo" + + s = cStringIO.StringIO() + language.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5) + assert s.getvalue() == "faaaoo" + + s = cStringIO.StringIO() + language.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5) + assert s.getvalue() == "faaaoo" + + def test_write_values_disconnects(self): + s = cStringIO.StringIO() + tst = "foo"*100 + language.write_values(s, [tst], [(0, "disconnect")], blocksize=5) + assert not s.getvalue() + + def test_write_values(self): + tst = "foobarvoing" + s = cStringIO.StringIO() + language.write_values(s, [tst], []) + assert s.getvalue() == tst + + for bs in range(1, len(tst) + 2): + for off in range(len(tst)): + s = cStringIO.StringIO() + language.write_values(s, [tst], [(off, "disconnect")], blocksize=bs) + assert s.getvalue() == tst[:off] + + def test_write_values_pauses(self): + tst = "".join(str(i) for i in range(10)) + for i in range(2, 10): + s = cStringIO.StringIO() + language.write_values(s, [tst], [(2, "pause", 0), (1, "pause", 0)], blocksize=i) + assert s.getvalue() == tst + + for i in range(2, 10): + s = cStringIO.StringIO() + language.write_values(s, [tst], [(1, "pause", 0)], blocksize=i) + assert s.getvalue() == tst + + tst = ["".join(str(i) for i in range(10))]*5 + for i in range(2, 10): + s = cStringIO.StringIO() + language.write_values(s, tst[:], [(1, "pause", 0)], blocksize=i) + assert s.getvalue() == "".join(tst) + + def test_write_values_after(self): + s = cStringIO.StringIO() + r = language.parse_response({}, "400:da") + r.serve(s, None) + + s = cStringIO.StringIO() + r = language.parse_response({}, "400:pa,0") + r.serve(s, None) + + s = cStringIO.StringIO() + r = language.parse_response({}, "400:ia,'xx'") + r.serve(s, None) + assert s.getvalue().endswith('xx') + + +def test_ready_actions(): + x = [(0, 5)] + assert language.ready_actions(100, x) == x + + x = [("r", 5)] + ret = language.ready_actions(100, x) + assert 0 <= ret[0][0] < 100 + + x = [("a", "pause", 5)] + ret = language.ready_actions(100, x) + assert ret[0][0] > 100 + + x = [(1, 5), (0, 5)] + assert language.ready_actions(100, x) == sorted(x) + + +class TestResponse: + def dummy_response(self): + return language.parse_response({}, "400'msg'") + + def test_file(self): + p = tutils.test_data.path("data") + d = dict(staticdir=p) + r = language.parse_response(d, "+response") + assert r.code == 202 + + def test_response(self): + r = language.parse_response({}, "400'msg'") + assert r.code == 400 + assert r.msg == "msg" + + r = language.parse_response({}, "400'msg':b@100b") + assert r.msg == "msg" + assert r.body[:] + assert str(r) + + def test_checkfunc(self): + s = cStringIO.StringIO() + r = language.parse_response({}, "400:b@100k") + def check(req, acts): + return "errmsg" + assert r.serve(s, check=check)["error"] == "errmsg" + + def test_render(self): + s = cStringIO.StringIO() + r = language.parse_response({}, "400'msg'") + assert r.serve(s, None) + + def test_raw(self): + s = cStringIO.StringIO() + r = language.parse_response({}, "400:b'foo'") + r.serve(s, None) + v = s.getvalue() + assert "Content-Length" in v + assert "Date" in v + + s = cStringIO.StringIO() + r = language.parse_response({}, "400:b'foo':r") + r.serve(s, None) + v = s.getvalue() + assert not "Content-Length" in v + assert not "Date" in v + + def test_length(self): + def testlen(x): + s = cStringIO.StringIO() + x.serve(s, None) + assert x.length() == len(s.getvalue()) + testlen(language.parse_response({}, "400'msg'")) + testlen(language.parse_response({}, "400'msg':h'foo'='bar'")) + testlen(language.parse_response({}, "400'msg':h'foo'='bar':b@100b")) + + def test_effective_length(self): + def testlen(x, actions): + s = cStringIO.StringIO() + x.serve(s, None) + assert x.effective_length(actions) == len(s.getvalue()) + actions = [ + + ] + r = language.parse_response({}, "400'msg':b@100") + + actions = [ + (0, "disconnect"), + ] + r.actions = actions + testlen(r, actions) + + actions = [ + (0, "disconnect"), + (0, "inject", "foo") + ] + r.actions = actions + testlen(r, actions) + + actions = [ + (0, "inject", "foo") + ] + r.actions = actions + testlen(r, actions) + + def test_render(self): + r = language.parse_response({}, "400:p0,100:dr") + assert r.actions[0][1] == "pause" + assert len(r.preview_safe()) == 1 + assert not r.actions[0][1] == "pause" + + + +def test_read_file(): + tutils.raises(language.FileAccessDenied, language.read_file, {}, "=/foo") + p = tutils.test_data.path("data") + d = dict(staticdir=p) + assert language.read_file(d, "+./file").strip() == "testfile" + assert language.read_file(d, "+file").strip() == "testfile" + tutils.raises(language.FileAccessDenied, language.read_file, d, "+./nonexistent") + tutils.raises(language.FileAccessDenied, language.read_file, d, "+/nonexistent") + + tutils.raises(language.FileAccessDenied, language.read_file, d, "+../test_language.py") + d["unconstrained_file_access"] = True + assert language.read_file(d, "+../test_language.py") |