aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy
diff options
context:
space:
mode:
authorMaximilian Hils <git@maximilianhils.com>2019-11-21 14:13:08 +0100
committerGitHub <noreply@github.com>2019-11-21 14:13:08 +0100
commit3550bdfe006ba321c706b16f58a6b0d5b4e744b7 (patch)
tree46a1eb299a89866668a85fb2045f616377e538dc /mitmproxy
parent3a7ca3e1e7cc4e0d1d8a6702cbf443f7cddaf00f (diff)
parentf7a3e903ac69950ef862757886ed83506b7d1bd9 (diff)
downloadmitmproxy-3550bdfe006ba321c706b16f58a6b0d5b4e744b7.tar.gz
mitmproxy-3550bdfe006ba321c706b16f58a6b0d5b4e744b7.tar.bz2
mitmproxy-3550bdfe006ba321c706b16f58a6b0d5b4e744b7.zip
Merge pull request #3693 from typoon/fix-command-bar-issue-3259
Improve Command Bar UX
Diffstat (limited to 'mitmproxy')
-rw-r--r--mitmproxy/addons/core.py49
-rw-r--r--mitmproxy/addons/export.py20
-rw-r--r--mitmproxy/addons/view.py58
-rw-r--r--mitmproxy/command.py320
-rw-r--r--mitmproxy/command_lexer.py49
-rw-r--r--mitmproxy/tools/console/commander/commander.py157
-rw-r--r--mitmproxy/tools/console/commands.py23
-rw-r--r--mitmproxy/tools/console/consoleaddons.py131
-rw-r--r--mitmproxy/tools/console/defaultkeys.py10
-rw-r--r--mitmproxy/tools/console/statusbar.py12
-rw-r--r--mitmproxy/types.py125
11 files changed, 505 insertions, 449 deletions
diff --git a/mitmproxy/addons/core.py b/mitmproxy/addons/core.py
index 5c9bbcd0..8a3acedb 100644
--- a/mitmproxy/addons/core.py
+++ b/mitmproxy/addons/core.py
@@ -83,15 +83,14 @@ class Core:
)
@command.command("set")
- def set(self, *spec: str) -> None:
+ def set(self, option: str, value: str = "") -> None:
"""
- Set an option of the form "key[=value]". When the value is omitted,
- booleans are set to true, strings and integers are set to None (if
- permitted), and sequences are emptied. Boolean values can be true,
- false or toggle. If multiple specs are passed, they are joined
- into one separated by spaces.
+ Set an option. When the value is omitted, booleans are set to true,
+ strings and integers are set to None (if permitted), and sequences
+ are emptied. Boolean values can be true, false or toggle.
+ Multiple values are concatenated with a single space.
"""
- strspec = " ".join(spec)
+ strspec = f"{option}={value}"
try:
ctx.options.set(strspec)
except exceptions.OptionsError as e:
@@ -109,14 +108,14 @@ class Core:
# FIXME: this will become view.mark later
@command.command("flow.mark")
- def mark(self, flows: typing.Sequence[flow.Flow], val: bool) -> None:
+ def mark(self, flows: typing.Sequence[flow.Flow], boolean: bool) -> None:
"""
Mark flows.
"""
updated = []
for i in flows:
- if i.marked != val:
- i.marked = val
+ if i.marked != boolean:
+ i.marked = boolean
updated.append(i)
ctx.master.addons.trigger("update", updated)
@@ -169,18 +168,18 @@ class Core:
]
@command.command("flow.set")
- @command.argument("spec", type=mitmproxy.types.Choice("flow.set.options"))
+ @command.argument("attr", type=mitmproxy.types.Choice("flow.set.options"))
def flow_set(
self,
flows: typing.Sequence[flow.Flow],
- spec: str,
- sval: str
+ attr: str,
+ value: str
) -> None:
"""
Quickly set a number of common values on flows.
"""
- val: typing.Union[int, str] = sval
- if spec == "status_code":
+ val: typing.Union[int, str] = value
+ if attr == "status_code":
try:
val = int(val) # type: ignore
except ValueError as v:
@@ -193,13 +192,13 @@ class Core:
req = getattr(f, "request", None)
rupdate = True
if req:
- if spec == "method":
+ if attr == "method":
req.method = val
- elif spec == "host":
+ elif attr == "host":
req.host = val
- elif spec == "path":
+ elif attr == "path":
req.path = val
- elif spec == "url":
+ elif attr == "url":
try:
req.url = val
except ValueError as e:
@@ -212,11 +211,11 @@ class Core:
resp = getattr(f, "response", None)
supdate = True
if resp:
- if spec == "status_code":
+ if attr == "status_code":
resp.status_code = val
if val in status_codes.RESPONSES:
resp.reason = status_codes.RESPONSES[val] # type: ignore
- elif spec == "reason":
+ elif attr == "reason":
resp.reason = val
else:
supdate = False
@@ -225,7 +224,7 @@ class Core:
updated.append(f)
ctx.master.addons.trigger("update", updated)
- ctx.log.alert("Set %s on %s flows." % (spec, len(updated)))
+ ctx.log.alert("Set %s on %s flows." % (attr, len(updated)))
@command.command("flow.decode")
def decode(self, flows: typing.Sequence[flow.Flow], part: str) -> None:
@@ -262,12 +261,12 @@ class Core:
ctx.log.alert("Toggled encoding on %s flows." % len(updated))
@command.command("flow.encode")
- @command.argument("enc", type=mitmproxy.types.Choice("flow.encode.options"))
+ @command.argument("encoding", type=mitmproxy.types.Choice("flow.encode.options"))
def encode(
self,
flows: typing.Sequence[flow.Flow],
part: str,
- enc: str,
+ encoding: str,
) -> None:
"""
Encode flows with a specified encoding.
@@ -279,7 +278,7 @@ class Core:
current_enc = p.headers.get("content-encoding", "identity")
if current_enc == "identity":
f.backup()
- p.encode(enc)
+ p.encode(encoding)
updated.append(f)
ctx.master.addons.trigger("update", updated)
ctx.log.alert("Encoded %s flows." % len(updated))
diff --git a/mitmproxy/addons/export.py b/mitmproxy/addons/export.py
index 68df9374..d874c95a 100644
--- a/mitmproxy/addons/export.py
+++ b/mitmproxy/addons/export.py
@@ -121,14 +121,14 @@ class Export():
return list(sorted(formats.keys()))
@command.command("export.file")
- def file(self, fmt: str, f: flow.Flow, path: mitmproxy.types.Path) -> None:
+ def file(self, format: str, flow: flow.Flow, path: mitmproxy.types.Path) -> None:
"""
Export a flow to path.
"""
- if fmt not in formats:
- raise exceptions.CommandError("No such export format: %s" % fmt)
- func: typing.Any = formats[fmt]
- v = func(f)
+ if format not in formats:
+ raise exceptions.CommandError("No such export format: %s" % format)
+ func: typing.Any = formats[format]
+ v = func(flow)
try:
with open(path, "wb") as fp:
if isinstance(v, bytes):
@@ -139,14 +139,14 @@ class Export():
ctx.log.error(str(e))
@command.command("export.clip")
- def clip(self, fmt: str, f: flow.Flow) -> None:
+ def clip(self, format: str, flow: flow.Flow) -> None:
"""
Export a flow to the system clipboard.
"""
- if fmt not in formats:
- raise exceptions.CommandError("No such export format: %s" % fmt)
- func: typing.Any = formats[fmt]
- v = strutils.always_str(func(f))
+ if format not in formats:
+ raise exceptions.CommandError("No such export format: %s" % format)
+ func: typing.Any = formats[format]
+ v = strutils.always_str(func(flow))
try:
pyperclip.copy(v)
except pyperclip.PyperclipException as e:
diff --git a/mitmproxy/addons/view.py b/mitmproxy/addons/view.py
index da9d19f9..1d57d781 100644
--- a/mitmproxy/addons/view.py
+++ b/mitmproxy/addons/view.py
@@ -217,7 +217,7 @@ class View(collections.abc.Sequence):
# Focus
@command.command("view.focus.go")
- def go(self, dst: int) -> None:
+ def go(self, offset: int) -> None:
"""
Go to a specified offset. Positive offests are from the beginning of
the view, negative from the end of the view, so that 0 is the first
@@ -225,13 +225,13 @@ class View(collections.abc.Sequence):
"""
if len(self) == 0:
return
- if dst < 0:
- dst = len(self) + dst
- if dst < 0:
- dst = 0
- if dst > len(self) - 1:
- dst = len(self) - 1
- self.focus.flow = self[dst]
+ if offset < 0:
+ offset = len(self) + offset
+ if offset < 0:
+ offset = 0
+ if offset > len(self) - 1:
+ offset = len(self) - 1
+ self.focus.flow = self[offset]
@command.command("view.focus.next")
def focus_next(self) -> None:
@@ -266,20 +266,20 @@ class View(collections.abc.Sequence):
return list(sorted(self.orders.keys()))
@command.command("view.order.reverse")
- def set_reversed(self, value: bool) -> None:
- self.order_reversed = value
+ def set_reversed(self, boolean: bool) -> None:
+ self.order_reversed = boolean
self.sig_view_refresh.send(self)
@command.command("view.order.set")
- def set_order(self, order: str) -> None:
+ def set_order(self, order_key: str) -> None:
"""
Sets the current view order.
"""
- if order not in self.orders:
+ if order_key not in self.orders:
raise exceptions.CommandError(
- "Unknown flow order: %s" % order
+ "Unknown flow order: %s" % order_key
)
- order_key = self.orders[order]
+ order_key = self.orders[order_key]
self.order_key = order_key
newview = sortedcontainers.SortedListWithKey(key=order_key)
newview.update(self._view)
@@ -298,16 +298,16 @@ class View(collections.abc.Sequence):
# Filter
@command.command("view.filter.set")
- def set_filter_cmd(self, f: str) -> None:
+ def set_filter_cmd(self, filter_expr: str) -> None:
"""
Sets the current view filter.
"""
filt = None
- if f:
- filt = flowfilter.parse(f)
+ if filter_expr:
+ filt = flowfilter.parse(filter_expr)
if not filt:
raise exceptions.CommandError(
- "Invalid interception filter: %s" % f
+ "Invalid interception filter: %s" % filter_expr
)
self.set_filter(filt)
@@ -340,11 +340,11 @@ class View(collections.abc.Sequence):
# View Settings
@command.command("view.settings.getval")
- def getvalue(self, f: mitmproxy.flow.Flow, key: str, default: str) -> str:
+ def getvalue(self, flow: mitmproxy.flow.Flow, key: str, default: str) -> str:
"""
Get a value from the settings store for the specified flow.
"""
- return self.settings[f].get(key, default)
+ return self.settings[flow].get(key, default)
@command.command("view.settings.setval.toggle")
def setvalue_toggle(
@@ -412,26 +412,26 @@ class View(collections.abc.Sequence):
ctx.log.alert("Removed %s flows" % len(flows))
@command.command("view.flows.resolve")
- def resolve(self, spec: str) -> typing.Sequence[mitmproxy.flow.Flow]:
+ def resolve(self, flow_spec: str) -> typing.Sequence[mitmproxy.flow.Flow]:
"""
Resolve a flow list specification to an actual list of flows.
"""
- if spec == "@all":
+ if flow_spec == "@all":
return [i for i in self._store.values()]
- if spec == "@focus":
+ if flow_spec == "@focus":
return [self.focus.flow] if self.focus.flow else []
- elif spec == "@shown":
+ elif flow_spec == "@shown":
return [i for i in self]
- elif spec == "@hidden":
+ elif flow_spec == "@hidden":
return [i for i in self._store.values() if i not in self._view]
- elif spec == "@marked":
+ elif flow_spec == "@marked":
return [i for i in self._store.values() if i.marked]
- elif spec == "@unmarked":
+ elif flow_spec == "@unmarked":
return [i for i in self._store.values() if not i.marked]
else:
- filt = flowfilter.parse(spec)
+ filt = flowfilter.parse(flow_spec)
if not filt:
- raise exceptions.CommandError("Invalid flow filter: %s" % spec)
+ raise exceptions.CommandError("Invalid flow filter: %s" % flow_spec)
return [i for i in self._store.values() if filt(i)]
@command.command("view.flows.create")
diff --git a/mitmproxy/command.py b/mitmproxy/command.py
index 0998601c..6977ff91 100644
--- a/mitmproxy/command.py
+++ b/mitmproxy/command.py
@@ -1,20 +1,19 @@
"""
This module manages and invokes typed commands.
"""
+import functools
import inspect
+import sys
+import textwrap
import types
-import io
import typing
-import shlex
-import textwrap
-import functools
-import sys
-from mitmproxy import exceptions
import mitmproxy.types
+from mitmproxy import exceptions, command_lexer
+from mitmproxy.command_lexer import unquote
-def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None:
+def verify_arg_signature(f: typing.Callable, args: typing.Iterable[typing.Any], kwargs: dict) -> None:
sig = inspect.signature(f)
try:
sig.bind(*args, **kwargs)
@@ -22,15 +21,6 @@ def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None:
raise exceptions.CommandError("command argument mismatch: %s" % v.args[0])
-def lexer(s):
- # mypy mis-identifies shlex.shlex as abstract
- lex = shlex.shlex(s, posix=True) # type: ignore
- lex.wordchars += "."
- lex.whitespace_split = True
- lex.commenters = ''
- return lex
-
-
def typename(t: type) -> str:
"""
Translates a type to an explanatory string.
@@ -43,208 +33,234 @@ def typename(t: type) -> str:
return to.display
+def _empty_as_none(x: typing.Any) -> typing.Any:
+ if x == inspect.Signature.empty:
+ return None
+ return x
+
+
+class CommandParameter(typing.NamedTuple):
+ name: str
+ type: typing.Type
+ kind: inspect._ParameterKind = inspect.Parameter.POSITIONAL_OR_KEYWORD
+
+ def __str__(self):
+ if self.kind is inspect.Parameter.VAR_POSITIONAL:
+ return f"*{self.name}"
+ else:
+ return self.name
+
+
class Command:
- returntype: typing.Optional[typing.Type]
+ name: str
+ manager: "CommandManager"
+ signature: inspect.Signature
+ help: typing.Optional[str]
- def __init__(self, manager, path, func) -> None:
- self.path = path
+ def __init__(self, manager: "CommandManager", name: str, func: typing.Callable) -> None:
+ self.name = name
self.manager = manager
self.func = func
- sig = inspect.signature(self.func)
- self.help = None
+ self.signature = inspect.signature(self.func)
+
if func.__doc__:
txt = func.__doc__.strip()
self.help = "\n".join(textwrap.wrap(txt))
-
- self.has_positional = False
- for i in sig.parameters.values():
- # This is the kind for *args parameters
- if i.kind == i.VAR_POSITIONAL:
- self.has_positional = True
- self.paramtypes = [v.annotation for v in sig.parameters.values()]
- if sig.return_annotation == inspect._empty: # type: ignore
- self.returntype = None
else:
- self.returntype = sig.return_annotation
+ self.help = None
+
# This fails with a CommandException if types are invalid
- self.signature_help()
+ for name, parameter in self.signature.parameters.items():
+ t = parameter.annotation
+ if not mitmproxy.types.CommandTypes.get(parameter.annotation, None):
+ raise exceptions.CommandError(f"Argument {name} has an unknown type ({_empty_as_none(t)}) in {func}.")
+ if self.return_type and not mitmproxy.types.CommandTypes.get(self.return_type, None):
+ raise exceptions.CommandError(f"Return type has an unknown type ({self.return_type}) in {func}.")
+
+ @property
+ def return_type(self) -> typing.Optional[typing.Type]:
+ return _empty_as_none(self.signature.return_annotation)
+
+ @property
+ def parameters(self) -> typing.List[CommandParameter]:
+ """Returns a list of CommandParameters."""
+ ret = []
+ for name, param in self.signature.parameters.items():
+ ret.append(CommandParameter(name, param.annotation, param.kind))
+ return ret
+
+ def signature_help(self) -> str:
+ params = " ".join(str(param) for param in self.parameters)
+ if self.return_type:
+ ret = f" -> {typename(self.return_type)}"
+ else:
+ ret = ""
+ return f"{self.name} {params}{ret}"
- def paramnames(self) -> typing.Sequence[str]:
- v = [typename(i) for i in self.paramtypes]
- if self.has_positional:
- v[-1] = "*" + v[-1]
- return v
+ def prepare_args(self, args: typing.Sequence[str]) -> inspect.BoundArguments:
+ try:
+ bound_arguments = self.signature.bind(*args)
+ except TypeError as v:
+ raise exceptions.CommandError(f"Command argument mismatch: {v.args[0]}")
- def retname(self) -> str:
- return typename(self.returntype) if self.returntype else ""
+ for name, value in bound_arguments.arguments.items():
+ convert_to = self.signature.parameters[name].annotation
+ bound_arguments.arguments[name] = parsearg(self.manager, value, convert_to)
- def signature_help(self) -> str:
- params = " ".join(self.paramnames())
- ret = self.retname()
- if ret:
- ret = " -> " + ret
- return "%s %s%s" % (self.path, params, ret)
-
- def prepare_args(self, args: typing.Sequence[str]) -> typing.List[typing.Any]:
- verify_arg_signature(self.func, list(args), {})
-
- remainder: typing.Sequence[str] = []
- if self.has_positional:
- remainder = args[len(self.paramtypes) - 1:]
- args = args[:len(self.paramtypes) - 1]
-
- pargs = []
- for arg, paramtype in zip(args, self.paramtypes):
- pargs.append(parsearg(self.manager, arg, paramtype))
- pargs.extend(remainder)
- return pargs
+ bound_arguments.apply_defaults()
+
+ return bound_arguments
def call(self, args: typing.Sequence[str]) -> typing.Any:
"""
- Call the command with a list of arguments. At this point, all
- arguments are strings.
+ Call the command with a list of arguments. At this point, all
+ arguments are strings.
"""
- ret = self.func(*self.prepare_args(args))
- if ret is None and self.returntype is None:
+ bound_args = self.prepare_args(args)
+ ret = self.func(*bound_args.args, **bound_args.kwargs)
+ if ret is None and self.return_type is None:
return
- typ = mitmproxy.types.CommandTypes.get(self.returntype)
+ typ = mitmproxy.types.CommandTypes.get(self.return_type)
+ assert typ
if not typ.is_valid(self.manager, typ, ret):
raise exceptions.CommandError(
- "%s returned unexpected data - expected %s" % (
- self.path, typ.display
- )
+ f"{self.name} returned unexpected data - expected {typ.display}"
)
return ret
-ParseResult = typing.NamedTuple(
- "ParseResult",
- [
- ("value", str),
- ("type", typing.Type),
- ("valid", bool),
- ],
-)
+class ParseResult(typing.NamedTuple):
+ value: str
+ type: typing.Type
+ valid: bool
+
+class CommandManager:
+ commands: typing.Dict[str, Command]
-class CommandManager(mitmproxy.types._CommandBase):
def __init__(self, master):
self.master = master
- self.commands: typing.Dict[str, Command] = {}
+ self.commands = {}
def collect_commands(self, addon):
for i in dir(addon):
if not i.startswith("__"):
o = getattr(addon, i)
try:
- is_command = hasattr(o, "command_path")
+ is_command = hasattr(o, "command_name")
except Exception:
pass # hasattr may raise if o implements __getattr__.
else:
if is_command:
try:
- self.add(o.command_path, o)
+ self.add(o.command_name, o)
except exceptions.CommandError as e:
self.master.log.warn(
- "Could not load command %s: %s" % (o.command_path, e)
+ "Could not load command %s: %s" % (o.command_name, e)
)
def add(self, path: str, func: typing.Callable):
self.commands[path] = Command(self, path, func)
+ @functools.lru_cache(maxsize=128)
def parse_partial(
- self,
- cmdstr: str
- ) -> typing.Tuple[typing.Sequence[ParseResult], typing.Sequence[str]]:
+ self,
+ cmdstr: str
+ ) -> typing.Tuple[typing.Sequence[ParseResult], typing.Sequence[CommandParameter]]:
"""
- Parse a possibly partial command. Return a sequence of ParseResults and a sequence of remainder type help items.
+ Parse a possibly partial command. Return a sequence of ParseResults and a sequence of remainder type help items.
"""
- buf = io.StringIO(cmdstr)
- parts: typing.List[str] = []
- lex = lexer(buf)
- while 1:
- remainder = cmdstr[buf.tell():]
- try:
- t = lex.get_token()
- except ValueError:
- parts.append(remainder)
- break
- if not t:
- break
- parts.append(t)
- if not parts:
- parts = [""]
- elif cmdstr.endswith(" "):
- parts.append("")
-
- parse: typing.List[ParseResult] = []
- params: typing.List[type] = []
- typ: typing.Type
- for i in range(len(parts)):
- if i == 0:
- typ = mitmproxy.types.Cmd
- if parts[i] in self.commands:
- params.extend(self.commands[parts[i]].paramtypes)
- elif params:
- typ = params.pop(0)
- if typ == mitmproxy.types.Cmd and params and params[0] == mitmproxy.types.Arg:
- if parts[i] in self.commands:
- params[:] = self.commands[parts[i]].paramtypes
+
+ parts: typing.List[str] = command_lexer.expr.parseString(cmdstr, parseAll=True)
+
+ parsed: typing.List[ParseResult] = []
+ next_params: typing.List[CommandParameter] = [
+ CommandParameter("", mitmproxy.types.Cmd),
+ CommandParameter("", mitmproxy.types.CmdArgs),
+ ]
+ expected: typing.Optional[CommandParameter] = None
+ for part in parts:
+ if part.isspace():
+ parsed.append(
+ ParseResult(
+ value=part,
+ type=mitmproxy.types.Space,
+ valid=True,
+ )
+ )
+ continue
+
+ if expected and expected.kind is inspect.Parameter.VAR_POSITIONAL:
+ assert not next_params
+ elif next_params:
+ expected = next_params.pop(0)
else:
- typ = mitmproxy.types.Unknown
+ expected = CommandParameter("", mitmproxy.types.Unknown)
- to = mitmproxy.types.CommandTypes.get(typ, None)
+ arg_is_known_command = (
+ expected.type == mitmproxy.types.Cmd and part in self.commands
+ )
+ arg_is_unknown_command = (
+ expected.type == mitmproxy.types.Cmd and part not in self.commands
+ )
+ command_args_following = (
+ next_params and next_params[0].type == mitmproxy.types.CmdArgs
+ )
+ if arg_is_known_command and command_args_following:
+ next_params = self.commands[part].parameters + next_params[1:]
+ if arg_is_unknown_command and command_args_following:
+ next_params.pop(0)
+
+ to = mitmproxy.types.CommandTypes.get(expected.type, None)
valid = False
if to:
try:
- to.parse(self, typ, parts[i])
+ to.parse(self, expected.type, part)
except exceptions.TypeError:
valid = False
else:
valid = True
- parse.append(
+ parsed.append(
ParseResult(
- value=parts[i],
- type=typ,
+ value=part,
+ type=expected.type,
valid=valid,
)
)
- remhelp: typing.List[str] = []
- for x in params:
- remt = mitmproxy.types.CommandTypes.get(x, None)
- remhelp.append(remt.display)
-
- return parse, remhelp
+ return parsed, next_params
- def call(self, path: str, *args: typing.Sequence[typing.Any]) -> typing.Any:
+ def call(self, command_name: str, *args: typing.Sequence[typing.Any]) -> typing.Any:
"""
- Call a command with native arguments. May raise CommandError.
+ Call a command with native arguments. May raise CommandError.
"""
- if path not in self.commands:
- raise exceptions.CommandError("Unknown command: %s" % path)
- return self.commands[path].func(*args)
+ if command_name not in self.commands:
+ raise exceptions.CommandError("Unknown command: %s" % command_name)
+ return self.commands[command_name].func(*args)
- def call_strings(self, path: str, args: typing.Sequence[str]) -> typing.Any:
+ def _call_strings(self, command_name: str, args: typing.Sequence[str]) -> typing.Any:
"""
- Call a command using a list of string arguments. May raise CommandError.
+ Call a command using a list of string arguments. May raise CommandError.
"""
- if path not in self.commands:
- raise exceptions.CommandError("Unknown command: %s" % path)
- return self.commands[path].call(args)
+ if command_name not in self.commands:
+ raise exceptions.CommandError("Unknown command: %s" % command_name)
- def execute(self, cmdstr: str):
+ return self.commands[command_name].call(args)
+
+ def execute(self, cmdstr: str) -> typing.Any:
"""
- Execute a command string. May raise CommandError.
+ Execute a command string. May raise CommandError.
"""
- try:
- parts = list(lexer(cmdstr))
- except ValueError as e:
- raise exceptions.CommandError("Command error: %s" % e)
- if not len(parts) >= 1:
- raise exceptions.CommandError("Invalid command: %s" % cmdstr)
- return self.call_strings(parts[0], parts[1:])
+ parts, _ = self.parse_partial(cmdstr)
+ if not parts:
+ raise exceptions.CommandError(f"Invalid command: {cmdstr!r}")
+ command_name, *args = [
+ unquote(part.value)
+ for part in parts
+ if part.type != mitmproxy.types.Space
+ ]
+ return self._call_strings(command_name, args)
def dump(self, out=sys.stdout) -> None:
cmds = list(self.commands.values())
@@ -262,21 +278,23 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:
"""
t = mitmproxy.types.CommandTypes.get(argtype, None)
if not t:
- raise exceptions.CommandError("Unsupported argument type: %s" % argtype)
+ raise exceptions.CommandError(f"Unsupported argument type: {argtype}")
try:
- return t.parse(manager, argtype, spec) # type: ignore
+ return t.parse(manager, argtype, spec)
except exceptions.TypeError as e:
raise exceptions.CommandError from e
-def command(path):
+def command(name: typing.Optional[str] = None):
def decorator(function):
@functools.wraps(function)
def wrapper(*args, **kwargs):
verify_arg_signature(function, args, kwargs)
return function(*args, **kwargs)
- wrapper.__dict__["command_path"] = path
+
+ wrapper.__dict__["command_name"] = name or function.__name__.replace("_", ".")
return wrapper
+
return decorator
@@ -286,8 +304,10 @@ def argument(name, type):
specific types such as mitmproxy.types.Choice, which we cannot annotate
directly as mypy does not like that.
"""
+
def decorator(f: types.FunctionType) -> types.FunctionType:
assert name in f.__annotations__
f.__annotations__[name] = type
return f
+
return decorator
diff --git a/mitmproxy/command_lexer.py b/mitmproxy/command_lexer.py
new file mode 100644
index 00000000..f042f3c9
--- /dev/null
+++ b/mitmproxy/command_lexer.py
@@ -0,0 +1,49 @@
+import ast
+import re
+
+import pyparsing
+
+# TODO: There is a lot of work to be done here.
+# The current implementation is written in a way that _any_ input is valid,
+# which does not make sense once things get more complex.
+
+PartialQuotedString = pyparsing.Regex(
+ re.compile(
+ r'''
+ (["']) # start quote
+ (?:
+ (?!\1)[^\\] # unescaped character that is not our quote nor the begin of an escape sequence. We can't use \1 in []
+ |
+ (?:\\.) # escape sequence
+ )*
+ (?:\1|$) # end quote
+ ''',
+ re.VERBOSE
+ )
+)
+
+expr = pyparsing.ZeroOrMore(
+ PartialQuotedString
+ | pyparsing.Word(" \r\n\t")
+ | pyparsing.CharsNotIn("""'" \r\n\t""")
+).leaveWhitespace()
+
+
+def quote(val: str) -> str:
+ if val and all(char not in val for char in "'\" \r\n\t"):
+ return val
+ return repr(val) # TODO: More of a hack.
+
+
+def unquote(x: str) -> str:
+ quoted = (
+ (x.startswith('"') and x.endswith('"'))
+ or
+ (x.startswith("'") and x.endswith("'"))
+ )
+ if quoted:
+ try:
+ x = ast.literal_eval(x)
+ except Exception:
+ x = x[1:-1]
+ return x
diff --git a/mitmproxy/tools/console/commander/commander.py b/mitmproxy/tools/console/commander/commander.py
index f291b8fd..d751422b 100644
--- a/mitmproxy/tools/console/commander/commander.py
+++ b/mitmproxy/tools/console/commander/commander.py
@@ -1,58 +1,55 @@
import abc
+import collections
import copy
import typing
-import collections
import urwid
from urwid.text_layout import calc_coords
+import mitmproxy.command
import mitmproxy.flow
import mitmproxy.master
-import mitmproxy.command
import mitmproxy.types
-class Completer: # pragma: no cover
+class Completer:
@abc.abstractmethod
- def cycle(self) -> str:
- pass
+ def cycle(self, forward: bool = True) -> str:
+ raise NotImplementedError()
class ListCompleter(Completer):
def __init__(
- self,
- start: str,
- options: typing.Sequence[str],
+ self,
+ start: str,
+ options: typing.Sequence[str],
) -> None:
self.start = start
- self.options: typing.Sequence[str] = []
+ self.options: typing.List[str] = []
for o in options:
if o.startswith(start):
self.options.append(o)
self.options.sort()
self.offset = 0
- def cycle(self) -> str:
+ def cycle(self, forward: bool = True) -> str:
if not self.options:
return self.start
ret = self.options[self.offset]
- self.offset = (self.offset + 1) % len(self.options)
+ delta = 1 if forward else -1
+ self.offset = (self.offset + delta) % len(self.options)
return ret
-CompletionState = typing.NamedTuple(
- "CompletionState",
- [
- ("completer", Completer),
- ("parse", typing.Sequence[mitmproxy.command.ParseResult])
- ]
-)
+class CompletionState(typing.NamedTuple):
+ completer: Completer
+ parsed: typing.Sequence[mitmproxy.command.ParseResult]
class CommandBuffer:
def __init__(self, master: mitmproxy.master.Master, start: str = "") -> None:
self.master = master
- self.text = self.flatten(start)
+ self.text = start
# Cursor is always within the range [0:len(buffer)].
self._cursor = len(self.text)
self.completion: typing.Optional[CompletionState] = None
@@ -70,51 +67,30 @@ class CommandBuffer:
else:
self._cursor = x
- def maybequote(self, value):
- if " " in value and not value.startswith("\""):
- return "\"%s\"" % value
- return value
-
- def parse_quoted(self, txt):
- parts, remhelp = self.master.commands.parse_partial(txt)
- for i, p in enumerate(parts):
- parts[i] = mitmproxy.command.ParseResult(
- value = self.maybequote(p.value),
- type = p.type,
- valid = p.valid
- )
- return parts, remhelp
-
def render(self):
- """
- This function is somewhat tricky - in order to make the cursor
- position valid, we have to make sure there is a
- character-for-character offset match in the rendered output, up
- to the cursor. Beyond that, we can add stuff.
- """
- parts, remhelp = self.parse_quoted(self.text)
+ parts, remaining = self.master.commands.parse_partial(self.text)
ret = []
- for p in parts:
- if p.valid:
- if p.type == mitmproxy.types.Cmd:
- ret.append(("commander_command", p.value))
- else:
- ret.append(("text", p.value))
- elif p.value:
- ret.append(("commander_invalid", p.value))
- else:
- ret.append(("text", ""))
- ret.append(("text", " "))
- if remhelp:
- ret.append(("text", " "))
- for v in remhelp:
- ret.append(("commander_hint", "%s " % v))
- return ret
+ if not parts:
+ # Means we just received the leader, so we need to give a blank
+ # text to the widget to render or it crashes
+ ret.append(("text", ""))
+ else:
+ for p in parts:
+ if p.valid:
+ if p.type == mitmproxy.types.Cmd:
+ ret.append(("commander_command", p.value))
+ else:
+ ret.append(("text", p.value))
+ elif p.value:
+ ret.append(("commander_invalid", p.value))
+
+ if remaining:
+ if parts[-1].type != mitmproxy.types.Space:
+ ret.append(("text", " "))
+ for param in remaining:
+ ret.append(("commander_hint", f"{param} "))
- def flatten(self, txt):
- parts, _ = self.parse_quoted(txt)
- ret = [x.value for x in parts]
- return " ".join(ret)
+ return ret
def left(self) -> None:
self.cursor = self.cursor - 1
@@ -122,30 +98,38 @@ class CommandBuffer:
def right(self) -> None:
self.cursor = self.cursor + 1
- def cycle_completion(self) -> None:
+ def cycle_completion(self, forward: bool = True) -> None:
if not self.completion:
- parts, remainhelp = self.master.commands.parse_partial(self.text[:self.cursor])
- last = parts[-1]
- ct = mitmproxy.types.CommandTypes.get(last.type, None)
+ parts, remaining = self.master.commands.parse_partial(self.text[:self.cursor])
+ if parts and parts[-1].type != mitmproxy.types.Space:
+ type_to_complete = parts[-1].type
+ cycle_prefix = parts[-1].value
+ parsed = parts[:-1]
+ elif remaining:
+ type_to_complete = remaining[0].type
+ cycle_prefix = ""
+ parsed = parts
+ else:
+ return
+ ct = mitmproxy.types.CommandTypes.get(type_to_complete, None)
if ct:
self.completion = CompletionState(
- completer = ListCompleter(
- parts[-1].value,
- ct.completion(self.master.commands, last.type, parts[-1].value)
+ completer=ListCompleter(
+ cycle_prefix,
+ ct.completion(self.master.commands, type_to_complete, cycle_prefix)
),
- parse = parts,
+ parsed=parsed,
)
if self.completion:
- nxt = self.completion.completer.cycle()
- buf = " ".join([i.value for i in self.completion.parse[:-1]]) + " " + nxt
- buf = buf.strip()
- self.text = self.flatten(buf)
+ nxt = self.completion.completer.cycle(forward)
+ buf = "".join([i.value for i in self.completion.parsed]) + nxt
+ self.text = buf
self.cursor = len(self.text)
def backspace(self) -> None:
if self.cursor == 0:
return
- self.text = self.flatten(self.text[:self.cursor - 1] + self.text[self.cursor:])
+ self.text = self.text[:self.cursor - 1] + self.text[self.cursor:]
self.cursor = self.cursor - 1
self.completion = None
@@ -153,13 +137,18 @@ class CommandBuffer:
"""
Inserts text at the cursor.
"""
- self.text = self.flatten(self.text[:self.cursor] + k + self.text[self.cursor:])
- self.cursor += 1
+
+ # We don't want to insert a space before the command
+ if k == ' ' and self.text[0:self.cursor].strip() == '':
+ return
+
+ self.text = self.text[:self.cursor] + k + self.text[self.cursor:]
+ self.cursor += len(k)
self.completion = None
class CommandHistory:
- def __init__(self, master: mitmproxy.master.Master, size: int=30) -> None:
+ def __init__(self, master: mitmproxy.master.Master, size: int = 30) -> None:
self.saved_commands: collections.deque = collections.deque(
[CommandBuffer(master, "")],
maxlen=size
@@ -182,7 +171,7 @@ class CommandHistory:
return self.saved_commands[self.index]
return None
- def add_command(self, command: CommandBuffer, execution: bool=False) -> None:
+ def add_command(self, command: CommandBuffer, execution: bool = False) -> None:
if self.index == self.last_index or execution:
last_item = self.saved_commands[-1]
last_item_empty = not last_item.text
@@ -207,7 +196,7 @@ class CommandEdit(urwid.WidgetWrap):
self.history = history
self.update()
- def keypress(self, size, key):
+ def keypress(self, size, key) -> None:
if key == "backspace":
self.cbuf.backspace()
elif key == "left":
@@ -219,27 +208,29 @@ class CommandEdit(urwid.WidgetWrap):
self.cbuf = self.history.get_prev() or self.cbuf
elif key == "down":
self.cbuf = self.history.get_next() or self.cbuf
+ elif key == "shift tab":
+ self.cbuf.cycle_completion(False)
elif key == "tab":
self.cbuf.cycle_completion()
elif len(key) == 1:
self.cbuf.insert(key)
self.update()
- def update(self):
+ def update(self) -> None:
self._w.set_text([self.leader, self.cbuf.render()])
- def render(self, size, focus=False):
+ def render(self, size, focus=False) -> urwid.Canvas:
(maxcol,) = size
canv = self._w.render((maxcol,))
canv = urwid.CompositeCanvas(canv)
canv.cursor = self.get_cursor_coords((maxcol,))
return canv
- def get_cursor_coords(self, size):
+ def get_cursor_coords(self, size) -> typing.Tuple[int, int]:
p = self.cbuf.cursor + len(self.leader)
trans = self._w.get_line_translation(size[0])
x, y = calc_coords(self._w.get_text()[0], trans, p)
return x, y
- def get_edit_text(self):
+ def get_edit_text(self) -> str:
return self.cbuf.text
diff --git a/mitmproxy/tools/console/commands.py b/mitmproxy/tools/console/commands.py
index 0f35742b..26a99b14 100644
--- a/mitmproxy/tools/console/commands.py
+++ b/mitmproxy/tools/console/commands.py
@@ -1,6 +1,8 @@
import urwid
import blinker
import textwrap
+
+from mitmproxy import command
from mitmproxy.tools.console import layoutwidget
from mitmproxy.tools.console import signals
@@ -10,7 +12,7 @@ command_focus_change = blinker.Signal()
class CommandItem(urwid.WidgetWrap):
- def __init__(self, walker, cmd, focused):
+ def __init__(self, walker, cmd: command.Command, focused: bool):
self.walker, self.cmd, self.focused = walker, cmd, focused
super().__init__(None)
self._w = self.get_widget()
@@ -18,15 +20,18 @@ class CommandItem(urwid.WidgetWrap):
def get_widget(self):
parts = [
("focus", ">> " if self.focused else " "),
- ("title", self.cmd.path),
- ("text", " "),
- ("text", " ".join(self.cmd.paramnames())),
+ ("title", self.cmd.name)
]
- if self.cmd.returntype:
- parts.append([
+ if self.cmd.parameters:
+ parts += [
+ ("text", " "),
+ ("text", " ".join(str(param) for param in self.cmd.parameters)),
+ ]
+ if self.cmd.return_type:
+ parts += [
("title", " -> "),
- ("text", self.cmd.retname()),
- ])
+ ("text", command.typename(self.cmd.return_type)),
+ ]
return urwid.AttrMap(
urwid.Padding(urwid.Text(parts)),
@@ -92,7 +97,7 @@ class CommandsList(urwid.ListBox):
def keypress(self, size, key):
if key == "m_select":
foc, idx = self.get_focus()
- signals.status_prompt_command.send(partial=foc.cmd.path + " ")
+ signals.status_prompt_command.send(partial=foc.cmd.name + " ")
elif key == "m_start":
self.set_focus(0)
self.walker._modified()
diff --git a/mitmproxy/tools/console/consoleaddons.py b/mitmproxy/tools/console/consoleaddons.py
index 9f595b42..7fcd9b48 100644
--- a/mitmproxy/tools/console/consoleaddons.py
+++ b/mitmproxy/tools/console/consoleaddons.py
@@ -1,21 +1,18 @@
import csv
-import shlex
import typing
+import mitmproxy.types
+from mitmproxy import command, command_lexer
+from mitmproxy import contentviews
from mitmproxy import ctx
-from mitmproxy import command
from mitmproxy import exceptions
from mitmproxy import flow
from mitmproxy import http
from mitmproxy import log
-from mitmproxy import contentviews
-from mitmproxy.utils import strutils
-import mitmproxy.types
-
-
+from mitmproxy.tools.console import keymap
from mitmproxy.tools.console import overlay
from mitmproxy.tools.console import signals
-from mitmproxy.tools.console import keymap
+from mitmproxy.utils import strutils
console_palettes = [
"lowlight",
@@ -48,10 +45,12 @@ class UnsupportedLog:
"""
A small addon to dump info on flow types we don't support yet.
"""
+
def websocket_message(self, f):
message = f.messages[-1]
ctx.log.info(f.message_info(message))
- ctx.log.debug(message.content if isinstance(message.content, str) else strutils.bytes_to_escaped_str(message.content))
+ ctx.log.debug(
+ message.content if isinstance(message.content, str) else strutils.bytes_to_escaped_str(message.content))
def websocket_end(self, f):
ctx.log.info("WebSocket connection closed by {}: {} {}, {}".format(
@@ -78,6 +77,7 @@ class ConsoleAddon:
An addon that exposes console-specific commands, and hooks into required
events.
"""
+
def __init__(self, master):
self.master = master
self.started = False
@@ -86,7 +86,7 @@ class ConsoleAddon:
loader.add_option(
"console_default_contentview", str, "auto",
"The default content view mode.",
- choices = [i.name.lower() for i in contentviews.views]
+ choices=[i.name.lower() for i in contentviews.views]
)
loader.add_option(
"console_eventlog_verbosity", str, 'info',
@@ -142,7 +142,7 @@ class ConsoleAddon:
opts = self.layout_options()
off = self.layout_options().index(ctx.options.console_layout)
ctx.options.update(
- console_layout = opts[(off + 1) % len(opts)]
+ console_layout=opts[(off + 1) % len(opts)]
)
@command.command("console.panes.next")
@@ -234,17 +234,18 @@ class ConsoleAddon:
@command.command("console.choose")
def console_choose(
- self,
- prompt: str,
- choices: typing.Sequence[str],
- cmd: mitmproxy.types.Cmd,
- *args: mitmproxy.types.Arg
+ self,
+ prompt: str,
+ choices: typing.Sequence[str],
+ cmd: mitmproxy.types.Cmd,
+ *args: mitmproxy.types.CmdArgs
) -> None:
"""
Prompt the user to choose from a specified list of strings, then
invoke another command with all occurrences of {choice} replaced by
the choice the user made.
"""
+
def callback(opt):
# We're now outside of the call context...
repl = cmd + " " + " ".join(args)
@@ -260,22 +261,22 @@ class ConsoleAddon:
@command.command("console.choose.cmd")
def console_choose_cmd(
- self,
- prompt: str,
- choicecmd: mitmproxy.types.Cmd,
- subcmd: mitmproxy.types.Cmd,
- *args: mitmproxy.types.Arg
+ self,
+ prompt: str,
+ choicecmd: mitmproxy.types.Cmd,
+ subcmd: mitmproxy.types.Cmd,
+ *args: mitmproxy.types.CmdArgs
) -> None:
"""
Prompt the user to choose from a list of strings returned by a
command, then invoke another command with all occurrences of {choice}
replaced by the choice the user made.
"""
- choices = ctx.master.commands.call_strings(choicecmd, [])
+ choices = ctx.master.commands.execute(choicecmd)
def callback(opt):
# We're now outside of the call context...
- repl = shlex.quote(" ".join(args))
+ repl = " ".join(command_lexer.quote(x) for x in args)
repl = repl.replace("{choice}", opt)
try:
self.master.commands.execute(subcmd + " " + repl)
@@ -287,21 +288,24 @@ class ConsoleAddon:
)
@command.command("console.command")
- def console_command(self, *partial: str) -> None:
+ def console_command(self, *command_str: str) -> None:
"""
Prompt the user to edit a command with a (possibly empty) starting value.
"""
- signals.status_prompt_command.send(partial=" ".join(partial)) # type: ignore
+ quoted = " ".join(command_lexer.quote(x) for x in command_str)
+ signals.status_prompt_command.send(partial=quoted)
@command.command("console.command.set")
- def console_command_set(self, option: str) -> None:
+ def console_command_set(self, option_name: str) -> None:
"""
- Prompt the user to set an option of the form "key[=value]".
+ Prompt the user to set an option.
"""
- option_value = getattr(self.master.options, option, None)
- current_value = option_value if option_value else ""
- self.master.commands.execute(
- "console.command set %s=%s" % (option, current_value)
+ option_value = getattr(self.master.options, option_name, None) or ""
+ set_command = f"set {option_name} {option_value!r}"
+ cursor = len(set_command) - 1
+ signals.status_prompt_command.send(
+ partial=set_command,
+ cursor=cursor
)
@command.command("console.view.keybindings")
@@ -351,14 +355,14 @@ class ConsoleAddon:
@command.command("console.bodyview")
@command.argument("part", type=mitmproxy.types.Choice("console.bodyview.options"))
- def bodyview(self, f: flow.Flow, part: str) -> None:
+ def bodyview(self, flow: flow.Flow, part: str) -> None:
"""
Spawn an external viewer for a flow request or response body based
on the detected MIME type. We use the mailcap system to find the
correct viewier, and fall back to the programs in $PAGER or $EDITOR
if necessary.
"""
- fpart = getattr(f, part, None)
+ fpart = getattr(flow, part, None)
if not fpart:
raise exceptions.CommandError("Part must be either request or response, not %s." % part)
t = fpart.headers.get("content-type")
@@ -397,8 +401,8 @@ class ConsoleAddon:
]
@command.command("console.edit.focus")
- @command.argument("part", type=mitmproxy.types.Choice("console.edit.focus.options"))
- def edit_focus(self, part: str) -> None:
+ @command.argument("flow_part", type=mitmproxy.types.Choice("console.edit.focus.options"))
+ def edit_focus(self, flow_part: str) -> None:
"""
Edit a component of the currently focused flow.
"""
@@ -410,27 +414,27 @@ class ConsoleAddon:
flow.backup()
require_dummy_response = (
- part in ("response-headers", "response-body", "set-cookies") and
- flow.response is None
+ flow_part in ("response-headers", "response-body", "set-cookies") and
+ flow.response is None
)
if require_dummy_response:
flow.response = http.HTTPResponse.make()
- if part == "cookies":
+ if flow_part == "cookies":
self.master.switch_view("edit_focus_cookies")
- elif part == "urlencoded form":
+ elif flow_part == "urlencoded form":
self.master.switch_view("edit_focus_urlencoded_form")
- elif part == "multipart form":
+ elif flow_part == "multipart form":
self.master.switch_view("edit_focus_multipart_form")
- elif part == "path":
+ elif flow_part == "path":
self.master.switch_view("edit_focus_path")
- elif part == "query":
+ elif flow_part == "query":
self.master.switch_view("edit_focus_query")
- elif part == "request-headers":
+ elif flow_part == "request-headers":
self.master.switch_view("edit_focus_request_headers")
- elif part == "response-headers":
+ elif flow_part == "response-headers":
self.master.switch_view("edit_focus_response_headers")
- elif part in ("request-body", "response-body"):
- if part == "request-body":
+ elif flow_part in ("request-body", "response-body"):
+ if flow_part == "request-body":
message = flow.request
else:
message = flow.response
@@ -442,16 +446,16 @@ class ConsoleAddon:
# just strip the newlines off the end of the body when we return
# from an editor.
message.content = c.rstrip(b"\n")
- elif part == "set-cookies":
+ elif flow_part == "set-cookies":
self.master.switch_view("edit_focus_setcookies")
- elif part == "url":
+ elif flow_part == "url":
url = flow.request.url.encode()
edited_url = self.master.spawn_editor(url)
url = edited_url.rstrip(b"\n")
flow.request.url = url.decode()
- elif part in ["method", "status_code", "reason"]:
+ elif flow_part in ["method", "status_code", "reason"]:
self.master.commands.execute(
- "console.command flow.set @focus %s " % part
+ "console.command flow.set @focus %s " % flow_part
)
def _grideditor(self):
@@ -535,10 +539,8 @@ class ConsoleAddon:
raise exceptions.CommandError("Invalid flowview mode.")
try:
- self.master.commands.call_strings(
- "view.settings.setval",
- ["@focus", "flowview_mode_%s" % idx, mode]
- )
+ cmd = 'view.settings.setval @focus flowview_mode_%s %s' % (idx, mode)
+ self.master.commands.execute(cmd)
except exceptions.CommandError as e:
signals.status_message.send(message=str(e))
@@ -558,14 +560,9 @@ class ConsoleAddon:
if not fv:
raise exceptions.CommandError("Not viewing a flow.")
idx = fv.body.tab_offset
- return self.master.commands.call_strings(
- "view.settings.getval",
- [
- "@focus",
- "flowview_mode_%s" % idx,
- self.master.options.console_default_contentview,
- ]
- )
+
+ cmd = 'view.settings.getval @focus flowview_mode_%s %s' % (idx, self.master.options.console_default_contentview)
+ return self.master.commands.execute(cmd)
@command.command("console.key.contexts")
def key_contexts(self) -> typing.Sequence[str]:
@@ -576,11 +573,11 @@ class ConsoleAddon:
@command.command("console.key.bind")
def key_bind(
- self,
- contexts: typing.Sequence[str],
- key: str,
- cmd: mitmproxy.types.Cmd,
- *args: mitmproxy.types.Arg
+ self,
+ contexts: typing.Sequence[str],
+ key: str,
+ cmd: mitmproxy.types.Cmd,
+ *args: mitmproxy.types.CmdArgs
) -> None:
"""
Bind a shortcut key.
diff --git a/mitmproxy/tools/console/defaultkeys.py b/mitmproxy/tools/console/defaultkeys.py
index 0a6c5561..a0f27625 100644
--- a/mitmproxy/tools/console/defaultkeys.py
+++ b/mitmproxy/tools/console/defaultkeys.py
@@ -26,7 +26,7 @@ def map(km):
km.add("ctrl f", "console.nav.pagedown", ["global"], "Page down")
km.add("ctrl b", "console.nav.pageup", ["global"], "Page up")
- km.add("I", "set intercept_active=toggle", ["global"], "Toggle intercept")
+ km.add("I", "set intercept_active toggle", ["global"], "Toggle intercept")
km.add("i", "console.command.set intercept", ["global"], "Set intercept")
km.add("W", "console.command.set save_stream_file", ["global"], "Stream to file")
km.add("A", "flow.resume @all", ["flowlist", "flowview"], "Resume all intercepted flows")
@@ -48,14 +48,14 @@ def map(km):
"Export this flow to file"
)
km.add("f", "console.command.set view_filter", ["flowlist"], "Set view filter")
- km.add("F", "set console_focus_follow=toggle", ["flowlist"], "Set focus follow")
+ km.add("F", "set console_focus_follow toggle", ["flowlist"], "Set focus follow")
km.add(
"ctrl l",
"console.command cut.clip ",
["flowlist", "flowview"],
"Send cuts to clipboard"
)
- km.add("L", "console.command view.load ", ["flowlist"], "Load flows from file")
+ km.add("L", "console.command view.flows.load ", ["flowlist"], "Load flows from file")
km.add("m", "flow.mark.toggle @focus", ["flowlist"], "Toggle mark on this flow")
km.add("M", "view.properties.marked.toggle", ["flowlist"], "Toggle viewing marked flows")
km.add(
@@ -68,14 +68,14 @@ def map(km):
"o",
"""
console.choose.cmd Order view.order.options
- set view_order={choice}
+ set view_order {choice}
""",
["flowlist"],
"Set flow list order"
)
km.add("r", "replay.client @focus", ["flowlist", "flowview"], "Replay this flow")
km.add("S", "console.command replay.server ", ["flowlist"], "Start server replay")
- km.add("v", "set view_order_reversed=toggle", ["flowlist"], "Reverse flow list order")
+ km.add("v", "set view_order_reversed toggle", ["flowlist"], "Reverse flow list order")
km.add("U", "flow.mark @all false", ["flowlist"], "Un-set all marks")
km.add("w", "console.command save.file @shown ", ["flowlist"], "Save listed flows to file")
km.add("V", "flow.revert @focus", ["flowlist", "flowview"], "Revert changes to this flow")
diff --git a/mitmproxy/tools/console/statusbar.py b/mitmproxy/tools/console/statusbar.py
index 56f0674f..43f5170d 100644
--- a/mitmproxy/tools/console/statusbar.py
+++ b/mitmproxy/tools/console/statusbar.py
@@ -1,4 +1,5 @@
import os.path
+from typing import Optional
import urwid
@@ -98,10 +99,15 @@ class ActionBar(urwid.WidgetWrap):
self._w = urwid.Edit(self.prep_prompt(prompt), text or "")
self.prompting = PromptStub(callback, args)
- def sig_prompt_command(self, sender, partial=""):
+ def sig_prompt_command(self, sender, partial: str = "", cursor: Optional[int] = None):
signals.focus.send(self, section="footer")
- self._w = commander.CommandEdit(self.master, partial,
- self.command_history)
+ self._w = commander.CommandEdit(
+ self.master,
+ partial,
+ self.command_history,
+ )
+ if cursor is not None:
+ self._w.cbuf.cursor = cursor
self.prompting = commandexecutor.CommandExecutor(self.master)
def sig_prompt_onekey(self, sender, prompt, keys, callback, args=()):
diff --git a/mitmproxy/types.py b/mitmproxy/types.py
index 0634e4d7..ac992217 100644
--- a/mitmproxy/types.py
+++ b/mitmproxy/types.py
@@ -5,6 +5,9 @@ import typing
from mitmproxy import exceptions
from mitmproxy import flow
+if typing.TYPE_CHECKING: # pragma: no cover
+ from mitmproxy.command import CommandManager
+
class Path(str):
pass
@@ -14,7 +17,7 @@ class Cmd(str):
pass
-class Arg(str):
+class CmdArgs(str):
pass
@@ -22,6 +25,10 @@ class Unknown(str):
pass
+class Space(str):
+ pass
+
+
class CutSpec(typing.Sequence[str]):
pass
@@ -40,27 +47,11 @@ class Choice:
return False
-# One of the many charming things about mypy is that introducing type
-# annotations can cause circular dependencies where there were none before.
-# Rather than putting types and the CommandManger in the same file, we introduce
-# a stub type with the signature we use.
-class _CommandBase:
- commands: typing.MutableMapping[str, typing.Any] = {}
-
- def call_strings(self, path: str, args: typing.Sequence[str]) -> typing.Any:
- raise NotImplementedError
-
- def execute(self, cmd: str) -> typing.Any:
- raise NotImplementedError
-
-
class _BaseType:
typ: typing.Type = object
display: str = ""
- def completion(
- self, manager: _CommandBase, t: typing.Any, s: str
- ) -> typing.Sequence[str]:
+ def completion(self, manager: "CommandManager", t: typing.Any, s: str) -> typing.Sequence[str]:
"""
Returns a list of completion strings for a given prefix. The strings
returned don't necessarily need to be suffixes of the prefix, since
@@ -68,9 +59,7 @@ class _BaseType:
"""
raise NotImplementedError
- def parse(
- self, manager: _CommandBase, typ: typing.Any, s: str
- ) -> typing.Any:
+ def parse(self, manager: "CommandManager", typ: typing.Any, s: str) -> typing.Any:
"""
Parse a string, given the specific type instance (to allow rich type annotations like Choice) and a string.
@@ -78,7 +67,7 @@ class _BaseType:
"""
raise NotImplementedError
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
"""
Check if data is valid for this type.
"""
@@ -89,10 +78,10 @@ class _BoolType(_BaseType):
typ = bool
display = "bool"
- def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return ["false", "true"]
- def parse(self, manager: _CommandBase, t: type, s: str) -> bool:
+ def parse(self, manager: "CommandManager", t: type, s: str) -> bool:
if s == "true":
return True
elif s == "false":
@@ -102,7 +91,7 @@ class _BoolType(_BaseType):
"Booleans are 'true' or 'false', got %s" % s
)
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
return val in [True, False]
@@ -110,13 +99,13 @@ class _StrType(_BaseType):
typ = str
display = "str"
- def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return []
- def parse(self, manager: _CommandBase, t: type, s: str) -> str:
+ def parse(self, manager: "CommandManager", t: type, s: str) -> str:
return s
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
return isinstance(val, str)
@@ -124,13 +113,13 @@ class _UnknownType(_BaseType):
typ = Unknown
display = "unknown"
- def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return []
- def parse(self, manager: _CommandBase, t: type, s: str) -> str:
+ def parse(self, manager: "CommandManager", t: type, s: str) -> str:
return s
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
return False
@@ -138,16 +127,16 @@ class _IntType(_BaseType):
typ = int
display = "int"
- def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return []
- def parse(self, manager: _CommandBase, t: type, s: str) -> int:
+ def parse(self, manager: "CommandManager", t: type, s: str) -> int:
try:
return int(s)
except ValueError as e:
raise exceptions.TypeError from e
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
return isinstance(val, int)
@@ -155,7 +144,7 @@ class _PathType(_BaseType):
typ = Path
display = "path"
- def completion(self, manager: _CommandBase, t: type, start: str) -> typing.Sequence[str]:
+ def completion(self, manager: "CommandManager", t: type, start: str) -> typing.Sequence[str]:
if not start:
start = "./"
path = os.path.expanduser(start)
@@ -177,10 +166,10 @@ class _PathType(_BaseType):
ret.sort()
return ret
- def parse(self, manager: _CommandBase, t: type, s: str) -> str:
+ def parse(self, manager: "CommandManager", t: type, s: str) -> str:
return os.path.expanduser(s)
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
return isinstance(val, str)
@@ -188,43 +177,43 @@ class _CmdType(_BaseType):
typ = Cmd
display = "cmd"
- def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return list(manager.commands.keys())
- def parse(self, manager: _CommandBase, t: type, s: str) -> str:
+ def parse(self, manager: "CommandManager", t: type, s: str) -> str:
if s not in manager.commands:
raise exceptions.TypeError("Unknown command: %s" % s)
return s
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
return val in manager.commands
class _ArgType(_BaseType):
- typ = Arg
+ typ = CmdArgs
display = "arg"
- def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return []
- def parse(self, manager: _CommandBase, t: type, s: str) -> str:
+ def parse(self, manager: "CommandManager", t: type, s: str) -> str:
return s
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
return isinstance(val, str)
class _StrSeqType(_BaseType):
typ = typing.Sequence[str]
- display = "[str]"
+ display = "str[]"
- def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return []
- def parse(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ def parse(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return [x.strip() for x in s.split(",")]
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
if isinstance(val, str) or isinstance(val, bytes):
return False
try:
@@ -238,7 +227,7 @@ class _StrSeqType(_BaseType):
class _CutSpecType(_BaseType):
typ = CutSpec
- display = "[cut]"
+ display = "cut[]"
valid_prefixes = [
"request.method",
"request.scheme",
@@ -277,7 +266,7 @@ class _CutSpecType(_BaseType):
"server_conn.tls_established",
]
- def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
spec = s.split(",")
opts = []
for pref in self.valid_prefixes:
@@ -285,11 +274,11 @@ class _CutSpecType(_BaseType):
opts.append(",".join(spec))
return opts
- def parse(self, manager: _CommandBase, t: type, s: str) -> CutSpec:
+ def parse(self, manager: "CommandManager", t: type, s: str) -> CutSpec:
parts: typing.Any = s.split(",")
return parts
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
if not isinstance(val, str):
return False
parts = [x.strip() for x in val.split(",")]
@@ -327,7 +316,7 @@ class _BaseFlowType(_BaseType):
"~c",
]
- def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]:
+ def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:
return self.valid_prefixes
@@ -335,9 +324,9 @@ class _FlowType(_BaseFlowType):
typ = flow.Flow
display = "flow"
- def parse(self, manager: _CommandBase, t: type, s: str) -> flow.Flow:
+ def parse(self, manager: "CommandManager", t: type, s: str) -> flow.Flow:
try:
- flows = manager.call_strings("view.flows.resolve", [s])
+ flows = manager.execute("view.flows.resolve %s" % (s))
except exceptions.CommandError as e:
raise exceptions.TypeError from e
if len(flows) != 1:
@@ -346,21 +335,21 @@ class _FlowType(_BaseFlowType):
)
return flows[0]
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
return isinstance(val, flow.Flow)
class _FlowsType(_BaseFlowType):
typ = typing.Sequence[flow.Flow]
- display = "[flow]"
+ display = "flow[]"
- def parse(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[flow.Flow]:
+ def parse(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[flow.Flow]:
try:
- return manager.call_strings("view.flows.resolve", [s])
+ return manager.execute("view.flows.resolve %s" % (s))
except exceptions.CommandError as e:
raise exceptions.TypeError from e
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
try:
for v in val:
if not isinstance(v, flow.Flow):
@@ -372,19 +361,19 @@ class _FlowsType(_BaseFlowType):
class _DataType(_BaseType):
typ = Data
- display = "[data]"
+ display = "data[][]"
def completion(
- self, manager: _CommandBase, t: type, s: str
+ self, manager: "CommandManager", t: type, s: str
) -> typing.Sequence[str]: # pragma: no cover
raise exceptions.TypeError("data cannot be passed as argument")
def parse(
- self, manager: _CommandBase, t: type, s: str
+ self, manager: "CommandManager", t: type, s: str
) -> typing.Any: # pragma: no cover
raise exceptions.TypeError("data cannot be passed as argument")
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
# FIXME: validate that all rows have equal length, and all columns have equal types
try:
for row in val:
@@ -400,16 +389,16 @@ class _ChoiceType(_BaseType):
typ = Choice
display = "choice"
- def completion(self, manager: _CommandBase, t: Choice, s: str) -> typing.Sequence[str]:
+ def completion(self, manager: "CommandManager", t: Choice, s: str) -> typing.Sequence[str]:
return manager.execute(t.options_command)
- def parse(self, manager: _CommandBase, t: Choice, s: str) -> str:
+ def parse(self, manager: "CommandManager", t: Choice, s: str) -> str:
opts = manager.execute(t.options_command)
if s not in opts:
raise exceptions.TypeError("Invalid choice.")
return s
- def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool:
+ def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:
try:
opts = manager.execute(typ.options_command)
except exceptions.CommandError:
@@ -423,7 +412,7 @@ class TypeManager:
for t in types:
self.typemap[t.typ] = t()
- def get(self, t: typing.Optional[typing.Type], default=None) -> _BaseType:
+ def get(self, t: typing.Optional[typing.Type], default=None) -> typing.Optional[_BaseType]:
if type(t) in self.typemap:
return self.typemap[type(t)]
return self.typemap.get(t, default)