aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--examples/complex/remote_debug.py4
-rw-r--r--mitmproxy/addons/core.py49
-rw-r--r--mitmproxy/addons/export.py20
-rw-r--r--mitmproxy/addons/view.py58
-rw-r--r--mitmproxy/command.py320
-rw-r--r--mitmproxy/command_lexer.py49
-rw-r--r--mitmproxy/tools/console/commander/commander.py157
-rw-r--r--mitmproxy/tools/console/commands.py23
-rw-r--r--mitmproxy/tools/console/consoleaddons.py131
-rw-r--r--mitmproxy/tools/console/defaultkeys.py10
-rw-r--r--mitmproxy/tools/console/statusbar.py12
-rw-r--r--mitmproxy/types.py125
-rw-r--r--setup.cfg2
-rw-r--r--test/mitmproxy/addons/test_core.py2
-rw-r--r--test/mitmproxy/addons/test_save.py2
-rw-r--r--test/mitmproxy/test_command.py321
-rw-r--r--test/mitmproxy/test_command_lexer.py38
-rw-r--r--test/mitmproxy/test_types.py15
-rw-r--r--test/mitmproxy/tools/console/test_commander.py143
-rw-r--r--test/mitmproxy/tools/console/test_defaultkeys.py19
20 files changed, 949 insertions, 551 deletions
diff --git a/examples/complex/remote_debug.py b/examples/complex/remote_debug.py
index fa6f3d33..4b117bdb 100644
--- a/examples/complex/remote_debug.py
+++ b/examples/complex/remote_debug.py
@@ -15,5 +15,5 @@ Usage:
def load(l):
- import pydevd
- pydevd.settrace("localhost", port=5678, stdoutToServer=True, stderrToServer=True)
+ import pydevd_pycharm
+ pydevd_pycharm.settrace("localhost", port=5678, stdoutToServer=True, stderrToServer=True, suspend=False)
diff --git a/mitmproxy/addons/core.py b/mitmproxy/addons/core.py
index 5c9bbcd0..8a3acedb 100644
--- a/mitmproxy/addons/core.py
+++ b/mitmproxy/addons/core.py
@@ -83,15 +83,14 @@ class Core:
)
@command.command("set")
- def set(self, *spec: str) -> None:
+ def set(self, option: str, value: str = "") -> None:
"""
- Set an option of the form "key[=value]". When the value is omitted,
- booleans are set to true, strings and integers are set to None (if
- permitted), and sequences are emptied. Boolean values can be true,
- false or toggle. If multiple specs are passed, they are joined
- into one separated by spaces.
+ Set an option. When the value is omitted, booleans are set to true,
+ strings and integers are set to None (if permitted), and sequences
+ are emptied. Boolean values can be true, false or toggle.
+ Multiple values are concatenated with a single space.
"""
- strspec = " ".join(spec)
+ strspec = f"{option}={value}"
try:
ctx.options.set(strspec)
except exceptions.OptionsError as e:
@@ -109,14 +108,14 @@ class Core:
# FIXME: this will become view.mark later
@command.command("flow.mark")
- def mark(self, flows: typing.Sequence[flow.Flow], val: bool) -> None:
+ def mark(self, flows: typing.Sequence[flow.Flow], boolean: bool) -> None:
"""
Mark flows.
"""
updated = []
for i in flows:
- if i.marked != val:
- i.marked = val
+ if i.marked != boolean:
+ i.marked = boolean
updated.append(i)
ctx.master.addons.trigger("update", updated)
@@ -169,18 +168,18 @@ class Core:
]
@command.command("flow.set")
- @command.argument("spec", type=mitmproxy.types.Choice("flow.set.options"))
+ @command.argument("attr", type=mitmproxy.types.Choice("flow.set.options"))
def flow_set(
self,
flows: typing.Sequence[flow.Flow],
- spec: str,
- sval: str
+ attr: str,
+ value: str
) -> None:
"""
Quickly set a number of common values on flows.
"""
- val: typing.Union[int, str] = sval
- if spec == "status_code":
+ val: typing.Union[int, str] = value
+ if attr == "status_code":
try:
val = int(val) # type: ignore
except ValueError as v:
@@ -193,13 +192,13 @@ class Core:
req = getattr(f, "request", None)
rupdate = True
if req:
- if spec == "method":
+ if attr == "method":
req.method = val
- elif spec == "host":
+ elif attr == "host":
req.host = val
- elif spec == "path":
+ elif attr == "path":
req.path = val
- elif spec == "url":
+ elif attr == "url":
try:
req.url = val
except ValueError as e:
@@ -212,11 +211,11 @@ class Core:
resp = getattr(f, "response", None)
supdate = True
if resp:
- if spec == "status_code":
+ if attr == "status_code":
resp.status_code = val
if val in status_codes.RESPONSES:
resp.reason = status_codes.RESPONSES[val] # type: ignore
- elif spec == "reason":
+ elif attr == "reason":
resp.reason = val
else:
supdate = False
@@ -225,7 +224,7 @@ class Core:
updated.append(f)
ctx.master.addons.trigger("update", updated)
- ctx.log.alert("Set %s on %s flows." % (spec, len(updated)))
+ ctx.log.alert("Set %s on %s flows." % (attr, len(updated)))
@command.command("flow.decode")
def decode(self, flows: typing.Sequence[flow.Flow], part: str) -> None:
@@ -262,12 +261,12 @@ class Core:
ctx.log.alert("Toggled encoding on %s flows." % len(updated))
@command.command("flow.encode")
- @command.argument("enc", type=mitmproxy.types.Choice("flow.encode.options"))
+ @command.argument("encoding", type=mitmproxy.types.Choice("flow.encode.options"))
def encode(
self,
flows: typing.Sequence[flow.Flow],
part: str,
- enc: str,
+ encoding: str,
) -> None:
"""
Encode flows with a specified encoding.
@@ -279,7 +278,7 @@ class Core:
current_enc = p.headers.get("content-encoding", "identity")
if current_enc == "identity":
f.backup()
- p.encode(enc)
+ p.encode(encoding)
updated.append(f)
ctx.master.addons.trigger("update", updated)
ctx.log.alert("Encoded %s flows." % len(updated))
diff --git a/mitmproxy/addons/export.py b/mitmproxy/addons/export.py
index 68df9374..d874c95a 100644
--- a/mitmproxy/addons/export.py
+++ b/mitmproxy/addons/export.py
@@ -121,14 +121,14 @@ class Export():
return list(sorted(formats.keys()))
@command.command("export.file")
- def file(self, fmt: str, f: flow.Flow, path: mitmproxy.types.Path) -> None:
+ def file(self, format: str, flow: flow.Flow, path: mitmproxy.types.Path) -> None:
"""
Export a flow to path.
"""
- if fmt not in formats:
- raise exceptions.CommandError("No such export format: %s" % fmt)
- func: typing.Any = formats[fmt]
- v = func(f)
+ if format not in formats:
+ raise exceptions.CommandError("No such export format: %s" % format)
+ func: typing.Any = formats[format]
+ v = func(flow)
try:
with open(path, "wb") as fp:
if isinstance(v, bytes):
@@ -139,14 +139,14 @@ class Export():
ctx.log.error(str(e))
@command.command("export.clip")
- def clip(self, fmt: str, f: flow.Flow) -> None:
+ def clip(self, format: str, flow: flow.Flow) -> None:
"""
Export a flow to the system clipboard.
"""
- if fmt not in formats:
- raise exceptions.CommandError("No such export format: %s" % fmt)
- func: typing.Any = formats[fmt]
- v = strutils.always_str(func(f))
+ if format not in formats:
+ raise exceptions.CommandError("No such export format: %s" % format)
+ func: typing.Any = formats[format]
+ v = strutils.always_str(func(flow))
try:
pyperclip.copy(v)
except pyperclip.PyperclipException as e:
diff --git a/mitmproxy/addons/view.py b/mitmproxy/addons/view.py
index da9d19f9..1d57d781 100644
--- a/mitmproxy/addons/view.py
+++ b/mitmproxy/addons/view.py
@@ -217,7 +217,7 @@ class View(collections.abc.Sequence):
# Focus
@command.command("view.focus.go")
- def go(self, dst: int) -> None:
+ def go(self, offset: int) -> None:
"""
Go to a specified offset. Positive offests are from the beginning of
the view, negative from the end of the view, so that 0 is the first
@@ -225,13 +225,13 @@ class View(collections.abc.Sequence):
"""
if len(self) == 0:
return
- if dst < 0:
- dst = len(self) + dst
- if dst < 0:
- dst = 0
- if dst > len(self) - 1:
- dst = len(self) - 1
- self.focus.flow = self[dst]
+ if offset < 0:
+ offset = len(self) + offset
+ if offset < 0:
+ offset = 0
+ if offset > len(self) - 1:
+ offset = len(self) - 1
+ self.focus.flow = self[offset]
@command.command("view.focus.next")
def focus_next(self) -> None:
@@ -266,20 +266,20 @@ class View(collections.abc.Sequence):
return list(sorted(self.orders.keys()))
@command.command("view.order.reverse")
- def set_reversed(self, value: bool) -> None:
- self.order_reversed = value
+ def set_reversed(self, boolean: bool) -> None:
+ self.order_reversed = boolean
self.sig_view_refresh.send(self)
@command.command("view.order.set")
- def set_order(self, order: str) -> None:
+ def set_order(self, order_key: str) -> None:
"""
Sets the current view order.
"""
- if order not in self.orders:
+ if order_key not in self.orders:
raise exceptions.CommandError(
- "Unknown flow order: %s" % order
+ "Unknown flow order: %s" % order_key
)
- order_key = self.orders[order]
+ order_key = self.orders[order_key]
self.order_key = order_key
newview = sortedcontainers.SortedListWithKey(key=order_key)
newview.update(self._view)
@@ -298,16 +298,16 @@ class View(collections.abc.Sequence):
# Filter
@command.command("view.filter.set")
- def set_filter_cmd(self, f: str) -> None:
+ def set_filter_cmd(self, filter_expr: str) -> None:
"""
Sets the current view filter.
"""
filt = None
- if f:
- filt = flowfilter.parse(f)
+ if filter_expr:
+ filt = flowfilter.parse(filter_expr)
if not filt:
raise exceptions.CommandError(
- "Invalid interception filter: %s" % f
+ "Invalid interception filter: %s" % filter_expr
)
self.set_filter(filt)
@@ -340,11 +340,11 @@ class View(collections.abc.Sequence):
# View Settings
@command.command("view.settings.getval")
- def getvalue(self, f: mitmproxy.flow.Flow, key: str, default: str) -> str:
+ def getvalue(self, flow: mitmproxy.flow.Flow, key: str, default: str) -> str:
"""
Get a value from the settings store for the specified flow.
"""
- return self.settings[f].get(key, default)
+ return self.settings[flow].get(key, default)
@command.command("view.settings.setval.toggle")
def setvalue_toggle(
@@ -412,26 +412,26 @@ class View(collections.abc.Sequence):
ctx.log.alert("Removed %s flows" % len(flows))
@command.command("view.flows.resolve")
- def resolve(self, spec: str) -> typing.Sequence[mitmproxy.flow.Flow]:
+ def resolve(self, flow_spec: str) -> typing.Sequence[mitmproxy.flow.Flow]:
"""
Resolve a flow list specification to an actual list of flows.
"""
- if spec == "@all":
+ if flow_spec == "@all":
return [i for i in self._store.values()]
- if spec == "@focus":
+ if flow_spec == "@focus":
return [self.focus.flow] if self.focus.flow else []
- elif spec == "@shown":
+ elif flow_spec == "@shown":
return [i for i in self]
- elif spec == "@hidden":
+ elif flow_spec == "@hidden":
return [i for i in self._store.values() if i not in self._view]
- elif spec == "@marked":
+ elif flow_spec == "@marked":
return [i for i in self._store.values() if i.marked]
- elif spec == "@unmarked":
+ elif flow_spec == "@unmarked":
return [i for i in self._store.values() if not i.marked]
else:
- filt = flowfilter.parse(spec)
+ filt = flowfilter.parse(flow_spec)
if not filt:
- raise exceptions.CommandError("Invalid flow filter: %s" % spec)
+ raise exceptions.CommandError("Invalid flow filter: %s" % flow_spec)
return [i for i in self._store.values() if filt(i)]
@command.command("view.flows.create")
diff --git a/mitmproxy/command.py b/mitmproxy/command.py
index 0998601c..6977ff91 100644
--- a/mitmproxy/command.py
+++ b/mitmproxy/command.py
@@ -1,20 +1,19 @@
"""
This module manages and invokes typed commands.
"""
+import functools
import inspect
+import sys
+import textwrap
import types
-import io
import typing
-import shlex
-import textwrap
-import functools
-import sys
-from mitmproxy import exceptions
import mitmproxy.types
+from mitmproxy import exceptions, command_lexer
+from mitmproxy.command_lexer import unquote
-def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None:
+def verify_arg_signature(f: typing.Callable, args: typing.Iterable[typing.Any], kwargs: dict) -> None:
sig = inspect.signature(f)
try:
sig.bind(*args, **kwargs)
@@ -22,15 +21,6 @@ def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None:
raise exceptions.CommandError("command argument mismatch: %s" % v.args[0])
-def lexer(s):
- # mypy mis-identifies shlex.shlex as abstract
- lex = shlex.shlex(s, posix=True) # type: ignore
- lex.wordchars += "."
- lex.whitespace_split = True
- lex.commenters = ''
- return lex
-
-
def typename(t: type) -> str:
"""
Translates a type to an explanatory string.
@@ -43,208 +33,234 @@ def typename(t: type) -> str:
return to.display
+def _empty_as_none(x: typing.Any) -> typing.Any:
+ if x == inspect.Signature.empty:
+ return None
+ return x
+
+
+class CommandParameter(typing.NamedTuple):
+ name: str
+ type: typing.Type
+ kind: inspect._ParameterKind = inspect.Parameter.POSITIONAL_OR_KEYWORD
+
+ def __str__(self):
+ if self.kind is inspect.Parameter.VAR_POSITIONAL:
+ return f"*{self.name}"
+ else:
+ return self.name
+
+
class Command:
- returntype: typing.Optional[typing.Type]
+ name: str
+ manager: "CommandManager"
+ signature: inspect.Signature
+ help: typing.Optional[str]
- def __init__(self, manager, path, func) -> None:
- self.path = path
+ def __init__(self, manager: "CommandManager", name: str, func: typing.Callable) -> None:
+ self.name = name
self.manager = manager
self.func = func
- sig = inspect.signature(self.func)
- self.help = None
+ self.signature = inspect.signature(self.func)
+
if func.__doc__:
txt = func.__doc__.strip()
self.help = "\n".join(textwrap.wrap(txt))
-
- self.has_positional = False
- for i in sig.parameters.values():
- # This is the kind for *args parameters
- if i.kind == i.VAR_POSITIONAL:
- self.has_positional = True
- self.paramtypes = [v.annotation for v in sig.parameters.values()]
- if sig.return_annotation == inspect._empty: # type: ignore
- self.returntype = None
else:
- self.returntype = sig.return_annotation
+ self.help = None
+
# This fails with a CommandException if types are invalid
- self.signature_help()
+ for name, parameter in self.signature.parameters.items():
+ t = parameter.annotation
+ if not mitmproxy.types.CommandTypes.get(parameter.annotation, None):
+ raise exceptions.CommandError(f"Argument {name} has an unknown type ({_empty_as_none(t)}) in {func}.")
+ if self.return_type and not mitmproxy.types.CommandTypes.get(self.return_type, None):
+ raise exceptions.CommandError(f"Return type has an unknown type ({self.return_type}) in {func}.")
+
+ @property
+ def return_type(self) -> typing.Optional[typing.Type]:
+ return _empty_as_none(self.signature.return_annotation)
+
+ @property
+ def parameters(self) -> typing.List[CommandParameter]:
+ """Returns a list of CommandParameters."""
+ ret = []
+ for name, param in self.signature.parameters.items():
+ ret.append(CommandParameter(name, param.annotation, param.kind))
+ return ret
+
+ def signature_help(self) -> str:
+ params = " ".join(str(param) for param in self.parameters)
+ if self.return_type:
+ ret = f" -> {typename(self.return_type)}"
+ else:
+ ret = ""
+ return f"{self.name} {params}{ret}"
- def paramnames(self) -> typing.Sequence[str]:
- v = [typename(i) for i in self.paramtypes]
- if self.has_positional:
- v[-1] = "*" + v[-1]
- return v
+ def prepare_args(self, args: typing.Sequence[str]) -> inspect.BoundArguments:
+ try:
+ bound_arguments = self.signature.bind(*args)
+ except TypeError as v:
+ raise exceptions.CommandError(f"Command argument mismatch: {v.args[0]}")
- def retname(self) -> str:
- return typename(self.returntype) if self.returntype else ""
+ for name, value in bound_arguments.arguments.items():
+ convert_to = self.signature.parameters[name].annotation
+ bound_arguments.arguments[name] = parsearg(self.manager, value, convert_to)
- def signature_help(self) -> str:
- params = " ".join(self.paramnames())
- ret = self.retname()
- if ret:
- ret = " -> " + ret
- return "%s %s%s" % (self.path, params, ret)
-
- def prepare_args(self, args: typing.Sequence[str]) -> typing.List[typing.Any]:
- verify_arg_signature(self.func, list(args), {})
-
- remainder: typing.Sequence[str] = []
- if self.has_positional:
- remainder = args[len(self.paramtypes) - 1:]
- args = args[:len(self.paramtypes) - 1]
-
- pargs = []
- for arg, paramtype in zip(args, self.paramtypes):
- pargs.append(parsearg(self.manager, arg, paramtype))
- pargs.extend(remainder)
- return pargs
+ bound_arguments.apply_defaults()
+
+ return bound_arguments
def call(self, args: typing.Sequence[str]) -> typing.Any:
"""
- Call the command with a list of arguments. At this point, all
- arguments are strings.
+ Call the command with a list of arguments. At this point, all
+ arguments are strings.
"""
- ret = self.func(*self.prepare_args(args))
- if ret is None and self.returntype is None:
+ bound_args = self.prepare_args(args)
+ ret = self.func(*bound_args.args, **bound_args.kwargs)
+ if ret is None and self.return_type is None:
return
- typ = mitmproxy.types.CommandTypes.get(self.returntype)
+ typ = mitmproxy.types.CommandTypes.get(self.return_type)
+ assert typ
if not typ.is_valid(self.manager, typ, ret):
raise exceptions.CommandError(
- "%s returned unexpected data - expected %s" % (
- self.path, typ.display
- )
+ f"{self.name} returned unexpected data - expected {typ.display}"
)
return ret
-ParseResult = typing.NamedTuple(
- "ParseResult",
- [
- ("value", str),
- ("type", typing.Type),
- ("valid", bool),
- ],
-)
+class ParseResult(typing.NamedTuple):
+ value: str
+ type: typing.Type
+ valid: bool
+
+class CommandManager:
+ commands: typing.Dict[str, Command]
-class CommandManager(mitmproxy.types._CommandBase):
def __init__(self, master):
self.master = master
- self.commands: typing.Dict[str, Command] = {}
+ self.commands = {}
def collect_commands(self, addon):
for i in dir(addon):
if not i.startswith("__"):
o = getattr(addon, i)
try:
- is_command = hasattr(o, "command_path")
+ is_command = hasattr(o, "command_name")
except Exception:
pass # hasattr may raise if o implements __getattr__.
else:
if is_command:
try:
- self.add(o.command_path, o)
+ self.add(o.command_name, o)
except exceptions.CommandError as e:
self.master.log.warn(
- "Could not load command %s: %s" % (o.command_path, e)
+ "Could not load command %s: %s" % (o.command_name, e)
)
def add(self, path: str, func: typing.Callable):
self.commands[path] = Command(self, path, func)
+ @functools.lru_cache(maxsize=128)
def parse_partial(
- self,
- cmdstr: str
- ) -> typing.Tuple[typing.Sequence[ParseResult], typing.Sequence[str]]:
+ self,
+ cmdstr: str
+ ) -> typing.Tuple[typing.Sequence[ParseResult], typing.Sequence[CommandParameter]]:
"""
- Parse a possibly partial command. Return a sequence of ParseResults and a sequence of remainder type help items.
+ Parse a possibly partial command. Return a sequence of ParseResults and a sequence of remainder type help items.
"""
- buf = io.StringIO(cmdstr)
- parts: typing.List[str] = []
- lex = lexer(buf)
- while 1:
- remainder = cmdstr[buf.tell():]
- try:
- t = lex.get_token()
- except ValueError:
- parts.append(remainder)
- break
- if not t:
- break
- parts.append(t)
- if not parts:
- parts = [""]
- elif cmdstr.endswith(" "):
- parts.append("")
-
- parse: typing.List[ParseResult] = []
- params: typing.List[type] = []
- typ: typing.Type
- for i in range(len(parts)):
- if i == 0:
- typ = mitmproxy.types.Cmd
- if parts[i] in self.commands:
- params.extend(self.commands[parts[i]].paramtypes)
- elif params:
- typ = params.pop(0)
- if typ == mitmproxy.types.Cmd and params and params[0] == mitmproxy.types.Arg:
- if parts[i] in self.commands:
- params[:] = self.commands[parts[i]].paramtypes
+
+ parts: typing.List[str] = command_lexer.expr.parseString(cmdstr, parseAll=True)
+
+ parsed: typing.List[ParseResult] = []
+ next_params: typing.List[CommandParameter] = [
+ CommandParameter("", mitmproxy.types.Cmd),
+ CommandParameter("", mitmproxy.types.CmdArgs),
+ ]
+ expected: typing.Optional[CommandParameter] = None
+ for part in parts:
+ if part.isspace():
+ parsed.append(
+ ParseResult(
+ value=part,
+ type=mitmproxy.types.Space,
+ valid=True,
+ )
+ )
+ continue
+
+ if expected and expected.kind is inspect.Parameter.VAR_POSITIONAL:
+ assert not next_params
+ elif next_params:
+ expected = next_params.pop(0)
else:
- typ = mitmproxy.types.Unknown
+ expected = CommandParameter("", mitmproxy.types.Unknown)
- to = mitmproxy.types.CommandTypes.get(typ, None)
+ arg_is_known_command = (
+ expected.type == mitmproxy.types.Cmd and part in self.commands
+ )
+ arg_is_unknown_command = (
+ expected.type == mitmproxy.types.Cmd and part not in self.commands
+ )
+ command_args_following = (
+ next_params and next_params[0].type == mitmproxy.types.CmdArgs
+ )
+ if arg_is_known_command and command_args_following:
+ next_params = self.commands[part].parameters + next_params[1:]
+ if arg_is_unknown_command and command_args_following:
+ next_params.pop(0)
+
+ to = mitmproxy.types.CommandTypes.get(expected.type, None)
valid = False
if to:
try:
- to.parse(self, typ, parts[i])
+ to.parse(self, expected.type, part)
except exceptions.TypeError:
valid = False
else:
valid = True
- parse.append(
+ parsed.append(
ParseResult(
- value=parts[i],
- type=typ,
+ value=part,
+ type=expected.type,
valid=valid,
)
)
- remhelp: typing.List[str] = []
- for x in params:
- remt = mitmproxy.types.CommandTypes.get(x, None)
- remhelp.append(remt.display)
-
- return parse, remhelp
+ return parsed, next_params
- def call(self, path: str, *args: typing.Sequence[typing.Any]) -> typing.Any:
+ def call(self, command_name: str, *args: typing.Sequence[typing.Any]) -> typing.Any:
"""
- Call a command with native arguments. May raise CommandError.
+ Call a command with native arguments. May raise CommandError.
"""
- if path not in self.commands:
- raise exceptions.CommandError("Unknown command: %s" % path)
- return self.commands[path].func(*args)
+ if command_name not in self.commands:
+ raise exceptions.CommandError("Unknown command: %s" % command_name)
+ return self.commands[command_name].func(*args)
- def call_strings(self, path: str, args: typing.Sequence[str]) -> typing.Any:
+ def _call_strings(self, command_name: str, args: typing.Sequence[str]) -> typing.Any:
"""
- Call a command using a list of string arguments. May raise CommandError.
+ Call a command using a list of string arguments. May raise CommandError.
"""
- if path not in self.commands:
- raise exceptions.CommandError("Unknown command: %s" % path)
- return self.commands[path].call(args)
+ if command_name not in self.commands:
+ raise exceptions.CommandError("Unknown command: %s" % command_name)
- def execute(self, cmdstr: str):
+ return self.commands[command_name].call(args)
+
+ def execute(self, cmdstr: str) -> typing.Any:
"""
- Execute a command string. May raise CommandError.
+ Execute a command string. May raise CommandError.
"""
- try:
- parts = list(lexer(cmdstr))
- except ValueError as e:
- raise exceptions.CommandError("Command error: %s" % e)
- if not len(parts) >= 1:
- raise exceptions.CommandError("Invalid command: %s" % cmdstr)
- return self.call_strings(parts[0], parts[1:])
+ parts, _ = self.parse_partial(cmdstr)
+ if not parts:
+ raise exceptions.CommandError(f"Invalid command: {cmdstr!r}")
+ command_name, *args = [
+ unquote(part.value)
+ for part in parts
+ if part.type != mitmproxy.types.Space
+ ]
+ return self._call_strings(command_name, args)
def dump(self, out=sys.stdout) -> None:
cmds = list(self.commands.values())
@@ -262,21 +278,23 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
"""
t = mitmproxy.types.CommandTypes.get(argtype, None)
if not t:
- raise exceptions.CommandError("Unsupported argument type: %s" % argtype)
+ raise exceptions.CommandError(f"Unsupported argument type: {argtype}")
try:
- return t.parse(manager, argtype, spec) # type: ignore
+ return t.parse(manager, argtype, spec)
except exceptions.TypeError as e:
raise exceptions.CommandError from e
-def command(path):
+def command(name: typing.Optional[str] = None):
def decorator(function):
@functools.wraps(function)
def wrapper(*args, **kwargs):
verify_arg_signature(function, args, kwargs)
return function(*args, **kwargs)
- wrapper.__dict__["command_path"] = path
+
+ wrapper.__dict__["command_name"] = name or function.__name__.replace("_", ".")
return wrapper
+
return decorator
@@ -286,8 +304,10 @@ def argument(name, type):
specific types such as mitmproxy.types.Choice, which we cannot annotate
directly as mypy does not like that.
"""
+
def decorator(f: types.FunctionType) -> types.FunctionType:
assert name in f.__annotations__
f.__annotations__[name] = type
return f
+
return decorator
diff --git a/mitmproxy/command_lexer.py b/mitmproxy/command_lexer.py
new file mode 100644
index 00000000..f042f3c9
--- /dev/null
+++ b/mitmproxy/command_lexer.py
@@ -0,0 +1,49 @@
+import ast
+import re
+
+import pyparsing
+
+# TODO: There is a lot of work to be done here.
+# The current implementation is written in a way that _any_ input is valid,
+# which does not make sense once things get more complex.
+
+PartialQuotedString = pyparsing.Regex(
+ re.compile(
+ r'''
+ (["']) # start quote
+ (?:
+ (?!\1)[^\\] # unescaped character that is not our quote nor the begin of an escape sequence. We can't use \1 in []
+ |
+ (?:\\.) # escape sequence
+ )*
+ (?:\1|$) # end quote
+ ''',
+ re.VERBOSE
+ )
+)
+
+expr = pyparsing.ZeroOrMore(
+ PartialQuotedString
+ | pyparsing.Word(" \r\n\t")
+ | pyparsing.CharsNotIn("""'" \r\n\t""")
+).leaveWhitespace()
+
+
+def quote(val: str) -> str:
+ if val and all(char not in val for char in "'\" \r\n\t"):
+ return val
+ return repr(val) # TODO: More of a hack.
+
+
+def unquote(x: str) -> str:
+ quoted = (
+ (x.startswith('"') and x.endswith('"'))
+ or
+ (x.startswith("'") and x.endswith("'"))
+ )
+ if quoted:
+ try:
+ x = ast.literal_eval(x)
+ except Exception:
+ x = x[1:-1]
+ return x
diff --git a/mitmproxy/tools/console/commander/commander.py b/mitmproxy/tools/console/commander/commander.py
index f291b8fd..d751422b 100644
--- a/mitmproxy/tools/console/commander/commander.py
+++ b/mitmproxy/tools/console/commander/commander.py
@@ -1,58 +1,55 @@
import abc
+import collections
import copy
import typing
-import collections
import urwid
from urwid.text_layout import calc_coords
+import mitmproxy.command
import mitmproxy.flow
import mitmproxy.master
-import mitmproxy.command
import mitmproxy.types
-class Completer: # pragma: no cover
+class Completer:
@abc.abstractmethod
- def cycle(self) -> str:
- pass
+ def cycle(self, forward: bool = True) -> str:
+ raise NotImplementedError()
class ListCompleter(Completer):
def __init__(
- self,
- start: str,
- options: typing.Sequence[str],
+ self,
+ start: str,
+ options: typing.Sequence[str],
) -> None:
self.start = start
- self.options: typing.Sequence[str] = []
+ self.options: typing.List[str] = []
for o in options:
if o.startswith(start):
self.options.append(o)
self.options.sort()
self.offset = 0
- def cycle(self) -> str:
+ def cycle(self, forward: bool = True) -> str:
if not self.options:
return self.start
ret = self.options[self.offset]
- self.offset = (self.offset + 1) % len(self.options)
+ delta = 1 if forward else -1
+ self.offset = (self.offset + delta) % len(self.options)
return ret
-CompletionState = typing.NamedTuple(
- "CompletionState",
- [
- ("completer", Completer),
- ("parse", typing.Sequence[mitmproxy.command.ParseResult])
- ]
-)
+class CompletionState(typing.NamedTuple):
+ completer: Completer
+ parsed: typing.Sequence[mitmproxy.command.ParseResult]
class CommandBuffer:
def __init__(self, master: mitmproxy.master.Master, start: str = "") -> None:
self.master = master
- self.text = self.flatten(start)
+ self.text = start
# Cursor is always within the range [0:len(buffer)].
self._cursor = len(self.text)
self.completion: typing.Optional[CompletionState] = None
@@ -70,51 +67,30 @@ class CommandBuffer:
else:
self._cursor = x
- def maybequote(self, value):
- if " " in value and not value.startswith("\""):
- return "\"%s\"" % value
- return value
-
- def parse_quoted(self, txt):
- parts, remhelp = self.master.commands.parse_partial(txt)
- for i, p in enumerate(parts):
- parts[i] = mitmproxy.command.ParseResult(
- value = self.maybequote(p.value),
- type = p.type,
- valid = p.valid
- )
- return parts, remhelp
-
def render(self):
- """
- This function is somewhat tricky - in order to make the cursor
- position valid, we have to make sure there is a
- character-for-character offset match in the rendered output, up
- to the cursor. Beyond that, we can add stuff.
- """
- parts, remhelp = self.parse_quoted(self.text)
+ parts, remaining = self.master.commands.parse_partial(self.text)
ret = []
- for p in parts:
- if p.valid:
- if p.type == mitmproxy.types.Cmd:
- ret.append(("commander_command", p.value))
- else:
- ret.append(("text", p.value))
- elif p.value:
- ret.append(("commander_invalid", p.value))
- else:
- ret.append(("text", ""))
- ret.append(("text", " "))
- if remhelp:
- ret.append(("text", " "))
- for v in remhelp:
- ret.append(("commander_hint", "%s " % v))
- return ret
+ if not parts:
+ # Means we just received the leader, so we need to give a blank
+ # text to the widget to render or it crashes
+ ret.append(("text", ""))
+ else:
+ for p in parts:
+ if p.valid:
+ if p.type == mitmproxy.types.Cmd:
+ ret.append(("commander_command", p.value))
+ else:
+ ret.append(("text", p.value))
+ elif p.value:
+ ret.append(("commander_invalid", p.value))
+
+ if remaining:
+ if parts[-1].type != mitmproxy.types.Space:
+ ret.append(("text", " "))
+ for param in remaining:
+ ret.append(("commander_hint", f"{param} "))
- def flatten(self, txt):
- parts, _ = self.parse_quoted(txt)
- ret = [x.value for x in parts]
- return " ".join(ret)
+ return ret
def left(self) -> None:
self.cursor = self.cursor - 1
@@ -122,30 +98,38 @@ class CommandBuffer:
def right(self) -> None:
self.cursor = self.cursor + 1
- def cycle_completion(self) -> None:
+ def cycle_completion(self, forward: bool = True) -> None:
if not self.completion:
- parts, remainhelp = self.master.commands.parse_partial(self.text[:self.cursor])
- last = parts[-1]
- ct = mitmproxy.types.CommandTypes.get(last.type, None)
+ parts, remaining = self.master.commands.parse_partial(self.text[:self.cursor])
+ if parts and parts[-1].type != mitmproxy.types.Space:
+ type_to_complete = parts[-1].type
+ cycle_prefix = parts[-1].value
+ parsed = parts[:-1]
+ elif remaining:
+ type_to_complete = remaining[0].type
+ cycle_prefix = ""
+ parsed = parts
+ else:
+ return
+ ct = mitmproxy.types.CommandTypes.get(type_to_complete, None)
if ct:
self.completion = CompletionState(
- completer = ListCompleter(
- parts[-1].value,
- ct.completion(self.master.commands, last.type, parts[-1].value)
+ completer=ListCompleter(
+ cycle_prefix,
+ ct.completion(self.master.commands, type_to_complete, cycle_prefix)
),
- parse = parts,
+ parsed=parsed,
)
if self.completion:
- nxt = self.completion.completer.cycle()
- buf = " ".join([i.value for i in self.completion.parse[:-1]]) + " " + nxt
- buf = buf.strip()
- self.text = self.flatten(buf)
+ nxt = self.completion.completer.cycle(forward)
+ buf = "".join([i.value for i in self.completion.parsed]) + nxt
+ self.text = buf
self.cursor = len(self.text)
def backspace(self) -> None:
if self.cursor == 0:
return
- self.text = self.flatten(self.text[:self.cursor - 1] + self.text[self.cursor:])
+ self.text = self.text[:self.cursor - 1] + self.text[self.cursor:]
self.cursor = self.cursor - 1
self.completion = None
@@ -153,13 +137,18 @@ class CommandBuffer:
"""
Inserts text at the cursor.
"""
- self.text = self.flatten(self.text[:self.cursor] + k + self.text[self.cursor:])
- self.cursor += 1
+
+ # We don't want to insert a space before the command
+ if k == ' ' and self.text[0:self.cursor].strip() == '':
+ return
+
+ self.text = self.text[:self.cursor] + k + self.text[self.cursor:]
+ self.cursor += len(k)
self.completion = None
class CommandHistory:
- def __init__(self, master: mitmproxy.master.Master, size: int=30) -> None:
+ def __init__(self, master: mitmproxy.master.Master, size: int = 30) -> None:
self.saved_commands: collections.deque = collections.deque(
[CommandBuffer(master, "")],
maxlen=size
@@ -182,7 +171,7 @@ class CommandHistory:
return self.saved_commands[self.index]
return None
- def add_command(self, command: CommandBuffer, execution: bool=False) -> None:
+ def add_command(self, command: CommandBuffer, execution: bool = False) -> None:
if self.index == self.last_index or execution:
last_item = self.saved_commands[-1]
last_item_empty = not last_item.text
@@ -207,7 +196,7 @@ class CommandEdit(urwid.WidgetWrap):
self.history = history
self.update()
- def keypress(self, size, key):
+ def keypress(self, size, key) -> None:
if key == "backspace":
self.cbuf.backspace()
elif key == "left":
@@ -219,27 +208,29 @@ class CommandEdit(urwid.WidgetWrap):
self.cbuf = self.history.get_prev() or self.cbuf
elif key == "down":
self.cbuf = self.history.get_next() or self.cbuf
+ elif key == "shift tab":
+ self.cbuf.cycle_completion(False)
elif key == "tab":
self.cbuf.cycle_completion()
elif len(key) == 1:
self.cbuf.insert(key)
self.update()
- def update(self):
+ def update(self) -> None:
self._w.set_text([self.leader, self.cbuf.render()])
- def render(self, size, focus=False):
+ def render(self, size, focus=False) -> urwid.Canvas:
(maxcol,) = size
canv = self._w.render((maxcol,))
canv = urwid.CompositeCanvas(canv)
canv.cursor = self.get_cursor_coords((maxcol,))
return canv
- def get_cursor_coords(self, size):
+ def get_cursor_coords(self, size) -> typing.Tuple[int, int]:
p = self.cbuf.cursor + len(self.leader)
trans = self._w.get_line_translation(size[0])
x, y = calc_coords(self._w.get_text()[0], trans, p)
return x, y
- def get_edit_text(self):
+ def get_edit_text(self) -> str:
return self.cbuf.text
diff --git a/mitmproxy/tools/console/commands.py b/mitmproxy/tools/console/commands.py
index 0f35742b..26a99b14 100644
--- a/mitmproxy/tools/console/commands.py
+++ b/mitmproxy/tools/console/commands.py
@@ -1,6 +1,8 @@
import urwid
import blinker
import textwrap
+
+from mitmproxy import command
from mitmproxy.tools.console import layoutwidget
from mitmproxy.tools.console import signals
@@ -10,7 +12,7 @@ command_focus_change = blinker.Signal()
class CommandItem(urwid.WidgetWrap):
- def __init__(self, walker, cmd, focused):
+ def __init__(self, walker, cmd: command.Command, focused: bool):
self.walker, self.cmd, self.focused = walker, cmd, focused
super().__init__(None)
self._w = self.get_widget()
@@ -18,15 +20,18 @@ class CommandItem(urwid.WidgetWrap):
def get_widget(self):
parts = [
("focus", ">> " if self.focused else " "),
- ("title", self.cmd.path),
- ("text", " "),
- ("text", " ".join(self.cmd.paramnames())),
+ ("title", self.cmd.name)
]
- if self.cmd.returntype:
- parts.append([
+ if self.cmd.parameters:
+ parts += [
+ ("text", " "),
+ ("text", " ".join(str(param) for param in self.cmd.parameters)),
+ ]
+ if self.cmd.return_type:
+ parts += [
("title", " -> "),
- ("text", self.cmd.retname()),
- ])
+ ("text", command.typename(self.cmd.return_type)),
+ ]
return urwid.AttrMap(
urwid.Padding(urwid.Text(parts)),
@@ -92,7 +97,7 @@ class CommandsList(urwid.ListBox):
def keypress(self, size, key):
if key == "m_select":
foc, idx = self.get_focus()
- signals.status_prompt_command.send(partial=foc.cmd.path + " ")
+ signals.status_prompt_command.send(partial=foc.cmd.name + " ")
elif key == "m_start":
self.set_focus(0)
self.walker._modified()
diff --git a/mitmproxy/tools/console/consoleaddons.py b/mitmproxy/tools/console/consoleaddons.py
index 9f595b42..7fcd9b48 100644
--- a/mitmproxy/tools/console/consoleaddons.py
+++ b/mitmproxy/tools/console/consoleaddons.py
@@ -1,21 +1,18 @@
import csv
-import shlex
import typing
+import mitmproxy.types
+from mitmproxy import command, command_lexer
+from mitmproxy import contentviews
from mitmproxy import ctx
-from mitmproxy import command
from mitmproxy import exceptions
from mitmproxy import flow
from mitmproxy import http
from mitmproxy import log
-from mitmproxy import contentviews
-from mitmproxy.utils import strutils
-import mitmproxy.types
-
-
+from mitmproxy.tools.console import keymap
from mitmproxy.tools.console import overlay
from mitmproxy.tools.console import signals
-from mitmproxy.tools.console import keymap
+from mitmproxy.utils import strutils
console_palettes = [
"lowlight",
@@ -48,10 +45,12 @@ class UnsupportedLog:
"""
A small addon to dump info on flow types we don't support yet.
"""
+
def websocket_message(self, f):
message = f.messages[-1]
ctx.log.info(f.message_info(message))
- ctx.log.debug(message.content if isinstance(message.content, str) else strutils.bytes_to_escaped_str(message.content))
+ ctx.log.debug(
+ message.content if isinstance(message.content, str) else strutils.bytes_to_escaped_str(message.content))
def websocket_end(self, f):
ctx.log.info("WebSocket connection closed by {}: {} {}, {}".format(
@@ -78,6 +77,7 @@ class ConsoleAddon:
An addon that exposes console-specific commands, and hooks into required
events.
"""
+
def __init__(self, master):
self.master = master
self.started = False
@@ -86,7 +86,7 @@ class ConsoleAddon:
loader.add_option(
"console_default_contentview", str, "auto",
"The default content view mode.",
- choices = [i.name.lower() for i in contentviews.views]
+ choices=[i.name.lower() for i in contentviews.views]
)
loader.add_option(
"console_eventlog_verbosity", str, 'info',
@@ -142,7 +142,7 @@ class ConsoleAddon:
opts = self.layout_options()
off = self.layout_options().index(ctx.options.console_layout)
ctx.options.update(
- console_layout = opts[(off + 1) % len(opts)]
+ console_layout=opts[(off + 1) % len(opts)]
)
@command.command("console.panes.next")
@@ -234,17 +234,18 @@ class ConsoleAddon:
@command.command("console.choose")
def console_choose(
- self,
- prompt: str,
- choices: typing.Sequence[str],
- cmd: mitmproxy.types.Cmd,
- *args: mitmproxy.types.Arg
+ self,
+ prompt: str,
+ choices: typing.Sequence[str],
+ cmd: mitmproxy.types.Cmd,
+ *args: mitmproxy.types.CmdArgs
) -> None:
"""
Prompt the user to choose from a specified list of strings, then
invoke another command with all occurrences of {choice} replaced by
the choice the user made.
"""
+
def callback(opt):
# We're now outside of the call context...
repl = cmd + " " + " ".join(args)
@@ -260,22 +261,22 @@ class ConsoleAddon:
@command.command("console.choose.cmd")
def console_choose_cmd(
- self,
- prompt: str,
- choicecmd: mitmproxy.types.Cmd,
- subcmd: mitmproxy.types.Cmd,
- *args: mitmproxy.types.Arg
+ self,
+ prompt: str,
+ choicecmd: mitmproxy.types.Cmd,
+ subcmd: mitmproxy.types.Cmd,
+ *args: mitmproxy.types.CmdArgs
) -> None:
"""
Prompt the user to choose from a list of strings returned by a
command, then invoke another command with all occurrences of {choice}
replaced by the choice the user made.
"""
- choices = ctx.master.commands.call_strings(choicecmd, [])
+ choices = ctx.master.commands.execute(choicecmd)
def callback(opt):
# We're now outside of the call context...
- repl = shlex.quote(" ".join(args))
+ repl = " ".join(command_lexer.quote(x) for x in args)
repl = repl.replace("{choice}", opt)
try:
self.master.commands.execute(subcmd + " " + repl)
@@ -287,21 +288,24 @@ class ConsoleAddon:
)
@command.command("console.command")
- def console_command(self, *partial: str) -> None:
+ def console_command(self, *command_str: str) -> None:
"""
Prompt the user to edit a command with a (possibly empty) starting value.
"""
- signals.status_prompt_command.send(partial=" ".join(partial)) # type: ignore
+ quoted = " ".join(command_lexer.quote(x) for x in command_str)
+ signals.status_prompt_command.send(partial=quoted)
@command.command("console.command.set")
- def console_command_set(self, option: str) -> None:
+ def console_command_set(self, option_name: str) -> None:
"""
- Prompt the user to set an option of the form "key[=value]".
+ Prompt the user to set an option.
"""
- option_value = getattr(self.master.options, option, None)
- current_value = option_value if option_value else ""
- self.master.commands.execute(
- "console.command set %s=%s" % (option, current_value)
+ option_value = getattr(self.master.options, option_name, None) or ""
+ set_command = f"set {option_name} {option_value!r}"
+ cursor = len(set_command) - 1
+ signals.status_prompt_command.send(
+ partial=set_command,
+ cursor=cursor
)
@command.command("console.view.keybindings")
@@ -351,14 +355,14 @@ class ConsoleAddon:
@command.command("console.bodyview")
@command.argument("part", type=mitmproxy.types.Choice("console.bodyview.options"))
- def bodyview(self, f: flow.Flow, part: str) -> None:
+ def bodyview(self, flow: flow.Flow, part: str) -> None:
"""
Spawn an external viewer for a flow request or response body based
on the detected MIME type. We use the mailcap system to find the
correct viewier, and fall back to the programs in $PAGER or $EDITOR
if necessary.
"""
- fpart = getattr(f, part, None)
+ fpart = getattr(flow, part, None)
if not fpart:
raise exceptions.CommandError("Part must be either request or response, not %s." % part)
t = fpart.headers.get("content-type")
@@ -397,8 +401,8 @@ class ConsoleAddon:
]
@command.command("console.edit.focus")
- @command.argument("part", type=mitmproxy.types.Choice("console.edit.focus.options"))
- def edit_focus(self, part: str) -> None:
+ @command.argument("flow_part", type=mitmproxy.types.Choice("console.edit.focus.options"))
+ def edit_focus(self, flow_part: str) -> None:
"""
Edit a component of the currently focused flow.
"""
@@ -410,27 +414,27 @@ class ConsoleAddon:
flow.backup()
require_dummy_response = (
- part in ("response-headers", "response-body", "set-cookies") and
- flow.response is None
+ flow_part in ("response-headers", "response-body", "set-cookies") and
+ flow.response is None
)
if require_dummy_response:
flow.response = http.HTTPResponse.make()
- if part == "cookies":
+ if flow_part == "cookies":
self.master.switch_view("edit_focus_cookies")
- elif part == "urlencoded form":
+ elif flow_part == "urlencoded form":
self.master.switch_view("edit_focus_urlencoded_form")
- elif part == "multipart form":
+ elif flow_part == "multipart form":
self.master.switch_view("edit_focus_multipart_form")
- elif part == "path":
+ elif flow_part == "path":
self.master.switch_view("edit_focus_path")
- elif part == "query":
+ elif flow_part == "query":
self.master.switch_view("edit_focus_query")
- elif part == "request-headers":
+ elif flow_part == "request-headers":
self.master.switch_view("edit_focus_request_headers")
- elif part == "response-headers":
+ elif flow_part == "response-headers":
self.master.switch_view("edit_focus_response_headers")
- elif part in ("request-body", "response-body"):
- if part == "request-body":
+ elif flow_part in ("request-body", "response-body"):
+ if flow_part == "request-body":
message = flow.request
else:
message = flow.response
@@ -442,16 +446,16 @@ class ConsoleAddon:
# just strip the newlines off the end of the body when we return
# from an editor.
message.content = c.rstrip(b"\n")
- elif part == "set-cookies":
+ elif flow_part == "set-cookies":
self.master.switch_view("edit_focus_setcookies")
- elif part == "url":
+ elif flow_part == "url":
url = flow.request.url.encode()
edited_url = self.master.spawn_editor(url)
url = edited_url.rstrip(b"\n")
flow.request.url = url.decode()
- elif part in ["method", "status_code", "reason"]:
+ elif flow_part in ["method", "status_code", "reason"]:
self.master.commands.execute(
- "console.command flow.set @focus %s " % part
+ "console.command flow.set @focus %s " % flow_part
)
def _grideditor(self):
@@ -535,10 +539,8 @@ class ConsoleAddon:
raise exceptions.CommandError("Invalid flowview mode.")
try:
- self.master.commands.call_strings(
- "view.settings.setval",
- ["@focus", "flowview_mode_%s" % idx, mode]
- )
+ cmd = 'view.settings.setval @focus flowview_mode_%s %s' % (idx, mode)
+ self.master.commands.execute(cmd)
except exceptions.CommandError as e:
signals.status_message.send(message=str(e))
@@ -558,14 +560,9 @@ class ConsoleAddon:
if not fv:
raise exceptions.CommandError("Not viewing a flow.")
idx = fv.body.tab_offset
- return self.master.commands.call_strings(
- "view.settings.getval",
- [
- "@focus",
- "flowview_mode_%s" % idx,
- self.master.options.console_default_contentview,
- ]
- )
+
+ cmd = 'view.settings.getval @focus flowview_mode_%s %s' % (idx, self.master.options.console_default_contentview)
+ return self.master.commands.execute(cmd)
@command.command("console.key.contexts")
def key_contexts(self) -> typing.Sequence[str]:
@@ -576,11 +573,11 @@ class ConsoleAddon:
@command.command("console.key.bind")
def key_bind(
- self,
- contexts: typing.Sequence[str],
- key: str,
- cmd: mitmproxy.types.Cmd,
- *args: mitmproxy.types.Arg
+ self,
+ contexts: typing.Sequence[str],
+ key: str,
+ cmd: mitmproxy.types.Cmd,
+ *args: mitmproxy.types.CmdArgs
) -> None:
"""
Bind a shortcut key.
diff --git a/mitmproxy/tools/console/defaultkeys.py b/mitmproxy/tools/console/defaultkeys.py
index 0a6c5561..a0f27625 100644
--- a/mitmproxy/tools/console/defaultkeys.py
+++ b/mitmproxy/tools/console/defaultkeys.py
@@ -26,7 +26,7 @@ def map(km):
km.add("ctrl f", "console.nav.pagedown", ["global"], "Page down")
km.add("ctrl b", "console.nav.pageup", ["global"], "Page up")
- km.add("I", "set intercept_active=toggle", ["global"], "Toggle intercept")
+ km.add("I", "set intercept_active toggle", ["global"], "Toggle intercept")
km.add("i", "console.command.set intercept", ["global"], "Set intercept")
km.add("W", "console.command.set save_stream_file", ["global"], "Stream to file")
km.add("A", "flow.resume @all", ["flowlist", "flowview"], "Resume all intercepted flows")
@@ -48,14 +48,14 @@ def map(km):
"Export this flow to file"
)
km.add("f", "console.command.set view_filter", ["flowlist"], "Set view filter")
- km.add("F", "set console_focus_follow=toggle", ["flowlist"], "Set focus follow")
+ km.add("F", "set console_focus_follow toggle", ["flowlist"], "Set focus follow")
km.add(
"ctrl l",
"console.command cut.clip ",
["flowlist", "flowview"],
"Send cuts to clipboard"
)
- km.add("L", "console.command view.load ", ["flowlist"], "Load flows from file")
+ km.add("L", "console.command view.flows.load ", ["flowlist"], "Load flows from file")
km.add("m", "flow.mark.toggle @focus", ["flowlist"], "Toggle mark on this flow")
km.add("M", "view.properties.marked.toggle", ["flowlist"], "Toggle viewing marked flows")
km.add(
@@ -68,14 +68,14 @@ def map(km):
"o",
"""
console.choose.cmd Order view.order.options
- set view_order={choice}
+ set view_order {choice}
""",
["flowlist"],
"Set flow list order"
)
km.add("r", "replay.client @focus", ["flowlist", "flowview"], "Replay this flow")
km.add("S", "console.command replay.server ", ["flowlist"], "Start server replay")
- km.add("v", "set view_order_reversed=toggle", ["flowlist"], "Reverse flow list order")
+ km.add("v", "set view_order_reversed toggle", ["flowlist"], "Reverse flow list order")
km.add("U", "flow.mark @all false", ["flowlist"], "Un-set all marks")
km.add("w", "console.command save.file @shown ", ["flowlist"], "Save listed flows to file")
km.add("V", "flow.revert @focus", ["flowlist", "flowview"], "Revert changes to this flow")
diff --git a/mitmproxy/tools/console/statusbar.py b/mitmproxy/tools/console/statusbar.py
index 56f0674f..43f5170d 100644
--- a/mitmproxy/tools/console/statusbar.py
+++ b/mitmproxy/tools/console/statusbar.py
@@ -1,4 +1,5 @@
import os.path
+from typing import Optional
import urwid
@@ -98,10 +99,15 @@ class ActionBar(urwid.WidgetWrap):
self._w = urwid.Edit(self.prep_prompt(prompt), text or "")
self.prompting = PromptStub(callback, args)
- def sig_prompt_command(self, sender, partial=""):
+ def sig_prompt_command(self, sender, partial: str = "", cursor: Optional[int] = None):
signals.focus.send(self, section="footer")
- self._w = commander.CommandEdit(self.master, partial,
- self.command_history)
+ self._w = commander.CommandEdit(
+ self.master,
+ partial,
+ self.command_history,
+ )
+ if cursor is not None:
+ self._w.cbuf.cursor = cursor
self.prompting = commandexecutor.CommandExecutor(self.master)
def sig_prompt_onekey(self, sender, prompt, keys, callback, args=()):
diff --git a/mitmproxy/types.py b/mitmproxy/types.py
index 0634e4d7..ac992217 100644
--- a/mitmproxy/types.py
+++ b/mitmproxy/types.py
@@ -5,6 +5,9 @@ import typing
from mitmproxy import exceptions
from mitmproxy import flow
+if typing.TYPE_CHECKING: # pragma: no cover
+ from mitmproxy.command import CommandManager
+
class Path(str):
pass
@@ -14,7 +17,7 @@ class Cmd(str):
pass
-class Arg(str):
+class CmdArgs(str):
pass
@@ -22,6 +25,10 @@ class Unknown(str):
pass
+class Space(str):
+ pass
+
+
class CutSpec(typing.Sequence[str]):
pass
@@ -40,27 +47,11 @@ class Choice:
return False
-# One of the many charming things about mypy is that introducing type
-# annotations can cause circular dependencies where there were none before.
-# Rather than putting types and the CommandManger in the same file, we introduce
-# a stub type with the signature we use.
-class _CommandBase:
- commands: typing.MutableMapping[str, typing.Any] = {}
-
- def call_strings(self, path: str, args: typing.Sequence[str]) -> typing.Any:
- raise NotImplementedError
-
- def execute(self, cmd: str) -> typing.Any:
- raise NotImplementedError
-
-
class _BaseType:
typ: typing.Type = object
display: str = ""
- def completion(
- self, manager: _CommandBase, t: typing.Any, s: str
- ) -> typing.Sequence[str]:
+ def completion(self, manager: "CommandManager", t: typing.Any, s: str) -> typing.Sequence[str]:
"""
Returns a list of completion strings for a given prefix. The strings
returned don't necessarily need to be suffixes of the prefix, since
@@ -68,9 +59,7 @@ class _BaseType:
"""
raise NotImplementedError
- def parse(
- self, manager: _CommandBase, typ: typing.Any, s: str
- ) -> typing.Any:
+ def parse(self, manager: "CommandManager", typ: typing.Any, s: str) -> typing.Any:
"""
Parse a string, given the specific type instance (to allow rich type annotations like Choice) and a string.
@@ -78,7 +67,7 @@ class _BaseType:
"""
raise NotImplementedError
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
"""
Check if data is valid for this type.
"""
@@ -89,10 +78,10 @@ class _BoolType(_BaseType):
typ = bool
display = "bool"
- def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return ["false", "true"]
- def parse(self, manager: _CommandBase, t: type, s: str) -> bool:
+ def parse(self, manager: "CommandManager", t: type, s: str) -> bool:
if s == "true":
return True
elif s == "false":
@@ -102,7 +91,7 @@ class _BoolType(_BaseType):
"Booleans are 'true' or 'false', got %s" % s
)
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
return val in [True, False]
@@ -110,13 +99,13 @@ class _StrType(_BaseType):
typ = str
display = "str"
- def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return []
- def parse(self, manager: _CommandBase, t: type, s: str) -> str:
+ def parse(self, manager: "CommandManager", t: type, s: str) -> str:
return s
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
return isinstance(val, str)
@@ -124,13 +113,13 @@ class _UnknownType(_BaseType):
typ = Unknown
display = "unknown"
- def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return []
- def parse(self, manager: _CommandBase, t: type, s: str) -> str:
+ def parse(self, manager: "CommandManager", t: type, s: str) -> str:
return s
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
return False
@@ -138,16 +127,16 @@ class _IntType(_BaseType):
typ = int
display = "int"
- def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return []
- def parse(self, manager: _CommandBase, t: type, s: str) -> int:
+ def parse(self, manager: "CommandManager", t: type, s: str) -> int:
try:
return int(s)
except ValueError as e:
raise exceptions.TypeError from e
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
return isinstance(val, int)
@@ -155,7 +144,7 @@ class _PathType(_BaseType):
typ = Path
display = "path"
- def completion(self, manager: _CommandBase, t: type, start: str) -> typing.Sequence[str]:
+ def completion(self, manager: "CommandManager", t: type, start: str) -> typing.Sequence[str]:
if not start:
start = "./"
path = os.path.expanduser(start)
@@ -177,10 +166,10 @@ class _PathType(_BaseType):
ret.sort()
return ret
- def parse(self, manager: _CommandBase, t: type, s: str) -> str:
+ def parse(self, manager: "CommandManager", t: type, s: str) -> str:
return os.path.expanduser(s)
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
return isinstance(val, str)
@@ -188,43 +177,43 @@ class _CmdType(_BaseType):
typ = Cmd
display = "cmd"
- def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return list(manager.commands.keys())
- def parse(self, manager: _CommandBase, t: type, s: str) -> str:
+ def parse(self, manager: "CommandManager", t: type, s: str) -> str:
if s not in manager.commands:
raise exceptions.TypeError("Unknown command: %s" % s)
return s
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
return val in manager.commands
class _ArgType(_BaseType):
- typ = Arg
+ typ = CmdArgs
display = "arg"
- def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return []
- def parse(self, manager: _CommandBase, t: type, s: str) -> str:
+ def parse(self, manager: "CommandManager", t: type, s: str) -> str:
return s
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
return isinstance(val, str)
class _StrSeqType(_BaseType):
typ = typing.Sequence[str]
- display = "[str]"
+ display = "str[]"
- def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return []
- def parse(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ def parse(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return [x.strip() for x in s.split(",")]
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
if isinstance(val, str) or isinstance(val, bytes):
return False
try:
@@ -238,7 +227,7 @@ class _StrSeqType(_BaseType):
class _CutSpecType(_BaseType):
typ = CutSpec
- display = "[cut]"
+ display = "cut[]"
valid_prefixes = [
"request.method",
"request.scheme",
@@ -277,7 +266,7 @@ class _CutSpecType(_BaseType):
"server_conn.tls_established",
]
- def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
spec = s.split(",")
opts = []
for pref in self.valid_prefixes:
@@ -285,11 +274,11 @@ class _CutSpecType(_BaseType):
opts.append(",".join(spec))
return opts
- def parse(self, manager: _CommandBase, t: type, s: str) -> CutSpec:
+ def parse(self, manager: "CommandManager", t: type, s: str) -> CutSpec:
parts: typing.Any = s.split(",")
return parts
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
if not isinstance(val, str):
return False
parts = [x.strip() for x in val.split(",")]
@@ -327,7 +316,7 @@ class _BaseFlowType(_BaseType):
"~c",
]
- def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return self.valid_prefixes
@@ -335,9 +324,9 @@ class _FlowType(_BaseFlowType):
typ = flow.Flow
display = "flow"
- def parse(self, manager: _CommandBase, t: type, s: str) -> flow.Flow:
+ def parse(self, manager: "CommandManager", t: type, s: str) -> flow.Flow:
try:
- flows = manager.call_strings("view.flows.resolve", [s])
+ flows = manager.execute("view.flows.resolve %s" % (s))
except exceptions.CommandError as e:
raise exceptions.TypeError from e
if len(flows) != 1:
@@ -346,21 +335,21 @@ class _FlowType(_BaseFlowType):
)
return flows[0]
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
return isinstance(val, flow.Flow)
class _FlowsType(_BaseFlowType):
typ = typing.Sequence[flow.Flow]
- display = "[flow]"
+ display = "flow[]"
- def parse(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[flow.Flow]:
+ def parse(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[flow.Flow]:
try:
- return manager.call_strings("view.flows.resolve", [s])
+ return manager.execute("view.flows.resolve %s" % (s))
except exceptions.CommandError as e:
raise exceptions.TypeError from e
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
try:
for v in val:
if not isinstance(v, flow.Flow):
@@ -372,19 +361,19 @@ class _FlowsType(_BaseFlowType):
class _DataType(_BaseType):
typ = Data
- display = "[data]"
+ display = "data[][]"
def completion(
- self, manager: _CommandBase, t: type, s: str
+ self, manager: "CommandManager", t: type, s: str
) -> typing.Sequence[str]: # pragma: no cover
raise exceptions.TypeError("data cannot be passed as argument")
def parse(
- self, manager: _CommandBase, t: type, s: str
+ self, manager: "CommandManager", t: type, s: str
) -> typing.Any: # pragma: no cover
raise exceptions.TypeError("data cannot be passed as argument")
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
# FIXME: validate that all rows have equal length, and all columns have equal types
try:
for row in val:
@@ -400,16 +389,16 @@ class _ChoiceType(_BaseType):
typ = Choice
display = "choice"
- def completion(self, manager: _CommandBase, t: Choice, s: str) -> typing.Sequence[str]:
+ def completion(self, manager: "CommandManager", t: Choice, s: str) -> typing.Sequence[str]:
return manager.execute(t.options_command)
- def parse(self, manager: _CommandBase, t: Choice, s: str) -> str:
+ def parse(self, manager: "CommandManager", t: Choice, s: str) -> str:
opts = manager.execute(t.options_command)
if s not in opts:
raise exceptions.TypeError("Invalid choice.")
return s
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
try:
opts = manager.execute(typ.options_command)
except exceptions.CommandError:
@@ -423,7 +412,7 @@ class TypeManager:
for t in types:
self.typemap[t.typ] = t()
- def get(self, t: typing.Optional[typing.Type], default=None) -> _BaseType:
+ def get(self, t: typing.Optional[typing.Type], default=None) -> typing.Optional[_BaseType]:
if type(t) in self.typemap:
return self.typemap[type(t)]
return self.typemap.get(t, default)
diff --git a/setup.cfg b/setup.cfg
index a2c49f48..d0dcc2df 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -18,6 +18,8 @@ show_missing = True
exclude_lines =
pragma: no cover
raise NotImplementedError()
+ if typing.TYPE_CHECKING:
+ if TYPE_CHECKING:
[mypy]
ignore_missing_imports = True
diff --git a/test/mitmproxy/addons/test_core.py b/test/mitmproxy/addons/test_core.py
index 59875c2b..e6924ead 100644
--- a/test/mitmproxy/addons/test_core.py
+++ b/test/mitmproxy/addons/test_core.py
@@ -11,7 +11,7 @@ def test_set():
sa = core.Core()
with taddons.context(loadcore=False) as tctx:
assert tctx.master.options.server
- tctx.command(sa.set, "server=false")
+ tctx.command(sa.set, "server", "false")
assert not tctx.master.options.server
with pytest.raises(exceptions.CommandError):
diff --git a/test/mitmproxy/addons/test_save.py b/test/mitmproxy/addons/test_save.py
index 4aa1f648..6727a96f 100644
--- a/test/mitmproxy/addons/test_save.py
+++ b/test/mitmproxy/addons/test_save.py
@@ -73,7 +73,7 @@ def test_save_command(tmpdir):
v = view.View()
tctx.master.addons.add(v)
tctx.master.addons.add(sa)
- tctx.master.commands.call_strings("save.file", ["@shown", p])
+ tctx.master.commands.execute("save.file @shown %s" % p)
def test_simple(tmpdir):
diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py
index d9dcf5f9..a432f9e3 100644
--- a/test/mitmproxy/test_command.py
+++ b/test/mitmproxy/test_command.py
@@ -1,13 +1,15 @@
-import typing
import inspect
+import io
+import typing
+
+import pytest
+
+import mitmproxy.types
from mitmproxy import command
-from mitmproxy import flow
from mitmproxy import exceptions
-from mitmproxy.test import tflow
+from mitmproxy import flow
from mitmproxy.test import taddons
-import mitmproxy.types
-import io
-import pytest
+from mitmproxy.test import tflow
class TAddon:
@@ -29,7 +31,7 @@ class TAddon:
return "ok"
@command.command("subcommand")
- def subcommand(self, cmd: mitmproxy.types.Cmd, *args: mitmproxy.types.Arg) -> str:
+ def subcommand(self, cmd: mitmproxy.types.Cmd, *args: mitmproxy.types.CmdArgs) -> str:
return "ok"
@command.command("empty")
@@ -83,17 +85,15 @@ class TestCommand:
with pytest.raises(exceptions.CommandError):
command.Command(cm, "invalidret", a.invalidret)
with pytest.raises(exceptions.CommandError):
- command.Command(cm, "invalidarg", a.invalidarg)
+ assert command.Command(cm, "invalidarg", a.invalidarg)
def test_varargs(self):
with taddons.context() as tctx:
cm = command.CommandManager(tctx.master)
a = TAddon()
c = command.Command(cm, "varargs", a.varargs)
- assert c.signature_help() == "varargs str *str -> [str]"
+ assert c.signature_help() == "varargs one *var -> str[]"
assert c.call(["one", "two", "three"]) == ["two", "three"]
- with pytest.raises(exceptions.CommandError):
- c.call(["one", "two", 3])
def test_call(self):
with taddons.context() as tctx:
@@ -101,7 +101,7 @@ class TestCommand:
a = TAddon()
c = command.Command(cm, "cmd.path", a.cmd1)
assert c.call(["foo"]) == "ret foo"
- assert c.signature_help() == "cmd.path str -> str"
+ assert c.signature_help() == "cmd.path foo -> str"
c = command.Command(cm, "cmd.two", a.cmd2)
with pytest.raises(exceptions.CommandError):
@@ -115,154 +115,305 @@ class TestCommand:
[
"foo bar",
[
- command.ParseResult(
- value = "foo", type = mitmproxy.types.Cmd, valid = False
- ),
- command.ParseResult(
- value = "bar", type = mitmproxy.types.Unknown, valid = False
- )
+ command.ParseResult(value="foo", type=mitmproxy.types.Cmd, valid=False),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ command.ParseResult(value="bar", type=mitmproxy.types.Unknown, valid=False)
],
[],
],
[
"cmd1 'bar",
[
- command.ParseResult(value = "cmd1", type = mitmproxy.types.Cmd, valid = True),
- command.ParseResult(value = "'bar", type = str, valid = True)
+ command.ParseResult(value="cmd1", type=mitmproxy.types.Cmd, valid=True),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ command.ParseResult(value="'bar", type=str, valid=True)
],
[],
],
[
"a",
- [command.ParseResult(value = "a", type = mitmproxy.types.Cmd, valid = False)],
+ [command.ParseResult(value="a", type=mitmproxy.types.Cmd, valid=False)],
[],
],
[
"",
- [command.ParseResult(value = "", type = mitmproxy.types.Cmd, valid = False)],
- []
+ [],
+ [
+ command.CommandParameter("", mitmproxy.types.Cmd),
+ command.CommandParameter("", mitmproxy.types.CmdArgs)
+ ]
],
[
"cmd3 1",
[
- command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True),
- command.ParseResult(value = "1", type = int, valid = True),
+ command.ParseResult(value="cmd3", type=mitmproxy.types.Cmd, valid=True),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ command.ParseResult(value="1", type=int, valid=True),
],
[]
],
[
"cmd3 ",
[
- command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True),
- command.ParseResult(value = "", type = int, valid = False),
+ command.ParseResult(value="cmd3", type=mitmproxy.types.Cmd, valid=True),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
],
- []
+ [command.CommandParameter('foo', int)]
],
[
"subcommand ",
[
- command.ParseResult(
- value = "subcommand", type = mitmproxy.types.Cmd, valid = True,
- ),
- command.ParseResult(value = "", type = mitmproxy.types.Cmd, valid = False),
+ command.ParseResult(value="subcommand", type=mitmproxy.types.Cmd, valid=True, ),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ ],
+ [
+ command.CommandParameter('cmd', mitmproxy.types.Cmd),
+ command.CommandParameter('args', mitmproxy.types.CmdArgs, kind=inspect.Parameter.VAR_POSITIONAL),
],
- ["arg"],
],
[
- "subcommand cmd3 ",
+ "varargs one",
[
- command.ParseResult(value = "subcommand", type = mitmproxy.types.Cmd, valid = True),
- command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True),
- command.ParseResult(value = "", type = int, valid = False),
+ command.ParseResult(value="varargs", type=mitmproxy.types.Cmd, valid=True),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ command.ParseResult(value="one", type=str, valid=True),
],
- []
+ [command.CommandParameter('var', str, kind=inspect.Parameter.VAR_POSITIONAL)]
],
[
- "cmd4",
+ "varargs one two three",
[
- command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True),
+ command.ParseResult(value="varargs", type=mitmproxy.types.Cmd, valid=True),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ command.ParseResult(value="one", type=str, valid=True),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ command.ParseResult(value="two", type=str, valid=True),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ command.ParseResult(value="three", type=str, valid=True),
],
- ["int", "str", "path"]
+ [],
],
[
- "cmd4 ",
+ "subcommand cmd3 ",
[
- command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True),
- command.ParseResult(value = "", type = int, valid = False),
+ command.ParseResult(value="subcommand", type=mitmproxy.types.Cmd, valid=True),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ command.ParseResult(value="cmd3", type=mitmproxy.types.Cmd, valid=True),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
],
- ["str", "path"]
+ [command.CommandParameter('foo', int)]
],
[
- "cmd4 1",
+ "cmd4",
[
- command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True),
- command.ParseResult(value = "1", type = int, valid = True),
+ command.ParseResult(value="cmd4", type=mitmproxy.types.Cmd, valid=True),
],
- ["str", "path"]
+ [
+ command.CommandParameter('a', int),
+ command.CommandParameter('b', str),
+ command.CommandParameter('c', mitmproxy.types.Path),
+ ]
+ ],
+ [
+ "cmd4 ",
+ [
+ command.ParseResult(value="cmd4", type=mitmproxy.types.Cmd, valid=True),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ ],
+ [
+ command.CommandParameter('a', int),
+ command.CommandParameter('b', str),
+ command.CommandParameter('c', mitmproxy.types.Path),
+ ]
],
[
"cmd4 1",
[
- command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True),
- command.ParseResult(value = "1", type = int, valid = True),
+ command.ParseResult(value="cmd4", type=mitmproxy.types.Cmd, valid=True),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ command.ParseResult(value="1", type=int, valid=True),
],
- ["str", "path"]
+ [
+ command.CommandParameter('b', str),
+ command.CommandParameter('c', mitmproxy.types.Path),
+ ]
],
[
"flow",
[
- command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True),
+ command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True),
],
- ["flow", "str"]
+ [
+ command.CommandParameter('f', flow.Flow),
+ command.CommandParameter('s', str),
+ ]
],
[
"flow ",
[
- command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True),
- command.ParseResult(value = "", type = flow.Flow, valid = False),
+ command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
],
- ["str"]
+ [
+ command.CommandParameter('f', flow.Flow),
+ command.CommandParameter('s', str),
+ ]
],
[
"flow x",
[
- command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True),
- command.ParseResult(value = "x", type = flow.Flow, valid = False),
+ command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ command.ParseResult(value="x", type=flow.Flow, valid=False),
],
- ["str"]
+ [
+ command.CommandParameter('s', str),
+ ]
],
[
"flow x ",
[
- command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True),
- command.ParseResult(value = "x", type = flow.Flow, valid = False),
- command.ParseResult(value = "", type = str, valid = True),
+ command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ command.ParseResult(value="x", type=flow.Flow, valid=False),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
],
- []
+ [
+ command.CommandParameter('s', str),
+ ]
],
[
"flow \"one two",
[
- command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True),
- command.ParseResult(value = "\"one two", type = flow.Flow, valid = False),
+ command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ command.ParseResult(value="\"one two", type=flow.Flow, valid=False),
],
- ["str"]
+ [
+ command.CommandParameter('s', str),
+ ]
],
[
- "flow \"one two\"",
+ "flow \"three four\"",
[
- command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True),
- command.ParseResult(value = "one two", type = flow.Flow, valid = False),
+ command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ command.ParseResult(value='"three four"', type=flow.Flow, valid=False),
],
- ["str"]
+ [
+ command.CommandParameter('s', str),
+ ]
],
+ [
+ "spaces ' '",
+ [
+ command.ParseResult(value="spaces", type=mitmproxy.types.Cmd, valid=False),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ command.ParseResult(value="' '", type=mitmproxy.types.Unknown, valid=False)
+ ],
+ [],
+ ],
+ [
+ 'spaces2 " "',
+ [
+ command.ParseResult(value="spaces2", type=mitmproxy.types.Cmd, valid=False),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ command.ParseResult(value='" "', type=mitmproxy.types.Unknown, valid=False)
+ ],
+ [],
+ ],
+ [
+ '"abc"',
+ [
+ command.ParseResult(value='"abc"', type=mitmproxy.types.Cmd, valid=False),
+ ],
+ [],
+ ],
+ [
+ "'def'",
+ [
+ command.ParseResult(value="'def'", type=mitmproxy.types.Cmd, valid=False),
+ ],
+ [],
+ ],
+ [
+ "cmd10 'a' \"b\" c",
+ [
+ command.ParseResult(value="cmd10", type=mitmproxy.types.Cmd, valid=False),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ command.ParseResult(value="'a'", type=mitmproxy.types.Unknown, valid=False),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ command.ParseResult(value='"b"', type=mitmproxy.types.Unknown, valid=False),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ command.ParseResult(value="c", type=mitmproxy.types.Unknown, valid=False),
+ ],
+ [],
+ ],
+ [
+ "cmd11 'a \"b\" c'",
+ [
+ command.ParseResult(value="cmd11", type=mitmproxy.types.Cmd, valid=False),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ command.ParseResult(value="'a \"b\" c'", type=mitmproxy.types.Unknown, valid=False),
+ ],
+ [],
+ ],
+ [
+ 'cmd12 "a \'b\' c"',
+ [
+ command.ParseResult(value="cmd12", type=mitmproxy.types.Cmd, valid=False),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ command.ParseResult(value='"a \'b\' c"', type=mitmproxy.types.Unknown, valid=False),
+ ],
+ [],
+ ],
+ [
+ r'cmd13 "a \"b\" c"',
+ [
+ command.ParseResult(value="cmd13", type=mitmproxy.types.Cmd, valid=False),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ command.ParseResult(value=r'"a \"b\" c"', type=mitmproxy.types.Unknown, valid=False),
+ ],
+ [],
+ ],
+ [
+ r"cmd14 'a \'b\' c'",
+ [
+ command.ParseResult(value="cmd14", type=mitmproxy.types.Cmd, valid=False),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ command.ParseResult(value=r"'a \'b\' c'", type=mitmproxy.types.Unknown, valid=False),
+ ],
+ [],
+ ],
+ [
+ " spaces_at_the_begining_are_not_stripped",
+ [
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ command.ParseResult(value="spaces_at_the_begining_are_not_stripped", type=mitmproxy.types.Cmd,
+ valid=False),
+ ],
+ [],
+ ],
+ [
+ " spaces_at_the_begining_are_not_stripped neither_at_the_end ",
+ [
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ command.ParseResult(value="spaces_at_the_begining_are_not_stripped", type=mitmproxy.types.Cmd,
+ valid=False),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ command.ParseResult(value="neither_at_the_end", type=mitmproxy.types.Unknown, valid=False),
+ command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),
+ ],
+ [],
+ ],
+
]
+
with taddons.context() as tctx:
tctx.master.addons.add(TAddon())
for s, expected, expectedremain in tests:
current, remain = tctx.master.commands.parse_partial(s)
- assert current == expected
- assert expectedremain == remain
+ assert (s, current, expectedremain) == (s, expected, remain)
def test_simple():
@@ -270,9 +421,11 @@ def test_simple():
c = command.CommandManager(tctx.master)
a = TAddon()
c.add("one.two", a.cmd1)
- assert c.commands["one.two"].help == "cmd1 help"
- assert(c.execute("one.two foo") == "ret foo")
- assert(c.call("one.two", "foo") == "ret foo")
+ assert (c.commands["one.two"].help == "cmd1 help")
+ assert (c.execute("one.two foo") == "ret foo")
+ assert (c.execute("one.two \"foo\"") == "ret foo")
+ assert (c.execute("one.two 'foo bar'") == "ret foo bar")
+ assert (c.call("one.two", "foo") == "ret foo")
with pytest.raises(exceptions.CommandError, match="Unknown"):
c.execute("nonexistent")
with pytest.raises(exceptions.CommandError, match="Invalid"):
@@ -281,8 +434,14 @@ def test_simple():
c.execute("one.two too many args")
with pytest.raises(exceptions.CommandError, match="Unknown"):
c.call("nonexistent")
- with pytest.raises(exceptions.CommandError, match="No escaped"):
+ with pytest.raises(exceptions.CommandError, match="Unknown"):
c.execute("\\")
+ with pytest.raises(exceptions.CommandError, match="Unknown"):
+ c.execute(r"\'")
+ with pytest.raises(exceptions.CommandError, match="Unknown"):
+ c.execute(r"\"")
+ with pytest.raises(exceptions.CommandError, match="Unknown"):
+ c.execute(r"\"")
c.add("empty", a.empty)
c.execute("empty")
@@ -294,13 +453,13 @@ def test_simple():
def test_typename():
assert command.typename(str) == "str"
- assert command.typename(typing.Sequence[flow.Flow]) == "[flow]"
+ assert command.typename(typing.Sequence[flow.Flow]) == "flow[]"
- assert command.typename(mitmproxy.types.Data) == "[data]"
- assert command.typename(mitmproxy.types.CutSpec) == "[cut]"
+ assert command.typename(mitmproxy.types.Data) == "data[][]"
+ assert command.typename(mitmproxy.types.CutSpec) == "cut[]"
assert command.typename(flow.Flow) == "flow"
- assert command.typename(typing.Sequence[str]) == "[str]"
+ assert command.typename(typing.Sequence[str]) == "str[]"
assert command.typename(mitmproxy.types.Choice("foo")) == "choice"
assert command.typename(mitmproxy.types.Path) == "path"
diff --git a/test/mitmproxy/test_command_lexer.py b/test/mitmproxy/test_command_lexer.py
new file mode 100644
index 00000000..3f009f88
--- /dev/null
+++ b/test/mitmproxy/test_command_lexer.py
@@ -0,0 +1,38 @@
+import pyparsing
+import pytest
+
+from mitmproxy import command_lexer
+
+
+@pytest.mark.parametrize(
+ "test_input,valid", [
+ ("'foo'", True),
+ ('"foo"', True),
+ ("'foo' bar'", False),
+ ("'foo\\' bar'", True),
+ ("'foo' 'bar'", False),
+ ("'foo'x", False),
+ ('''"foo ''', True),
+ ('''"foo 'bar' ''', True),
+ ]
+)
+def test_partial_quoted_string(test_input, valid):
+ if valid:
+ assert command_lexer.PartialQuotedString.parseString(test_input, parseAll=True)[0] == test_input
+ else:
+ with pytest.raises(pyparsing.ParseException):
+ command_lexer.PartialQuotedString.parseString(test_input, parseAll=True)
+
+
+@pytest.mark.parametrize(
+ "test_input,expected", [
+ ("'foo'", ["'foo'"]),
+ ('"foo"', ['"foo"']),
+ ("'foo' 'bar'", ["'foo'", ' ', "'bar'"]),
+ ("'foo'x", ["'foo'", 'x']),
+ ('''"foo''', ['"foo']),
+ ('''"foo 'bar' ''', ['''"foo 'bar' ''']),
+ ]
+)
+def test_expr(test_input, expected):
+ assert list(command_lexer.expr.parseString(test_input, parseAll=True)) == expected
diff --git a/test/mitmproxy/test_types.py b/test/mitmproxy/test_types.py
index 571985fb..2cd17d87 100644
--- a/test/mitmproxy/test_types.py
+++ b/test/mitmproxy/test_types.py
@@ -2,7 +2,6 @@ import pytest
import os
import typing
import contextlib
-from unittest import mock
import mitmproxy.exceptions
import mitmproxy.types
@@ -64,13 +63,14 @@ def test_int():
b.parse(tctx.master.commands, int, "foo")
-def test_path(tdata):
+def test_path(tdata, monkeypatch):
with taddons.context() as tctx:
b = mitmproxy.types._PathType()
assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/foo") == "/foo"
assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/bar") == "/bar"
- with mock.patch.dict("os.environ", {"HOME": "/home/test"}):
- assert b.parse(tctx.master.commands, mitmproxy.types.Path, "~/mitm") == "/home/test/mitm"
+ monkeypatch.setenv("HOME", "/home/test")
+ monkeypatch.setenv("USERPROFILE", "/home/test")
+ assert b.parse(tctx.master.commands, mitmproxy.types.Path, "~/mitm") == "/home/test/mitm"
assert b.is_valid(tctx.master.commands, mitmproxy.types.Path, "foo") is True
assert b.is_valid(tctx.master.commands, mitmproxy.types.Path, "~/mitm") is True
assert b.is_valid(tctx.master.commands, mitmproxy.types.Path, 3) is False
@@ -127,10 +127,9 @@ def test_cutspec():
def test_arg():
with taddons.context() as tctx:
b = mitmproxy.types._ArgType()
- assert b.completion(tctx.master.commands, mitmproxy.types.Arg, "") == []
- assert b.parse(tctx.master.commands, mitmproxy.types.Arg, "foo") == "foo"
- assert b.is_valid(tctx.master.commands, mitmproxy.types.Arg, "foo") is True
- assert b.is_valid(tctx.master.commands, mitmproxy.types.Arg, 1) is False
+ assert b.completion(tctx.master.commands, mitmproxy.types.CmdArgs, "") == []
+ assert b.parse(tctx.master.commands, mitmproxy.types.CmdArgs, "foo") == "foo"
+ assert b.is_valid(tctx.master.commands, mitmproxy.types.CmdArgs, 1) is False
def test_strseq():
diff --git a/test/mitmproxy/tools/console/test_commander.py b/test/mitmproxy/tools/console/test_commander.py
index b5e226fe..a77be043 100644
--- a/test/mitmproxy/tools/console/test_commander.py
+++ b/test/mitmproxy/tools/console/test_commander.py
@@ -1,6 +1,7 @@
+import pytest
-from mitmproxy.tools.console.commander import commander
from mitmproxy.test import taddons
+from mitmproxy.tools.console.commander import commander
class TestListCompleter:
@@ -28,6 +29,112 @@ class TestListCompleter:
assert c.cycle() == expected
+class TestCommandEdit:
+ def test_open_command_bar(self):
+ with taddons.context() as tctx:
+ history = commander.CommandHistory(tctx.master, size=3)
+ edit = commander.CommandEdit(tctx.master, '', history)
+
+ try:
+ edit.update()
+ except IndexError:
+ pytest.faied("Unexpected IndexError")
+
+ def test_insert(self):
+ with taddons.context() as tctx:
+ history = commander.CommandHistory(tctx.master, size=3)
+ edit = commander.CommandEdit(tctx.master, '', history)
+ edit.keypress(1, 'a')
+ assert edit.get_edit_text() == 'a'
+
+ # Don't let users type a space before starting a command
+ # as a usability feature
+ history = commander.CommandHistory(tctx.master, size=3)
+ edit = commander.CommandEdit(tctx.master, '', history)
+ edit.keypress(1, ' ')
+ assert edit.get_edit_text() == ''
+
+ def test_backspace(self):
+ with taddons.context() as tctx:
+ history = commander.CommandHistory(tctx.master, size=3)
+ edit = commander.CommandEdit(tctx.master, '', history)
+ edit.keypress(1, 'a')
+ edit.keypress(1, 'b')
+ assert edit.get_edit_text() == 'ab'
+ edit.keypress(1, 'backspace')
+ assert edit.get_edit_text() == 'a'
+
+ def test_left(self):
+ with taddons.context() as tctx:
+ history = commander.CommandHistory(tctx.master, size=3)
+ edit = commander.CommandEdit(tctx.master, '', history)
+ edit.keypress(1, 'a')
+ assert edit.cbuf.cursor == 1
+ edit.keypress(1, 'left')
+ assert edit.cbuf.cursor == 0
+
+ # Do it again to make sure it won't go negative
+ edit.keypress(1, 'left')
+ assert edit.cbuf.cursor == 0
+
+ def test_right(self):
+ with taddons.context() as tctx:
+ history = commander.CommandHistory(tctx.master, size=3)
+ edit = commander.CommandEdit(tctx.master, '', history)
+ edit.keypress(1, 'a')
+ assert edit.cbuf.cursor == 1
+
+ # Make sure cursor won't go past the text
+ edit.keypress(1, 'right')
+ assert edit.cbuf.cursor == 1
+
+ # Make sure cursor goes left and then back right
+ edit.keypress(1, 'left')
+ assert edit.cbuf.cursor == 0
+ edit.keypress(1, 'right')
+ assert edit.cbuf.cursor == 1
+
+ def test_up_and_down(self):
+ with taddons.context() as tctx:
+ history = commander.CommandHistory(tctx.master, size=3)
+ edit = commander.CommandEdit(tctx.master, '', history)
+
+ buf = commander.CommandBuffer(tctx.master, 'cmd1')
+ history.add_command(buf)
+ buf = commander.CommandBuffer(tctx.master, 'cmd2')
+ history.add_command(buf)
+
+ edit.keypress(1, 'up')
+ assert edit.get_edit_text() == 'cmd2'
+ edit.keypress(1, 'up')
+ assert edit.get_edit_text() == 'cmd1'
+ edit.keypress(1, 'up')
+ assert edit.get_edit_text() == 'cmd1'
+
+ history = commander.CommandHistory(tctx.master, size=5)
+ edit = commander.CommandEdit(tctx.master, '', history)
+ edit.keypress(1, 'a')
+ edit.keypress(1, 'b')
+ edit.keypress(1, 'c')
+ assert edit.get_edit_text() == 'abc'
+ edit.keypress(1, 'up')
+ assert edit.get_edit_text() == ''
+ edit.keypress(1, 'down')
+ assert edit.get_edit_text() == 'abc'
+ edit.keypress(1, 'down')
+ assert edit.get_edit_text() == 'abc'
+
+ history = commander.CommandHistory(tctx.master, size=5)
+ edit = commander.CommandEdit(tctx.master, '', history)
+ buf = commander.CommandBuffer(tctx.master, 'cmd3')
+ history.add_command(buf)
+ edit.keypress(1, 'z')
+ edit.keypress(1, 'up')
+ assert edit.get_edit_text() == 'cmd3'
+ edit.keypress(1, 'down')
+ assert edit.get_edit_text() == 'z'
+
+
class TestCommandHistory:
def fill_history(self, commands):
with taddons.context() as tctx:
@@ -148,13 +255,39 @@ class TestCommandBuffer:
cb.cursor = len(cb.text)
cb.cycle_completion()
+ ch = commander.CommandHistory(tctx.master, 30)
+ ce = commander.CommandEdit(tctx.master, "se", ch)
+ ce.keypress(1, 'tab')
+ ce.update()
+ ret = ce.cbuf.render()
+ assert ret == [
+ ('commander_command', 'set'),
+ ('text', ' '),
+ ('commander_hint', 'option '),
+ ('commander_hint', 'value '),
+ ]
+
def test_render(self):
with taddons.context() as tctx:
cb = commander.CommandBuffer(tctx.master)
cb.text = "foo"
assert cb.render()
- def test_flatten(self):
- with taddons.context() as tctx:
- cb = commander.CommandBuffer(tctx.master)
- assert cb.flatten("foo bar") == "foo bar"
+ cb.text = "set view_filter '~bq test'"
+ ret = cb.render()
+ assert ret == [
+ ('commander_command', 'set'),
+ ('text', ' '),
+ ('text', 'view_filter'),
+ ('text', ' '),
+ ('text', "'~bq test'"),
+ ]
+
+ cb.text = "set"
+ ret = cb.render()
+ assert ret == [
+ ('commander_command', 'set'),
+ ('text', ' '),
+ ('commander_hint', 'option '),
+ ('commander_hint', 'value '),
+ ]
diff --git a/test/mitmproxy/tools/console/test_defaultkeys.py b/test/mitmproxy/tools/console/test_defaultkeys.py
index 52075c84..58a0a585 100644
--- a/test/mitmproxy/tools/console/test_defaultkeys.py
+++ b/test/mitmproxy/tools/console/test_defaultkeys.py
@@ -1,14 +1,18 @@
+import pytest
+
+import mitmproxy.types
+from mitmproxy import command
+from mitmproxy import ctx
from mitmproxy.test.tflow import tflow
from mitmproxy.tools.console import defaultkeys
from mitmproxy.tools.console import keymap
from mitmproxy.tools.console import master
-from mitmproxy import command
-
-import pytest
@pytest.mark.asyncio
async def test_commands_exist():
+ command_manager = command.CommandManager(ctx)
+
km = keymap.Keymap(None)
defaultkeys.map(km)
assert km.bindings
@@ -16,7 +20,14 @@ async def test_commands_exist():
await m.load_flow(tflow())
for binding in km.bindings:
- cmd, *args = command.lexer(binding.command)
+ parsed, _ = command_manager.parse_partial(binding.command.strip())
+
+ cmd = parsed[0].value
+ args = [
+ a.value for a in parsed[1:]
+ if a.type != mitmproxy.types.Space
+ ]
+
assert cmd in m.commands.commands
cmd_obj = m.commands.commands[cmd]