diff options
| -rw-r--r-- | mitmproxy/addons/core.py | 27 | ||||
| -rw-r--r-- | mitmproxy/addons/cut.py | 4 | ||||
| -rw-r--r-- | mitmproxy/addons/export.py | 2 | ||||
| -rw-r--r-- | mitmproxy/addons/save.py | 7 | ||||
| -rw-r--r-- | mitmproxy/addons/view.py | 26 | ||||
| -rw-r--r-- | mitmproxy/command.py | 52 | ||||
| -rw-r--r-- | mitmproxy/platform/pf.py | 4 | ||||
| -rw-r--r-- | mitmproxy/tools/console/consoleaddons.py | 44 | ||||
| -rw-r--r-- | mitmproxy/tools/console/defaultkeys.py | 14 | ||||
| -rw-r--r-- | mitmproxy/tools/web/master.py | 2 | ||||
| -rw-r--r-- | test/mitmproxy/addons/test_core.py | 6 | ||||
| -rw-r--r-- | test/mitmproxy/net/test_tcp.py | 9 | ||||
| -rw-r--r-- | test/mitmproxy/platform/test_pf.py | 1 | ||||
| -rw-r--r-- | test/mitmproxy/test_command.py | 40 |
14 files changed, 170 insertions, 68 deletions
diff --git a/mitmproxy/addons/core.py b/mitmproxy/addons/core.py index 33d67279..4191d490 100644 --- a/mitmproxy/addons/core.py +++ b/mitmproxy/addons/core.py @@ -96,19 +96,16 @@ class Core: ] @command.command("flow.set") + @command.argument("spec", type=command.Choice("flow.set.options")) def flow_set( self, - flows: typing.Sequence[flow.Flow], spec: str, sval: str + flows: typing.Sequence[flow.Flow], + spec: str, + sval: str ) -> None: """ Quickly set a number of common values on flows. """ - opts = self.flow_set_options() - if spec not in opts: - raise exceptions.CommandError( - "Set spec must be one of: %s." % ", ".join(opts) - ) - val = sval # type: typing.Union[int, str] if spec == "status_code": try: @@ -190,13 +187,16 @@ class Core: ctx.log.alert("Toggled encoding on %s flows." % len(updated)) @command.command("flow.encode") - def encode(self, flows: typing.Sequence[flow.Flow], part: str, enc: str) -> None: + @command.argument("enc", type=command.Choice("flow.encode.options")) + def encode( + self, + flows: typing.Sequence[flow.Flow], + part: str, + enc: str, + ) -> None: """ Encode flows with a specified encoding. """ - if enc not in self.encode_options(): - raise exceptions.CommandError("Invalid encoding format: %s" % enc) - updated = [] for f in flows: p = getattr(f, part, None) @@ -212,12 +212,11 @@ class Core: def encode_options(self) -> typing.Sequence[str]: """ The possible values for an encoding specification. - """ return ["gzip", "deflate", "br"] @command.command("options.load") - def options_load(self, path: str) -> None: + def options_load(self, path: command.Path) -> None: """ Load options from a file. """ @@ -229,7 +228,7 @@ class Core: ) from e @command.command("options.save") - def options_save(self, path: str) -> None: + def options_save(self, path: command.Path) -> None: """ Save options to a file. """ diff --git a/mitmproxy/addons/cut.py b/mitmproxy/addons/cut.py index a4a2107b..5ec4c99e 100644 --- a/mitmproxy/addons/cut.py +++ b/mitmproxy/addons/cut.py @@ -96,7 +96,7 @@ class Cut: return ret @command.command("cut.save") - def save(self, cuts: command.Cuts, path: str) -> None: + def save(self, cuts: command.Cuts, path: command.Path) -> None: """ Save cuts to file. If there are multiple rows or columns, the format is UTF-8 encoded CSV. If there is exactly one row and one column, @@ -107,7 +107,7 @@ class Cut: append = False if path.startswith("+"): append = True - path = path[1:] + path = command.Path(path[1:]) if len(cuts) == 1 and len(cuts[0]) == 1: with open(path, "ab" if append else "wb") as fp: if fp.tell() > 0: diff --git a/mitmproxy/addons/export.py b/mitmproxy/addons/export.py index fd0c830e..5388a0e8 100644 --- a/mitmproxy/addons/export.py +++ b/mitmproxy/addons/export.py @@ -49,7 +49,7 @@ class Export(): return list(sorted(formats.keys())) @command.command("export.file") - def file(self, fmt: str, f: flow.Flow, path: str) -> None: + def file(self, fmt: str, f: flow.Flow, path: command.Path) -> None: """ Export a flow to path. """ diff --git a/mitmproxy/addons/save.py b/mitmproxy/addons/save.py index 5e739039..40cd6f82 100644 --- a/mitmproxy/addons/save.py +++ b/mitmproxy/addons/save.py @@ -1,6 +1,7 @@ import os.path import typing +from mitmproxy import command from mitmproxy import exceptions from mitmproxy import flowfilter from mitmproxy import io @@ -48,7 +49,8 @@ class Save: if ctx.options.save_stream_file: self.start_stream_to_path(ctx.options.save_stream_file, self.filt) - def save(self, flows: typing.Sequence[flow.Flow], path: str) -> None: + @command.command("save.file") + def save(self, flows: typing.Sequence[flow.Flow], path: command.Path) -> None: """ Save flows to a file. If the path starts with a +, flows are appended to the file, otherwise it is over-written. @@ -63,9 +65,6 @@ class Save: f.close() ctx.log.alert("Saved %s flows." % len(flows)) - def load(self, l): - l.add_command("save.file", self.save) - def tcp_start(self, flow): if self.stream: self.active_flows.add(flow) diff --git a/mitmproxy/addons/view.py b/mitmproxy/addons/view.py index 8ae1f341..e45f2baf 100644 --- a/mitmproxy/addons/view.py +++ b/mitmproxy/addons/view.py @@ -238,7 +238,7 @@ class View(collections.Sequence): @command.command("view.order.options") def order_options(self) -> typing.Sequence[str]: """ - A list of all the orders we support. + Choices supported by the console_order option. """ return list(sorted(self.orders.keys())) @@ -351,13 +351,13 @@ class View(collections.Sequence): ctx.master.addons.trigger("update", updated) @command.command("view.load") - def load_file(self, path: str) -> None: + def load_file(self, path: command.Path) -> None: """ Load flows into the view, without processing them with addons. """ - path = os.path.expanduser(path) + spath = os.path.expanduser(path) try: - with open(path, "rb") as f: + with open(spath, "rb") as f: for i in io.FlowReader(f).stream(): # Do this to get a new ID, so we can load the same file N times and # get new flows each time. It would be more efficient to just have a @@ -406,8 +406,11 @@ class View(collections.Sequence): if f.killable: f.kill() if f in self._view: + # We manually pass the index here because multiple flows may have the same + # sorting key, and we cannot reconstruct the index from that. + idx = self._view.index(f) self._view.remove(f) - self.sig_view_remove.send(self, flow=f) + self.sig_view_remove.send(self, flow=f, index=idx) del self._store[f.id] self.sig_store_remove.send(self, flow=f) if len(flows) > 1: @@ -507,11 +510,12 @@ class View(collections.Sequence): self.sig_view_update.send(self, flow=f) else: try: - self._view.remove(f) - self.sig_view_remove.send(self, flow=f) + idx = self._view.index(f) except ValueError: - # The value was not in the view - pass + pass # The value was not in the view + else: + self._view.remove(f) + self.sig_view_remove.send(self, flow=f, index=idx) class Focus: @@ -554,11 +558,11 @@ class Focus: def _nearest(self, f, v): return min(v._bisect(f), len(v) - 1) - def _sig_view_remove(self, view, flow): + def _sig_view_remove(self, view, flow, index): if len(view) == 0: self.flow = None elif flow is self.flow: - self.flow = view[self._nearest(self.flow, view)] + self.index = min(index, len(self.view) - 1) def _sig_view_refresh(self, view): if len(view) == 0: diff --git a/mitmproxy/command.py b/mitmproxy/command.py index eae3d80c..c4821973 100644 --- a/mitmproxy/command.py +++ b/mitmproxy/command.py @@ -2,6 +2,7 @@ This module manges and invokes typed commands. """ import inspect +import types import typing import shlex import textwrap @@ -18,13 +19,17 @@ Cuts = typing.Sequence[ ] +class Path(str): + pass + + def typename(t: type, ret: bool) -> str: """ Translates a type to an explanatory string. If ret is True, we're looking at a return type, else we're looking at a parameter type. """ - if issubclass(t, (str, int, bool)): - return t.__name__ + if isinstance(t, Choice): + return "choice" elif t == typing.Sequence[flow.Flow]: return "[flow]" if ret else "flowspec" elif t == typing.Sequence[str]: @@ -33,6 +38,8 @@ def typename(t: type, ret: bool) -> str: return "[cuts]" if ret else "cutspec" elif t == flow.Flow: return "flow" + elif issubclass(t, (str, int, bool)): + return t.__name__.lower() else: # pragma: no cover raise NotImplementedError(t) @@ -86,11 +93,11 @@ class Command: args = args[:len(self.paramtypes) - 1] pargs = [] - for i in range(len(args)): - if typecheck.check_command_type(args[i], self.paramtypes[i]): - pargs.append(args[i]) + for arg, paramtype in zip(args, self.paramtypes): + if typecheck.check_command_type(arg, paramtype): + pargs.append(arg) else: - pargs.append(parsearg(self.manager, args[i], self.paramtypes[i])) + pargs.append(parsearg(self.manager, arg, paramtype)) if remainder: chk = typecheck.check_command_type( @@ -157,7 +164,15 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any: """ Convert a string to a argument to the appropriate type. """ - if issubclass(argtype, str): + if isinstance(argtype, Choice): + cmd = argtype.options_command + opts = manager.call(cmd) + if spec not in opts: + raise exceptions.CommandError( + "Invalid choice: see %s for options" % cmd + ) + return spec + elif issubclass(argtype, str): return spec elif argtype == bool: if spec == "true": @@ -207,3 +222,26 @@ def command(path): wrapper.__dict__["command_path"] = path return wrapper return decorator + + +class Choice: + def __init__(self, options_command): + self.options_command = options_command + + def __instancecheck__(self, instance): + # return false here so that arguments are piped through parsearg, + # which does extended validation. + return False + + +def argument(name, type): + """ + Set the type of a command argument at runtime. + This is useful for more specific types such as command.Choice, which we cannot annotate + directly as mypy does not like that. + """ + def decorator(f: types.FunctionType) -> types.FunctionType: + assert name in f.__annotations__ + f.__annotations__[name] = type + return f + return decorator diff --git a/mitmproxy/platform/pf.py b/mitmproxy/platform/pf.py index c0397d78..bb5eb515 100644 --- a/mitmproxy/platform/pf.py +++ b/mitmproxy/platform/pf.py @@ -1,3 +1,4 @@ +import re import sys @@ -8,6 +9,9 @@ def lookup(address, port, s): Returns an (address, port) tuple, or None. """ + # We may get an ipv4-mapped ipv6 address here, e.g. ::ffff:127.0.0.1. + # Those still appear as "127.0.0.1" in the table, so we need to strip the prefix. + address = re.sub("^::ffff:(?=\d+.\d+.\d+.\d+$)", "", address) s = s.decode() spec = "%s:%s" % (address, port) for i in s.split("\n"): diff --git a/mitmproxy/tools/console/consoleaddons.py b/mitmproxy/tools/console/consoleaddons.py index 8233d45e..06ee3341 100644 --- a/mitmproxy/tools/console/consoleaddons.py +++ b/mitmproxy/tools/console/consoleaddons.py @@ -111,8 +111,7 @@ class ConsoleAddon: @command.command("console.layout.options") def layout_options(self) -> typing.Sequence[str]: """ - Returns the valid options for console layout. Use these by setting - the console_layout option. + Returns the available options for the consoler_layout option. """ return ["single", "vertical", "horizontal"] @@ -340,6 +339,9 @@ class ConsoleAddon: @command.command("console.edit.focus.options") def edit_focus_options(self) -> typing.Sequence[str]: + """ + Possible components for console.edit.focus. + """ return [ "cookies", "form", @@ -355,9 +357,10 @@ class ConsoleAddon: ] @command.command("console.edit.focus") + @command.argument("part", type=command.Choice("console.edit.focus.options")) def edit_focus(self, part: str) -> None: """ - Edit the query of the current focus. + Edit a component of the currently focused flow. """ if part == "cookies": self.master.switch_view("edit_focus_cookies") @@ -406,14 +409,14 @@ class ConsoleAddon: self._grideditor().cmd_delete() @command.command("console.grideditor.readfile") - def grideditor_readfile(self, path: str) -> None: + def grideditor_readfile(self, path: command.Path) -> None: """ Read a file into the currrent cell. """ self._grideditor().cmd_read_file(path) @command.command("console.grideditor.readfile_escaped") - def grideditor_readfile_escaped(self, path: str) -> None: + def grideditor_readfile_escaped(self, path: command.Path) -> None: """ Read a file containing a Python-style escaped stringinto the currrent cell. @@ -428,26 +431,33 @@ class ConsoleAddon: self._grideditor().cmd_spawn_editor() @command.command("console.flowview.mode.set") - def flowview_mode_set(self) -> None: + @command.argument("mode", type=command.Choice("console.flowview.mode.options")) + def flowview_mode_set(self, mode: str) -> None: """ Set the display mode for the current flow view. """ - fv = self.master.window.current("flowview") + fv = self.master.window.current_window("flowview") if not fv: raise exceptions.CommandError("Not viewing a flow.") idx = fv.body.tab_offset - def callback(opt): - try: - self.master.commands.call_args( - "view.setval", - ["@focus", "flowview_mode_%s" % idx, opt] - ) - except exceptions.CommandError as e: - signals.status_message.send(message=str(e)) + if mode not in [i.name.lower() for i in contentviews.views]: + raise exceptions.CommandError("Invalid flowview mode.") + + try: + self.master.commands.call_args( + "view.setval", + ["@focus", "flowview_mode_%s" % idx, mode] + ) + except exceptions.CommandError as e: + signals.status_message.send(message=str(e)) - opts = [i.name.lower() for i in contentviews.views] - self.master.overlay(overlay.Chooser(self.master, "Mode", opts, "", callback)) + @command.command("console.flowview.mode.options") + def flowview_mode_options(self) -> typing.Sequence[str]: + """ + Returns the valid options for the flowview mode. + """ + return [i.name.lower() for i in contentviews.views] @command.command("console.flowview.mode") def flowview_mode(self) -> str: diff --git a/mitmproxy/tools/console/defaultkeys.py b/mitmproxy/tools/console/defaultkeys.py index 8c28524a..c4a44aca 100644 --- a/mitmproxy/tools/console/defaultkeys.py +++ b/mitmproxy/tools/console/defaultkeys.py @@ -2,8 +2,8 @@ def map(km): km.add(":", "console.command ", ["global"], "Command prompt") km.add("?", "console.view.help", ["global"], "View help") - km.add("B", "browser.start", ["global"], "View commands") - km.add("C", "console.view.commands", ["global"], "Start an attached browser") + km.add("B", "browser.start", ["global"], "Start an attached browser") + km.add("C", "console.view.commands", ["global"], "View commands") km.add("K", "console.view.keybindings", ["global"], "View key bindings") km.add("O", "console.view.options", ["global"], "View options") km.add("E", "console.view.eventlog", ["global"], "View event log") @@ -116,7 +116,15 @@ def map(km): "View flow body in an external viewer" ) km.add("p", "view.focus.prev", ["flowview"], "Go to previous flow") - km.add("m", "console.flowview.mode.set", ["flowview"], "Set flow view mode") + km.add( + "m", + """ + console.choose.cmd Mode console.flowview.mode.options + console.flowview.mode.set {choice} + """, + ["flowview"], + "Set flow view mode" + ) km.add( "z", """ diff --git a/mitmproxy/tools/web/master.py b/mitmproxy/tools/web/master.py index 694ee2f7..4c597f0e 100644 --- a/mitmproxy/tools/web/master.py +++ b/mitmproxy/tools/web/master.py @@ -60,7 +60,7 @@ class WebMaster(master.Master): data=app.flow_to_json(flow) ) - def _sig_view_remove(self, view, flow): + def _sig_view_remove(self, view, flow, index): app.ClientConnection.broadcast( resource="flows", cmd="remove", diff --git a/test/mitmproxy/addons/test_core.py b/test/mitmproxy/addons/test_core.py index c132d80a..5aa4ef37 100644 --- a/test/mitmproxy/addons/test_core.py +++ b/test/mitmproxy/addons/test_core.py @@ -69,9 +69,6 @@ def test_flow_set(): f = tflow.tflow(resp=True) assert sa.flow_set_options() - with pytest.raises(exceptions.CommandError): - sa.flow_set([f], "flibble", "post") - assert f.request.method != "post" sa.flow_set([f], "method", "post") assert f.request.method == "POST" @@ -126,9 +123,6 @@ def test_encoding(): sa.encode_toggle([f], "request") assert "content-encoding" not in f.request.headers - with pytest.raises(exceptions.CommandError): - sa.encode([f], "request", "invalid") - def test_options(tmpdir): p = str(tmpdir.join("path")) diff --git a/test/mitmproxy/net/test_tcp.py b/test/mitmproxy/net/test_tcp.py index 3e27929d..e9084be4 100644 --- a/test/mitmproxy/net/test_tcp.py +++ b/test/mitmproxy/net/test_tcp.py @@ -1,4 +1,5 @@ from io import BytesIO +import re import queue import time import socket @@ -95,7 +96,13 @@ class TestServerBind(tservers.ServerTestBase): class handler(tcp.BaseHandler): def handle(self): - self.wfile.write(str(self.connection.getpeername()).encode()) + # We may get an ipv4-mapped ipv6 address here, e.g. ::ffff:127.0.0.1. + # Those still appear as "127.0.0.1" in the table, so we need to strip the prefix. + peername = self.connection.getpeername() + address = re.sub("^::ffff:(?=\d+.\d+.\d+.\d+$)", "", peername[0]) + port = peername[1] + + self.wfile.write(str((address, port)).encode()) self.wfile.flush() def test_bind(self): diff --git a/test/mitmproxy/platform/test_pf.py b/test/mitmproxy/platform/test_pf.py index 3292d345..b048a697 100644 --- a/test/mitmproxy/platform/test_pf.py +++ b/test/mitmproxy/platform/test_pf.py @@ -15,6 +15,7 @@ class TestLookup: d = f.read() assert pf.lookup("192.168.1.111", 40000, d) == ("5.5.5.5", 80) + assert pf.lookup("::ffff:192.168.1.111", 40000, d) == ("5.5.5.5", 80) with pytest.raises(Exception, match="Could not resolve original destination"): pf.lookup("192.168.1.112", 40000, d) with pytest.raises(Exception, match="Could not resolve original destination"): diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py index 43b97742..e1879ba2 100644 --- a/test/mitmproxy/test_command.py +++ b/test/mitmproxy/test_command.py @@ -7,6 +7,8 @@ from mitmproxy.test import taddons import io import pytest +from mitmproxy.utils import typecheck + class TAddon: def cmd1(self, foo: str) -> str: @@ -25,6 +27,16 @@ class TAddon: def varargs(self, one: str, *var: str) -> typing.Sequence[str]: return list(var) + def choices(self) -> typing.Sequence[str]: + return ["one", "two", "three"] + + @command.argument("arg", type=command.Choice("choices")) + def choose(self, arg: str) -> typing.Sequence[str]: + return ["one", "two", "three"] + + def path(self, arg: command.Path) -> None: + pass + class TestCommand: def test_varargs(self): @@ -86,6 +98,9 @@ def test_typename(): assert command.typename(flow.Flow, False) == "flow" assert command.typename(typing.Sequence[str], False) == "[str]" + assert command.typename(command.Choice("foo"), False) == "choice" + assert command.typename(command.Path, False) == "path" + class DummyConsole: @command.command("view.resolve") @@ -134,6 +149,20 @@ def test_parsearg(): tctx.master.commands, "foo, bar", typing.Sequence[str] ) == ["foo", "bar"] + a = TAddon() + tctx.master.commands.add("choices", a.choices) + assert command.parsearg( + tctx.master.commands, "one", command.Choice("choices"), + ) == "one" + with pytest.raises(exceptions.CommandError): + assert command.parsearg( + tctx.master.commands, "invalid", command.Choice("choices"), + ) + + assert command.parsearg( + tctx.master.commands, "foo", command.Path + ) == "foo" + class TDec: @command.command("cmd1") @@ -169,4 +198,13 @@ def test_verify_arg_signature(): with pytest.raises(exceptions.CommandError): command.verify_arg_signature(lambda: None, [1, 2], {}) print('hello there') - command.verify_arg_signature(lambda a, b: None, [1, 2], {})
\ No newline at end of file + command.verify_arg_signature(lambda a, b: None, [1, 2], {}) + + +def test_choice(): + """ + basic typechecking for choices should fail as we cannot verify if strings are a valid choice + at this point. + """ + c = command.Choice("foo") + assert not typecheck.check_command_type("foo", c) |
