diff options
Diffstat (limited to 'test/pathod/language')
-rw-r--r-- | test/pathod/language/__init__.py | 0 | ||||
-rw-r--r-- | test/pathod/language/test_actions.py | 134 | ||||
-rw-r--r-- | test/pathod/language/test_base.py | 354 | ||||
-rw-r--r-- | test/pathod/language/test_exceptions.py | 1 | ||||
-rw-r--r-- | test/pathod/language/test_generators.py | 45 | ||||
-rw-r--r-- | test/pathod/language/test_http.py | 355 | ||||
-rw-r--r-- | test/pathod/language/test_http2.py | 236 | ||||
-rw-r--r-- | test/pathod/language/test_message.py | 1 | ||||
-rw-r--r-- | test/pathod/language/test_websockets.py | 142 | ||||
-rw-r--r-- | test/pathod/language/test_writer.py | 90 |
10 files changed, 1358 insertions, 0 deletions
diff --git a/test/pathod/language/__init__.py b/test/pathod/language/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/test/pathod/language/__init__.py diff --git a/test/pathod/language/test_actions.py b/test/pathod/language/test_actions.py new file mode 100644 index 00000000..9740e5c7 --- /dev/null +++ b/test/pathod/language/test_actions.py @@ -0,0 +1,134 @@ +import io + +from pathod.language import actions, parse_pathoc, parse_pathod, serve + + +def parse_request(s): + return next(parse_pathoc(s)) + + +def test_unique_name(): + assert not actions.PauseAt(0, "f").unique_name + assert actions.DisconnectAt(0).unique_name + + +class TestDisconnects: + + def test_parse_pathod(self): + a = next(parse_pathod("400:d0")).actions[0] + assert a.spec() == "d0" + a = next(parse_pathod("400:dr")).actions[0] + assert a.spec() == "dr" + + def test_at(self): + e = actions.DisconnectAt.expr() + v = e.parseString("d0")[0] + assert isinstance(v, actions.DisconnectAt) + assert v.offset == 0 + + v = e.parseString("d100")[0] + assert v.offset == 100 + + e = actions.DisconnectAt.expr() + v = e.parseString("dr")[0] + assert v.offset == "r" + + def test_spec(self): + assert actions.DisconnectAt("r").spec() == "dr" + assert actions.DisconnectAt(10).spec() == "d10" + + +class TestInject: + + def test_parse_pathod(self): + a = next(parse_pathod("400:ir,@100")).actions[0] + assert a.offset == "r" + assert a.value.datatype == "bytes" + assert a.value.usize == 100 + + a = next(parse_pathod("400:ia,@100")).actions[0] + assert a.offset == "a" + + def test_at(self): + e = actions.InjectAt.expr() + v = e.parseString("i0,'foo'")[0] + assert v.value.val == b"foo" + assert v.offset == 0 + assert isinstance(v, actions.InjectAt) + + v = e.parseString("ir,'foo'")[0] + assert v.offset == "r" + + def test_serve(self): + s = io.BytesIO() + r = next(parse_pathod("400:i0,'foo'")) + assert serve(r, s, {}) + + def test_spec(self): + e = actions.InjectAt.expr() + v = e.parseString("i0,'foo'")[0] + assert v.spec() == "i0,'foo'" + + def test_spec2(self): + e = actions.InjectAt.expr() + v = e.parseString("i0,@100")[0] + v2 = v.freeze({}) + v3 = v2.freeze({}) + assert v2.value.val == v3.value.val + + +class TestPauses: + + def test_parse_pathod(self): + e = actions.PauseAt.expr() + v = e.parseString("p10,10")[0] + assert v.seconds == 10 + assert v.offset == 10 + + v = e.parseString("p10,f")[0] + assert v.seconds == "f" + + v = e.parseString("pr,f")[0] + assert v.offset == "r" + + v = e.parseString("pa,f")[0] + assert v.offset == "a" + + def test_request(self): + r = next(parse_pathod('400:p10,10')) + assert r.actions[0].spec() == "p10,10" + + def test_spec(self): + assert actions.PauseAt("r", 5).spec() == "pr,5" + assert actions.PauseAt(0, 5).spec() == "p0,5" + assert actions.PauseAt(0, "f").spec() == "p0,f" + + def test_freeze(self): + l = actions.PauseAt("r", 5) + assert l.freeze({}).spec() == l.spec() + + +class Test_Action: + + def test_cmp(self): + a = actions.DisconnectAt(0) + b = actions.DisconnectAt(1) + c = actions.DisconnectAt(0) + assert a < b + assert a == c + l = sorted([b, a]) + assert l[0].offset == 0 + + def test_resolve(self): + r = parse_request('GET:"/foo"') + e = actions.DisconnectAt("r") + ret = e.resolve({}, r) + assert isinstance(ret.offset, int) + + def test_repr(self): + e = actions.DisconnectAt("r") + assert repr(e) + + def test_freeze(self): + l = actions.DisconnectAt(5) + assert l.freeze({}).spec() == l.spec() diff --git a/test/pathod/language/test_base.py b/test/pathod/language/test_base.py new file mode 100644 index 00000000..85e9e53b --- /dev/null +++ b/test/pathod/language/test_base.py @@ -0,0 +1,354 @@ +import os +import pytest + +from pathod import language +from pathod.language import base, exceptions + +from mitmproxy.test import tutils + + +def parse_request(s): + return language.parse_pathoc(s).next() + + +def test_times(): + reqs = list(language.parse_pathoc("get:/:x5")) + assert len(reqs) == 5 + assert not reqs[0].times + + +def test_caseless_literal(): + class CL(base.CaselessLiteral): + TOK = "foo" + v = CL("foo") + assert v.expr() + assert v.values(language.Settings()) + + +class TestTokValueNakedLiteral: + + def test_expr(self): + v = base.TokValueNakedLiteral("foo") + assert v.expr() + + def test_spec(self): + v = base.TokValueNakedLiteral("foo") + assert v.spec() == repr(v) == "foo" + + v = base.TokValueNakedLiteral("f\x00oo") + assert v.spec() == repr(v) == r"f\x00oo" + + +class TestTokValueLiteral: + + def test_expr(self): + v = base.TokValueLiteral("foo") + assert v.expr() + assert v.val == b"foo" + + v = base.TokValueLiteral("foo\n") + assert v.expr() + assert v.val == b"foo\n" + assert repr(v) + + def test_spec(self): + v = base.TokValueLiteral("foo") + assert v.spec() == r"'foo'" + + v = base.TokValueLiteral("f\x00oo") + assert v.spec() == repr(v) == r"'f\x00oo'" + + v = base.TokValueLiteral('"') + assert v.spec() == repr(v) == """ '"' """.strip() + + # While pyparsing has a escChar argument for QuotedString, + # escChar only performs scapes single-character escapes and does not work for e.g. r"\x02". + # Thus, we cannot use that option, which means we cannot have single quotes in strings. + # To fix this, we represent single quotes as r"\x07". + v = base.TokValueLiteral("'") + assert v.spec() == r"'\x27'" + + def roundtrip(self, spec): + e = base.TokValueLiteral.expr() + v = base.TokValueLiteral(spec) + v2 = e.parseString(v.spec()) + assert v.val == v2[0].val + assert v.spec() == v2[0].spec() + + def test_roundtrip(self): + self.roundtrip("'") + self.roundtrip(r"\'") + self.roundtrip("a") + self.roundtrip("\"") + # self.roundtrip("\\") + self.roundtrip("200:b'foo':i23,'\\''") + self.roundtrip("\a") + + +class TestTokValueGenerate: + + def test_basic(self): + v = base.TokValue.parseString("@10b")[0] + assert v.usize == 10 + assert v.unit == "b" + assert v.bytes() == 10 + v = base.TokValue.parseString("@10")[0] + assert v.unit == "b" + v = base.TokValue.parseString("@10k")[0] + assert v.bytes() == 10240 + v = base.TokValue.parseString("@10g")[0] + assert v.bytes() == 1024 ** 3 * 10 + + v = base.TokValue.parseString("@10g,digits")[0] + assert v.datatype == "digits" + g = v.get_generator({}) + assert g[:100] + + v = base.TokValue.parseString("@10,digits")[0] + assert v.unit == "b" + assert v.datatype == "digits" + + def test_spec(self): + v = base.TokValueGenerate(1, "b", "bytes") + assert v.spec() == repr(v) == "@1" + + v = base.TokValueGenerate(1, "k", "bytes") + assert v.spec() == repr(v) == "@1k" + + v = base.TokValueGenerate(1, "k", "ascii") + assert v.spec() == repr(v) == "@1k,ascii" + + v = base.TokValueGenerate(1, "b", "ascii") + assert v.spec() == repr(v) == "@1,ascii" + + def test_freeze(self): + v = base.TokValueGenerate(100, "b", "ascii") + f = v.freeze(language.Settings()) + assert len(f.val) == 100 + + +class TestTokValueFile: + + def test_file_value(self): + v = base.TokValue.parseString("<'one two'")[0] + assert str(v) + assert v.path == "one two" + + v = base.TokValue.parseString("<path")[0] + assert v.path == "path" + + def test_access_control(self): + v = base.TokValue.parseString("<path")[0] + with tutils.tmpdir() as t: + p = os.path.join(t, "path") + with open(p, "wb") as f: + f.write(b"x" * 10000) + + assert v.get_generator(language.Settings(staticdir=t)) + + v = base.TokValue.parseString("<path2")[0] + with pytest.raises(exceptions.FileAccessDenied): + v.get_generator(language.Settings(staticdir=t)) + with pytest.raises(Exception, match="access disabled"): + v.get_generator(language.Settings()) + + v = base.TokValue.parseString("</outside")[0] + with pytest.raises(Exception, match="outside"): + v.get_generator(language.Settings(staticdir=t)) + + def test_spec(self): + v = base.TokValue.parseString("<'one two'")[0] + v2 = base.TokValue.parseString(v.spec())[0] + assert v2.path == "one two" + + def test_freeze(self): + v = base.TokValue.parseString("<'one two'")[0] + v2 = v.freeze({}) + assert v2.path == v.path + + +class TestMisc: + + def test_generators(self): + v = base.TokValue.parseString("'val'")[0] + g = v.get_generator({}) + assert g[:] == b"val" + + def test_value(self): + assert base.TokValue.parseString("'val'")[0].val == b"val" + assert base.TokValue.parseString('"val"')[0].val == b"val" + assert base.TokValue.parseString('"\'val\'"')[0].val == b"'val'" + + def test_value2(self): + class TT(base.Value): + preamble = "m" + e = TT.expr() + v = e.parseString("m'msg'")[0] + assert v.value.val == b"msg" + + s = v.spec() + assert s == e.parseString(s)[0].spec() + + v = e.parseString("m@100")[0] + v2 = v.freeze({}) + v3 = v2.freeze({}) + assert v2.value.val == v3.value.val + + def test_fixedlengthvalue(self, tmpdir): + class TT(base.FixedLengthValue): + preamble = "m" + length = 4 + + e = TT.expr() + assert e.parseString("m@4") + with pytest.raises(Exception, match="Invalid value length"): + e.parseString("m@100") + with pytest.raises(Exception, match="Invalid value length"): + e.parseString("m@1") + + s = base.Settings(staticdir=str(tmpdir)) + tmpdir.join("path").write_binary(b"a" * 20, ensure=True) + v = e.parseString("m<path")[0] + with pytest.raises(Exception, match="Invalid value length"): + v.values(s) + + tmpdir.join("path2").write_binary(b"a" * 4, ensure=True) + v = e.parseString("m<path2")[0] + assert v.values(s) + + +class TKeyValue(base.KeyValue): + preamble = "h" + + def values(self, settings): + return [ + self.key.get_generator(settings), + ": ", + self.value.get_generator(settings), + "\r\n", + ] + + +class TestKeyValue: + + def test_simple(self): + e = TKeyValue.expr() + v = e.parseString("h'foo'='bar'")[0] + assert v.key.val == b"foo" + assert v.value.val == b"bar" + + v2 = e.parseString(v.spec())[0] + assert v2.key.val == v.key.val + assert v2.value.val == v.value.val + + s = v.spec() + assert s == e.parseString(s)[0].spec() + + def test_freeze(self): + e = TKeyValue.expr() + v = e.parseString("h@10=@10'")[0] + v2 = v.freeze({}) + v3 = v2.freeze({}) + assert v2.key.val == v3.key.val + assert v2.value.val == v3.value.val + + +def test_intfield(): + class TT(base.IntField): + preamble = "t" + names = { + "one": 1, + "two": 2, + "three": 3 + } + max = 4 + e = TT.expr() + + v = e.parseString("tone")[0] + assert v.value == 1 + assert v.spec() == "tone" + assert v.values(language.Settings()) + + v = e.parseString("t1")[0] + assert v.value == 1 + assert v.spec() == "t1" + + v = e.parseString("t4")[0] + assert v.value == 4 + assert v.spec() == "t4" + + with pytest.raises(Exception, match="can't exceed"): + e.parseString("t5") + + +def test_options_or_value(): + class TT(base.OptionsOrValue): + options = [ + "one", + "two", + "three" + ] + e = TT.expr() + assert e.parseString("one")[0].value.val == b"one" + assert e.parseString("'foo'")[0].value.val == b"foo" + assert e.parseString("'get'")[0].value.val == b"get" + + assert e.parseString("one")[0].spec() == "one" + assert e.parseString("'foo'")[0].spec() == "'foo'" + + s = e.parseString("one")[0].spec() + assert s == e.parseString(s)[0].spec() + + s = e.parseString("'foo'")[0].spec() + assert s == e.parseString(s)[0].spec() + + v = e.parseString("@100")[0] + v2 = v.freeze({}) + v3 = v2.freeze({}) + assert v2.value.val == v3.value.val + + +def test_integer(): + e = base.Integer.expr() + v = e.parseString("200")[0] + assert v.string() == b"200" + assert v.spec() == "200" + + assert v.freeze({}).value == v.value + + class BInt(base.Integer): + bounds = (1, 5) + + with pytest.raises(Exception, match="must be between"): + BInt(0) + with pytest.raises(Exception, match="must be between"): + BInt(6) + assert BInt(5) + assert BInt(1) + assert BInt(3) + + +class TBoolean(base.Boolean): + name = "test" + + +def test_unique_name(): + b = TBoolean(True) + assert b.unique_name + + +class test_boolean: + e = TBoolean.expr() + assert e.parseString("test")[0].value + assert not e.parseString("-test")[0].value + + def roundtrip(s): + e = TBoolean.expr() + s2 = e.parseString(s)[0].spec() + v1 = e.parseString(s)[0].value + v2 = e.parseString(s2)[0].value + assert s == s2 + assert v1 == v2 + + roundtrip("test") + roundtrip("-test") diff --git a/test/pathod/language/test_exceptions.py b/test/pathod/language/test_exceptions.py new file mode 100644 index 00000000..777ab4dd --- /dev/null +++ b/test/pathod/language/test_exceptions.py @@ -0,0 +1 @@ +# TODO: write tests diff --git a/test/pathod/language/test_generators.py b/test/pathod/language/test_generators.py new file mode 100644 index 00000000..b3ce0335 --- /dev/null +++ b/test/pathod/language/test_generators.py @@ -0,0 +1,45 @@ +import os + +from pathod.language import generators +from mitmproxy.test import tutils + + +def test_randomgenerator(): + g = generators.RandomGenerator("bytes", 100) + assert repr(g) + assert g[0] + assert len(g[0]) == 1 + assert len(g[:10]) == 10 + assert len(g[1:10]) == 9 + assert len(g[:1000]) == 100 + assert len(g[1000:1001]) == 0 + + +def test_filegenerator(): + with tutils.tmpdir() as t: + path = os.path.join(t, "foo") + f = open(path, "wb") + f.write(b"x" * 10000) + f.close() + g = generators.FileGenerator(path) + assert len(g) == 10000 + assert g[0] == b"x" + assert g[-1] == b"x" + assert g[0:5] == b"xxxxx" + assert len(g[1:10]) == 9 + assert len(g[10000:10001]) == 0 + assert repr(g) + # remove all references to FileGenerator instance to close the file + # handle. + del g + + +def test_transform_generator(): + def trans(offset, data): + return "a" * len(data) + g = "one" + t = generators.TransformGenerator(g, trans) + assert len(t) == len(g) + assert t[0] == "a" + assert t[:] == "a" * len(g) + assert repr(t) diff --git a/test/pathod/language/test_http.py b/test/pathod/language/test_http.py new file mode 100644 index 00000000..a5b35c05 --- /dev/null +++ b/test/pathod/language/test_http.py @@ -0,0 +1,355 @@ +import io +import pytest + +from pathod import language +from pathod.language import http, base + +from .. import tservers + + +def parse_request(s): + return next(language.parse_pathoc(s)) + + +def test_make_error_response(): + d = io.BytesIO() + s = http.make_error_response("foo") + language.serve(s, d, {}) + + +class TestRequest: + + def test_nonascii(self): + with pytest.raises(Exception, match="ASCII"): + parse_request("get:\xf0") + + def test_err(self): + with pytest.raises(language.ParseException): + parse_request('GET') + + def test_simple(self): + r = parse_request('GET:"/foo"') + assert r.method.string() == b"GET" + assert r.path.string() == b"/foo" + r = parse_request('GET:/foo') + assert r.path.string() == b"/foo" + r = parse_request('GET:@1k') + assert len(r.path.string()) == 1024 + + def test_multiple(self): + r = list(language.parse_pathoc("GET:/ PUT:/")) + assert r[0].method.string() == b"GET" + assert r[1].method.string() == b"PUT" + assert len(r) == 2 + + l = """ + GET + "/foo" + ir,@1 + + PUT + + "/foo + + + + bar" + + ir,@1 + """ + r = list(language.parse_pathoc(l)) + assert len(r) == 2 + assert r[0].method.string() == b"GET" + assert r[1].method.string() == b"PUT" + + l = """ + get:"http://localhost:9999/p/200":ir,@1 + get:"http://localhost:9999/p/200":ir,@2 + """ + r = list(language.parse_pathoc(l)) + assert len(r) == 2 + assert r[0].method.string() == b"GET" + assert r[1].method.string() == b"GET" + + def test_nested_response(self): + l = "get:/p:s'200'" + r = list(language.parse_pathoc(l)) + assert len(r) == 1 + assert len(r[0].tokens) == 3 + assert isinstance(r[0].tokens[2], http.NestedResponse) + assert r[0].values({}) + + def test_render(self): + s = io.BytesIO() + r = parse_request("GET:'/foo'") + assert language.serve( + r, + s, + language.Settings(request_host="foo.com") + ) + + def test_multiline(self): + l = """ + GET + "/foo" + ir,@1 + """ + r = parse_request(l) + assert r.method.string() == b"GET" + assert r.path.string() == b"/foo" + assert r.actions + + l = """ + GET + + "/foo + + + + bar" + + ir,@1 + """ + r = parse_request(l) + assert r.method.string() == b"GET" + assert r.path.string().endswith(b"bar") + assert r.actions + + def test_spec(self): + def rt(s): + s = parse_request(s).spec() + assert parse_request(s).spec() == s + rt("get:/foo") + rt("get:/foo:da") + + def test_freeze(self): + r = parse_request("GET:/:b@100").freeze(language.Settings()) + assert len(r.spec()) > 100 + + def test_path_generator(self): + r = parse_request("GET:@100").freeze(language.Settings()) + assert len(r.spec()) > 100 + + def test_websocket(self): + r = parse_request('ws:/path/') + res = r.resolve(language.Settings()) + assert res.method.string().lower() == b"get" + assert res.tok(http.Path).value.val == b"/path/" + assert res.tok(http.Method).value.val.lower() == b"get" + assert http.get_header(b"Upgrade", res.headers).value.val == b"websocket" + + r = parse_request('ws:put:/path/') + res = r.resolve(language.Settings()) + assert r.method.string().lower() == b"put" + assert res.tok(http.Path).value.val == b"/path/" + assert res.tok(http.Method).value.val.lower() == b"put" + assert http.get_header(b"Upgrade", res.headers).value.val == b"websocket" + + +class TestResponse: + + def dummy_response(self): + return next(language.parse_pathod("400'msg'")) + + def test_response(self): + r = next(language.parse_pathod("400:m'msg'")) + assert r.status_code.string() == b"400" + assert r.reason.string() == b"msg" + + r = next(language.parse_pathod("400:m'msg':b@100b")) + assert r.reason.string() == b"msg" + assert r.body.values({}) + assert str(r) + + r = next(language.parse_pathod("200")) + assert r.status_code.string() == b"200" + assert not r.reason + assert b"OK" in [i[:] for i in r.preamble({})] + + def test_render(self): + s = io.BytesIO() + r = next(language.parse_pathod("400:m'msg'")) + assert language.serve(r, s, {}) + + r = next(language.parse_pathod("400:p0,100:dr")) + assert "p0" in r.spec() + s = r.preview_safe() + assert "p0" not in s.spec() + + def test_raw(self): + s = io.BytesIO() + r = next(language.parse_pathod("400:b'foo'")) + language.serve(r, s, {}) + v = s.getvalue() + assert b"Content-Length" in v + + s = io.BytesIO() + r = next(language.parse_pathod("400:b'foo':r")) + language.serve(r, s, {}) + v = s.getvalue() + assert b"Content-Length" not in v + + def test_length(self): + def testlen(x): + s = io.BytesIO() + x = next(x) + language.serve(x, s, language.Settings()) + assert x.length(language.Settings()) == len(s.getvalue()) + testlen(language.parse_pathod("400:m'msg':r")) + testlen(language.parse_pathod("400:m'msg':h'foo'='bar':r")) + testlen(language.parse_pathod("400:m'msg':h'foo'='bar':b@100b:r")) + + def test_maximum_length(self): + def testlen(x): + x = next(x) + s = io.BytesIO() + m = x.maximum_length({}) + language.serve(x, s, {}) + assert m >= len(s.getvalue()) + + r = language.parse_pathod("400:m'msg':b@100:d0") + testlen(r) + + r = language.parse_pathod("400:m'msg':b@100:d0:i0,'foo'") + testlen(r) + + r = language.parse_pathod("400:m'msg':b@100:d0:i0,'foo'") + testlen(r) + + def test_parse_err(self): + with pytest.raises(language.ParseException): + language.parse_pathod("400:msg,b:") + try: + language.parse_pathod("400'msg':b:") + except language.ParseException as v: + assert v.marked() + assert str(v) + + def test_nonascii(self): + with pytest.raises(Exception, match="ASCII"): + language.parse_pathod("foo:b\xf0") + + def test_parse_header(self): + r = next(language.parse_pathod('400:h"foo"="bar"')) + assert http.get_header(b"foo", r.headers) + + def test_parse_pause_before(self): + r = next(language.parse_pathod("400:p0,10")) + assert r.actions[0].spec() == "p0,10" + + def test_parse_pause_after(self): + r = next(language.parse_pathod("400:pa,10")) + assert r.actions[0].spec() == "pa,10" + + def test_parse_pause_random(self): + r = next(language.parse_pathod("400:pr,10")) + assert r.actions[0].spec() == "pr,10" + + def test_parse_stress(self): + # While larger values are known to work on linux, len() technically + # returns an int and a python 2.7 int on windows has 32bit precision. + # Therefore, we should keep the body length < 2147483647 bytes in our + # tests. + r = next(language.parse_pathod("400:b@1g")) + assert r.length({}) + + def test_spec(self): + def rt(s): + s = next(language.parse_pathod(s)).spec() + assert next(language.parse_pathod(s)).spec() == s + rt("400:b@100g") + rt("400") + rt("400:da") + + def test_websockets(self): + r = next(language.parse_pathod("ws")) + with pytest.raises(Exception, match="No websocket key"): + r.resolve(language.Settings()) + res = r.resolve(language.Settings(websocket_key=b"foo")) + assert res.status_code.string() == b"101" + + +def test_ctype_shortcut(): + e = http.ShortcutContentType.expr() + v = e.parseString("c'foo'")[0] + assert v.key.val == b"Content-Type" + assert v.value.val == b"foo" + + s = v.spec() + assert s == e.parseString(s)[0].spec() + + e = http.ShortcutContentType.expr() + v = e.parseString("c@100")[0] + v2 = v.freeze({}) + v3 = v2.freeze({}) + assert v2.value.val == v3.value.val + + +def test_location_shortcut(): + e = http.ShortcutLocation.expr() + v = e.parseString("l'foo'")[0] + assert v.key.val == b"Location" + assert v.value.val == b"foo" + + s = v.spec() + assert s == e.parseString(s)[0].spec() + + e = http.ShortcutLocation.expr() + v = e.parseString("l@100")[0] + v2 = v.freeze({}) + v3 = v2.freeze({}) + assert v2.value.val == v3.value.val + + +def test_shortcuts(): + assert next(language.parse_pathod( + "400:c'foo'")).headers[0].key.val == b"Content-Type" + assert next(language.parse_pathod( + "400:l'foo'")).headers[0].key.val == b"Location" + + assert b"Android" in tservers.render(parse_request("get:/:ua")) + assert b"User-Agent" in tservers.render(parse_request("get:/:ua")) + + +def test_user_agent(): + e = http.ShortcutUserAgent.expr() + v = e.parseString("ua")[0] + assert b"Android" in v.string() + + e = http.ShortcutUserAgent.expr() + v = e.parseString("u'a'")[0] + assert b"Android" not in v.string() + + v = e.parseString("u@100'")[0] + assert len(str(v.freeze({}).value)) > 100 + v2 = v.freeze({}) + v3 = v2.freeze({}) + assert v2.value.val == v3.value.val + + +def test_nested_response(): + e = http.NestedResponse.expr() + v = e.parseString("s'200'")[0] + assert v.value.val == b"200" + with pytest.raises(language.ParseException): + e.parseString("s'foo'") + + v = e.parseString('s"200:b@1"')[0] + assert "@1" in v.spec() + f = v.freeze({}) + assert "@1" not in f.spec() + + +def test_nested_response_freeze(): + e = http.NestedResponse( + base.TokValueLiteral( + r"200:b\'foo\':i10,\'\\x27\'" + ) + ) + assert e.freeze({}) + assert e.values({}) + + +def test_unique_components(): + with pytest.raises(Exception, match="multiple body clauses"): + language.parse_pathod("400:b@1:b@1") diff --git a/test/pathod/language/test_http2.py b/test/pathod/language/test_http2.py new file mode 100644 index 00000000..4f89adb8 --- /dev/null +++ b/test/pathod/language/test_http2.py @@ -0,0 +1,236 @@ +import io +import pytest + +from mitmproxy.net import tcp +from mitmproxy.net.http import user_agents + +from pathod import language +from pathod.language import http2 +from pathod.protocols.http2 import HTTP2StateProtocol + + +def parse_request(s): + return next(language.parse_pathoc(s, True)) + + +def parse_response(s): + return next(language.parse_pathod(s, True)) + + +def default_settings(): + return language.Settings( + request_host="foo.com", + protocol=HTTP2StateProtocol(tcp.TCPClient(('localhost', 1234))) + ) + + +def test_make_error_response(): + d = io.BytesIO() + s = http2.make_error_response("foo", "bar") + language.serve(s, d, default_settings()) + + +class TestRequest: + + def test_cached_values(self): + req = parse_request("get:/") + req_id = id(req) + assert req_id == id(req.resolve(default_settings())) + assert req.values(default_settings()) == req.values(default_settings()) + + def test_nonascii(self): + with pytest.raises(Exception, match="ASCII"): + parse_request("get:\xf0") + + def test_err(self): + with pytest.raises(language.ParseException): + parse_request('GET') + + def test_simple(self): + r = parse_request('GET:"/foo"') + assert r.method.string() == b"GET" + assert r.path.string() == b"/foo" + r = parse_request('GET:/foo') + assert r.path.string() == b"/foo" + + def test_multiple(self): + r = list(language.parse_pathoc("GET:/ PUT:/")) + assert r[0].method.string() == b"GET" + assert r[1].method.string() == b"PUT" + assert len(r) == 2 + + l = """ + GET + "/foo" + + PUT + + "/foo + + + + bar" + """ + r = list(language.parse_pathoc(l, True)) + assert len(r) == 2 + assert r[0].method.string() == b"GET" + assert r[1].method.string() == b"PUT" + + l = """ + get:"http://localhost:9999/p/200" + get:"http://localhost:9999/p/200" + """ + r = list(language.parse_pathoc(l, True)) + assert len(r) == 2 + assert r[0].method.string() == b"GET" + assert r[1].method.string() == b"GET" + + def test_render_simple(self): + s = io.BytesIO() + r = parse_request("GET:'/foo'") + assert language.serve( + r, + s, + default_settings(), + ) + + def test_raw_content_length(self): + r = parse_request('GET:/:r') + assert len(r.headers) == 0 + + r = parse_request('GET:/:r:b"foobar"') + assert len(r.headers) == 0 + + r = parse_request('GET:/') + assert len(r.headers) == 1 + assert r.headers[0].values(default_settings()) == (b"content-length", b"0") + + r = parse_request('GET:/:b"foobar"') + assert len(r.headers) == 1 + assert r.headers[0].values(default_settings()) == (b"content-length", b"6") + + r = parse_request('GET:/:b"foobar":h"content-length"="42"') + assert len(r.headers) == 1 + assert r.headers[0].values(default_settings()) == (b"content-length", b"42") + + r = parse_request('GET:/:r:b"foobar":h"content-length"="42"') + assert len(r.headers) == 1 + assert r.headers[0].values(default_settings()) == (b"content-length", b"42") + + def test_content_type(self): + r = parse_request('GET:/:r:c"foobar"') + assert len(r.headers) == 1 + assert r.headers[0].values(default_settings()) == (b"content-type", b"foobar") + + def test_user_agent(self): + r = parse_request('GET:/:r:ua') + assert len(r.headers) == 1 + assert r.headers[0].values(default_settings()) == (b"user-agent", user_agents.get_by_shortcut('a')[2].encode()) + + def test_render_with_headers(self): + s = io.BytesIO() + r = parse_request('GET:/foo:h"foo"="bar"') + assert language.serve( + r, + s, + default_settings(), + ) + + def test_nested_response(self): + l = "get:/p/:s'200'" + r = parse_request(l) + assert len(r.tokens) == 3 + assert isinstance(r.tokens[2], http2.NestedResponse) + assert r.values(default_settings()) + + def test_render_with_body(self): + s = io.BytesIO() + r = parse_request("GET:'/foo':bfoobar") + assert language.serve( + r, + s, + default_settings(), + ) + + def test_spec(self): + def rt(s): + s = parse_request(s).spec() + assert parse_request(s).spec() == s + rt("get:/foo") + + +class TestResponse: + + def test_cached_values(self): + res = parse_response("200") + res_id = id(res) + assert res_id == id(res.resolve(default_settings())) + assert res.values(default_settings()) == res.values(default_settings()) + + def test_nonascii(self): + with pytest.raises(Exception, match="ASCII"): + parse_response("200:\xf0") + + def test_err(self): + with pytest.raises(language.ParseException): + parse_response('GET:/') + + def test_raw_content_length(self): + r = parse_response('200:r') + assert len(r.headers) == 0 + + r = parse_response('200') + assert len(r.headers) == 1 + assert r.headers[0].values(default_settings()) == (b"content-length", b"0") + + def test_content_type(self): + r = parse_response('200:r:c"foobar"') + assert len(r.headers) == 1 + assert r.headers[0].values(default_settings()) == (b"content-type", b"foobar") + + def test_simple(self): + r = parse_response('200:r:h"foo"="bar"') + assert r.status_code.string() == b"200" + assert len(r.headers) == 1 + assert r.headers[0].values(default_settings()) == (b"foo", b"bar") + assert r.body is None + + r = parse_response('200:r:h"foo"="bar":bfoobar:h"bla"="fasel"') + assert r.status_code.string() == b"200" + assert len(r.headers) == 2 + assert r.headers[0].values(default_settings()) == (b"foo", b"bar") + assert r.headers[1].values(default_settings()) == (b"bla", b"fasel") + assert r.body.string() == b"foobar" + + def test_render_simple(self): + s = io.BytesIO() + r = parse_response('200') + assert language.serve( + r, + s, + default_settings(), + ) + + def test_render_with_headers(self): + s = io.BytesIO() + r = parse_response('200:h"foo"="bar"') + assert language.serve( + r, + s, + default_settings(), + ) + + def test_render_with_body(self): + s = io.BytesIO() + r = parse_response('200:bfoobar') + assert language.serve( + r, + s, + default_settings(), + ) + + def test_spec(self): + def rt(s): + s = parse_response(s).spec() + assert parse_response(s).spec() == s + rt("200:bfoobar") diff --git a/test/pathod/language/test_message.py b/test/pathod/language/test_message.py new file mode 100644 index 00000000..777ab4dd --- /dev/null +++ b/test/pathod/language/test_message.py @@ -0,0 +1 @@ +# TODO: write tests diff --git a/test/pathod/language/test_websockets.py b/test/pathod/language/test_websockets.py new file mode 100644 index 00000000..ed766bca --- /dev/null +++ b/test/pathod/language/test_websockets.py @@ -0,0 +1,142 @@ +import pytest + +from pathod import language +from pathod.language import websockets +import mitmproxy.net.websockets + +from .. import tservers + + +def parse_request(s): + return next(language.parse_pathoc(s)) + + +class TestWebsocketFrame: + + def _test_messages(self, specs, message_klass): + for i in specs: + wf = parse_request(i) + assert isinstance(wf, message_klass) + assert wf + assert wf.values(language.Settings()) + assert wf.resolve(language.Settings()) + + spec = wf.spec() + wf2 = parse_request(spec) + assert wf2.spec() == spec + + def test_server_values(self): + specs = [ + "wf", + "wf:dr", + "wf:b'foo'", + "wf:mask:r'foo'", + "wf:l1024:b'foo'", + "wf:cbinary", + "wf:c1", + "wf:mask:knone", + "wf:fin", + "wf:fin:rsv1:rsv2:rsv3:mask", + "wf:-fin:-rsv1:-rsv2:-rsv3:-mask", + "wf:k@4", + "wf:x10", + ] + self._test_messages(specs, websockets.WebsocketFrame) + + def test_parse_websocket_frames(self): + wf = language.parse_websocket_frame("wf:x10") + assert len(list(wf)) == 10 + with pytest.raises(language.ParseException): + language.parse_websocket_frame("wf:x") + + def test_client_values(self): + specs = [ + "wf:f'wf'", + ] + self._test_messages(specs, websockets.WebsocketClientFrame) + + def test_nested_frame(self): + wf = parse_request("wf:f'wf'") + assert wf.nested_frame + + def test_flags(self): + wf = parse_request("wf:fin:mask:rsv1:rsv2:rsv3") + frm = mitmproxy.net.websockets.Frame.from_bytes(tservers.render(wf)) + assert frm.header.fin + assert frm.header.mask + assert frm.header.rsv1 + assert frm.header.rsv2 + assert frm.header.rsv3 + + wf = parse_request("wf:-fin:-mask:-rsv1:-rsv2:-rsv3") + frm = mitmproxy.net.websockets.Frame.from_bytes(tservers.render(wf)) + assert not frm.header.fin + assert not frm.header.mask + assert not frm.header.rsv1 + assert not frm.header.rsv2 + assert not frm.header.rsv3 + + def fr(self, spec, **kwargs): + settings = language.base.Settings(**kwargs) + wf = parse_request(spec) + return mitmproxy.net.websockets.Frame.from_bytes(tservers.render(wf, settings)) + + def test_construction(self): + assert self.fr("wf:c1").header.opcode == 1 + assert self.fr("wf:c0").header.opcode == 0 + assert self.fr("wf:cbinary").header.opcode ==\ + mitmproxy.net.websockets.OPCODE.BINARY + assert self.fr("wf:ctext").header.opcode ==\ + mitmproxy.net.websockets.OPCODE.TEXT + + def test_rawbody(self): + frm = self.fr("wf:mask:r'foo'") + assert len(frm.payload) == 3 + assert frm.payload != b"foo" + + assert self.fr("wf:r'foo'").payload == b"foo" + + def test_construction_2(self): + # Simple server frame + frm = self.fr("wf:b'foo'") + assert not frm.header.mask + assert not frm.header.masking_key + + # Simple client frame + frm = self.fr("wf:b'foo'", is_client=True) + assert frm.header.mask + assert frm.header.masking_key + frm = self.fr("wf:b'foo':k'abcd'", is_client=True) + assert frm.header.mask + assert frm.header.masking_key == b'abcd' + + # Server frame, mask explicitly set + frm = self.fr("wf:b'foo':mask") + assert frm.header.mask + assert frm.header.masking_key + frm = self.fr("wf:b'foo':k'abcd'") + assert frm.header.mask + assert frm.header.masking_key == b'abcd' + + # Client frame, mask explicitly unset + frm = self.fr("wf:b'foo':-mask", is_client=True) + assert not frm.header.mask + assert not frm.header.masking_key + + frm = self.fr("wf:b'foo':-mask:k'abcd'", is_client=True) + assert not frm.header.mask + # We're reading back a corrupted frame - the first 3 characters of the + # mask is mis-interpreted as the payload + assert frm.payload == b"abc" + + def test_knone(self): + with pytest.raises(Exception, match="Expected 4 bytes"): + self.fr("wf:b'foo':mask:knone") + + def test_length(self): + assert self.fr("wf:l3:b'foo'").header.payload_length == 3 + frm = self.fr("wf:l2:b'foo'") + assert frm.header.payload_length == 2 + assert frm.payload == b"fo" + with pytest.raises(Exception, match="Expected 1024 bytes"): + self.fr("wf:l1024:b'foo'") diff --git a/test/pathod/language/test_writer.py b/test/pathod/language/test_writer.py new file mode 100644 index 00000000..7feb985d --- /dev/null +++ b/test/pathod/language/test_writer.py @@ -0,0 +1,90 @@ +import io +from pathod import language +from pathod.language import writer + + +def test_send_chunk(): + v = b"foobarfoobar" + for bs in range(1, len(v) + 2): + s = io.BytesIO() + writer.send_chunk(s, v, bs, 0, len(v)) + assert s.getvalue() == v + for start in range(len(v)): + for end in range(len(v)): + s = io.BytesIO() + writer.send_chunk(s, v, bs, start, end) + assert s.getvalue() == v[start:end] + + +def test_write_values_inject(): + tst = b"foo" + + s = io.BytesIO() + writer.write_values(s, [tst], [(0, "inject", b"aaa")], blocksize=5) + assert s.getvalue() == b"aaafoo" + + s = io.BytesIO() + writer.write_values(s, [tst], [(1, "inject", b"aaa")], blocksize=5) + assert s.getvalue() == b"faaaoo" + + s = io.BytesIO() + writer.write_values(s, [tst], [(1, "inject", b"aaa")], blocksize=5) + assert s.getvalue() == b"faaaoo" + + +def test_write_values_disconnects(): + s = io.BytesIO() + tst = b"foo" * 100 + writer.write_values(s, [tst], [(0, "disconnect")], blocksize=5) + assert not s.getvalue() + + +def test_write_values(): + tst = b"foobarvoing" + s = io.BytesIO() + writer.write_values(s, [tst], []) + assert s.getvalue() == tst + + for bs in range(1, len(tst) + 2): + for off in range(len(tst)): + s = io.BytesIO() + writer.write_values( + s, [tst], [(off, "disconnect")], blocksize=bs + ) + assert s.getvalue() == tst[:off] + + +def test_write_values_pauses(): + tst = "".join(str(i) for i in range(10)).encode() + for i in range(2, 10): + s = io.BytesIO() + writer.write_values( + s, [tst], [(2, "pause", 0), (1, "pause", 0)], blocksize=i + ) + assert s.getvalue() == tst + + for i in range(2, 10): + s = io.BytesIO() + writer.write_values(s, [tst], [(1, "pause", 0)], blocksize=i) + assert s.getvalue() == tst + + tst = [tst] * 5 + for i in range(2, 10): + s = io.BytesIO() + writer.write_values(s, tst[:], [(1, "pause", 0)], blocksize=i) + assert s.getvalue() == b"".join(tst) + + +def test_write_values_after(): + s = io.BytesIO() + r = next(language.parse_pathod("400:da")) + language.serve(r, s, {}) + + s = io.BytesIO() + r = next(language.parse_pathod("400:pa,0")) + language.serve(r, s, {}) + + s = io.BytesIO() + r = next(language.parse_pathod("400:ia,'xx'")) + language.serve(r, s, {}) + assert s.getvalue().endswith(b'xx') |