From 11e7f476bd4bbcd6d072fa3659f628ae3a19705d Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Tue, 15 Sep 2015 19:12:15 +0200 Subject: wip --- test/http/http1/test_protocol.py | 159 ++++++++++++++++----------------------- test/http/http2/test_frames.py | 4 +- test/http/test_authentication.py | 2 +- test/http/test_semantics.py | 2 +- test/test_encoding.py | 24 +++--- test/test_utils.py | 75 +++++++++--------- test/test_version_check.py | 8 +- test/tservers.py | 8 +- 8 files changed, 131 insertions(+), 151 deletions(-) (limited to 'test') diff --git a/test/http/http1/test_protocol.py b/test/http/http1/test_protocol.py index f7c615bd..bdcba5cb 100644 --- a/test/http/http1/test_protocol.py +++ b/test/http/http1/test_protocol.py @@ -1,9 +1,12 @@ -import cStringIO +from io import BytesIO import textwrap +from http.http1.protocol import _parse_authority_form +from netlib.exceptions import HttpSyntaxException, HttpReadDisconnect, HttpException -from netlib import http, odict, tcp, tutils +from netlib import http, tcp, tutils from netlib.http import semantics, Headers -from netlib.http.http1 import HTTP1Protocol +from netlib.http.http1 import HTTP1Protocol, read_message_body, read_request, \ + read_message_body_chunked, expected_http_body_size from ... import tservers @@ -14,8 +17,8 @@ class NoContentLengthHTTPHandler(tcp.BaseHandler): def mock_protocol(data=''): - rfile = cStringIO.StringIO(data) - wfile = cStringIO.StringIO() + rfile = BytesIO(data) + wfile = BytesIO() return HTTP1Protocol(rfile=rfile, wfile=wfile) @@ -37,53 +40,35 @@ def test_stripped_chunked_encoding_no_content(): assert "Content-Length" in mock_protocol()._assemble_response_headers(r) -def test_has_chunked_encoding(): - headers = http.Headers() - assert not HTTP1Protocol.has_chunked_encoding(headers) - headers["transfer-encoding"] = "chunked" - assert HTTP1Protocol.has_chunked_encoding(headers) - - def test_read_chunked(): - headers = http.Headers() - headers["transfer-encoding"] = "chunked" + req = tutils.treq(None) + req.headers["Transfer-Encoding"] = "chunked" - data = "1\r\na\r\n0\r\n" - tutils.raises( - "malformed chunked body", - mock_protocol(data).read_http_body, - headers, None, "GET", None, True - ) + data = b"1\r\na\r\n0\r\n" + with tutils.raises(HttpSyntaxException): + read_message_body(BytesIO(data), req) - data = "1\r\na\r\n0\r\n\r\n" - assert mock_protocol(data).read_http_body(headers, None, "GET", None, True) == "a" + data = b"1\r\na\r\n0\r\n\r\n" + assert read_message_body(BytesIO(data), req) == b"a" - data = "\r\n\r\n1\r\na\r\n0\r\n\r\n" - assert mock_protocol(data).read_http_body(headers, None, "GET", None, True) == "a" + data = b"\r\n\r\n1\r\na\r\n1\r\nb\r\n0\r\n\r\n" + assert read_message_body(BytesIO(data), req) == b"ab" - data = "\r\n" - tutils.raises( - "closed prematurely", - mock_protocol(data).read_http_body, - headers, None, "GET", None, True - ) + data = b"\r\n" + with tutils.raises("closed prematurely"): + read_message_body(BytesIO(data), req) - data = "1\r\nfoo" - tutils.raises( - "malformed chunked body", - mock_protocol(data).read_http_body, - headers, None, "GET", None, True - ) + data = b"1\r\nfoo" + with tutils.raises("malformed chunked body"): + read_message_body(BytesIO(data), req) - data = "foo\r\nfoo" - tutils.raises( - http.HttpError, - mock_protocol(data).read_http_body, - headers, None, "GET", None, True - ) + data = b"foo\r\nfoo" + with tutils.raises(HttpSyntaxException): + read_message_body(BytesIO(data), req) - data = "5\r\naaaaa\r\n0\r\n\r\n" - tutils.raises("too large", mock_protocol(data).read_http_body, headers, 2, "GET", None, True) + data = b"5\r\naaaaa\r\n0\r\n\r\n" + with tutils.raises("too large"): + read_message_body(BytesIO(data), req, limit=2) def test_connection_close(): @@ -171,52 +156,37 @@ def test_read_http_body(): def test_expected_http_body_size(): # gibber in the content-length field headers = Headers(content_length="foo") - assert HTTP1Protocol.expected_http_body_size(headers, False, "GET", 200) is None + with tutils.raises(HttpSyntaxException): + expected_http_body_size(headers, False, "GET", 200) is None # negative number in the content-length field headers = Headers(content_length="-7") - assert HTTP1Protocol.expected_http_body_size(headers, False, "GET", 200) is None + with tutils.raises(HttpSyntaxException): + expected_http_body_size(headers, False, "GET", 200) is None # explicit length headers = Headers(content_length="5") - assert HTTP1Protocol.expected_http_body_size(headers, False, "GET", 200) == 5 + assert expected_http_body_size(headers, False, "GET", 200) == 5 # no length headers = Headers() - assert HTTP1Protocol.expected_http_body_size(headers, False, "GET", 200) == -1 + assert expected_http_body_size(headers, False, "GET", 200) == -1 # no length request headers = Headers() - assert HTTP1Protocol.expected_http_body_size(headers, True, "GET", None) == 0 - - -def test_get_request_line(): - data = "\nfoo" - p = mock_protocol(data) - assert p._get_request_line() == "foo" - assert not p._get_request_line() - - -def test_parse_http_protocol(): - assert HTTP1Protocol._parse_http_protocol("HTTP/1.1") == (1, 1) - assert HTTP1Protocol._parse_http_protocol("HTTP/0.0") == (0, 0) - assert not HTTP1Protocol._parse_http_protocol("HTTP/a.1") - assert not HTTP1Protocol._parse_http_protocol("HTTP/1.a") - assert not HTTP1Protocol._parse_http_protocol("foo/0.0") - assert not HTTP1Protocol._parse_http_protocol("HTTP/x") + assert expected_http_body_size(headers, True, "GET", None) == 0 + # expect header + headers = Headers(content_length="5", expect="100-continue") + assert expected_http_body_size(headers, True, "GET", None) == 0 def test_parse_init_connect(): - assert HTTP1Protocol._parse_init_connect("CONNECT host.com:443 HTTP/1.0") - assert not HTTP1Protocol._parse_init_connect("C\xfeONNECT host.com:443 HTTP/1.0") - assert not HTTP1Protocol._parse_init_connect("CONNECT \0host.com:443 HTTP/1.0") - assert not HTTP1Protocol._parse_init_connect("CONNECT host.com:444444 HTTP/1.0") - assert not HTTP1Protocol._parse_init_connect("bogus") - assert not HTTP1Protocol._parse_init_connect("GET host.com:443 HTTP/1.0") - assert not HTTP1Protocol._parse_init_connect("CONNECT host.com443 HTTP/1.0") - assert not HTTP1Protocol._parse_init_connect("CONNECT host.com:443 foo/1.0") - assert not HTTP1Protocol._parse_init_connect("CONNECT host.com:foo HTTP/1.0") + assert _parse_authority_form(b"CONNECT host.com:443 HTTP/1.0") + tutils.raises(ValueError,_parse_authority_form, b"\0host.com:443") + tutils.raises(ValueError,_parse_authority_form, b"host.com:444444") + tutils.raises(ValueError,_parse_authority_form, b"CONNECT host.com443 HTTP/1.0") + tutils.raises(ValueError,_parse_authority_form, b"CONNECT host.com:foo HTTP/1.0") def test_parse_init_proxy(): - u = "GET http://foo.com:8888/test HTTP/1.1" - m, s, h, po, pa, httpversion = HTTP1Protocol._parse_init_proxy(u) + u = b"GET http://foo.com:8888/test HTTP/1.1" + m, s, h, po, pa, httpversion = HTTP1Protocol._parse_absolute_form(u) assert m == "GET" assert s == "http" assert h == "foo.com" @@ -225,11 +195,14 @@ def test_parse_init_proxy(): assert httpversion == (1, 1) u = "G\xfeET http://foo.com:8888/test HTTP/1.1" - assert not HTTP1Protocol._parse_init_proxy(u) + assert not HTTP1Protocol._parse_absolute_form(u) - assert not HTTP1Protocol._parse_init_proxy("invalid") - assert not HTTP1Protocol._parse_init_proxy("GET invalid HTTP/1.1") - assert not HTTP1Protocol._parse_init_proxy("GET http://foo.com:8888/test foo/1.1") + with tutils.raises(ValueError): + assert not HTTP1Protocol._parse_absolute_form("invalid") + with tutils.raises(ValueError): + assert not HTTP1Protocol._parse_absolute_form("GET invalid HTTP/1.1") + with tutils.raises(ValueError): + assert not HTTP1Protocol._parse_absolute_form("GET http://foo.com:8888/test foo/1.1") def test_parse_init_http(): @@ -317,15 +290,11 @@ class TestReadRequest(object): "get / HTTP/1.1\r\nfoo" ) tutils.raises( - tcp.NetLibDisconnect, + HttpReadDisconnect, self.tst, "\r\n" ) - def test_empty(self): - v = self.tst("", allow_empty=True) - assert isinstance(v, semantics.EmptyRequest) - def test_asterisk_form_in(self): v = self.tst("OPTIONS * HTTP/1.1") assert v.form_in == "relative" @@ -356,18 +325,18 @@ class TestReadRequest(object): assert v.host == "foo.com" def test_expect(self): - data = "".join( - "GET / HTTP/1.1\r\n" - "Content-Length: 3\r\n" - "Expect: 100-continue\r\n\r\n" - "foobar" + data = ( + b"GET / HTTP/1.1\r\n" + b"Content-Length: 3\r\n" + b"Expect: 100-continue\r\n" + b"\r\n" + b"foobar" ) - p = mock_protocol(data) - v = p.read_request() - assert p.tcp_handler.wfile.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n" - assert v.body == "foo" - assert p.tcp_handler.rfile.read(3) == "bar" + rfile = BytesIO(data) + r = read_request(rfile) + assert r.body == b"" + assert rfile.read(-1) == b"foobar" class TestReadResponse(object): diff --git a/test/http/http2/test_frames.py b/test/http/http2/test_frames.py index 5d5cb0ba..efdb55e2 100644 --- a/test/http/http2/test_frames.py +++ b/test/http/http2/test_frames.py @@ -1,4 +1,4 @@ -import cStringIO +from io import BytesIO from nose.tools import assert_equal from netlib import tcp, tutils @@ -7,7 +7,7 @@ from netlib.http.http2.frame import * def hex_to_file(data): data = data.decode('hex') - return tcp.Reader(cStringIO.StringIO(data)) + return tcp.Reader(BytesIO(data)) def test_invalid_flags(): diff --git a/test/http/test_authentication.py b/test/http/test_authentication.py index 17c91fe5..ee192dd7 100644 --- a/test/http/test_authentication.py +++ b/test/http/test_authentication.py @@ -5,7 +5,7 @@ from netlib.http import authentication, Headers def test_parse_http_basic_auth(): - vals = ("basic", "foo", "bar") + vals = (b"basic", b"foo", b"bar") assert authentication.parse_http_basic_auth( authentication.assemble_http_basic_auth(*vals) ) == vals diff --git a/test/http/test_semantics.py b/test/http/test_semantics.py index 6dcbbe07..44d3c85e 100644 --- a/test/http/test_semantics.py +++ b/test/http/test_semantics.py @@ -475,7 +475,7 @@ class TestHeaders(object): def test_str(self): headers = semantics.Headers(Host="example.com") - assert str(headers) == "Host: example.com\r\n" + assert bytes(headers) == "Host: example.com\r\n" headers = semantics.Headers([ ["Host", "example.com"], diff --git a/test/test_encoding.py b/test/test_encoding.py index 612aea89..9da3a38d 100644 --- a/test/test_encoding.py +++ b/test/test_encoding.py @@ -9,25 +9,29 @@ def test_identity(): def test_gzip(): - assert "string" == encoding.decode( + assert b"string" == encoding.decode( "gzip", encoding.encode( "gzip", - "string")) - assert None == encoding.decode("gzip", "bogus") + b"string" + ) + ) + assert encoding.decode("gzip", b"bogus") is None def test_deflate(): - assert "string" == encoding.decode( + assert b"string" == encoding.decode( "deflate", encoding.encode( "deflate", - "string")) - assert "string" == encoding.decode( + b"string" + ) + ) + assert b"string" == encoding.decode( "deflate", encoding.encode( "deflate", - "string")[ - 2:- - 4]) - assert None == encoding.decode("deflate", "bogus") + b"string" + )[2:-4] + ) + assert encoding.decode("deflate", b"bogus") is None diff --git a/test/test_utils.py b/test/test_utils.py index 9dba5d35..8b2ddae4 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -36,46 +36,51 @@ def test_pretty_size(): def test_parse_url(): - assert not utils.parse_url("") + with tutils.raises(ValueError): + utils.parse_url("") - u = "http://foo.com:8888/test" - s, h, po, pa = utils.parse_url(u) - assert s == "http" - assert h == "foo.com" + s, h, po, pa = utils.parse_url(b"http://foo.com:8888/test") + assert s == b"http" + assert h == b"foo.com" assert po == 8888 - assert pa == "/test" + assert pa == b"/test" s, h, po, pa = utils.parse_url("http://foo/bar") - assert s == "http" - assert h == "foo" + assert s == b"http" + assert h == b"foo" assert po == 80 - assert pa == "/bar" + assert pa == b"/bar" - s, h, po, pa = utils.parse_url("http://user:pass@foo/bar") - assert s == "http" - assert h == "foo" + s, h, po, pa = utils.parse_url(b"http://user:pass@foo/bar") + assert s == b"http" + assert h == b"foo" assert po == 80 - assert pa == "/bar" + assert pa == b"/bar" - s, h, po, pa = utils.parse_url("http://foo") - assert pa == "/" + s, h, po, pa = utils.parse_url(b"http://foo") + assert pa == b"/" - s, h, po, pa = utils.parse_url("https://foo") + s, h, po, pa = utils.parse_url(b"https://foo") assert po == 443 - assert not utils.parse_url("https://foo:bar") - assert not utils.parse_url("https://foo:") + with tutils.raises(ValueError): + utils.parse_url(b"https://foo:bar") # Invalid IDNA - assert not utils.parse_url("http://\xfafoo") + with tutils.raises(ValueError): + utils.parse_url("http://\xfafoo") # Invalid PATH - assert not utils.parse_url("http:/\xc6/localhost:56121") + with tutils.raises(ValueError): + utils.parse_url("http:/\xc6/localhost:56121") # Null byte in host - assert not utils.parse_url("http://foo\0") + with tutils.raises(ValueError): + utils.parse_url("http://foo\0") # Port out of range - assert not utils.parse_url("http://foo:999999") + _, _, port, _ = utils.parse_url("http://foo:999999") + assert port == 80 # Invalid IPv6 URL - see http://www.ietf.org/rfc/rfc2732.txt - assert not utils.parse_url('http://lo[calhost') + with tutils.raises(ValueError): + utils.parse_url('http://lo[calhost') def test_unparse_url(): @@ -106,23 +111,25 @@ def test_get_header_tokens(): def test_multipartdecode(): - boundary = 'somefancyboundary' + boundary = b'somefancyboundary' headers = Headers( - content_type='multipart/form-data; boundary=%s' % boundary + content_type=b'multipart/form-data; boundary=' + boundary + ) + content = ( + "--{0}\n" + "Content-Disposition: form-data; name=\"field1\"\n\n" + "value1\n" + "--{0}\n" + "Content-Disposition: form-data; name=\"field2\"\n\n" + "value2\n" + "--{0}--".format(boundary).encode("ascii") ) - content = "--{0}\n" \ - "Content-Disposition: form-data; name=\"field1\"\n\n" \ - "value1\n" \ - "--{0}\n" \ - "Content-Disposition: form-data; name=\"field2\"\n\n" \ - "value2\n" \ - "--{0}--".format(boundary) form = utils.multipartdecode(headers, content) assert len(form) == 2 - assert form[0] == ('field1', 'value1') - assert form[1] == ('field2', 'value2') + assert form[0] == (b"field1", b"value1") + assert form[1] == (b"field2", b"value2") def test_parse_content_type(): diff --git a/test/test_version_check.py b/test/test_version_check.py index 9a127814..ec2396fe 100644 --- a/test/test_version_check.py +++ b/test/test_version_check.py @@ -1,11 +1,11 @@ -import cStringIO +from io import StringIO import mock from netlib import version_check, version @mock.patch("sys.exit") def test_check_mitmproxy_version(sexit): - fp = cStringIO.StringIO() + fp = StringIO() version_check.check_mitmproxy_version(version.IVERSION, fp=fp) assert not fp.getvalue() assert not sexit.called @@ -18,7 +18,7 @@ def test_check_mitmproxy_version(sexit): @mock.patch("sys.exit") def test_check_pyopenssl_version(sexit): - fp = cStringIO.StringIO() + fp = StringIO() version_check.check_pyopenssl_version(fp=fp) assert not fp.getvalue() assert not sexit.called @@ -32,7 +32,7 @@ def test_check_pyopenssl_version(sexit): @mock.patch("OpenSSL.__version__") def test_unparseable_pyopenssl_version(version, sexit): version.split.return_value = ["foo", "bar"] - fp = cStringIO.StringIO() + fp = StringIO() version_check.check_pyopenssl_version(fp=fp) assert "Cannot parse" in fp.getvalue() assert not sexit.called diff --git a/test/tservers.py b/test/tservers.py index 682a9144..1f4ce725 100644 --- a/test/tservers.py +++ b/test/tservers.py @@ -1,7 +1,7 @@ from __future__ import (absolute_import, print_function, division) import threading -import Queue -import cStringIO +from six.moves import queue +from io import StringIO import OpenSSL from netlib import tcp from netlib import tutils @@ -27,7 +27,7 @@ class ServerTestBase(object): @classmethod def setupAll(cls): - cls.q = Queue.Queue() + cls.q = queue.Queue() s = cls.makeserver() cls.port = s.address.port cls.server = ServerThread(s) @@ -102,6 +102,6 @@ class TServer(tcp.TCPServer): h.finish() def handle_error(self, connection, client_address, fp=None): - s = cStringIO.StringIO() + s = StringIO() tcp.TCPServer.handle_error(self, connection, client_address, s) self.q.put(s.getvalue()) -- cgit v1.2.3 From a077d8877d210562f703c23e9625e8467c81222d Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Wed, 16 Sep 2015 00:04:23 +0200 Subject: finish netlib.http.http1 refactor --- test/http/http1/test_assemble.py | 91 ++++++ test/http/http1/test_protocol.py | 466 ------------------------------ test/http/http1/test_read.py | 313 ++++++++++++++++++++ test/http/http2/test_frames.py | 2 +- test/http/http2/test_protocol.py | 16 +- test/http/test_exceptions.py | 6 - test/http/test_models.py | 540 ++++++++++++++++++++++++++++++++++ test/http/test_semantics.py | 573 ------------------------------------- test/websockets/test_websockets.py | 16 +- 9 files changed, 960 insertions(+), 1063 deletions(-) create mode 100644 test/http/http1/test_assemble.py create mode 100644 test/http/http1/test_read.py delete mode 100644 test/http/test_exceptions.py create mode 100644 test/http/test_models.py delete mode 100644 test/http/test_semantics.py (limited to 'test') diff --git a/test/http/http1/test_assemble.py b/test/http/http1/test_assemble.py new file mode 100644 index 00000000..8a0a54f1 --- /dev/null +++ b/test/http/http1/test_assemble.py @@ -0,0 +1,91 @@ +from __future__ import absolute_import, print_function, division +from netlib.exceptions import HttpException +from netlib.http import CONTENT_MISSING, Headers +from netlib.http.http1.assemble import ( + assemble_request, assemble_request_head, assemble_response, + assemble_response_head, _assemble_request_line, _assemble_request_headers, + _assemble_response_headers +) +from netlib.tutils import treq, raises, tresp + + +def test_assemble_request(): + c = assemble_request(treq()) == ( + b"GET /path HTTP/1.1\r\n" + b"header: qvalue\r\n" + b"Host: address:22\r\n" + b"Content-Length: 7\r\n" + b"\r\n" + b"content" + ) + + with raises(HttpException): + assemble_request(treq(body=CONTENT_MISSING)) + + +def test_assemble_request_head(): + c = assemble_request_head(treq()) + assert b"GET" in c + assert b"qvalue" in c + assert b"content" not in c + + +def test_assemble_response(): + c = assemble_response(tresp()) == ( + b"HTTP/1.1 200 OK\r\n" + b"header-response: svalue\r\n" + b"Content-Length: 7\r\n" + b"\r\n" + b"message" + ) + + with raises(HttpException): + assemble_response(tresp(body=CONTENT_MISSING)) + + +def test_assemble_response_head(): + c = assemble_response_head(tresp()) + assert b"200" in c + assert b"svalue" in c + assert b"message" not in c + + +def test_assemble_request_line(): + assert _assemble_request_line(treq()) == b"GET /path HTTP/1.1" + + authority_request = treq(method=b"CONNECT", form_in="authority") + assert _assemble_request_line(authority_request) == b"CONNECT address:22 HTTP/1.1" + + absolute_request = treq(form_in="absolute") + assert _assemble_request_line(absolute_request) == b"GET http://address:22/path HTTP/1.1" + + with raises(RuntimeError): + _assemble_request_line(treq(), "invalid_form") + + +def test_assemble_request_headers(): + # https://github.com/mitmproxy/mitmproxy/issues/186 + r = treq(body=b"") + r.headers[b"Transfer-Encoding"] = b"chunked" + c = _assemble_request_headers(r) + assert b"Content-Length" in c + assert b"Transfer-Encoding" not in c + + assert b"Host" in _assemble_request_headers(treq(headers=Headers())) + + assert b"Proxy-Connection" not in _assemble_request_headers( + treq(headers=Headers(Proxy_Connection="42")) + ) + + +def test_assemble_response_headers(): + # https://github.com/mitmproxy/mitmproxy/issues/186 + r = tresp(body=b"") + r.headers["Transfer-Encoding"] = b"chunked" + c = _assemble_response_headers(r) + assert b"Content-Length" in c + assert b"Transfer-Encoding" not in c + + assert b"Proxy-Connection" not in _assemble_response_headers( + tresp(headers=Headers(Proxy_Connection=b"42")) + ) diff --git a/test/http/http1/test_protocol.py b/test/http/http1/test_protocol.py index bdcba5cb..e69de29b 100644 --- a/test/http/http1/test_protocol.py +++ b/test/http/http1/test_protocol.py @@ -1,466 +0,0 @@ -from io import BytesIO -import textwrap -from http.http1.protocol import _parse_authority_form -from netlib.exceptions import HttpSyntaxException, HttpReadDisconnect, HttpException - -from netlib import http, tcp, tutils -from netlib.http import semantics, Headers -from netlib.http.http1 import HTTP1Protocol, read_message_body, read_request, \ - read_message_body_chunked, expected_http_body_size -from ... import tservers - - -class NoContentLengthHTTPHandler(tcp.BaseHandler): - def handle(self): - self.wfile.write("HTTP/1.1 200 OK\r\n\r\nbar\r\n\r\n") - self.wfile.flush() - - -def mock_protocol(data=''): - rfile = BytesIO(data) - wfile = BytesIO() - return HTTP1Protocol(rfile=rfile, wfile=wfile) - - -def match_http_string(data): - return textwrap.dedent(data).strip().replace('\n', '\r\n') - - -def test_stripped_chunked_encoding_no_content(): - """ - https://github.com/mitmproxy/mitmproxy/issues/186 - """ - - r = tutils.treq(content="") - r.headers["Transfer-Encoding"] = "chunked" - assert "Content-Length" in mock_protocol()._assemble_request_headers(r) - - r = tutils.tresp(content="") - r.headers["Transfer-Encoding"] = "chunked" - assert "Content-Length" in mock_protocol()._assemble_response_headers(r) - - -def test_read_chunked(): - req = tutils.treq(None) - req.headers["Transfer-Encoding"] = "chunked" - - data = b"1\r\na\r\n0\r\n" - with tutils.raises(HttpSyntaxException): - read_message_body(BytesIO(data), req) - - data = b"1\r\na\r\n0\r\n\r\n" - assert read_message_body(BytesIO(data), req) == b"a" - - data = b"\r\n\r\n1\r\na\r\n1\r\nb\r\n0\r\n\r\n" - assert read_message_body(BytesIO(data), req) == b"ab" - - data = b"\r\n" - with tutils.raises("closed prematurely"): - read_message_body(BytesIO(data), req) - - data = b"1\r\nfoo" - with tutils.raises("malformed chunked body"): - read_message_body(BytesIO(data), req) - - data = b"foo\r\nfoo" - with tutils.raises(HttpSyntaxException): - read_message_body(BytesIO(data), req) - - data = b"5\r\naaaaa\r\n0\r\n\r\n" - with tutils.raises("too large"): - read_message_body(BytesIO(data), req, limit=2) - - -def test_connection_close(): - headers = Headers() - assert HTTP1Protocol.connection_close((1, 0), headers) - assert not HTTP1Protocol.connection_close((1, 1), headers) - - headers["connection"] = "keep-alive" - assert not HTTP1Protocol.connection_close((1, 1), headers) - - headers["connection"] = "close" - assert HTTP1Protocol.connection_close((1, 1), headers) - - -def test_read_http_body_request(): - headers = Headers() - data = "testing" - assert mock_protocol(data).read_http_body(headers, None, "GET", None, True) == "" - - -def test_read_http_body_response(): - headers = Headers() - data = "testing" - assert mock_protocol(data).read_http_body(headers, None, "GET", 200, False) == "testing" - - -def test_read_http_body(): - # test default case - headers = Headers() - headers["content-length"] = "7" - data = "testing" - assert mock_protocol(data).read_http_body(headers, None, "GET", 200, False) == "testing" - - # test content length: invalid header - headers["content-length"] = "foo" - data = "testing" - tutils.raises( - http.HttpError, - mock_protocol(data).read_http_body, - headers, None, "GET", 200, False - ) - - # test content length: invalid header #2 - headers["content-length"] = "-1" - data = "testing" - tutils.raises( - http.HttpError, - mock_protocol(data).read_http_body, - headers, None, "GET", 200, False - ) - - # test content length: content length > actual content - headers["content-length"] = "5" - data = "testing" - tutils.raises( - http.HttpError, - mock_protocol(data).read_http_body, - headers, 4, "GET", 200, False - ) - - # test content length: content length < actual content - data = "testing" - assert len(mock_protocol(data).read_http_body(headers, None, "GET", 200, False)) == 5 - - # test no content length: limit > actual content - headers = Headers() - data = "testing" - assert len(mock_protocol(data).read_http_body(headers, 100, "GET", 200, False)) == 7 - - # test no content length: limit < actual content - data = "testing" - tutils.raises( - http.HttpError, - mock_protocol(data).read_http_body, - headers, 4, "GET", 200, False - ) - - # test chunked - headers = Headers() - headers["transfer-encoding"] = "chunked" - data = "5\r\naaaaa\r\n0\r\n\r\n" - assert mock_protocol(data).read_http_body(headers, 100, "GET", 200, False) == "aaaaa" - - -def test_expected_http_body_size(): - # gibber in the content-length field - headers = Headers(content_length="foo") - with tutils.raises(HttpSyntaxException): - expected_http_body_size(headers, False, "GET", 200) is None - # negative number in the content-length field - headers = Headers(content_length="-7") - with tutils.raises(HttpSyntaxException): - expected_http_body_size(headers, False, "GET", 200) is None - # explicit length - headers = Headers(content_length="5") - assert expected_http_body_size(headers, False, "GET", 200) == 5 - # no length - headers = Headers() - assert expected_http_body_size(headers, False, "GET", 200) == -1 - # no length request - headers = Headers() - assert expected_http_body_size(headers, True, "GET", None) == 0 - # expect header - headers = Headers(content_length="5", expect="100-continue") - assert expected_http_body_size(headers, True, "GET", None) == 0 - - -def test_parse_init_connect(): - assert _parse_authority_form(b"CONNECT host.com:443 HTTP/1.0") - tutils.raises(ValueError,_parse_authority_form, b"\0host.com:443") - tutils.raises(ValueError,_parse_authority_form, b"host.com:444444") - tutils.raises(ValueError,_parse_authority_form, b"CONNECT host.com443 HTTP/1.0") - tutils.raises(ValueError,_parse_authority_form, b"CONNECT host.com:foo HTTP/1.0") - - -def test_parse_init_proxy(): - u = b"GET http://foo.com:8888/test HTTP/1.1" - m, s, h, po, pa, httpversion = HTTP1Protocol._parse_absolute_form(u) - assert m == "GET" - assert s == "http" - assert h == "foo.com" - assert po == 8888 - assert pa == "/test" - assert httpversion == (1, 1) - - u = "G\xfeET http://foo.com:8888/test HTTP/1.1" - assert not HTTP1Protocol._parse_absolute_form(u) - - with tutils.raises(ValueError): - assert not HTTP1Protocol._parse_absolute_form("invalid") - with tutils.raises(ValueError): - assert not HTTP1Protocol._parse_absolute_form("GET invalid HTTP/1.1") - with tutils.raises(ValueError): - assert not HTTP1Protocol._parse_absolute_form("GET http://foo.com:8888/test foo/1.1") - - -def test_parse_init_http(): - u = "GET /test HTTP/1.1" - m, u, httpversion = HTTP1Protocol._parse_init_http(u) - assert m == "GET" - assert u == "/test" - assert httpversion == (1, 1) - - u = "G\xfeET /test HTTP/1.1" - assert not HTTP1Protocol._parse_init_http(u) - - assert not HTTP1Protocol._parse_init_http("invalid") - assert not HTTP1Protocol._parse_init_http("GET invalid HTTP/1.1") - assert not HTTP1Protocol._parse_init_http("GET /test foo/1.1") - assert not HTTP1Protocol._parse_init_http("GET /test\xc0 HTTP/1.1") - - -class TestReadHeaders: - - def _read(self, data, verbatim=False): - if not verbatim: - data = textwrap.dedent(data) - data = data.strip() - return mock_protocol(data).read_headers() - - def test_read_simple(self): - data = """ - Header: one - Header2: two - \r\n - """ - headers = self._read(data) - assert headers.fields == [["Header", "one"], ["Header2", "two"]] - - def test_read_multi(self): - data = """ - Header: one - Header: two - \r\n - """ - headers = self._read(data) - assert headers.fields == [["Header", "one"], ["Header", "two"]] - - def test_read_continued(self): - data = """ - Header: one - \ttwo - Header2: three - \r\n - """ - headers = self._read(data) - assert headers.fields == [["Header", "one\r\n two"], ["Header2", "three"]] - - def test_read_continued_err(self): - data = "\tfoo: bar\r\n" - assert self._read(data, True) is None - - def test_read_err(self): - data = """ - foo - """ - assert self._read(data) is None - - -class TestReadRequest(object): - - def tst(self, data, **kwargs): - return mock_protocol(data).read_request(**kwargs) - - def test_invalid(self): - tutils.raises( - "bad http request", - self.tst, - "xxx" - ) - tutils.raises( - "bad http request line", - self.tst, - "get /\xff HTTP/1.1" - ) - tutils.raises( - "invalid headers", - self.tst, - "get / HTTP/1.1\r\nfoo" - ) - tutils.raises( - HttpReadDisconnect, - self.tst, - "\r\n" - ) - - def test_asterisk_form_in(self): - v = self.tst("OPTIONS * HTTP/1.1") - assert v.form_in == "relative" - assert v.method == "OPTIONS" - - def test_absolute_form_in(self): - tutils.raises( - "Bad HTTP request line", - self.tst, - "GET oops-no-protocol.com HTTP/1.1" - ) - v = self.tst("GET http://address:22/ HTTP/1.1") - assert v.form_in == "absolute" - assert v.port == 22 - assert v.host == "address" - assert v.scheme == "http" - - def test_connect(self): - tutils.raises( - "Bad HTTP request line", - self.tst, - "CONNECT oops-no-port.com HTTP/1.1" - ) - v = self.tst("CONNECT foo.com:443 HTTP/1.1") - assert v.form_in == "authority" - assert v.method == "CONNECT" - assert v.port == 443 - assert v.host == "foo.com" - - def test_expect(self): - data = ( - b"GET / HTTP/1.1\r\n" - b"Content-Length: 3\r\n" - b"Expect: 100-continue\r\n" - b"\r\n" - b"foobar" - ) - - rfile = BytesIO(data) - r = read_request(rfile) - assert r.body == b"" - assert rfile.read(-1) == b"foobar" - - -class TestReadResponse(object): - def tst(self, data, method, body_size_limit, include_body=True): - data = textwrap.dedent(data) - return mock_protocol(data).read_response( - method, body_size_limit, include_body=include_body - ) - - def test_errors(self): - tutils.raises("server disconnect", self.tst, "", "GET", None) - tutils.raises("invalid server response", self.tst, "foo", "GET", None) - - def test_simple(self): - data = """ - HTTP/1.1 200 - """ - assert self.tst(data, "GET", None) == http.Response( - (1, 1), 200, '', Headers(), '' - ) - - def test_simple_message(self): - data = """ - HTTP/1.1 200 OK - """ - assert self.tst(data, "GET", None) == http.Response( - (1, 1), 200, 'OK', Headers(), '' - ) - - def test_invalid_http_version(self): - data = """ - HTTP/x 200 OK - """ - tutils.raises("invalid http version", self.tst, data, "GET", None) - - def test_invalid_status_code(self): - data = """ - HTTP/1.1 xx OK - """ - tutils.raises("invalid server response", self.tst, data, "GET", None) - - def test_valid_with_continue(self): - data = """ - HTTP/1.1 100 CONTINUE - - HTTP/1.1 200 OK - """ - assert self.tst(data, "GET", None) == http.Response( - (1, 1), 100, 'CONTINUE', Headers(), '' - ) - - def test_simple_body(self): - data = """ - HTTP/1.1 200 OK - Content-Length: 3 - - foo - """ - assert self.tst(data, "GET", None).body == 'foo' - assert self.tst(data, "HEAD", None).body == '' - - def test_invalid_headers(self): - data = """ - HTTP/1.1 200 OK - \tContent-Length: 3 - - foo - """ - tutils.raises("invalid headers", self.tst, data, "GET", None) - - def test_without_body(self): - data = """ - HTTP/1.1 200 OK - Content-Length: 3 - - foo - """ - assert self.tst(data, "GET", None, include_body=False).body is None - - -class TestReadResponseNoContentLength(tservers.ServerTestBase): - handler = NoContentLengthHTTPHandler - - def test_no_content_length(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - c.connect() - resp = HTTP1Protocol(c).read_response("GET", None) - assert resp.body == "bar\r\n\r\n" - - -class TestAssembleRequest(object): - def test_simple(self): - req = tutils.treq() - b = HTTP1Protocol().assemble_request(req) - assert b == match_http_string(""" - GET /path HTTP/1.1 - header: qvalue - Host: address:22 - Content-Length: 7 - - content""") - - def test_body_missing(self): - req = tutils.treq(content=semantics.CONTENT_MISSING) - tutils.raises(http.HttpError, HTTP1Protocol().assemble_request, req) - - def test_not_a_request(self): - tutils.raises(AssertionError, HTTP1Protocol().assemble_request, 'foo') - - -class TestAssembleResponse(object): - def test_simple(self): - resp = tutils.tresp() - b = HTTP1Protocol().assemble_response(resp) - assert b == match_http_string(""" - HTTP/1.1 200 OK - header_response: svalue - Content-Length: 7 - - message""") - - def test_body_missing(self): - resp = tutils.tresp(content=semantics.CONTENT_MISSING) - tutils.raises(http.HttpError, HTTP1Protocol().assemble_response, resp) - - def test_not_a_request(self): - tutils.raises(AssertionError, HTTP1Protocol().assemble_response, 'foo') diff --git a/test/http/http1/test_read.py b/test/http/http1/test_read.py new file mode 100644 index 00000000..5e6680af --- /dev/null +++ b/test/http/http1/test_read.py @@ -0,0 +1,313 @@ +from __future__ import absolute_import, print_function, division +from io import BytesIO +import textwrap + +from mock import Mock + +from netlib.exceptions import HttpException, HttpSyntaxException, HttpReadDisconnect +from netlib.http import Headers +from netlib.http.http1.read import ( + read_request, read_response, read_request_head, + read_response_head, read_body, connection_close, expected_http_body_size, _get_first_line, + _read_request_line, _parse_authority_form, _read_response_line, _check_http_version, + _read_headers, _read_chunked +) +from netlib.tutils import treq, tresp, raises + + +def test_read_request(): + rfile = BytesIO(b"GET / HTTP/1.1\r\n\r\nskip") + r = read_request(rfile) + assert r.method == b"GET" + assert r.body == b"" + assert r.timestamp_end + assert rfile.read() == b"skip" + + +def test_read_request_head(): + rfile = BytesIO( + b"GET / HTTP/1.1\r\n" + b"Content-Length: 4\r\n" + b"\r\n" + b"skip" + ) + rfile.reset_timestamps = Mock() + rfile.first_byte_timestamp = 42 + r = read_request_head(rfile) + assert r.method == b"GET" + assert r.headers["Content-Length"] == b"4" + assert r.body is None + assert rfile.reset_timestamps.called + assert r.timestamp_start == 42 + assert rfile.read() == b"skip" + + +def test_read_response(): + req = treq() + rfile = BytesIO(b"HTTP/1.1 418 I'm a teapot\r\n\r\nbody") + r = read_response(rfile, req) + assert r.status_code == 418 + assert r.body == b"body" + assert r.timestamp_end + + +def test_read_response_head(): + rfile = BytesIO( + b"HTTP/1.1 418 I'm a teapot\r\n" + b"Content-Length: 4\r\n" + b"\r\n" + b"skip" + ) + rfile.reset_timestamps = Mock() + rfile.first_byte_timestamp = 42 + r = read_response_head(rfile) + assert r.status_code == 418 + assert r.headers["Content-Length"] == b"4" + assert r.body is None + assert rfile.reset_timestamps.called + assert r.timestamp_start == 42 + assert rfile.read() == b"skip" + + +class TestReadBody(object): + def test_chunked(self): + rfile = BytesIO(b"3\r\nfoo\r\n0\r\n\r\nbar") + body = b"".join(read_body(rfile, None)) + assert body == b"foo" + assert rfile.read() == b"bar" + + + def test_known_size(self): + rfile = BytesIO(b"foobar") + body = b"".join(read_body(rfile, 3)) + assert body == b"foo" + assert rfile.read() == b"bar" + + + def test_known_size_limit(self): + rfile = BytesIO(b"foobar") + with raises(HttpException): + b"".join(read_body(rfile, 3, 2)) + + def test_known_size_too_short(self): + rfile = BytesIO(b"foo") + with raises(HttpException): + b"".join(read_body(rfile, 6)) + + def test_unknown_size(self): + rfile = BytesIO(b"foobar") + body = b"".join(read_body(rfile, -1)) + assert body == b"foobar" + + + def test_unknown_size_limit(self): + rfile = BytesIO(b"foobar") + with raises(HttpException): + b"".join(read_body(rfile, -1, 3)) + + +def test_connection_close(): + headers = Headers() + assert connection_close((1, 0), headers) + assert not connection_close((1, 1), headers) + + headers["connection"] = "keep-alive" + assert not connection_close((1, 1), headers) + + headers["connection"] = "close" + assert connection_close((1, 1), headers) + + +def test_expected_http_body_size(): + # Expect: 100-continue + assert expected_http_body_size( + treq(headers=Headers(expect=b"100-continue", content_length=b"42")) + ) == 0 + + # http://tools.ietf.org/html/rfc7230#section-3.3 + assert expected_http_body_size( + treq(method=b"HEAD"), + tresp(headers=Headers(content_length=b"42")) + ) == 0 + assert expected_http_body_size( + treq(method=b"CONNECT"), + tresp() + ) == 0 + for code in (100, 204, 304): + assert expected_http_body_size( + treq(), + tresp(status_code=code) + ) == 0 + + # chunked + assert expected_http_body_size( + treq(headers=Headers(transfer_encoding=b"chunked")), + ) is None + + # explicit length + for l in (b"foo", b"-7"): + with raises(HttpSyntaxException): + expected_http_body_size( + treq(headers=Headers(content_length=l)) + ) + assert expected_http_body_size( + treq(headers=Headers(content_length=b"42")) + ) == 42 + + # no length + assert expected_http_body_size( + treq() + ) == 0 + assert expected_http_body_size( + treq(), tresp() + ) == -1 + + +def test_get_first_line(): + rfile = BytesIO(b"foo\r\nbar") + assert _get_first_line(rfile) == b"foo" + + rfile = BytesIO(b"\r\nfoo\r\nbar") + assert _get_first_line(rfile) == b"foo" + + with raises(HttpReadDisconnect): + rfile = BytesIO(b"") + _get_first_line(rfile) + + with raises(HttpSyntaxException): + rfile = BytesIO(b"GET /\xff HTTP/1.1") + _get_first_line(rfile) + + +def test_read_request_line(): + def t(b): + return _read_request_line(BytesIO(b)) + + assert (t(b"GET / HTTP/1.1") == + ("relative", b"GET", None, None, None, b"/", b"HTTP/1.1")) + assert (t(b"OPTIONS * HTTP/1.1") == + ("relative", b"OPTIONS", None, None, None, b"*", b"HTTP/1.1")) + assert (t(b"CONNECT foo:42 HTTP/1.1") == + ("authority", b"CONNECT", None, b"foo", 42, None, b"HTTP/1.1")) + assert (t(b"GET http://foo:42/bar HTTP/1.1") == + ("absolute", b"GET", b"http", b"foo", 42, b"/bar", b"HTTP/1.1")) + + with raises(HttpSyntaxException): + t(b"GET / WTF/1.1") + with raises(HttpSyntaxException): + t(b"this is not http") + + +def test_parse_authority_form(): + assert _parse_authority_form(b"foo:42") == (b"foo", 42) + with raises(HttpSyntaxException): + _parse_authority_form(b"foo") + with raises(HttpSyntaxException): + _parse_authority_form(b"foo:bar") + with raises(HttpSyntaxException): + _parse_authority_form(b"foo:99999999") + with raises(HttpSyntaxException): + _parse_authority_form(b"f\x00oo:80") + + +def test_read_response_line(): + def t(b): + return _read_response_line(BytesIO(b)) + + assert t(b"HTTP/1.1 200 OK") == (b"HTTP/1.1", 200, b"OK") + assert t(b"HTTP/1.1 200") == (b"HTTP/1.1", 200, b"") + with raises(HttpSyntaxException): + assert t(b"HTTP/1.1") + + with raises(HttpSyntaxException): + t(b"HTTP/1.1 OK OK") + with raises(HttpSyntaxException): + t(b"WTF/1.1 200 OK") + + +def test_check_http_version(): + _check_http_version(b"HTTP/0.9") + _check_http_version(b"HTTP/1.0") + _check_http_version(b"HTTP/1.1") + _check_http_version(b"HTTP/2.0") + with raises(HttpSyntaxException): + _check_http_version(b"WTF/1.0") + with raises(HttpSyntaxException): + _check_http_version(b"HTTP/1.10") + with raises(HttpSyntaxException): + _check_http_version(b"HTTP/1.b") + + +class TestReadHeaders(object): + @staticmethod + def _read(data): + return _read_headers(BytesIO(data)) + + def test_read_simple(self): + data = ( + b"Header: one\r\n" + b"Header2: two\r\n" + b"\r\n" + ) + headers = self._read(data) + assert headers.fields == [[b"Header", b"one"], [b"Header2", b"two"]] + + def test_read_multi(self): + data = ( + b"Header: one\r\n" + b"Header: two\r\n" + b"\r\n" + ) + headers = self._read(data) + assert headers.fields == [[b"Header", b"one"], [b"Header", b"two"]] + + def test_read_continued(self): + data = ( + b"Header: one\r\n" + b"\ttwo\r\n" + b"Header2: three\r\n" + b"\r\n" + ) + headers = self._read(data) + assert headers.fields == [[b"Header", b"one\r\n two"], [b"Header2", b"three"]] + + def test_read_continued_err(self): + data = b"\tfoo: bar\r\n" + with raises(HttpSyntaxException): + self._read(data) + + def test_read_err(self): + data = b"foo" + with raises(HttpSyntaxException): + self._read(data) + + +def test_read_chunked(): + req = treq(body=None) + req.headers["Transfer-Encoding"] = "chunked" + + data = b"1\r\na\r\n0\r\n" + with raises(HttpSyntaxException): + b"".join(_read_chunked(BytesIO(data))) + + data = b"1\r\na\r\n0\r\n\r\n" + assert b"".join(_read_chunked(BytesIO(data))) == b"a" + + data = b"\r\n\r\n1\r\na\r\n1\r\nb\r\n0\r\n\r\n" + assert b"".join(_read_chunked(BytesIO(data))) == b"ab" + + data = b"\r\n" + with raises("closed prematurely"): + b"".join(_read_chunked(BytesIO(data))) + + data = b"1\r\nfoo" + with raises("malformed chunked body"): + b"".join(_read_chunked(BytesIO(data))) + + data = b"foo\r\nfoo" + with raises(HttpSyntaxException): + b"".join(_read_chunked(BytesIO(data))) + + data = b"5\r\naaaaa\r\n0\r\n\r\n" + with raises("too large"): + b"".join(_read_chunked(BytesIO(data), limit=2)) diff --git a/test/http/http2/test_frames.py b/test/http/http2/test_frames.py index efdb55e2..4c89b023 100644 --- a/test/http/http2/test_frames.py +++ b/test/http/http2/test_frames.py @@ -39,7 +39,7 @@ def test_too_large_frames(): flags=Frame.FLAG_END_STREAM, stream_id=0x1234567, payload='foobar' * 3000) - tutils.raises(FrameSizeError, f.to_bytes) + tutils.raises(HttpSyntaxException, f.to_bytes) def test_data_frame_to_bytes(): diff --git a/test/http/http2/test_protocol.py b/test/http/http2/test_protocol.py index 2b7d7958..789b6e63 100644 --- a/test/http/http2/test_protocol.py +++ b/test/http/http2/test_protocol.py @@ -2,21 +2,21 @@ import OpenSSL import mock from netlib import tcp, http, tutils -from netlib.http import http2, Headers -from netlib.http.http2 import HTTP2Protocol +from netlib.http import Headers +from netlib.http.http2.connections import HTTP2Protocol, TCPHandler from netlib.http.http2.frame import * from ... import tservers class TestTCPHandlerWrapper: def test_wrapped(self): - h = http2.TCPHandler(rfile='foo', wfile='bar') + h = TCPHandler(rfile='foo', wfile='bar') p = HTTP2Protocol(h) assert p.tcp_handler.rfile == 'foo' assert p.tcp_handler.wfile == 'bar' def test_direct(self): p = HTTP2Protocol(rfile='foo', wfile='bar') - assert isinstance(p.tcp_handler, http2.TCPHandler) + assert isinstance(p.tcp_handler, TCPHandler) assert p.tcp_handler.rfile == 'foo' assert p.tcp_handler.wfile == 'bar' @@ -32,8 +32,8 @@ class EchoHandler(tcp.BaseHandler): class TestProtocol: - @mock.patch("netlib.http.http2.HTTP2Protocol.perform_server_connection_preface") - @mock.patch("netlib.http.http2.HTTP2Protocol.perform_client_connection_preface") + @mock.patch("netlib.http.http2.connections.HTTP2Protocol.perform_server_connection_preface") + @mock.patch("netlib.http.http2.connections.HTTP2Protocol.perform_client_connection_preface") def test_perform_connection_preface(self, mock_client_method, mock_server_method): protocol = HTTP2Protocol(is_server=False) protocol.connection_preface_performed = True @@ -46,8 +46,8 @@ class TestProtocol: assert mock_client_method.called assert not mock_server_method.called - @mock.patch("netlib.http.http2.HTTP2Protocol.perform_server_connection_preface") - @mock.patch("netlib.http.http2.HTTP2Protocol.perform_client_connection_preface") + @mock.patch("netlib.http.http2.connections.HTTP2Protocol.perform_server_connection_preface") + @mock.patch("netlib.http.http2.connections.HTTP2Protocol.perform_client_connection_preface") def test_perform_connection_preface_server(self, mock_client_method, mock_server_method): protocol = HTTP2Protocol(is_server=True) protocol.connection_preface_performed = True diff --git a/test/http/test_exceptions.py b/test/http/test_exceptions.py deleted file mode 100644 index 49588d0a..00000000 --- a/test/http/test_exceptions.py +++ /dev/null @@ -1,6 +0,0 @@ -from netlib.http.exceptions import * - -class TestHttpError: - def test_simple(self): - e = HttpError(404, "Not found") - assert str(e) diff --git a/test/http/test_models.py b/test/http/test_models.py new file mode 100644 index 00000000..0f4dcc3b --- /dev/null +++ b/test/http/test_models.py @@ -0,0 +1,540 @@ +import mock + +from netlib import tutils +from netlib import utils +from netlib.odict import ODict, ODictCaseless +from netlib.http import Request, Response, Headers, CONTENT_MISSING, HDR_FORM_URLENCODED, \ + HDR_FORM_MULTIPART + + +class TestRequest(object): + def test_repr(self): + r = tutils.treq() + assert repr(r) + + def test_headers(self): + tutils.raises(AssertionError, Request, + 'form_in', + 'method', + 'scheme', + 'host', + 'port', + 'path', + (1, 1), + 'foobar', + ) + + req = Request( + 'form_in', + 'method', + 'scheme', + 'host', + 'port', + 'path', + (1, 1), + ) + assert isinstance(req.headers, Headers) + + def test_equal(self): + a = tutils.treq() + b = tutils.treq() + assert a == b + + assert not a == 'foo' + assert not b == 'foo' + assert not 'foo' == a + assert not 'foo' == b + + + def test_anticache(self): + req = tutils.treq() + req.headers["If-Modified-Since"] = "foo" + req.headers["If-None-Match"] = "bar" + req.anticache() + assert "If-Modified-Since" not in req.headers + assert "If-None-Match" not in req.headers + + def test_anticomp(self): + req = tutils.treq() + req.headers["Accept-Encoding"] = "foobar" + req.anticomp() + assert req.headers["Accept-Encoding"] == "identity" + + def test_constrain_encoding(self): + req = tutils.treq() + req.headers["Accept-Encoding"] = "identity, gzip, foo" + req.constrain_encoding() + assert "foo" not in req.headers["Accept-Encoding"] + + def test_update_host(self): + req = tutils.treq() + req.headers["Host"] = "" + req.host = "foobar" + req.update_host_header() + assert req.headers["Host"] == "foobar" + + def test_get_form(self): + req = tutils.treq() + assert req.get_form() == ODict() + + @mock.patch("netlib.http.Request.get_form_multipart") + @mock.patch("netlib.http.Request.get_form_urlencoded") + def test_get_form_with_url_encoded(self, mock_method_urlencoded, mock_method_multipart): + req = tutils.treq() + assert req.get_form() == ODict() + + req = tutils.treq() + req.body = "foobar" + req.headers["Content-Type"] = HDR_FORM_URLENCODED + req.get_form() + assert req.get_form_urlencoded.called + assert not req.get_form_multipart.called + + @mock.patch("netlib.http.Request.get_form_multipart") + @mock.patch("netlib.http.Request.get_form_urlencoded") + def test_get_form_with_multipart(self, mock_method_urlencoded, mock_method_multipart): + req = tutils.treq() + req.body = "foobar" + req.headers["Content-Type"] = HDR_FORM_MULTIPART + req.get_form() + assert not req.get_form_urlencoded.called + assert req.get_form_multipart.called + + def test_get_form_urlencoded(self): + req = tutils.treq(body="foobar") + assert req.get_form_urlencoded() == ODict() + + req.headers["Content-Type"] = HDR_FORM_URLENCODED + assert req.get_form_urlencoded() == ODict(utils.urldecode(req.body)) + + def test_get_form_multipart(self): + req = tutils.treq(body="foobar") + assert req.get_form_multipart() == ODict() + + req.headers["Content-Type"] = HDR_FORM_MULTIPART + assert req.get_form_multipart() == ODict( + utils.multipartdecode( + req.headers, + req.body + ) + ) + + def test_set_form_urlencoded(self): + req = tutils.treq() + req.set_form_urlencoded(ODict([('foo', 'bar'), ('rab', 'oof')])) + assert req.headers["Content-Type"] == HDR_FORM_URLENCODED + assert req.body + + def test_get_path_components(self): + req = tutils.treq() + assert req.get_path_components() + # TODO: add meaningful assertions + + def test_set_path_components(self): + req = tutils.treq() + req.set_path_components(["foo", "bar"]) + # TODO: add meaningful assertions + + def test_get_query(self): + req = tutils.treq() + assert req.get_query().lst == [] + + req.url = "http://localhost:80/foo?bar=42" + assert req.get_query().lst == [("bar", "42")] + + def test_set_query(self): + req = tutils.treq() + req.set_query(ODict([])) + + def test_pretty_host(self): + r = tutils.treq() + assert r.pretty_host(True) == "address" + assert r.pretty_host(False) == "address" + r.headers["host"] = "other" + assert r.pretty_host(True) == "other" + assert r.pretty_host(False) == "address" + r.host = None + assert r.pretty_host(True) == "other" + assert r.pretty_host(False) is None + del r.headers["host"] + assert r.pretty_host(True) is None + assert r.pretty_host(False) is None + + # Invalid IDNA + r.headers["host"] = ".disqus.com" + assert r.pretty_host(True) == ".disqus.com" + + def test_pretty_url(self): + req = tutils.treq() + req.form_out = "authority" + assert req.pretty_url(True) == "address:22" + assert req.pretty_url(False) == "address:22" + + req.form_out = "relative" + assert req.pretty_url(True) == "http://address:22/path" + assert req.pretty_url(False) == "http://address:22/path" + + def test_get_cookies_none(self): + headers = Headers() + r = tutils.treq() + r.headers = headers + assert len(r.get_cookies()) == 0 + + def test_get_cookies_single(self): + r = tutils.treq() + r.headers = Headers(cookie="cookiename=cookievalue") + result = r.get_cookies() + assert len(result) == 1 + assert result['cookiename'] == ['cookievalue'] + + def test_get_cookies_double(self): + r = tutils.treq() + r.headers = Headers(cookie="cookiename=cookievalue;othercookiename=othercookievalue") + result = r.get_cookies() + assert len(result) == 2 + assert result['cookiename'] == ['cookievalue'] + assert result['othercookiename'] == ['othercookievalue'] + + def test_get_cookies_withequalsign(self): + r = tutils.treq() + r.headers = Headers(cookie="cookiename=coo=kievalue;othercookiename=othercookievalue") + result = r.get_cookies() + assert len(result) == 2 + assert result['cookiename'] == ['coo=kievalue'] + assert result['othercookiename'] == ['othercookievalue'] + + def test_set_cookies(self): + r = tutils.treq() + r.headers = Headers(cookie="cookiename=cookievalue") + result = r.get_cookies() + result["cookiename"] = ["foo"] + r.set_cookies(result) + assert r.get_cookies()["cookiename"] == ["foo"] + + def test_set_url(self): + r = tutils.treq(form_in="absolute") + r.url = "https://otheraddress:42/ORLY" + assert r.scheme == "https" + assert r.host == "otheraddress" + assert r.port == 42 + assert r.path == "/ORLY" + + try: + r.url = "//localhost:80/foo@bar" + assert False + except: + assert True + + # def test_asterisk_form_in(self): + # f = tutils.tflow(req=None) + # protocol = mock_protocol("OPTIONS * HTTP/1.1") + # f.request = HTTPRequest.from_protocol(protocol) + # + # assert f.request.form_in == "relative" + # f.request.host = f.server_conn.address.host + # f.request.port = f.server_conn.address.port + # f.request.scheme = "http" + # assert protocol.assemble(f.request) == ( + # "OPTIONS * HTTP/1.1\r\n" + # "Host: address:22\r\n" + # "Content-Length: 0\r\n\r\n") + # + # def test_relative_form_in(self): + # protocol = mock_protocol("GET /foo\xff HTTP/1.1") + # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol) + # + # protocol = mock_protocol("GET /foo HTTP/1.1\r\nConnection: Upgrade\r\nUpgrade: h2c") + # r = HTTPRequest.from_protocol(protocol) + # assert r.headers["Upgrade"] == ["h2c"] + # + # def test_expect_header(self): + # protocol = mock_protocol( + # "GET / HTTP/1.1\r\nContent-Length: 3\r\nExpect: 100-continue\r\n\r\nfoobar") + # r = HTTPRequest.from_protocol(protocol) + # assert protocol.tcp_handler.wfile.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n" + # assert r.content == "foo" + # assert protocol.tcp_handler.rfile.read(3) == "bar" + # + # def test_authority_form_in(self): + # protocol = mock_protocol("CONNECT oops-no-port.com HTTP/1.1") + # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol) + # + # protocol = mock_protocol("CONNECT address:22 HTTP/1.1") + # r = HTTPRequest.from_protocol(protocol) + # r.scheme, r.host, r.port = "http", "address", 22 + # assert protocol.assemble(r) == ( + # "CONNECT address:22 HTTP/1.1\r\n" + # "Host: address:22\r\n" + # "Content-Length: 0\r\n\r\n") + # assert r.pretty_url(False) == "address:22" + # + # def test_absolute_form_in(self): + # protocol = mock_protocol("GET oops-no-protocol.com HTTP/1.1") + # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol) + # + # protocol = mock_protocol("GET http://address:22/ HTTP/1.1") + # r = HTTPRequest.from_protocol(protocol) + # assert protocol.assemble(r) == ( + # "GET http://address:22/ HTTP/1.1\r\n" + # "Host: address:22\r\n" + # "Content-Length: 0\r\n\r\n") + # + # def test_http_options_relative_form_in(self): + # """ + # Exercises fix for Issue #392. + # """ + # protocol = mock_protocol("OPTIONS /secret/resource HTTP/1.1") + # r = HTTPRequest.from_protocol(protocol) + # r.host = 'address' + # r.port = 80 + # r.scheme = "http" + # assert protocol.assemble(r) == ( + # "OPTIONS /secret/resource HTTP/1.1\r\n" + # "Host: address\r\n" + # "Content-Length: 0\r\n\r\n") + # + # def test_http_options_absolute_form_in(self): + # protocol = mock_protocol("OPTIONS http://address/secret/resource HTTP/1.1") + # r = HTTPRequest.from_protocol(protocol) + # r.host = 'address' + # r.port = 80 + # r.scheme = "http" + # assert protocol.assemble(r) == ( + # "OPTIONS http://address:80/secret/resource HTTP/1.1\r\n" + # "Host: address\r\n" + # "Content-Length: 0\r\n\r\n") + +class TestResponse(object): + def test_headers(self): + tutils.raises(AssertionError, Response, + (1, 1), + 200, + headers='foobar', + ) + + resp = Response( + (1, 1), + 200, + ) + assert isinstance(resp.headers, Headers) + + def test_equal(self): + a = tutils.tresp() + b = tutils.tresp() + assert a == b + + assert not a == 'foo' + assert not b == 'foo' + assert not 'foo' == a + assert not 'foo' == b + + def test_repr(self): + r = tutils.tresp() + assert "unknown content type" in repr(r) + r.headers["content-type"] = "foo" + assert "foo" in repr(r) + assert repr(tutils.tresp(body=CONTENT_MISSING)) + + def test_get_cookies_none(self): + resp = tutils.tresp() + resp.headers = Headers() + assert not resp.get_cookies() + + def test_get_cookies_simple(self): + resp = tutils.tresp() + resp.headers = Headers(set_cookie="cookiename=cookievalue") + result = resp.get_cookies() + assert len(result) == 1 + assert "cookiename" in result + assert result["cookiename"][0] == ["cookievalue", ODict()] + + def test_get_cookies_with_parameters(self): + resp = tutils.tresp() + resp.headers = Headers(set_cookie="cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly") + result = resp.get_cookies() + assert len(result) == 1 + assert "cookiename" in result + assert result["cookiename"][0][0] == "cookievalue" + attrs = result["cookiename"][0][1] + assert len(attrs) == 4 + assert attrs["domain"] == ["example.com"] + assert attrs["expires"] == ["Wed Oct 21 16:29:41 2015"] + assert attrs["path"] == ["/"] + assert attrs["httponly"] == [None] + + def test_get_cookies_no_value(self): + resp = tutils.tresp() + resp.headers = Headers(set_cookie="cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/") + result = resp.get_cookies() + assert len(result) == 1 + assert "cookiename" in result + assert result["cookiename"][0][0] == "" + assert len(result["cookiename"][0][1]) == 2 + + def test_get_cookies_twocookies(self): + resp = tutils.tresp() + resp.headers = Headers([ + ["Set-Cookie", "cookiename=cookievalue"], + ["Set-Cookie", "othercookie=othervalue"] + ]) + result = resp.get_cookies() + assert len(result) == 2 + assert "cookiename" in result + assert result["cookiename"][0] == ["cookievalue", ODict()] + assert "othercookie" in result + assert result["othercookie"][0] == ["othervalue", ODict()] + + def test_set_cookies(self): + resp = tutils.tresp() + v = resp.get_cookies() + v.add("foo", ["bar", ODictCaseless()]) + resp.set_cookies(v) + + v = resp.get_cookies() + assert len(v) == 1 + assert v["foo"] == [["bar", ODictCaseless()]] + + +class TestHeaders(object): + def _2host(self): + return Headers( + [ + ["Host", "example.com"], + ["host", "example.org"] + ] + ) + + def test_init(self): + headers = Headers() + assert len(headers) == 0 + + headers = Headers([["Host", "example.com"]]) + assert len(headers) == 1 + assert headers["Host"] == "example.com" + + headers = Headers(Host="example.com") + assert len(headers) == 1 + assert headers["Host"] == "example.com" + + headers = Headers( + [["Host", "invalid"]], + Host="example.com" + ) + assert len(headers) == 1 + assert headers["Host"] == "example.com" + + headers = Headers( + [["Host", "invalid"], ["Accept", "text/plain"]], + Host="example.com" + ) + assert len(headers) == 2 + assert headers["Host"] == "example.com" + assert headers["Accept"] == "text/plain" + + def test_getitem(self): + headers = Headers(Host="example.com") + assert headers["Host"] == "example.com" + assert headers["host"] == "example.com" + tutils.raises(KeyError, headers.__getitem__, "Accept") + + headers = self._2host() + assert headers["Host"] == "example.com, example.org" + + def test_str(self): + headers = Headers(Host="example.com") + assert bytes(headers) == "Host: example.com\r\n" + + headers = Headers([ + ["Host", "example.com"], + ["Accept", "text/plain"] + ]) + assert str(headers) == "Host: example.com\r\nAccept: text/plain\r\n" + + def test_setitem(self): + headers = Headers() + headers["Host"] = "example.com" + assert "Host" in headers + assert "host" in headers + assert headers["Host"] == "example.com" + + headers["host"] = "example.org" + assert "Host" in headers + assert "host" in headers + assert headers["Host"] == "example.org" + + headers["accept"] = "text/plain" + assert len(headers) == 2 + assert "Accept" in headers + assert "Host" in headers + + headers = self._2host() + assert len(headers.fields) == 2 + headers["Host"] = "example.com" + assert len(headers.fields) == 1 + assert "Host" in headers + + def test_delitem(self): + headers = Headers(Host="example.com") + assert len(headers) == 1 + del headers["host"] + assert len(headers) == 0 + try: + del headers["host"] + except KeyError: + assert True + else: + assert False + + headers = self._2host() + del headers["Host"] + assert len(headers) == 0 + + def test_keys(self): + headers = Headers(Host="example.com") + assert len(headers.keys()) == 1 + assert headers.keys()[0] == "Host" + + headers = self._2host() + assert len(headers.keys()) == 1 + assert headers.keys()[0] == "Host" + + def test_eq_ne(self): + headers1 = Headers(Host="example.com") + headers2 = Headers(host="example.com") + assert not (headers1 == headers2) + assert headers1 != headers2 + + headers1 = Headers(Host="example.com") + headers2 = Headers(Host="example.com") + assert headers1 == headers2 + assert not (headers1 != headers2) + + assert headers1 != 42 + + def test_get_all(self): + headers = self._2host() + assert headers.get_all("host") == ["example.com", "example.org"] + assert headers.get_all("accept") == [] + + def test_set_all(self): + headers = Headers(Host="example.com") + headers.set_all("Accept", ["text/plain"]) + assert len(headers) == 2 + assert "accept" in headers + + headers = self._2host() + headers.set_all("Host", ["example.org"]) + assert headers["host"] == "example.org" + + headers.set_all("Host", ["example.org", "example.net"]) + assert headers["host"] == "example.org, example.net" + + def test_state(self): + headers = self._2host() + assert len(headers.get_state()) == 2 + assert headers == Headers.from_state(headers.get_state()) + + headers2 = Headers() + assert headers != headers2 + headers2.load_state(headers.get_state()) + assert headers == headers2 diff --git a/test/http/test_semantics.py b/test/http/test_semantics.py deleted file mode 100644 index 44d3c85e..00000000 --- a/test/http/test_semantics.py +++ /dev/null @@ -1,573 +0,0 @@ -import mock - -from netlib import http -from netlib import odict -from netlib import tutils -from netlib import utils -from netlib.http import semantics -from netlib.http.semantics import CONTENT_MISSING - -class TestProtocolMixin(object): - @mock.patch("netlib.http.semantics.ProtocolMixin.assemble_response") - @mock.patch("netlib.http.semantics.ProtocolMixin.assemble_request") - def test_assemble_request(self, mock_request_method, mock_response_method): - p = semantics.ProtocolMixin() - p.assemble(tutils.treq()) - assert mock_request_method.called - assert not mock_response_method.called - - @mock.patch("netlib.http.semantics.ProtocolMixin.assemble_response") - @mock.patch("netlib.http.semantics.ProtocolMixin.assemble_request") - def test_assemble_response(self, mock_request_method, mock_response_method): - p = semantics.ProtocolMixin() - p.assemble(tutils.tresp()) - assert not mock_request_method.called - assert mock_response_method.called - - def test_assemble_foo(self): - p = semantics.ProtocolMixin() - tutils.raises(ValueError, p.assemble, 'foo') - -class TestRequest(object): - def test_repr(self): - r = tutils.treq() - assert repr(r) - - def test_headers(self): - tutils.raises(AssertionError, semantics.Request, - 'form_in', - 'method', - 'scheme', - 'host', - 'port', - 'path', - (1, 1), - 'foobar', - ) - - req = semantics.Request( - 'form_in', - 'method', - 'scheme', - 'host', - 'port', - 'path', - (1, 1), - ) - assert isinstance(req.headers, http.Headers) - - def test_equal(self): - a = tutils.treq() - b = tutils.treq() - assert a == b - - assert not a == 'foo' - assert not b == 'foo' - assert not 'foo' == a - assert not 'foo' == b - - def test_legacy_first_line(self): - req = tutils.treq() - - assert req.legacy_first_line('relative') == "GET /path HTTP/1.1" - assert req.legacy_first_line('authority') == "GET address:22 HTTP/1.1" - assert req.legacy_first_line('absolute') == "GET http://address:22/path HTTP/1.1" - tutils.raises(http.HttpError, req.legacy_first_line, 'foobar') - - def test_anticache(self): - req = tutils.treq() - req.headers["If-Modified-Since"] = "foo" - req.headers["If-None-Match"] = "bar" - req.anticache() - assert "If-Modified-Since" not in req.headers - assert "If-None-Match" not in req.headers - - def test_anticomp(self): - req = tutils.treq() - req.headers["Accept-Encoding"] = "foobar" - req.anticomp() - assert req.headers["Accept-Encoding"] == "identity" - - def test_constrain_encoding(self): - req = tutils.treq() - req.headers["Accept-Encoding"] = "identity, gzip, foo" - req.constrain_encoding() - assert "foo" not in req.headers["Accept-Encoding"] - - def test_update_host(self): - req = tutils.treq() - req.headers["Host"] = "" - req.host = "foobar" - req.update_host_header() - assert req.headers["Host"] == "foobar" - - def test_get_form(self): - req = tutils.treq() - assert req.get_form() == odict.ODict() - - @mock.patch("netlib.http.semantics.Request.get_form_multipart") - @mock.patch("netlib.http.semantics.Request.get_form_urlencoded") - def test_get_form_with_url_encoded(self, mock_method_urlencoded, mock_method_multipart): - req = tutils.treq() - assert req.get_form() == odict.ODict() - - req = tutils.treq() - req.body = "foobar" - req.headers["Content-Type"] = semantics.HDR_FORM_URLENCODED - req.get_form() - assert req.get_form_urlencoded.called - assert not req.get_form_multipart.called - - @mock.patch("netlib.http.semantics.Request.get_form_multipart") - @mock.patch("netlib.http.semantics.Request.get_form_urlencoded") - def test_get_form_with_multipart(self, mock_method_urlencoded, mock_method_multipart): - req = tutils.treq() - req.body = "foobar" - req.headers["Content-Type"] = semantics.HDR_FORM_MULTIPART - req.get_form() - assert not req.get_form_urlencoded.called - assert req.get_form_multipart.called - - def test_get_form_urlencoded(self): - req = tutils.treq("foobar") - assert req.get_form_urlencoded() == odict.ODict() - - req.headers["Content-Type"] = semantics.HDR_FORM_URLENCODED - assert req.get_form_urlencoded() == odict.ODict(utils.urldecode(req.body)) - - def test_get_form_multipart(self): - req = tutils.treq("foobar") - assert req.get_form_multipart() == odict.ODict() - - req.headers["Content-Type"] = semantics.HDR_FORM_MULTIPART - assert req.get_form_multipart() == odict.ODict( - utils.multipartdecode( - req.headers, - req.body - ) - ) - - def test_set_form_urlencoded(self): - req = tutils.treq() - req.set_form_urlencoded(odict.ODict([('foo', 'bar'), ('rab', 'oof')])) - assert req.headers["Content-Type"] == semantics.HDR_FORM_URLENCODED - assert req.body - - def test_get_path_components(self): - req = tutils.treq() - assert req.get_path_components() - # TODO: add meaningful assertions - - def test_set_path_components(self): - req = tutils.treq() - req.set_path_components(["foo", "bar"]) - # TODO: add meaningful assertions - - def test_get_query(self): - req = tutils.treq() - assert req.get_query().lst == [] - - req.url = "http://localhost:80/foo?bar=42" - assert req.get_query().lst == [("bar", "42")] - - def test_set_query(self): - req = tutils.treq() - req.set_query(odict.ODict([])) - - def test_pretty_host(self): - r = tutils.treq() - assert r.pretty_host(True) == "address" - assert r.pretty_host(False) == "address" - r.headers["host"] = "other" - assert r.pretty_host(True) == "other" - assert r.pretty_host(False) == "address" - r.host = None - assert r.pretty_host(True) == "other" - assert r.pretty_host(False) is None - del r.headers["host"] - assert r.pretty_host(True) is None - assert r.pretty_host(False) is None - - # Invalid IDNA - r.headers["host"] = ".disqus.com" - assert r.pretty_host(True) == ".disqus.com" - - def test_pretty_url(self): - req = tutils.treq() - req.form_out = "authority" - assert req.pretty_url(True) == "address:22" - assert req.pretty_url(False) == "address:22" - - req.form_out = "relative" - assert req.pretty_url(True) == "http://address:22/path" - assert req.pretty_url(False) == "http://address:22/path" - - def test_get_cookies_none(self): - headers = http.Headers() - r = tutils.treq() - r.headers = headers - assert len(r.get_cookies()) == 0 - - def test_get_cookies_single(self): - r = tutils.treq() - r.headers = http.Headers(cookie="cookiename=cookievalue") - result = r.get_cookies() - assert len(result) == 1 - assert result['cookiename'] == ['cookievalue'] - - def test_get_cookies_double(self): - r = tutils.treq() - r.headers = http.Headers(cookie="cookiename=cookievalue;othercookiename=othercookievalue") - result = r.get_cookies() - assert len(result) == 2 - assert result['cookiename'] == ['cookievalue'] - assert result['othercookiename'] == ['othercookievalue'] - - def test_get_cookies_withequalsign(self): - r = tutils.treq() - r.headers = http.Headers(cookie="cookiename=coo=kievalue;othercookiename=othercookievalue") - result = r.get_cookies() - assert len(result) == 2 - assert result['cookiename'] == ['coo=kievalue'] - assert result['othercookiename'] == ['othercookievalue'] - - def test_set_cookies(self): - r = tutils.treq() - r.headers = http.Headers(cookie="cookiename=cookievalue") - result = r.get_cookies() - result["cookiename"] = ["foo"] - r.set_cookies(result) - assert r.get_cookies()["cookiename"] == ["foo"] - - def test_set_url(self): - r = tutils.treq_absolute() - r.url = "https://otheraddress:42/ORLY" - assert r.scheme == "https" - assert r.host == "otheraddress" - assert r.port == 42 - assert r.path == "/ORLY" - - try: - r.url = "//localhost:80/foo@bar" - assert False - except: - assert True - - # def test_asterisk_form_in(self): - # f = tutils.tflow(req=None) - # protocol = mock_protocol("OPTIONS * HTTP/1.1") - # f.request = HTTPRequest.from_protocol(protocol) - # - # assert f.request.form_in == "relative" - # f.request.host = f.server_conn.address.host - # f.request.port = f.server_conn.address.port - # f.request.scheme = "http" - # assert protocol.assemble(f.request) == ( - # "OPTIONS * HTTP/1.1\r\n" - # "Host: address:22\r\n" - # "Content-Length: 0\r\n\r\n") - # - # def test_relative_form_in(self): - # protocol = mock_protocol("GET /foo\xff HTTP/1.1") - # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol) - # - # protocol = mock_protocol("GET /foo HTTP/1.1\r\nConnection: Upgrade\r\nUpgrade: h2c") - # r = HTTPRequest.from_protocol(protocol) - # assert r.headers["Upgrade"] == ["h2c"] - # - # def test_expect_header(self): - # protocol = mock_protocol( - # "GET / HTTP/1.1\r\nContent-Length: 3\r\nExpect: 100-continue\r\n\r\nfoobar") - # r = HTTPRequest.from_protocol(protocol) - # assert protocol.tcp_handler.wfile.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n" - # assert r.content == "foo" - # assert protocol.tcp_handler.rfile.read(3) == "bar" - # - # def test_authority_form_in(self): - # protocol = mock_protocol("CONNECT oops-no-port.com HTTP/1.1") - # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol) - # - # protocol = mock_protocol("CONNECT address:22 HTTP/1.1") - # r = HTTPRequest.from_protocol(protocol) - # r.scheme, r.host, r.port = "http", "address", 22 - # assert protocol.assemble(r) == ( - # "CONNECT address:22 HTTP/1.1\r\n" - # "Host: address:22\r\n" - # "Content-Length: 0\r\n\r\n") - # assert r.pretty_url(False) == "address:22" - # - # def test_absolute_form_in(self): - # protocol = mock_protocol("GET oops-no-protocol.com HTTP/1.1") - # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol) - # - # protocol = mock_protocol("GET http://address:22/ HTTP/1.1") - # r = HTTPRequest.from_protocol(protocol) - # assert protocol.assemble(r) == ( - # "GET http://address:22/ HTTP/1.1\r\n" - # "Host: address:22\r\n" - # "Content-Length: 0\r\n\r\n") - # - # def test_http_options_relative_form_in(self): - # """ - # Exercises fix for Issue #392. - # """ - # protocol = mock_protocol("OPTIONS /secret/resource HTTP/1.1") - # r = HTTPRequest.from_protocol(protocol) - # r.host = 'address' - # r.port = 80 - # r.scheme = "http" - # assert protocol.assemble(r) == ( - # "OPTIONS /secret/resource HTTP/1.1\r\n" - # "Host: address\r\n" - # "Content-Length: 0\r\n\r\n") - # - # def test_http_options_absolute_form_in(self): - # protocol = mock_protocol("OPTIONS http://address/secret/resource HTTP/1.1") - # r = HTTPRequest.from_protocol(protocol) - # r.host = 'address' - # r.port = 80 - # r.scheme = "http" - # assert protocol.assemble(r) == ( - # "OPTIONS http://address:80/secret/resource HTTP/1.1\r\n" - # "Host: address\r\n" - # "Content-Length: 0\r\n\r\n") - -class TestEmptyRequest(object): - def test_init(self): - req = semantics.EmptyRequest() - assert req - -class TestResponse(object): - def test_headers(self): - tutils.raises(AssertionError, semantics.Response, - (1, 1), - 200, - headers='foobar', - ) - - resp = semantics.Response( - (1, 1), - 200, - ) - assert isinstance(resp.headers, http.Headers) - - def test_equal(self): - a = tutils.tresp() - b = tutils.tresp() - assert a == b - - assert not a == 'foo' - assert not b == 'foo' - assert not 'foo' == a - assert not 'foo' == b - - def test_repr(self): - r = tutils.tresp() - assert "unknown content type" in repr(r) - r.headers["content-type"] = "foo" - assert "foo" in repr(r) - assert repr(tutils.tresp(content=CONTENT_MISSING)) - - def test_get_cookies_none(self): - resp = tutils.tresp() - resp.headers = http.Headers() - assert not resp.get_cookies() - - def test_get_cookies_simple(self): - resp = tutils.tresp() - resp.headers = http.Headers(set_cookie="cookiename=cookievalue") - result = resp.get_cookies() - assert len(result) == 1 - assert "cookiename" in result - assert result["cookiename"][0] == ["cookievalue", odict.ODict()] - - def test_get_cookies_with_parameters(self): - resp = tutils.tresp() - resp.headers = http.Headers(set_cookie="cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly") - result = resp.get_cookies() - assert len(result) == 1 - assert "cookiename" in result - assert result["cookiename"][0][0] == "cookievalue" - attrs = result["cookiename"][0][1] - assert len(attrs) == 4 - assert attrs["domain"] == ["example.com"] - assert attrs["expires"] == ["Wed Oct 21 16:29:41 2015"] - assert attrs["path"] == ["/"] - assert attrs["httponly"] == [None] - - def test_get_cookies_no_value(self): - resp = tutils.tresp() - resp.headers = http.Headers(set_cookie="cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/") - result = resp.get_cookies() - assert len(result) == 1 - assert "cookiename" in result - assert result["cookiename"][0][0] == "" - assert len(result["cookiename"][0][1]) == 2 - - def test_get_cookies_twocookies(self): - resp = tutils.tresp() - resp.headers = http.Headers([ - ["Set-Cookie", "cookiename=cookievalue"], - ["Set-Cookie", "othercookie=othervalue"] - ]) - result = resp.get_cookies() - assert len(result) == 2 - assert "cookiename" in result - assert result["cookiename"][0] == ["cookievalue", odict.ODict()] - assert "othercookie" in result - assert result["othercookie"][0] == ["othervalue", odict.ODict()] - - def test_set_cookies(self): - resp = tutils.tresp() - v = resp.get_cookies() - v.add("foo", ["bar", odict.ODictCaseless()]) - resp.set_cookies(v) - - v = resp.get_cookies() - assert len(v) == 1 - assert v["foo"] == [["bar", odict.ODictCaseless()]] - - -class TestHeaders(object): - def _2host(self): - return semantics.Headers( - [ - ["Host", "example.com"], - ["host", "example.org"] - ] - ) - - def test_init(self): - headers = semantics.Headers() - assert len(headers) == 0 - - headers = semantics.Headers([["Host", "example.com"]]) - assert len(headers) == 1 - assert headers["Host"] == "example.com" - - headers = semantics.Headers(Host="example.com") - assert len(headers) == 1 - assert headers["Host"] == "example.com" - - headers = semantics.Headers( - [["Host", "invalid"]], - Host="example.com" - ) - assert len(headers) == 1 - assert headers["Host"] == "example.com" - - headers = semantics.Headers( - [["Host", "invalid"], ["Accept", "text/plain"]], - Host="example.com" - ) - assert len(headers) == 2 - assert headers["Host"] == "example.com" - assert headers["Accept"] == "text/plain" - - def test_getitem(self): - headers = semantics.Headers(Host="example.com") - assert headers["Host"] == "example.com" - assert headers["host"] == "example.com" - tutils.raises(KeyError, headers.__getitem__, "Accept") - - headers = self._2host() - assert headers["Host"] == "example.com, example.org" - - def test_str(self): - headers = semantics.Headers(Host="example.com") - assert bytes(headers) == "Host: example.com\r\n" - - headers = semantics.Headers([ - ["Host", "example.com"], - ["Accept", "text/plain"] - ]) - assert str(headers) == "Host: example.com\r\nAccept: text/plain\r\n" - - def test_setitem(self): - headers = semantics.Headers() - headers["Host"] = "example.com" - assert "Host" in headers - assert "host" in headers - assert headers["Host"] == "example.com" - - headers["host"] = "example.org" - assert "Host" in headers - assert "host" in headers - assert headers["Host"] == "example.org" - - headers["accept"] = "text/plain" - assert len(headers) == 2 - assert "Accept" in headers - assert "Host" in headers - - headers = self._2host() - assert len(headers.fields) == 2 - headers["Host"] = "example.com" - assert len(headers.fields) == 1 - assert "Host" in headers - - def test_delitem(self): - headers = semantics.Headers(Host="example.com") - assert len(headers) == 1 - del headers["host"] - assert len(headers) == 0 - try: - del headers["host"] - except KeyError: - assert True - else: - assert False - - headers = self._2host() - del headers["Host"] - assert len(headers) == 0 - - def test_keys(self): - headers = semantics.Headers(Host="example.com") - assert len(headers.keys()) == 1 - assert headers.keys()[0] == "Host" - - headers = self._2host() - assert len(headers.keys()) == 1 - assert headers.keys()[0] == "Host" - - def test_eq_ne(self): - headers1 = semantics.Headers(Host="example.com") - headers2 = semantics.Headers(host="example.com") - assert not (headers1 == headers2) - assert headers1 != headers2 - - headers1 = semantics.Headers(Host="example.com") - headers2 = semantics.Headers(Host="example.com") - assert headers1 == headers2 - assert not (headers1 != headers2) - - assert headers1 != 42 - - def test_get_all(self): - headers = self._2host() - assert headers.get_all("host") == ["example.com", "example.org"] - assert headers.get_all("accept") == [] - - def test_set_all(self): - headers = semantics.Headers(Host="example.com") - headers.set_all("Accept", ["text/plain"]) - assert len(headers) == 2 - assert "accept" in headers - - headers = self._2host() - headers.set_all("Host", ["example.org"]) - assert headers["host"] == "example.org" - - headers.set_all("Host", ["example.org", "example.net"]) - assert headers["host"] == "example.org, example.net" - - def test_state(self): - headers = self._2host() - assert len(headers.get_state()) == 2 - assert headers == semantics.Headers.from_state(headers.get_state()) - - headers2 = semantics.Headers() - assert headers != headers2 - headers2.load_state(headers.get_state()) - assert headers == headers2 diff --git a/test/websockets/test_websockets.py b/test/websockets/test_websockets.py index 57cfd166..3fdeb683 100644 --- a/test/websockets/test_websockets.py +++ b/test/websockets/test_websockets.py @@ -1,11 +1,13 @@ import os from nose.tools import raises +from netlib.http.http1 import read_response, read_request from netlib import tcp, tutils, websockets, http from netlib.http import status_codes -from netlib.http.exceptions import * -from netlib.http.http1 import HTTP1Protocol +from netlib.tutils import treq + +from netlib.exceptions import * from .. import tservers @@ -34,9 +36,8 @@ class WebSocketsEchoHandler(tcp.BaseHandler): frame.to_file(self.wfile) def handshake(self): - http1_protocol = HTTP1Protocol(self) - req = http1_protocol.read_request() + req = read_request(self.rfile) key = self.protocol.check_client_handshake(req.headers) preamble = 'HTTP/1.1 101 %s' % status_codes.RESPONSES.get(101) @@ -61,8 +62,6 @@ class WebSocketsClient(tcp.TCPClient): def connect(self): super(WebSocketsClient, self).connect() - http1_protocol = HTTP1Protocol(self) - preamble = 'GET / HTTP/1.1' self.wfile.write(preamble + "\r\n") headers = self.protocol.client_handshake_headers() @@ -70,7 +69,7 @@ class WebSocketsClient(tcp.TCPClient): self.wfile.write(str(headers) + "\r\n") self.wfile.flush() - resp = http1_protocol.read_response("GET", None) + resp = read_response(self.rfile, treq(method="GET")) server_nonce = self.protocol.check_server_handshake(resp.headers) if not server_nonce == self.protocol.create_server_nonce( @@ -158,9 +157,8 @@ class TestWebSockets(tservers.ServerTestBase): class BadHandshakeHandler(WebSocketsEchoHandler): def handshake(self): - http1_protocol = HTTP1Protocol(self) - client_hs = http1_protocol.read_request() + client_hs = read_request(self.rfile) self.protocol.check_client_handshake(client_hs.headers) preamble = 'HTTP/1.1 101 %s' % status_codes.RESPONSES.get(101) -- cgit v1.2.3 From 265f31e8782ee9da511ce4b63aa2da00221cbf66 Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Wed, 16 Sep 2015 18:43:24 +0200 Subject: adjust http1-related code --- test/http/http1/test_read.py | 12 ++++++++---- test/http/http2/test_protocol.py | 20 ++++++++++---------- test/http/test_models.py | 8 ++++---- 3 files changed, 22 insertions(+), 18 deletions(-) (limited to 'test') diff --git a/test/http/http1/test_read.py b/test/http/http1/test_read.py index 5e6680af..55def2a5 100644 --- a/test/http/http1/test_read.py +++ b/test/http/http1/test_read.py @@ -108,14 +108,14 @@ class TestReadBody(object): def test_connection_close(): headers = Headers() - assert connection_close((1, 0), headers) - assert not connection_close((1, 1), headers) + assert connection_close(b"HTTP/1.0", headers) + assert not connection_close(b"HTTP/1.1", headers) headers["connection"] = "keep-alive" - assert not connection_close((1, 1), headers) + assert not connection_close(b"HTTP/1.1", headers) headers["connection"] = "close" - assert connection_close((1, 1), headers) + assert connection_close(b"HTTP/1.1", headers) def test_expected_http_body_size(): @@ -281,6 +281,10 @@ class TestReadHeaders(object): with raises(HttpSyntaxException): self._read(data) + def test_read_empty_name(self): + data = b":foo" + with raises(HttpSyntaxException): + self._read(data) def test_read_chunked(): req = treq(body=None) diff --git a/test/http/http2/test_protocol.py b/test/http/http2/test_protocol.py index 789b6e63..a369eb49 100644 --- a/test/http/http2/test_protocol.py +++ b/test/http/http2/test_protocol.py @@ -64,7 +64,7 @@ class TestProtocol: class TestCheckALPNMatch(tservers.ServerTestBase): handler = EchoHandler ssl = dict( - alpn_select=HTTP2Protocol.ALPN_PROTO_H2, + alpn_select=ALPN_PROTO_H2, ) if OpenSSL._util.lib.Cryptography_HAS_ALPN: @@ -72,7 +72,7 @@ class TestCheckALPNMatch(tservers.ServerTestBase): def test_check_alpn(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() - c.convert_to_ssl(alpn_protos=[HTTP2Protocol.ALPN_PROTO_H2]) + c.convert_to_ssl(alpn_protos=[ALPN_PROTO_H2]) protocol = HTTP2Protocol(c) assert protocol.check_alpn() @@ -88,7 +88,7 @@ class TestCheckALPNMismatch(tservers.ServerTestBase): def test_check_alpn(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() - c.convert_to_ssl(alpn_protos=[HTTP2Protocol.ALPN_PROTO_H2]) + c.convert_to_ssl(alpn_protos=[ALPN_PROTO_H2]) protocol = HTTP2Protocol(c) tutils.raises(NotImplementedError, protocol.check_alpn) @@ -306,7 +306,7 @@ class TestReadRequest(tservers.ServerTestBase): protocol = HTTP2Protocol(c, is_server=True) protocol.connection_preface_performed = True - req = protocol.read_request() + req = protocol.read_request(NotImplemented) assert req.stream_id assert req.headers.fields == [[':method', 'GET'], [':path', '/'], [':scheme', 'https']] @@ -329,7 +329,7 @@ class TestReadRequestRelative(tservers.ServerTestBase): protocol = HTTP2Protocol(c, is_server=True) protocol.connection_preface_performed = True - req = protocol.read_request() + req = protocol.read_request(NotImplemented) assert req.form_in == "relative" assert req.method == "OPTIONS" @@ -352,7 +352,7 @@ class TestReadRequestAbsolute(tservers.ServerTestBase): protocol = HTTP2Protocol(c, is_server=True) protocol.connection_preface_performed = True - req = protocol.read_request() + req = protocol.read_request(NotImplemented) assert req.form_in == "absolute" assert req.scheme == "http" @@ -378,13 +378,13 @@ class TestReadRequestConnect(tservers.ServerTestBase): protocol = HTTP2Protocol(c, is_server=True) protocol.connection_preface_performed = True - req = protocol.read_request() + req = protocol.read_request(NotImplemented) assert req.form_in == "authority" assert req.method == "CONNECT" assert req.host == "address" assert req.port == 22 - req = protocol.read_request() + req = protocol.read_request(NotImplemented) assert req.form_in == "authority" assert req.method == "CONNECT" assert req.host == "example.com" @@ -410,7 +410,7 @@ class TestReadResponse(tservers.ServerTestBase): protocol = HTTP2Protocol(c) protocol.connection_preface_performed = True - resp = protocol.read_response(stream_id=42) + resp = protocol.read_response(NotImplemented, stream_id=42) assert resp.httpversion == (2, 0) assert resp.status_code == 200 @@ -436,7 +436,7 @@ class TestReadEmptyResponse(tservers.ServerTestBase): protocol = HTTP2Protocol(c) protocol.connection_preface_performed = True - resp = protocol.read_response(stream_id=42) + resp = protocol.read_response(NotImplemented, stream_id=42) assert resp.stream_id == 42 assert resp.httpversion == (2, 0) diff --git a/test/http/test_models.py b/test/http/test_models.py index 0f4dcc3b..8fce2e9d 100644 --- a/test/http/test_models.py +++ b/test/http/test_models.py @@ -20,7 +20,7 @@ class TestRequest(object): 'host', 'port', 'path', - (1, 1), + b"HTTP/1.1", 'foobar', ) @@ -31,7 +31,7 @@ class TestRequest(object): 'host', 'port', 'path', - (1, 1), + b"HTTP/1.1", ) assert isinstance(req.headers, Headers) @@ -307,13 +307,13 @@ class TestRequest(object): class TestResponse(object): def test_headers(self): tutils.raises(AssertionError, Response, - (1, 1), + b"HTTP/1.1", 200, headers='foobar', ) resp = Response( - (1, 1), + b"HTTP/1.1", 200, ) assert isinstance(resp.headers, Headers) -- cgit v1.2.3