aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--mitmproxy/command.py44
-rw-r--r--mitmproxy/tools/console/flowlist.py5
-rw-r--r--mitmproxy/tools/console/master.py38
-rw-r--r--mitmproxy/utils/typecheck.py6
-rw-r--r--test/mitmproxy/test_command.py53
-rw-r--r--test/mitmproxy/utils/test_typecheck.py23
6 files changed, 122 insertions, 47 deletions
diff --git a/mitmproxy/command.py b/mitmproxy/command.py
index 031d2cae..82b8fae4 100644
--- a/mitmproxy/command.py
+++ b/mitmproxy/command.py
@@ -1,3 +1,6 @@
+"""
+ This module manges and invokes typed commands.
+"""
import inspect
import typing
import shlex
@@ -17,10 +20,10 @@ Cuts = typing.Sequence[
def typename(t: type, ret: bool) -> str:
"""
- Translates a type to an explanatory string. Ifl ret is True, we're
+ Translates a type to an explanatory string. If ret is True, we're
looking at a return type, else we're looking at a parameter type.
"""
- if t in (str, int, bool):
+ if issubclass(t, (str, int, bool)):
return t.__name__
elif t == typing.Sequence[flow.Flow]:
return "[flow]" if ret else "flowspec"
@@ -44,11 +47,20 @@ class Command:
if func.__doc__:
txt = func.__doc__.strip()
self.help = "\n".join(textwrap.wrap(txt))
+
+ self.has_positional = False
+ for i in sig.parameters.values():
+ # This is the kind for *args paramters
+ if i.kind == i.VAR_POSITIONAL:
+ self.has_positional = True
self.paramtypes = [v.annotation for v in sig.parameters.values()]
self.returntype = sig.return_annotation
def paramnames(self) -> typing.Sequence[str]:
- return [typename(i, False) for i in self.paramtypes]
+ v = [typename(i, False) for i in self.paramtypes]
+ if self.has_positional:
+ v[-1] = "*" + v[-1][1:-1]
+ return v
def retname(self) -> str:
return typename(self.returntype, True) if self.returntype else ""
@@ -64,17 +76,31 @@ class Command:
"""
Call the command with a set of arguments. At this point, all argumets are strings.
"""
- if len(self.paramtypes) != len(args):
+ if not self.has_positional and (len(self.paramtypes) != len(args)):
raise exceptions.CommandError("Usage: %s" % self.signature_help())
+ remainder = [] # type: typing.Sequence[str]
+ if self.has_positional:
+ remainder = args[len(self.paramtypes) - 1:]
+ args = args[:len(self.paramtypes) - 1]
+
pargs = []
for i in range(len(args)):
- pargs.append(parsearg(self.manager, args[i], self.paramtypes[i]))
+ if typecheck.check_command_type(args[i], self.paramtypes[i]):
+ pargs.append(args[i])
+ else:
+ pargs.append(parsearg(self.manager, args[i], self.paramtypes[i]))
+
+ if remainder:
+ if typecheck.check_command_type(remainder, self.paramtypes[-1]):
+ pargs.extend(remainder)
+ else:
+ raise exceptions.CommandError("Invalid value type.")
with self.manager.master.handlecontext():
ret = self.func(*pargs)
- if not typecheck.check_command_return_type(ret, self.returntype):
+ if not typecheck.check_command_type(ret, self.returntype):
raise exceptions.CommandError("Command returned unexpected data")
return ret
@@ -126,7 +152,7 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
"""
Convert a string to a argument to the appropriate type.
"""
- if argtype == str:
+ if issubclass(argtype, str):
return spec
elif argtype == bool:
if spec == "true":
@@ -137,7 +163,7 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
raise exceptions.CommandError(
"Booleans are 'true' or 'false', got %s" % spec
)
- elif argtype == int:
+ elif issubclass(argtype, int):
try:
return int(spec)
except ValueError as e:
@@ -153,6 +179,8 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
"Command requires one flow, specification matched %s." % len(flows)
)
return flows[0]
+ elif argtype == typing.Sequence[str]:
+ return [i.strip() for i in spec.split(",")]
else:
raise exceptions.CommandError("Unsupported argument type: %s" % argtype)
diff --git a/mitmproxy/tools/console/flowlist.py b/mitmproxy/tools/console/flowlist.py
index be3d2dae..7364524f 100644
--- a/mitmproxy/tools/console/flowlist.py
+++ b/mitmproxy/tools/console/flowlist.py
@@ -2,7 +2,6 @@ import urwid
from mitmproxy.tools.console import common
from mitmproxy.tools.console import signals
-from mitmproxy.tools.console import master
from mitmproxy.addons import view
import mitmproxy.tools.console.master # noqa
@@ -185,7 +184,9 @@ class FlowListWalker(urwid.ListWalker):
class FlowListBox(urwid.ListBox):
- def __init__(self, master: master.ConsoleMaster) -> None:
+ def __init__(
+ self, master: "mitmproxy.tools.console.master.ConsoleMaster"
+ ) -> None:
self.master = master # type: "mitmproxy.tools.console.master.ConsoleMaster"
super().__init__(FlowListWalker(master))
diff --git a/mitmproxy/tools/console/master.py b/mitmproxy/tools/console/master.py
index b79125fb..9b651dcc 100644
--- a/mitmproxy/tools/console/master.py
+++ b/mitmproxy/tools/console/master.py
@@ -9,9 +9,11 @@ import subprocess
import sys
import tempfile
import traceback
+import typing
import urwid
+from mitmproxy import ctx
from mitmproxy import addons
from mitmproxy import command
from mitmproxy import master
@@ -84,12 +86,31 @@ class ConsoleAddon:
self.master = master
self.started = False
+ @command.command("console.choose")
+ def console_choose(
+ self, prompt: str, choicecmd: str, *cmd: typing.Sequence[str]
+ ) -> None:
+ """
+ Prompt the user to choose from a list of strings returned by a
+ command, then invoke another command with all occurances of {choice}
+ replaced by the choice the user made.
+ """
+ choices = ctx.master.commands.call_args(choicecmd, [])
+
+ def callback(opt):
+ repl = " ".join(cmd)
+ repl = repl.replace("{choice}", opt)
+ self.master.commands.call(repl)
+
+ self.master.overlay(overlay.Chooser(choicecmd, choices, "", callback))
+ ctx.log.info(choices)
+
@command.command("console.command")
- def console_command(self, partial: str) -> None:
+ def console_command(self, *partial: typing.Sequence[str]) -> None:
"""
Prompt the user to edit a command with a (possilby empty) starting value.
"""
- signals.status_prompt_command.send(partial=partial)
+ signals.status_prompt_command.send(partial=" ".join(partial) + " ") # type: ignore
@command.command("console.view.commands")
def view_commands(self) -> None:
@@ -146,16 +167,21 @@ def default_keymap(km):
km.add("O", "console.view.options")
km.add("Q", "console.exit")
km.add("q", "console.view.pop")
- km.add("i", "console.command 'set intercept='")
- km.add("W", "console.command 'set save_stream_file='")
+ km.add("i", "console.command set intercept=")
+ km.add("W", "console.command set save_stream_file=")
km.add("A", "flow.resume @all", context="flowlist")
km.add("a", "flow.resume @focus", context="flowlist")
- km.add("b", "console.command 'cut.save s.content|@focus '", context="flowlist")
+ km.add("b", "console.command cut.save s.content|@focus ''", context="flowlist")
km.add("d", "view.remove @focus", context="flowlist")
km.add("D", "view.duplicate @focus", context="flowlist")
km.add("e", "set console_eventlog=toggle", context="flowlist")
- km.add("E", "console.command 'export.file curl @focus '", context="flowlist")
+ km.add(
+ "E",
+ "console.choose Format export.formats "
+ "console.command export.file {choice} @focus ''",
+ context="flowlist"
+ )
km.add("f", "console.command 'set view_filter='", context="flowlist")
km.add("F", "set console_focus_follow=toggle", context="flowlist")
km.add("g", "view.go 0", context="flowlist")
diff --git a/mitmproxy/utils/typecheck.py b/mitmproxy/utils/typecheck.py
index c97ff529..a5f27fee 100644
--- a/mitmproxy/utils/typecheck.py
+++ b/mitmproxy/utils/typecheck.py
@@ -1,7 +1,7 @@
import typing
-def check_command_return_type(value: typing.Any, typeinfo: typing.Any) -> bool:
+def check_command_type(value: typing.Any, typeinfo: typing.Any) -> bool:
"""
Check if the provided value is an instance of typeinfo. Returns True if the
types match, False otherwise. This function supports only those types
@@ -17,7 +17,7 @@ def check_command_return_type(value: typing.Any, typeinfo: typing.Any) -> bool:
if not isinstance(value, (tuple, list)):
return False
for v in value:
- if not check_command_return_type(v, T):
+ if not check_command_type(v, T):
return False
elif typename.startswith("typing.Union"):
try:
@@ -26,7 +26,7 @@ def check_command_return_type(value: typing.Any, typeinfo: typing.Any) -> bool:
# Python 3.5.x
types = typeinfo.__union_params__ # type: ignore
for T in types:
- checks = [check_command_return_type(value, T) for T in types]
+ checks = [check_command_type(value, T) for T in types]
if not any(checks):
return False
elif value is None and typeinfo is None:
diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py
index ac082153..958328b2 100644
--- a/test/mitmproxy/test_command.py
+++ b/test/mitmproxy/test_command.py
@@ -1,9 +1,6 @@
import typing
from mitmproxy import command
from mitmproxy import flow
-from mitmproxy import master
-from mitmproxy import options
-from mitmproxy import proxy
from mitmproxy import exceptions
from mitmproxy.test import tflow
from mitmproxy.test import taddons
@@ -19,24 +16,41 @@ class TAddon:
def cmd2(self, foo: str) -> str:
return 99
+ def cmd3(self, foo: int) -> int:
+ return foo
+
def empty(self) -> None:
pass
+ def varargs(self, one: str, *var: typing.Sequence[str]) -> typing.Sequence[str]:
+ return list(var)
+
class TestCommand:
+ def test_varargs(self):
+ with taddons.context() as tctx:
+ cm = command.CommandManager(tctx.master)
+ a = TAddon()
+ c = command.Command(cm, "varargs", a.varargs)
+ assert c.signature_help() == "varargs str *str -> [str]"
+ assert c.call(["one", "two", "three"]) == ["two", "three"]
+ with pytest.raises(exceptions.CommandError):
+ c.call(["one", "two", 3])
+
def test_call(self):
- o = options.Options()
- m = master.Master(o, proxy.DummyServer(o))
- cm = command.CommandManager(m)
+ with taddons.context() as tctx:
+ cm = command.CommandManager(tctx.master)
+ a = TAddon()
+ c = command.Command(cm, "cmd.path", a.cmd1)
+ assert c.call(["foo"]) == "ret foo"
+ assert c.signature_help() == "cmd.path str -> str"
- a = TAddon()
- c = command.Command(cm, "cmd.path", a.cmd1)
- assert c.call(["foo"]) == "ret foo"
- assert c.signature_help() == "cmd.path str -> str"
+ c = command.Command(cm, "cmd.two", a.cmd2)
+ with pytest.raises(exceptions.CommandError):
+ c.call(["foo"])
- c = command.Command(cm, "cmd.two", a.cmd2)
- with pytest.raises(exceptions.CommandError):
- c.call(["foo"])
+ c = command.Command(cm, "cmd.three", a.cmd3)
+ assert c.call(["1"]) == 1
def test_simple():
@@ -74,14 +88,12 @@ def test_typename():
class DummyConsole:
- def load(self, l):
- l.add_command("view.resolve", self.resolve)
- l.add_command("cut", self.cut)
-
+ @command.command("view.resolve")
def resolve(self, spec: str) -> typing.Sequence[flow.Flow]:
n = int(spec)
return [tflow.tflow(resp=True)] * n
+ @command.command("cut")
def cut(self, spec: str) -> command.Cuts:
return [["test"]]
@@ -115,6 +127,13 @@ def test_parsearg():
tctx.master.commands, "foo", command.Cuts
) == [["test"]]
+ assert command.parsearg(
+ tctx.master.commands, "foo", typing.Sequence[str]
+ ) == ["foo"]
+ assert command.parsearg(
+ tctx.master.commands, "foo, bar", typing.Sequence[str]
+ ) == ["foo", "bar"]
+
class TDec:
@command.command("cmd1")
diff --git a/test/mitmproxy/utils/test_typecheck.py b/test/mitmproxy/utils/test_typecheck.py
index 17f70d37..fe33070e 100644
--- a/test/mitmproxy/utils/test_typecheck.py
+++ b/test/mitmproxy/utils/test_typecheck.py
@@ -88,25 +88,26 @@ def test_check_any():
typecheck.check_option_type("foo", None, typing.Any)
-def test_check_command_return_type():
- assert(typecheck.check_command_return_type("foo", str))
- assert(typecheck.check_command_return_type(["foo"], typing.Sequence[str]))
- assert(typecheck.check_command_return_type(None, None))
- assert(not typecheck.check_command_return_type(["foo"], typing.Sequence[int]))
- assert(not typecheck.check_command_return_type("foo", typing.Sequence[int]))
- assert(typecheck.check_command_return_type([["foo", b"bar"]], command.Cuts))
- assert(not typecheck.check_command_return_type(["foo", b"bar"], command.Cuts))
- assert(not typecheck.check_command_return_type([["foo", 22]], command.Cuts))
+def test_check_command_type():
+ assert(typecheck.check_command_type("foo", str))
+ assert(typecheck.check_command_type(["foo"], typing.Sequence[str]))
+ assert(not typecheck.check_command_type(["foo", 1], typing.Sequence[str]))
+ assert(typecheck.check_command_type(None, None))
+ assert(not typecheck.check_command_type(["foo"], typing.Sequence[int]))
+ assert(not typecheck.check_command_type("foo", typing.Sequence[int]))
+ assert(typecheck.check_command_type([["foo", b"bar"]], command.Cuts))
+ assert(not typecheck.check_command_type(["foo", b"bar"], command.Cuts))
+ assert(not typecheck.check_command_type([["foo", 22]], command.Cuts))
# Python 3.5 only defines __parameters__
m = mock.Mock()
m.__str__ = lambda self: "typing.Sequence"
m.__parameters__ = (int,)
- typecheck.check_command_return_type([10], m)
+ typecheck.check_command_type([10], m)
# Python 3.5 only defines __union_params__
m = mock.Mock()
m.__str__ = lambda self: "typing.Union"
m.__union_params__ = (int,)
- assert not typecheck.check_command_return_type([22], m)
+ assert not typecheck.check_command_type([22], m)