aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy/command.py
diff options
context:
space:
mode:
authorMaximilian Hils <git@maximilianhils.com>2019-11-18 02:55:51 +0100
committerMaximilian Hils <git@maximilianhils.com>2019-11-18 03:05:41 +0100
commitcb723c53fab93dae67f68b4414a35c8bec7fc144 (patch)
treea6cf5923a8e6c04d68ab634e23e42727351c9501 /mitmproxy/command.py
parent8a6370f1c2ae55fb03f606cb055de455ffe25eec (diff)
downloadmitmproxy-cb723c53fab93dae67f68b4414a35c8bec7fc144.tar.gz
mitmproxy-cb723c53fab93dae67f68b4414a35c8bec7fc144.tar.bz2
mitmproxy-cb723c53fab93dae67f68b4414a35c8bec7fc144.zip
revamp command processing
- Display the parameter name instead of the parameter type whenver users interact with commands. This makes it easy to enter commands just by their signature. We may want to expose type information in the command list, but some quick testing showed that this are rather intuitive anyways. - Add shift tab backward cycling for the command completion. - Use inspect.Signature instead of homebrew argument matching solution. This gets rid of quite a bit of cruft. - Remove some type checking hacks in mitmproxy.types
Diffstat (limited to 'mitmproxy/command.py')
-rw-r--r--mitmproxy/command.py333
1 files changed, 177 insertions, 156 deletions
diff --git a/mitmproxy/command.py b/mitmproxy/command.py
index f2a87e1e..a64c7404 100644
--- a/mitmproxy/command.py
+++ b/mitmproxy/command.py
@@ -1,19 +1,20 @@
"""
This module manages and invokes typed commands.
"""
+import functools
import inspect
+import sys
+import textwrap
import types
import typing
-import textwrap
-import functools
-import sys
+
import pyparsing
-from mitmproxy import exceptions
import mitmproxy.types
+from mitmproxy import exceptions
-def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None:
+def verify_arg_signature(f: typing.Callable, args: typing.Iterable[typing.Any], kwargs: dict) -> None:
sig = inspect.signature(f)
try:
sig.bind(*args, **kwargs)
@@ -33,122 +34,135 @@ def typename(t: type) -> str:
return to.display
+def _empty_as_none(x: typing.Any) -> typing.Any:
+ if x == inspect.Signature.empty:
+ return None
+ return x
+
+
+class CommandParameter(typing.NamedTuple):
+ display_name: str
+ type: typing.Type
+
+
class Command:
- returntype: typing.Optional[typing.Type]
+ name: str
+ manager: "CommandManager"
+ signature: inspect.Signature
+ help: typing.Optional[str]
- def __init__(self, manager, path, func) -> None:
- self.path = path
+ def __init__(self, manager: "CommandManager", name: str, func: typing.Callable) -> None:
+ self.name = name
self.manager = manager
self.func = func
- sig = inspect.signature(self.func)
- self.help = None
+ self.signature = inspect.signature(self.func)
+
if func.__doc__:
txt = func.__doc__.strip()
self.help = "\n".join(textwrap.wrap(txt))
-
- self.has_positional = False
- for i in sig.parameters.values():
- # This is the kind for *args parameters
- if i.kind == i.VAR_POSITIONAL:
- self.has_positional = True
- self.paramtypes = [v.annotation for v in sig.parameters.values()]
- if sig.return_annotation == inspect._empty: # type: ignore
- self.returntype = None
else:
- self.returntype = sig.return_annotation
+ self.help = None
+
# This fails with a CommandException if types are invalid
- self.signature_help()
+ for name, parameter in self.signature.parameters.items():
+ t = parameter.annotation
+ if not mitmproxy.types.CommandTypes.get(parameter.annotation, None):
+ raise exceptions.CommandError(f"Argument {name} has an unknown type ({_empty_as_none(t)}) in {func}.")
+ if self.return_type and not mitmproxy.types.CommandTypes.get(self.return_type, None):
+ raise exceptions.CommandError(f"Return type has an unknown type ({self.return_type}) in {func}.")
+
+ @property
+ def return_type(self) -> typing.Optional[typing.Type]:
+ return _empty_as_none(self.signature.return_annotation)
+
+ @property
+ def parameters(self) -> typing.List[CommandParameter]:
+ """Returns a list of (display name, type) tuples."""
+ ret = []
+ for name, param in self.signature.parameters.items():
+ if param.kind is param.VAR_POSITIONAL:
+ name = f"*{name}"
+ ret.append(CommandParameter(name, param.annotation))
+ return ret
- def paramnames(self) -> typing.Sequence[str]:
- v = [typename(i) for i in self.paramtypes]
- if self.has_positional:
- v[-1] = "*" + v[-1]
- return v
+ def signature_help(self) -> str:
+ params = " ".join(name for name, t in self.parameters)
+ if self.return_type:
+ ret = f" -> {typename(self.return_type)}"
+ else:
+ ret = ""
+ return f"{self.name} {params}{ret}"
- def retname(self) -> str:
- return typename(self.returntype) if self.returntype else ""
+ def prepare_args(self, args: typing.Sequence[str]) -> inspect.BoundArguments:
+ try:
+ bound_arguments = self.signature.bind(*args)
+ except TypeError as v:
+ raise exceptions.CommandError(f"Command argument mismatch: {v.args[0]}")
- def signature_help(self) -> str:
- params = " ".join(self.paramnames())
- ret = self.retname()
- if ret:
- ret = " -> " + ret
- return "%s %s%s" % (self.path, params, ret)
-
- def prepare_args(self, args: typing.Sequence[str]) -> typing.List[typing.Any]:
-
- # Arguments that are just blank spaces aren't really arguments
- # We need to get rid of those. If the user intended to pass a sequence
- # of spaces, it would come between quotes
- args = [a for a in args if a.strip() != '']
- verify_arg_signature(self.func, list(args), {})
-
- remainder: typing.Sequence[str] = []
- if self.has_positional:
- remainder = args[len(self.paramtypes) - 1:]
- args = args[:len(self.paramtypes) - 1]
-
- pargs = []
- for arg, paramtype in zip(args, self.paramtypes):
- pargs.append(parsearg(self.manager, arg, paramtype))
- pargs.extend(remainder)
- return pargs
+ for name, value in bound_arguments.arguments.items():
+ convert_to = self.signature.parameters[name].annotation
+ bound_arguments.arguments[name] = parsearg(self.manager, value, convert_to)
+
+ bound_arguments.apply_defaults()
+
+ return bound_arguments
def call(self, args: typing.Sequence[str]) -> typing.Any:
"""
- Call the command with a list of arguments. At this point, all
- arguments are strings.
+ Call the command with a list of arguments. At this point, all
+ arguments are strings.
"""
- ret = self.func(*self.prepare_args(args))
- if ret is None and self.returntype is None:
+ bound_args = self.prepare_args(args)
+ ret = self.func(*bound_args.args, **bound_args.kwargs)
+ if ret is None and self.return_type is None:
return
- typ = mitmproxy.types.CommandTypes.get(self.returntype)
+ typ = mitmproxy.types.CommandTypes.get(self.return_type)
+ assert typ
if not typ.is_valid(self.manager, typ, ret):
raise exceptions.CommandError(
- "%s returned unexpected data - expected %s" % (
- self.path, typ.display
- )
+ f"{self.name} returned unexpected data - expected {typ.display}"
)
return ret
-ParseResult = typing.NamedTuple(
- "ParseResult",
- [
- ("value", str),
- ("type", typing.Type),
- ("valid", bool),
- ],
-)
+class ParseResult(typing.NamedTuple):
+ value: str
+ type: typing.Type
+ valid: bool
-class CommandManager(mitmproxy.types._CommandBase):
+class CommandManager:
+ commands: typing.Dict[str, Command]
+
def __init__(self, master):
self.master = master
- self.commands: typing.Dict[str, Command] = {}
-
- self.regex = pyparsing.QuotedString("\"", escChar='\\', unquoteResults=False) |\
- pyparsing.QuotedString("'", escChar='\\', unquoteResults=False) |\
- pyparsing.Combine(pyparsing.Literal('"') + pyparsing.Word(pyparsing.printables + " ") + pyparsing.StringEnd()) |\
- pyparsing.Word(pyparsing.printables) |\
- pyparsing.Word(" \r\n\t")
- self.regex = self.regex.leaveWhitespace()
+ self.commands = {}
+
+ self.expr_parser = pyparsing.ZeroOrMore(
+ pyparsing.QuotedString('"', escChar='\\', unquoteResults=False)
+ | pyparsing.QuotedString("'", escChar='\\', unquoteResults=False)
+ | pyparsing.Combine(pyparsing.Literal('"')
+ + pyparsing.Word(pyparsing.printables + " ")
+ + pyparsing.StringEnd())
+ | pyparsing.Word(pyparsing.printables)
+ | pyparsing.Word(" \r\n\t")
+ ).leaveWhitespace()
def collect_commands(self, addon):
for i in dir(addon):
if not i.startswith("__"):
o = getattr(addon, i)
try:
- is_command = hasattr(o, "command_path")
+ is_command = hasattr(o, "command_name")
except Exception:
pass # hasattr may raise if o implements __getattr__.
else:
if is_command:
try:
- self.add(o.command_path, o)
+ self.add(o.command_name, o)
except exceptions.CommandError as e:
self.master.log.warn(
- "Could not load command %s: %s" % (o.command_path, e)
+ "Could not load command %s: %s" % (o.command_name, e)
)
def add(self, path: str, func: typing.Callable):
@@ -156,105 +170,100 @@ class CommandManager(mitmproxy.types._CommandBase):
@functools.lru_cache(maxsize=128)
def parse_partial(
- self,
- cmdstr: str
- ) -> typing.Tuple[typing.Sequence[ParseResult], typing.Sequence[str]]:
+ self,
+ cmdstr: str
+ ) -> typing.Tuple[typing.Sequence[ParseResult], typing.Sequence[CommandParameter]]:
"""
- Parse a possibly partial command. Return a sequence of ParseResults and a sequence of remainder type help items.
+ Parse a possibly partial command. Return a sequence of ParseResults and a sequence of remainder type help items.
"""
- parts: typing.List[str] = []
- for t, start, end in self.regex.scanString(cmdstr):
- parts.append(t[0])
-
- parse: typing.List[ParseResult] = []
- params: typing.List[type] = []
- typ: typing.Type
- cmd_found: bool = False
- for i in range(len(parts)):
- if not parts[i].isspace():
- if not cmd_found:
- cmd_found = True
- typ = mitmproxy.types.Cmd
- if parts[i] in self.commands:
- params.extend(self.commands[parts[i]].paramtypes)
- elif params:
- typ = params.pop(0)
- if typ == mitmproxy.types.Cmd and params and params[0] == mitmproxy.types.Arg:
- if parts[i] in self.commands:
- params[:] = self.commands[parts[i]].paramtypes
+
+ parts: typing.List[str] = self.expr_parser.parseString(cmdstr)
+
+ parsed: typing.List[ParseResult] = []
+ next_params: typing.List[CommandParameter] = [
+ CommandParameter("", mitmproxy.types.Cmd),
+ CommandParameter("", mitmproxy.types.CmdArgs),
+ ]
+ for part in parts:
+ if part.isspace():
+ parsed.append(
+ ParseResult(
+ value=part,
+ type=mitmproxy.types.Space,
+ valid=True,
+ )
+ )
+ continue
+
+ if next_params:
+ expected_type: typing.Type = next_params.pop(0).type
else:
- # If the token is just a bunch of spaces, then we don't
- # want to count it against the arguments of the command
- typ = mitmproxy.types.Unknown
+ expected_type = mitmproxy.types.Unknown
- to = mitmproxy.types.CommandTypes.get(typ, None)
+ arg_is_known_command = (
+ expected_type == mitmproxy.types.Cmd and part in self.commands
+ )
+ arg_is_unknown_command = (
+ expected_type == mitmproxy.types.Cmd and part not in self.commands
+ )
+ command_args_following = (
+ next_params and next_params[0].type == mitmproxy.types.CmdArgs
+ )
+ if arg_is_known_command and command_args_following:
+ next_params = self.commands[part].parameters + next_params[1:]
+ if arg_is_unknown_command and command_args_following:
+ next_params.pop(0)
+
+ to = mitmproxy.types.CommandTypes.get(expected_type, None)
valid = False
if to:
try:
- to.parse(self, typ, parts[i])
+ to.parse(self, expected_type, part)
except exceptions.TypeError:
valid = False
else:
valid = True
- parse.append(
+ parsed.append(
ParseResult(
- value=parts[i],
- type=typ,
+ value=part,
+ type=expected_type,
valid=valid,
)
)
- remhelp: typing.List[str] = []
- for x in params:
- remt = mitmproxy.types.CommandTypes.get(x, None)
- remhelp.append(remt.display)
-
- return parse, remhelp
+ return parsed, next_params
- def call(self, path: str, *args: typing.Sequence[typing.Any]) -> typing.Any:
+ def call(self, command_name: str, *args: typing.Sequence[typing.Any]) -> typing.Any:
"""
- Call a command with native arguments. May raise CommandError.
+ Call a command with native arguments. May raise CommandError.
"""
- if path not in self.commands:
- raise exceptions.CommandError("Unknown command: %s" % path)
- return self.commands[path].func(*args)
+ if command_name not in self.commands:
+ raise exceptions.CommandError("Unknown command: %s" % command_name)
+ return self.commands[command_name].func(*args)
- def _call_strings(self, path: str, args: typing.Sequence[str]) -> typing.Any:
+ def _call_strings(self, command_name: str, args: typing.Sequence[str]) -> typing.Any:
"""
- Call a command using a list of string arguments. May raise CommandError.
+ Call a command using a list of string arguments. May raise CommandError.
"""
- if path not in self.commands:
- raise exceptions.CommandError("Unknown command: %s" % path)
+ if command_name not in self.commands:
+ raise exceptions.CommandError("Unknown command: %s" % command_name)
- return self.commands[path].call(args)
+ return self.commands[command_name].call(args)
- def execute(self, cmdstr: str):
+ def execute(self, cmdstr: str) -> typing.Any:
"""
- Execute a command string. May raise CommandError.
+ Execute a command string. May raise CommandError.
"""
parts, _ = self.parse_partial(cmdstr)
- params = []
- for p in parts:
- v = p.value.strip()
- if v != '':
- if ((v.startswith("'") and v.endswith("'")) or
- (v.startswith("\"") and v.endswith("\""))) and \
- len(v.split(' ')) == 1:
- # If this parameter is between quotes but has no spaces in
- # it, then it is safe to remove the quotes to pass it down
- # This allows any commands that take a simple spaceless
- # string as a parameter to work. For example
- # view.flows.create get "http://www.example.com" won't work
- # if the quotes are there as it won't see the param as a URL
- v = v[1:-1]
-
- params.append(v)
-
- if len(parts) == 0:
- raise exceptions.CommandError("Invalid command: %s" % cmdstr)
-
- return self._call_strings(params[0], params[1:])
+ if not parts:
+ raise exceptions.CommandError(f"Invalid command: {cmdstr!r}")
+ command_name, *args = [
+ unquote(part.value)
+ for part in parts
+ if part.type != mitmproxy.types.Space
+ ]
+ return self._call_strings(command_name, args)
def dump(self, out=sys.stdout) -> None:
cmds = list(self.commands.values())
@@ -266,27 +275,37 @@ class CommandManager(mitmproxy.types._CommandBase):
print(file=out)
+def unquote(x: str) -> str:
+ if x.startswith("'") and x.endswith("'"):
+ return x[1:-1]
+ if x.startswith('"') and x.endswith('"'):
+ return x[1:-1]
+ return x
+
+
def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
"""
Convert a string to a argument to the appropriate type.
"""
t = mitmproxy.types.CommandTypes.get(argtype, None)
if not t:
- raise exceptions.CommandError("Unsupported argument type: %s" % argtype)
+ raise exceptions.CommandError(f"Unsupported argument type: {argtype}")
try:
- return t.parse(manager, argtype, spec) # type: ignore
+ return t.parse(manager, argtype, spec)
except exceptions.TypeError as e:
raise exceptions.CommandError from e
-def command(path):
+def command(name: typing.Optional[str]):
def decorator(function):
@functools.wraps(function)
def wrapper(*args, **kwargs):
verify_arg_signature(function, args, kwargs)
return function(*args, **kwargs)
- wrapper.__dict__["command_path"] = path
+
+ wrapper.__dict__["command_name"] = name or function.__name__
return wrapper
+
return decorator
@@ -296,8 +315,10 @@ def argument(name, type):
specific types such as mitmproxy.types.Choice, which we cannot annotate
directly as mypy does not like that.
"""
+
def decorator(f: types.FunctionType) -> types.FunctionType:
assert name in f.__annotations__
f.__annotations__[name] = type
return f
+
return decorator