diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/pathod/test_language_base.py | 13 | ||||
-rw-r--r-- | test/pathod/test_language_http.py | 146 | ||||
-rw-r--r-- | test/pathod/test_language_websocket.py | 14 | ||||
-rw-r--r-- | test/pathod/test_pathoc.py | 57 | ||||
-rw-r--r-- | test/pathod/test_pathod.py | 28 | ||||
-rw-r--r-- | test/pathod/tutils.py | 3 |
6 files changed, 120 insertions, 141 deletions
diff --git a/test/pathod/test_language_base.py b/test/pathod/test_language_base.py index 075dc2b8..7c7d8cf9 100644 --- a/test/pathod/test_language_base.py +++ b/test/pathod/test_language_base.py @@ -55,8 +55,15 @@ class TestTokValueLiteral: v = base.TokValueLiteral("f\x00oo") assert v.spec() == repr(v) == r"'f\x00oo'" - v = base.TokValueLiteral("\"") - assert v.spec() == repr(v) == '\'"\'' + v = base.TokValueLiteral('"') + assert v.spec() == repr(v) == """ '"' """.strip() + + # While pyparsing has a escChar argument for QuotedString, + # escChar only performs scapes single-character escapes and does not work for e.g. r"\x02". + # Thus, we cannot use that option, which means we cannot have single quotes in strings. + # To fix this, we represent single quotes as r"\x07". + v = base.TokValueLiteral("'") + assert v.spec() == r"'\x27'" def roundtrip(self, spec): e = base.TokValueLiteral.expr() @@ -311,7 +318,7 @@ def test_options_or_value(): def test_integer(): e = base.Integer.expr() v = e.parseString("200")[0] - assert v.string() == "200" + assert v.string() == b"200" assert v.spec() == "200" assert v.freeze({}).value == v.value diff --git a/test/pathod/test_language_http.py b/test/pathod/test_language_http.py index d1870a63..18059e3a 100644 --- a/test/pathod/test_language_http.py +++ b/test/pathod/test_language_http.py @@ -1,15 +1,15 @@ -from six.moves import cStringIO as StringIO +from six import BytesIO from pathod import language from pathod.language import http, base import tutils def parse_request(s): - return language.parse_pathoc(s).next() + return next(language.parse_pathoc(s)) def test_make_error_response(): - d = StringIO() + d = BytesIO() s = http.make_error_response("foo") language.serve(s, d, {}) @@ -24,17 +24,17 @@ class TestRequest: def test_simple(self): r = parse_request('GET:"/foo"') - assert r.method.string() == "GET" - assert r.path.string() == "/foo" + assert r.method.string() == b"GET" + assert r.path.string() == b"/foo" r = parse_request('GET:/foo') - assert r.path.string() == "/foo" + assert r.path.string() == b"/foo" r = parse_request('GET:@1k') assert len(r.path.string()) == 1024 def test_multiple(self): r = list(language.parse_pathoc("GET:/ PUT:/")) - assert r[0].method.string() == "GET" - assert r[1].method.string() == "PUT" + assert r[0].method.string() == b"GET" + assert r[1].method.string() == b"PUT" assert len(r) == 2 l = """ @@ -54,8 +54,8 @@ class TestRequest: """ r = list(language.parse_pathoc(l)) assert len(r) == 2 - assert r[0].method.string() == "GET" - assert r[1].method.string() == "PUT" + assert r[0].method.string() == b"GET" + assert r[1].method.string() == b"PUT" l = """ get:"http://localhost:9999/p/200":ir,@1 @@ -63,8 +63,8 @@ class TestRequest: """ r = list(language.parse_pathoc(l)) assert len(r) == 2 - assert r[0].method.string() == "GET" - assert r[1].method.string() == "GET" + assert r[0].method.string() == b"GET" + assert r[1].method.string() == b"GET" def test_nested_response(self): l = "get:/p:s'200'" @@ -75,7 +75,7 @@ class TestRequest: assert r[0].values({}) def test_render(self): - s = StringIO() + s = BytesIO() r = parse_request("GET:'/foo'") assert language.serve( r, @@ -90,8 +90,8 @@ class TestRequest: ir,@1 """ r = parse_request(l) - assert r.method.string() == "GET" - assert r.path.string() == "/foo" + assert r.method.string() == b"GET" + assert r.path.string() == b"/foo" assert r.actions l = """ @@ -106,8 +106,8 @@ class TestRequest: ir,@1 """ r = parse_request(l) - assert r.method.string() == "GET" - assert r.path.string().endswith("bar") + assert r.method.string() == b"GET" + assert r.path.string().endswith(b"bar") assert r.actions def test_spec(self): @@ -128,66 +128,66 @@ class TestRequest: def test_websocket(self): r = parse_request('ws:/path/') res = r.resolve(language.Settings()) - assert res.method.string().lower() == "get" - assert res.tok(http.Path).value.val == "/path/" - assert res.tok(http.Method).value.val.lower() == "get" - assert http.get_header("Upgrade", res.headers).value.val == "websocket" + assert res.method.string().lower() == b"get" + assert res.tok(http.Path).value.val == b"/path/" + assert res.tok(http.Method).value.val.lower() == b"get" + assert http.get_header(b"Upgrade", res.headers).value.val == b"websocket" r = parse_request('ws:put:/path/') res = r.resolve(language.Settings()) - assert r.method.string().lower() == "put" - assert res.tok(http.Path).value.val == "/path/" - assert res.tok(http.Method).value.val.lower() == "put" - assert http.get_header("Upgrade", res.headers).value.val == "websocket" + assert r.method.string().lower() == b"put" + assert res.tok(http.Path).value.val == b"/path/" + assert res.tok(http.Method).value.val.lower() == b"put" + assert http.get_header(b"Upgrade", res.headers).value.val == b"websocket" class TestResponse: def dummy_response(self): - return language.parse_pathod("400'msg'").next() + return next(language.parse_pathod("400'msg'")) def test_response(self): - r = language.parse_pathod("400:m'msg'").next() - assert r.status_code.string() == "400" - assert r.reason.string() == "msg" + r = next(language.parse_pathod("400:m'msg'")) + assert r.status_code.string() == b"400" + assert r.reason.string() == b"msg" - r = language.parse_pathod("400:m'msg':b@100b").next() - assert r.reason.string() == "msg" + r = next(language.parse_pathod("400:m'msg':b@100b")) + assert r.reason.string() == b"msg" assert r.body.values({}) assert str(r) - r = language.parse_pathod("200").next() - assert r.status_code.string() == "200" + r = next(language.parse_pathod("200")) + assert r.status_code.string() == b"200" assert not r.reason - assert "OK" in [i[:] for i in r.preamble({})] + assert b"OK" in [i[:] for i in r.preamble({})] def test_render(self): - s = StringIO() - r = language.parse_pathod("400:m'msg'").next() + s = BytesIO() + r = next(language.parse_pathod("400:m'msg'")) assert language.serve(r, s, {}) - r = language.parse_pathod("400:p0,100:dr").next() + r = next(language.parse_pathod("400:p0,100:dr")) assert "p0" in r.spec() s = r.preview_safe() assert "p0" not in s.spec() def test_raw(self): - s = StringIO() - r = language.parse_pathod("400:b'foo'").next() + s = BytesIO() + r = next(language.parse_pathod("400:b'foo'")) language.serve(r, s, {}) v = s.getvalue() - assert "Content-Length" in v + assert b"Content-Length" in v - s = StringIO() - r = language.parse_pathod("400:b'foo':r").next() + s = BytesIO() + r = next(language.parse_pathod("400:b'foo':r")) language.serve(r, s, {}) v = s.getvalue() - assert "Content-Length" not in v + assert b"Content-Length" not in v def test_length(self): def testlen(x): - s = StringIO() - x = x.next() + s = BytesIO() + x = next(x) language.serve(x, s, language.Settings()) assert x.length(language.Settings()) == len(s.getvalue()) testlen(language.parse_pathod("400:m'msg':r")) @@ -196,8 +196,8 @@ class TestResponse: def test_maximum_length(self): def testlen(x): - x = x.next() - s = StringIO() + x = next(x) + s = BytesIO() m = x.maximum_length({}) language.serve(x, s, {}) assert m >= len(s.getvalue()) @@ -225,19 +225,19 @@ class TestResponse: tutils.raises("ascii", language.parse_pathod, "foo:b\xf0") def test_parse_header(self): - r = language.parse_pathod('400:h"foo"="bar"').next() - assert http.get_header("foo", r.headers) + r = next(language.parse_pathod('400:h"foo"="bar"')) + assert http.get_header(b"foo", r.headers) def test_parse_pause_before(self): - r = language.parse_pathod("400:p0,10").next() + r = next(language.parse_pathod("400:p0,10")) assert r.actions[0].spec() == "p0,10" def test_parse_pause_after(self): - r = language.parse_pathod("400:pa,10").next() + r = next(language.parse_pathod("400:pa,10")) assert r.actions[0].spec() == "pa,10" def test_parse_pause_random(self): - r = language.parse_pathod("400:pr,10").next() + r = next(language.parse_pathod("400:pr,10")) assert r.actions[0].spec() == "pr,10" def test_parse_stress(self): @@ -245,29 +245,29 @@ class TestResponse: # returns an int and a python 2.7 int on windows has 32bit precision. # Therefore, we should keep the body length < 2147483647 bytes in our # tests. - r = language.parse_pathod("400:b@1g").next() + r = next(language.parse_pathod("400:b@1g")) assert r.length({}) def test_spec(self): def rt(s): - s = language.parse_pathod(s).next().spec() - assert language.parse_pathod(s).next().spec() == s + s = next(language.parse_pathod(s)).spec() + assert next(language.parse_pathod(s)).spec() == s rt("400:b@100g") rt("400") rt("400:da") def test_websockets(self): - r = language.parse_pathod("ws").next() + r = next(language.parse_pathod("ws")) tutils.raises("no websocket key", r.resolve, language.Settings()) - res = r.resolve(language.Settings(websocket_key="foo")) - assert res.status_code.string() == "101" + res = r.resolve(language.Settings(websocket_key=b"foo")) + assert res.status_code.string() == b"101" def test_ctype_shortcut(): e = http.ShortcutContentType.expr() v = e.parseString("c'foo'")[0] - assert v.key.val == "Content-Type" - assert v.value.val == "foo" + assert v.key.val == b"Content-Type" + assert v.value.val == b"foo" s = v.spec() assert s == e.parseString(s)[0].spec() @@ -282,8 +282,8 @@ def test_ctype_shortcut(): def test_location_shortcut(): e = http.ShortcutLocation.expr() v = e.parseString("l'foo'")[0] - assert v.key.val == "Location" - assert v.value.val == "foo" + assert v.key.val == b"Location" + assert v.value.val == b"foo" s = v.spec() assert s == e.parseString(s)[0].spec() @@ -296,23 +296,23 @@ def test_location_shortcut(): def test_shortcuts(): - assert language.parse_pathod( - "400:c'foo'").next().headers[0].key.val == "Content-Type" - assert language.parse_pathod( - "400:l'foo'").next().headers[0].key.val == "Location" + assert next(language.parse_pathod( + "400:c'foo'")).headers[0].key.val == b"Content-Type" + assert next(language.parse_pathod( + "400:l'foo'")).headers[0].key.val == b"Location" - assert "Android" in tutils.render(parse_request("get:/:ua")) - assert "User-Agent" in tutils.render(parse_request("get:/:ua")) + assert b"Android" in tutils.render(parse_request("get:/:ua")) + assert b"User-Agent" in tutils.render(parse_request("get:/:ua")) def test_user_agent(): e = http.ShortcutUserAgent.expr() v = e.parseString("ua")[0] - assert "Android" in v.string() + assert b"Android" in v.string() e = http.ShortcutUserAgent.expr() v = e.parseString("u'a'")[0] - assert "Android" not in v.string() + assert b"Android" not in v.string() v = e.parseString("u@100'")[0] assert len(str(v.freeze({}).value)) > 100 @@ -324,7 +324,7 @@ def test_user_agent(): def test_nested_response(): e = http.NestedResponse.expr() v = e.parseString("s'200'")[0] - assert v.value.val == "200" + assert v.value.val == b"200" tutils.raises( language.ParseException, e.parseString, @@ -340,9 +340,7 @@ def test_nested_response(): def test_nested_response_freeze(): e = http.NestedResponse( base.TokValueLiteral( - "200:b'foo':i10,'\\x27'".encode( - "string_escape" - ) + r"200:b\'foo\':i10,\'\\x27\'" ) ) assert e.freeze({}) diff --git a/test/pathod/test_language_websocket.py b/test/pathod/test_language_websocket.py index 29155dba..58297141 100644 --- a/test/pathod/test_language_websocket.py +++ b/test/pathod/test_language_websocket.py @@ -6,7 +6,7 @@ import tutils def parse_request(s): - return language.parse_pathoc(s).next() + return next(language.parse_pathoc(s)) class TestWebsocketFrame: @@ -93,9 +93,9 @@ class TestWebsocketFrame: def test_rawbody(self): frm = self.fr("wf:mask:r'foo'") assert len(frm.payload) == 3 - assert frm.payload != "foo" + assert frm.payload != b"foo" - assert self.fr("wf:r'foo'").payload == "foo" + assert self.fr("wf:r'foo'").payload == b"foo" def test_construction_2(self): # Simple server frame @@ -109,7 +109,7 @@ class TestWebsocketFrame: assert frm.header.masking_key frm = self.fr("wf:b'foo':k'abcd'", is_client=True) assert frm.header.mask - assert frm.header.masking_key == 'abcd' + assert frm.header.masking_key == b'abcd' # Server frame, mask explicitly set frm = self.fr("wf:b'foo':mask") @@ -117,7 +117,7 @@ class TestWebsocketFrame: assert frm.header.masking_key frm = self.fr("wf:b'foo':k'abcd'") assert frm.header.mask - assert frm.header.masking_key == 'abcd' + assert frm.header.masking_key == b'abcd' # Client frame, mask explicitly unset frm = self.fr("wf:b'foo':-mask", is_client=True) @@ -128,7 +128,7 @@ class TestWebsocketFrame: assert not frm.header.mask # We're reading back a corrupted frame - the first 3 characters of the # mask is mis-interpreted as the payload - assert frm.payload == "abc" + assert frm.payload == b"abc" def test_knone(self): with tutils.raises("expected 4 bytes"): @@ -138,5 +138,5 @@ class TestWebsocketFrame: assert self.fr("wf:l3:b'foo'").header.payload_length == 3 frm = self.fr("wf:l2:b'foo'") assert frm.header.payload_length == 2 - assert frm.payload == "fo" + assert frm.payload == b"fo" tutils.raises("expected 1024 bytes", self.fr, "wf:l1024:b'foo'") diff --git a/test/pathod/test_pathoc.py b/test/pathod/test_pathoc.py index 8b69a2a6..dc5011c0 100644 --- a/test/pathod/test_pathoc.py +++ b/test/pathod/test_pathoc.py @@ -17,43 +17,26 @@ def test_response(): class PathocTestDaemon(tutils.DaemonTests): - def tval( - self, - requests, - showreq=False, - showresp=False, - explain=False, - showssl=False, - hexdump=False, - timeout=None, - ignorecodes=(), - ignoretimeout=None, - showsummary=True - ): + def tval(self, requests, timeout=None, showssl=False, **kwargs): s = StringIO() c = pathoc.Pathoc( ("127.0.0.1", self.d.port), ssl=self.ssl, - showreq=showreq, - showresp=showresp, - explain=explain, - hexdump=hexdump, - ignorecodes=ignorecodes, - ignoretimeout=ignoretimeout, - showsummary=showsummary, - fp=s + fp=s, + **kwargs ) with c.connect(showssl=showssl, fp=s): if timeout: c.settimeout(timeout) for i in requests: r = language.parse_pathoc(i).next() - if explain: + if kwargs.get("explain"): r = r.freeze(language.Settings()) try: c.request(r) except NetlibException: pass + self.d.wait_for_silence() return s.getvalue() @@ -66,14 +49,10 @@ class TestDaemonSSL(PathocTestDaemon): ) def test_sni(self): - c = pathoc.Pathoc( - ("127.0.0.1", self.d.port), - ssl=True, - sni="foobar.com", - fp=None + self.tval( + ["get:/p/200"], + sni="foobar.com" ) - with c.connect(): - c.request("get:/p/200") log = self.d.log() assert log[0]["request"]["sni"] == "foobar.com" @@ -81,15 +60,10 @@ class TestDaemonSSL(PathocTestDaemon): assert "certificate chain" in self.tval(["get:/p/200"], showssl=True) def test_clientcert(self): - c = pathoc.Pathoc( - ("127.0.0.1", self.d.port), - ssl=True, + self.tval( + ["get:/p/200"], clientcert=tutils.test_data.path("data/clientcert/client.pem"), - fp=None ) - with c.connect(): - c.request("get:/p/200") - log = self.d.log() assert log[0]["request"]["clientcert"]["keyinfo"] @@ -170,9 +144,7 @@ class TestDaemon(PathocTestDaemon): assert "Invalid server response" in self.tval(["get:'/p/200:d2'"]) def test_websocket_shutdown(self): - c = pathoc.Pathoc(("127.0.0.1", self.d.port), fp=None) - with c.connect(): - c.request("ws:/") + self.tval(["ws:/"]) def test_wait_finish(self): c = pathoc.Pathoc( @@ -182,9 +154,10 @@ class TestDaemon(PathocTestDaemon): ) with c.connect(): c.request("ws:/") - c.request("wf:f'wf:x100'") - [i for i in c.wait(timeout=0, finish=False)] - [i for i in c.wait(timeout=0)] + c.request("wf:f'wf'") + # This should read a frame and close the websocket reader + assert len([i for i in c.wait(timeout=5, finish=False)]) == 1 + assert not [i for i in c.wait(timeout=0)] def test_connect_fail(self): to = ("foobar", 80) diff --git a/test/pathod/test_pathod.py b/test/pathod/test_pathod.py index 0bbad6c2..ed4ef49f 100644 --- a/test/pathod/test_pathod.py +++ b/test/pathod/test_pathod.py @@ -145,14 +145,14 @@ class CommonTests(tutils.DaemonTests): def test_invalid_first_line(self): c = tcp.TCPClient(("localhost", self.d.port)) - c.connect() - if self.ssl: - c.convert_to_ssl() - c.wfile.write("foo\n\n\n") - c.wfile.flush() - l = self.d.last_log() - assert l["type"] == "error" - assert "foo" in l["msg"] + with c.connect(): + if self.ssl: + c.convert_to_ssl() + c.wfile.write("foo\n\n\n") + c.wfile.flush() + l = self.d.last_log() + assert l["type"] == "error" + assert "foo" in l["msg"] def test_invalid_content_length(self): tutils.raises( @@ -238,12 +238,12 @@ class TestDaemonSSL(CommonTests): c = tcp.TCPClient(("localhost", self.d.port)) c.rbufsize = 0 c.wbufsize = 0 - c.connect() - c.wfile.write("\0\0\0\0") - tutils.raises(TlsException, c.convert_to_ssl) - l = self.d.last_log() - assert l["type"] == "error" - assert "SSL" in l["msg"] + with c.connect(): + c.wfile.write("\0\0\0\0") + tutils.raises(TlsException, c.convert_to_ssl) + l = self.d.last_log() + assert l["type"] == "error" + assert "SSL" in l["msg"] def test_ssl_cipher(self): r, _ = self.pathoc([r"get:/p/202"]) diff --git a/test/pathod/tutils.py b/test/pathod/tutils.py index 56cd2002..bf5e3165 100644 --- a/test/pathod/tutils.py +++ b/test/pathod/tutils.py @@ -3,6 +3,7 @@ import re import shutil import requests from six.moves import cStringIO as StringIO +from six import BytesIO import urllib from netlib import tcp @@ -147,6 +148,6 @@ test_data = utils.Data(__name__) def render(r, settings=language.Settings()): r = r.resolve(settings) - s = StringIO() + s = BytesIO() assert language.serve(r, s, settings) return s.getvalue() |