aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--docs/dev/models.rst2
-rw-r--r--examples/har_extractor.py2
-rw-r--r--examples/iframe_injector.py22
-rw-r--r--examples/modify_response_body.py11
-rw-r--r--examples/sslstrip.py38
-rw-r--r--examples/upsidedownternet.py20
-rw-r--r--mitmproxy/console/common.py51
-rw-r--r--mitmproxy/console/flowview.py51
-rw-r--r--mitmproxy/contentviews.py18
-rw-r--r--mitmproxy/dump.py17
-rw-r--r--mitmproxy/filt.py16
-rw-r--r--mitmproxy/flow/export.py17
-rw-r--r--mitmproxy/flow/master.py7
-rw-r--r--mitmproxy/flow/modules.py4
-rw-r--r--mitmproxy/models/http.py8
-rw-r--r--mitmproxy/protocol/http.py4
-rw-r--r--mitmproxy/web/app.py4
-rw-r--r--netlib/encoding.py97
-rw-r--r--netlib/http/headers.py15
-rw-r--r--netlib/http/http1/assemble.py4
-rw-r--r--netlib/http/message.py254
-rw-r--r--netlib/http/request.py14
-rw-r--r--netlib/http/response.py5
-rw-r--r--netlib/wsgi.py4
-rw-r--r--test/mitmproxy/test_contentview.py23
-rw-r--r--test/mitmproxy/test_flow.py18
-rw-r--r--test/netlib/http/test_headers.py11
-rw-r--r--test/netlib/http/test_message.py220
-rw-r--r--test/netlib/test_encoding.py40
29 files changed, 644 insertions, 353 deletions
diff --git a/docs/dev/models.rst b/docs/dev/models.rst
index 02f36f58..7260f1f7 100644
--- a/docs/dev/models.rst
+++ b/docs/dev/models.rst
@@ -56,8 +56,6 @@ Datastructures
:special-members:
:no-undoc-members:
- .. autoclass:: decoded
-
.. automodule:: netlib.multidict
.. autoclass:: MultiDictView
diff --git a/examples/har_extractor.py b/examples/har_extractor.py
index 90412ec0..76059d8e 100644
--- a/examples/har_extractor.py
+++ b/examples/har_extractor.py
@@ -140,7 +140,7 @@ def response(flow):
for k, v in flow.request.query or {}]
response_body_size = len(flow.response.content)
- response_body_decoded_size = len(flow.response.get_decoded_content())
+ response_body_decoded_size = len(flow.response.content)
response_body_compression = response_body_decoded_size - response_body_size
entry = HAR.entries({
diff --git a/examples/iframe_injector.py b/examples/iframe_injector.py
index 70247d31..352c3c24 100644
--- a/examples/iframe_injector.py
+++ b/examples/iframe_injector.py
@@ -2,7 +2,6 @@
# (this script works best with --anticache)
import sys
from bs4 import BeautifulSoup
-from mitmproxy.models import decoded
iframe_url = None
@@ -17,14 +16,13 @@ def start():
def response(flow):
if flow.request.host in iframe_url:
return
- with decoded(flow.response): # Remove content encoding (gzip, ...)
- html = BeautifulSoup(flow.response.content, "lxml")
- if html.body:
- iframe = html.new_tag(
- "iframe",
- src=iframe_url,
- frameborder=0,
- height=0,
- width=0)
- html.body.insert(0, iframe)
- flow.response.content = str(html).encode("utf8")
+ html = BeautifulSoup(flow.response.content, "lxml")
+ if html.body:
+ iframe = html.new_tag(
+ "iframe",
+ src=iframe_url,
+ frameborder=0,
+ height=0,
+ width=0)
+ html.body.insert(0, iframe)
+ flow.response.content = str(html).encode("utf8")
diff --git a/examples/modify_response_body.py b/examples/modify_response_body.py
index 23ad0151..b4632248 100644
--- a/examples/modify_response_body.py
+++ b/examples/modify_response_body.py
@@ -2,8 +2,6 @@
# (this script works best with --anticache)
import sys
-from mitmproxy.models import decoded
-
state = {}
@@ -17,8 +15,7 @@ def start():
def response(flow):
- with decoded(flow.response): # automatically decode gzipped responses.
- flow.response.content = flow.response.content.replace(
- state["old"],
- state["new"]
- )
+ flow.response.content = flow.response.content.replace(
+ state["old"],
+ state["new"]
+ )
diff --git a/examples/sslstrip.py b/examples/sslstrip.py
index afc95fc8..0be1f020 100644
--- a/examples/sslstrip.py
+++ b/examples/sslstrip.py
@@ -1,4 +1,3 @@
-from netlib.http import decoded
import re
from six.moves import urllib
@@ -17,22 +16,21 @@ def request(flow):
def response(flow):
- with decoded(flow.response):
- flow.request.headers.pop('Strict-Transport-Security', None)
- flow.request.headers.pop('Public-Key-Pins', None)
-
- # strip links in response body
- flow.response.content = flow.response.content.replace('https://', 'http://')
-
- # strip links in 'Location' header
- if flow.response.headers.get('Location', '').startswith('https://'):
- location = flow.response.headers['Location']
- hostname = urllib.parse.urlparse(location).hostname
- if hostname:
- secure_hosts.add(hostname)
- flow.response.headers['Location'] = location.replace('https://', 'http://', 1)
-
- # strip secure flag from 'Set-Cookie' headers
- cookies = flow.response.headers.get_all('Set-Cookie')
- cookies = [re.sub(r';\s*secure\s*', '', s) for s in cookies]
- flow.response.headers.set_all('Set-Cookie', cookies)
+ flow.request.headers.pop('Strict-Transport-Security', None)
+ flow.request.headers.pop('Public-Key-Pins', None)
+
+ # strip links in response body
+ flow.response.content = flow.response.content.replace('https://', 'http://')
+
+ # strip links in 'Location' header
+ if flow.response.headers.get('Location', '').startswith('https://'):
+ location = flow.response.headers['Location']
+ hostname = urllib.parse.urlparse(location).hostname
+ if hostname:
+ secure_hosts.add(hostname)
+ flow.response.headers['Location'] = location.replace('https://', 'http://', 1)
+
+ # strip secure flag from 'Set-Cookie' headers
+ cookies = flow.response.headers.get_all('Set-Cookie')
+ cookies = [re.sub(r';\s*secure\s*', '', s) for s in cookies]
+ flow.response.headers.set_all('Set-Cookie', cookies)
diff --git a/examples/upsidedownternet.py b/examples/upsidedownternet.py
index fafdefce..d5059092 100644
--- a/examples/upsidedownternet.py
+++ b/examples/upsidedownternet.py
@@ -1,17 +1,15 @@
from six.moves import cStringIO as StringIO
from PIL import Image
-from mitmproxy.models import decoded
def response(flow):
if flow.response.headers.get("content-type", "").startswith("image"):
- with decoded(flow.response): # automatically decode gzipped responses.
- try:
- s = StringIO(flow.response.content)
- img = Image.open(s).rotate(180)
- s2 = StringIO()
- img.save(s2, "png")
- flow.response.content = s2.getvalue()
- flow.response.headers["content-type"] = "image/png"
- except: # Unknown image types etc.
- pass
+ try:
+ s = StringIO(flow.response.content)
+ img = Image.open(s).rotate(180)
+ s2 = StringIO()
+ img.save(s2, "png")
+ flow.response.content = s2.getvalue()
+ flow.response.headers["content-type"] = "image/png"
+ except: # Unknown image types etc.
+ pass
diff --git a/mitmproxy/console/common.py b/mitmproxy/console/common.py
index 66962729..f15031c2 100644
--- a/mitmproxy/console/common.py
+++ b/mitmproxy/console/common.py
@@ -8,7 +8,6 @@ import six
import netlib
from mitmproxy import flow
-from mitmproxy import models
from mitmproxy import utils
from mitmproxy.console import signals
from netlib import human
@@ -258,28 +257,30 @@ def copy_flow_format_data(part, scope, flow):
else:
data = ""
if scope in ("q", "a"):
- if flow.request.content is None:
+ request = flow.request.copy()
+ request.decode(strict=False)
+ if request.content is None:
return None, "Request content is missing"
- with models.decoded(flow.request):
- if part == "h":
- data += netlib.http.http1.assemble_request(flow.request)
- elif part == "c":
- data += flow.request.content
- else:
- raise ValueError("Unknown part: {}".format(part))
- if scope == "a" and flow.request.content and flow.response:
+ if part == "h":
+ data += netlib.http.http1.assemble_request(request)
+ elif part == "c":
+ data += request.content
+ else:
+ raise ValueError("Unknown part: {}".format(part))
+ if scope == "a" and flow.request.raw_content and flow.response:
# Add padding between request and response
data += "\r\n" * 2
if scope in ("s", "a") and flow.response:
- if flow.response.content is None:
+ response = flow.response.copy()
+ response.decode(strict=False)
+ if response.content is None:
return None, "Response content is missing"
- with models.decoded(flow.response):
- if part == "h":
- data += netlib.http.http1.assemble_response(flow.response)
- elif part == "c":
- data += flow.response.content
- else:
- raise ValueError("Unknown part: {}".format(part))
+ if part == "h":
+ data += netlib.http.http1.assemble_response(response)
+ elif part == "c":
+ data += response.content
+ else:
+ raise ValueError("Unknown part: {}".format(part))
return data, False
@@ -365,8 +366,8 @@ def ask_save_body(part, master, state, flow):
"q" (request), "s" (response) or None (ask user if necessary).
"""
- request_has_content = flow.request and flow.request.content
- response_has_content = flow.response and flow.response.content
+ request_has_content = flow.request and flow.request.raw_content
+ response_has_content = flow.response and flow.response.raw_content
if part is None:
# We first need to determine whether we want to save the request or the
@@ -389,12 +390,12 @@ def ask_save_body(part, master, state, flow):
elif part == "q" and request_has_content:
ask_save_path(
"Save request content",
- flow.request.get_decoded_content()
+ flow.request.get_content(strict=False),
)
elif part == "s" and response_has_content:
ask_save_path(
"Save response content",
- flow.response.get_decoded_content()
+ flow.response.get_content(strict=False),
)
else:
signals.status_message.send(message="No content to save.")
@@ -419,9 +420,9 @@ def format_flow(f, focus, extended=False, hostheader=False, marked=False):
marked = marked,
)
if f.response:
- if f.response.content:
- contentdesc = human.pretty_size(len(f.response.content))
- elif f.response.content is None:
+ if f.response.raw_content:
+ contentdesc = human.pretty_size(len(f.response.raw_content))
+ elif f.response.raw_content is None:
contentdesc = "[content missing]"
else:
contentdesc = "[no content]"
diff --git a/mitmproxy/console/flowview.py b/mitmproxy/console/flowview.py
index f4db5129..d13e9db0 100644
--- a/mitmproxy/console/flowview.py
+++ b/mitmproxy/console/flowview.py
@@ -176,7 +176,7 @@ class FlowView(tabs.Tabs):
self.show()
def content_view(self, viewmode, message):
- if message.content is None:
+ if message.raw_content is None:
msg, body = "", [urwid.Text([("error", "[content missing]")])]
return msg, body
else:
@@ -200,20 +200,34 @@ class FlowView(tabs.Tabs):
def _get_content_view(self, viewmode, message, max_lines, _):
try:
+ content = message.content
+ if content != message.raw_content:
+ enc = "[decoded {}]".format(
+ message.headers.get("content-encoding")
+ )
+ else:
+ enc = None
+ except ValueError:
+ content = message.raw_content
+ enc = "[cannot decode]"
+ try:
query = None
if isinstance(message, models.HTTPRequest):
query = message.query
description, lines = contentviews.get_content_view(
- viewmode, message.content, headers=message.headers, query=query
+ viewmode, content, headers=message.headers, query=query
)
except exceptions.ContentViewException:
s = "Content viewer failed: \n" + traceback.format_exc()
signals.add_log(s, "error")
description, lines = contentviews.get_content_view(
- contentviews.get("Raw"), message.content, headers=message.headers
+ contentviews.get("Raw"), content, headers=message.headers
)
description = description.replace("Raw", "Couldn't parse: falling back to Raw")
+ if enc:
+ description = " ".join([enc, description])
+
# Give hint that you have to tab for the response.
if description == "No content" and isinstance(message, models.HTTPRequest):
description = "No request content (press tab to view response)"
@@ -407,17 +421,16 @@ class FlowView(tabs.Tabs):
)
)
if part == "r":
- with models.decoded(message):
- # Fix an issue caused by some editors when editing a
- # request/response body. Many editors make it hard to save a
- # file without a terminating newline on the last line. When
- # editing message bodies, this can cause problems. For now, I
- # just strip the newlines off the end of the body when we return
- # from an editor.
- c = self.master.spawn_editor(message.content or "")
- message.content = c.rstrip("\n")
+ # Fix an issue caused by some editors when editing a
+ # request/response body. Many editors make it hard to save a
+ # file without a terminating newline on the last line. When
+ # editing message bodies, this can cause problems. For now, I
+ # just strip the newlines off the end of the body when we return
+ # from an editor.
+ c = self.master.spawn_editor(message.get_content(strict=False) or b"")
+ message.content = c.rstrip(b"\n")
elif part == "f":
- if not message.urlencoded_form and message.content:
+ if not message.urlencoded_form and message.raw_content:
signals.status_prompt_onekey.send(
prompt = "Existing body is not a URL-encoded form. Clear and edit?",
keys = [
@@ -512,14 +525,10 @@ class FlowView(tabs.Tabs):
signals.flow_change.send(self, flow = self.flow)
def delete_body(self, t):
- if t == "m":
- val = None
- else:
- val = None
if self.tab_offset == TAB_REQ:
- self.flow.request.content = val
+ self.flow.request.content = None
else:
- self.flow.response.content = val
+ self.flow.response.content = None
signals.flow_change.send(self, flow = self.flow)
def keypress(self, size, key):
@@ -681,10 +690,10 @@ class FlowView(tabs.Tabs):
)
key = None
elif key == "v":
- if conn.content:
+ if conn.raw_content:
t = conn.headers.get("content-type")
if "EDITOR" in os.environ or "PAGER" in os.environ:
- self.master.spawn_external_viewer(conn.content, t)
+ self.master.spawn_external_viewer(conn.get_content(strict=False), t)
else:
signals.status_message.send(
message = "Error! Set $EDITOR or $PAGER."
diff --git a/mitmproxy/contentviews.py b/mitmproxy/contentviews.py
index 331a706f..afdaad7f 100644
--- a/mitmproxy/contentviews.py
+++ b/mitmproxy/contentviews.py
@@ -31,7 +31,6 @@ from six import BytesIO
from mitmproxy import exceptions
from mitmproxy.contrib import jsbeautifier
from mitmproxy.contrib.wbxml import ASCommandResponse
-from netlib import encoding
from netlib import http
from netlib import multidict
from netlib.http import url
@@ -620,15 +619,6 @@ def get_content_view(viewmode, data, **metadata):
Raises:
ContentViewException, if the content view threw an error.
"""
- msg = []
-
- headers = metadata.get("headers", {})
- enc = headers.get("content-encoding")
- if enc and enc != "identity":
- decoded = encoding.decode(enc, data)
- if decoded:
- data = decoded
- msg.append("[decoded %s]" % enc)
try:
ret = viewmode(data, **metadata)
# Third-party viewers can fail in unexpected ways...
@@ -639,8 +629,8 @@ def get_content_view(viewmode, data, **metadata):
sys.exc_info()[2]
)
if not ret:
- ret = get("Raw")(data, **metadata)
- msg.append("Couldn't parse: falling back to Raw")
+ desc = "Couldn't parse: falling back to Raw"
+ _, content = get("Raw")(data, **metadata)
else:
- msg.append(ret[0])
- return " ".join(msg), safe_to_print(ret[1])
+ desc, content = ret
+ return desc, safe_to_print(content)
diff --git a/mitmproxy/dump.py b/mitmproxy/dump.py
index de63ca10..e7cebf99 100644
--- a/mitmproxy/dump.py
+++ b/mitmproxy/dump.py
@@ -143,15 +143,20 @@ class DumpMaster(flow.FlowMaster):
)
self.echo(headers, indent=4)
if self.options.flow_detail >= 3:
- if message.content is None:
+ try:
+ content = message.content
+ except ValueError:
+ content = message.get_content(strict=False)
+
+ if content is None:
self.echo("(content missing)", indent=4)
- elif message.content:
+ elif content:
self.echo("")
try:
type, lines = contentviews.get_content_view(
contentviews.get("Auto"),
- message.content,
+ content,
headers=getattr(message, "headers", None)
)
except exceptions.ContentViewException:
@@ -159,7 +164,7 @@ class DumpMaster(flow.FlowMaster):
self.add_log(s, "debug")
type, lines = contentviews.get_content_view(
contentviews.get("Raw"),
- message.content,
+ content,
headers=getattr(message, "headers", None)
)
@@ -248,10 +253,10 @@ class DumpMaster(flow.FlowMaster):
code = click.style(str(code), fg=code_color, bold=True, blink=(code == 418))
reason = click.style(strutils.escape_control_characters(flow.response.reason), fg=code_color, bold=True)
- if flow.response.content is None:
+ if flow.response.raw_content is None:
size = "(content missing)"
else:
- size = human.pretty_size(len(flow.response.content))
+ size = human.pretty_size(len(flow.response.raw_content))
size = click.style(size, bold=True)
arrows = click.style("<<", bold=True)
diff --git a/mitmproxy/filt.py b/mitmproxy/filt.py
index b1b72aa7..a42988f1 100644
--- a/mitmproxy/filt.py
+++ b/mitmproxy/filt.py
@@ -193,11 +193,11 @@ class FBod(_Rex):
help = "Body"
def __call__(self, f):
- if f.request and f.request.content:
- if self.re.search(f.request.get_decoded_content()):
+ if f.request and f.request.raw_content:
+ if self.re.search(f.request.get_content(strict=False)):
return True
- if f.response and f.response.content:
- if self.re.search(f.response.get_decoded_content()):
+ if f.response and f.response.raw_content:
+ if self.re.search(f.response.get_content(strict=False)):
return True
return False
@@ -207,8 +207,8 @@ class FBodRequest(_Rex):
help = "Request body"
def __call__(self, f):
- if f.request and f.request.content:
- if self.re.search(f.request.get_decoded_content()):
+ if f.request and f.request.raw_content:
+ if self.re.search(f.request.get_content(strict=False)):
return True
@@ -217,8 +217,8 @@ class FBodResponse(_Rex):
help = "Response body"
def __call__(self, f):
- if f.response and f.response.content:
- if self.re.search(f.response.get_decoded_content()):
+ if f.response and f.response.raw_content:
+ if self.re.search(f.response.get_content(strict=False)):
return True
diff --git a/mitmproxy/flow/export.py b/mitmproxy/flow/export.py
index 67401719..deeeb998 100644
--- a/mitmproxy/flow/export.py
+++ b/mitmproxy/flow/export.py
@@ -30,17 +30,20 @@ def dictstr(items, indent):
def curl_command(flow):
data = "curl "
- for k, v in flow.request.headers.fields:
- data += "-H '%s:%s' " % (_native(k), _native(v))
+ request = flow.request.copy()
+ request.decode(strict=False)
- if flow.request.method != "GET":
- data += "-X %s " % flow.request.method
+ for k, v in request.headers.items(multi=True):
+ data += "-H '%s:%s' " % (k, v)
- full_url = flow.request.scheme + "://" + flow.request.host + flow.request.path
+ if request.method != "GET":
+ data += "-X %s " % request.method
+
+ full_url = request.scheme + "://" + request.host + request.path
data += "'%s'" % full_url
- if flow.request.content:
- data += " --data-binary '%s'" % _native(flow.request.content)
+ if request.content:
+ data += " --data-binary '%s'" % _native(request.content)
return data
diff --git a/mitmproxy/flow/master.py b/mitmproxy/flow/master.py
index 02ae7c74..80949825 100644
--- a/mitmproxy/flow/master.py
+++ b/mitmproxy/flow/master.py
@@ -259,13 +259,16 @@ class FlowMaster(controller.Master):
return "Can't replay live request."
if f.intercepted:
return "Can't replay while intercepting..."
- if f.request.content is None:
+ if f.request.raw_content is None:
return "Can't replay request with missing content..."
if f.request:
f.backup()
f.request.is_replay = True
+
+ # TODO: We should be able to remove this.
if "Content-Length" in f.request.headers:
- f.request.headers["Content-Length"] = str(len(f.request.content))
+ f.request.headers["Content-Length"] = str(len(f.request.raw_content))
+
f.response = None
f.error = None
self.process_new_request(f)
diff --git a/mitmproxy/flow/modules.py b/mitmproxy/flow/modules.py
index 2ad514f0..d1f3dd42 100644
--- a/mitmproxy/flow/modules.py
+++ b/mitmproxy/flow/modules.py
@@ -155,7 +155,7 @@ class StreamLargeBodies(object):
expected_size = http1.expected_http_body_size(
flow.request, flow.response if not is_request else None
)
- if not r.content and not (0 <= expected_size <= self.max_size):
+ if not r.raw_content and not (0 <= expected_size <= self.max_size):
# r.stream may already be a callable, which we want to preserve.
r.stream = r.stream or True
@@ -249,7 +249,7 @@ class ServerPlaybackState:
if p[0] not in self.ignore_payload_params
)
else:
- key.append(str(r.content))
+ key.append(str(r.raw_content))
if not self.ignore_host:
key.append(r.host)
diff --git a/mitmproxy/models/http.py b/mitmproxy/models/http.py
index df546b9b..1fd28f00 100644
--- a/mitmproxy/models/http.py
+++ b/mitmproxy/models/http.py
@@ -1,10 +1,10 @@
from __future__ import absolute_import, print_function, division
import cgi
+import warnings
import six
from mitmproxy.models.flow import Flow
-from netlib import encoding
from netlib import version
from netlib.http import Headers
from netlib.http import Request
@@ -21,10 +21,8 @@ class MessageMixin(object):
header.
Doesn't change the message iteself or its headers.
"""
- ce = self.headers.get("content-encoding")
- if not self.content or ce not in encoding.ENCODINGS:
- return self.content
- return encoding.decode(ce, self.content)
+ warnings.warn(".get_decoded_content() is deprecated, please use .content directly instead.", DeprecationWarning)
+ return self.content
class HTTPRequest(MessageMixin, Request):
diff --git a/mitmproxy/protocol/http.py b/mitmproxy/protocol/http.py
index 187c17f6..2c70f288 100644
--- a/mitmproxy/protocol/http.py
+++ b/mitmproxy/protocol/http.py
@@ -41,10 +41,10 @@ class _HttpTransmissionLayer(base.Layer):
yield "this is a generator" # pragma: no cover
def send_response(self, response):
- if response.content is None:
+ if response.data.content is None:
raise netlib.exceptions.HttpException("Cannot assemble flow with missing content")
self.send_response_headers(response)
- self.send_response_body(response, [response.content])
+ self.send_response_body(response, [response.data.content])
def send_response_headers(self, response):
raise NotImplementedError()
diff --git a/mitmproxy/web/app.py b/mitmproxy/web/app.py
index ad149270..8c080e98 100644
--- a/mitmproxy/web/app.py
+++ b/mitmproxy/web/app.py
@@ -295,7 +295,7 @@ class FlowContent(RequestHandler):
def get(self, flow_id, message):
message = getattr(self.flow, message)
- if not message.content:
+ if not message.raw_content:
raise APIError(400, "No content.")
content_encoding = message.headers.get("Content-Encoding", None)
@@ -318,7 +318,7 @@ class FlowContent(RequestHandler):
self.set_header("Content-Type", "application/text")
self.set_header("X-Content-Type-Options", "nosniff")
self.set_header("X-Frame-Options", "DENY")
- self.write(message.content)
+ self.write(message.raw_content)
class Events(RequestHandler):
diff --git a/netlib/encoding.py b/netlib/encoding.py
index 98502451..8b67b543 100644
--- a/netlib/encoding.py
+++ b/netlib/encoding.py
@@ -1,39 +1,62 @@
"""
- Utility functions for decoding response bodies.
+Utility functions for decoding response bodies.
"""
from __future__ import absolute_import
+
+import codecs
from io import BytesIO
import gzip
import zlib
+from typing import Union # noqa
+
-ENCODINGS = {"identity", "gzip", "deflate"}
+def decode(obj, encoding, errors='strict'):
+ # type: (Union[str, bytes], str, str) -> Union[str, bytes]
+ """
+ Decode the given input object
+ Returns:
+ The decoded value
-def decode(e, content):
- if not isinstance(content, bytes):
- return None
- encoding_map = {
- "identity": identity,
- "gzip": decode_gzip,
- "deflate": decode_deflate,
- }
- if e not in encoding_map:
- return None
- return encoding_map[e](content)
+ Raises:
+ ValueError, if decoding fails.
+ """
+ try:
+ try:
+ return custom_decode[encoding](obj)
+ except KeyError:
+ return codecs.decode(obj, encoding, errors)
+ except Exception as e:
+ raise ValueError("{} when decoding {} with {}".format(
+ type(e).__name__,
+ repr(obj)[:10],
+ repr(encoding),
+ ))
+
+
+def encode(obj, encoding, errors='strict'):
+ # type: (Union[str, bytes], str, str) -> Union[str, bytes]
+ """
+ Encode the given input object
+ Returns:
+ The encoded value
-def encode(e, content):
- if not isinstance(content, bytes):
- return None
- encoding_map = {
- "identity": identity,
- "gzip": encode_gzip,
- "deflate": encode_deflate,
- }
- if e not in encoding_map:
- return None
- return encoding_map[e](content)
+ Raises:
+ ValueError, if encoding fails.
+ """
+ try:
+ try:
+ return custom_encode[encoding](obj)
+ except KeyError:
+ return codecs.encode(obj, encoding, errors)
+ except Exception as e:
+ raise ValueError("{} when encoding {} with {}".format(
+ type(e).__name__,
+ repr(obj)[:10],
+ repr(encoding),
+ ))
def identity(content):
@@ -46,10 +69,7 @@ def identity(content):
def decode_gzip(content):
gfile = gzip.GzipFile(fileobj=BytesIO(content))
- try:
- return gfile.read()
- except (IOError, EOFError):
- return None
+ return gfile.read()
def encode_gzip(content):
@@ -70,12 +90,9 @@ def decode_deflate(content):
http://bugs.python.org/issue5784
"""
try:
- try:
- return zlib.decompress(content)
- except zlib.error:
- return zlib.decompress(content, -15)
+ return zlib.decompress(content)
except zlib.error:
- return None
+ return zlib.decompress(content, -15)
def encode_deflate(content):
@@ -84,4 +101,16 @@ def encode_deflate(content):
"""
return zlib.compress(content)
-__all__ = ["ENCODINGS", "encode", "decode"]
+
+custom_decode = {
+ "identity": identity,
+ "gzip": decode_gzip,
+ "deflate": decode_deflate,
+}
+custom_encode = {
+ "identity": identity,
+ "gzip": encode_gzip,
+ "deflate": encode_deflate,
+}
+
+__all__ = ["encode", "decode"]
diff --git a/netlib/http/headers.py b/netlib/http/headers.py
index c8cf3e43..36e5060c 100644
--- a/netlib/http/headers.py
+++ b/netlib/http/headers.py
@@ -2,6 +2,7 @@ from __future__ import absolute_import, print_function, division
import re
+import collections
import six
from netlib import multidict
from netlib import strutils
@@ -206,10 +207,22 @@ def parse_content_type(c):
ts = parts[0].split("/", 1)
if len(ts) != 2:
return None
- d = {}
+ d = collections.OrderedDict()
if len(parts) == 2:
for i in parts[1].split(";"):
clause = i.split("=", 1)
if len(clause) == 2:
d[clause[0].strip()] = clause[1].strip()
return ts[0].lower(), ts[1].lower(), d
+
+
+def assemble_content_type(type, subtype, parameters):
+ if not parameters:
+ return "{}/{}".format(type, subtype)
+ params = "; ".join(
+ "{}={}".format(k, v)
+ for k, v in parameters.items()
+ )
+ return "{}/{}; {}".format(
+ type, subtype, params
+ )
diff --git a/netlib/http/http1/assemble.py b/netlib/http/http1/assemble.py
index 511328f1..e74732d2 100644
--- a/netlib/http/http1/assemble.py
+++ b/netlib/http/http1/assemble.py
@@ -5,7 +5,7 @@ from netlib import exceptions
def assemble_request(request):
- if request.content is None:
+ if request.data.content is None:
raise exceptions.HttpException("Cannot assemble flow with missing content")
head = assemble_request_head(request)
body = b"".join(assemble_body(request.data.headers, [request.data.content]))
@@ -19,7 +19,7 @@ def assemble_request_head(request):
def assemble_response(response):
- if response.content is None:
+ if response.data.content is None:
raise exceptions.HttpException("Cannot assemble flow with missing content")
head = assemble_response_head(response)
body = b"".join(assemble_body(response.data.headers, [response.data.content]))
diff --git a/netlib/http/message.py b/netlib/http/message.py
index b268fec9..34709f0a 100644
--- a/netlib/http/message.py
+++ b/netlib/http/message.py
@@ -52,7 +52,23 @@ class MessageData(basetypes.Serializable):
return cls(**state)
+class CachedDecode(object):
+ __slots__ = ["encoded", "encoding", "strict", "decoded"]
+
+ def __init__(self, object, encoding, strict, decoded):
+ self.encoded = object
+ self.encoding = encoding
+ self.strict = strict
+ self.decoded = decoded
+
+no_cached_decode = CachedDecode(None, None, None, None)
+
+
class Message(basetypes.Serializable):
+ def __init__(self):
+ self._content_cache = no_cached_decode # type: CachedDecode
+ self._text_cache = no_cached_decode # type: CachedDecode
+
def __eq__(self, other):
if isinstance(other, Message):
return self.data == other.data
@@ -90,22 +106,82 @@ class Message(basetypes.Serializable):
self.data.headers = h
@property
- def content(self):
+ def raw_content(self):
+ # type: () -> bytes
"""
The raw (encoded) HTTP message body
- See also: :py:attr:`text`
+ See also: :py:attr:`content`, :py:attr:`text`
"""
return self.data.content
- @content.setter
- def content(self, content):
- # type: (Optional[bytes]) -> None
+ @raw_content.setter
+ def raw_content(self, content):
self.data.content = content
- if isinstance(content, six.text_type):
- raise ValueError("Message content must be bytes, not {}".format(type(content).__name__))
- if isinstance(content, bytes):
- self.headers["content-length"] = str(len(content))
+
+ def get_content(self, strict=True):
+ # type: (bool) -> bytes
+ """
+ The HTTP message body decoded with the content-encoding header (e.g. gzip)
+
+ Raises:
+ ValueError, when the content-encoding is invalid and strict is True.
+
+ See also: :py:attr:`raw_content`, :py:attr:`text`
+ """
+ if self.raw_content is None:
+ return None
+ ce = self.headers.get("content-encoding")
+ cached = (
+ self._content_cache.encoded == self.raw_content and
+ (self._content_cache.strict or not strict) and
+ self._content_cache.encoding == ce
+ )
+ if not cached:
+ is_strict = True
+ if ce:
+ try:
+ decoded = encoding.decode(self.raw_content, ce)
+ except ValueError:
+ if strict:
+ raise
+ is_strict = False
+ decoded = self.raw_content
+ else:
+ decoded = self.raw_content
+ self._content_cache = CachedDecode(self.raw_content, ce, is_strict, decoded)
+ return self._content_cache.decoded
+
+ def set_content(self, value):
+ if value is None:
+ self.raw_content = None
+ return
+ if not isinstance(value, bytes):
+ raise TypeError(
+ "Message content must be bytes, not {}. "
+ "Please use .text if you want to assign a str."
+ .format(type(value).__name__)
+ )
+ ce = self.headers.get("content-encoding")
+ cached = (
+ self._content_cache.decoded == value and
+ self._content_cache.encoding == ce and
+ self._content_cache.strict
+ )
+ if not cached:
+ try:
+ encoded = encoding.encode(value, ce or "identity")
+ except ValueError:
+ # So we have an invalid content-encoding?
+ # Let's remove it!
+ del self.headers["content-encoding"]
+ ce = None
+ encoded = value
+ self._content_cache = CachedDecode(encoded, ce, True, value)
+ self.raw_content = self._content_cache.encoded
+ self.headers["content-length"] = str(len(self.raw_content))
+
+ content = property(get_content, set_content)
@property
def http_version(self):
@@ -140,56 +216,108 @@ class Message(basetypes.Serializable):
def timestamp_end(self, timestamp_end):
self.data.timestamp_end = timestamp_end
- @property
- def text(self):
- """
- The decoded HTTP message body.
- Decoded contents are not cached, so accessing this attribute repeatedly is relatively expensive.
+ def _get_content_type_charset(self):
+ # type: () -> Optional[str]
+ ct = headers.parse_content_type(self.headers.get("content-type", ""))
+ if ct:
+ return ct[2].get("charset")
- .. note::
- This is not implemented yet.
+ def _guess_encoding(self):
+ # type: () -> str
+ enc = self._get_content_type_charset()
+ if enc:
+ return enc
- See also: :py:attr:`content`, :py:class:`decoded`
+ if "json" in self.headers.get("content-type", ""):
+ return "utf8"
+ else:
+ # We may also want to check for HTML meta tags here at some point.
+ return "latin-1"
+
+ def get_text(self, strict=True):
+ # type: (bool) -> six.text_type
"""
- # This attribute should be called text, because that's what requests does.
- raise NotImplementedError()
+ The HTTP message body decoded with both content-encoding header (e.g. gzip)
+ and content-type header charset.
- @text.setter
- def text(self, text):
- raise NotImplementedError()
+ Raises:
+ ValueError, when either content-encoding or charset is invalid and strict is True.
- def decode(self):
+ See also: :py:attr:`content`, :py:attr:`raw_content`
+ """
+ if self.raw_content is None:
+ return None
+ enc = self._guess_encoding()
+
+ content = self.get_content(strict)
+ cached = (
+ self._text_cache.encoded == content and
+ (self._text_cache.strict or not strict) and
+ self._text_cache.encoding == enc
+ )
+ if not cached:
+ is_strict = self._content_cache.strict
+ try:
+ decoded = encoding.decode(content, enc)
+ except ValueError:
+ if strict:
+ raise
+ is_strict = False
+ decoded = self.content.decode("utf8", "replace" if six.PY2 else "surrogateescape")
+ self._text_cache = CachedDecode(content, enc, is_strict, decoded)
+ return self._text_cache.decoded
+
+ def set_text(self, text):
+ if text is None:
+ self.content = None
+ return
+ enc = self._guess_encoding()
+
+ cached = (
+ self._text_cache.decoded == text and
+ self._text_cache.encoding == enc and
+ self._text_cache.strict
+ )
+ if not cached:
+ try:
+ encoded = encoding.encode(text, enc)
+ except ValueError:
+ # Fall back to UTF-8 and update the content-type header.
+ ct = headers.parse_content_type(self.headers.get("content-type", "")) or ("text", "plain", {})
+ ct[2]["charset"] = "utf-8"
+ self.headers["content-type"] = headers.assemble_content_type(*ct)
+ enc = "utf8"
+ encoded = text.encode(enc, "replace" if six.PY2 else "surrogateescape")
+ self._text_cache = CachedDecode(encoded, enc, True, text)
+ self.content = self._text_cache.encoded
+
+ text = property(get_text, set_text)
+
+ def decode(self, strict=True):
"""
- Decodes body based on the current Content-Encoding header, then
- removes the header. If there is no Content-Encoding header, no
- action is taken.
+ Decodes body based on the current Content-Encoding header, then
+ removes the header. If there is no Content-Encoding header, no
+ action is taken.
- Returns:
- True, if decoding succeeded.
- False, otherwise.
+ Raises:
+ ValueError, when the content-encoding is invalid and strict is True.
"""
- ce = self.headers.get("content-encoding")
- data = encoding.decode(ce, self.content)
- if data is None:
- return False
- self.content = data
+ self.raw_content = self.get_content(strict)
self.headers.pop("content-encoding", None)
- return True
def encode(self, e):
"""
- Encodes body with the encoding e, where e is "gzip", "deflate" or "identity".
+ Encodes body with the encoding e, where e is "gzip", "deflate" or "identity".
+ Any existing content-encoding is overwritten;
+ the content is not decoded beforehand.
- Returns:
- True, if decoding succeeded.
- False, otherwise.
+ Raises:
+ ValueError, when the specified content-encoding is invalid.
"""
- data = encoding.encode(e, self.content)
- if data is None:
- return False
- self.content = data
self.headers["content-encoding"] = e
- return True
+ self.content = self.raw_content
+ if "content-encoding" not in self.headers:
+ raise ValueError("Invalid content encoding {}".format(repr(e)))
def replace(self, pattern, repl, flags=0):
"""
@@ -206,10 +334,9 @@ class Message(basetypes.Serializable):
repl = strutils.escaped_str_to_bytes(repl)
replacements = 0
if self.content:
- with decoded(self):
- self.content, replacements = re.subn(
- pattern, repl, self.content, flags=flags
- )
+ self.content, replacements = re.subn(
+ pattern, repl, self.content, flags=flags
+ )
replacements += self.headers.replace(pattern, repl, flags)
return replacements
@@ -228,29 +355,16 @@ class Message(basetypes.Serializable):
class decoded(object):
"""
- A context manager that decodes a request or response, and then
- re-encodes it with the same encoding after execution of the block.
-
- Example:
-
- .. code-block:: python
-
- with decoded(request):
- request.content = request.content.replace("foo", "bar")
+ Deprecated: You can now directly use :py:attr:`content`.
+ :py:attr:`raw_content` has the encoded content.
"""
- def __init__(self, message):
- self.message = message
- ce = message.headers.get("content-encoding")
- if ce in encoding.ENCODINGS:
- self.ce = ce
- else:
- self.ce = None
+ def __init__(self, message): # pragma no cover
+ warnings.warn("decoded() is deprecated, you can now directly use .content instead. "
+ ".raw_content has the encoded content.", DeprecationWarning)
- def __enter__(self):
- if self.ce:
- self.message.decode()
+ def __enter__(self): # pragma no cover
+ pass
- def __exit__(self, type, value, tb):
- if self.ce:
- self.message.encode(self.ce)
+ def __exit__(self, type, value, tb): # pragma no cover
+ pass
diff --git a/netlib/http/request.py b/netlib/http/request.py
index c4c39942..ecaa9b79 100644
--- a/netlib/http/request.py
+++ b/netlib/http/request.py
@@ -5,7 +5,6 @@ import re
import six
from six.moves import urllib
-from netlib import encoding
from netlib import multidict
from netlib import strutils
from netlib.http import multipart
@@ -56,6 +55,7 @@ class Request(message.Message):
An HTTP request.
"""
def __init__(self, *args, **kwargs):
+ super(Request, self).__init__()
self.data = RequestData(*args, **kwargs)
def __repr__(self):
@@ -339,7 +339,7 @@ class Request(message.Message):
self.headers["accept-encoding"] = (
', '.join(
e
- for e in encoding.ENCODINGS
+ for e in {"gzip", "identity", "deflate"}
if e in accept_encoding
)
)
@@ -359,7 +359,10 @@ class Request(message.Message):
def _get_urlencoded_form(self):
is_valid_content_type = "application/x-www-form-urlencoded" in self.headers.get("content-type", "").lower()
if is_valid_content_type:
- return tuple(netlib.http.url.decode(self.content))
+ try:
+ return tuple(netlib.http.url.decode(self.content))
+ except ValueError:
+ pass
return ()
def _set_urlencoded_form(self, value):
@@ -388,7 +391,10 @@ class Request(message.Message):
def _get_multipart_form(self):
is_valid_content_type = "multipart/form-data" in self.headers.get("content-type", "").lower()
if is_valid_content_type:
- return multipart.decode(self.headers, self.content)
+ try:
+ return multipart.decode(self.headers, self.content)
+ except ValueError:
+ pass
return ()
def _set_multipart_form(self, value):
diff --git a/netlib/http/response.py b/netlib/http/response.py
index 7cfb55c8..85f54940 100644
--- a/netlib/http/response.py
+++ b/netlib/http/response.py
@@ -37,13 +37,14 @@ class Response(message.Message):
An HTTP response.
"""
def __init__(self, *args, **kwargs):
+ super(Response, self).__init__()
self.data = ResponseData(*args, **kwargs)
def __repr__(self):
- if self.content:
+ if self.raw_content:
details = "{}, {}".format(
self.headers.get("content-type", "unknown content type"),
- human.pretty_size(len(self.content))
+ human.pretty_size(len(self.raw_content))
)
else:
details = "no content"
diff --git a/netlib/wsgi.py b/netlib/wsgi.py
index c66fddc2..0def75b5 100644
--- a/netlib/wsgi.py
+++ b/netlib/wsgi.py
@@ -54,6 +54,10 @@ class WSGIAdaptor(object):
self.app, self.domain, self.port, self.sversion = app, domain, port, sversion
def make_environ(self, flow, errsoc, **extra):
+ """
+ Raises:
+ ValueError, if the content-encoding is invalid.
+ """
path = strutils.native(flow.request.path, "latin-1")
if '?' in path:
path_info, query = strutils.native(path, "latin-1").split('?', 1)
diff --git a/test/mitmproxy/test_contentview.py b/test/mitmproxy/test_contentview.py
index c11a5fe5..2db9ab40 100644
--- a/test/mitmproxy/test_contentview.py
+++ b/test/mitmproxy/test_contentview.py
@@ -1,6 +1,5 @@
from mitmproxy.exceptions import ContentViewException
from netlib.http import Headers
-from netlib import encoding
from netlib.http import url
from netlib import multidict
@@ -216,28 +215,6 @@ Larry
headers=Headers()
)
- r = cv.get_content_view(
- cv.get("Auto"),
- encoding.encode('gzip', b"[1, 2, 3]"),
- headers=Headers(
- content_type="application/json",
- content_encoding="gzip"
- )
- )
- assert "decoded gzip" in r[0]
- assert "JSON" in r[0]
-
- r = cv.get_content_view(
- cv.get("XML"),
- encoding.encode('gzip', b"[1, 2, 3]"),
- headers=Headers(
- content_type="application/json",
- content_encoding="gzip"
- )
- )
- assert "decoded gzip" in r[0]
- assert "Raw" in r[0]
-
def test_add_cv(self):
class TestContentView(cv.View):
name = "test"
diff --git a/test/mitmproxy/test_flow.py b/test/mitmproxy/test_flow.py
index 8197ba08..f73616d1 100644
--- a/test/mitmproxy/test_flow.py
+++ b/test/mitmproxy/test_flow.py
@@ -434,13 +434,13 @@ class TestFlow(object):
f.replace("foo", "bar")
- assert f.request.content != b"abarb"
+ assert f.request.raw_content != b"abarb"
f.request.decode()
- assert f.request.content == b"abarb"
+ assert f.request.raw_content == b"abarb"
- assert f.response.content != b"abarb"
+ assert f.response.raw_content != b"abarb"
f.response.decode()
- assert f.response.content == b"abarb"
+ assert f.response.raw_content == b"abarb"
class TestState:
@@ -879,16 +879,6 @@ class TestRequest:
r.constrain_encoding()
assert "oink" not in r.headers["accept-encoding"]
- def test_get_decoded_content(self):
- r = HTTPRequest.wrap(netlib.tutils.treq())
- r.content = None
- r.headers["content-encoding"] = "identity"
- assert r.get_decoded_content() is None
-
- r.content = b"falafel"
- r.encode("gzip")
- assert r.get_decoded_content() == b"falafel"
-
def test_get_content_type(self):
resp = HTTPResponse.wrap(netlib.tutils.tresp())
resp.headers = Headers(content_type="text/plain")
diff --git a/test/netlib/http/test_headers.py b/test/netlib/http/test_headers.py
index 51819b86..51537310 100644
--- a/test/netlib/http/test_headers.py
+++ b/test/netlib/http/test_headers.py
@@ -1,4 +1,6 @@
-from netlib.http import Headers, parse_content_type
+import collections
+
+from netlib.http.headers import Headers, parse_content_type, assemble_content_type
from netlib.tutils import raises
@@ -81,3 +83,10 @@ def test_parse_content_type():
v = p("text/html; charset=UTF-8")
assert v == ('text', 'html', {'charset': 'UTF-8'})
+
+
+def test_assemble_content_type():
+ p = assemble_content_type
+ assert p("text", "html", {}) == "text/html"
+ assert p("text", "html", {"charset": "utf8"}) == "text/html; charset=utf8"
+ assert p("text", "html", collections.OrderedDict([("charset", "utf8"), ("foo", "bar")])) == "text/html; charset=utf8; foo=bar"
diff --git a/test/netlib/http/test_message.py b/test/netlib/http/test_message.py
index ab2ac628..deebd6f2 100644
--- a/test/netlib/http/test_message.py
+++ b/test/netlib/http/test_message.py
@@ -1,8 +1,11 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, division
-from netlib.http import decoded
+import mock
+import six
+
from netlib.tutils import tresp
+from netlib import http, tutils
def _test_passthrough_attr(message, attr):
@@ -68,6 +71,15 @@ class TestMessage(object):
assert resp != 0
+ def test_hash(self):
+ resp = tresp()
+ assert hash(resp)
+
+ def test_serializable(self):
+ resp = tresp()
+ resp2 = http.Response.from_state(resp.get_state())
+ assert resp == resp2
+
def test_content_length_update(self):
resp = tresp()
resp.content = b"foo"
@@ -76,9 +88,9 @@ class TestMessage(object):
resp.content = b""
assert resp.data.content == b""
assert resp.headers["content-length"] == "0"
-
- def test_content_basic(self):
- _test_passthrough_attr(tresp(), "content")
+ resp.raw_content = b"bar"
+ assert resp.data.content == b"bar"
+ assert resp.headers["content-length"] == "0"
def test_headers(self):
_test_passthrough_attr(tresp(), "headers")
@@ -89,65 +101,201 @@ class TestMessage(object):
def test_timestamp_end(self):
_test_passthrough_attr(tresp(), "timestamp_end")
- def teste_http_version(self):
+ def test_http_version(self):
_test_decoded_attr(tresp(), "http_version")
-class TestDecodedDecorator(object):
-
+class TestMessageContentEncoding(object):
def test_simple(self):
r = tresp()
- assert r.content == b"message"
+ assert r.raw_content == b"message"
assert "content-encoding" not in r.headers
- assert r.encode("gzip")
+ r.encode("gzip")
assert r.headers["content-encoding"]
- assert r.content != b"message"
- with decoded(r):
- assert "content-encoding" not in r.headers
- assert r.content == b"message"
- assert r.headers["content-encoding"]
- assert r.content != b"message"
+ assert r.raw_content != b"message"
+ assert r.content == b"message"
+ assert r.raw_content != b"message"
+
+ r.raw_content = b"foo"
+ with mock.patch("netlib.encoding.decode") as e:
+ assert r.content
+ assert e.call_count == 1
+ e.reset_mock()
+ assert r.content
+ assert e.call_count == 0
def test_modify(self):
r = tresp()
assert "content-encoding" not in r.headers
- assert r.encode("gzip")
+ r.encode("gzip")
+
+ r.content = b"foo"
+ assert r.raw_content != b"foo"
+ r.decode()
+ assert r.raw_content == b"foo"
- with decoded(r):
+ r.encode("identity")
+ with mock.patch("netlib.encoding.encode") as e:
r.content = b"foo"
+ assert e.call_count == 0
+ r.content = b"bar"
+ assert e.call_count == 1
- assert r.content != b"foo"
- r.decode()
- assert r.content == b"foo"
+ with tutils.raises(TypeError):
+ r.content = u"foo"
def test_unknown_ce(self):
r = tresp()
r.headers["content-encoding"] = "zopfli"
- r.content = b"foo"
- with decoded(r):
- assert r.headers["content-encoding"]
- assert r.content == b"foo"
+ r.raw_content = b"foo"
+ with tutils.raises(ValueError):
+ assert r.content
assert r.headers["content-encoding"]
- assert r.content == b"foo"
+ assert r.get_content(strict=False) == b"foo"
def test_cannot_decode(self):
r = tresp()
- assert r.encode("gzip")
- r.content = b"foo"
- with decoded(r):
- assert r.headers["content-encoding"]
- assert r.content == b"foo"
+ r.encode("gzip")
+ r.raw_content = b"foo"
+ with tutils.raises(ValueError):
+ assert r.content
assert r.headers["content-encoding"]
- assert r.content != b"foo"
- r.decode()
+ assert r.get_content(strict=False) == b"foo"
+
+ with tutils.raises(ValueError):
+ r.decode()
+ assert r.raw_content == b"foo"
+ assert "content-encoding" in r.headers
+
+ r.decode(strict=False)
assert r.content == b"foo"
+ assert "content-encoding" not in r.headers
+
+ def test_none(self):
+ r = tresp(content=None)
+ assert r.content is None
+ r.content = b"foo"
+ assert r.content is not None
+ r.content = None
+ assert r.content is None
def test_cannot_encode(self):
r = tresp()
- assert r.encode("gzip")
- with decoded(r):
- r.content = None
+ r.encode("gzip")
+ r.content = None
+ assert r.headers["content-encoding"]
+ assert r.raw_content is None
+ r.headers["content-encoding"] = "zopfli"
+ r.content = b"foo"
assert "content-encoding" not in r.headers
- assert r.content is None
+ assert r.raw_content == b"foo"
+
+ with tutils.raises(ValueError):
+ r.encode("zopfli")
+ assert r.raw_content == b"foo"
+ assert "content-encoding" not in r.headers
+
+
+class TestMessageText(object):
+ def test_simple(self):
+ r = tresp(content=b'\xfc')
+ assert r.raw_content == b"\xfc"
+ assert r.content == b"\xfc"
+ assert r.text == u"ü"
+
+ r.encode("gzip")
+ assert r.text == u"ü"
+ r.decode()
+ assert r.text == u"ü"
+
+ r.headers["content-type"] = "text/html; charset=latin1"
+ r.content = b"\xc3\xbc"
+ assert r.text == u"ü"
+ r.headers["content-type"] = "text/html; charset=utf8"
+ assert r.text == u"ü"
+
+ r.encode("identity")
+ r.raw_content = b"foo"
+ with mock.patch("netlib.encoding.decode") as e:
+ assert r.text
+ assert e.call_count == 2
+ e.reset_mock()
+ assert r.text
+ assert e.call_count == 0
+
+ def test_guess_json(self):
+ r = tresp(content=b'"\xc3\xbc"')
+ r.headers["content-type"] = "application/json"
+ assert r.text == u'"ü"'
+
+ def test_none(self):
+ r = tresp(content=None)
+ assert r.text is None
+ r.text = u"foo"
+ assert r.text is not None
+ r.text = None
+ assert r.text is None
+
+ def test_modify(self):
+ r = tresp()
+
+ r.text = u"ü"
+ assert r.raw_content == b"\xfc"
+
+ r.headers["content-type"] = "text/html; charset=utf8"
+ r.text = u"ü"
+ assert r.raw_content == b"\xc3\xbc"
+ assert r.headers["content-length"] == "2"
+
+ r.encode("identity")
+ with mock.patch("netlib.encoding.encode") as e:
+ e.return_value = b""
+ r.text = u"ü"
+ assert e.call_count == 0
+ r.text = u"ä"
+ assert e.call_count == 2
+
+ def test_unknown_ce(self):
+ r = tresp()
+ r.headers["content-type"] = "text/html; charset=wtf"
+ r.raw_content = b"foo"
+ with tutils.raises(ValueError):
+ assert r.text == u"foo"
+ assert r.get_text(strict=False) == u"foo"
+
+ def test_cannot_decode(self):
+ r = tresp()
+ r.headers["content-type"] = "text/html; charset=utf8"
+ r.raw_content = b"\xFF"
+ with tutils.raises(ValueError):
+ assert r.text
+
+ assert r.get_text(strict=False) == u'\ufffd' if six.PY2 else '\udcff'
+
+ def test_cannot_encode(self):
+ r = tresp()
+ r.content = None
+ assert "content-type" not in r.headers
+ assert r.raw_content is None
+
+ r.headers["content-type"] = "text/html; charset=latin1; foo=bar"
+ r.text = u"☃"
+ assert r.headers["content-type"] == "text/html; charset=utf-8; foo=bar"
+ assert r.raw_content == b'\xe2\x98\x83'
+
+ r.headers["content-type"] = "gibberish"
+ r.text = u"☃"
+ assert r.headers["content-type"] == "text/plain; charset=utf-8"
+ assert r.raw_content == b'\xe2\x98\x83'
+
+ del r.headers["content-type"]
+ r.text = u"☃"
+ assert r.headers["content-type"] == "text/plain; charset=utf-8"
+ assert r.raw_content == b'\xe2\x98\x83'
+
+ r.headers["content-type"] = "text/html; charset=latin1"
+ r.text = u'\udcff'
+ assert r.headers["content-type"] == "text/html; charset=utf-8"
+ assert r.raw_content == b'\xed\xb3\xbf' if six.PY2 else b"\xFF"
diff --git a/test/netlib/test_encoding.py b/test/netlib/test_encoding.py
index 0ff1aad1..de10fc48 100644
--- a/test/netlib/test_encoding.py
+++ b/test/netlib/test_encoding.py
@@ -1,37 +1,39 @@
-from netlib import encoding
+from netlib import encoding, tutils
def test_identity():
- assert b"string" == encoding.decode("identity", b"string")
- assert b"string" == encoding.encode("identity", b"string")
- assert not encoding.encode("nonexistent", b"string")
- assert not encoding.decode("nonexistent encoding", b"string")
+ assert b"string" == encoding.decode(b"string", "identity")
+ assert b"string" == encoding.encode(b"string", "identity")
+ with tutils.raises(ValueError):
+ encoding.encode(b"string", "nonexistent encoding")
def test_gzip():
assert b"string" == encoding.decode(
- "gzip",
encoding.encode(
- "gzip",
- b"string"
- )
+ b"string",
+ "gzip"
+ ),
+ "gzip"
)
- assert encoding.decode("gzip", b"bogus") is None
+ with tutils.raises(ValueError):
+ encoding.decode(b"bogus", "gzip")
def test_deflate():
assert b"string" == encoding.decode(
- "deflate",
encoding.encode(
- "deflate",
- b"string"
- )
+ b"string",
+ "deflate"
+ ),
+ "deflate"
)
assert b"string" == encoding.decode(
- "deflate",
encoding.encode(
- "deflate",
- b"string"
- )[2:-4]
+ b"string",
+ "deflate"
+ )[2:-4],
+ "deflate"
)
- assert encoding.decode("deflate", b"bogus") is None
+ with tutils.raises(ValueError):
+ encoding.decode(b"bogus", "deflate")