about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
-rw-r--r-- mitmproxy/contentviews.py 9
-rw-r--r-- mitmproxy/flow/export.py 4
-rw-r--r-- netlib/http/headers.py 27
-rw-r--r-- netlib/http/multipart.py 32
-rw-r--r-- netlib/http/request.py 3
-rw-r--r-- netlib/utils.py 56
-rw-r--r-- test/netlib/http/test_headers.py 10
-rw-r--r-- test/netlib/http/test_multipart.py 23
-rw-r--r-- test/netlib/test_utils.py 32
9 files changed, 101 insertions, 95 deletions
diff --git a/mitmproxy/contentviews.py b/mitmproxy/contentviews.py
index 75e4273f..08a7e446 100644
--- a/mitmproxy/contentviews.py
+++ b/mitmproxy/contentviews.py
@@ -27,8 +27,9 @@ import html2text
import six
from netlib.odict import ODict
from netlib import encoding
-from netlib.http import url
-from netlib.utils import clean_bin, hexdump, multipartdecode, parse_content_type
+import netlib.http.headers
+from netlib.http import url, multipart
+from netlib.utils import clean_bin, hexdump
from . import utils
from .exceptions import ContentViewException
from .contrib import jsbeautifier
@@ -121,7 +122,7 @@ class ViewAuto(View):
headers = metadata.get("headers", {})
ctype = headers.get("content-type")
if data and ctype:
- ct = parse_content_type(ctype) if ctype else None
+ ct = netlib.http.headers.parse_content_type(ctype) if ctype else None
ct = "%s/%s" % (ct[0], ct[1])
if ct in content_types_map:
return content_types_map[ct][0](data, **metadata)
@@ -275,7 +276,7 @@ class ViewMultipart(View):
def __call__(self, data, **metadata):
headers = metadata.get("headers", {})
- v = multipartdecode(headers, data)
+ v = multipart.decode(headers, data)
if v:
return "Multipart form", self._format(v)
diff --git a/mitmproxy/flow/export.py b/mitmproxy/flow/export.py
index d2c7bceb..c2f54554 100644
--- a/mitmproxy/flow/export.py
+++ b/mitmproxy/flow/export.py
@@ -5,7 +5,7 @@ from textwrap import dedent
from six.moves.urllib.parse import quote, quote_plus
import netlib.http
-from netlib.utils import parse_content_type
+import netlib.http.headers
def curl_command(flow):
@@ -88,7 +88,7 @@ def raw_request(flow):
def is_json(headers, content):
if headers:
- ct = parse_content_type(headers.get("content-type", ""))
+ ct = netlib.http.headers.parse_content_type(headers.get("content-type", ""))
if ct and "%s/%s" % (ct[0], ct[1]) == "application/json":
try:
return json.loads(content)
diff --git a/netlib/http/headers.py b/netlib/http/headers.py
index 6165fd61..8f669ec1 100644
--- a/netlib/http/headers.py
+++ b/netlib/http/headers.py
@@ -175,3 +175,30 @@ class Headers(MultiDict):
fields.append([name, value])
self.fields = fields
return replacements
+
+
+def parse_content_type(c):
+ """
+ A simple parser for content-type values. Returns a (type, subtype,
+ parameters) tuple, where type and subtype are strings, and parameters
+ is a dict. If the string could not be parsed, return None.
+
+ E.g. the following string:
+
+ text/html; charset=UTF-8
+
+ Returns:
+
+ ("text", "html", {"charset": "UTF-8"})
+ """
+ parts = c.split(";", 1)
+ ts = parts[0].split("/", 1)
+ if len(ts) != 2:
+ return None
+ d = {}
+ if len(parts) == 2:
+ for i in parts[1].split(";"):
+ clause = i.split("=", 1)
+ if len(clause) == 2:
+ d[clause[0].strip()] = clause[1].strip()
+ return ts[0].lower(), ts[1].lower(), d
diff --git a/netlib/http/multipart.py b/netlib/http/multipart.py
new file mode 100644
index 00000000..a135eb86
--- /dev/null
+++ b/netlib/http/multipart.py
@@ -0,0 +1,32 @@
+import re
+
+from . import headers
+
+
+def decode(hdrs, content):
+ """
+ Takes a multipart boundary encoded string and returns list of (key, value) tuples.
+ """
+ v = hdrs.get("content-type")
+ if v:
+ v = headers.parse_content_type(v)
+ if not v:
+ return []
+ try:
+ boundary = v[2]["boundary"].encode("ascii")
+ except (KeyError, UnicodeError):
+ return []
+
+ rx = re.compile(br'\bname="([^"]+)"')
+ r = []
+
+ for i in content.split(b"--" + boundary):
+ parts = i.splitlines()
+ if len(parts) > 1 and parts[0][0:2] != b"--":
+ match = rx.search(parts[1])
+ if match:
+ key = match.group(1)
+ value = b"".join(parts[3 + parts[2:].index(b""):])
+ r.append((key, value))
+ return r
+ return []
diff --git a/netlib/http/request.py b/netlib/http/request.py
index d552bc70..2fcea67d 100644
--- a/netlib/http/request.py
+++ b/netlib/http/request.py
@@ -7,6 +7,7 @@ from six.moves import urllib
from netlib import utils
import netlib.http.url
+from netlib.http import multipart
from . import cookies
from .. import encoding
from ..multidict import MultiDictView
@@ -369,7 +370,7 @@ class Request(Message):
def _get_multipart_form(self):
is_valid_content_type = "multipart/form-data" in self.headers.get("content-type", "").lower()
if is_valid_content_type:
- return utils.multipartdecode(self.headers, self.content)
+ return multipart.decode(self.headers, self.content)
return ()
def _set_multipart_form(self, value):
diff --git a/netlib/utils.py b/netlib/utils.py
index a2d8c97d..a0150e77 100644
--- a/netlib/utils.py
+++ b/netlib/utils.py
@@ -190,62 +190,6 @@ def hostport(scheme, host, port):
return "%s:%d" % (host, port)
-def parse_content_type(c):
- """
- A simple parser for content-type values. Returns a (type, subtype,
- parameters) tuple, where type and subtype are strings, and parameters
- is a dict. If the string could not be parsed, return None.
-
- E.g. the following string:
-
- text/html; charset=UTF-8
-
- Returns:
-
- ("text", "html", {"charset": "UTF-8"})
- """
- parts = c.split(";", 1)
- ts = parts[0].split("/", 1)
- if len(ts) != 2:
- return None
- d = {}
- if len(parts) == 2:
- for i in parts[1].split(";"):
- clause = i.split("=", 1)
- if len(clause) == 2:
- d[clause[0].strip()] = clause[1].strip()
- return ts[0].lower(), ts[1].lower(), d
-
-
-def multipartdecode(headers, content):
- """
- Takes a multipart boundary encoded string and returns list of (key, value) tuples.
- """
- v = headers.get("content-type")
- if v:
- v = parse_content_type(v)
- if not v:
- return []
- try:
- boundary = v[2]["boundary"].encode("ascii")
- except (KeyError, UnicodeError):
- return []
-
- rx = re.compile(br'\bname="([^"]+)"')
- r = []
-
- for i in content.split(b"--" + boundary):
- parts = i.splitlines()
- if len(parts) > 1 and parts[0][0:2] != b"--":
- match = rx.search(parts[1])
- if match:
- key = match.group(1)
- value = b"".join(parts[3 + parts[2:].index(b""):])
- r.append((key, value))
- return r
- return []
-
-
def safe_subn(pattern, repl, target, *args, **kwargs):
"""
There are Unicode conversion problems with re.subn. We try to smooth
diff --git a/test/netlib/http/test_headers.py b/test/netlib/http/test_headers.py
index cd2ca9d1..e12bceaf 100644
--- a/test/netlib/http/test_headers.py
+++ b/test/netlib/http/test_headers.py
@@ -1,4 +1,5 @@
from netlib.http import Headers
+from netlib.http.headers import parse_content_type
from netlib.tutils import raises
@@ -72,3 +73,12 @@ class TestHeaders(object):
replacements = headers.replace(r"Host: ", "X-Host ")
assert replacements == 0
assert headers["Host"] == "example.com"
+
+
+def test_parse_content_type():
+ p = parse_content_type
+ assert p("text/html") == ("text", "html", {})
+ assert p("text") is None
+
+ v = p("text/html; charset=UTF-8")
+ assert v == ('text', 'html', {'charset': 'UTF-8'})
diff --git a/test/netlib/http/test_multipart.py b/test/netlib/http/test_multipart.py
new file mode 100644
index 00000000..45ae996b
--- /dev/null
+++ b/test/netlib/http/test_multipart.py
@@ -0,0 +1,23 @@
+from netlib.http import Headers
+from netlib.http import multipart
+
+def test_decode():
+ boundary = 'somefancyboundary'
+ headers = Headers(
+ content_type='multipart/form-data; boundary=' + boundary
+ )
+ content = (
+ "--{0}\n"
+ "Content-Disposition: form-data; name=\"field1\"\n\n"
+ "value1\n"
+ "--{0}\n"
+ "Content-Disposition: form-data; name=\"field2\"\n\n"
+ "value2\n"
+ "--{0}--".format(boundary).encode()
+ )
+
+ form = multipart.decode(headers, content)
+
+ assert len(form) == 2
+ assert form[0] == (b"field1", b"value1")
+ assert form[1] == (b"field2", b"value2")
diff --git a/test/netlib/test_utils.py b/test/netlib/test_utils.py
index c4ee3c10..b3cc9a0b 100644
--- a/test/netlib/test_utils.py
+++ b/test/netlib/test_utils.py
@@ -1,7 +1,6 @@
# coding=utf-8
from netlib import utils, tutils
-from netlib.http import Headers
def test_bidi():
@@ -38,37 +37,6 @@ def test_pretty_size():
assert utils.pretty_size(1024 * 1024) == "1MB"
-def test_multipartdecode():
- boundary = 'somefancyboundary'
- headers = Headers(
- content_type='multipart/form-data; boundary=' + boundary
- )
- content = (
- "--{0}\n"
- "Content-Disposition: form-data; name=\"field1\"\n\n"
- "value1\n"
- "--{0}\n"
- "Content-Disposition: form-data; name=\"field2\"\n\n"
- "value2\n"
- "--{0}--".format(boundary).encode()
- )
-
- form = utils.multipartdecode(headers, content)
-
- assert len(form) == 2
- assert form[0] == (b"field1", b"value1")
- assert form[1] == (b"field2", b"value2")
-
-
-def test_parse_content_type():
- p = utils.parse_content_type
- assert p("text/html") == ("text", "html", {})
- assert p("text") is None
-
- v = p("text/html; charset=UTF-8")
- assert v == ('text', 'html', {'charset': 'UTF-8'})
-
-
def test_safe_subn():
assert utils.safe_subn("foo", u"bar", "\xc2foo")