diff options
-rw-r--r-- | netlib/encoding.py | 4 | ||||
-rw-r--r-- | netlib/http/headers.py | 8 | ||||
-rw-r--r-- | netlib/http/message.py | 42 | ||||
-rw-r--r-- | netlib/http/request.py | 28 | ||||
-rw-r--r-- | netlib/http/response.py | 8 | ||||
-rw-r--r-- | netlib/http/status_codes.py | 4 | ||||
-rw-r--r-- | test/http/http1/test_read.py | 17 | ||||
-rw-r--r-- | test/http/test_headers.py | 3 | ||||
-rw-r--r-- | test/http/test_message.py | 136 | ||||
-rw-r--r-- | test/http/test_models.py | 266 | ||||
-rw-r--r-- | test/http/test_request.py | 229 | ||||
-rw-r--r-- | test/http/test_status_codes.py | 6 |
12 files changed, 455 insertions, 296 deletions
diff --git a/netlib/encoding.py b/netlib/encoding.py index 4c11273b..14479e00 100644 --- a/netlib/encoding.py +++ b/netlib/encoding.py @@ -12,6 +12,8 @@ ENCODINGS = {"identity", "gzip", "deflate"} def decode(e, content): + if not isinstance(content, bytes): + return None encoding_map = { "identity": identity, "gzip": decode_gzip, @@ -23,6 +25,8 @@ def decode(e, content): def encode(e, content): + if not isinstance(content, bytes): + return None encoding_map = { "identity": identity, "gzip": encode_gzip, diff --git a/netlib/http/headers.py b/netlib/http/headers.py index c79c3344..f64e6200 100644 --- a/netlib/http/headers.py +++ b/netlib/http/headers.py @@ -8,15 +8,15 @@ from __future__ import absolute_import, print_function, division import copy try: from collections.abc import MutableMapping -except ImportError: # Workaround for Python < 3.3 - from collections import MutableMapping +except ImportError: # pragma: nocover + from collections import MutableMapping # Workaround for Python < 3.3 import six from netlib.utils import always_byte_args, always_bytes -if six.PY2: +if six.PY2: # pragma: nocover _native = lambda x: x _always_bytes = lambda x: x _always_byte_args = lambda x: x @@ -106,7 +106,7 @@ class Headers(MutableMapping): else: return b"" - if six.PY2: + if six.PY2: # pragma: nocover __str__ = __bytes__ @_always_byte_args diff --git a/netlib/http/message.py b/netlib/http/message.py index ee138746..7cb18f52 100644 --- a/netlib/http/message.py +++ b/netlib/http/message.py @@ -9,7 +9,7 @@ from .. import encoding, utils CONTENT_MISSING = 0 -if six.PY2: +if six.PY2: # pragma: nocover _native = lambda x: x _always_bytes = lambda x: x else: @@ -110,15 +110,48 @@ class Message(object): def text(self, text): raise NotImplementedError() + def decode(self): + """ + Decodes body based on the current Content-Encoding header, then + removes the header. If there is no Content-Encoding header, no + action is taken. + + Returns: + True, if decoding succeeded. + False, otherwise. + """ + ce = self.headers.get("content-encoding") + data = encoding.decode(ce, self.content) + if data is None: + return False + self.content = data + self.headers.pop("content-encoding", None) + return True + + def encode(self, e): + """ + Encodes body with the encoding e, where e is "gzip", "deflate" or "identity". + + Returns: + True, if decoding succeeded. + False, otherwise. + """ + data = encoding.encode(e, self.content) + if data is None: + return False + self.content = data + self.headers["content-encoding"] = e + return True + # Legacy @property - def body(self): + def body(self): # pragma: nocover warnings.warn(".body is deprecated, use .content instead.", DeprecationWarning) return self.content @body.setter - def body(self, body): + def body(self, body): # pragma: nocover warnings.warn(".body is deprecated, use .content instead.", DeprecationWarning) self.content = body @@ -146,8 +179,7 @@ class decoded(object): def __enter__(self): if self.ce: - if not self.message.decode(): - self.ce = None + self.message.decode() def __exit__(self, type, value, tb): if self.ce: diff --git a/netlib/http/request.py b/netlib/http/request.py index f8a3b5b9..325c0080 100644 --- a/netlib/http/request.py +++ b/netlib/http/request.py @@ -102,7 +102,7 @@ class Request(Message): or inferred from the proxy mode (e.g. an IP in transparent mode). """ - if six.PY2: + if six.PY2: # pragma: nocover return self.data.host if not self.data.host: @@ -303,58 +303,58 @@ class Request(Message): # Legacy - def get_cookies(self): + def get_cookies(self): # pragma: nocover warnings.warn(".get_cookies is deprecated, use .cookies instead.", DeprecationWarning) return self.cookies - def set_cookies(self, odict): + def set_cookies(self, odict): # pragma: nocover warnings.warn(".set_cookies is deprecated, use .cookies instead.", DeprecationWarning) self.cookies = odict - def get_query(self): + def get_query(self): # pragma: nocover warnings.warn(".get_query is deprecated, use .query instead.", DeprecationWarning) return self.query or ODict([]) - def set_query(self, odict): + def set_query(self, odict): # pragma: nocover warnings.warn(".set_query is deprecated, use .query instead.", DeprecationWarning) self.query = odict - def get_path_components(self): + def get_path_components(self): # pragma: nocover warnings.warn(".get_path_components is deprecated, use .path_components instead.", DeprecationWarning) return self.path_components - def set_path_components(self, lst): + def set_path_components(self, lst): # pragma: nocover warnings.warn(".set_path_components is deprecated, use .path_components instead.", DeprecationWarning) self.path_components = lst - def get_form_urlencoded(self): + def get_form_urlencoded(self): # pragma: nocover warnings.warn(".get_form_urlencoded is deprecated, use .urlencoded_form instead.", DeprecationWarning) return self.urlencoded_form or ODict([]) - def set_form_urlencoded(self, odict): + def set_form_urlencoded(self, odict): # pragma: nocover warnings.warn(".set_form_urlencoded is deprecated, use .urlencoded_form instead.", DeprecationWarning) self.urlencoded_form = odict - def get_form_multipart(self): + def get_form_multipart(self): # pragma: nocover warnings.warn(".get_form_multipart is deprecated, use .multipart_form instead.", DeprecationWarning) return self.multipart_form or ODict([]) @property - def form_in(self): + def form_in(self): # pragma: nocover warnings.warn(".form_in is deprecated, use .first_line_format instead.", DeprecationWarning) return self.first_line_format @form_in.setter - def form_in(self, form_in): + def form_in(self, form_in): # pragma: nocover warnings.warn(".form_in is deprecated, use .first_line_format instead.", DeprecationWarning) self.first_line_format = form_in @property - def form_out(self): + def form_out(self): # pragma: nocover warnings.warn(".form_out is deprecated, use .first_line_format instead.", DeprecationWarning) return self.first_line_format @form_out.setter - def form_out(self, form_out): + def form_out(self, form_out): # pragma: nocover warnings.warn(".form_out is deprecated, use .first_line_format instead.", DeprecationWarning) self.first_line_format = form_out
\ No newline at end of file diff --git a/netlib/http/response.py b/netlib/http/response.py index 7d64243d..db31d2b9 100644 --- a/netlib/http/response.py +++ b/netlib/http/response.py @@ -106,20 +106,20 @@ class Response(Message): # Legacy - def get_cookies(self): + def get_cookies(self): # pragma: nocover warnings.warn(".get_cookies is deprecated, use .cookies instead.", DeprecationWarning) return self.cookies - def set_cookies(self, odict): + def set_cookies(self, odict): # pragma: nocover warnings.warn(".set_cookies is deprecated, use .cookies instead.", DeprecationWarning) self.cookies = odict @property - def msg(self): + def msg(self): # pragma: nocover warnings.warn(".msg is deprecated, use .reason instead.", DeprecationWarning) return self.reason @msg.setter - def msg(self, reason): + def msg(self, reason): # pragma: nocover warnings.warn(".msg is deprecated, use .reason instead.", DeprecationWarning) self.reason = reason diff --git a/netlib/http/status_codes.py b/netlib/http/status_codes.py index dc09f465..8a4dc1f5 100644 --- a/netlib/http/status_codes.py +++ b/netlib/http/status_codes.py @@ -1,4 +1,4 @@ -from __future__ import (absolute_import, print_function, division) +from __future__ import absolute_import, print_function, division CONTINUE = 100 SWITCHING = 101 @@ -37,6 +37,7 @@ REQUEST_URI_TOO_LONG = 414 UNSUPPORTED_MEDIA_TYPE = 415 REQUESTED_RANGE_NOT_SATISFIABLE = 416 EXPECTATION_FAILED = 417 +IM_A_TEAPOT = 418 INTERNAL_SERVER_ERROR = 500 NOT_IMPLEMENTED = 501 @@ -91,6 +92,7 @@ RESPONSES = { UNSUPPORTED_MEDIA_TYPE: "Unsupported Media Type", REQUESTED_RANGE_NOT_SATISFIABLE: "Requested Range not satisfiable", EXPECTATION_FAILED: "Expectation Failed", + IM_A_TEAPOT: "I'm a teapot", # 500 INTERNAL_SERVER_ERROR: "Internal Server Error", diff --git a/test/http/http1/test_read.py b/test/http/http1/test_read.py index fadfe446..a0085db9 100644 --- a/test/http/http1/test_read.py +++ b/test/http/http1/test_read.py @@ -2,7 +2,7 @@ from __future__ import absolute_import, print_function, division from io import BytesIO import textwrap from mock import Mock -from netlib.exceptions import HttpException, HttpSyntaxException, HttpReadDisconnect +from netlib.exceptions import HttpException, HttpSyntaxException, HttpReadDisconnect, TcpDisconnect from netlib.http import Headers from netlib.http.http1.read import ( read_request, read_response, read_request_head, @@ -100,6 +100,11 @@ class TestReadBody(object): with raises(HttpException): b"".join(read_body(rfile, -1, 3)) + def test_max_chunk_size(self): + rfile = BytesIO(b"123456") + assert list(read_body(rfile, -1, max_chunk_size=None)) == [b"123456"] + rfile = BytesIO(b"123456") + assert list(read_body(rfile, -1, max_chunk_size=1)) == [b"1", b"2", b"3", b"4", b"5", b"6"] def test_connection_close(): headers = Headers() @@ -169,6 +174,11 @@ def test_get_first_line(): rfile = BytesIO(b"") _get_first_line(rfile) + with raises(HttpReadDisconnect): + rfile = Mock() + rfile.readline.side_effect = TcpDisconnect + _get_first_line(rfile) + with raises(HttpSyntaxException): rfile = BytesIO(b"GET /\xff HTTP/1.1") _get_first_line(rfile) @@ -191,7 +201,8 @@ def test_read_request_line(): t(b"GET / WTF/1.1") with raises(HttpSyntaxException): t(b"this is not http") - + with raises(HttpReadDisconnect): + t(b"") def test_parse_authority_form(): assert _parse_authority_form(b"foo:42") == (b"foo", 42) @@ -218,6 +229,8 @@ def test_read_response_line(): t(b"HTTP/1.1 OK OK") with raises(HttpSyntaxException): t(b"WTF/1.1 200 OK") + with raises(HttpReadDisconnect): + t(b"") def test_check_http_version(): diff --git a/test/http/test_headers.py b/test/http/test_headers.py index f1af1feb..8bddc0b2 100644 --- a/test/http/test_headers.py +++ b/test/http/test_headers.py @@ -38,6 +38,9 @@ class TestHeaders(object): assert headers["Host"] == "example.com" assert headers["Accept"] == "text/plain" + with raises(ValueError): + Headers([[b"Host", u"not-bytes"]]) + def test_getitem(self): headers = Headers(Host="example.com") assert headers["Host"] == "example.com" diff --git a/test/http/test_message.py b/test/http/test_message.py new file mode 100644 index 00000000..b0b7e27f --- /dev/null +++ b/test/http/test_message.py @@ -0,0 +1,136 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, print_function, division + +from netlib.http import decoded +from netlib.tutils import tresp + + +def _test_passthrough_attr(message, attr): + def t(self=None): + assert getattr(message, attr) == getattr(message.data, attr) + setattr(message, attr, "foo") + assert getattr(message.data, attr) == "foo" + return t + + +def _test_decoded_attr(message, attr): + def t(self=None): + assert getattr(message, attr) == getattr(message.data, attr).decode("utf8") + # Set str, get raw bytes + setattr(message, attr, "foo") + assert getattr(message.data, attr) == b"foo" + # Set raw bytes, get decoded + setattr(message.data, attr, b"bar") + assert getattr(message, attr) == "bar" + # Set bytes, get raw bytes + setattr(message, attr, b"baz") + assert getattr(message.data, attr) == b"baz" + + # Set UTF8 + setattr(message, attr, "Non-Autorisé") + assert getattr(message.data, attr) == b"Non-Autoris\xc3\xa9" + # Don't fail on garbage + setattr(message.data, attr, b"foo\xFF\x00bar") + assert getattr(message, attr).startswith("foo") + assert getattr(message, attr).endswith("bar") + # foo.bar = foo.bar should not cause any side effects. + d = getattr(message, attr) + setattr(message, attr, d) + assert getattr(message.data, attr) == b"foo\xFF\x00bar" + return t + + +class TestMessage(object): + + def test_init(self): + resp = tresp() + assert resp.data + + def test_eq_ne(self): + resp = tresp(timestamp_start=42, timestamp_end=42) + same = tresp(timestamp_start=42, timestamp_end=42) + assert resp == same + assert not resp != same + + other = tresp(timestamp_start=0, timestamp_end=0) + assert not resp == other + assert resp != other + + assert resp != 0 + + def test_content_length_update(self): + resp = tresp() + resp.content = b"foo" + assert resp.data.content == b"foo" + assert resp.headers["content-length"] == "3" + resp.content = b"" + assert resp.data.content == b"" + assert resp.headers["content-length"] == "0" + + test_content_basic = _test_passthrough_attr(tresp(), "content") + test_headers = _test_passthrough_attr(tresp(), "headers") + test_timestamp_start = _test_passthrough_attr(tresp(), "timestamp_start") + test_timestamp_end = _test_passthrough_attr(tresp(), "timestamp_end") + + test_http_version = _test_decoded_attr(tresp(), "http_version") + + +class TestDecodedDecorator(object): + + def test_simple(self): + r = tresp() + assert r.content == b"message" + assert "content-encoding" not in r.headers + assert r.encode("gzip") + + assert r.headers["content-encoding"] + assert r.content != b"message" + with decoded(r): + assert "content-encoding" not in r.headers + assert r.content == b"message" + assert r.headers["content-encoding"] + assert r.content != b"message" + + def test_modify(self): + r = tresp() + assert "content-encoding" not in r.headers + assert r.encode("gzip") + + with decoded(r): + r.content = b"foo" + + assert r.content != b"foo" + r.decode() + assert r.content == b"foo" + + def test_unknown_ce(self): + r = tresp() + r.headers["content-encoding"] = "zopfli" + r.content = b"foo" + with decoded(r): + assert r.headers["content-encoding"] + assert r.content == b"foo" + assert r.headers["content-encoding"] + assert r.content == b"foo" + + def test_cannot_decode(self): + r = tresp() + assert r.encode("gzip") + r.content = b"foo" + with decoded(r): + assert r.headers["content-encoding"] + assert r.content == b"foo" + assert r.headers["content-encoding"] + assert r.content != b"foo" + r.decode() + assert r.content == b"foo" + + def test_cannot_encode(self): + r = tresp() + assert r.encode("gzip") + with decoded(r): + r.content = None + + assert "content-encoding" not in r.headers + assert r.content is None + diff --git a/test/http/test_models.py b/test/http/test_models.py index aa267944..76a05446 100644 --- a/test/http/test_models.py +++ b/test/http/test_models.py @@ -1,271 +1,7 @@ -import mock from netlib import tutils -from netlib import utils from netlib.odict import ODict, ODictCaseless -from netlib.http import Request, Response, Headers, CONTENT_MISSING - -class TestRequest(object): - def test_repr(self): - r = tutils.treq() - assert repr(r) - - def test_headers(self): - tutils.raises(AssertionError, Request, - 'form_in', - 'method', - 'scheme', - 'host', - 'port', - 'path', - b"HTTP/1.1", - 'foobar', - ) - - req = Request( - 'form_in', - 'method', - 'scheme', - 'host', - 'port', - 'path', - b"HTTP/1.1", - ) - assert isinstance(req.headers, Headers) - - def test_equal(self): - a = tutils.treq(timestamp_start=42, timestamp_end=43) - b = tutils.treq(timestamp_start=42, timestamp_end=43) - assert a == b - assert not a != b - - assert not a == 'foo' - assert not b == 'foo' - assert not 'foo' == a - assert not 'foo' == b - - - def test_anticache(self): - req = tutils.treq() - req.headers["If-Modified-Since"] = "foo" - req.headers["If-None-Match"] = "bar" - req.anticache() - assert "If-Modified-Since" not in req.headers - assert "If-None-Match" not in req.headers - - def test_anticomp(self): - req = tutils.treq() - req.headers["Accept-Encoding"] = "foobar" - req.anticomp() - assert req.headers["Accept-Encoding"] == "identity" - - def test_constrain_encoding(self): - req = tutils.treq() - req.headers["Accept-Encoding"] = "identity, gzip, foo" - req.constrain_encoding() - assert "foo" not in req.headers["Accept-Encoding"] - - def test_update_host(self): - req = tutils.treq() - req.headers["Host"] = "" - req.host = "foobar" - assert req.headers["Host"] == "foobar" - - def test_get_form_urlencoded(self): - req = tutils.treq(content="foobar") - assert req.get_form_urlencoded() == ODict() - - req.headers["Content-Type"] = "application/x-www-form-urlencoded" - assert req.get_form_urlencoded() == ODict(utils.urldecode(req.body)) - - def test_get_form_multipart(self): - req = tutils.treq(content="foobar") - assert req.get_form_multipart() == ODict() - - req.headers["Content-Type"] = "multipart/form-data" - assert req.get_form_multipart() == ODict( - utils.multipartdecode( - req.headers, - req.body - ) - ) - - def test_set_form_urlencoded(self): - req = tutils.treq() - req.set_form_urlencoded(ODict([('foo', 'bar'), ('rab', 'oof')])) - assert req.headers["Content-Type"] == "application/x-www-form-urlencoded" - assert req.body - - def test_get_path_components(self): - req = tutils.treq() - assert req.get_path_components() - # TODO: add meaningful assertions - - def test_set_path_components(self): - req = tutils.treq() - req.set_path_components([b"foo", b"bar"]) - # TODO: add meaningful assertions - - def test_get_query(self): - req = tutils.treq() - assert req.get_query().lst == [] - - req.url = "http://localhost:80/foo?bar=42" - assert req.get_query().lst == [("bar", "42")] - - def test_set_query(self): - req = tutils.treq() - req.set_query(ODict([])) - - def test_pretty_host(self): - r = tutils.treq() - assert r.pretty_host == "address" - assert r.host == "address" - r.headers["host"] = "other" - assert r.pretty_host == "other" - assert r.host == "address" - r.host = None - assert r.pretty_host is None - assert r.host is None - - # Invalid IDNA - r.headers["host"] = ".disqus.com" - assert r.pretty_host == ".disqus.com" - - def test_pretty_url(self): - req = tutils.treq(first_line_format="relative") - assert req.pretty_url == "http://address:22/path" - assert req.url == "http://address:22/path" - - def test_get_cookies_none(self): - headers = Headers() - r = tutils.treq() - r.headers = headers - assert len(r.get_cookies()) == 0 - - def test_get_cookies_single(self): - r = tutils.treq() - r.headers = Headers(cookie="cookiename=cookievalue") - result = r.get_cookies() - assert len(result) == 1 - assert result['cookiename'] == ['cookievalue'] - - def test_get_cookies_double(self): - r = tutils.treq() - r.headers = Headers(cookie="cookiename=cookievalue;othercookiename=othercookievalue") - result = r.get_cookies() - assert len(result) == 2 - assert result['cookiename'] == ['cookievalue'] - assert result['othercookiename'] == ['othercookievalue'] - - def test_get_cookies_withequalsign(self): - r = tutils.treq() - r.headers = Headers(cookie="cookiename=coo=kievalue;othercookiename=othercookievalue") - result = r.get_cookies() - assert len(result) == 2 - assert result['cookiename'] == ['coo=kievalue'] - assert result['othercookiename'] == ['othercookievalue'] - - def test_set_cookies(self): - r = tutils.treq() - r.headers = Headers(cookie="cookiename=cookievalue") - result = r.get_cookies() - result["cookiename"] = ["foo"] - r.set_cookies(result) - assert r.get_cookies()["cookiename"] == ["foo"] - - def test_set_url(self): - r = tutils.treq(first_line_format="absolute") - r.url = b"https://otheraddress:42/ORLY" - assert r.scheme == "https" - assert r.host == "otheraddress" - assert r.port == 42 - assert r.path == "/ORLY" - - try: - r.url = "//localhost:80/foo@bar" - assert False - except: - assert True - - # def test_asterisk_form_in(self): - # f = tutils.tflow(req=None) - # protocol = mock_protocol("OPTIONS * HTTP/1.1") - # f.request = HTTPRequest.from_protocol(protocol) - # - # assert f.request.first_line_format == "relative" - # f.request.host = f.server_conn.address.host - # f.request.port = f.server_conn.address.port - # f.request.scheme = "http" - # assert protocol.assemble(f.request) == ( - # "OPTIONS * HTTP/1.1\r\n" - # "Host: address:22\r\n" - # "Content-Length: 0\r\n\r\n") - # - # def test_relative_form_in(self): - # protocol = mock_protocol("GET /foo\xff HTTP/1.1") - # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol) - # - # protocol = mock_protocol("GET /foo HTTP/1.1\r\nConnection: Upgrade\r\nUpgrade: h2c") - # r = HTTPRequest.from_protocol(protocol) - # assert r.headers["Upgrade"] == ["h2c"] - # - # def test_expect_header(self): - # protocol = mock_protocol( - # "GET / HTTP/1.1\r\nContent-Length: 3\r\nExpect: 100-continue\r\n\r\nfoobar") - # r = HTTPRequest.from_protocol(protocol) - # assert protocol.tcp_handler.wfile.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n" - # assert r.content == "foo" - # assert protocol.tcp_handler.rfile.read(3) == "bar" - # - # def test_authority_form_in(self): - # protocol = mock_protocol("CONNECT oops-no-port.com HTTP/1.1") - # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol) - # - # protocol = mock_protocol("CONNECT address:22 HTTP/1.1") - # r = HTTPRequest.from_protocol(protocol) - # r.scheme, r.host, r.port = "http", "address", 22 - # assert protocol.assemble(r) == ( - # "CONNECT address:22 HTTP/1.1\r\n" - # "Host: address:22\r\n" - # "Content-Length: 0\r\n\r\n") - # assert r.pretty_url == "address:22" - # - # def test_absolute_form_in(self): - # protocol = mock_protocol("GET oops-no-protocol.com HTTP/1.1") - # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol) - # - # protocol = mock_protocol("GET http://address:22/ HTTP/1.1") - # r = HTTPRequest.from_protocol(protocol) - # assert protocol.assemble(r) == ( - # "GET http://address:22/ HTTP/1.1\r\n" - # "Host: address:22\r\n" - # "Content-Length: 0\r\n\r\n") - # - # def test_http_options_relative_form_in(self): - # """ - # Exercises fix for Issue #392. - # """ - # protocol = mock_protocol("OPTIONS /secret/resource HTTP/1.1") - # r = HTTPRequest.from_protocol(protocol) - # r.host = 'address' - # r.port = 80 - # r.scheme = "http" - # assert protocol.assemble(r) == ( - # "OPTIONS /secret/resource HTTP/1.1\r\n" - # "Host: address\r\n" - # "Content-Length: 0\r\n\r\n") - # - # def test_http_options_absolute_form_in(self): - # protocol = mock_protocol("OPTIONS http://address/secret/resource HTTP/1.1") - # r = HTTPRequest.from_protocol(protocol) - # r.host = 'address' - # r.port = 80 - # r.scheme = "http" - # assert protocol.assemble(r) == ( - # "OPTIONS http://address:80/secret/resource HTTP/1.1\r\n" - # "Host: address\r\n" - # "Content-Length: 0\r\n\r\n") +from netlib.http import Response, Headers, CONTENT_MISSING class TestResponse(object): def test_headers(self): diff --git a/test/http/test_request.py b/test/http/test_request.py index 02fac3df..15bdd3e3 100644 --- a/test/http/test_request.py +++ b/test/http/test_request.py @@ -1,3 +1,230 @@ +# -*- coding: utf-8 -*- from __future__ import absolute_import, print_function, division -# TODO
\ No newline at end of file +import six + +from netlib import utils +from netlib.http import Headers +from netlib.odict import ODict +from netlib.tutils import treq, raises +from .test_message import _test_decoded_attr, _test_passthrough_attr + + +class TestRequestData(object): + def test_init(self): + with raises(AssertionError): + treq(headers="foobar") + + assert isinstance(treq(headers=None).headers, Headers) + + def test_eq_ne(self): + request_data = treq().data + same = treq().data + assert request_data == same + assert not request_data != same + + other = treq(content=b"foo").data + assert not request_data == other + assert request_data != other + + assert request_data != 0 + + +class TestRequestCore(object): + def test_repr(self): + request = treq() + assert repr(request) == "Request(GET address:22/path)" + request.host = None + assert repr(request) == "Request(GET /path)" + + test_first_line_format = _test_passthrough_attr(treq(), "first_line_format") + test_method = _test_decoded_attr(treq(), "method") + test_scheme = _test_decoded_attr(treq(), "scheme") + test_port = _test_passthrough_attr(treq(), "port") + test_path = _test_decoded_attr(treq(), "path") + + def test_host(self): + if six.PY2: + from unittest import SkipTest + raise SkipTest() + + request = treq() + assert request.host == request.data.host.decode("idna") + + # Test IDNA encoding + # Set str, get raw bytes + request.host = "ídna.example" + assert request.data.host == b"xn--dna-qma.example" + # Set raw bytes, get decoded + request.data.host = b"xn--idn-gla.example" + assert request.host == "idná.example" + # Set bytes, get raw bytes + request.host = b"xn--dn-qia9b.example" + assert request.data.host == b"xn--dn-qia9b.example" + # IDNA encoding is not bijective + request.host = "fußball" + assert request.host == "fussball" + + # Don't fail on garbage + request.data.host = b"foo\xFF\x00bar" + assert request.host.startswith("foo") + assert request.host.endswith("bar") + # foo.bar = foo.bar should not cause any side effects. + d = request.host + request.host = d + assert request.data.host == b"foo\xFF\x00bar" + + def test_host_header_update(self): + request = treq() + assert "host" not in request.headers + request.host = "example.com" + assert "host" not in request.headers + + request.headers["Host"] = "foo" + request.host = "example.org" + assert request.headers["Host"] == "example.org" + + +class TestRequestUtils(object): + def test_url(self): + request = treq() + assert request.url == "http://address:22/path" + + request.url = "https://otheraddress:42/foo" + assert request.scheme == "https" + assert request.host == "otheraddress" + assert request.port == 42 + assert request.path == "/foo" + + with raises(ValueError): + request.url = "not-a-url" + + def test_pretty_host(self): + request = treq() + assert request.pretty_host == "address" + assert request.host == "address" + request.headers["host"] = "other" + assert request.pretty_host == "other" + assert request.host == "address" + request.host = None + assert request.pretty_host is None + assert request.host is None + + # Invalid IDNA + request.headers["host"] = ".disqus.com" + assert request.pretty_host == ".disqus.com" + + def test_pretty_url(self): + request = treq() + assert request.url == "http://address:22/path" + assert request.pretty_url == "http://address:22/path" + request.headers["host"] = "other" + assert request.pretty_url == "http://other:22/path" + + def test_pretty_url_authority(self): + request = treq(first_line_format="authority") + assert request.pretty_url == "address:22" + + def test_get_query(self): + request = treq() + assert request.query is None + + request.url = "http://localhost:80/foo?bar=42" + assert request.query.lst == [("bar", "42")] + + def test_set_query(self): + request = treq() + request.query = ODict([]) + + def test_get_cookies_none(self): + request = treq() + request.headers = Headers() + assert len(request.cookies) == 0 + + def test_get_cookies_single(self): + request = treq() + request.headers = Headers(cookie="cookiename=cookievalue") + result = request.cookies + assert len(result) == 1 + assert result['cookiename'] == ['cookievalue'] + + def test_get_cookies_double(self): + request = treq() + request.headers = Headers(cookie="cookiename=cookievalue;othercookiename=othercookievalue") + result = request.cookies + assert len(result) == 2 + assert result['cookiename'] == ['cookievalue'] + assert result['othercookiename'] == ['othercookievalue'] + + def test_get_cookies_withequalsign(self): + request = treq() + request.headers = Headers(cookie="cookiename=coo=kievalue;othercookiename=othercookievalue") + result = request.cookies + assert len(result) == 2 + assert result['cookiename'] == ['coo=kievalue'] + assert result['othercookiename'] == ['othercookievalue'] + + def test_set_cookies(self): + request = treq() + request.headers = Headers(cookie="cookiename=cookievalue") + result = request.cookies + result["cookiename"] = ["foo"] + request.cookies = result + assert request.cookies["cookiename"] == ["foo"] + + def test_get_path_components(self): + request = treq(path=b"/foo/bar") + assert request.path_components == ["foo", "bar"] + + def test_set_path_components(self): + request = treq() + request.path_components = ["foo", "baz"] + assert request.path == "/foo/baz" + request.path_components = [] + assert request.path == "/" + + def test_anticache(self): + request = treq() + request.headers["If-Modified-Since"] = "foo" + request.headers["If-None-Match"] = "bar" + request.anticache() + assert "If-Modified-Since" not in request.headers + assert "If-None-Match" not in request.headers + + def test_anticomp(self): + request = treq() + request.headers["Accept-Encoding"] = "foobar" + request.anticomp() + assert request.headers["Accept-Encoding"] == "identity" + + def test_constrain_encoding(self): + request = treq() + request.headers["Accept-Encoding"] = "identity, gzip, foo" + request.constrain_encoding() + assert "foo" not in request.headers["Accept-Encoding"] + assert "gzip" in request.headers["Accept-Encoding"] + + def test_get_urlencoded_form(self): + request = treq(content="foobar") + assert request.urlencoded_form is None + + request.headers["Content-Type"] = "application/x-www-form-urlencoded" + assert request.urlencoded_form == ODict(utils.urldecode(request.content)) + + def test_set_urlencoded_form(self): + request = treq() + request.urlencoded_form = ODict([('foo', 'bar'), ('rab', 'oof')]) + assert request.headers["Content-Type"] == "application/x-www-form-urlencoded" + assert request.content + + def test_get_multipart_form(self): + request = treq(content="foobar") + assert request.multipart_form is None + + request.headers["Content-Type"] = "multipart/form-data" + assert request.multipart_form == ODict( + utils.multipartdecode( + request.headers, + request.content + ) + ) diff --git a/test/http/test_status_codes.py b/test/http/test_status_codes.py new file mode 100644 index 00000000..9fea6b70 --- /dev/null +++ b/test/http/test_status_codes.py @@ -0,0 +1,6 @@ +from netlib.http import status_codes + + +def test_simple(): + assert status_codes.IM_A_TEAPOT == 418 + assert status_codes.RESPONSES[418] == "I'm a teapot" |