diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/http/http1/test_assemble.py | 6 | ||||
-rw-r--r-- | test/http/http1/test_read.py | 22 | ||||
-rw-r--r-- | test/http/test_authentication.py | 12 | ||||
-rw-r--r-- | test/http/test_headers.py | 149 | ||||
-rw-r--r-- | test/http/test_models.py | 152 | ||||
-rw-r--r-- | test/test_utils.py | 20 | ||||
-rw-r--r-- | test/websockets/test_websockets.py | 13 |
7 files changed, 186 insertions, 188 deletions
diff --git a/test/http/http1/test_assemble.py b/test/http/http1/test_assemble.py index 2d250909..963e7549 100644 --- a/test/http/http1/test_assemble.py +++ b/test/http/http1/test_assemble.py @@ -77,16 +77,16 @@ def test_assemble_request_line(): def test_assemble_request_headers(): # https://github.com/mitmproxy/mitmproxy/issues/186 r = treq(body=b"") - r.headers[b"Transfer-Encoding"] = b"chunked" + r.headers["Transfer-Encoding"] = "chunked" c = _assemble_request_headers(r) assert b"Transfer-Encoding" in c - assert b"Host" in _assemble_request_headers(treq(headers=Headers())) + assert b"host" in _assemble_request_headers(treq(headers=Headers())) def test_assemble_response_headers(): # https://github.com/mitmproxy/mitmproxy/issues/186 r = tresp(body=b"") - r.headers["Transfer-Encoding"] = b"chunked" + r.headers["Transfer-Encoding"] = "chunked" c = _assemble_response_headers(r) assert b"Transfer-Encoding" in c diff --git a/test/http/http1/test_read.py b/test/http/http1/test_read.py index 55def2a5..9eb02a24 100644 --- a/test/http/http1/test_read.py +++ b/test/http/http1/test_read.py @@ -1,9 +1,7 @@ from __future__ import absolute_import, print_function, division from io import BytesIO import textwrap - from mock import Mock - from netlib.exceptions import HttpException, HttpSyntaxException, HttpReadDisconnect from netlib.http import Headers from netlib.http.http1.read import ( @@ -35,7 +33,7 @@ def test_read_request_head(): rfile.first_byte_timestamp = 42 r = read_request_head(rfile) assert r.method == b"GET" - assert r.headers["Content-Length"] == b"4" + assert r.headers["Content-Length"] == "4" assert r.body is None assert rfile.reset_timestamps.called assert r.timestamp_start == 42 @@ -62,7 +60,7 @@ def test_read_response_head(): rfile.first_byte_timestamp = 42 r = read_response_head(rfile) assert r.status_code == 418 - assert r.headers["Content-Length"] == b"4" + assert r.headers["Content-Length"] == "4" assert r.body is None assert rfile.reset_timestamps.called assert r.timestamp_start == 42 @@ -76,14 +74,12 @@ class TestReadBody(object): assert body == b"foo" assert rfile.read() == b"bar" - def test_known_size(self): rfile = BytesIO(b"foobar") body = b"".join(read_body(rfile, 3)) assert body == b"foo" assert rfile.read() == b"bar" - def test_known_size_limit(self): rfile = BytesIO(b"foobar") with raises(HttpException): @@ -99,7 +95,6 @@ class TestReadBody(object): body = b"".join(read_body(rfile, -1)) assert body == b"foobar" - def test_unknown_size_limit(self): rfile = BytesIO(b"foobar") with raises(HttpException): @@ -121,13 +116,13 @@ def test_connection_close(): def test_expected_http_body_size(): # Expect: 100-continue assert expected_http_body_size( - treq(headers=Headers(expect=b"100-continue", content_length=b"42")) + treq(headers=Headers(expect="100-continue", content_length="42")) ) == 0 # http://tools.ietf.org/html/rfc7230#section-3.3 assert expected_http_body_size( treq(method=b"HEAD"), - tresp(headers=Headers(content_length=b"42")) + tresp(headers=Headers(content_length="42")) ) == 0 assert expected_http_body_size( treq(method=b"CONNECT"), @@ -141,17 +136,17 @@ def test_expected_http_body_size(): # chunked assert expected_http_body_size( - treq(headers=Headers(transfer_encoding=b"chunked")), + treq(headers=Headers(transfer_encoding="chunked")), ) is None # explicit length - for l in (b"foo", b"-7"): + for val in (b"foo", b"-7"): with raises(HttpSyntaxException): expected_http_body_size( - treq(headers=Headers(content_length=l)) + treq(headers=Headers(content_length=val)) ) assert expected_http_body_size( - treq(headers=Headers(content_length=b"42")) + treq(headers=Headers(content_length="42")) ) == 42 # no length @@ -286,6 +281,7 @@ class TestReadHeaders(object): with raises(HttpSyntaxException): self._read(data) + def test_read_chunked(): req = treq(body=None) req.headers["Transfer-Encoding"] = "chunked" diff --git a/test/http/test_authentication.py b/test/http/test_authentication.py index a2aa774a..1df7cd9c 100644 --- a/test/http/test_authentication.py +++ b/test/http/test_authentication.py @@ -5,13 +5,13 @@ from netlib.http import authentication, Headers def test_parse_http_basic_auth(): - vals = (b"basic", b"foo", b"bar") + vals = ("basic", "foo", "bar") assert authentication.parse_http_basic_auth( authentication.assemble_http_basic_auth(*vals) ) == vals assert not authentication.parse_http_basic_auth("") assert not authentication.parse_http_basic_auth("foo bar") - v = b"basic " + binascii.b2a_base64(b"foo") + v = "basic " + binascii.b2a_base64(b"foo").decode("ascii") assert not authentication.parse_http_basic_auth(v) @@ -34,7 +34,7 @@ class TestPassManHtpasswd: def test_simple(self): pm = authentication.PassManHtpasswd(tutils.test_data.path("data/htpasswd")) - vals = (b"basic", b"test", b"test") + vals = ("basic", "test", "test") authentication.assemble_http_basic_auth(*vals) assert pm.test("test", "test") assert not pm.test("test", "foo") @@ -73,7 +73,7 @@ class TestBasicProxyAuth: ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test") headers = Headers() - vals = (b"basic", b"foo", b"bar") + vals = ("basic", "foo", "bar") headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals) assert ba.authenticate(headers) @@ -86,12 +86,12 @@ class TestBasicProxyAuth: headers[ba.AUTH_HEADER] = "foo" assert not ba.authenticate(headers) - vals = (b"foo", b"foo", b"bar") + vals = ("foo", "foo", "bar") headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals) assert not ba.authenticate(headers) ba = authentication.BasicProxyAuth(authentication.PassMan(), "test") - vals = (b"basic", b"foo", b"bar") + vals = ("basic", "foo", "bar") headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals) assert not ba.authenticate(headers) diff --git a/test/http/test_headers.py b/test/http/test_headers.py new file mode 100644 index 00000000..f1af1feb --- /dev/null +++ b/test/http/test_headers.py @@ -0,0 +1,149 @@ +from netlib.http import Headers +from netlib.tutils import raises + + +class TestHeaders(object): + def _2host(self): + return Headers( + [ + [b"Host", b"example.com"], + [b"host", b"example.org"] + ] + ) + + def test_init(self): + headers = Headers() + assert len(headers) == 0 + + headers = Headers([[b"Host", b"example.com"]]) + assert len(headers) == 1 + assert headers["Host"] == "example.com" + + headers = Headers(Host="example.com") + assert len(headers) == 1 + assert headers["Host"] == "example.com" + + headers = Headers( + [[b"Host", b"invalid"]], + Host="example.com" + ) + assert len(headers) == 1 + assert headers["Host"] == "example.com" + + headers = Headers( + [[b"Host", b"invalid"], [b"Accept", b"text/plain"]], + Host="example.com" + ) + assert len(headers) == 2 + assert headers["Host"] == "example.com" + assert headers["Accept"] == "text/plain" + + def test_getitem(self): + headers = Headers(Host="example.com") + assert headers["Host"] == "example.com" + assert headers["host"] == "example.com" + with raises(KeyError): + _ = headers["Accept"] + + headers = self._2host() + assert headers["Host"] == "example.com, example.org" + + def test_str(self): + headers = Headers(Host="example.com") + assert bytes(headers) == b"Host: example.com\r\n" + + headers = Headers([ + [b"Host", b"example.com"], + [b"Accept", b"text/plain"] + ]) + assert bytes(headers) == b"Host: example.com\r\nAccept: text/plain\r\n" + + headers = Headers() + assert bytes(headers) == b"" + + def test_setitem(self): + headers = Headers() + headers["Host"] = "example.com" + assert "Host" in headers + assert "host" in headers + assert headers["Host"] == "example.com" + + headers["host"] = "example.org" + assert "Host" in headers + assert "host" in headers + assert headers["Host"] == "example.org" + + headers["accept"] = "text/plain" + assert len(headers) == 2 + assert "Accept" in headers + assert "Host" in headers + + headers = self._2host() + assert len(headers.fields) == 2 + headers["Host"] = "example.com" + assert len(headers.fields) == 1 + assert "Host" in headers + + def test_delitem(self): + headers = Headers(Host="example.com") + assert len(headers) == 1 + del headers["host"] + assert len(headers) == 0 + try: + del headers["host"] + except KeyError: + assert True + else: + assert False + + headers = self._2host() + del headers["Host"] + assert len(headers) == 0 + + def test_keys(self): + headers = Headers(Host="example.com") + assert list(headers.keys()) == ["Host"] + + headers = self._2host() + assert list(headers.keys()) == ["Host"] + + def test_eq_ne(self): + headers1 = Headers(Host="example.com") + headers2 = Headers(host="example.com") + assert not (headers1 == headers2) + assert headers1 != headers2 + + headers1 = Headers(Host="example.com") + headers2 = Headers(Host="example.com") + assert headers1 == headers2 + assert not (headers1 != headers2) + + assert headers1 != 42 + + def test_get_all(self): + headers = self._2host() + assert headers.get_all("host") == ["example.com", "example.org"] + assert headers.get_all("accept") == [] + + def test_set_all(self): + headers = Headers(Host="example.com") + headers.set_all("Accept", ["text/plain"]) + assert len(headers) == 2 + assert "accept" in headers + + headers = self._2host() + headers.set_all("Host", ["example.org"]) + assert headers["host"] == "example.org" + + headers.set_all("Host", ["example.org", "example.net"]) + assert headers["host"] == "example.org, example.net" + + def test_state(self): + headers = self._2host() + assert len(headers.get_state()) == 2 + assert headers == Headers.from_state(headers.get_state()) + + headers2 = Headers() + assert headers != headers2 + headers2.load_state(headers.get_state()) + assert headers == headers2 diff --git a/test/http/test_models.py b/test/http/test_models.py index d420b22b..10e0795a 100644 --- a/test/http/test_models.py +++ b/test/http/test_models.py @@ -58,20 +58,20 @@ class TestRequest(object): req = tutils.treq() req.headers["Accept-Encoding"] = "foobar" req.anticomp() - assert req.headers["Accept-Encoding"] == b"identity" + assert req.headers["Accept-Encoding"] == "identity" def test_constrain_encoding(self): req = tutils.treq() req.headers["Accept-Encoding"] = "identity, gzip, foo" req.constrain_encoding() - assert b"foo" not in req.headers["Accept-Encoding"] + assert "foo" not in req.headers["Accept-Encoding"] def test_update_host(self): req = tutils.treq() req.headers["Host"] = "" req.host = "foobar" req.update_host_header() - assert req.headers["Host"] == b"foobar" + assert req.headers["Host"] == "foobar" def test_get_form(self): req = tutils.treq() @@ -393,149 +393,3 @@ class TestResponse(object): v = resp.get_cookies() assert len(v) == 1 assert v["foo"] == [["bar", ODictCaseless()]] - - -class TestHeaders(object): - def _2host(self): - return Headers( - [ - [b"Host", b"example.com"], - [b"host", b"example.org"] - ] - ) - - def test_init(self): - headers = Headers() - assert len(headers) == 0 - - headers = Headers([[b"Host", b"example.com"]]) - assert len(headers) == 1 - assert headers["Host"] == b"example.com" - - headers = Headers(Host="example.com") - assert len(headers) == 1 - assert headers["Host"] == b"example.com" - - headers = Headers( - [[b"Host", b"invalid"]], - Host="example.com" - ) - assert len(headers) == 1 - assert headers["Host"] == b"example.com" - - headers = Headers( - [[b"Host", b"invalid"], [b"Accept", b"text/plain"]], - Host="example.com" - ) - assert len(headers) == 2 - assert headers["Host"] == b"example.com" - assert headers["Accept"] == b"text/plain" - - def test_getitem(self): - headers = Headers(Host="example.com") - assert headers["Host"] == b"example.com" - assert headers["host"] == b"example.com" - tutils.raises(KeyError, headers.__getitem__, "Accept") - - headers = self._2host() - assert headers["Host"] == b"example.com, example.org" - - def test_str(self): - headers = Headers(Host="example.com") - assert bytes(headers) == b"Host: example.com\r\n" - - headers = Headers([ - [b"Host", b"example.com"], - [b"Accept", b"text/plain"] - ]) - assert bytes(headers) == b"Host: example.com\r\nAccept: text/plain\r\n" - - headers = Headers() - assert bytes(headers) == b"" - - def test_setitem(self): - headers = Headers() - headers["Host"] = "example.com" - assert "Host" in headers - assert "host" in headers - assert headers["Host"] == b"example.com" - - headers["host"] = "example.org" - assert "Host" in headers - assert "host" in headers - assert headers["Host"] == b"example.org" - - headers["accept"] = "text/plain" - assert len(headers) == 2 - assert "Accept" in headers - assert "Host" in headers - - headers = self._2host() - assert len(headers.fields) == 2 - headers["Host"] = "example.com" - assert len(headers.fields) == 1 - assert "Host" in headers - - def test_delitem(self): - headers = Headers(Host="example.com") - assert len(headers) == 1 - del headers["host"] - assert len(headers) == 0 - try: - del headers["host"] - except KeyError: - assert True - else: - assert False - - headers = self._2host() - del headers["Host"] - assert len(headers) == 0 - - def test_keys(self): - headers = Headers(Host="example.com") - assert list(headers.keys()) == [b"Host"] - - headers = self._2host() - assert list(headers.keys()) == [b"Host"] - - def test_eq_ne(self): - headers1 = Headers(Host="example.com") - headers2 = Headers(host="example.com") - assert not (headers1 == headers2) - assert headers1 != headers2 - - headers1 = Headers(Host="example.com") - headers2 = Headers(Host="example.com") - assert headers1 == headers2 - assert not (headers1 != headers2) - - assert headers1 != 42 - - def test_get_all(self): - headers = self._2host() - assert headers.get_all("host") == [b"example.com", b"example.org"] - assert headers.get_all("accept") == [] - - def test_set_all(self): - headers = Headers(Host="example.com") - headers.set_all("Accept", ["text/plain"]) - assert len(headers) == 2 - assert "accept" in headers - - headers = self._2host() - headers.set_all("Host", ["example.org"]) - assert headers["host"] == b"example.org" - - headers.set_all("Host", ["example.org", "example.net"]) - assert headers["host"] == b"example.org, example.net" - - def test_state(self): - headers = self._2host() - assert len(headers.get_state()) == 2 - assert headers == Headers.from_state(headers.get_state()) - - headers2 = Headers() - assert headers != headers2 - headers2.load_state(headers.get_state()) - assert headers == headers2 diff --git a/test/test_utils.py b/test/test_utils.py index 8f4b4059..17636cc4 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -103,17 +103,17 @@ def test_get_header_tokens(): headers = Headers() assert utils.get_header_tokens(headers, "foo") == [] headers["foo"] = "bar" - assert utils.get_header_tokens(headers, "foo") == [b"bar"] + assert utils.get_header_tokens(headers, "foo") == ["bar"] headers["foo"] = "bar, voing" - assert utils.get_header_tokens(headers, "foo") == [b"bar", b"voing"] + assert utils.get_header_tokens(headers, "foo") == ["bar", "voing"] headers.set_all("foo", ["bar, voing", "oink"]) - assert utils.get_header_tokens(headers, "foo") == [b"bar", b"voing", b"oink"] + assert utils.get_header_tokens(headers, "foo") == ["bar", "voing", "oink"] def test_multipartdecode(): - boundary = b'somefancyboundary' + boundary = 'somefancyboundary' headers = Headers( - content_type=b'multipart/form-data; boundary=' + boundary + content_type='multipart/form-data; boundary=' + boundary ) content = ( "--{0}\n" @@ -122,7 +122,7 @@ def test_multipartdecode(): "--{0}\n" "Content-Disposition: form-data; name=\"field2\"\n\n" "value2\n" - "--{0}--".format(boundary.decode()).encode() + "--{0}--".format(boundary).encode() ) form = utils.multipartdecode(headers, content) @@ -134,8 +134,8 @@ def test_multipartdecode(): def test_parse_content_type(): p = utils.parse_content_type - assert p(b"text/html") == (b"text", b"html", {}) - assert p(b"text") is None + assert p("text/html") == ("text", "html", {}) + assert p("text") is None - v = p(b"text/html; charset=UTF-8") - assert v == (b'text', b'html', {b'charset': b'UTF-8'}) + v = p("text/html; charset=UTF-8") + assert v == ('text', 'html', {'charset': 'UTF-8'}) diff --git a/test/websockets/test_websockets.py b/test/websockets/test_websockets.py index 48acc2d6..4ae4cf45 100644 --- a/test/websockets/test_websockets.py +++ b/test/websockets/test_websockets.py @@ -64,15 +64,14 @@ class WebSocketsClient(tcp.TCPClient): preamble = b'GET / HTTP/1.1' self.wfile.write(preamble + b"\r\n") headers = self.protocol.client_handshake_headers() - self.client_nonce = headers["sec-websocket-key"] + self.client_nonce = headers["sec-websocket-key"].encode("ascii") self.wfile.write(bytes(headers) + b"\r\n") self.wfile.flush() resp = read_response(self.rfile, treq(method="GET")) server_nonce = self.protocol.check_server_handshake(resp.headers) - if not server_nonce == self.protocol.create_server_nonce( - self.client_nonce): + if not server_nonce == self.protocol.create_server_nonce(self.client_nonce): self.close() def read_next_message(self): @@ -207,14 +206,14 @@ class TestFrameHeader: fin=True, payload_length=10 ) - assert f.human_readable() + assert repr(f) f = websockets.FrameHeader() - assert f.human_readable() + assert repr(f) def test_funky(self): f = websockets.FrameHeader(masking_key=b"test", mask=False) - bytes = f.to_bytes() - f2 = websockets.FrameHeader.from_file(tutils.treader(bytes)) + raw = bytes(f) + f2 = websockets.FrameHeader.from_file(tutils.treader(raw)) assert not f2.mask def test_violations(self): |