aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--mitmproxy/addons/core.py27
-rw-r--r--mitmproxy/addons/cut.py4
-rw-r--r--mitmproxy/addons/export.py2
-rw-r--r--mitmproxy/addons/save.py7
-rw-r--r--mitmproxy/addons/view.py26
-rw-r--r--mitmproxy/command.py52
-rw-r--r--mitmproxy/platform/pf.py4
-rw-r--r--mitmproxy/tools/console/consoleaddons.py44
-rw-r--r--mitmproxy/tools/console/defaultkeys.py14
-rw-r--r--mitmproxy/tools/web/master.py2
-rw-r--r--test/mitmproxy/addons/test_core.py6
-rw-r--r--test/mitmproxy/net/test_tcp.py9
-rw-r--r--test/mitmproxy/platform/test_pf.py1
-rw-r--r--test/mitmproxy/test_command.py40
14 files changed, 170 insertions, 68 deletions
diff --git a/mitmproxy/addons/core.py b/mitmproxy/addons/core.py
index 33d67279..4191d490 100644
--- a/mitmproxy/addons/core.py
+++ b/mitmproxy/addons/core.py
@@ -96,19 +96,16 @@ class Core:
]
@command.command("flow.set")
+ @command.argument("spec", type=command.Choice("flow.set.options"))
def flow_set(
self,
- flows: typing.Sequence[flow.Flow], spec: str, sval: str
+ flows: typing.Sequence[flow.Flow],
+ spec: str,
+ sval: str
) -> None:
"""
Quickly set a number of common values on flows.
"""
- opts = self.flow_set_options()
- if spec not in opts:
- raise exceptions.CommandError(
- "Set spec must be one of: %s." % ", ".join(opts)
- )
-
val = sval # type: typing.Union[int, str]
if spec == "status_code":
try:
@@ -190,13 +187,16 @@ class Core:
ctx.log.alert("Toggled encoding on %s flows." % len(updated))
@command.command("flow.encode")
- def encode(self, flows: typing.Sequence[flow.Flow], part: str, enc: str) -> None:
+ @command.argument("enc", type=command.Choice("flow.encode.options"))
+ def encode(
+ self,
+ flows: typing.Sequence[flow.Flow],
+ part: str,
+ enc: str,
+ ) -> None:
"""
Encode flows with a specified encoding.
"""
- if enc not in self.encode_options():
- raise exceptions.CommandError("Invalid encoding format: %s" % enc)
-
updated = []
for f in flows:
p = getattr(f, part, None)
@@ -212,12 +212,11 @@ class Core:
def encode_options(self) -> typing.Sequence[str]:
"""
The possible values for an encoding specification.
-
"""
return ["gzip", "deflate", "br"]
@command.command("options.load")
- def options_load(self, path: str) -> None:
+ def options_load(self, path: command.Path) -> None:
"""
Load options from a file.
"""
@@ -229,7 +228,7 @@ class Core:
) from e
@command.command("options.save")
- def options_save(self, path: str) -> None:
+ def options_save(self, path: command.Path) -> None:
"""
Save options to a file.
"""
diff --git a/mitmproxy/addons/cut.py b/mitmproxy/addons/cut.py
index a4a2107b..5ec4c99e 100644
--- a/mitmproxy/addons/cut.py
+++ b/mitmproxy/addons/cut.py
@@ -96,7 +96,7 @@ class Cut:
return ret
@command.command("cut.save")
- def save(self, cuts: command.Cuts, path: str) -> None:
+ def save(self, cuts: command.Cuts, path: command.Path) -> None:
"""
Save cuts to file. If there are multiple rows or columns, the format
is UTF-8 encoded CSV. If there is exactly one row and one column,
@@ -107,7 +107,7 @@ class Cut:
append = False
if path.startswith("+"):
append = True
- path = path[1:]
+ path = command.Path(path[1:])
if len(cuts) == 1 and len(cuts[0]) == 1:
with open(path, "ab" if append else "wb") as fp:
if fp.tell() > 0:
diff --git a/mitmproxy/addons/export.py b/mitmproxy/addons/export.py
index fd0c830e..5388a0e8 100644
--- a/mitmproxy/addons/export.py
+++ b/mitmproxy/addons/export.py
@@ -49,7 +49,7 @@ class Export():
return list(sorted(formats.keys()))
@command.command("export.file")
- def file(self, fmt: str, f: flow.Flow, path: str) -> None:
+ def file(self, fmt: str, f: flow.Flow, path: command.Path) -> None:
"""
Export a flow to path.
"""
diff --git a/mitmproxy/addons/save.py b/mitmproxy/addons/save.py
index 5e739039..40cd6f82 100644
--- a/mitmproxy/addons/save.py
+++ b/mitmproxy/addons/save.py
@@ -1,6 +1,7 @@
import os.path
import typing
+from mitmproxy import command
from mitmproxy import exceptions
from mitmproxy import flowfilter
from mitmproxy import io
@@ -48,7 +49,8 @@ class Save:
if ctx.options.save_stream_file:
self.start_stream_to_path(ctx.options.save_stream_file, self.filt)
- def save(self, flows: typing.Sequence[flow.Flow], path: str) -> None:
+ @command.command("save.file")
+ def save(self, flows: typing.Sequence[flow.Flow], path: command.Path) -> None:
"""
Save flows to a file. If the path starts with a +, flows are
appended to the file, otherwise it is over-written.
@@ -63,9 +65,6 @@ class Save:
f.close()
ctx.log.alert("Saved %s flows." % len(flows))
- def load(self, l):
- l.add_command("save.file", self.save)
-
def tcp_start(self, flow):
if self.stream:
self.active_flows.add(flow)
diff --git a/mitmproxy/addons/view.py b/mitmproxy/addons/view.py
index 8ae1f341..e45f2baf 100644
--- a/mitmproxy/addons/view.py
+++ b/mitmproxy/addons/view.py
@@ -238,7 +238,7 @@ class View(collections.Sequence):
@command.command("view.order.options")
def order_options(self) -> typing.Sequence[str]:
"""
- A list of all the orders we support.
+ Choices supported by the console_order option.
"""
return list(sorted(self.orders.keys()))
@@ -351,13 +351,13 @@ class View(collections.Sequence):
ctx.master.addons.trigger("update", updated)
@command.command("view.load")
- def load_file(self, path: str) -> None:
+ def load_file(self, path: command.Path) -> None:
"""
Load flows into the view, without processing them with addons.
"""
- path = os.path.expanduser(path)
+ spath = os.path.expanduser(path)
try:
- with open(path, "rb") as f:
+ with open(spath, "rb") as f:
for i in io.FlowReader(f).stream():
# Do this to get a new ID, so we can load the same file N times and
# get new flows each time. It would be more efficient to just have a
@@ -406,8 +406,11 @@ class View(collections.Sequence):
if f.killable:
f.kill()
if f in self._view:
+ # We manually pass the index here because multiple flows may have the same
+ # sorting key, and we cannot reconstruct the index from that.
+ idx = self._view.index(f)
self._view.remove(f)
- self.sig_view_remove.send(self, flow=f)
+ self.sig_view_remove.send(self, flow=f, index=idx)
del self._store[f.id]
self.sig_store_remove.send(self, flow=f)
if len(flows) > 1:
@@ -507,11 +510,12 @@ class View(collections.Sequence):
self.sig_view_update.send(self, flow=f)
else:
try:
- self._view.remove(f)
- self.sig_view_remove.send(self, flow=f)
+ idx = self._view.index(f)
except ValueError:
- # The value was not in the view
- pass
+ pass # The value was not in the view
+ else:
+ self._view.remove(f)
+ self.sig_view_remove.send(self, flow=f, index=idx)
class Focus:
@@ -554,11 +558,11 @@ class Focus:
def _nearest(self, f, v):
return min(v._bisect(f), len(v) - 1)
- def _sig_view_remove(self, view, flow):
+ def _sig_view_remove(self, view, flow, index):
if len(view) == 0:
self.flow = None
elif flow is self.flow:
- self.flow = view[self._nearest(self.flow, view)]
+ self.index = min(index, len(self.view) - 1)
def _sig_view_refresh(self, view):
if len(view) == 0:
diff --git a/mitmproxy/command.py b/mitmproxy/command.py
index eae3d80c..c4821973 100644
--- a/mitmproxy/command.py
+++ b/mitmproxy/command.py
@@ -2,6 +2,7 @@
This module manges and invokes typed commands.
"""
import inspect
+import types
import typing
import shlex
import textwrap
@@ -18,13 +19,17 @@ Cuts = typing.Sequence[
]
+class Path(str):
+ pass
+
+
def typename(t: type, ret: bool) -> str:
"""
Translates a type to an explanatory string. If ret is True, we're
looking at a return type, else we're looking at a parameter type.
"""
- if issubclass(t, (str, int, bool)):
- return t.__name__
+ if isinstance(t, Choice):
+ return "choice"
elif t == typing.Sequence[flow.Flow]:
return "[flow]" if ret else "flowspec"
elif t == typing.Sequence[str]:
@@ -33,6 +38,8 @@ def typename(t: type, ret: bool) -> str:
return "[cuts]" if ret else "cutspec"
elif t == flow.Flow:
return "flow"
+ elif issubclass(t, (str, int, bool)):
+ return t.__name__.lower()
else: # pragma: no cover
raise NotImplementedError(t)
@@ -86,11 +93,11 @@ class Command:
args = args[:len(self.paramtypes) - 1]
pargs = []
- for i in range(len(args)):
- if typecheck.check_command_type(args[i], self.paramtypes[i]):
- pargs.append(args[i])
+ for arg, paramtype in zip(args, self.paramtypes):
+ if typecheck.check_command_type(arg, paramtype):
+ pargs.append(arg)
else:
- pargs.append(parsearg(self.manager, args[i], self.paramtypes[i]))
+ pargs.append(parsearg(self.manager, arg, paramtype))
if remainder:
chk = typecheck.check_command_type(
@@ -157,7 +164,15 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
"""
Convert a string to a argument to the appropriate type.
"""
- if issubclass(argtype, str):
+ if isinstance(argtype, Choice):
+ cmd = argtype.options_command
+ opts = manager.call(cmd)
+ if spec not in opts:
+ raise exceptions.CommandError(
+ "Invalid choice: see %s for options" % cmd
+ )
+ return spec
+ elif issubclass(argtype, str):
return spec
elif argtype == bool:
if spec == "true":
@@ -207,3 +222,26 @@ def command(path):
wrapper.__dict__["command_path"] = path
return wrapper
return decorator
+
+
+class Choice:
+ def __init__(self, options_command):
+ self.options_command = options_command
+
+ def __instancecheck__(self, instance):
+ # return false here so that arguments are piped through parsearg,
+ # which does extended validation.
+ return False
+
+
+def argument(name, type):
+ """
+ Set the type of a command argument at runtime.
+ This is useful for more specific types such as command.Choice, which we cannot annotate
+ directly as mypy does not like that.
+ """
+ def decorator(f: types.FunctionType) -> types.FunctionType:
+ assert name in f.__annotations__
+ f.__annotations__[name] = type
+ return f
+ return decorator
diff --git a/mitmproxy/platform/pf.py b/mitmproxy/platform/pf.py
index c0397d78..bb5eb515 100644
--- a/mitmproxy/platform/pf.py
+++ b/mitmproxy/platform/pf.py
@@ -1,3 +1,4 @@
+import re
import sys
@@ -8,6 +9,9 @@ def lookup(address, port, s):
Returns an (address, port) tuple, or None.
"""
+ # We may get an ipv4-mapped ipv6 address here, e.g. ::ffff:127.0.0.1.
+ # Those still appear as "127.0.0.1" in the table, so we need to strip the prefix.
+ address = re.sub("^::ffff:(?=\d+.\d+.\d+.\d+$)", "", address)
s = s.decode()
spec = "%s:%s" % (address, port)
for i in s.split("\n"):
diff --git a/mitmproxy/tools/console/consoleaddons.py b/mitmproxy/tools/console/consoleaddons.py
index 8233d45e..06ee3341 100644
--- a/mitmproxy/tools/console/consoleaddons.py
+++ b/mitmproxy/tools/console/consoleaddons.py
@@ -111,8 +111,7 @@ class ConsoleAddon:
@command.command("console.layout.options")
def layout_options(self) -> typing.Sequence[str]:
"""
- Returns the valid options for console layout. Use these by setting
- the console_layout option.
+ Returns the available options for the consoler_layout option.
"""
return ["single", "vertical", "horizontal"]
@@ -340,6 +339,9 @@ class ConsoleAddon:
@command.command("console.edit.focus.options")
def edit_focus_options(self) -> typing.Sequence[str]:
+ """
+ Possible components for console.edit.focus.
+ """
return [
"cookies",
"form",
@@ -355,9 +357,10 @@ class ConsoleAddon:
]
@command.command("console.edit.focus")
+ @command.argument("part", type=command.Choice("console.edit.focus.options"))
def edit_focus(self, part: str) -> None:
"""
- Edit the query of the current focus.
+ Edit a component of the currently focused flow.
"""
if part == "cookies":
self.master.switch_view("edit_focus_cookies")
@@ -406,14 +409,14 @@ class ConsoleAddon:
self._grideditor().cmd_delete()
@command.command("console.grideditor.readfile")
- def grideditor_readfile(self, path: str) -> None:
+ def grideditor_readfile(self, path: command.Path) -> None:
"""
Read a file into the currrent cell.
"""
self._grideditor().cmd_read_file(path)
@command.command("console.grideditor.readfile_escaped")
- def grideditor_readfile_escaped(self, path: str) -> None:
+ def grideditor_readfile_escaped(self, path: command.Path) -> None:
"""
Read a file containing a Python-style escaped stringinto the
currrent cell.
@@ -428,26 +431,33 @@ class ConsoleAddon:
self._grideditor().cmd_spawn_editor()
@command.command("console.flowview.mode.set")
- def flowview_mode_set(self) -> None:
+ @command.argument("mode", type=command.Choice("console.flowview.mode.options"))
+ def flowview_mode_set(self, mode: str) -> None:
"""
Set the display mode for the current flow view.
"""
- fv = self.master.window.current("flowview")
+ fv = self.master.window.current_window("flowview")
if not fv:
raise exceptions.CommandError("Not viewing a flow.")
idx = fv.body.tab_offset
- def callback(opt):
- try:
- self.master.commands.call_args(
- "view.setval",
- ["@focus", "flowview_mode_%s" % idx, opt]
- )
- except exceptions.CommandError as e:
- signals.status_message.send(message=str(e))
+ if mode not in [i.name.lower() for i in contentviews.views]:
+ raise exceptions.CommandError("Invalid flowview mode.")
+
+ try:
+ self.master.commands.call_args(
+ "view.setval",
+ ["@focus", "flowview_mode_%s" % idx, mode]
+ )
+ except exceptions.CommandError as e:
+ signals.status_message.send(message=str(e))
- opts = [i.name.lower() for i in contentviews.views]
- self.master.overlay(overlay.Chooser(self.master, "Mode", opts, "", callback))
+ @command.command("console.flowview.mode.options")
+ def flowview_mode_options(self) -> typing.Sequence[str]:
+ """
+ Returns the valid options for the flowview mode.
+ """
+ return [i.name.lower() for i in contentviews.views]
@command.command("console.flowview.mode")
def flowview_mode(self) -> str:
diff --git a/mitmproxy/tools/console/defaultkeys.py b/mitmproxy/tools/console/defaultkeys.py
index 8c28524a..c4a44aca 100644
--- a/mitmproxy/tools/console/defaultkeys.py
+++ b/mitmproxy/tools/console/defaultkeys.py
@@ -2,8 +2,8 @@
def map(km):
km.add(":", "console.command ", ["global"], "Command prompt")
km.add("?", "console.view.help", ["global"], "View help")
- km.add("B", "browser.start", ["global"], "View commands")
- km.add("C", "console.view.commands", ["global"], "Start an attached browser")
+ km.add("B", "browser.start", ["global"], "Start an attached browser")
+ km.add("C", "console.view.commands", ["global"], "View commands")
km.add("K", "console.view.keybindings", ["global"], "View key bindings")
km.add("O", "console.view.options", ["global"], "View options")
km.add("E", "console.view.eventlog", ["global"], "View event log")
@@ -116,7 +116,15 @@ def map(km):
"View flow body in an external viewer"
)
km.add("p", "view.focus.prev", ["flowview"], "Go to previous flow")
- km.add("m", "console.flowview.mode.set", ["flowview"], "Set flow view mode")
+ km.add(
+ "m",
+ """
+ console.choose.cmd Mode console.flowview.mode.options
+ console.flowview.mode.set {choice}
+ """,
+ ["flowview"],
+ "Set flow view mode"
+ )
km.add(
"z",
"""
diff --git a/mitmproxy/tools/web/master.py b/mitmproxy/tools/web/master.py
index 694ee2f7..4c597f0e 100644
--- a/mitmproxy/tools/web/master.py
+++ b/mitmproxy/tools/web/master.py
@@ -60,7 +60,7 @@ class WebMaster(master.Master):
data=app.flow_to_json(flow)
)
- def _sig_view_remove(self, view, flow):
+ def _sig_view_remove(self, view, flow, index):
app.ClientConnection.broadcast(
resource="flows",
cmd="remove",
diff --git a/test/mitmproxy/addons/test_core.py b/test/mitmproxy/addons/test_core.py
index c132d80a..5aa4ef37 100644
--- a/test/mitmproxy/addons/test_core.py
+++ b/test/mitmproxy/addons/test_core.py
@@ -69,9 +69,6 @@ def test_flow_set():
f = tflow.tflow(resp=True)
assert sa.flow_set_options()
- with pytest.raises(exceptions.CommandError):
- sa.flow_set([f], "flibble", "post")
-
assert f.request.method != "post"
sa.flow_set([f], "method", "post")
assert f.request.method == "POST"
@@ -126,9 +123,6 @@ def test_encoding():
sa.encode_toggle([f], "request")
assert "content-encoding" not in f.request.headers
- with pytest.raises(exceptions.CommandError):
- sa.encode([f], "request", "invalid")
-
def test_options(tmpdir):
p = str(tmpdir.join("path"))
diff --git a/test/mitmproxy/net/test_tcp.py b/test/mitmproxy/net/test_tcp.py
index 3e27929d..e9084be4 100644
--- a/test/mitmproxy/net/test_tcp.py
+++ b/test/mitmproxy/net/test_tcp.py
@@ -1,4 +1,5 @@
from io import BytesIO
+import re
import queue
import time
import socket
@@ -95,7 +96,13 @@ class TestServerBind(tservers.ServerTestBase):
class handler(tcp.BaseHandler):
def handle(self):
- self.wfile.write(str(self.connection.getpeername()).encode())
+ # We may get an ipv4-mapped ipv6 address here, e.g. ::ffff:127.0.0.1.
+ # Those still appear as "127.0.0.1" in the table, so we need to strip the prefix.
+ peername = self.connection.getpeername()
+ address = re.sub("^::ffff:(?=\d+.\d+.\d+.\d+$)", "", peername[0])
+ port = peername[1]
+
+ self.wfile.write(str((address, port)).encode())
self.wfile.flush()
def test_bind(self):
diff --git a/test/mitmproxy/platform/test_pf.py b/test/mitmproxy/platform/test_pf.py
index 3292d345..b048a697 100644
--- a/test/mitmproxy/platform/test_pf.py
+++ b/test/mitmproxy/platform/test_pf.py
@@ -15,6 +15,7 @@ class TestLookup:
d = f.read()
assert pf.lookup("192.168.1.111", 40000, d) == ("5.5.5.5", 80)
+ assert pf.lookup("::ffff:192.168.1.111", 40000, d) == ("5.5.5.5", 80)
with pytest.raises(Exception, match="Could not resolve original destination"):
pf.lookup("192.168.1.112", 40000, d)
with pytest.raises(Exception, match="Could not resolve original destination"):
diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py
index 43b97742..e1879ba2 100644
--- a/test/mitmproxy/test_command.py
+++ b/test/mitmproxy/test_command.py
@@ -7,6 +7,8 @@ from mitmproxy.test import taddons
import io
import pytest
+from mitmproxy.utils import typecheck
+
class TAddon:
def cmd1(self, foo: str) -> str:
@@ -25,6 +27,16 @@ class TAddon:
def varargs(self, one: str, *var: str) -> typing.Sequence[str]:
return list(var)
+ def choices(self) -> typing.Sequence[str]:
+ return ["one", "two", "three"]
+
+ @command.argument("arg", type=command.Choice("choices"))
+ def choose(self, arg: str) -> typing.Sequence[str]:
+ return ["one", "two", "three"]
+
+ def path(self, arg: command.Path) -> None:
+ pass
+
class TestCommand:
def test_varargs(self):
@@ -86,6 +98,9 @@ def test_typename():
assert command.typename(flow.Flow, False) == "flow"
assert command.typename(typing.Sequence[str], False) == "[str]"
+ assert command.typename(command.Choice("foo"), False) == "choice"
+ assert command.typename(command.Path, False) == "path"
+
class DummyConsole:
@command.command("view.resolve")
@@ -134,6 +149,20 @@ def test_parsearg():
tctx.master.commands, "foo, bar", typing.Sequence[str]
) == ["foo", "bar"]
+ a = TAddon()
+ tctx.master.commands.add("choices", a.choices)
+ assert command.parsearg(
+ tctx.master.commands, "one", command.Choice("choices"),
+ ) == "one"
+ with pytest.raises(exceptions.CommandError):
+ assert command.parsearg(
+ tctx.master.commands, "invalid", command.Choice("choices"),
+ )
+
+ assert command.parsearg(
+ tctx.master.commands, "foo", command.Path
+ ) == "foo"
+
class TDec:
@command.command("cmd1")
@@ -169,4 +198,13 @@ def test_verify_arg_signature():
with pytest.raises(exceptions.CommandError):
command.verify_arg_signature(lambda: None, [1, 2], {})
print('hello there')
- command.verify_arg_signature(lambda a, b: None, [1, 2], {}) \ No newline at end of file
+ command.verify_arg_signature(lambda a, b: None, [1, 2], {})
+
+
+def test_choice():
+ """
+ basic typechecking for choices should fail as we cannot verify if strings are a valid choice
+ at this point.
+ """
+ c = command.Choice("foo")
+ assert not typecheck.check_command_type("foo", c)