diff options
| -rw-r--r-- | mitmproxy/addons/export.py | 81 | ||||
| -rw-r--r-- | mitmproxy/addons/serverplayback.py | 13 | ||||
| -rw-r--r-- | mitmproxy/flowfilter.py | 112 | ||||
| -rw-r--r-- | mitmproxy/net/http/multipart.py | 55 | ||||
| -rw-r--r-- | mitmproxy/net/http/request.py | 3 | ||||
| -rw-r--r-- | mitmproxy/proxy/protocol/http.py | 13 | ||||
| -rw-r--r-- | mitmproxy/tools/console/consoleaddons.py | 9 | ||||
| -rw-r--r-- | mitmproxy/tools/console/grideditor/editors.py | 20 | ||||
| -rw-r--r-- | mitmproxy/tools/console/master.py | 4 | ||||
| -rw-r--r-- | mitmproxy/tools/console/window.py | 3 | ||||
| -rw-r--r-- | setup.cfg | 2 | ||||
| -rw-r--r-- | test/mitmproxy/addons/test_export.py | 65 | ||||
| -rw-r--r-- | test/mitmproxy/net/http/test_multipart.py | 37 | ||||
| -rw-r--r-- | test/mitmproxy/net/http/test_request.py | 7 | 
14 files changed, 288 insertions, 136 deletions
| diff --git a/mitmproxy/addons/export.py b/mitmproxy/addons/export.py index 2776118a..80413ac9 100644 --- a/mitmproxy/addons/export.py +++ b/mitmproxy/addons/export.py @@ -1,56 +1,73 @@ +import shlex  import typing -from mitmproxy import ctx +import pyperclip + +import mitmproxy.types  from mitmproxy import command -from mitmproxy import flow +from mitmproxy import ctx, http  from mitmproxy import exceptions -from mitmproxy.utils import strutils +from mitmproxy import flow  from mitmproxy.net.http.http1 import assemble -import mitmproxy.types - -import pyperclip +from mitmproxy.utils import strutils -def cleanup_request(f: flow.Flow): +def cleanup_request(f: flow.Flow) -> http.HTTPRequest:      if not hasattr(f, "request"):          raise exceptions.CommandError("Can't export flow with no request.") -    request = f.request.copy()  # type: ignore +    assert isinstance(f, http.HTTPFlow) +    request = f.request.copy()      request.decode(strict=False) -    # a bit of clean-up -    if request.method == 'GET' and request.headers.get("content-length", None) == "0": -        request.headers.pop('content-length') -    request.headers.pop(':authority', None) +    # a bit of clean-up - these headers should be automatically set by curl/httpie +    request.headers.pop('content-length') +    if request.headers.get("host", "") == request.host: +        request.headers.pop("host") +    if request.headers.get(":authority", "") == request.host: +        request.headers.pop(":authority")      return request +def request_content_for_console(request: http.HTTPRequest) -> str: +    try: +        text = request.get_text(strict=True) +        assert text +    except ValueError: +        # shlex.quote doesn't support a bytes object +        # see https://github.com/python/cpython/pull/10871 +        raise exceptions.CommandError("Request content must be valid unicode") +    escape_control_chars = {chr(i): f"\\x{i:02x}" for i in range(32)} +    return "".join( +        escape_control_chars.get(x, x) +        for x in text +    ) + +  def curl_command(f: flow.Flow) -> str: -    data = "curl "      request = cleanup_request(f) +    args = ["curl"]      for k, v in request.headers.items(multi=True): -        data += "--compressed " if k == 'accept-encoding' else "" -        data += "-H '%s:%s' " % (k, v) +        if k.lower() == "accept-encoding": +            args.append("--compressed") +        else: +            args += ["-H", f"{k}: {v}"] +      if request.method != "GET": -        data += "-X %s " % request.method -    data += "'%s'" % request.url +        args += ["-X", request.method] +    args.append(request.url)      if request.content: -        data += " --data-binary '%s'" % strutils.bytes_to_escaped_str( -            request.content, -            escape_single_quotes=True -        ) -    return data +        args += ["-d", request_content_for_console(request)] +    return ' '.join(shlex.quote(arg) for arg in args)  def httpie_command(f: flow.Flow) -> str:      request = cleanup_request(f) -    data = "http %s %s" % (request.method, request.url) +    args = ["http", request.method, request.url]      for k, v in request.headers.items(multi=True): -        data += " '%s:%s'" % (k, v) +        args.append(f"{k}: {v}") +    cmd = ' '.join(shlex.quote(arg) for arg in args)      if request.content: -        data += " <<< '%s'" % strutils.bytes_to_escaped_str( -            request.content, -            escape_single_quotes=True -        ) -    return data +        cmd += " <<< " + shlex.quote(request_content_for_console(request)) +    return cmd  def raw(f: flow.Flow) -> bytes: @@ -58,9 +75,9 @@ def raw(f: flow.Flow) -> bytes:  formats = dict( -    curl = curl_command, -    httpie = httpie_command, -    raw = raw, +    curl=curl_command, +    httpie=httpie_command, +    raw=raw,  ) diff --git a/mitmproxy/addons/serverplayback.py b/mitmproxy/addons/serverplayback.py index 51ba60b4..18bc3545 100644 --- a/mitmproxy/addons/serverplayback.py +++ b/mitmproxy/addons/serverplayback.py @@ -68,6 +68,13 @@ class ServerPlayback:              to replay.              """          ) +        loader.add_option( +            "server_replay_ignore_port", bool, False, +            """ +            Ignore request's destination port while searching for a saved flow +            to replay. +            """ +        )      @command.command("replay.server")      def load_flows(self, flows: typing.Sequence[flow.Flow]) -> None: @@ -110,7 +117,7 @@ class ServerPlayback:          _, _, path, _, query, _ = urllib.parse.urlparse(r.url)          queriesArray = urllib.parse.parse_qsl(query, keep_blank_values=True) -        key: typing.List[typing.Any] = [str(r.port), str(r.scheme), str(r.method), str(path)] +        key: typing.List[typing.Any] = [str(r.scheme), str(r.method), str(path)]          if not ctx.options.server_replay_ignore_content:              if ctx.options.server_replay_ignore_payload_params and r.multipart_form:                  key.extend( @@ -128,7 +135,9 @@ class ServerPlayback:                  key.append(str(r.raw_content))          if not ctx.options.server_replay_ignore_host: -            key.append(r.host) +            key.append(r.pretty_host) +        if not ctx.options.server_replay_ignore_port: +            key.append(r.port)          filtered = []          ignore_params = ctx.options.server_replay_ignore_params or [] diff --git a/mitmproxy/flowfilter.py b/mitmproxy/flowfilter.py index 3f5afb48..b222d2a8 100644 --- a/mitmproxy/flowfilter.py +++ b/mitmproxy/flowfilter.py @@ -32,19 +32,17 @@          rex         Equivalent to ~u rex  """ +import functools  import re  import sys -import functools +from typing import Callable, ClassVar, Optional, Sequence, Type + +import pyparsing as pp +from mitmproxy import flow  from mitmproxy import http -from mitmproxy import websocket  from mitmproxy import tcp -from mitmproxy import flow - -from mitmproxy.utils import strutils - -import pyparsing as pp -from typing import Callable, Sequence, Type, Optional, ClassVar +from mitmproxy import websocket  def only(*types): @@ -54,7 +52,9 @@ def only(*types):              if isinstance(flow, types):                  return fn(self, flow)              return False +          return filter_types +      return decorator @@ -146,10 +146,10 @@ class _Rex(_Action):      def __init__(self, expr):          self.expr = expr          if self.is_binary: -            expr = strutils.escaped_str_to_bytes(expr) +            expr = expr.encode()          try:              self.re = re.compile(expr, self.flags) -        except: +        except Exception:              raise ValueError("Cannot compile expression.") @@ -336,6 +336,7 @@ class FUrl(_Rex):      code = "u"      help = "URL"      is_binary = False +      # FUrl is special, because it can be "naked".      @classmethod @@ -469,68 +470,51 @@ def _make():      # Order is important - multi-char expressions need to come before narrow      # ones.      parts = [] -    for klass in filter_unary: -        f = pp.Literal("~%s" % klass.code) + pp.WordEnd() -        f.setParseAction(klass.make) +    for cls in filter_unary: +        f = pp.Literal(f"~{cls.code}") + pp.WordEnd() +        f.setParseAction(cls.make)          parts.append(f) -    simplerex = "".join(c for c in pp.printables if c not in "()~'\"") -    alphdevanagari = pp.pyparsing_unicode.Devanagari.alphas -    alphcyrillic = pp.pyparsing_unicode.Cyrillic.alphas -    alphgreek = pp.pyparsing_unicode.Greek.alphas -    alphchinese = pp.pyparsing_unicode.Chinese.alphas -    alpharabic = pp.pyparsing_unicode.Arabic.alphas -    alphhebrew = pp.pyparsing_unicode.Hebrew.alphas -    alphjapanese = pp.pyparsing_unicode.Japanese.alphas -    alphkorean = pp.pyparsing_unicode.Korean.alphas -    alphlatin1 = pp.pyparsing_unicode.Latin1.alphas -    alphlatinA = pp.pyparsing_unicode.LatinA.alphas -    alphlatinB = pp.pyparsing_unicode.LatinB.alphas - -    rex = pp.Word(simplerex) |\ -        pp.Word(alphcyrillic) |\ -        pp.Word(alphgreek) |\ -        pp.Word(alphchinese) |\ -        pp.Word(alpharabic) |\ -        pp.Word(alphdevanagari) |\ -        pp.Word(alphhebrew) |\ -        pp.Word(alphjapanese) |\ -        pp.Word(alphkorean) |\ -        pp.Word(alphlatin1) |\ -        pp.Word(alphlatinA) |\ -        pp.Word(alphlatinB) |\ -        pp.QuotedString("\"", escChar='\\') |\ -        pp.QuotedString("'", escChar='\\') -    for klass in filter_rex: -        f = pp.Literal("~%s" % klass.code) + pp.WordEnd() + rex.copy() -        f.setParseAction(klass.make) +    # This is a bit of a hack to simulate Word(pyparsing_unicode.printables), +    # which has a horrible performance with len(pyparsing.pyparsing_unicode.printables) == 1114060 +    unicode_words = pp.CharsNotIn("()~'\"" + pp.ParserElement.DEFAULT_WHITE_CHARS) +    unicode_words.skipWhitespace = True +    regex = ( +            unicode_words +            | pp.QuotedString('"', escChar='\\') +            | pp.QuotedString("'", escChar='\\') +    ) +    for cls in filter_rex: +        f = pp.Literal(f"~{cls.code}") + pp.WordEnd() + regex.copy() +        f.setParseAction(cls.make)          parts.append(f) -    for klass in filter_int: -        f = pp.Literal("~%s" % klass.code) + pp.WordEnd() + pp.Word(pp.nums) -        f.setParseAction(klass.make) +    for cls in filter_int: +        f = pp.Literal(f"~{cls.code}") + pp.WordEnd() + pp.Word(pp.nums) +        f.setParseAction(cls.make)          parts.append(f)      # A naked rex is a URL rex: -    f = rex.copy() +    f = regex.copy()      f.setParseAction(FUrl.make)      parts.append(f)      atom = pp.MatchFirst(parts) -    expr = pp.operatorPrecedence(atom, -                                 [(pp.Literal("!").suppress(), -                                   1, -                                   pp.opAssoc.RIGHT, -                                   lambda x: FNot(*x)), -                                     (pp.Literal("&").suppress(), -                                      2, -                                      pp.opAssoc.LEFT, -                                      lambda x: FAnd(*x)), -                                     (pp.Literal("|").suppress(), -                                      2, -                                      pp.opAssoc.LEFT, -                                      lambda x: FOr(*x)), -                                  ]) +    expr = pp.infixNotation( +        atom, +        [(pp.Literal("!").suppress(), +          1, +          pp.opAssoc.RIGHT, +          lambda x: FNot(*x)), +         (pp.Literal("&").suppress(), +          2, +          pp.opAssoc.LEFT, +          lambda x: FAnd(*x)), +         (pp.Literal("|").suppress(), +          2, +          pp.opAssoc.LEFT, +          lambda x: FOr(*x)), +         ])      expr = pp.OneOrMore(expr)      return expr.setParseAction(lambda x: FAnd(x) if len(x) != 1 else x) @@ -570,15 +554,15 @@ def match(flt, flow):  help = []  for a in filter_unary:      help.append( -        ("~%s" % a.code, a.help) +        (f"~{a.code}", a.help)      )  for b in filter_rex:      help.append( -        ("~%s regex" % b.code, b.help) +        (f"~{b.code} regex", b.help)      )  for c in filter_int:      help.append( -        ("~%s int" % c.code, c.help) +        (f"~{c.code} int", c.help)      )  help.sort()  help.extend( diff --git a/mitmproxy/net/http/multipart.py b/mitmproxy/net/http/multipart.py index a854d47f..4edf76ac 100644 --- a/mitmproxy/net/http/multipart.py +++ b/mitmproxy/net/http/multipart.py @@ -1,8 +1,43 @@  import re - +import mimetypes +from urllib.parse import quote  from mitmproxy.net.http import headers +def encode(head, l): + +    k = head.get("content-type") +    if k: +        k = headers.parse_content_type(k) +        if k is not None: +            try: +                boundary = k[2]["boundary"].encode("ascii") +                boundary = quote(boundary) +            except (KeyError, UnicodeError): +                return b"" +            hdrs = [] +            for key, value in l: +                file_type = mimetypes.guess_type(str(key))[0] or "text/plain; charset=utf-8" + +                if key: +                    hdrs.append(b"--%b" % boundary.encode('utf-8')) +                    disposition = b'form-data; name="%b"' % key +                    hdrs.append(b"Content-Disposition: %b" % disposition) +                    hdrs.append(b"Content-Type: %b" % file_type.encode('utf-8')) +                    hdrs.append(b'') +                    hdrs.append(value) +                hdrs.append(b'') + +                if value is not None: +                    # If boundary is found in value then raise ValueError +                    if re.search(rb"^--%b$" % re.escape(boundary.encode('utf-8')), value): +                        raise ValueError(b"boundary found in encoded string") + +            hdrs.append(b"--%b--\r\n" % boundary.encode('utf-8')) +            temp = b"\r\n".join(hdrs) +            return temp + +  def decode(hdrs, content):      """          Takes a multipart boundary encoded string and returns list of (key, value) tuples. @@ -19,14 +54,14 @@ def decode(hdrs, content):          rx = re.compile(br'\bname="([^"]+)"')          r = [] - -        for i in content.split(b"--" + boundary): -            parts = i.splitlines() -            if len(parts) > 1 and parts[0][0:2] != b"--": -                match = rx.search(parts[1]) -                if match: -                    key = match.group(1) -                    value = b"".join(parts[3 + parts[2:].index(b""):]) -                    r.append((key, value)) +        if content is not None: +            for i in content.split(b"--" + boundary): +                parts = i.splitlines() +                if len(parts) > 1 and parts[0][0:2] != b"--": +                    match = rx.search(parts[1]) +                    if match: +                        key = match.group(1) +                        value = b"".join(parts[3 + parts[2:].index(b""):]) +                        r.append((key, value))          return r      return [] diff --git a/mitmproxy/net/http/request.py b/mitmproxy/net/http/request.py index 1569ea72..ba699e2a 100644 --- a/mitmproxy/net/http/request.py +++ b/mitmproxy/net/http/request.py @@ -472,7 +472,8 @@ class Request(message.Message):          return ()      def _set_multipart_form(self, value): -        raise NotImplementedError() +        self.content = mitmproxy.net.http.multipart.encode(self.headers, value) +        self.headers["content-type"] = "multipart/form-data"      @property      def multipart_form(self): diff --git a/mitmproxy/proxy/protocol/http.py b/mitmproxy/proxy/protocol/http.py index 2ae656b3..4c20617b 100644 --- a/mitmproxy/proxy/protocol/http.py +++ b/mitmproxy/proxy/protocol/http.py @@ -263,7 +263,7 @@ class HttpLayer(base.Layer):                  else:                      msg = "Unexpected CONNECT request."                      self.send_error_response(400, msg) -                    raise exceptions.ProtocolException(msg) +                    return False              validate_request_form(self.mode, request)              self.channel.ask("requestheaders", f) @@ -289,9 +289,12 @@ class HttpLayer(base.Layer):              f.request = None              f.error = flow.Error(str(e))              self.channel.ask("error", f) -            raise exceptions.ProtocolException( -                "HTTP protocol error in client request: {}".format(e) -            ) from e +            self.log( +                "request", +                "warn", +                ["HTTP protocol error in client request: {}".format(e)] +            ) +            return False          self.log("request", "debug", [repr(request)]) @@ -448,8 +451,8 @@ class HttpLayer(base.Layer):                  return False  # should never be reached          except (exceptions.ProtocolException, exceptions.NetlibException) as e: -            self.send_error_response(502, repr(e))              if not f.response: +                self.send_error_response(502, repr(e))                  f.error = flow.Error(str(e))                  self.channel.ask("error", f)                  return False diff --git a/mitmproxy/tools/console/consoleaddons.py b/mitmproxy/tools/console/consoleaddons.py index b6602413..9f595b42 100644 --- a/mitmproxy/tools/console/consoleaddons.py +++ b/mitmproxy/tools/console/consoleaddons.py @@ -381,7 +381,8 @@ class ConsoleAddon:          """          return [              "cookies", -            "form", +            "urlencoded form", +            "multipart form",              "path",              "method",              "query", @@ -416,8 +417,10 @@ class ConsoleAddon:              flow.response = http.HTTPResponse.make()          if part == "cookies":              self.master.switch_view("edit_focus_cookies") -        elif part == "form": -            self.master.switch_view("edit_focus_form") +        elif part == "urlencoded form": +            self.master.switch_view("edit_focus_urlencoded_form") +        elif part == "multipart form": +            self.master.switch_view("edit_focus_multipart_form")          elif part == "path":              self.master.switch_view("edit_focus_path")          elif part == "query": diff --git a/mitmproxy/tools/console/grideditor/editors.py b/mitmproxy/tools/console/grideditor/editors.py index b4d59384..a4b46a51 100644 --- a/mitmproxy/tools/console/grideditor/editors.py +++ b/mitmproxy/tools/console/grideditor/editors.py @@ -53,14 +53,30 @@ class ResponseHeaderEditor(HeaderEditor):          flow.response.headers = Headers(vals) -class RequestFormEditor(base.FocusEditor): -    title = "Edit URL-encoded Form" +class RequestMultipartEditor(base.FocusEditor): +    title = "Edit Multipart Form"      columns = [          col_text.Column("Key"),          col_text.Column("Value")      ]      def get_data(self, flow): + +        return flow.request.multipart_form.items(multi=True) + +    def set_data(self, vals, flow): +        flow.request.multipart_form = vals + + +class RequestUrlEncodedEditor(base.FocusEditor): +    title = "Edit UrlEncoded Form" +    columns = [ +        col_text.Column("Key"), +        col_text.Column("Value") +    ] + +    def get_data(self, flow): +          return flow.request.urlencoded_form.items(multi=True)      def set_data(self, vals, flow): diff --git a/mitmproxy/tools/console/master.py b/mitmproxy/tools/console/master.py index dd15a2f5..6ab9ba5a 100644 --- a/mitmproxy/tools/console/master.py +++ b/mitmproxy/tools/console/master.py @@ -120,7 +120,7 @@ class ConsoleMaster(master.Master):          with open(fd, "w" if text else "wb") as f:              f.write(data)          # if no EDITOR is set, assume 'vi' -        c = os.environ.get("EDITOR") or "vi" +        c = os.environ.get("MITMPROXY_EDITOR") or os.environ.get("EDITOR") or "vi"          cmd = shlex.split(c)          cmd.append(name)          with self.uistopped(): @@ -159,7 +159,7 @@ class ConsoleMaster(master.Master):                  shell = True          if not cmd:              # hm which one should get priority? -            c = os.environ.get("PAGER") or os.environ.get("EDITOR") +            c = os.environ.get("MITMPROXY_EDITOR") or os.environ.get("PAGER") or os.environ.get("EDITOR")              if not c:                  c = "less"              cmd = shlex.split(c) diff --git a/mitmproxy/tools/console/window.py b/mitmproxy/tools/console/window.py index 7669299c..fb2e8c1e 100644 --- a/mitmproxy/tools/console/window.py +++ b/mitmproxy/tools/console/window.py @@ -64,7 +64,8 @@ class WindowStack:              edit_focus_cookies = grideditor.CookieEditor(master),              edit_focus_setcookies = grideditor.SetCookieEditor(master),              edit_focus_setcookie_attrs = grideditor.CookieAttributeEditor(master), -            edit_focus_form = grideditor.RequestFormEditor(master), +            edit_focus_multipart_form=grideditor.RequestMultipartEditor(master), +            edit_focus_urlencoded_form=grideditor.RequestUrlEncodedEditor(master),              edit_focus_path = grideditor.PathEditor(master),              edit_focus_request_headers = grideditor.RequestHeaderEditor(master),              edit_focus_response_headers = grideditor.ResponseHeaderEditor(master), @@ -1,7 +1,7 @@  [flake8]  max-line-length = 140  max-complexity = 25 -ignore = E251,E252,C901,W292,W503,W504,W605,E722,E741 +ignore = E251,E252,C901,W292,W503,W504,W605,E722,E741,E126  exclude = mitmproxy/contrib/*,test/mitmproxy/data/*,release/build/*,mitmproxy/io/proto/*  addons = file,open,basestring,xrange,unicode,long,cmp diff --git a/test/mitmproxy/addons/test_export.py b/test/mitmproxy/addons/test_export.py index c86e0c7d..4b905722 100644 --- a/test/mitmproxy/addons/test_export.py +++ b/test/mitmproxy/addons/test_export.py @@ -1,4 +1,5 @@  import os +import shlex  import pytest  import pyperclip @@ -41,43 +42,87 @@ def tcp_flow():  class TestExportCurlCommand:      def test_get(self, get_request): -        result = """curl -H 'header:qvalue' 'http://address:22/path?a=foo&a=bar&b=baz'""" +        result = """curl -H 'header: qvalue' 'http://address:22/path?a=foo&a=bar&b=baz'"""          assert export.curl_command(get_request) == result      def test_post(self, post_request): -        result = "curl -H 'content-length:256' -X POST 'http://address:22/path' --data-binary '{}'".format( -            str(bytes(range(256)))[2:-1] -        ) +        post_request.request.content = b'nobinarysupport' +        result = "curl -X POST http://address:22/path -d nobinarysupport"          assert export.curl_command(post_request) == result +    def test_fails_with_binary_data(self, post_request): +        # shlex.quote doesn't support a bytes object +        # see https://github.com/python/cpython/pull/10871 +        post_request.request.headers["Content-Type"] = "application/json; charset=utf-8" +        with pytest.raises(exceptions.CommandError): +            export.curl_command(post_request) +      def test_patch(self, patch_request): -        result = """curl -H 'header:qvalue' -H 'content-length:7' -X PATCH 'http://address:22/path?query=param' --data-binary 'content'""" +        result = """curl -H 'header: qvalue' -X PATCH 'http://address:22/path?query=param' -d content"""          assert export.curl_command(patch_request) == result      def test_tcp(self, tcp_flow):          with pytest.raises(exceptions.CommandError):              export.curl_command(tcp_flow) +    def test_escape_single_quotes_in_body(self): +        request = tflow.tflow( +            req=tutils.treq( +                method=b'POST', +                headers=(), +                content=b"'&#" +            ) +        ) +        command = export.curl_command(request) +        assert shlex.split(command)[-2] == '-d' +        assert shlex.split(command)[-1] == "'&#" + +    def test_strip_unnecessary(self, get_request): +        get_request.request.headers.clear() +        get_request.request.headers["host"] = "address" +        get_request.request.headers[":authority"] = "address" +        get_request.request.headers["accept-encoding"] = "br" +        result = """curl --compressed 'http://address:22/path?a=foo&a=bar&b=baz'""" +        assert export.curl_command(get_request) == result +  class TestExportHttpieCommand:      def test_get(self, get_request): -        result = """http GET http://address:22/path?a=foo&a=bar&b=baz 'header:qvalue'""" +        result = """http GET 'http://address:22/path?a=foo&a=bar&b=baz' 'header: qvalue'"""          assert export.httpie_command(get_request) == result      def test_post(self, post_request): -        result = "http POST http://address:22/path 'content-length:256' <<< '{}'".format( -            str(bytes(range(256)))[2:-1] -        ) +        post_request.request.content = b'nobinarysupport' +        result = "http POST http://address:22/path <<< nobinarysupport"          assert export.httpie_command(post_request) == result +    def test_fails_with_binary_data(self, post_request): +        # shlex.quote doesn't support a bytes object +        # see https://github.com/python/cpython/pull/10871 +        post_request.request.headers["Content-Type"] = "application/json; charset=utf-8" +        with pytest.raises(exceptions.CommandError): +            export.httpie_command(post_request) +      def test_patch(self, patch_request): -        result = """http PATCH http://address:22/path?query=param 'header:qvalue' 'content-length:7' <<< 'content'""" +        result = """http PATCH 'http://address:22/path?query=param' 'header: qvalue' <<< content"""          assert export.httpie_command(patch_request) == result      def test_tcp(self, tcp_flow):          with pytest.raises(exceptions.CommandError):              export.httpie_command(tcp_flow) +    def test_escape_single_quotes_in_body(self): +        request = tflow.tflow( +            req=tutils.treq( +                method=b'POST', +                headers=(), +                content=b"'&#" +            ) +        ) +        command = export.httpie_command(request) +        assert shlex.split(command)[-2] == '<<<' +        assert shlex.split(command)[-1] == "'&#" +  class TestRaw:      def test_get(self, get_request): diff --git a/test/mitmproxy/net/http/test_multipart.py b/test/mitmproxy/net/http/test_multipart.py index 68ae6bbd..6d2e5017 100644 --- a/test/mitmproxy/net/http/test_multipart.py +++ b/test/mitmproxy/net/http/test_multipart.py @@ -1,5 +1,6 @@  from mitmproxy.net.http import Headers  from mitmproxy.net.http import multipart +import pytest  def test_decode(): @@ -22,3 +23,39 @@ def test_decode():      assert len(form) == 2      assert form[0] == (b"field1", b"value1")      assert form[1] == (b"field2", b"value2") + +    boundary = 'boundary茅莽' +    headers = Headers( +        content_type='multipart/form-data; boundary=' + boundary +    ) +    result = multipart.decode(headers, content) +    assert result == [] + +    headers = Headers( +        content_type='' +    ) +    assert multipart.decode(headers, content) == [] + + +def test_encode(): +    data = [("file".encode('utf-8'), "shell.jpg".encode('utf-8')), +            ("file_size".encode('utf-8'), "1000".encode('utf-8'))] +    headers = Headers( +        content_type='multipart/form-data; boundary=127824672498' +    ) +    content = multipart.encode(headers, data) + +    assert b'Content-Disposition: form-data; name="file"' in content +    assert b'Content-Type: text/plain; charset=utf-8\r\n\r\nshell.jpg\r\n\r\n--127824672498\r\n' in content +    assert b'1000\r\n\r\n--127824672498--\r\n' +    assert len(content) == 252 + +    with pytest.raises(ValueError, match=r"boundary found in encoded string"): +        multipart.encode(headers, [("key".encode('utf-8'), "--127824672498".encode('utf-8'))]) + +    boundary = 'boundary茅莽' +    headers = Headers( +        content_type='multipart/form-data; boundary=' + boundary +    ) +    result = multipart.encode(headers, data) +    assert result == b'' diff --git a/test/mitmproxy/net/http/test_request.py b/test/mitmproxy/net/http/test_request.py index ef581a91..71d5c7a1 100644 --- a/test/mitmproxy/net/http/test_request.py +++ b/test/mitmproxy/net/http/test_request.py @@ -371,6 +371,7 @@ class TestRequestUtils:              assert list(request.multipart_form.items()) == []      def test_set_multipart_form(self): -        request = treq(content=b"foobar") -        with pytest.raises(NotImplementedError): -            request.multipart_form = "foobar" +        request = treq() +        request.multipart_form = [("file", "shell.jpg"), ("file_size", "1000")] +        assert request.headers["Content-Type"] == 'multipart/form-data' +        assert request.content is None | 
