aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMaximilian Hils <git@maximilianhils.com>2015-09-26 20:07:11 +0200
committerMaximilian Hils <git@maximilianhils.com>2015-09-26 20:07:11 +0200
commit466888b01a361e46fb3d4e66afa2c6a0fd168c8e (patch)
treed7e6c6180b108318d76698883ddf17ae4cb704b0
parent49ea8fc0ebcfe4861f099200044a553f092faec7 (diff)
downloadmitmproxy-466888b01a361e46fb3d4e66afa2c6a0fd168c8e.tar.gz
mitmproxy-466888b01a361e46fb3d4e66afa2c6a0fd168c8e.tar.bz2
mitmproxy-466888b01a361e46fb3d4e66afa2c6a0fd168c8e.zip
improve request tests, coverage++
-rw-r--r--netlib/encoding.py4
-rw-r--r--netlib/http/headers.py8
-rw-r--r--netlib/http/message.py42
-rw-r--r--netlib/http/request.py28
-rw-r--r--netlib/http/response.py8
-rw-r--r--netlib/http/status_codes.py4
-rw-r--r--test/http/http1/test_read.py17
-rw-r--r--test/http/test_headers.py3
-rw-r--r--test/http/test_message.py136
-rw-r--r--test/http/test_models.py266
-rw-r--r--test/http/test_request.py229
-rw-r--r--test/http/test_status_codes.py6
12 files changed, 455 insertions, 296 deletions
diff --git a/netlib/encoding.py b/netlib/encoding.py
index 4c11273b..14479e00 100644
--- a/netlib/encoding.py
+++ b/netlib/encoding.py
@@ -12,6 +12,8 @@ ENCODINGS = {"identity", "gzip", "deflate"}
def decode(e, content):
+ if not isinstance(content, bytes):
+ return None
encoding_map = {
"identity": identity,
"gzip": decode_gzip,
@@ -23,6 +25,8 @@ def decode(e, content):
def encode(e, content):
+ if not isinstance(content, bytes):
+ return None
encoding_map = {
"identity": identity,
"gzip": encode_gzip,
diff --git a/netlib/http/headers.py b/netlib/http/headers.py
index c79c3344..f64e6200 100644
--- a/netlib/http/headers.py
+++ b/netlib/http/headers.py
@@ -8,15 +8,15 @@ from __future__ import absolute_import, print_function, division
import copy
try:
from collections.abc import MutableMapping
-except ImportError: # Workaround for Python < 3.3
- from collections import MutableMapping
+except ImportError: # pragma: nocover
+ from collections import MutableMapping # Workaround for Python < 3.3
import six
from netlib.utils import always_byte_args, always_bytes
-if six.PY2:
+if six.PY2: # pragma: nocover
_native = lambda x: x
_always_bytes = lambda x: x
_always_byte_args = lambda x: x
@@ -106,7 +106,7 @@ class Headers(MutableMapping):
else:
return b""
- if six.PY2:
+ if six.PY2: # pragma: nocover
__str__ = __bytes__
@_always_byte_args
diff --git a/netlib/http/message.py b/netlib/http/message.py
index ee138746..7cb18f52 100644
--- a/netlib/http/message.py
+++ b/netlib/http/message.py
@@ -9,7 +9,7 @@ from .. import encoding, utils
CONTENT_MISSING = 0
-if six.PY2:
+if six.PY2: # pragma: nocover
_native = lambda x: x
_always_bytes = lambda x: x
else:
@@ -110,15 +110,48 @@ class Message(object):
def text(self, text):
raise NotImplementedError()
+ def decode(self):
+ """
+ Decodes body based on the current Content-Encoding header, then
+ removes the header. If there is no Content-Encoding header, no
+ action is taken.
+
+ Returns:
+ True, if decoding succeeded.
+ False, otherwise.
+ """
+ ce = self.headers.get("content-encoding")
+ data = encoding.decode(ce, self.content)
+ if data is None:
+ return False
+ self.content = data
+ self.headers.pop("content-encoding", None)
+ return True
+
+ def encode(self, e):
+ """
+ Encodes body with the encoding e, where e is "gzip", "deflate" or "identity".
+
+ Returns:
+ True, if decoding succeeded.
+ False, otherwise.
+ """
+ data = encoding.encode(e, self.content)
+ if data is None:
+ return False
+ self.content = data
+ self.headers["content-encoding"] = e
+ return True
+
# Legacy
@property
- def body(self):
+ def body(self): # pragma: nocover
warnings.warn(".body is deprecated, use .content instead.", DeprecationWarning)
return self.content
@body.setter
- def body(self, body):
+ def body(self, body): # pragma: nocover
warnings.warn(".body is deprecated, use .content instead.", DeprecationWarning)
self.content = body
@@ -146,8 +179,7 @@ class decoded(object):
def __enter__(self):
if self.ce:
- if not self.message.decode():
- self.ce = None
+ self.message.decode()
def __exit__(self, type, value, tb):
if self.ce:
diff --git a/netlib/http/request.py b/netlib/http/request.py
index f8a3b5b9..325c0080 100644
--- a/netlib/http/request.py
+++ b/netlib/http/request.py
@@ -102,7 +102,7 @@ class Request(Message):
or inferred from the proxy mode (e.g. an IP in transparent mode).
"""
- if six.PY2:
+ if six.PY2: # pragma: nocover
return self.data.host
if not self.data.host:
@@ -303,58 +303,58 @@ class Request(Message):
# Legacy
- def get_cookies(self):
+ def get_cookies(self): # pragma: nocover
warnings.warn(".get_cookies is deprecated, use .cookies instead.", DeprecationWarning)
return self.cookies
- def set_cookies(self, odict):
+ def set_cookies(self, odict): # pragma: nocover
warnings.warn(".set_cookies is deprecated, use .cookies instead.", DeprecationWarning)
self.cookies = odict
- def get_query(self):
+ def get_query(self): # pragma: nocover
warnings.warn(".get_query is deprecated, use .query instead.", DeprecationWarning)
return self.query or ODict([])
- def set_query(self, odict):
+ def set_query(self, odict): # pragma: nocover
warnings.warn(".set_query is deprecated, use .query instead.", DeprecationWarning)
self.query = odict
- def get_path_components(self):
+ def get_path_components(self): # pragma: nocover
warnings.warn(".get_path_components is deprecated, use .path_components instead.", DeprecationWarning)
return self.path_components
- def set_path_components(self, lst):
+ def set_path_components(self, lst): # pragma: nocover
warnings.warn(".set_path_components is deprecated, use .path_components instead.", DeprecationWarning)
self.path_components = lst
- def get_form_urlencoded(self):
+ def get_form_urlencoded(self): # pragma: nocover
warnings.warn(".get_form_urlencoded is deprecated, use .urlencoded_form instead.", DeprecationWarning)
return self.urlencoded_form or ODict([])
- def set_form_urlencoded(self, odict):
+ def set_form_urlencoded(self, odict): # pragma: nocover
warnings.warn(".set_form_urlencoded is deprecated, use .urlencoded_form instead.", DeprecationWarning)
self.urlencoded_form = odict
- def get_form_multipart(self):
+ def get_form_multipart(self): # pragma: nocover
warnings.warn(".get_form_multipart is deprecated, use .multipart_form instead.", DeprecationWarning)
return self.multipart_form or ODict([])
@property
- def form_in(self):
+ def form_in(self): # pragma: nocover
warnings.warn(".form_in is deprecated, use .first_line_format instead.", DeprecationWarning)
return self.first_line_format
@form_in.setter
- def form_in(self, form_in):
+ def form_in(self, form_in): # pragma: nocover
warnings.warn(".form_in is deprecated, use .first_line_format instead.", DeprecationWarning)
self.first_line_format = form_in
@property
- def form_out(self):
+ def form_out(self): # pragma: nocover
warnings.warn(".form_out is deprecated, use .first_line_format instead.", DeprecationWarning)
return self.first_line_format
@form_out.setter
- def form_out(self, form_out):
+ def form_out(self, form_out): # pragma: nocover
warnings.warn(".form_out is deprecated, use .first_line_format instead.", DeprecationWarning)
self.first_line_format = form_out \ No newline at end of file
diff --git a/netlib/http/response.py b/netlib/http/response.py
index 7d64243d..db31d2b9 100644
--- a/netlib/http/response.py
+++ b/netlib/http/response.py
@@ -106,20 +106,20 @@ class Response(Message):
# Legacy
- def get_cookies(self):
+ def get_cookies(self): # pragma: nocover
warnings.warn(".get_cookies is deprecated, use .cookies instead.", DeprecationWarning)
return self.cookies
- def set_cookies(self, odict):
+ def set_cookies(self, odict): # pragma: nocover
warnings.warn(".set_cookies is deprecated, use .cookies instead.", DeprecationWarning)
self.cookies = odict
@property
- def msg(self):
+ def msg(self): # pragma: nocover
warnings.warn(".msg is deprecated, use .reason instead.", DeprecationWarning)
return self.reason
@msg.setter
- def msg(self, reason):
+ def msg(self, reason): # pragma: nocover
warnings.warn(".msg is deprecated, use .reason instead.", DeprecationWarning)
self.reason = reason
diff --git a/netlib/http/status_codes.py b/netlib/http/status_codes.py
index dc09f465..8a4dc1f5 100644
--- a/netlib/http/status_codes.py
+++ b/netlib/http/status_codes.py
@@ -1,4 +1,4 @@
-from __future__ import (absolute_import, print_function, division)
+from __future__ import absolute_import, print_function, division
CONTINUE = 100
SWITCHING = 101
@@ -37,6 +37,7 @@ REQUEST_URI_TOO_LONG = 414
UNSUPPORTED_MEDIA_TYPE = 415
REQUESTED_RANGE_NOT_SATISFIABLE = 416
EXPECTATION_FAILED = 417
+IM_A_TEAPOT = 418
INTERNAL_SERVER_ERROR = 500
NOT_IMPLEMENTED = 501
@@ -91,6 +92,7 @@ RESPONSES = {
UNSUPPORTED_MEDIA_TYPE: "Unsupported Media Type",
REQUESTED_RANGE_NOT_SATISFIABLE: "Requested Range not satisfiable",
EXPECTATION_FAILED: "Expectation Failed",
+ IM_A_TEAPOT: "I'm a teapot",
# 500
INTERNAL_SERVER_ERROR: "Internal Server Error",
diff --git a/test/http/http1/test_read.py b/test/http/http1/test_read.py
index fadfe446..a0085db9 100644
--- a/test/http/http1/test_read.py
+++ b/test/http/http1/test_read.py
@@ -2,7 +2,7 @@ from __future__ import absolute_import, print_function, division
from io import BytesIO
import textwrap
from mock import Mock
-from netlib.exceptions import HttpException, HttpSyntaxException, HttpReadDisconnect
+from netlib.exceptions import HttpException, HttpSyntaxException, HttpReadDisconnect, TcpDisconnect
from netlib.http import Headers
from netlib.http.http1.read import (
read_request, read_response, read_request_head,
@@ -100,6 +100,11 @@ class TestReadBody(object):
with raises(HttpException):
b"".join(read_body(rfile, -1, 3))
+ def test_max_chunk_size(self):
+ rfile = BytesIO(b"123456")
+ assert list(read_body(rfile, -1, max_chunk_size=None)) == [b"123456"]
+ rfile = BytesIO(b"123456")
+ assert list(read_body(rfile, -1, max_chunk_size=1)) == [b"1", b"2", b"3", b"4", b"5", b"6"]
def test_connection_close():
headers = Headers()
@@ -169,6 +174,11 @@ def test_get_first_line():
rfile = BytesIO(b"")
_get_first_line(rfile)
+ with raises(HttpReadDisconnect):
+ rfile = Mock()
+ rfile.readline.side_effect = TcpDisconnect
+ _get_first_line(rfile)
+
with raises(HttpSyntaxException):
rfile = BytesIO(b"GET /\xff HTTP/1.1")
_get_first_line(rfile)
@@ -191,7 +201,8 @@ def test_read_request_line():
t(b"GET / WTF/1.1")
with raises(HttpSyntaxException):
t(b"this is not http")
-
+ with raises(HttpReadDisconnect):
+ t(b"")
def test_parse_authority_form():
assert _parse_authority_form(b"foo:42") == (b"foo", 42)
@@ -218,6 +229,8 @@ def test_read_response_line():
t(b"HTTP/1.1 OK OK")
with raises(HttpSyntaxException):
t(b"WTF/1.1 200 OK")
+ with raises(HttpReadDisconnect):
+ t(b"")
def test_check_http_version():
diff --git a/test/http/test_headers.py b/test/http/test_headers.py
index f1af1feb..8bddc0b2 100644
--- a/test/http/test_headers.py
+++ b/test/http/test_headers.py
@@ -38,6 +38,9 @@ class TestHeaders(object):
assert headers["Host"] == "example.com"
assert headers["Accept"] == "text/plain"
+ with raises(ValueError):
+ Headers([[b"Host", u"not-bytes"]])
+
def test_getitem(self):
headers = Headers(Host="example.com")
assert headers["Host"] == "example.com"
diff --git a/test/http/test_message.py b/test/http/test_message.py
new file mode 100644
index 00000000..b0b7e27f
--- /dev/null
+++ b/test/http/test_message.py
@@ -0,0 +1,136 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import, print_function, division
+
+from netlib.http import decoded
+from netlib.tutils import tresp
+
+
+def _test_passthrough_attr(message, attr):
+ def t(self=None):
+ assert getattr(message, attr) == getattr(message.data, attr)
+ setattr(message, attr, "foo")
+ assert getattr(message.data, attr) == "foo"
+ return t
+
+
+def _test_decoded_attr(message, attr):
+ def t(self=None):
+ assert getattr(message, attr) == getattr(message.data, attr).decode("utf8")
+ # Set str, get raw bytes
+ setattr(message, attr, "foo")
+ assert getattr(message.data, attr) == b"foo"
+ # Set raw bytes, get decoded
+ setattr(message.data, attr, b"bar")
+ assert getattr(message, attr) == "bar"
+ # Set bytes, get raw bytes
+ setattr(message, attr, b"baz")
+ assert getattr(message.data, attr) == b"baz"
+
+ # Set UTF8
+ setattr(message, attr, "Non-Autorisé")
+ assert getattr(message.data, attr) == b"Non-Autoris\xc3\xa9"
+ # Don't fail on garbage
+ setattr(message.data, attr, b"foo\xFF\x00bar")
+ assert getattr(message, attr).startswith("foo")
+ assert getattr(message, attr).endswith("bar")
+ # foo.bar = foo.bar should not cause any side effects.
+ d = getattr(message, attr)
+ setattr(message, attr, d)
+ assert getattr(message.data, attr) == b"foo\xFF\x00bar"
+ return t
+
+
+class TestMessage(object):
+
+ def test_init(self):
+ resp = tresp()
+ assert resp.data
+
+ def test_eq_ne(self):
+ resp = tresp(timestamp_start=42, timestamp_end=42)
+ same = tresp(timestamp_start=42, timestamp_end=42)
+ assert resp == same
+ assert not resp != same
+
+ other = tresp(timestamp_start=0, timestamp_end=0)
+ assert not resp == other
+ assert resp != other
+
+ assert resp != 0
+
+ def test_content_length_update(self):
+ resp = tresp()
+ resp.content = b"foo"
+ assert resp.data.content == b"foo"
+ assert resp.headers["content-length"] == "3"
+ resp.content = b""
+ assert resp.data.content == b""
+ assert resp.headers["content-length"] == "0"
+
+ test_content_basic = _test_passthrough_attr(tresp(), "content")
+ test_headers = _test_passthrough_attr(tresp(), "headers")
+ test_timestamp_start = _test_passthrough_attr(tresp(), "timestamp_start")
+ test_timestamp_end = _test_passthrough_attr(tresp(), "timestamp_end")
+
+ test_http_version = _test_decoded_attr(tresp(), "http_version")
+
+
+class TestDecodedDecorator(object):
+
+ def test_simple(self):
+ r = tresp()
+ assert r.content == b"message"
+ assert "content-encoding" not in r.headers
+ assert r.encode("gzip")
+
+ assert r.headers["content-encoding"]
+ assert r.content != b"message"
+ with decoded(r):
+ assert "content-encoding" not in r.headers
+ assert r.content == b"message"
+ assert r.headers["content-encoding"]
+ assert r.content != b"message"
+
+ def test_modify(self):
+ r = tresp()
+ assert "content-encoding" not in r.headers
+ assert r.encode("gzip")
+
+ with decoded(r):
+ r.content = b"foo"
+
+ assert r.content != b"foo"
+ r.decode()
+ assert r.content == b"foo"
+
+ def test_unknown_ce(self):
+ r = tresp()
+ r.headers["content-encoding"] = "zopfli"
+ r.content = b"foo"
+ with decoded(r):
+ assert r.headers["content-encoding"]
+ assert r.content == b"foo"
+ assert r.headers["content-encoding"]
+ assert r.content == b"foo"
+
+ def test_cannot_decode(self):
+ r = tresp()
+ assert r.encode("gzip")
+ r.content = b"foo"
+ with decoded(r):
+ assert r.headers["content-encoding"]
+ assert r.content == b"foo"
+ assert r.headers["content-encoding"]
+ assert r.content != b"foo"
+ r.decode()
+ assert r.content == b"foo"
+
+ def test_cannot_encode(self):
+ r = tresp()
+ assert r.encode("gzip")
+ with decoded(r):
+ r.content = None
+
+ assert "content-encoding" not in r.headers
+ assert r.content is None
+
diff --git a/test/http/test_models.py b/test/http/test_models.py
index aa267944..76a05446 100644
--- a/test/http/test_models.py
+++ b/test/http/test_models.py
@@ -1,271 +1,7 @@
-import mock
from netlib import tutils
-from netlib import utils
from netlib.odict import ODict, ODictCaseless
-from netlib.http import Request, Response, Headers, CONTENT_MISSING
-
-class TestRequest(object):
- def test_repr(self):
- r = tutils.treq()
- assert repr(r)
-
- def test_headers(self):
- tutils.raises(AssertionError, Request,
- 'form_in',
- 'method',
- 'scheme',
- 'host',
- 'port',
- 'path',
- b"HTTP/1.1",
- 'foobar',
- )
-
- req = Request(
- 'form_in',
- 'method',
- 'scheme',
- 'host',
- 'port',
- 'path',
- b"HTTP/1.1",
- )
- assert isinstance(req.headers, Headers)
-
- def test_equal(self):
- a = tutils.treq(timestamp_start=42, timestamp_end=43)
- b = tutils.treq(timestamp_start=42, timestamp_end=43)
- assert a == b
- assert not a != b
-
- assert not a == 'foo'
- assert not b == 'foo'
- assert not 'foo' == a
- assert not 'foo' == b
-
-
- def test_anticache(self):
- req = tutils.treq()
- req.headers["If-Modified-Since"] = "foo"
- req.headers["If-None-Match"] = "bar"
- req.anticache()
- assert "If-Modified-Since" not in req.headers
- assert "If-None-Match" not in req.headers
-
- def test_anticomp(self):
- req = tutils.treq()
- req.headers["Accept-Encoding"] = "foobar"
- req.anticomp()
- assert req.headers["Accept-Encoding"] == "identity"
-
- def test_constrain_encoding(self):
- req = tutils.treq()
- req.headers["Accept-Encoding"] = "identity, gzip, foo"
- req.constrain_encoding()
- assert "foo" not in req.headers["Accept-Encoding"]
-
- def test_update_host(self):
- req = tutils.treq()
- req.headers["Host"] = ""
- req.host = "foobar"
- assert req.headers["Host"] == "foobar"
-
- def test_get_form_urlencoded(self):
- req = tutils.treq(content="foobar")
- assert req.get_form_urlencoded() == ODict()
-
- req.headers["Content-Type"] = "application/x-www-form-urlencoded"
- assert req.get_form_urlencoded() == ODict(utils.urldecode(req.body))
-
- def test_get_form_multipart(self):
- req = tutils.treq(content="foobar")
- assert req.get_form_multipart() == ODict()
-
- req.headers["Content-Type"] = "multipart/form-data"
- assert req.get_form_multipart() == ODict(
- utils.multipartdecode(
- req.headers,
- req.body
- )
- )
-
- def test_set_form_urlencoded(self):
- req = tutils.treq()
- req.set_form_urlencoded(ODict([('foo', 'bar'), ('rab', 'oof')]))
- assert req.headers["Content-Type"] == "application/x-www-form-urlencoded"
- assert req.body
-
- def test_get_path_components(self):
- req = tutils.treq()
- assert req.get_path_components()
- # TODO: add meaningful assertions
-
- def test_set_path_components(self):
- req = tutils.treq()
- req.set_path_components([b"foo", b"bar"])
- # TODO: add meaningful assertions
-
- def test_get_query(self):
- req = tutils.treq()
- assert req.get_query().lst == []
-
- req.url = "http://localhost:80/foo?bar=42"
- assert req.get_query().lst == [("bar", "42")]
-
- def test_set_query(self):
- req = tutils.treq()
- req.set_query(ODict([]))
-
- def test_pretty_host(self):
- r = tutils.treq()
- assert r.pretty_host == "address"
- assert r.host == "address"
- r.headers["host"] = "other"
- assert r.pretty_host == "other"
- assert r.host == "address"
- r.host = None
- assert r.pretty_host is None
- assert r.host is None
-
- # Invalid IDNA
- r.headers["host"] = ".disqus.com"
- assert r.pretty_host == ".disqus.com"
-
- def test_pretty_url(self):
- req = tutils.treq(first_line_format="relative")
- assert req.pretty_url == "http://address:22/path"
- assert req.url == "http://address:22/path"
-
- def test_get_cookies_none(self):
- headers = Headers()
- r = tutils.treq()
- r.headers = headers
- assert len(r.get_cookies()) == 0
-
- def test_get_cookies_single(self):
- r = tutils.treq()
- r.headers = Headers(cookie="cookiename=cookievalue")
- result = r.get_cookies()
- assert len(result) == 1
- assert result['cookiename'] == ['cookievalue']
-
- def test_get_cookies_double(self):
- r = tutils.treq()
- r.headers = Headers(cookie="cookiename=cookievalue;othercookiename=othercookievalue")
- result = r.get_cookies()
- assert len(result) == 2
- assert result['cookiename'] == ['cookievalue']
- assert result['othercookiename'] == ['othercookievalue']
-
- def test_get_cookies_withequalsign(self):
- r = tutils.treq()
- r.headers = Headers(cookie="cookiename=coo=kievalue;othercookiename=othercookievalue")
- result = r.get_cookies()
- assert len(result) == 2
- assert result['cookiename'] == ['coo=kievalue']
- assert result['othercookiename'] == ['othercookievalue']
-
- def test_set_cookies(self):
- r = tutils.treq()
- r.headers = Headers(cookie="cookiename=cookievalue")
- result = r.get_cookies()
- result["cookiename"] = ["foo"]
- r.set_cookies(result)
- assert r.get_cookies()["cookiename"] == ["foo"]
-
- def test_set_url(self):
- r = tutils.treq(first_line_format="absolute")
- r.url = b"https://otheraddress:42/ORLY"
- assert r.scheme == "https"
- assert r.host == "otheraddress"
- assert r.port == 42
- assert r.path == "/ORLY"
-
- try:
- r.url = "//localhost:80/foo@bar"
- assert False
- except:
- assert True
-
- # def test_asterisk_form_in(self):
- # f = tutils.tflow(req=None)
- # protocol = mock_protocol("OPTIONS * HTTP/1.1")
- # f.request = HTTPRequest.from_protocol(protocol)
- #
- # assert f.request.first_line_format == "relative"
- # f.request.host = f.server_conn.address.host
- # f.request.port = f.server_conn.address.port
- # f.request.scheme = "http"
- # assert protocol.assemble(f.request) == (
- # "OPTIONS * HTTP/1.1\r\n"
- # "Host: address:22\r\n"
- # "Content-Length: 0\r\n\r\n")
- #
- # def test_relative_form_in(self):
- # protocol = mock_protocol("GET /foo\xff HTTP/1.1")
- # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
- #
- # protocol = mock_protocol("GET /foo HTTP/1.1\r\nConnection: Upgrade\r\nUpgrade: h2c")
- # r = HTTPRequest.from_protocol(protocol)
- # assert r.headers["Upgrade"] == ["h2c"]
- #
- # def test_expect_header(self):
- # protocol = mock_protocol(
- # "GET / HTTP/1.1\r\nContent-Length: 3\r\nExpect: 100-continue\r\n\r\nfoobar")
- # r = HTTPRequest.from_protocol(protocol)
- # assert protocol.tcp_handler.wfile.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n"
- # assert r.content == "foo"
- # assert protocol.tcp_handler.rfile.read(3) == "bar"
- #
- # def test_authority_form_in(self):
- # protocol = mock_protocol("CONNECT oops-no-port.com HTTP/1.1")
- # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
- #
- # protocol = mock_protocol("CONNECT address:22 HTTP/1.1")
- # r = HTTPRequest.from_protocol(protocol)
- # r.scheme, r.host, r.port = "http", "address", 22
- # assert protocol.assemble(r) == (
- # "CONNECT address:22 HTTP/1.1\r\n"
- # "Host: address:22\r\n"
- # "Content-Length: 0\r\n\r\n")
- # assert r.pretty_url == "address:22"
- #
- # def test_absolute_form_in(self):
- # protocol = mock_protocol("GET oops-no-protocol.com HTTP/1.1")
- # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol)
- #
- # protocol = mock_protocol("GET http://address:22/ HTTP/1.1")
- # r = HTTPRequest.from_protocol(protocol)
- # assert protocol.assemble(r) == (
- # "GET http://address:22/ HTTP/1.1\r\n"
- # "Host: address:22\r\n"
- # "Content-Length: 0\r\n\r\n")
- #
- # def test_http_options_relative_form_in(self):
- # """
- # Exercises fix for Issue #392.
- # """
- # protocol = mock_protocol("OPTIONS /secret/resource HTTP/1.1")
- # r = HTTPRequest.from_protocol(protocol)
- # r.host = 'address'
- # r.port = 80
- # r.scheme = "http"
- # assert protocol.assemble(r) == (
- # "OPTIONS /secret/resource HTTP/1.1\r\n"
- # "Host: address\r\n"
- # "Content-Length: 0\r\n\r\n")
- #
- # def test_http_options_absolute_form_in(self):
- # protocol = mock_protocol("OPTIONS http://address/secret/resource HTTP/1.1")
- # r = HTTPRequest.from_protocol(protocol)
- # r.host = 'address'
- # r.port = 80
- # r.scheme = "http"
- # assert protocol.assemble(r) == (
- # "OPTIONS http://address:80/secret/resource HTTP/1.1\r\n"
- # "Host: address\r\n"
- # "Content-Length: 0\r\n\r\n")
+from netlib.http import Response, Headers, CONTENT_MISSING
class TestResponse(object):
def test_headers(self):
diff --git a/test/http/test_request.py b/test/http/test_request.py
index 02fac3df..15bdd3e3 100644
--- a/test/http/test_request.py
+++ b/test/http/test_request.py
@@ -1,3 +1,230 @@
+# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, division
-# TODO \ No newline at end of file
+import six
+
+from netlib import utils
+from netlib.http import Headers
+from netlib.odict import ODict
+from netlib.tutils import treq, raises
+from .test_message import _test_decoded_attr, _test_passthrough_attr
+
+
+class TestRequestData(object):
+ def test_init(self):
+ with raises(AssertionError):
+ treq(headers="foobar")
+
+ assert isinstance(treq(headers=None).headers, Headers)
+
+ def test_eq_ne(self):
+ request_data = treq().data
+ same = treq().data
+ assert request_data == same
+ assert not request_data != same
+
+ other = treq(content=b"foo").data
+ assert not request_data == other
+ assert request_data != other
+
+ assert request_data != 0
+
+
+class TestRequestCore(object):
+ def test_repr(self):
+ request = treq()
+ assert repr(request) == "Request(GET address:22/path)"
+ request.host = None
+ assert repr(request) == "Request(GET /path)"
+
+ test_first_line_format = _test_passthrough_attr(treq(), "first_line_format")
+ test_method = _test_decoded_attr(treq(), "method")
+ test_scheme = _test_decoded_attr(treq(), "scheme")
+ test_port = _test_passthrough_attr(treq(), "port")
+ test_path = _test_decoded_attr(treq(), "path")
+
+ def test_host(self):
+ if six.PY2:
+ from unittest import SkipTest
+ raise SkipTest()
+
+ request = treq()
+ assert request.host == request.data.host.decode("idna")
+
+ # Test IDNA encoding
+ # Set str, get raw bytes
+ request.host = "ídna.example"
+ assert request.data.host == b"xn--dna-qma.example"
+ # Set raw bytes, get decoded
+ request.data.host = b"xn--idn-gla.example"
+ assert request.host == "idná.example"
+ # Set bytes, get raw bytes
+ request.host = b"xn--dn-qia9b.example"
+ assert request.data.host == b"xn--dn-qia9b.example"
+ # IDNA encoding is not bijective
+ request.host = "fußball"
+ assert request.host == "fussball"
+
+ # Don't fail on garbage
+ request.data.host = b"foo\xFF\x00bar"
+ assert request.host.startswith("foo")
+ assert request.host.endswith("bar")
+ # foo.bar = foo.bar should not cause any side effects.
+ d = request.host
+ request.host = d
+ assert request.data.host == b"foo\xFF\x00bar"
+
+ def test_host_header_update(self):
+ request = treq()
+ assert "host" not in request.headers
+ request.host = "example.com"
+ assert "host" not in request.headers
+
+ request.headers["Host"] = "foo"
+ request.host = "example.org"
+ assert request.headers["Host"] == "example.org"
+
+
+class TestRequestUtils(object):
+ def test_url(self):
+ request = treq()
+ assert request.url == "http://address:22/path"
+
+ request.url = "https://otheraddress:42/foo"
+ assert request.scheme == "https"
+ assert request.host == "otheraddress"
+ assert request.port == 42
+ assert request.path == "/foo"
+
+ with raises(ValueError):
+ request.url = "not-a-url"
+
+ def test_pretty_host(self):
+ request = treq()
+ assert request.pretty_host == "address"
+ assert request.host == "address"
+ request.headers["host"] = "other"
+ assert request.pretty_host == "other"
+ assert request.host == "address"
+ request.host = None
+ assert request.pretty_host is None
+ assert request.host is None
+
+ # Invalid IDNA
+ request.headers["host"] = ".disqus.com"
+ assert request.pretty_host == ".disqus.com"
+
+ def test_pretty_url(self):
+ request = treq()
+ assert request.url == "http://address:22/path"
+ assert request.pretty_url == "http://address:22/path"
+ request.headers["host"] = "other"
+ assert request.pretty_url == "http://other:22/path"
+
+ def test_pretty_url_authority(self):
+ request = treq(first_line_format="authority")
+ assert request.pretty_url == "address:22"
+
+ def test_get_query(self):
+ request = treq()
+ assert request.query is None
+
+ request.url = "http://localhost:80/foo?bar=42"
+ assert request.query.lst == [("bar", "42")]
+
+ def test_set_query(self):
+ request = treq()
+ request.query = ODict([])
+
+ def test_get_cookies_none(self):
+ request = treq()
+ request.headers = Headers()
+ assert len(request.cookies) == 0
+
+ def test_get_cookies_single(self):
+ request = treq()
+ request.headers = Headers(cookie="cookiename=cookievalue")
+ result = request.cookies
+ assert len(result) == 1
+ assert result['cookiename'] == ['cookievalue']
+
+ def test_get_cookies_double(self):
+ request = treq()
+ request.headers = Headers(cookie="cookiename=cookievalue;othercookiename=othercookievalue")
+ result = request.cookies
+ assert len(result) == 2
+ assert result['cookiename'] == ['cookievalue']
+ assert result['othercookiename'] == ['othercookievalue']
+
+ def test_get_cookies_withequalsign(self):
+ request = treq()
+ request.headers = Headers(cookie="cookiename=coo=kievalue;othercookiename=othercookievalue")
+ result = request.cookies
+ assert len(result) == 2
+ assert result['cookiename'] == ['coo=kievalue']
+ assert result['othercookiename'] == ['othercookievalue']
+
+ def test_set_cookies(self):
+ request = treq()
+ request.headers = Headers(cookie="cookiename=cookievalue")
+ result = request.cookies
+ result["cookiename"] = ["foo"]
+ request.cookies = result
+ assert request.cookies["cookiename"] == ["foo"]
+
+ def test_get_path_components(self):
+ request = treq(path=b"/foo/bar")
+ assert request.path_components == ["foo", "bar"]
+
+ def test_set_path_components(self):
+ request = treq()
+ request.path_components = ["foo", "baz"]
+ assert request.path == "/foo/baz"
+ request.path_components = []
+ assert request.path == "/"
+
+ def test_anticache(self):
+ request = treq()
+ request.headers["If-Modified-Since"] = "foo"
+ request.headers["If-None-Match"] = "bar"
+ request.anticache()
+ assert "If-Modified-Since" not in request.headers
+ assert "If-None-Match" not in request.headers
+
+ def test_anticomp(self):
+ request = treq()
+ request.headers["Accept-Encoding"] = "foobar"
+ request.anticomp()
+ assert request.headers["Accept-Encoding"] == "identity"
+
+ def test_constrain_encoding(self):
+ request = treq()
+ request.headers["Accept-Encoding"] = "identity, gzip, foo"
+ request.constrain_encoding()
+ assert "foo" not in request.headers["Accept-Encoding"]
+ assert "gzip" in request.headers["Accept-Encoding"]
+
+ def test_get_urlencoded_form(self):
+ request = treq(content="foobar")
+ assert request.urlencoded_form is None
+
+ request.headers["Content-Type"] = "application/x-www-form-urlencoded"
+ assert request.urlencoded_form == ODict(utils.urldecode(request.content))
+
+ def test_set_urlencoded_form(self):
+ request = treq()
+ request.urlencoded_form = ODict([('foo', 'bar'), ('rab', 'oof')])
+ assert request.headers["Content-Type"] == "application/x-www-form-urlencoded"
+ assert request.content
+
+ def test_get_multipart_form(self):
+ request = treq(content="foobar")
+ assert request.multipart_form is None
+
+ request.headers["Content-Type"] = "multipart/form-data"
+ assert request.multipart_form == ODict(
+ utils.multipartdecode(
+ request.headers,
+ request.content
+ )
+ )
diff --git a/test/http/test_status_codes.py b/test/http/test_status_codes.py
new file mode 100644
index 00000000..9fea6b70
--- /dev/null
+++ b/test/http/test_status_codes.py
@@ -0,0 +1,6 @@
+from netlib.http import status_codes
+
+
+def test_simple():
+ assert status_codes.IM_A_TEAPOT == 418
+ assert status_codes.RESPONSES[418] == "I'm a teapot"