aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/http/http1/test_assemble.py6
-rw-r--r--test/http/http1/test_read.py22
-rw-r--r--test/http/test_authentication.py12
-rw-r--r--test/http/test_headers.py149
-rw-r--r--test/http/test_models.py152
-rw-r--r--test/test_utils.py20
-rw-r--r--test/websockets/test_websockets.py13
7 files changed, 186 insertions, 188 deletions
diff --git a/test/http/http1/test_assemble.py b/test/http/http1/test_assemble.py
index 2d250909..963e7549 100644
--- a/test/http/http1/test_assemble.py
+++ b/test/http/http1/test_assemble.py
@@ -77,16 +77,16 @@ def test_assemble_request_line():
def test_assemble_request_headers():
# https://github.com/mitmproxy/mitmproxy/issues/186
r = treq(body=b"")
- r.headers[b"Transfer-Encoding"] = b"chunked"
+ r.headers["Transfer-Encoding"] = "chunked"
c = _assemble_request_headers(r)
assert b"Transfer-Encoding" in c
- assert b"Host" in _assemble_request_headers(treq(headers=Headers()))
+ assert b"host" in _assemble_request_headers(treq(headers=Headers()))
def test_assemble_response_headers():
# https://github.com/mitmproxy/mitmproxy/issues/186
r = tresp(body=b"")
- r.headers["Transfer-Encoding"] = b"chunked"
+ r.headers["Transfer-Encoding"] = "chunked"
c = _assemble_response_headers(r)
assert b"Transfer-Encoding" in c
diff --git a/test/http/http1/test_read.py b/test/http/http1/test_read.py
index 55def2a5..9eb02a24 100644
--- a/test/http/http1/test_read.py
+++ b/test/http/http1/test_read.py
@@ -1,9 +1,7 @@
from __future__ import absolute_import, print_function, division
from io import BytesIO
import textwrap
-
from mock import Mock
-
from netlib.exceptions import HttpException, HttpSyntaxException, HttpReadDisconnect
from netlib.http import Headers
from netlib.http.http1.read import (
@@ -35,7 +33,7 @@ def test_read_request_head():
rfile.first_byte_timestamp = 42
r = read_request_head(rfile)
assert r.method == b"GET"
- assert r.headers["Content-Length"] == b"4"
+ assert r.headers["Content-Length"] == "4"
assert r.body is None
assert rfile.reset_timestamps.called
assert r.timestamp_start == 42
@@ -62,7 +60,7 @@ def test_read_response_head():
rfile.first_byte_timestamp = 42
r = read_response_head(rfile)
assert r.status_code == 418
- assert r.headers["Content-Length"] == b"4"
+ assert r.headers["Content-Length"] == "4"
assert r.body is None
assert rfile.reset_timestamps.called
assert r.timestamp_start == 42
@@ -76,14 +74,12 @@ class TestReadBody(object):
assert body == b"foo"
assert rfile.read() == b"bar"
-
def test_known_size(self):
rfile = BytesIO(b"foobar")
body = b"".join(read_body(rfile, 3))
assert body == b"foo"
assert rfile.read() == b"bar"
-
def test_known_size_limit(self):
rfile = BytesIO(b"foobar")
with raises(HttpException):
@@ -99,7 +95,6 @@ class TestReadBody(object):
body = b"".join(read_body(rfile, -1))
assert body == b"foobar"
-
def test_unknown_size_limit(self):
rfile = BytesIO(b"foobar")
with raises(HttpException):
@@ -121,13 +116,13 @@ def test_connection_close():
def test_expected_http_body_size():
# Expect: 100-continue
assert expected_http_body_size(
- treq(headers=Headers(expect=b"100-continue", content_length=b"42"))
+ treq(headers=Headers(expect="100-continue", content_length="42"))
) == 0
# http://tools.ietf.org/html/rfc7230#section-3.3
assert expected_http_body_size(
treq(method=b"HEAD"),
- tresp(headers=Headers(content_length=b"42"))
+ tresp(headers=Headers(content_length="42"))
) == 0
assert expected_http_body_size(
treq(method=b"CONNECT"),
@@ -141,17 +136,17 @@ def test_expected_http_body_size():
# chunked
assert expected_http_body_size(
- treq(headers=Headers(transfer_encoding=b"chunked")),
+ treq(headers=Headers(transfer_encoding="chunked")),
) is None
# explicit length
- for l in (b"foo", b"-7"):
+ for val in (b"foo", b"-7"):
with raises(HttpSyntaxException):
expected_http_body_size(
- treq(headers=Headers(content_length=l))
+ treq(headers=Headers(content_length=val))
)
assert expected_http_body_size(
- treq(headers=Headers(content_length=b"42"))
+ treq(headers=Headers(content_length="42"))
) == 42
# no length
@@ -286,6 +281,7 @@ class TestReadHeaders(object):
with raises(HttpSyntaxException):
self._read(data)
+
def test_read_chunked():
req = treq(body=None)
req.headers["Transfer-Encoding"] = "chunked"
diff --git a/test/http/test_authentication.py b/test/http/test_authentication.py
index a2aa774a..1df7cd9c 100644
--- a/test/http/test_authentication.py
+++ b/test/http/test_authentication.py
@@ -5,13 +5,13 @@ from netlib.http import authentication, Headers
def test_parse_http_basic_auth():
- vals = (b"basic", b"foo", b"bar")
+ vals = ("basic", "foo", "bar")
assert authentication.parse_http_basic_auth(
authentication.assemble_http_basic_auth(*vals)
) == vals
assert not authentication.parse_http_basic_auth("")
assert not authentication.parse_http_basic_auth("foo bar")
- v = b"basic " + binascii.b2a_base64(b"foo")
+ v = "basic " + binascii.b2a_base64(b"foo").decode("ascii")
assert not authentication.parse_http_basic_auth(v)
@@ -34,7 +34,7 @@ class TestPassManHtpasswd:
def test_simple(self):
pm = authentication.PassManHtpasswd(tutils.test_data.path("data/htpasswd"))
- vals = (b"basic", b"test", b"test")
+ vals = ("basic", "test", "test")
authentication.assemble_http_basic_auth(*vals)
assert pm.test("test", "test")
assert not pm.test("test", "foo")
@@ -73,7 +73,7 @@ class TestBasicProxyAuth:
ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test")
headers = Headers()
- vals = (b"basic", b"foo", b"bar")
+ vals = ("basic", "foo", "bar")
headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals)
assert ba.authenticate(headers)
@@ -86,12 +86,12 @@ class TestBasicProxyAuth:
headers[ba.AUTH_HEADER] = "foo"
assert not ba.authenticate(headers)
- vals = (b"foo", b"foo", b"bar")
+ vals = ("foo", "foo", "bar")
headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals)
assert not ba.authenticate(headers)
ba = authentication.BasicProxyAuth(authentication.PassMan(), "test")
- vals = (b"basic", b"foo", b"bar")
+ vals = ("basic", "foo", "bar")
headers[ba.AUTH_HEADER] = authentication.assemble_http_basic_auth(*vals)
assert not ba.authenticate(headers)
diff --git a/test/http/test_headers.py b/test/http/test_headers.py
new file mode 100644
index 00000000..f1af1feb
--- /dev/null
+++ b/test/http/test_headers.py
@@ -0,0 +1,149 @@
+from netlib.http import Headers
+from netlib.tutils import raises
+
+
+class TestHeaders(object):
+ def _2host(self):
+ return Headers(
+ [
+ [b"Host", b"example.com"],
+ [b"host", b"example.org"]
+ ]
+ )
+
+ def test_init(self):
+ headers = Headers()
+ assert len(headers) == 0
+
+ headers = Headers([[b"Host", b"example.com"]])
+ assert len(headers) == 1
+ assert headers["Host"] == "example.com"
+
+ headers = Headers(Host="example.com")
+ assert len(headers) == 1
+ assert headers["Host"] == "example.com"
+
+ headers = Headers(
+ [[b"Host", b"invalid"]],
+ Host="example.com"
+ )
+ assert len(headers) == 1
+ assert headers["Host"] == "example.com"
+
+ headers = Headers(
+ [[b"Host", b"invalid"], [b"Accept", b"text/plain"]],
+ Host="example.com"
+ )
+ assert len(headers) == 2
+ assert headers["Host"] == "example.com"
+ assert headers["Accept"] == "text/plain"
+
+ def test_getitem(self):
+ headers = Headers(Host="example.com")
+ assert headers["Host"] == "example.com"
+ assert headers["host"] == "example.com"
+ with raises(KeyError):
+ _ = headers["Accept"]
+
+ headers = self._2host()
+ assert headers["Host"] == "example.com, example.org"
+
+ def test_str(self):
+ headers = Headers(Host="example.com")
+ assert bytes(headers) == b"Host: example.com\r\n"
+
+ headers = Headers([
+ [b"Host", b"example.com"],
+ [b"Accept", b"text/plain"]
+ ])
+ assert bytes(headers) == b"Host: example.com\r\nAccept: text/plain\r\n"
+
+ headers = Headers()
+ assert bytes(headers) == b""
+
+ def test_setitem(self):
+ headers = Headers()
+ headers["Host"] = "example.com"
+ assert "Host" in headers
+ assert "host" in headers
+ assert headers["Host"] == "example.com"
+
+ headers["host"] = "example.org"
+ assert "Host" in headers
+ assert "host" in headers
+ assert headers["Host"] == "example.org"
+
+ headers["accept"] = "text/plain"
+ assert len(headers) == 2
+ assert "Accept" in headers
+ assert "Host" in headers
+
+ headers = self._2host()
+ assert len(headers.fields) == 2
+ headers["Host"] = "example.com"
+ assert len(headers.fields) == 1
+ assert "Host" in headers
+
+ def test_delitem(self):
+ headers = Headers(Host="example.com")
+ assert len(headers) == 1
+ del headers["host"]
+ assert len(headers) == 0
+ try:
+ del headers["host"]
+ except KeyError:
+ assert True
+ else:
+ assert False
+
+ headers = self._2host()
+ del headers["Host"]
+ assert len(headers) == 0
+
+ def test_keys(self):
+ headers = Headers(Host="example.com")
+ assert list(headers.keys()) == ["Host"]
+
+ headers = self._2host()
+ assert list(headers.keys()) == ["Host"]
+
+ def test_eq_ne(self):
+ headers1 = Headers(Host="example.com")
+ headers2 = Headers(host="example.com")
+ assert not (headers1 == headers2)
+ assert headers1 != headers2
+
+ headers1 = Headers(Host="example.com")
+ headers2 = Headers(Host="example.com")
+ assert headers1 == headers2
+ assert not (headers1 != headers2)
+
+ assert headers1 != 42
+
+ def test_get_all(self):
+ headers = self._2host()
+ assert headers.get_all("host") == ["example.com", "example.org"]
+ assert headers.get_all("accept") == []
+
+ def test_set_all(self):
+ headers = Headers(Host="example.com")
+ headers.set_all("Accept", ["text/plain"])
+ assert len(headers) == 2
+ assert "accept" in headers
+
+ headers = self._2host()
+ headers.set_all("Host", ["example.org"])
+ assert headers["host"] == "example.org"
+
+ headers.set_all("Host", ["example.org", "example.net"])
+ assert headers["host"] == "example.org, example.net"
+
+ def test_state(self):
+ headers = self._2host()
+ assert len(headers.get_state()) == 2
+ assert headers == Headers.from_state(headers.get_state())
+
+ headers2 = Headers()
+ assert headers != headers2
+ headers2.load_state(headers.get_state())
+ assert headers == headers2
diff --git a/test/http/test_models.py b/test/http/test_models.py
index d420b22b..10e0795a 100644
--- a/test/http/test_models.py
+++ b/test/http/test_models.py
@@ -58,20 +58,20 @@ class TestRequest(object):
req = tutils.treq()
req.headers["Accept-Encoding"] = "foobar"
req.anticomp()
- assert req.headers["Accept-Encoding"] == b"identity"
+ assert req.headers["Accept-Encoding"] == "identity"
def test_constrain_encoding(self):
req = tutils.treq()
req.headers["Accept-Encoding"] = "identity, gzip, foo"
req.constrain_encoding()
- assert b"foo" not in req.headers["Accept-Encoding"]
+ assert "foo" not in req.headers["Accept-Encoding"]
def test_update_host(self):
req = tutils.treq()
req.headers["Host"] = ""
req.host = "foobar"
req.update_host_header()
- assert req.headers["Host"] == b"foobar"
+ assert req.headers["Host"] == "foobar"
def test_get_form(self):
req = tutils.treq()
@@ -393,149 +393,3 @@ class TestResponse(object):
v = resp.get_cookies()
assert len(v) == 1
assert v["foo"] == [["bar", ODictCaseless()]]
-
-
-class TestHeaders(object):
- def _2host(self):
- return Headers(
- [
- [b"Host", b"example.com"],
- [b"host", b"example.org"]
- ]
- )
-
- def test_init(self):
- headers = Headers()
- assert len(headers) == 0
-
- headers = Headers([[b"Host", b"example.com"]])
- assert len(headers) == 1
- assert headers["Host"] == b"example.com"
-
- headers = Headers(Host="example.com")
- assert len(headers) == 1
- assert headers["Host"] == b"example.com"
-
- headers = Headers(
- [[b"Host", b"invalid"]],
- Host="example.com"
- )
- assert len(headers) == 1
- assert headers["Host"] == b"example.com"
-
- headers = Headers(
- [[b"Host", b"invalid"], [b"Accept", b"text/plain"]],
- Host="example.com"
- )
- assert len(headers) == 2
- assert headers["Host"] == b"example.com"
- assert headers["Accept"] == b"text/plain"
-
- def test_getitem(self):
- headers = Headers(Host="example.com")
- assert headers["Host"] == b"example.com"
- assert headers["host"] == b"example.com"
- tutils.raises(KeyError, headers.__getitem__, "Accept")
-
- headers = self._2host()
- assert headers["Host"] == b"example.com, example.org"
-
- def test_str(self):
- headers = Headers(Host="example.com")
- assert bytes(headers) == b"Host: example.com\r\n"
-
- headers = Headers([
- [b"Host", b"example.com"],
- [b"Accept", b"text/plain"]
- ])
- assert bytes(headers) == b"Host: example.com\r\nAccept: text/plain\r\n"
-
- headers = Headers()
- assert bytes(headers) == b""
-
- def test_setitem(self):
- headers = Headers()
- headers["Host"] = "example.com"
- assert "Host" in headers
- assert "host" in headers
- assert headers["Host"] == b"example.com"
-
- headers["host"] = "example.org"
- assert "Host" in headers
- assert "host" in headers
- assert headers["Host"] == b"example.org"
-
- headers["accept"] = "text/plain"
- assert len(headers) == 2
- assert "Accept" in headers
- assert "Host" in headers
-
- headers = self._2host()
- assert len(headers.fields) == 2
- headers["Host"] = "example.com"
- assert len(headers.fields) == 1
- assert "Host" in headers
-
- def test_delitem(self):
- headers = Headers(Host="example.com")
- assert len(headers) == 1
- del headers["host"]
- assert len(headers) == 0
- try:
- del headers["host"]
- except KeyError:
- assert True
- else:
- assert False
-
- headers = self._2host()
- del headers["Host"]
- assert len(headers) == 0
-
- def test_keys(self):
- headers = Headers(Host="example.com")
- assert list(headers.keys()) == [b"Host"]
-
- headers = self._2host()
- assert list(headers.keys()) == [b"Host"]
-
- def test_eq_ne(self):
- headers1 = Headers(Host="example.com")
- headers2 = Headers(host="example.com")
- assert not (headers1 == headers2)
- assert headers1 != headers2
-
- headers1 = Headers(Host="example.com")
- headers2 = Headers(Host="example.com")
- assert headers1 == headers2
- assert not (headers1 != headers2)
-
- assert headers1 != 42
-
- def test_get_all(self):
- headers = self._2host()
- assert headers.get_all("host") == [b"example.com", b"example.org"]
- assert headers.get_all("accept") == []
-
- def test_set_all(self):
- headers = Headers(Host="example.com")
- headers.set_all("Accept", ["text/plain"])
- assert len(headers) == 2
- assert "accept" in headers
-
- headers = self._2host()
- headers.set_all("Host", ["example.org"])
- assert headers["host"] == b"example.org"
-
- headers.set_all("Host", ["example.org", "example.net"])
- assert headers["host"] == b"example.org, example.net"
-
- def test_state(self):
- headers = self._2host()
- assert len(headers.get_state()) == 2
- assert headers == Headers.from_state(headers.get_state())
-
- headers2 = Headers()
- assert headers != headers2
- headers2.load_state(headers.get_state())
- assert headers == headers2
diff --git a/test/test_utils.py b/test/test_utils.py
index 8f4b4059..17636cc4 100644
--- a/test/test_utils.py
+++ b/test/test_utils.py
@@ -103,17 +103,17 @@ def test_get_header_tokens():
headers = Headers()
assert utils.get_header_tokens(headers, "foo") == []
headers["foo"] = "bar"
- assert utils.get_header_tokens(headers, "foo") == [b"bar"]
+ assert utils.get_header_tokens(headers, "foo") == ["bar"]
headers["foo"] = "bar, voing"
- assert utils.get_header_tokens(headers, "foo") == [b"bar", b"voing"]
+ assert utils.get_header_tokens(headers, "foo") == ["bar", "voing"]
headers.set_all("foo", ["bar, voing", "oink"])
- assert utils.get_header_tokens(headers, "foo") == [b"bar", b"voing", b"oink"]
+ assert utils.get_header_tokens(headers, "foo") == ["bar", "voing", "oink"]
def test_multipartdecode():
- boundary = b'somefancyboundary'
+ boundary = 'somefancyboundary'
headers = Headers(
- content_type=b'multipart/form-data; boundary=' + boundary
+ content_type='multipart/form-data; boundary=' + boundary
)
content = (
"--{0}\n"
@@ -122,7 +122,7 @@ def test_multipartdecode():
"--{0}\n"
"Content-Disposition: form-data; name=\"field2\"\n\n"
"value2\n"
- "--{0}--".format(boundary.decode()).encode()
+ "--{0}--".format(boundary).encode()
)
form = utils.multipartdecode(headers, content)
@@ -134,8 +134,8 @@ def test_multipartdecode():
def test_parse_content_type():
p = utils.parse_content_type
- assert p(b"text/html") == (b"text", b"html", {})
- assert p(b"text") is None
+ assert p("text/html") == ("text", "html", {})
+ assert p("text") is None
- v = p(b"text/html; charset=UTF-8")
- assert v == (b'text', b'html', {b'charset': b'UTF-8'})
+ v = p("text/html; charset=UTF-8")
+ assert v == ('text', 'html', {'charset': 'UTF-8'})
diff --git a/test/websockets/test_websockets.py b/test/websockets/test_websockets.py
index 48acc2d6..4ae4cf45 100644
--- a/test/websockets/test_websockets.py
+++ b/test/websockets/test_websockets.py
@@ -64,15 +64,14 @@ class WebSocketsClient(tcp.TCPClient):
preamble = b'GET / HTTP/1.1'
self.wfile.write(preamble + b"\r\n")
headers = self.protocol.client_handshake_headers()
- self.client_nonce = headers["sec-websocket-key"]
+ self.client_nonce = headers["sec-websocket-key"].encode("ascii")
self.wfile.write(bytes(headers) + b"\r\n")
self.wfile.flush()
resp = read_response(self.rfile, treq(method="GET"))
server_nonce = self.protocol.check_server_handshake(resp.headers)
- if not server_nonce == self.protocol.create_server_nonce(
- self.client_nonce):
+ if not server_nonce == self.protocol.create_server_nonce(self.client_nonce):
self.close()
def read_next_message(self):
@@ -207,14 +206,14 @@ class TestFrameHeader:
fin=True,
payload_length=10
)
- assert f.human_readable()
+ assert repr(f)
f = websockets.FrameHeader()
- assert f.human_readable()
+ assert repr(f)
def test_funky(self):
f = websockets.FrameHeader(masking_key=b"test", mask=False)
- bytes = f.to_bytes()
- f2 = websockets.FrameHeader.from_file(tutils.treader(bytes))
+ raw = bytes(f)
+ f2 = websockets.FrameHeader.from_file(tutils.treader(raw))
assert not f2.mask
def test_violations(self):