diff options
28 files changed, 1320 insertions, 853 deletions
diff --git a/netlib/encoding.py b/netlib/encoding.py index 4c11273b..14479e00 100644 --- a/netlib/encoding.py +++ b/netlib/encoding.py @@ -12,6 +12,8 @@ ENCODINGS = {"identity", "gzip", "deflate"} def decode(e, content): + if not isinstance(content, bytes): + return None encoding_map = { "identity": identity, "gzip": decode_gzip, @@ -23,6 +25,8 @@ def decode(e, content): def encode(e, content): + if not isinstance(content, bytes): + return None encoding_map = { "identity": identity, "gzip": encode_gzip, diff --git a/netlib/http/__init__.py b/netlib/http/__init__.py index 0ccf6b32..fd632cd5 100644 --- a/netlib/http/__init__.py +++ b/netlib/http/__init__.py @@ -1,14 +1,14 @@ from __future__ import absolute_import, print_function, division +from .request import Request +from .response import Response from .headers import Headers -from .models import Request, Response -from .models import ALPN_PROTO_HTTP1, ALPN_PROTO_H2 -from .models import HDR_FORM_MULTIPART, HDR_FORM_URLENCODED, CONTENT_MISSING +from .message import decoded, CONTENT_MISSING from . import http1, http2 __all__ = [ + "Request", + "Response", "Headers", - "Request", "Response", - "ALPN_PROTO_HTTP1", "ALPN_PROTO_H2", - "HDR_FORM_MULTIPART", "HDR_FORM_URLENCODED", "CONTENT_MISSING", + "decoded", "CONTENT_MISSING", "http1", "http2", ] diff --git a/netlib/http/cookies.py b/netlib/http/cookies.py index 78b03a83..18544b5e 100644 --- a/netlib/http/cookies.py +++ b/netlib/http/cookies.py @@ -58,6 +58,7 @@ def _read_quoted_string(s, start): escaping = False ret = [] # Skip the first quote + i = start # initialize in case the loop doesn't run. for i in range(start + 1, len(s)): if escaping: ret.append(s[i]) diff --git a/netlib/http/headers.py b/netlib/http/headers.py index 613beb4f..f64e6200 100644 --- a/netlib/http/headers.py +++ b/netlib/http/headers.py @@ -8,15 +8,15 @@ from __future__ import absolute_import, print_function, division import copy try: from collections.abc import MutableMapping -except ImportError: # Workaround for Python < 3.3 - from collections import MutableMapping +except ImportError: # pragma: nocover + from collections import MutableMapping # Workaround for Python < 3.3 import six from netlib.utils import always_byte_args, always_bytes -if six.PY2: +if six.PY2: # pragma: nocover _native = lambda x: x _always_bytes = lambda x: x _always_byte_args = lambda x: x @@ -27,7 +27,7 @@ else: _always_byte_args = always_byte_args("utf-8", "surrogateescape") -class Headers(MutableMapping, object): +class Headers(MutableMapping): """ Header class which allows both convenient access to individual headers as well as direct access to the underlying raw data. Provides a full dictionary interface. @@ -36,12 +36,8 @@ class Headers(MutableMapping, object): .. code-block:: python - # Create header from a list of (header_name, header_value) tuples - >>> h = Headers([ - ["Host","example.com"], - ["Accept","text/html"], - ["accept","application/xml"] - ]) + # Create headers with keyword arguments + >>> h = Headers(host="example.com", content_type="application/xml") # Headers mostly behave like a normal dict. >>> h["Host"] @@ -51,6 +47,13 @@ class Headers(MutableMapping, object): >>> h["host"] "example.com" + # Headers can also be creatd from a list of raw (header_name, header_value) byte tuples + >>> h = Headers([ + [b"Host",b"example.com"], + [b"Accept",b"text/html"], + [b"accept",b"application/xml"] + ]) + # Multiple headers are folded into a single header as per RFC7230 >>> h["Accept"] "text/html, application/xml" @@ -60,17 +63,14 @@ class Headers(MutableMapping, object): >>> h["Accept"] "application/text" - # str(h) returns a HTTP1 header block. - >>> print(h) + # bytes(h) returns a HTTP1 header block. + >>> print(bytes(h)) Host: example.com Accept: application/text # For full control, the raw header fields can be accessed >>> h.fields - # Headers can also be crated from keyword arguments - >>> h = Headers(host="example.com", content_type="application/xml") - Caveats: For use with the "Set-Cookie" header, see :py:meth:`get_all`. """ @@ -79,8 +79,8 @@ class Headers(MutableMapping, object): def __init__(self, fields=None, **headers): """ Args: - fields: (optional) list of ``(name, value)`` header tuples, - e.g. ``[("Host","example.com")]``. All names and values must be bytes. + fields: (optional) list of ``(name, value)`` header byte tuples, + e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes. **headers: Additional headers to set. Will overwrite existing values from `fields`. For convenience, underscores in header names will be transformed to dashes - this behaviour does not extend to other methods. @@ -106,7 +106,7 @@ class Headers(MutableMapping, object): else: return b"" - if six.PY2: + if six.PY2: # pragma: nocover __str__ = __bytes__ @_always_byte_args diff --git a/netlib/http/http1/assemble.py b/netlib/http/http1/assemble.py index 88aeac05..785ee8d3 100644 --- a/netlib/http/http1/assemble.py +++ b/netlib/http/http1/assemble.py @@ -7,30 +7,30 @@ from .. import CONTENT_MISSING def assemble_request(request): - if request.body == CONTENT_MISSING: + if request.content == CONTENT_MISSING: raise HttpException("Cannot assemble flow with CONTENT_MISSING") head = assemble_request_head(request) - body = b"".join(assemble_body(request.headers, [request.body])) + body = b"".join(assemble_body(request.data.headers, [request.data.content])) return head + body def assemble_request_head(request): - first_line = _assemble_request_line(request) - headers = _assemble_request_headers(request) + first_line = _assemble_request_line(request.data) + headers = _assemble_request_headers(request.data) return b"%s\r\n%s\r\n" % (first_line, headers) def assemble_response(response): - if response.body == CONTENT_MISSING: + if response.content == CONTENT_MISSING: raise HttpException("Cannot assemble flow with CONTENT_MISSING") head = assemble_response_head(response) - body = b"".join(assemble_body(response.headers, [response.body])) + body = b"".join(assemble_body(response.data.headers, [response.data.content])) return head + body def assemble_response_head(response): - first_line = _assemble_response_line(response) - headers = _assemble_response_headers(response) + first_line = _assemble_response_line(response.data) + headers = _assemble_response_headers(response.data) return b"%s\r\n%s\r\n" % (first_line, headers) @@ -45,51 +45,58 @@ def assemble_body(headers, body_chunks): yield chunk -def _assemble_request_line(request, form=None): - if form is None: - form = request.form_out +def _assemble_request_line(request_data): + """ + Args: + request_data (netlib.http.request.RequestData) + """ + form = request_data.first_line_format if form == "relative": return b"%s %s %s" % ( - request.method, - request.path, - request.http_version + request_data.method, + request_data.path, + request_data.http_version ) elif form == "authority": return b"%s %s:%d %s" % ( - request.method, - request.host, - request.port, - request.http_version + request_data.method, + request_data.host, + request_data.port, + request_data.http_version ) elif form == "absolute": return b"%s %s://%s:%d%s %s" % ( - request.method, - request.scheme, - request.host, - request.port, - request.path, - request.http_version + request_data.method, + request_data.scheme, + request_data.host, + request_data.port, + request_data.path, + request_data.http_version ) - else: # pragma: nocover + else: raise RuntimeError("Invalid request form") -def _assemble_request_headers(request): - headers = request.headers.copy() - if "host" not in headers and request.scheme and request.host and request.port: +def _assemble_request_headers(request_data): + """ + Args: + request_data (netlib.http.request.RequestData) + """ + headers = request_data.headers.copy() + if "host" not in headers and request_data.scheme and request_data.host and request_data.port: headers["host"] = utils.hostport( - request.scheme, - request.host, - request.port + request_data.scheme, + request_data.host, + request_data.port ) return bytes(headers) -def _assemble_response_line(response): +def _assemble_response_line(response_data): return b"%s %d %s" % ( - response.http_version, - response.status_code, - response.msg, + response_data.http_version, + response_data.status_code, + response_data.reason, ) diff --git a/netlib/http/http1/read.py b/netlib/http/http1/read.py index 73c7deed..0f6de26c 100644 --- a/netlib/http/http1/read.py +++ b/netlib/http/http1/read.py @@ -11,7 +11,7 @@ from .. import Request, Response, Headers def read_request(rfile, body_size_limit=None): request = read_request_head(rfile) expected_body_size = expected_http_body_size(request) - request._body = b"".join(read_body(rfile, expected_body_size, limit=body_size_limit)) + request.data.content = b"".join(read_body(rfile, expected_body_size, limit=body_size_limit)) request.timestamp_end = time.time() return request @@ -50,7 +50,7 @@ def read_request_head(rfile): def read_response(rfile, request, body_size_limit=None): response = read_response_head(rfile) expected_body_size = expected_http_body_size(request, response) - response._body = b"".join(read_body(rfile, expected_body_size, body_size_limit)) + response.data.content = b"".join(read_body(rfile, expected_body_size, body_size_limit)) response.timestamp_end = time.time() return response @@ -155,7 +155,7 @@ def connection_close(http_version, headers): # If we don't have a Connection header, HTTP 1.1 connections are assumed to # be persistent - return http_version != b"HTTP/1.1" + return http_version != "HTTP/1.1" and http_version != b"HTTP/1.1" # FIXME: Remove one case. def expected_http_body_size(request, response=None): @@ -184,11 +184,11 @@ def expected_http_body_size(request, response=None): if headers.get("expect", "").lower() == "100-continue": return 0 else: - if request.method.upper() == b"HEAD": + if request.method.upper() == "HEAD": return 0 if 100 <= response_code <= 199: return 0 - if response_code == 200 and request.method.upper() == b"CONNECT": + if response_code == 200 and request.method.upper() == "CONNECT": return 0 if response_code in (204, 304): return 0 diff --git a/netlib/http/http2/connections.py b/netlib/http/http2/connections.py index 5220d5d2..c493abe6 100644 --- a/netlib/http/http2/connections.py +++ b/netlib/http/http2/connections.py @@ -4,7 +4,7 @@ import time from hpack.hpack import Encoder, Decoder from ... import utils -from .. import Headers, Response, Request, ALPN_PROTO_H2 +from .. import Headers, Response, Request from . import frame @@ -283,7 +283,7 @@ class HTTP2Protocol(object): def check_alpn(self): alp = self.tcp_handler.get_alpn_proto_negotiated() - if alp != ALPN_PROTO_H2: + if alp != b'h2': raise NotImplementedError( "HTTP2Protocol can not handle unknown ALP: %s" % alp) return True diff --git a/netlib/http/http2/frame.py b/netlib/http/http2/frame.py index cb2cde99..188629d4 100644 --- a/netlib/http/http2/frame.py +++ b/netlib/http/http2/frame.py @@ -25,9 +25,6 @@ ERROR_CODES = BiDi( CLIENT_CONNECTION_PREFACE = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" -ALPN_PROTO_H2 = b'h2' - - class Frame(object): """ diff --git a/netlib/http/message.py b/netlib/http/message.py new file mode 100644 index 00000000..e4e799ca --- /dev/null +++ b/netlib/http/message.py @@ -0,0 +1,196 @@ +from __future__ import absolute_import, print_function, division + +import warnings + +import six + +from .. import encoding, utils + + +CONTENT_MISSING = 0 + +if six.PY2: # pragma: nocover + _native = lambda x: x + _always_bytes = lambda x: x +else: + # While the HTTP head _should_ be ASCII, it's not uncommon for certain headers to be utf-8 encoded. + _native = lambda x: x.decode("utf-8", "surrogateescape") + _always_bytes = lambda x: utils.always_bytes(x, "utf-8", "surrogateescape") + + +class MessageData(object): + def __eq__(self, other): + if isinstance(other, MessageData): + return self.__dict__ == other.__dict__ + return False + + def __ne__(self, other): + return not self.__eq__(other) + + +class Message(object): + def __init__(self, data): + self.data = data + + def __eq__(self, other): + if isinstance(other, Message): + return self.data == other.data + return False + + def __ne__(self, other): + return not self.__eq__(other) + + @property + def headers(self): + """ + Message headers object + + Returns: + netlib.http.Headers + """ + return self.data.headers + + @headers.setter + def headers(self, h): + self.data.headers = h + + @property + def content(self): + """ + The raw (encoded) HTTP message body + + See also: :py:attr:`text` + """ + return self.data.content + + @content.setter + def content(self, content): + self.data.content = content + if isinstance(content, bytes): + self.headers["content-length"] = str(len(content)) + + @property + def http_version(self): + """ + Version string, e.g. "HTTP/1.1" + """ + return _native(self.data.http_version) + + @http_version.setter + def http_version(self, http_version): + self.data.http_version = _always_bytes(http_version) + + @property + def timestamp_start(self): + """ + First byte timestamp + """ + return self.data.timestamp_start + + @timestamp_start.setter + def timestamp_start(self, timestamp_start): + self.data.timestamp_start = timestamp_start + + @property + def timestamp_end(self): + """ + Last byte timestamp + """ + return self.data.timestamp_end + + @timestamp_end.setter + def timestamp_end(self, timestamp_end): + self.data.timestamp_end = timestamp_end + + @property + def text(self): + """ + The decoded HTTP message body. + Decoded contents are not cached, so accessing this attribute repeatedly is relatively expensive. + + .. note:: + This is not implemented yet. + + See also: :py:attr:`content`, :py:class:`decoded` + """ + # This attribute should be called text, because that's what requests does. + raise NotImplementedError() + + @text.setter + def text(self, text): + raise NotImplementedError() + + def decode(self): + """ + Decodes body based on the current Content-Encoding header, then + removes the header. If there is no Content-Encoding header, no + action is taken. + + Returns: + True, if decoding succeeded. + False, otherwise. + """ + ce = self.headers.get("content-encoding") + data = encoding.decode(ce, self.content) + if data is None: + return False + self.content = data + self.headers.pop("content-encoding", None) + return True + + def encode(self, e): + """ + Encodes body with the encoding e, where e is "gzip", "deflate" or "identity". + + Returns: + True, if decoding succeeded. + False, otherwise. + """ + data = encoding.encode(e, self.content) + if data is None: + return False + self.content = data + self.headers["content-encoding"] = e + return True + + # Legacy + + @property + def body(self): # pragma: nocover + warnings.warn(".body is deprecated, use .content instead.", DeprecationWarning) + return self.content + + @body.setter + def body(self, body): # pragma: nocover + warnings.warn(".body is deprecated, use .content instead.", DeprecationWarning) + self.content = body + + +class decoded(object): + """ + A context manager that decodes a request or response, and then + re-encodes it with the same encoding after execution of the block. + + Example: + + .. code-block:: python + + with decoded(request): + request.content = request.content.replace("foo", "bar") + """ + + def __init__(self, message): + self.message = message + ce = message.headers.get("content-encoding") + if ce in encoding.ENCODINGS: + self.ce = ce + else: + self.ce = None + + def __enter__(self): + if self.ce: + self.message.decode() + + def __exit__(self, type, value, tb): + if self.ce: + self.message.encode(self.ce)
\ No newline at end of file diff --git a/netlib/http/models.py b/netlib/http/models.py deleted file mode 100644 index 55664533..00000000 --- a/netlib/http/models.py +++ /dev/null @@ -1,345 +0,0 @@ - - -from ..odict import ODict -from .. import utils, encoding -from ..utils import always_bytes, native -from . import cookies -from .headers import Headers - -from six.moves import urllib - -# TODO: Move somewhere else? -ALPN_PROTO_HTTP1 = b'http/1.1' -ALPN_PROTO_H2 = b'h2' -HDR_FORM_URLENCODED = "application/x-www-form-urlencoded" -HDR_FORM_MULTIPART = "multipart/form-data" - -CONTENT_MISSING = 0 - - -class Message(object): - def __init__(self, http_version, headers, body, timestamp_start, timestamp_end): - self.http_version = http_version - if not headers: - headers = Headers() - assert isinstance(headers, Headers) - self.headers = headers - - self._body = body - self.timestamp_start = timestamp_start - self.timestamp_end = timestamp_end - - @property - def body(self): - return self._body - - @body.setter - def body(self, body): - self._body = body - if isinstance(body, bytes): - self.headers["content-length"] = str(len(body)).encode() - - content = body - - def __eq__(self, other): - if isinstance(other, Message): - return self.__dict__ == other.__dict__ - return False - - -class Request(Message): - def __init__( - self, - form_in, - method, - scheme, - host, - port, - path, - http_version, - headers=None, - body=None, - timestamp_start=None, - timestamp_end=None, - form_out=None - ): - super(Request, self).__init__(http_version, headers, body, timestamp_start, timestamp_end) - - self.form_in = form_in - self.method = method - self.scheme = scheme - self.host = host - self.port = port - self.path = path - self.form_out = form_out or form_in - - def __repr__(self): - if self.host and self.port: - hostport = "{}:{}".format(native(self.host,"idna"), self.port) - else: - hostport = "" - path = self.path or "" - return "HTTPRequest({} {}{})".format( - self.method, hostport, path - ) - - def anticache(self): - """ - Modifies this request to remove headers that might produce a cached - response. That is, we remove ETags and If-Modified-Since headers. - """ - delheaders = [ - "if-modified-since", - "if-none-match", - ] - for i in delheaders: - self.headers.pop(i, None) - - def anticomp(self): - """ - Modifies this request to remove headers that will compress the - resource's data. - """ - self.headers["accept-encoding"] = "identity" - - def constrain_encoding(self): - """ - Limits the permissible Accept-Encoding values, based on what we can - decode appropriately. - """ - accept_encoding = self.headers.get("accept-encoding") - if accept_encoding: - self.headers["accept-encoding"] = ( - ', '.join( - e - for e in encoding.ENCODINGS - if e in accept_encoding - ) - ) - - def update_host_header(self): - """ - Update the host header to reflect the current target. - """ - self.headers["host"] = self.host - - def get_form(self): - """ - Retrieves the URL-encoded or multipart form data, returning an ODict object. - Returns an empty ODict if there is no data or the content-type - indicates non-form data. - """ - if self.body: - if HDR_FORM_URLENCODED in self.headers.get("content-type", "").lower(): - return self.get_form_urlencoded() - elif HDR_FORM_MULTIPART in self.headers.get("content-type", "").lower(): - return self.get_form_multipart() - return ODict([]) - - def get_form_urlencoded(self): - """ - Retrieves the URL-encoded form data, returning an ODict object. - Returns an empty ODict if there is no data or the content-type - indicates non-form data. - """ - if self.body and HDR_FORM_URLENCODED in self.headers.get("content-type", "").lower(): - return ODict(utils.urldecode(self.body)) - return ODict([]) - - def get_form_multipart(self): - if self.body and HDR_FORM_MULTIPART in self.headers.get("content-type", "").lower(): - return ODict( - utils.multipartdecode( - self.headers, - self.body)) - return ODict([]) - - def set_form_urlencoded(self, odict): - """ - Sets the body to the URL-encoded form data, and adds the - appropriate content-type header. Note that this will destory the - existing body if there is one. - """ - # FIXME: If there's an existing content-type header indicating a - # url-encoded form, leave it alone. - self.headers["content-type"] = HDR_FORM_URLENCODED - self.body = utils.urlencode(odict.lst) - - def get_path_components(self): - """ - Returns the path components of the URL as a list of strings. - - Components are unquoted. - """ - _, _, path, _, _, _ = urllib.parse.urlparse(self.url) - return [urllib.parse.unquote(native(i,"ascii")) for i in path.split(b"/") if i] - - def set_path_components(self, lst): - """ - Takes a list of strings, and sets the path component of the URL. - - Components are quoted. - """ - lst = [urllib.parse.quote(i, safe="") for i in lst] - path = always_bytes("/" + "/".join(lst)) - scheme, netloc, _, params, query, fragment = urllib.parse.urlparse(self.url) - self.url = urllib.parse.urlunparse( - [scheme, netloc, path, params, query, fragment] - ) - - def get_query(self): - """ - Gets the request query string. Returns an ODict object. - """ - _, _, _, _, query, _ = urllib.parse.urlparse(self.url) - if query: - return ODict(utils.urldecode(query)) - return ODict([]) - - def set_query(self, odict): - """ - Takes an ODict object, and sets the request query string. - """ - scheme, netloc, path, params, _, fragment = urllib.parse.urlparse(self.url) - query = utils.urlencode(odict.lst) - self.url = urllib.parse.urlunparse( - [scheme, netloc, path, params, query, fragment] - ) - - def pretty_host(self, hostheader): - """ - Heuristic to get the host of the request. - - Note that pretty_host() does not always return the TCP destination - of the request, e.g. if an upstream proxy is in place - - If hostheader is set to True, the Host: header will be used as - additional (and preferred) data source. This is handy in - transparent mode, where only the IO of the destination is known, - but not the resolved name. This is disabled by default, as an - attacker may spoof the host header to confuse an analyst. - """ - if hostheader and "host" in self.headers: - try: - return self.headers["host"] - except ValueError: - pass - if self.host: - return self.host.decode("idna") - - def pretty_url(self, hostheader): - if self.form_out == "authority": # upstream proxy mode - return b"%s:%d" % (always_bytes(self.pretty_host(hostheader)), self.port) - return utils.unparse_url(self.scheme, - self.pretty_host(hostheader), - self.port, - self.path) - - def get_cookies(self): - """ - Returns a possibly empty netlib.odict.ODict object. - """ - ret = ODict() - for i in self.headers.get_all("Cookie"): - ret.extend(cookies.parse_cookie_header(i)) - return ret - - def set_cookies(self, odict): - """ - Takes an netlib.odict.ODict object. Over-writes any existing Cookie - headers. - """ - v = cookies.format_cookie_header(odict) - self.headers["cookie"] = v - - @property - def url(self): - """ - Returns a URL string, constructed from the Request's URL components. - """ - return utils.unparse_url( - self.scheme, - self.host, - self.port, - self.path - ) - - @url.setter - def url(self, url): - """ - Parses a URL specification, and updates the Request's information - accordingly. - - Raises: - ValueError if the URL was invalid - """ - # TODO: Should handle incoming unicode here. - parts = utils.parse_url(url) - if not parts: - raise ValueError("Invalid URL: %s" % url) - self.scheme, self.host, self.port, self.path = parts - - -class Response(Message): - def __init__( - self, - http_version, - status_code, - msg=None, - headers=None, - body=None, - timestamp_start=None, - timestamp_end=None, - ): - super(Response, self).__init__(http_version, headers, body, timestamp_start, timestamp_end) - self.status_code = status_code - self.msg = msg - - def __repr__(self): - # return "Response(%s - %s)" % (self.status_code, self.msg) - - if self.body: - size = utils.pretty_size(len(self.body)) - else: - size = "content missing" - # TODO: Remove "(unknown content type, content missing)" edge-case - return "<Response: {status_code} {msg} ({contenttype}, {size})>".format( - status_code=self.status_code, - msg=self.msg, - contenttype=self.headers.get("content-type", "unknown content type"), - size=size) - - def get_cookies(self): - """ - Get the contents of all Set-Cookie headers. - - Returns a possibly empty ODict, where keys are cookie name strings, - and values are [value, attr] lists. Value is a string, and attr is - an ODictCaseless containing cookie attributes. Within attrs, unary - attributes (e.g. HTTPOnly) are indicated by a Null value. - """ - ret = [] - for header in self.headers.get_all("set-cookie"): - v = cookies.parse_set_cookie_header(header) - if v: - name, value, attrs = v - ret.append([name, [value, attrs]]) - return ODict(ret) - - def set_cookies(self, odict): - """ - Set the Set-Cookie headers on this response, over-writing existing - headers. - - Accepts an ODict of the same format as that returned by get_cookies. - """ - values = [] - for i in odict.lst: - values.append( - cookies.format_set_cookie_header( - i[0], - i[1][0], - i[1][1] - ) - ) - self.headers.set_all("set-cookie", values) diff --git a/netlib/http/request.py b/netlib/http/request.py new file mode 100644 index 00000000..92d99532 --- /dev/null +++ b/netlib/http/request.py @@ -0,0 +1,352 @@ +from __future__ import absolute_import, print_function, division + +import warnings + +import six +from six.moves import urllib + +from netlib import utils +from netlib.http import cookies +from netlib.odict import ODict +from .. import encoding +from .headers import Headers +from .message import Message, _native, _always_bytes, MessageData + + +class RequestData(MessageData): + def __init__(self, first_line_format, method, scheme, host, port, path, http_version, headers=None, content=None, + timestamp_start=None, timestamp_end=None): + if not headers: + headers = Headers() + assert isinstance(headers, Headers) + + self.first_line_format = first_line_format + self.method = method + self.scheme = scheme + self.host = host + self.port = port + self.path = path + self.http_version = http_version + self.headers = headers + self.content = content + self.timestamp_start = timestamp_start + self.timestamp_end = timestamp_end + + +class Request(Message): + """ + An HTTP request. + """ + def __init__(self, *args, **kwargs): + data = RequestData(*args, **kwargs) + super(Request, self).__init__(data) + + def __repr__(self): + if self.host and self.port: + hostport = "{}:{}".format(self.host, self.port) + else: + hostport = "" + path = self.path or "" + return "Request({} {}{})".format( + self.method, hostport, path + ) + + @property + def first_line_format(self): + """ + HTTP request form as defined in `RFC7230 <https://tools.ietf.org/html/rfc7230#section-5.3>`_. + + origin-form and asterisk-form are subsumed as "relative". + """ + return self.data.first_line_format + + @first_line_format.setter + def first_line_format(self, first_line_format): + self.data.first_line_format = first_line_format + + @property + def method(self): + """ + HTTP request method, e.g. "GET". + """ + return _native(self.data.method).upper() + + @method.setter + def method(self, method): + self.data.method = _always_bytes(method) + + @property + def scheme(self): + """ + HTTP request scheme, which should be "http" or "https". + """ + return _native(self.data.scheme) + + @scheme.setter + def scheme(self, scheme): + self.data.scheme = _always_bytes(scheme) + + @property + def host(self): + """ + Target host. This may be parsed from the raw request + (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line) + or inferred from the proxy mode (e.g. an IP in transparent mode). + """ + + if six.PY2: # pragma: nocover + return self.data.host + + if not self.data.host: + return self.data.host + try: + return self.data.host.decode("idna") + except UnicodeError: + return self.data.host.decode("utf8", "surrogateescape") + + @host.setter + def host(self, host): + if isinstance(host, six.text_type): + try: + # There's no non-strict mode for IDNA encoding. + # We don't want this operation to fail though, so we try + # utf8 as a last resort. + host = host.encode("idna", "strict") + except UnicodeError: + host = host.encode("utf8", "surrogateescape") + + self.data.host = host + + # Update host header + if "host" in self.headers: + if host: + self.headers["host"] = host + else: + self.headers.pop("host") + + @property + def port(self): + """ + Target port + """ + return self.data.port + + @port.setter + def port(self, port): + self.data.port = port + + @property + def path(self): + """ + HTTP request path, e.g. "/index.html". + Guaranteed to start with a slash. + """ + return _native(self.data.path) + + @path.setter + def path(self, path): + self.data.path = _always_bytes(path) + + @property + def url(self): + """ + The URL string, constructed from the request's URL components + """ + return utils.unparse_url(self.scheme, self.host, self.port, self.path) + + @url.setter + def url(self, url): + self.scheme, self.host, self.port, self.path = utils.parse_url(url) + + @property + def pretty_host(self): + """ + Similar to :py:attr:`host`, but using the Host headers as an additional preferred data source. + This is useful in transparent mode where :py:attr:`host` is only an IP address, + but may not reflect the actual destination as the Host header could be spoofed. + """ + return self.headers.get("host", self.host) + + @property + def pretty_url(self): + """ + Like :py:attr:`url`, but using :py:attr:`pretty_host` instead of :py:attr:`host`. + """ + if self.first_line_format == "authority": + return "%s:%d" % (self.pretty_host, self.port) + return utils.unparse_url(self.scheme, self.pretty_host, self.port, self.path) + + @property + def query(self): + """ + The request query string as an :py:class:`ODict` object. + None, if there is no query. + """ + _, _, _, _, query, _ = urllib.parse.urlparse(self.url) + if query: + return ODict(utils.urldecode(query)) + return None + + @query.setter + def query(self, odict): + query = utils.urlencode(odict.lst) + scheme, netloc, path, params, _, fragment = urllib.parse.urlparse(self.url) + self.url = urllib.parse.urlunparse([scheme, netloc, path, params, query, fragment]) + + @property + def cookies(self): + """ + The request cookies. + An empty :py:class:`ODict` object if the cookie monster ate them all. + """ + ret = ODict() + for i in self.headers.get_all("Cookie"): + ret.extend(cookies.parse_cookie_header(i)) + return ret + + @cookies.setter + def cookies(self, odict): + self.headers["cookie"] = cookies.format_cookie_header(odict) + + @property + def path_components(self): + """ + The URL's path components as a list of strings. + Components are unquoted. + """ + _, _, path, _, _, _ = urllib.parse.urlparse(self.url) + return [urllib.parse.unquote(i) for i in path.split("/") if i] + + @path_components.setter + def path_components(self, components): + components = map(lambda x: urllib.parse.quote(x, safe=""), components) + path = "/" + "/".join(components) + scheme, netloc, _, params, query, fragment = urllib.parse.urlparse(self.url) + self.url = urllib.parse.urlunparse([scheme, netloc, path, params, query, fragment]) + + def anticache(self): + """ + Modifies this request to remove headers that might produce a cached + response. That is, we remove ETags and If-Modified-Since headers. + """ + delheaders = [ + "if-modified-since", + "if-none-match", + ] + for i in delheaders: + self.headers.pop(i, None) + + def anticomp(self): + """ + Modifies this request to remove headers that will compress the + resource's data. + """ + self.headers["accept-encoding"] = "identity" + + def constrain_encoding(self): + """ + Limits the permissible Accept-Encoding values, based on what we can + decode appropriately. + """ + accept_encoding = self.headers.get("accept-encoding") + if accept_encoding: + self.headers["accept-encoding"] = ( + ', '.join( + e + for e in encoding.ENCODINGS + if e in accept_encoding + ) + ) + + @property + def urlencoded_form(self): + """ + The URL-encoded form data as an :py:class:`ODict` object. + None if there is no data or the content-type indicates non-form data. + """ + is_valid_content_type = "application/x-www-form-urlencoded" in self.headers.get("content-type", "").lower() + if self.content and is_valid_content_type: + return ODict(utils.urldecode(self.content)) + return None + + @urlencoded_form.setter + def urlencoded_form(self, odict): + """ + Sets the body to the URL-encoded form data, and adds the appropriate content-type header. + This will overwrite the existing content if there is one. + """ + self.headers["content-type"] = "application/x-www-form-urlencoded" + self.content = utils.urlencode(odict.lst) + + @property + def multipart_form(self): + """ + The multipart form data as an :py:class:`ODict` object. + None if there is no data or the content-type indicates non-form data. + """ + is_valid_content_type = "multipart/form-data" in self.headers.get("content-type", "").lower() + if self.content and is_valid_content_type: + return ODict(utils.multipartdecode(self.headers,self.content)) + return None + + @multipart_form.setter + def multipart_form(self): + raise NotImplementedError() + + # Legacy + + def get_cookies(self): # pragma: nocover + warnings.warn(".get_cookies is deprecated, use .cookies instead.", DeprecationWarning) + return self.cookies + + def set_cookies(self, odict): # pragma: nocover + warnings.warn(".set_cookies is deprecated, use .cookies instead.", DeprecationWarning) + self.cookies = odict + + def get_query(self): # pragma: nocover + warnings.warn(".get_query is deprecated, use .query instead.", DeprecationWarning) + return self.query or ODict([]) + + def set_query(self, odict): # pragma: nocover + warnings.warn(".set_query is deprecated, use .query instead.", DeprecationWarning) + self.query = odict + + def get_path_components(self): # pragma: nocover + warnings.warn(".get_path_components is deprecated, use .path_components instead.", DeprecationWarning) + return self.path_components + + def set_path_components(self, lst): # pragma: nocover + warnings.warn(".set_path_components is deprecated, use .path_components instead.", DeprecationWarning) + self.path_components = lst + + def get_form_urlencoded(self): # pragma: nocover + warnings.warn(".get_form_urlencoded is deprecated, use .urlencoded_form instead.", DeprecationWarning) + return self.urlencoded_form or ODict([]) + + def set_form_urlencoded(self, odict): # pragma: nocover + warnings.warn(".set_form_urlencoded is deprecated, use .urlencoded_form instead.", DeprecationWarning) + self.urlencoded_form = odict + + def get_form_multipart(self): # pragma: nocover + warnings.warn(".get_form_multipart is deprecated, use .multipart_form instead.", DeprecationWarning) + return self.multipart_form or ODict([]) + + @property + def form_in(self): # pragma: nocover + warnings.warn(".form_in is deprecated, use .first_line_format instead.", DeprecationWarning) + return self.first_line_format + + @form_in.setter + def form_in(self, form_in): # pragma: nocover + warnings.warn(".form_in is deprecated, use .first_line_format instead.", DeprecationWarning) + self.first_line_format = form_in + + @property + def form_out(self): # pragma: nocover + warnings.warn(".form_out is deprecated, use .first_line_format instead.", DeprecationWarning) + return self.first_line_format + + @form_out.setter + def form_out(self, form_out): # pragma: nocover + warnings.warn(".form_out is deprecated, use .first_line_format instead.", DeprecationWarning) + self.first_line_format = form_out
\ No newline at end of file diff --git a/netlib/http/response.py b/netlib/http/response.py new file mode 100644 index 00000000..66e5ded6 --- /dev/null +++ b/netlib/http/response.py @@ -0,0 +1,117 @@ +from __future__ import absolute_import, print_function, division + +import warnings + +from . import cookies +from .headers import Headers +from .message import Message, _native, _always_bytes, MessageData +from .. import utils +from ..odict import ODict + + +class ResponseData(MessageData): + def __init__(self, http_version, status_code, reason=None, headers=None, content=None, + timestamp_start=None, timestamp_end=None): + if not headers: + headers = Headers() + assert isinstance(headers, Headers) + + self.http_version = http_version + self.status_code = status_code + self.reason = reason + self.headers = headers + self.content = content + self.timestamp_start = timestamp_start + self.timestamp_end = timestamp_end + + +class Response(Message): + """ + An HTTP response. + """ + def __init__(self, *args, **kwargs): + data = ResponseData(*args, **kwargs) + super(Response, self).__init__(data) + + def __repr__(self): + if self.content: + details = "{}, {}".format( + self.headers.get("content-type", "unknown content type"), + utils.pretty_size(len(self.content)) + ) + else: + details = "no content" + return "Response({status_code} {reason}, {details})".format( + status_code=self.status_code, + reason=self.reason, + details=details + ) + + @property + def status_code(self): + """ + HTTP Status Code, e.g. ``200``. + """ + return self.data.status_code + + @status_code.setter + def status_code(self, status_code): + self.data.status_code = status_code + + @property + def reason(self): + """ + HTTP Reason Phrase, e.g. "Not Found". + This is always :py:obj:`None` for HTTP2 requests, because HTTP2 responses do not contain a reason phrase. + """ + return _native(self.data.reason) + + @reason.setter + def reason(self, reason): + self.data.reason = _always_bytes(reason) + + @property + def cookies(self): + """ + Get the contents of all Set-Cookie headers. + + A possibly empty :py:class:`ODict`, where keys are cookie name strings, + and values are [value, attr] lists. Value is a string, and attr is + an ODictCaseless containing cookie attributes. Within attrs, unary + attributes (e.g. HTTPOnly) are indicated by a Null value. + """ + ret = [] + for header in self.headers.get_all("set-cookie"): + v = cookies.parse_set_cookie_header(header) + if v: + name, value, attrs = v + ret.append([name, [value, attrs]]) + return ODict(ret) + + @cookies.setter + def cookies(self, odict): + values = [] + for i in odict.lst: + header = cookies.format_set_cookie_header(i[0], i[1][0], i[1][1]) + values.append(header) + self.headers.set_all("set-cookie", values) + + # Legacy + + def get_cookies(self): # pragma: nocover + warnings.warn(".get_cookies is deprecated, use .cookies instead.", DeprecationWarning) + return self.cookies + + def set_cookies(self, odict): # pragma: nocover + warnings.warn(".set_cookies is deprecated, use .cookies instead.", DeprecationWarning) + self.cookies = odict + + @property + def msg(self): # pragma: nocover + warnings.warn(".msg is deprecated, use .reason instead.", DeprecationWarning) + return self.reason + + @msg.setter + def msg(self, reason): # pragma: nocover + warnings.warn(".msg is deprecated, use .reason instead.", DeprecationWarning) + self.reason = reason diff --git a/netlib/http/status_codes.py b/netlib/http/status_codes.py index dc09f465..8a4dc1f5 100644 --- a/netlib/http/status_codes.py +++ b/netlib/http/status_codes.py @@ -1,4 +1,4 @@ -from __future__ import (absolute_import, print_function, division) +from __future__ import absolute_import, print_function, division CONTINUE = 100 SWITCHING = 101 @@ -37,6 +37,7 @@ REQUEST_URI_TOO_LONG = 414 UNSUPPORTED_MEDIA_TYPE = 415 REQUESTED_RANGE_NOT_SATISFIABLE = 416 EXPECTATION_FAILED = 417 +IM_A_TEAPOT = 418 INTERNAL_SERVER_ERROR = 500 NOT_IMPLEMENTED = 501 @@ -91,6 +92,7 @@ RESPONSES = { UNSUPPORTED_MEDIA_TYPE: "Unsupported Media Type", REQUESTED_RANGE_NOT_SATISFIABLE: "Requested Range not satisfiable", EXPECTATION_FAILED: "Expectation Failed", + IM_A_TEAPOT: "I'm a teapot", # 500 INTERNAL_SERVER_ERROR: "Internal Server Error", diff --git a/netlib/tutils.py b/netlib/tutils.py index 1665a792..e16f1a76 100644 --- a/netlib/tutils.py +++ b/netlib/tutils.py @@ -98,7 +98,7 @@ def treq(**kwargs): netlib.http.Request """ default = dict( - form_in="relative", + first_line_format="relative", method=b"GET", scheme=b"http", host=b"address", @@ -106,7 +106,7 @@ def treq(**kwargs): path=b"/path", http_version=b"HTTP/1.1", headers=Headers(header="qvalue"), - body=b"content" + content=b"content" ) default.update(kwargs) return Request(**default) @@ -120,9 +120,9 @@ def tresp(**kwargs): default = dict( http_version=b"HTTP/1.1", status_code=200, - msg=b"OK", - headers=Headers(header_response=b"svalue"), - body=b"message", + reason=b"OK", + headers=Headers(header_response="svalue"), + content=b"message", timestamp_start=time.time(), timestamp_end=time.time(), ) diff --git a/netlib/utils.py b/netlib/utils.py index 8b9548ed..acc7ccd4 100644 --- a/netlib/utils.py +++ b/netlib/utils.py @@ -274,22 +274,27 @@ def get_header_tokens(headers, key): return [token.strip() for token in tokens] -@always_byte_args() def hostport(scheme, host, port): """ Returns the host component, with a port specifcation if needed. """ - if (port, scheme) in [(80, b"http"), (443, b"https")]: + if (port, scheme) in [(80, "http"), (443, "https"), (80, b"http"), (443, b"https")]: return host else: - return b"%s:%d" % (host, port) + if isinstance(host, six.binary_type): + return b"%s:%d" % (host, port) + else: + return "%s:%d" % (host, port) def unparse_url(scheme, host, port, path=""): """ - Returns a URL string, constructed from the specified compnents. + Returns a URL string, constructed from the specified components. + + Args: + All args must be str. """ - return b"%s://%s%s" % (scheme, hostport(scheme, host, port), path) + return "%s://%s%s" % (scheme, hostport(scheme, host, port), path) def urlencode(s): diff --git a/netlib/wsgi.py b/netlib/wsgi.py index 4fcd5178..df248a19 100644 --- a/netlib/wsgi.py +++ b/netlib/wsgi.py @@ -25,9 +25,9 @@ class Flow(object): class Request(object): - def __init__(self, scheme, method, path, http_version, headers, body): + def __init__(self, scheme, method, path, http_version, headers, content): self.scheme, self.method, self.path = scheme, method, path - self.headers, self.body = headers, body + self.headers, self.content = headers, content self.http_version = http_version @@ -64,7 +64,7 @@ class WSGIAdaptor(object): environ = { 'wsgi.version': (1, 0), 'wsgi.url_scheme': native(flow.request.scheme, "latin-1"), - 'wsgi.input': BytesIO(flow.request.body or b""), + 'wsgi.input': BytesIO(flow.request.content or b""), 'wsgi.errors': errsoc, 'wsgi.multithread': True, 'wsgi.multiprocess': False, diff --git a/test/http/http1/test_assemble.py b/test/http/http1/test_assemble.py index 963e7549..ed94292d 100644 --- a/test/http/http1/test_assemble.py +++ b/test/http/http1/test_assemble.py @@ -20,7 +20,7 @@ def test_assemble_request(): ) with raises(HttpException): - assemble_request(treq(body=CONTENT_MISSING)) + assemble_request(treq(content=CONTENT_MISSING)) def test_assemble_request_head(): @@ -40,7 +40,7 @@ def test_assemble_response(): ) with raises(HttpException): - assemble_response(tresp(body=CONTENT_MISSING)) + assemble_response(tresp(content=CONTENT_MISSING)) def test_assemble_response_head(): @@ -62,31 +62,40 @@ def test_assemble_body(): def test_assemble_request_line(): - assert _assemble_request_line(treq()) == b"GET /path HTTP/1.1" + assert _assemble_request_line(treq().data) == b"GET /path HTTP/1.1" - authority_request = treq(method=b"CONNECT", form_in="authority") + authority_request = treq(method=b"CONNECT", first_line_format="authority").data assert _assemble_request_line(authority_request) == b"CONNECT address:22 HTTP/1.1" - absolute_request = treq(form_in="absolute") + absolute_request = treq(first_line_format="absolute").data assert _assemble_request_line(absolute_request) == b"GET http://address:22/path HTTP/1.1" with raises(RuntimeError): - _assemble_request_line(treq(), "invalid_form") + _assemble_request_line(treq(first_line_format="invalid_form").data) def test_assemble_request_headers(): # https://github.com/mitmproxy/mitmproxy/issues/186 - r = treq(body=b"") + r = treq(content=b"") r.headers["Transfer-Encoding"] = "chunked" - c = _assemble_request_headers(r) + c = _assemble_request_headers(r.data) assert b"Transfer-Encoding" in c - assert b"host" in _assemble_request_headers(treq(headers=Headers())) + +def test_assemble_request_headers_host_header(): + r = treq() + r.headers = Headers() + c = _assemble_request_headers(r.data) + assert b"host" in c + + r.host = None + c = _assemble_request_headers(r.data) + assert b"host" not in c def test_assemble_response_headers(): # https://github.com/mitmproxy/mitmproxy/issues/186 - r = tresp(body=b"") + r = tresp(content=b"") r.headers["Transfer-Encoding"] = "chunked" c = _assemble_response_headers(r) assert b"Transfer-Encoding" in c diff --git a/test/http/http1/test_read.py b/test/http/http1/test_read.py index 98d31bc2..45f61b4f 100644 --- a/test/http/http1/test_read.py +++ b/test/http/http1/test_read.py @@ -2,7 +2,7 @@ from __future__ import absolute_import, print_function, division from io import BytesIO import textwrap from mock import Mock -from netlib.exceptions import HttpException, HttpSyntaxException, HttpReadDisconnect +from netlib.exceptions import HttpException, HttpSyntaxException, HttpReadDisconnect, TcpDisconnect from netlib.http import Headers from netlib.http.http1.read import ( read_request, read_response, read_request_head, @@ -16,8 +16,8 @@ from netlib.tutils import treq, tresp, raises def test_read_request(): rfile = BytesIO(b"GET / HTTP/1.1\r\n\r\nskip") r = read_request(rfile) - assert r.method == b"GET" - assert r.body == b"" + assert r.method == "GET" + assert r.content == b"" assert r.timestamp_end assert rfile.read() == b"skip" @@ -32,9 +32,9 @@ def test_read_request_head(): rfile.reset_timestamps = Mock() rfile.first_byte_timestamp = 42 r = read_request_head(rfile) - assert r.method == b"GET" + assert r.method == "GET" assert r.headers["Content-Length"] == "4" - assert r.body is None + assert r.content is None assert rfile.reset_timestamps.called assert r.timestamp_start == 42 assert rfile.read() == b"skip" @@ -45,7 +45,7 @@ def test_read_response(): rfile = BytesIO(b"HTTP/1.1 418 I'm a teapot\r\n\r\nbody") r = read_response(rfile, req) assert r.status_code == 418 - assert r.body == b"body" + assert r.content == b"body" assert r.timestamp_end @@ -61,7 +61,7 @@ def test_read_response_head(): r = read_response_head(rfile) assert r.status_code == 418 assert r.headers["Content-Length"] == "4" - assert r.body is None + assert r.content is None assert rfile.reset_timestamps.called assert r.timestamp_start == 42 assert rfile.read() == b"skip" @@ -100,6 +100,11 @@ class TestReadBody(object): with raises(HttpException): b"".join(read_body(rfile, -1, 3)) + def test_max_chunk_size(self): + rfile = BytesIO(b"123456") + assert list(read_body(rfile, -1, max_chunk_size=None)) == [b"123456"] + rfile = BytesIO(b"123456") + assert list(read_body(rfile, -1, max_chunk_size=1)) == [b"1", b"2", b"3", b"4", b"5", b"6"] def test_connection_close(): headers = Headers() @@ -112,6 +117,9 @@ def test_connection_close(): headers["connection"] = "close" assert connection_close(b"HTTP/1.1", headers) + headers["connection"] = "foobar" + assert connection_close(b"HTTP/1.0", headers) + assert not connection_close(b"HTTP/1.1", headers) def test_expected_http_body_size(): # Expect: 100-continue @@ -169,6 +177,11 @@ def test_get_first_line(): rfile = BytesIO(b"") _get_first_line(rfile) + with raises(HttpReadDisconnect): + rfile = Mock() + rfile.readline.side_effect = TcpDisconnect + _get_first_line(rfile) + def test_read_request_line(): def t(b): @@ -187,7 +200,8 @@ def test_read_request_line(): t(b"GET / WTF/1.1") with raises(HttpSyntaxException): t(b"this is not http") - + with raises(HttpReadDisconnect): + t(b"") def test_parse_authority_form(): assert _parse_authority_form(b"foo:42") == (b"foo", 42) @@ -218,6 +232,8 @@ def test_read_response_line(): t(b"HTTP/1.1 OK OK") with raises(HttpSyntaxException): t(b"WTF/1.1 200 OK") + with raises(HttpReadDisconnect): + t(b"") def test_check_http_version(): @@ -283,7 +299,7 @@ class TestReadHeaders(object): def test_read_chunked(): - req = treq(body=None) + req = treq(content=None) req.headers["Transfer-Encoding"] = "chunked" data = b"1\r\na\r\n0\r\n" diff --git a/test/http/http2/test_protocol.py b/test/http/http2/test_protocol.py index a55941e0..6bda96f5 100644 --- a/test/http/http2/test_protocol.py +++ b/test/http/http2/test_protocol.py @@ -65,7 +65,7 @@ class TestProtocol: class TestCheckALPNMatch(tservers.ServerTestBase): handler = EchoHandler ssl = dict( - alpn_select=ALPN_PROTO_H2, + alpn_select=b'h2', ) if OpenSSL._util.lib.Cryptography_HAS_ALPN: @@ -73,7 +73,7 @@ class TestCheckALPNMatch(tservers.ServerTestBase): def test_check_alpn(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() - c.convert_to_ssl(alpn_protos=[ALPN_PROTO_H2]) + c.convert_to_ssl(alpn_protos=[b'h2']) protocol = HTTP2Protocol(c) assert protocol.check_alpn() @@ -89,7 +89,7 @@ class TestCheckALPNMismatch(tservers.ServerTestBase): def test_check_alpn(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() - c.convert_to_ssl(alpn_protos=[ALPN_PROTO_H2]) + c.convert_to_ssl(alpn_protos=[b'h2']) protocol = HTTP2Protocol(c) tutils.raises(NotImplementedError, protocol.check_alpn) @@ -311,7 +311,7 @@ class TestReadRequest(tservers.ServerTestBase): assert req.stream_id assert req.headers.fields == [[':method', 'GET'], [':path', '/'], [':scheme', 'https']] - assert req.body == b'foobar' + assert req.content == b'foobar' class TestReadRequestRelative(tservers.ServerTestBase): @@ -417,7 +417,7 @@ class TestReadResponse(tservers.ServerTestBase): assert resp.status_code == 200 assert resp.msg == "" assert resp.headers.fields == [[':status', '200'], ['etag', 'foobar']] - assert resp.body == b'foobar' + assert resp.content == b'foobar' assert resp.timestamp_end @@ -444,7 +444,7 @@ class TestReadEmptyResponse(tservers.ServerTestBase): assert resp.status_code == 200 assert resp.msg == "" assert resp.headers.fields == [[':status', '200'], ['etag', 'foobar']] - assert resp.body == b'' + assert resp.content == b'' class TestAssembleRequest(object): diff --git a/test/http/test_cookies.py b/test/http/test_cookies.py index 413b6241..34bb64f2 100644 --- a/test/http/test_cookies.py +++ b/test/http/test_cookies.py @@ -21,6 +21,7 @@ def test_read_quoted_string(): [(r'"f\\o" x', 0), (r"f\o", 6)], [(r'"f\\" x', 0), (r"f" + '\\', 5)], [('"fo\\\"" x', 0), ("fo\"", 6)], + [('"foo" x', 7), ("", 8)], ] for q, a in tokens: assert cookies._read_quoted_string(*q) == a diff --git a/test/http/test_headers.py b/test/http/test_headers.py index f1af1feb..8bddc0b2 100644 --- a/test/http/test_headers.py +++ b/test/http/test_headers.py @@ -38,6 +38,9 @@ class TestHeaders(object): assert headers["Host"] == "example.com" assert headers["Accept"] == "text/plain" + with raises(ValueError): + Headers([[b"Host", u"not-bytes"]]) + def test_getitem(self): headers = Headers(Host="example.com") assert headers["Host"] == "example.com" diff --git a/test/http/test_message.py b/test/http/test_message.py new file mode 100644 index 00000000..2c37dc3e --- /dev/null +++ b/test/http/test_message.py @@ -0,0 +1,153 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, print_function, division + +from netlib.http import decoded, Headers +from netlib.tutils import tresp, raises + + +def _test_passthrough_attr(message, attr): + assert getattr(message, attr) == getattr(message.data, attr) + setattr(message, attr, "foo") + assert getattr(message.data, attr) == "foo" + + +def _test_decoded_attr(message, attr): + assert getattr(message, attr) == getattr(message.data, attr).decode("utf8") + # Set str, get raw bytes + setattr(message, attr, "foo") + assert getattr(message.data, attr) == b"foo" + # Set raw bytes, get decoded + setattr(message.data, attr, b"bar") + assert getattr(message, attr) == "bar" + # Set bytes, get raw bytes + setattr(message, attr, b"baz") + assert getattr(message.data, attr) == b"baz" + + # Set UTF8 + setattr(message, attr, "Non-Autorisé") + assert getattr(message.data, attr) == b"Non-Autoris\xc3\xa9" + # Don't fail on garbage + setattr(message.data, attr, b"foo\xFF\x00bar") + assert getattr(message, attr).startswith("foo") + assert getattr(message, attr).endswith("bar") + # foo.bar = foo.bar should not cause any side effects. + d = getattr(message, attr) + setattr(message, attr, d) + assert getattr(message.data, attr) == b"foo\xFF\x00bar" + + +class TestMessageData(object): + def test_eq_ne(self): + data = tresp(timestamp_start=42, timestamp_end=42).data + same = tresp(timestamp_start=42, timestamp_end=42).data + assert data == same + assert not data != same + + other = tresp(content=b"foo").data + assert not data == other + assert data != other + + assert data != 0 + + +class TestMessage(object): + + def test_init(self): + resp = tresp() + assert resp.data + + def test_eq_ne(self): + resp = tresp(timestamp_start=42, timestamp_end=42) + same = tresp(timestamp_start=42, timestamp_end=42) + assert resp == same + assert not resp != same + + other = tresp(timestamp_start=0, timestamp_end=0) + assert not resp == other + assert resp != other + + assert resp != 0 + + def test_content_length_update(self): + resp = tresp() + resp.content = b"foo" + assert resp.data.content == b"foo" + assert resp.headers["content-length"] == "3" + resp.content = b"" + assert resp.data.content == b"" + assert resp.headers["content-length"] == "0" + + def test_content_basic(self): + _test_passthrough_attr(tresp(), "content") + + def test_headers(self): + _test_passthrough_attr(tresp(), "headers") + + def test_timestamp_start(self): + _test_passthrough_attr(tresp(), "timestamp_start") + + def test_timestamp_end(self): + _test_passthrough_attr(tresp(), "timestamp_end") + + def teste_http_version(self): + _test_decoded_attr(tresp(), "http_version") + + +class TestDecodedDecorator(object): + + def test_simple(self): + r = tresp() + assert r.content == b"message" + assert "content-encoding" not in r.headers + assert r.encode("gzip") + + assert r.headers["content-encoding"] + assert r.content != b"message" + with decoded(r): + assert "content-encoding" not in r.headers + assert r.content == b"message" + assert r.headers["content-encoding"] + assert r.content != b"message" + + def test_modify(self): + r = tresp() + assert "content-encoding" not in r.headers + assert r.encode("gzip") + + with decoded(r): + r.content = b"foo" + + assert r.content != b"foo" + r.decode() + assert r.content == b"foo" + + def test_unknown_ce(self): + r = tresp() + r.headers["content-encoding"] = "zopfli" + r.content = b"foo" + with decoded(r): + assert r.headers["content-encoding"] + assert r.content == b"foo" + assert r.headers["content-encoding"] + assert r.content == b"foo" + + def test_cannot_decode(self): + r = tresp() + assert r.encode("gzip") + r.content = b"foo" + with decoded(r): + assert r.headers["content-encoding"] + assert r.content == b"foo" + assert r.headers["content-encoding"] + assert r.content != b"foo" + r.decode() + assert r.content == b"foo" + + def test_cannot_encode(self): + r = tresp() + assert r.encode("gzip") + with decoded(r): + r.content = None + + assert "content-encoding" not in r.headers + assert r.content is None diff --git a/test/http/test_models.py b/test/http/test_models.py deleted file mode 100644 index 10e0795a..00000000 --- a/test/http/test_models.py +++ /dev/null @@ -1,395 +0,0 @@ -import mock - -from netlib import tutils -from netlib import utils -from netlib.odict import ODict, ODictCaseless -from netlib.http import Request, Response, Headers, CONTENT_MISSING, HDR_FORM_URLENCODED, \ - HDR_FORM_MULTIPART - - -class TestRequest(object): - def test_repr(self): - r = tutils.treq() - assert repr(r) - - def test_headers(self): - tutils.raises(AssertionError, Request, - 'form_in', - 'method', - 'scheme', - 'host', - 'port', - 'path', - b"HTTP/1.1", - 'foobar', - ) - - req = Request( - 'form_in', - 'method', - 'scheme', - 'host', - 'port', - 'path', - b"HTTP/1.1", - ) - assert isinstance(req.headers, Headers) - - def test_equal(self): - a = tutils.treq(timestamp_start=42, timestamp_end=43) - b = tutils.treq(timestamp_start=42, timestamp_end=43) - assert a == b - - assert not a == 'foo' - assert not b == 'foo' - assert not 'foo' == a - assert not 'foo' == b - - - def test_anticache(self): - req = tutils.treq() - req.headers["If-Modified-Since"] = "foo" - req.headers["If-None-Match"] = "bar" - req.anticache() - assert "If-Modified-Since" not in req.headers - assert "If-None-Match" not in req.headers - - def test_anticomp(self): - req = tutils.treq() - req.headers["Accept-Encoding"] = "foobar" - req.anticomp() - assert req.headers["Accept-Encoding"] == "identity" - - def test_constrain_encoding(self): - req = tutils.treq() - req.headers["Accept-Encoding"] = "identity, gzip, foo" - req.constrain_encoding() - assert "foo" not in req.headers["Accept-Encoding"] - - def test_update_host(self): - req = tutils.treq() - req.headers["Host"] = "" - req.host = "foobar" - req.update_host_header() - assert req.headers["Host"] == "foobar" - - def test_get_form(self): - req = tutils.treq() - assert req.get_form() == ODict() - - @mock.patch("netlib.http.Request.get_form_multipart") - @mock.patch("netlib.http.Request.get_form_urlencoded") - def test_get_form_with_url_encoded(self, mock_method_urlencoded, mock_method_multipart): - req = tutils.treq() - assert req.get_form() == ODict() - - req = tutils.treq() - req.body = "foobar" - req.headers["Content-Type"] = HDR_FORM_URLENCODED - req.get_form() - assert req.get_form_urlencoded.called - assert not req.get_form_multipart.called - - @mock.patch("netlib.http.Request.get_form_multipart") - @mock.patch("netlib.http.Request.get_form_urlencoded") - def test_get_form_with_multipart(self, mock_method_urlencoded, mock_method_multipart): - req = tutils.treq() - req.body = "foobar" - req.headers["Content-Type"] = HDR_FORM_MULTIPART - req.get_form() - assert not req.get_form_urlencoded.called - assert req.get_form_multipart.called - - def test_get_form_urlencoded(self): - req = tutils.treq(body="foobar") - assert req.get_form_urlencoded() == ODict() - - req.headers["Content-Type"] = HDR_FORM_URLENCODED - assert req.get_form_urlencoded() == ODict(utils.urldecode(req.body)) - - def test_get_form_multipart(self): - req = tutils.treq(body="foobar") - assert req.get_form_multipart() == ODict() - - req.headers["Content-Type"] = HDR_FORM_MULTIPART - assert req.get_form_multipart() == ODict( - utils.multipartdecode( - req.headers, - req.body - ) - ) - - def test_set_form_urlencoded(self): - req = tutils.treq() - req.set_form_urlencoded(ODict([('foo', 'bar'), ('rab', 'oof')])) - assert req.headers["Content-Type"] == HDR_FORM_URLENCODED - assert req.body - - def test_get_path_components(self): - req = tutils.treq() - assert req.get_path_components() - # TODO: add meaningful assertions - - def test_set_path_components(self): - req = tutils.treq() - req.set_path_components([b"foo", b"bar"]) - # TODO: add meaningful assertions - - def test_get_query(self): - req = tutils.treq() - assert req.get_query().lst == [] - - req.url = "http://localhost:80/foo?bar=42" - assert req.get_query().lst == [(b"bar", b"42")] - - def test_set_query(self): - req = tutils.treq() - req.set_query(ODict([])) - - def test_pretty_host(self): - r = tutils.treq() - assert r.pretty_host(True) == "address" - assert r.pretty_host(False) == "address" - r.headers["host"] = "other" - assert r.pretty_host(True) == "other" - assert r.pretty_host(False) == "address" - r.host = None - assert r.pretty_host(True) == "other" - assert r.pretty_host(False) is None - del r.headers["host"] - assert r.pretty_host(True) is None - assert r.pretty_host(False) is None - - # Invalid IDNA - r.headers["host"] = ".disqus.com" - assert r.pretty_host(True) == ".disqus.com" - - def test_pretty_url(self): - req = tutils.treq() - req.form_out = "authority" - assert req.pretty_url(True) == b"address:22" - assert req.pretty_url(False) == b"address:22" - - req.form_out = "relative" - assert req.pretty_url(True) == b"http://address:22/path" - assert req.pretty_url(False) == b"http://address:22/path" - - def test_get_cookies_none(self): - headers = Headers() - r = tutils.treq() - r.headers = headers - assert len(r.get_cookies()) == 0 - - def test_get_cookies_single(self): - r = tutils.treq() - r.headers = Headers(cookie="cookiename=cookievalue") - result = r.get_cookies() - assert len(result) == 1 - assert result['cookiename'] == ['cookievalue'] - - def test_get_cookies_double(self): - r = tutils.treq() - r.headers = Headers(cookie="cookiename=cookievalue;othercookiename=othercookievalue") - result = r.get_cookies() - assert len(result) == 2 - assert result['cookiename'] == ['cookievalue'] - assert result['othercookiename'] == ['othercookievalue'] - - def test_get_cookies_withequalsign(self): - r = tutils.treq() - r.headers = Headers(cookie="cookiename=coo=kievalue;othercookiename=othercookievalue") - result = r.get_cookies() - assert len(result) == 2 - assert result['cookiename'] == ['coo=kievalue'] - assert result['othercookiename'] == ['othercookievalue'] - - def test_set_cookies(self): - r = tutils.treq() - r.headers = Headers(cookie="cookiename=cookievalue") - result = r.get_cookies() - result["cookiename"] = ["foo"] - r.set_cookies(result) - assert r.get_cookies()["cookiename"] == ["foo"] - - def test_set_url(self): - r = tutils.treq(form_in="absolute") - r.url = b"https://otheraddress:42/ORLY" - assert r.scheme == b"https" - assert r.host == b"otheraddress" - assert r.port == 42 - assert r.path == b"/ORLY" - - try: - r.url = "//localhost:80/foo@bar" - assert False - except: - assert True - - # def test_asterisk_form_in(self): - # f = tutils.tflow(req=None) - # protocol = mock_protocol("OPTIONS * HTTP/1.1") - # f.request = HTTPRequest.from_protocol(protocol) - # - # assert f.request.form_in == "relative" - # f.request.host = f.server_conn.address.host - # f.request.port = f.server_conn.address.port - # f.request.scheme = "http" - # assert protocol.assemble(f.request) == ( - # "OPTIONS * HTTP/1.1\r\n" - # "Host: address:22\r\n" - # "Content-Length: 0\r\n\r\n") - # - # def test_relative_form_in(self): - # protocol = mock_protocol("GET /foo\xff HTTP/1.1") - # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol) - # - # protocol = mock_protocol("GET /foo HTTP/1.1\r\nConnection: Upgrade\r\nUpgrade: h2c") - # r = HTTPRequest.from_protocol(protocol) - # assert r.headers["Upgrade"] == ["h2c"] - # - # def test_expect_header(self): - # protocol = mock_protocol( - # "GET / HTTP/1.1\r\nContent-Length: 3\r\nExpect: 100-continue\r\n\r\nfoobar") - # r = HTTPRequest.from_protocol(protocol) - # assert protocol.tcp_handler.wfile.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n" - # assert r.content == "foo" - # assert protocol.tcp_handler.rfile.read(3) == "bar" - # - # def test_authority_form_in(self): - # protocol = mock_protocol("CONNECT oops-no-port.com HTTP/1.1") - # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol) - # - # protocol = mock_protocol("CONNECT address:22 HTTP/1.1") - # r = HTTPRequest.from_protocol(protocol) - # r.scheme, r.host, r.port = "http", "address", 22 - # assert protocol.assemble(r) == ( - # "CONNECT address:22 HTTP/1.1\r\n" - # "Host: address:22\r\n" - # "Content-Length: 0\r\n\r\n") - # assert r.pretty_url(False) == "address:22" - # - # def test_absolute_form_in(self): - # protocol = mock_protocol("GET oops-no-protocol.com HTTP/1.1") - # tutils.raises("Bad HTTP request line", HTTPRequest.from_protocol, protocol) - # - # protocol = mock_protocol("GET http://address:22/ HTTP/1.1") - # r = HTTPRequest.from_protocol(protocol) - # assert protocol.assemble(r) == ( - # "GET http://address:22/ HTTP/1.1\r\n" - # "Host: address:22\r\n" - # "Content-Length: 0\r\n\r\n") - # - # def test_http_options_relative_form_in(self): - # """ - # Exercises fix for Issue #392. - # """ - # protocol = mock_protocol("OPTIONS /secret/resource HTTP/1.1") - # r = HTTPRequest.from_protocol(protocol) - # r.host = 'address' - # r.port = 80 - # r.scheme = "http" - # assert protocol.assemble(r) == ( - # "OPTIONS /secret/resource HTTP/1.1\r\n" - # "Host: address\r\n" - # "Content-Length: 0\r\n\r\n") - # - # def test_http_options_absolute_form_in(self): - # protocol = mock_protocol("OPTIONS http://address/secret/resource HTTP/1.1") - # r = HTTPRequest.from_protocol(protocol) - # r.host = 'address' - # r.port = 80 - # r.scheme = "http" - # assert protocol.assemble(r) == ( - # "OPTIONS http://address:80/secret/resource HTTP/1.1\r\n" - # "Host: address\r\n" - # "Content-Length: 0\r\n\r\n") - -class TestResponse(object): - def test_headers(self): - tutils.raises(AssertionError, Response, - b"HTTP/1.1", - 200, - headers='foobar', - ) - - resp = Response( - b"HTTP/1.1", - 200, - ) - assert isinstance(resp.headers, Headers) - - def test_equal(self): - a = tutils.tresp(timestamp_start=42, timestamp_end=43) - b = tutils.tresp(timestamp_start=42, timestamp_end=43) - assert a == b - - assert not a == 'foo' - assert not b == 'foo' - assert not 'foo' == a - assert not 'foo' == b - - def test_repr(self): - r = tutils.tresp() - assert "unknown content type" in repr(r) - r.headers["content-type"] = "foo" - assert "foo" in repr(r) - assert repr(tutils.tresp(body=CONTENT_MISSING)) - - def test_get_cookies_none(self): - resp = tutils.tresp() - resp.headers = Headers() - assert not resp.get_cookies() - - def test_get_cookies_simple(self): - resp = tutils.tresp() - resp.headers = Headers(set_cookie="cookiename=cookievalue") - result = resp.get_cookies() - assert len(result) == 1 - assert "cookiename" in result - assert result["cookiename"][0] == ["cookievalue", ODict()] - - def test_get_cookies_with_parameters(self): - resp = tutils.tresp() - resp.headers = Headers(set_cookie="cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly") - result = resp.get_cookies() - assert len(result) == 1 - assert "cookiename" in result - assert result["cookiename"][0][0] == "cookievalue" - attrs = result["cookiename"][0][1] - assert len(attrs) == 4 - assert attrs["domain"] == ["example.com"] - assert attrs["expires"] == ["Wed Oct 21 16:29:41 2015"] - assert attrs["path"] == ["/"] - assert attrs["httponly"] == [None] - - def test_get_cookies_no_value(self): - resp = tutils.tresp() - resp.headers = Headers(set_cookie="cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/") - result = resp.get_cookies() - assert len(result) == 1 - assert "cookiename" in result - assert result["cookiename"][0][0] == "" - assert len(result["cookiename"][0][1]) == 2 - - def test_get_cookies_twocookies(self): - resp = tutils.tresp() - resp.headers = Headers([ - [b"Set-Cookie", b"cookiename=cookievalue"], - [b"Set-Cookie", b"othercookie=othervalue"] - ]) - result = resp.get_cookies() - assert len(result) == 2 - assert "cookiename" in result - assert result["cookiename"][0] == ["cookievalue", ODict()] - assert "othercookie" in result - assert result["othercookie"][0] == ["othervalue", ODict()] - - def test_set_cookies(self): - resp = tutils.tresp() - v = resp.get_cookies() - v.add("foo", ["bar", ODictCaseless()]) - resp.set_cookies(v) - - v = resp.get_cookies() - assert len(v) == 1 - assert v["foo"] == [["bar", ODictCaseless()]] diff --git a/test/http/test_request.py b/test/http/test_request.py new file mode 100644 index 00000000..8cf69ffe --- /dev/null +++ b/test/http/test_request.py @@ -0,0 +1,238 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, print_function, division + +import six + +from netlib import utils +from netlib.http import Headers +from netlib.odict import ODict +from netlib.tutils import treq, raises +from .test_message import _test_decoded_attr, _test_passthrough_attr + + +class TestRequestData(object): + def test_init(self): + with raises(AssertionError): + treq(headers="foobar") + + assert isinstance(treq(headers=None).headers, Headers) + + +class TestRequestCore(object): + """ + Tests for builtins and the attributes that are directly proxied from the data structure + """ + def test_repr(self): + request = treq() + assert repr(request) == "Request(GET address:22/path)" + request.host = None + assert repr(request) == "Request(GET /path)" + + def test_first_line_format(self): + _test_passthrough_attr(treq(), "first_line_format") + + def test_method(self): + _test_decoded_attr(treq(), "method") + + def test_scheme(self): + _test_decoded_attr(treq(), "scheme") + + def test_port(self): + _test_passthrough_attr(treq(), "port") + + def test_path(self): + _test_decoded_attr(treq(), "path") + + def test_host(self): + if six.PY2: + from unittest import SkipTest + raise SkipTest() + + request = treq() + assert request.host == request.data.host.decode("idna") + + # Test IDNA encoding + # Set str, get raw bytes + request.host = "ídna.example" + assert request.data.host == b"xn--dna-qma.example" + # Set raw bytes, get decoded + request.data.host = b"xn--idn-gla.example" + assert request.host == "idná.example" + # Set bytes, get raw bytes + request.host = b"xn--dn-qia9b.example" + assert request.data.host == b"xn--dn-qia9b.example" + # IDNA encoding is not bijective + request.host = "fußball" + assert request.host == "fussball" + + # Don't fail on garbage + request.data.host = b"foo\xFF\x00bar" + assert request.host.startswith("foo") + assert request.host.endswith("bar") + # foo.bar = foo.bar should not cause any side effects. + d = request.host + request.host = d + assert request.data.host == b"foo\xFF\x00bar" + + def test_host_header_update(self): + request = treq() + assert "host" not in request.headers + request.host = "example.com" + assert "host" not in request.headers + + request.headers["Host"] = "foo" + request.host = "example.org" + assert request.headers["Host"] == "example.org" + + +class TestRequestUtils(object): + """ + Tests for additional convenience methods. + """ + def test_url(self): + request = treq() + assert request.url == "http://address:22/path" + + request.url = "https://otheraddress:42/foo" + assert request.scheme == "https" + assert request.host == "otheraddress" + assert request.port == 42 + assert request.path == "/foo" + + with raises(ValueError): + request.url = "not-a-url" + + def test_pretty_host(self): + request = treq() + assert request.pretty_host == "address" + assert request.host == "address" + request.headers["host"] = "other" + assert request.pretty_host == "other" + assert request.host == "address" + request.host = None + assert request.pretty_host is None + assert request.host is None + + # Invalid IDNA + request.headers["host"] = ".disqus.com" + assert request.pretty_host == ".disqus.com" + + def test_pretty_url(self): + request = treq() + assert request.url == "http://address:22/path" + assert request.pretty_url == "http://address:22/path" + request.headers["host"] = "other" + assert request.pretty_url == "http://other:22/path" + + def test_pretty_url_authority(self): + request = treq(first_line_format="authority") + assert request.pretty_url == "address:22" + + def test_get_query(self): + request = treq() + assert request.query is None + + request.url = "http://localhost:80/foo?bar=42" + assert request.query.lst == [("bar", "42")] + + def test_set_query(self): + request = treq() + request.query = ODict([]) + + def test_get_cookies_none(self): + request = treq() + request.headers = Headers() + assert len(request.cookies) == 0 + + def test_get_cookies_single(self): + request = treq() + request.headers = Headers(cookie="cookiename=cookievalue") + result = request.cookies + assert len(result) == 1 + assert result['cookiename'] == ['cookievalue'] + + def test_get_cookies_double(self): + request = treq() + request.headers = Headers(cookie="cookiename=cookievalue;othercookiename=othercookievalue") + result = request.cookies + assert len(result) == 2 + assert result['cookiename'] == ['cookievalue'] + assert result['othercookiename'] == ['othercookievalue'] + + def test_get_cookies_withequalsign(self): + request = treq() + request.headers = Headers(cookie="cookiename=coo=kievalue;othercookiename=othercookievalue") + result = request.cookies + assert len(result) == 2 + assert result['cookiename'] == ['coo=kievalue'] + assert result['othercookiename'] == ['othercookievalue'] + + def test_set_cookies(self): + request = treq() + request.headers = Headers(cookie="cookiename=cookievalue") + result = request.cookies + result["cookiename"] = ["foo"] + request.cookies = result + assert request.cookies["cookiename"] == ["foo"] + + def test_get_path_components(self): + request = treq(path=b"/foo/bar") + assert request.path_components == ["foo", "bar"] + + def test_set_path_components(self): + request = treq() + request.path_components = ["foo", "baz"] + assert request.path == "/foo/baz" + request.path_components = [] + assert request.path == "/" + + def test_anticache(self): + request = treq() + request.headers["If-Modified-Since"] = "foo" + request.headers["If-None-Match"] = "bar" + request.anticache() + assert "If-Modified-Since" not in request.headers + assert "If-None-Match" not in request.headers + + def test_anticomp(self): + request = treq() + request.headers["Accept-Encoding"] = "foobar" + request.anticomp() + assert request.headers["Accept-Encoding"] == "identity" + + def test_constrain_encoding(self): + request = treq() + + h = request.headers.copy() + request.constrain_encoding() # no-op if there is no accept_encoding header. + assert request.headers == h + + request.headers["Accept-Encoding"] = "identity, gzip, foo" + request.constrain_encoding() + assert "foo" not in request.headers["Accept-Encoding"] + assert "gzip" in request.headers["Accept-Encoding"] + + def test_get_urlencoded_form(self): + request = treq(content="foobar") + assert request.urlencoded_form is None + + request.headers["Content-Type"] = "application/x-www-form-urlencoded" + assert request.urlencoded_form == ODict(utils.urldecode(request.content)) + + def test_set_urlencoded_form(self): + request = treq() + request.urlencoded_form = ODict([('foo', 'bar'), ('rab', 'oof')]) + assert request.headers["Content-Type"] == "application/x-www-form-urlencoded" + assert request.content + + def test_get_multipart_form(self): + request = treq(content="foobar") + assert request.multipart_form is None + + request.headers["Content-Type"] = "multipart/form-data" + assert request.multipart_form == ODict( + utils.multipartdecode( + request.headers, + request.content + ) + ) diff --git a/test/http/test_response.py b/test/http/test_response.py new file mode 100644 index 00000000..a1f4abd7 --- /dev/null +++ b/test/http/test_response.py @@ -0,0 +1,100 @@ +from __future__ import absolute_import, print_function, division + +from netlib.http import Headers +from netlib.odict import ODict, ODictCaseless +from netlib.tutils import raises, tresp +from .test_message import _test_passthrough_attr, _test_decoded_attr + + +class TestResponseData(object): + def test_init(self): + with raises(AssertionError): + tresp(headers="foobar") + + assert isinstance(tresp(headers=None).headers, Headers) + + +class TestResponseCore(object): + """ + Tests for builtins and the attributes that are directly proxied from the data structure + """ + def test_repr(self): + response = tresp() + assert repr(response) == "Response(200 OK, unknown content type, 7B)" + response.content = None + assert repr(response) == "Response(200 OK, no content)" + + def test_status_code(self): + _test_passthrough_attr(tresp(), "status_code") + + def test_reason(self): + _test_decoded_attr(tresp(), "reason") + + +class TestResponseUtils(object): + """ + Tests for additional convenience methods. + """ + def test_get_cookies_none(self): + resp = tresp() + resp.headers = Headers() + assert not resp.cookies + + def test_get_cookies_empty(self): + resp = tresp() + resp.headers = Headers(set_cookie="") + assert not resp.cookies + + def test_get_cookies_simple(self): + resp = tresp() + resp.headers = Headers(set_cookie="cookiename=cookievalue") + result = resp.cookies + assert len(result) == 1 + assert "cookiename" in result + assert result["cookiename"][0] == ["cookievalue", ODict()] + + def test_get_cookies_with_parameters(self): + resp = tresp() + resp.headers = Headers(set_cookie="cookiename=cookievalue;domain=example.com;expires=Wed Oct 21 16:29:41 2015;path=/; HttpOnly") + result = resp.cookies + assert len(result) == 1 + assert "cookiename" in result + assert result["cookiename"][0][0] == "cookievalue" + attrs = result["cookiename"][0][1] + assert len(attrs) == 4 + assert attrs["domain"] == ["example.com"] + assert attrs["expires"] == ["Wed Oct 21 16:29:41 2015"] + assert attrs["path"] == ["/"] + assert attrs["httponly"] == [None] + + def test_get_cookies_no_value(self): + resp = tresp() + resp.headers = Headers(set_cookie="cookiename=; Expires=Thu, 01-Jan-1970 00:00:01 GMT; path=/") + result = resp.cookies + assert len(result) == 1 + assert "cookiename" in result + assert result["cookiename"][0][0] == "" + assert len(result["cookiename"][0][1]) == 2 + + def test_get_cookies_twocookies(self): + resp = tresp() + resp.headers = Headers([ + [b"Set-Cookie", b"cookiename=cookievalue"], + [b"Set-Cookie", b"othercookie=othervalue"] + ]) + result = resp.cookies + assert len(result) == 2 + assert "cookiename" in result + assert result["cookiename"][0] == ["cookievalue", ODict()] + assert "othercookie" in result + assert result["othercookie"][0] == ["othervalue", ODict()] + + def test_set_cookies(self): + resp = tresp() + v = resp.cookies + v.add("foo", ["bar", ODictCaseless()]) + resp.set_cookies(v) + + v = resp.cookies + assert len(v) == 1 + assert v["foo"] == [["bar", ODictCaseless()]] diff --git a/test/http/test_status_codes.py b/test/http/test_status_codes.py new file mode 100644 index 00000000..9fea6b70 --- /dev/null +++ b/test/http/test_status_codes.py @@ -0,0 +1,6 @@ +from netlib.http import status_codes + + +def test_simple(): + assert status_codes.IM_A_TEAPOT == 418 + assert status_codes.RESPONSES[418] == "I'm a teapot" diff --git a/test/test_utils.py b/test/test_utils.py index 17636cc4..b096e5bc 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -84,10 +84,10 @@ def test_parse_url(): def test_unparse_url(): - assert utils.unparse_url(b"http", b"foo.com", 99, b"") == b"http://foo.com:99" - assert utils.unparse_url(b"http", b"foo.com", 80, b"/bar") == b"http://foo.com/bar" - assert utils.unparse_url(b"https", b"foo.com", 80, b"") == b"https://foo.com:80" - assert utils.unparse_url(b"https", b"foo.com", 443, b"") == b"https://foo.com" + assert utils.unparse_url("http", "foo.com", 99, "") == "http://foo.com:99" + assert utils.unparse_url("http", "foo.com", 80, "/bar") == "http://foo.com/bar" + assert utils.unparse_url("https", "foo.com", 80, "") == "https://foo.com:80" + assert utils.unparse_url("https", "foo.com", 443, "") == "https://foo.com" def test_urlencode(): diff --git a/test/websockets/test_websockets.py b/test/websockets/test_websockets.py index 4ae4cf45..9a1e5d3d 100644 --- a/test/websockets/test_websockets.py +++ b/test/websockets/test_websockets.py @@ -68,7 +68,7 @@ class WebSocketsClient(tcp.TCPClient): self.wfile.write(bytes(headers) + b"\r\n") self.wfile.flush() - resp = read_response(self.rfile, treq(method="GET")) + resp = read_response(self.rfile, treq(method=b"GET")) server_nonce = self.protocol.check_server_handshake(resp.headers) if not server_nonce == self.protocol.create_server_nonce(self.client_nonce): |