diff options
| -rw-r--r-- | examples/complex/remote_debug.py | 4 | ||||
| -rw-r--r-- | mitmproxy/addons/core.py | 49 | ||||
| -rw-r--r-- | mitmproxy/addons/export.py | 20 | ||||
| -rw-r--r-- | mitmproxy/addons/view.py | 58 | ||||
| -rw-r--r-- | mitmproxy/command.py | 320 | ||||
| -rw-r--r-- | mitmproxy/command_lexer.py | 49 | ||||
| -rw-r--r-- | mitmproxy/tools/console/commander/commander.py | 157 | ||||
| -rw-r--r-- | mitmproxy/tools/console/commands.py | 23 | ||||
| -rw-r--r-- | mitmproxy/tools/console/consoleaddons.py | 131 | ||||
| -rw-r--r-- | mitmproxy/tools/console/defaultkeys.py | 10 | ||||
| -rw-r--r-- | mitmproxy/tools/console/statusbar.py | 12 | ||||
| -rw-r--r-- | mitmproxy/types.py | 125 | ||||
| -rw-r--r-- | setup.cfg | 2 | ||||
| -rw-r--r-- | test/mitmproxy/addons/test_core.py | 2 | ||||
| -rw-r--r-- | test/mitmproxy/addons/test_save.py | 2 | ||||
| -rw-r--r-- | test/mitmproxy/test_command.py | 321 | ||||
| -rw-r--r-- | test/mitmproxy/test_command_lexer.py | 38 | ||||
| -rw-r--r-- | test/mitmproxy/test_types.py | 15 | ||||
| -rw-r--r-- | test/mitmproxy/tools/console/test_commander.py | 143 | ||||
| -rw-r--r-- | test/mitmproxy/tools/console/test_defaultkeys.py | 19 | 
20 files changed, 949 insertions, 551 deletions
| diff --git a/examples/complex/remote_debug.py b/examples/complex/remote_debug.py index fa6f3d33..4b117bdb 100644 --- a/examples/complex/remote_debug.py +++ b/examples/complex/remote_debug.py @@ -15,5 +15,5 @@ Usage:  def load(l): -    import pydevd -    pydevd.settrace("localhost", port=5678, stdoutToServer=True, stderrToServer=True) +    import pydevd_pycharm +    pydevd_pycharm.settrace("localhost", port=5678, stdoutToServer=True, stderrToServer=True, suspend=False) diff --git a/mitmproxy/addons/core.py b/mitmproxy/addons/core.py index 5c9bbcd0..8a3acedb 100644 --- a/mitmproxy/addons/core.py +++ b/mitmproxy/addons/core.py @@ -83,15 +83,14 @@ class Core:                      )      @command.command("set") -    def set(self, *spec: str) -> None: +    def set(self, option: str, value: str = "") -> None:          """ -            Set an option of the form "key[=value]". When the value is omitted, -            booleans are set to true, strings and integers are set to None (if -            permitted), and sequences are emptied. Boolean values can be true, -            false or toggle. If multiple specs are passed, they are joined -            into one separated by spaces. +            Set an option. When the value is omitted, booleans are set to true, +            strings and integers are set to None (if permitted), and sequences +            are emptied. Boolean values can be true, false or toggle. +            Multiple values are concatenated with a single space.          """ -        strspec = " ".join(spec) +        strspec = f"{option}={value}"          try:              ctx.options.set(strspec)          except exceptions.OptionsError as e: @@ -109,14 +108,14 @@ class Core:      # FIXME: this will become view.mark later      @command.command("flow.mark") -    def mark(self, flows: typing.Sequence[flow.Flow], val: bool) -> None: +    def mark(self, flows: typing.Sequence[flow.Flow], boolean: bool) -> None:          """              Mark flows.          """          updated = []          for i in flows: -            if i.marked != val: -                i.marked = val +            if i.marked != boolean: +                i.marked = boolean                  updated.append(i)          ctx.master.addons.trigger("update", updated) @@ -169,18 +168,18 @@ class Core:          ]      @command.command("flow.set") -    @command.argument("spec", type=mitmproxy.types.Choice("flow.set.options")) +    @command.argument("attr", type=mitmproxy.types.Choice("flow.set.options"))      def flow_set(          self,          flows: typing.Sequence[flow.Flow], -        spec: str, -        sval: str +        attr: str, +        value: str      ) -> None:          """              Quickly set a number of common values on flows.          """ -        val: typing.Union[int, str] = sval -        if spec == "status_code": +        val: typing.Union[int, str] = value +        if attr == "status_code":              try:                  val = int(val)  # type: ignore              except ValueError as v: @@ -193,13 +192,13 @@ class Core:              req = getattr(f, "request", None)              rupdate = True              if req: -                if spec == "method": +                if attr == "method":                      req.method = val -                elif spec == "host": +                elif attr == "host":                      req.host = val -                elif spec == "path": +                elif attr == "path":                      req.path = val -                elif spec == "url": +                elif attr == "url":                      try:                          req.url = val                      except ValueError as e: @@ -212,11 +211,11 @@ class Core:              resp = getattr(f, "response", None)              supdate = True              if resp: -                if spec == "status_code": +                if attr == "status_code":                      resp.status_code = val                      if val in status_codes.RESPONSES:                          resp.reason = status_codes.RESPONSES[val]  # type: ignore -                elif spec == "reason": +                elif attr == "reason":                      resp.reason = val                  else:                      supdate = False @@ -225,7 +224,7 @@ class Core:                  updated.append(f)          ctx.master.addons.trigger("update", updated) -        ctx.log.alert("Set %s on  %s flows." % (spec, len(updated))) +        ctx.log.alert("Set %s on  %s flows." % (attr, len(updated)))      @command.command("flow.decode")      def decode(self, flows: typing.Sequence[flow.Flow], part: str) -> None: @@ -262,12 +261,12 @@ class Core:          ctx.log.alert("Toggled encoding on %s flows." % len(updated))      @command.command("flow.encode") -    @command.argument("enc", type=mitmproxy.types.Choice("flow.encode.options")) +    @command.argument("encoding", type=mitmproxy.types.Choice("flow.encode.options"))      def encode(          self,          flows: typing.Sequence[flow.Flow],          part: str, -        enc: str, +        encoding: str,      ) -> None:          """              Encode flows with a specified encoding. @@ -279,7 +278,7 @@ class Core:                  current_enc = p.headers.get("content-encoding", "identity")                  if current_enc == "identity":                      f.backup() -                    p.encode(enc) +                    p.encode(encoding)                      updated.append(f)          ctx.master.addons.trigger("update", updated)          ctx.log.alert("Encoded %s flows." % len(updated)) diff --git a/mitmproxy/addons/export.py b/mitmproxy/addons/export.py index 68df9374..d874c95a 100644 --- a/mitmproxy/addons/export.py +++ b/mitmproxy/addons/export.py @@ -121,14 +121,14 @@ class Export():          return list(sorted(formats.keys()))      @command.command("export.file") -    def file(self, fmt: str, f: flow.Flow, path: mitmproxy.types.Path) -> None: +    def file(self, format: str, flow: flow.Flow, path: mitmproxy.types.Path) -> None:          """              Export a flow to path.          """ -        if fmt not in formats: -            raise exceptions.CommandError("No such export format: %s" % fmt) -        func: typing.Any = formats[fmt] -        v = func(f) +        if format not in formats: +            raise exceptions.CommandError("No such export format: %s" % format) +        func: typing.Any = formats[format] +        v = func(flow)          try:              with open(path, "wb") as fp:                  if isinstance(v, bytes): @@ -139,14 +139,14 @@ class Export():              ctx.log.error(str(e))      @command.command("export.clip") -    def clip(self, fmt: str, f: flow.Flow) -> None: +    def clip(self, format: str, flow: flow.Flow) -> None:          """              Export a flow to the system clipboard.          """ -        if fmt not in formats: -            raise exceptions.CommandError("No such export format: %s" % fmt) -        func: typing.Any = formats[fmt] -        v = strutils.always_str(func(f)) +        if format not in formats: +            raise exceptions.CommandError("No such export format: %s" % format) +        func: typing.Any = formats[format] +        v = strutils.always_str(func(flow))          try:              pyperclip.copy(v)          except pyperclip.PyperclipException as e: diff --git a/mitmproxy/addons/view.py b/mitmproxy/addons/view.py index da9d19f9..1d57d781 100644 --- a/mitmproxy/addons/view.py +++ b/mitmproxy/addons/view.py @@ -217,7 +217,7 @@ class View(collections.abc.Sequence):      # Focus      @command.command("view.focus.go") -    def go(self, dst: int) -> None: +    def go(self, offset: int) -> None:          """              Go to a specified offset. Positive offests are from the beginning of              the view, negative from the end of the view, so that 0 is the first @@ -225,13 +225,13 @@ class View(collections.abc.Sequence):          """          if len(self) == 0:              return -        if dst < 0: -            dst = len(self) + dst -        if dst < 0: -            dst = 0 -        if dst > len(self) - 1: -            dst = len(self) - 1 -        self.focus.flow = self[dst] +        if offset < 0: +            offset = len(self) + offset +        if offset < 0: +            offset = 0 +        if offset > len(self) - 1: +            offset = len(self) - 1 +        self.focus.flow = self[offset]      @command.command("view.focus.next")      def focus_next(self) -> None: @@ -266,20 +266,20 @@ class View(collections.abc.Sequence):          return list(sorted(self.orders.keys()))      @command.command("view.order.reverse") -    def set_reversed(self, value: bool) -> None: -        self.order_reversed = value +    def set_reversed(self, boolean: bool) -> None: +        self.order_reversed = boolean          self.sig_view_refresh.send(self)      @command.command("view.order.set") -    def set_order(self, order: str) -> None: +    def set_order(self, order_key: str) -> None:          """              Sets the current view order.          """ -        if order not in self.orders: +        if order_key not in self.orders:              raise exceptions.CommandError( -                "Unknown flow order: %s" % order +                "Unknown flow order: %s" % order_key              ) -        order_key = self.orders[order] +        order_key = self.orders[order_key]          self.order_key = order_key          newview = sortedcontainers.SortedListWithKey(key=order_key)          newview.update(self._view) @@ -298,16 +298,16 @@ class View(collections.abc.Sequence):      # Filter      @command.command("view.filter.set") -    def set_filter_cmd(self, f: str) -> None: +    def set_filter_cmd(self, filter_expr: str) -> None:          """              Sets the current view filter.          """          filt = None -        if f: -            filt = flowfilter.parse(f) +        if filter_expr: +            filt = flowfilter.parse(filter_expr)              if not filt:                  raise exceptions.CommandError( -                    "Invalid interception filter: %s" % f +                    "Invalid interception filter: %s" % filter_expr                  )          self.set_filter(filt) @@ -340,11 +340,11 @@ class View(collections.abc.Sequence):      # View Settings      @command.command("view.settings.getval") -    def getvalue(self, f: mitmproxy.flow.Flow, key: str, default: str) -> str: +    def getvalue(self, flow: mitmproxy.flow.Flow, key: str, default: str) -> str:          """              Get a value from the settings store for the specified flow.          """ -        return self.settings[f].get(key, default) +        return self.settings[flow].get(key, default)      @command.command("view.settings.setval.toggle")      def setvalue_toggle( @@ -412,26 +412,26 @@ class View(collections.abc.Sequence):              ctx.log.alert("Removed %s flows" % len(flows))      @command.command("view.flows.resolve") -    def resolve(self, spec: str) -> typing.Sequence[mitmproxy.flow.Flow]: +    def resolve(self, flow_spec: str) -> typing.Sequence[mitmproxy.flow.Flow]:          """              Resolve a flow list specification to an actual list of flows.          """ -        if spec == "@all": +        if flow_spec == "@all":              return [i for i in self._store.values()] -        if spec == "@focus": +        if flow_spec == "@focus":              return [self.focus.flow] if self.focus.flow else [] -        elif spec == "@shown": +        elif flow_spec == "@shown":              return [i for i in self] -        elif spec == "@hidden": +        elif flow_spec == "@hidden":              return [i for i in self._store.values() if i not in self._view] -        elif spec == "@marked": +        elif flow_spec == "@marked":              return [i for i in self._store.values() if i.marked] -        elif spec == "@unmarked": +        elif flow_spec == "@unmarked":              return [i for i in self._store.values() if not i.marked]          else: -            filt = flowfilter.parse(spec) +            filt = flowfilter.parse(flow_spec)              if not filt: -                raise exceptions.CommandError("Invalid flow filter: %s" % spec) +                raise exceptions.CommandError("Invalid flow filter: %s" % flow_spec)              return [i for i in self._store.values() if filt(i)]      @command.command("view.flows.create") diff --git a/mitmproxy/command.py b/mitmproxy/command.py index 0998601c..6977ff91 100644 --- a/mitmproxy/command.py +++ b/mitmproxy/command.py @@ -1,20 +1,19 @@  """      This module manages and invokes typed commands.  """ +import functools  import inspect +import sys +import textwrap  import types -import io  import typing -import shlex -import textwrap -import functools -import sys -from mitmproxy import exceptions  import mitmproxy.types +from mitmproxy import exceptions, command_lexer +from mitmproxy.command_lexer import unquote -def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None: +def verify_arg_signature(f: typing.Callable, args: typing.Iterable[typing.Any], kwargs: dict) -> None:      sig = inspect.signature(f)      try:          sig.bind(*args, **kwargs) @@ -22,15 +21,6 @@ def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None:          raise exceptions.CommandError("command argument mismatch: %s" % v.args[0]) -def lexer(s): -    # mypy mis-identifies shlex.shlex as abstract -    lex = shlex.shlex(s, posix=True)  # type: ignore -    lex.wordchars += "." -    lex.whitespace_split = True -    lex.commenters = '' -    return lex - -  def typename(t: type) -> str:      """          Translates a type to an explanatory string. @@ -43,208 +33,234 @@ def typename(t: type) -> str:      return to.display +def _empty_as_none(x: typing.Any) -> typing.Any: +    if x == inspect.Signature.empty: +        return None +    return x + + +class CommandParameter(typing.NamedTuple): +    name: str +    type: typing.Type +    kind: inspect._ParameterKind = inspect.Parameter.POSITIONAL_OR_KEYWORD + +    def __str__(self): +        if self.kind is inspect.Parameter.VAR_POSITIONAL: +            return f"*{self.name}" +        else: +            return self.name + +  class Command: -    returntype: typing.Optional[typing.Type] +    name: str +    manager: "CommandManager" +    signature: inspect.Signature +    help: typing.Optional[str] -    def __init__(self, manager, path, func) -> None: -        self.path = path +    def __init__(self, manager: "CommandManager", name: str, func: typing.Callable) -> None: +        self.name = name          self.manager = manager          self.func = func -        sig = inspect.signature(self.func) -        self.help = None +        self.signature = inspect.signature(self.func) +          if func.__doc__:              txt = func.__doc__.strip()              self.help = "\n".join(textwrap.wrap(txt)) - -        self.has_positional = False -        for i in sig.parameters.values(): -            # This is the kind for *args parameters -            if i.kind == i.VAR_POSITIONAL: -                self.has_positional = True -        self.paramtypes = [v.annotation for v in sig.parameters.values()] -        if sig.return_annotation == inspect._empty:  # type: ignore -            self.returntype = None          else: -            self.returntype = sig.return_annotation +            self.help = None +          # This fails with a CommandException if types are invalid -        self.signature_help() +        for name, parameter in self.signature.parameters.items(): +            t = parameter.annotation +            if not mitmproxy.types.CommandTypes.get(parameter.annotation, None): +                raise exceptions.CommandError(f"Argument {name} has an unknown type ({_empty_as_none(t)}) in {func}.") +        if self.return_type and not mitmproxy.types.CommandTypes.get(self.return_type, None): +            raise exceptions.CommandError(f"Return type has an unknown type ({self.return_type}) in {func}.") + +    @property +    def return_type(self) -> typing.Optional[typing.Type]: +        return _empty_as_none(self.signature.return_annotation) + +    @property +    def parameters(self) -> typing.List[CommandParameter]: +        """Returns a list of CommandParameters.""" +        ret = [] +        for name, param in self.signature.parameters.items(): +            ret.append(CommandParameter(name, param.annotation, param.kind)) +        return ret + +    def signature_help(self) -> str: +        params = " ".join(str(param) for param in self.parameters) +        if self.return_type: +            ret = f" -> {typename(self.return_type)}" +        else: +            ret = "" +        return f"{self.name} {params}{ret}" -    def paramnames(self) -> typing.Sequence[str]: -        v = [typename(i) for i in self.paramtypes] -        if self.has_positional: -            v[-1] = "*" + v[-1] -        return v +    def prepare_args(self, args: typing.Sequence[str]) -> inspect.BoundArguments: +        try: +            bound_arguments = self.signature.bind(*args) +        except TypeError as v: +            raise exceptions.CommandError(f"Command argument mismatch: {v.args[0]}") -    def retname(self) -> str: -        return typename(self.returntype) if self.returntype else "" +        for name, value in bound_arguments.arguments.items(): +            convert_to = self.signature.parameters[name].annotation +            bound_arguments.arguments[name] = parsearg(self.manager, value, convert_to) -    def signature_help(self) -> str: -        params = " ".join(self.paramnames()) -        ret = self.retname() -        if ret: -            ret = " -> " + ret -        return "%s %s%s" % (self.path, params, ret) - -    def prepare_args(self, args: typing.Sequence[str]) -> typing.List[typing.Any]: -        verify_arg_signature(self.func, list(args), {}) - -        remainder: typing.Sequence[str] = [] -        if self.has_positional: -            remainder = args[len(self.paramtypes) - 1:] -            args = args[:len(self.paramtypes) - 1] - -        pargs = [] -        for arg, paramtype in zip(args, self.paramtypes): -            pargs.append(parsearg(self.manager, arg, paramtype)) -        pargs.extend(remainder) -        return pargs +        bound_arguments.apply_defaults() + +        return bound_arguments      def call(self, args: typing.Sequence[str]) -> typing.Any:          """ -            Call the command with a list of arguments. At this point, all -            arguments are strings. +        Call the command with a list of arguments. At this point, all +        arguments are strings.          """ -        ret = self.func(*self.prepare_args(args)) -        if ret is None and self.returntype is None: +        bound_args = self.prepare_args(args) +        ret = self.func(*bound_args.args, **bound_args.kwargs) +        if ret is None and self.return_type is None:              return -        typ = mitmproxy.types.CommandTypes.get(self.returntype) +        typ = mitmproxy.types.CommandTypes.get(self.return_type) +        assert typ          if not typ.is_valid(self.manager, typ, ret):              raise exceptions.CommandError( -                "%s returned unexpected data - expected %s" % ( -                    self.path, typ.display -                ) +                f"{self.name} returned unexpected data - expected {typ.display}"              )          return ret -ParseResult = typing.NamedTuple( -    "ParseResult", -    [ -        ("value", str), -        ("type", typing.Type), -        ("valid", bool), -    ], -) +class ParseResult(typing.NamedTuple): +    value: str +    type: typing.Type +    valid: bool + +class CommandManager: +    commands: typing.Dict[str, Command] -class CommandManager(mitmproxy.types._CommandBase):      def __init__(self, master):          self.master = master -        self.commands: typing.Dict[str, Command] = {} +        self.commands = {}      def collect_commands(self, addon):          for i in dir(addon):              if not i.startswith("__"):                  o = getattr(addon, i)                  try: -                    is_command = hasattr(o, "command_path") +                    is_command = hasattr(o, "command_name")                  except Exception:                      pass  # hasattr may raise if o implements __getattr__.                  else:                      if is_command:                          try: -                            self.add(o.command_path, o) +                            self.add(o.command_name, o)                          except exceptions.CommandError as e:                              self.master.log.warn( -                                "Could not load command %s: %s" % (o.command_path, e) +                                "Could not load command %s: %s" % (o.command_name, e)                              )      def add(self, path: str, func: typing.Callable):          self.commands[path] = Command(self, path, func) +    @functools.lru_cache(maxsize=128)      def parse_partial( -        self, -        cmdstr: str -    ) -> typing.Tuple[typing.Sequence[ParseResult], typing.Sequence[str]]: +            self, +            cmdstr: str +    ) -> typing.Tuple[typing.Sequence[ParseResult], typing.Sequence[CommandParameter]]:          """ -            Parse a possibly partial command. Return a sequence of ParseResults and a sequence of remainder type help items. +        Parse a possibly partial command. Return a sequence of ParseResults and a sequence of remainder type help items.          """ -        buf = io.StringIO(cmdstr) -        parts: typing.List[str] = [] -        lex = lexer(buf) -        while 1: -            remainder = cmdstr[buf.tell():] -            try: -                t = lex.get_token() -            except ValueError: -                parts.append(remainder) -                break -            if not t: -                break -            parts.append(t) -        if not parts: -            parts = [""] -        elif cmdstr.endswith(" "): -            parts.append("") - -        parse: typing.List[ParseResult] = [] -        params: typing.List[type] = [] -        typ: typing.Type -        for i in range(len(parts)): -            if i == 0: -                typ = mitmproxy.types.Cmd -                if parts[i] in self.commands: -                    params.extend(self.commands[parts[i]].paramtypes) -            elif params: -                typ = params.pop(0) -                if typ == mitmproxy.types.Cmd and params and params[0] == mitmproxy.types.Arg: -                    if parts[i] in self.commands: -                        params[:] = self.commands[parts[i]].paramtypes + +        parts: typing.List[str] = command_lexer.expr.parseString(cmdstr, parseAll=True) + +        parsed: typing.List[ParseResult] = [] +        next_params: typing.List[CommandParameter] = [ +            CommandParameter("", mitmproxy.types.Cmd), +            CommandParameter("", mitmproxy.types.CmdArgs), +        ] +        expected: typing.Optional[CommandParameter] = None +        for part in parts: +            if part.isspace(): +                parsed.append( +                    ParseResult( +                        value=part, +                        type=mitmproxy.types.Space, +                        valid=True, +                    ) +                ) +                continue + +            if expected and expected.kind is inspect.Parameter.VAR_POSITIONAL: +                assert not next_params +            elif next_params: +                expected = next_params.pop(0)              else: -                typ = mitmproxy.types.Unknown +                expected = CommandParameter("", mitmproxy.types.Unknown) -            to = mitmproxy.types.CommandTypes.get(typ, None) +            arg_is_known_command = ( +                    expected.type == mitmproxy.types.Cmd and part in self.commands +            ) +            arg_is_unknown_command = ( +                    expected.type == mitmproxy.types.Cmd and part not in self.commands +            ) +            command_args_following = ( +                    next_params and next_params[0].type == mitmproxy.types.CmdArgs +            ) +            if arg_is_known_command and command_args_following: +                next_params = self.commands[part].parameters + next_params[1:] +            if arg_is_unknown_command and command_args_following: +                next_params.pop(0) + +            to = mitmproxy.types.CommandTypes.get(expected.type, None)              valid = False              if to:                  try: -                    to.parse(self, typ, parts[i]) +                    to.parse(self, expected.type, part)                  except exceptions.TypeError:                      valid = False                  else:                      valid = True -            parse.append( +            parsed.append(                  ParseResult( -                    value=parts[i], -                    type=typ, +                    value=part, +                    type=expected.type,                      valid=valid,                  )              ) -        remhelp: typing.List[str] = [] -        for x in params: -            remt = mitmproxy.types.CommandTypes.get(x, None) -            remhelp.append(remt.display) - -        return parse, remhelp +        return parsed, next_params -    def call(self, path: str, *args: typing.Sequence[typing.Any]) -> typing.Any: +    def call(self, command_name: str, *args: typing.Sequence[typing.Any]) -> typing.Any:          """ -            Call a command with native arguments. May raise CommandError. +        Call a command with native arguments. May raise CommandError.          """ -        if path not in self.commands: -            raise exceptions.CommandError("Unknown command: %s" % path) -        return self.commands[path].func(*args) +        if command_name not in self.commands: +            raise exceptions.CommandError("Unknown command: %s" % command_name) +        return self.commands[command_name].func(*args) -    def call_strings(self, path: str, args: typing.Sequence[str]) -> typing.Any: +    def _call_strings(self, command_name: str, args: typing.Sequence[str]) -> typing.Any:          """ -            Call a command using a list of string arguments. May raise CommandError. +        Call a command using a list of string arguments. May raise CommandError.          """ -        if path not in self.commands: -            raise exceptions.CommandError("Unknown command: %s" % path) -        return self.commands[path].call(args) +        if command_name not in self.commands: +            raise exceptions.CommandError("Unknown command: %s" % command_name) -    def execute(self, cmdstr: str): +        return self.commands[command_name].call(args) + +    def execute(self, cmdstr: str) -> typing.Any:          """ -            Execute a command string. May raise CommandError. +        Execute a command string. May raise CommandError.          """ -        try: -            parts = list(lexer(cmdstr)) -        except ValueError as e: -            raise exceptions.CommandError("Command error: %s" % e) -        if not len(parts) >= 1: -            raise exceptions.CommandError("Invalid command: %s" % cmdstr) -        return self.call_strings(parts[0], parts[1:]) +        parts, _ = self.parse_partial(cmdstr) +        if not parts: +            raise exceptions.CommandError(f"Invalid command: {cmdstr!r}") +        command_name, *args = [ +            unquote(part.value) +            for part in parts +            if part.type != mitmproxy.types.Space +        ] +        return self._call_strings(command_name, args)      def dump(self, out=sys.stdout) -> None:          cmds = list(self.commands.values()) @@ -262,21 +278,23 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any:      """      t = mitmproxy.types.CommandTypes.get(argtype, None)      if not t: -        raise exceptions.CommandError("Unsupported argument type: %s" % argtype) +        raise exceptions.CommandError(f"Unsupported argument type: {argtype}")      try: -        return t.parse(manager, argtype, spec)  # type: ignore +        return t.parse(manager, argtype, spec)      except exceptions.TypeError as e:          raise exceptions.CommandError from e -def command(path): +def command(name: typing.Optional[str] = None):      def decorator(function):          @functools.wraps(function)          def wrapper(*args, **kwargs):              verify_arg_signature(function, args, kwargs)              return function(*args, **kwargs) -        wrapper.__dict__["command_path"] = path + +        wrapper.__dict__["command_name"] = name or function.__name__.replace("_", ".")          return wrapper +      return decorator @@ -286,8 +304,10 @@ def argument(name, type):          specific types such as mitmproxy.types.Choice, which we cannot annotate          directly as mypy does not like that.      """ +      def decorator(f: types.FunctionType) -> types.FunctionType:          assert name in f.__annotations__          f.__annotations__[name] = type          return f +      return decorator diff --git a/mitmproxy/command_lexer.py b/mitmproxy/command_lexer.py new file mode 100644 index 00000000..f042f3c9 --- /dev/null +++ b/mitmproxy/command_lexer.py @@ -0,0 +1,49 @@ +import ast +import re + +import pyparsing + +# TODO: There is a lot of work to be done here. +# The current implementation is written in a way that _any_ input is valid, +# which does not make sense once things get more complex. + +PartialQuotedString = pyparsing.Regex( +    re.compile( +        r''' +            (["'])  # start quote +            (?: +                (?!\1)[^\\]  # unescaped character that is not our quote nor the begin of an escape sequence. We can't use \1 in [] +                | +                (?:\\.)  # escape sequence +            )* +            (?:\1|$)  # end quote +        ''', +        re.VERBOSE +    ) +) + +expr = pyparsing.ZeroOrMore( +    PartialQuotedString +    | pyparsing.Word(" \r\n\t") +    | pyparsing.CharsNotIn("""'" \r\n\t""") +).leaveWhitespace() + + +def quote(val: str) -> str: +    if val and all(char not in val for char in "'\" \r\n\t"): +        return val +    return repr(val)  # TODO: More of a hack. + + +def unquote(x: str) -> str: +    quoted = ( +            (x.startswith('"') and x.endswith('"')) +            or +            (x.startswith("'") and x.endswith("'")) +    ) +    if quoted: +        try: +            x = ast.literal_eval(x) +        except Exception: +            x = x[1:-1] +    return x diff --git a/mitmproxy/tools/console/commander/commander.py b/mitmproxy/tools/console/commander/commander.py index f291b8fd..d751422b 100644 --- a/mitmproxy/tools/console/commander/commander.py +++ b/mitmproxy/tools/console/commander/commander.py @@ -1,58 +1,55 @@  import abc +import collections  import copy  import typing -import collections  import urwid  from urwid.text_layout import calc_coords +import mitmproxy.command  import mitmproxy.flow  import mitmproxy.master -import mitmproxy.command  import mitmproxy.types -class Completer:  # pragma: no cover +class Completer:      @abc.abstractmethod -    def cycle(self) -> str: -        pass +    def cycle(self, forward: bool = True) -> str: +        raise NotImplementedError()  class ListCompleter(Completer):      def __init__( -        self, -        start: str, -        options: typing.Sequence[str], +            self, +            start: str, +            options: typing.Sequence[str],      ) -> None:          self.start = start -        self.options: typing.Sequence[str] = [] +        self.options: typing.List[str] = []          for o in options:              if o.startswith(start):                  self.options.append(o)          self.options.sort()          self.offset = 0 -    def cycle(self) -> str: +    def cycle(self, forward: bool = True) -> str:          if not self.options:              return self.start          ret = self.options[self.offset] -        self.offset = (self.offset + 1) % len(self.options) +        delta = 1 if forward else -1 +        self.offset = (self.offset + delta) % len(self.options)          return ret -CompletionState = typing.NamedTuple( -    "CompletionState", -    [ -        ("completer", Completer), -        ("parse", typing.Sequence[mitmproxy.command.ParseResult]) -    ] -) +class CompletionState(typing.NamedTuple): +    completer: Completer +    parsed: typing.Sequence[mitmproxy.command.ParseResult]  class CommandBuffer:      def __init__(self, master: mitmproxy.master.Master, start: str = "") -> None:          self.master = master -        self.text = self.flatten(start) +        self.text = start          # Cursor is always within the range [0:len(buffer)].          self._cursor = len(self.text)          self.completion: typing.Optional[CompletionState] = None @@ -70,51 +67,30 @@ class CommandBuffer:          else:              self._cursor = x -    def maybequote(self, value): -        if " " in value and not value.startswith("\""): -            return "\"%s\"" % value -        return value - -    def parse_quoted(self, txt): -        parts, remhelp = self.master.commands.parse_partial(txt) -        for i, p in enumerate(parts): -            parts[i] = mitmproxy.command.ParseResult( -                value = self.maybequote(p.value), -                type = p.type, -                valid = p.valid -            ) -        return parts, remhelp -      def render(self): -        """ -            This function is somewhat tricky - in order to make the cursor -            position valid, we have to make sure there is a -            character-for-character offset match in the rendered output, up -            to the cursor. Beyond that, we can add stuff. -        """ -        parts, remhelp = self.parse_quoted(self.text) +        parts, remaining = self.master.commands.parse_partial(self.text)          ret = [] -        for p in parts: -            if p.valid: -                if p.type == mitmproxy.types.Cmd: -                    ret.append(("commander_command", p.value)) -                else: -                    ret.append(("text", p.value)) -            elif p.value: -                ret.append(("commander_invalid", p.value)) -            else: -                ret.append(("text", "")) -            ret.append(("text", " ")) -        if remhelp: -            ret.append(("text", " ")) -            for v in remhelp: -                ret.append(("commander_hint", "%s " % v)) -        return ret +        if not parts: +            # Means we just received the leader, so we need to give a blank +            # text to the widget to render or it crashes +            ret.append(("text", "")) +        else: +            for p in parts: +                if p.valid: +                    if p.type == mitmproxy.types.Cmd: +                        ret.append(("commander_command", p.value)) +                    else: +                        ret.append(("text", p.value)) +                elif p.value: +                    ret.append(("commander_invalid", p.value)) + +            if remaining: +                if parts[-1].type != mitmproxy.types.Space: +                    ret.append(("text", " ")) +                for param in remaining: +                    ret.append(("commander_hint", f"{param} ")) -    def flatten(self, txt): -        parts, _ = self.parse_quoted(txt) -        ret = [x.value for x in parts] -        return " ".join(ret) +        return ret      def left(self) -> None:          self.cursor = self.cursor - 1 @@ -122,30 +98,38 @@ class CommandBuffer:      def right(self) -> None:          self.cursor = self.cursor + 1 -    def cycle_completion(self) -> None: +    def cycle_completion(self, forward: bool = True) -> None:          if not self.completion: -            parts, remainhelp = self.master.commands.parse_partial(self.text[:self.cursor]) -            last = parts[-1] -            ct = mitmproxy.types.CommandTypes.get(last.type, None) +            parts, remaining = self.master.commands.parse_partial(self.text[:self.cursor]) +            if parts and parts[-1].type != mitmproxy.types.Space: +                type_to_complete = parts[-1].type +                cycle_prefix = parts[-1].value +                parsed = parts[:-1] +            elif remaining: +                type_to_complete = remaining[0].type +                cycle_prefix = "" +                parsed = parts +            else: +                return +            ct = mitmproxy.types.CommandTypes.get(type_to_complete, None)              if ct:                  self.completion = CompletionState( -                    completer = ListCompleter( -                        parts[-1].value, -                        ct.completion(self.master.commands, last.type, parts[-1].value) +                    completer=ListCompleter( +                        cycle_prefix, +                        ct.completion(self.master.commands, type_to_complete, cycle_prefix)                      ), -                    parse = parts, +                    parsed=parsed,                  )          if self.completion: -            nxt = self.completion.completer.cycle() -            buf = " ".join([i.value for i in self.completion.parse[:-1]]) + " " + nxt -            buf = buf.strip() -            self.text = self.flatten(buf) +            nxt = self.completion.completer.cycle(forward) +            buf = "".join([i.value for i in self.completion.parsed]) + nxt +            self.text = buf              self.cursor = len(self.text)      def backspace(self) -> None:          if self.cursor == 0:              return -        self.text = self.flatten(self.text[:self.cursor - 1] + self.text[self.cursor:]) +        self.text = self.text[:self.cursor - 1] + self.text[self.cursor:]          self.cursor = self.cursor - 1          self.completion = None @@ -153,13 +137,18 @@ class CommandBuffer:          """              Inserts text at the cursor.          """ -        self.text = self.flatten(self.text[:self.cursor] + k + self.text[self.cursor:]) -        self.cursor += 1 + +        # We don't want to insert a space before the command +        if k == ' ' and self.text[0:self.cursor].strip() == '': +            return + +        self.text = self.text[:self.cursor] + k + self.text[self.cursor:] +        self.cursor += len(k)          self.completion = None  class CommandHistory: -    def __init__(self, master: mitmproxy.master.Master, size: int=30) -> None: +    def __init__(self, master: mitmproxy.master.Master, size: int = 30) -> None:          self.saved_commands: collections.deque = collections.deque(              [CommandBuffer(master, "")],              maxlen=size @@ -182,7 +171,7 @@ class CommandHistory:              return self.saved_commands[self.index]          return None -    def add_command(self, command: CommandBuffer, execution: bool=False) -> None: +    def add_command(self, command: CommandBuffer, execution: bool = False) -> None:          if self.index == self.last_index or execution:              last_item = self.saved_commands[-1]              last_item_empty = not last_item.text @@ -207,7 +196,7 @@ class CommandEdit(urwid.WidgetWrap):          self.history = history          self.update() -    def keypress(self, size, key): +    def keypress(self, size, key) -> None:          if key == "backspace":              self.cbuf.backspace()          elif key == "left": @@ -219,27 +208,29 @@ class CommandEdit(urwid.WidgetWrap):              self.cbuf = self.history.get_prev() or self.cbuf          elif key == "down":              self.cbuf = self.history.get_next() or self.cbuf +        elif key == "shift tab": +            self.cbuf.cycle_completion(False)          elif key == "tab":              self.cbuf.cycle_completion()          elif len(key) == 1:              self.cbuf.insert(key)          self.update() -    def update(self): +    def update(self) -> None:          self._w.set_text([self.leader, self.cbuf.render()]) -    def render(self, size, focus=False): +    def render(self, size, focus=False) -> urwid.Canvas:          (maxcol,) = size          canv = self._w.render((maxcol,))          canv = urwid.CompositeCanvas(canv)          canv.cursor = self.get_cursor_coords((maxcol,))          return canv -    def get_cursor_coords(self, size): +    def get_cursor_coords(self, size) -> typing.Tuple[int, int]:          p = self.cbuf.cursor + len(self.leader)          trans = self._w.get_line_translation(size[0])          x, y = calc_coords(self._w.get_text()[0], trans, p)          return x, y -    def get_edit_text(self): +    def get_edit_text(self) -> str:          return self.cbuf.text diff --git a/mitmproxy/tools/console/commands.py b/mitmproxy/tools/console/commands.py index 0f35742b..26a99b14 100644 --- a/mitmproxy/tools/console/commands.py +++ b/mitmproxy/tools/console/commands.py @@ -1,6 +1,8 @@  import urwid  import blinker  import textwrap + +from mitmproxy import command  from mitmproxy.tools.console import layoutwidget  from mitmproxy.tools.console import signals @@ -10,7 +12,7 @@ command_focus_change = blinker.Signal()  class CommandItem(urwid.WidgetWrap): -    def __init__(self, walker, cmd, focused): +    def __init__(self, walker, cmd: command.Command, focused: bool):          self.walker, self.cmd, self.focused = walker, cmd, focused          super().__init__(None)          self._w = self.get_widget() @@ -18,15 +20,18 @@ class CommandItem(urwid.WidgetWrap):      def get_widget(self):          parts = [              ("focus", ">> " if self.focused else "   "), -            ("title", self.cmd.path), -            ("text", " "), -            ("text", " ".join(self.cmd.paramnames())), +            ("title", self.cmd.name)          ] -        if self.cmd.returntype: -            parts.append([ +        if self.cmd.parameters: +            parts += [ +                ("text", " "), +                ("text", " ".join(str(param) for param in self.cmd.parameters)), +            ] +        if self.cmd.return_type: +            parts += [                  ("title", " -> "), -                ("text", self.cmd.retname()), -            ]) +                ("text", command.typename(self.cmd.return_type)), +            ]          return urwid.AttrMap(              urwid.Padding(urwid.Text(parts)), @@ -92,7 +97,7 @@ class CommandsList(urwid.ListBox):      def keypress(self, size, key):          if key == "m_select":              foc, idx = self.get_focus() -            signals.status_prompt_command.send(partial=foc.cmd.path + " ") +            signals.status_prompt_command.send(partial=foc.cmd.name + " ")          elif key == "m_start":              self.set_focus(0)              self.walker._modified() diff --git a/mitmproxy/tools/console/consoleaddons.py b/mitmproxy/tools/console/consoleaddons.py index 9f595b42..7fcd9b48 100644 --- a/mitmproxy/tools/console/consoleaddons.py +++ b/mitmproxy/tools/console/consoleaddons.py @@ -1,21 +1,18 @@  import csv -import shlex  import typing +import mitmproxy.types +from mitmproxy import command, command_lexer +from mitmproxy import contentviews  from mitmproxy import ctx -from mitmproxy import command  from mitmproxy import exceptions  from mitmproxy import flow  from mitmproxy import http  from mitmproxy import log -from mitmproxy import contentviews -from mitmproxy.utils import strutils -import mitmproxy.types - - +from mitmproxy.tools.console import keymap  from mitmproxy.tools.console import overlay  from mitmproxy.tools.console import signals -from mitmproxy.tools.console import keymap +from mitmproxy.utils import strutils  console_palettes = [      "lowlight", @@ -48,10 +45,12 @@ class UnsupportedLog:      """          A small addon to dump info on flow types we don't support yet.      """ +      def websocket_message(self, f):          message = f.messages[-1]          ctx.log.info(f.message_info(message)) -        ctx.log.debug(message.content if isinstance(message.content, str) else strutils.bytes_to_escaped_str(message.content)) +        ctx.log.debug( +            message.content if isinstance(message.content, str) else strutils.bytes_to_escaped_str(message.content))      def websocket_end(self, f):          ctx.log.info("WebSocket connection closed by {}: {} {}, {}".format( @@ -78,6 +77,7 @@ class ConsoleAddon:          An addon that exposes console-specific commands, and hooks into required          events.      """ +      def __init__(self, master):          self.master = master          self.started = False @@ -86,7 +86,7 @@ class ConsoleAddon:          loader.add_option(              "console_default_contentview", str, "auto",              "The default content view mode.", -            choices = [i.name.lower() for i in contentviews.views] +            choices=[i.name.lower() for i in contentviews.views]          )          loader.add_option(              "console_eventlog_verbosity", str, 'info', @@ -142,7 +142,7 @@ class ConsoleAddon:          opts = self.layout_options()          off = self.layout_options().index(ctx.options.console_layout)          ctx.options.update( -            console_layout = opts[(off + 1) % len(opts)] +            console_layout=opts[(off + 1) % len(opts)]          )      @command.command("console.panes.next") @@ -234,17 +234,18 @@ class ConsoleAddon:      @command.command("console.choose")      def console_choose( -        self, -        prompt: str, -        choices: typing.Sequence[str], -        cmd: mitmproxy.types.Cmd, -        *args: mitmproxy.types.Arg +            self, +            prompt: str, +            choices: typing.Sequence[str], +            cmd: mitmproxy.types.Cmd, +            *args: mitmproxy.types.CmdArgs      ) -> None:          """              Prompt the user to choose from a specified list of strings, then              invoke another command with all occurrences of {choice} replaced by              the choice the user made.          """ +          def callback(opt):              # We're now outside of the call context...              repl = cmd + " " + " ".join(args) @@ -260,22 +261,22 @@ class ConsoleAddon:      @command.command("console.choose.cmd")      def console_choose_cmd( -        self, -        prompt: str, -        choicecmd: mitmproxy.types.Cmd, -        subcmd: mitmproxy.types.Cmd, -        *args: mitmproxy.types.Arg +            self, +            prompt: str, +            choicecmd: mitmproxy.types.Cmd, +            subcmd: mitmproxy.types.Cmd, +            *args: mitmproxy.types.CmdArgs      ) -> None:          """              Prompt the user to choose from a list of strings returned by a              command, then invoke another command with all occurrences of {choice}              replaced by the choice the user made.          """ -        choices = ctx.master.commands.call_strings(choicecmd, []) +        choices = ctx.master.commands.execute(choicecmd)          def callback(opt):              # We're now outside of the call context... -            repl = shlex.quote(" ".join(args)) +            repl = " ".join(command_lexer.quote(x) for x in args)              repl = repl.replace("{choice}", opt)              try:                  self.master.commands.execute(subcmd + " " + repl) @@ -287,21 +288,24 @@ class ConsoleAddon:          )      @command.command("console.command") -    def console_command(self, *partial: str) -> None: +    def console_command(self, *command_str: str) -> None:          """          Prompt the user to edit a command with a (possibly empty) starting value.          """ -        signals.status_prompt_command.send(partial=" ".join(partial))  # type: ignore +        quoted = " ".join(command_lexer.quote(x) for x in command_str) +        signals.status_prompt_command.send(partial=quoted)      @command.command("console.command.set") -    def console_command_set(self, option: str) -> None: +    def console_command_set(self, option_name: str) -> None:          """ -        Prompt the user to set an option of the form "key[=value]". +        Prompt the user to set an option.          """ -        option_value = getattr(self.master.options, option, None) -        current_value = option_value if option_value else "" -        self.master.commands.execute( -            "console.command set %s=%s" % (option, current_value) +        option_value = getattr(self.master.options, option_name, None) or "" +        set_command = f"set {option_name} {option_value!r}" +        cursor = len(set_command) - 1 +        signals.status_prompt_command.send( +            partial=set_command, +            cursor=cursor          )      @command.command("console.view.keybindings") @@ -351,14 +355,14 @@ class ConsoleAddon:      @command.command("console.bodyview")      @command.argument("part", type=mitmproxy.types.Choice("console.bodyview.options")) -    def bodyview(self, f: flow.Flow, part: str) -> None: +    def bodyview(self, flow: flow.Flow, part: str) -> None:          """              Spawn an external viewer for a flow request or response body based              on the detected MIME type. We use the mailcap system to find the              correct viewier, and fall back to the programs in $PAGER or $EDITOR              if necessary.          """ -        fpart = getattr(f, part, None) +        fpart = getattr(flow, part, None)          if not fpart:              raise exceptions.CommandError("Part must be either request or response, not %s." % part)          t = fpart.headers.get("content-type") @@ -397,8 +401,8 @@ class ConsoleAddon:          ]      @command.command("console.edit.focus") -    @command.argument("part", type=mitmproxy.types.Choice("console.edit.focus.options")) -    def edit_focus(self, part: str) -> None: +    @command.argument("flow_part", type=mitmproxy.types.Choice("console.edit.focus.options")) +    def edit_focus(self, flow_part: str) -> None:          """              Edit a component of the currently focused flow.          """ @@ -410,27 +414,27 @@ class ConsoleAddon:          flow.backup()          require_dummy_response = ( -            part in ("response-headers", "response-body", "set-cookies") and -            flow.response is None +                flow_part in ("response-headers", "response-body", "set-cookies") and +                flow.response is None          )          if require_dummy_response:              flow.response = http.HTTPResponse.make() -        if part == "cookies": +        if flow_part == "cookies":              self.master.switch_view("edit_focus_cookies") -        elif part == "urlencoded form": +        elif flow_part == "urlencoded form":              self.master.switch_view("edit_focus_urlencoded_form") -        elif part == "multipart form": +        elif flow_part == "multipart form":              self.master.switch_view("edit_focus_multipart_form") -        elif part == "path": +        elif flow_part == "path":              self.master.switch_view("edit_focus_path") -        elif part == "query": +        elif flow_part == "query":              self.master.switch_view("edit_focus_query") -        elif part == "request-headers": +        elif flow_part == "request-headers":              self.master.switch_view("edit_focus_request_headers") -        elif part == "response-headers": +        elif flow_part == "response-headers":              self.master.switch_view("edit_focus_response_headers") -        elif part in ("request-body", "response-body"): -            if part == "request-body": +        elif flow_part in ("request-body", "response-body"): +            if flow_part == "request-body":                  message = flow.request              else:                  message = flow.response @@ -442,16 +446,16 @@ class ConsoleAddon:              # just strip the newlines off the end of the body when we return              # from an editor.              message.content = c.rstrip(b"\n") -        elif part == "set-cookies": +        elif flow_part == "set-cookies":              self.master.switch_view("edit_focus_setcookies") -        elif part == "url": +        elif flow_part == "url":              url = flow.request.url.encode()              edited_url = self.master.spawn_editor(url)              url = edited_url.rstrip(b"\n")              flow.request.url = url.decode() -        elif part in ["method", "status_code", "reason"]: +        elif flow_part in ["method", "status_code", "reason"]:              self.master.commands.execute( -                "console.command flow.set @focus %s " % part +                "console.command flow.set @focus %s " % flow_part              )      def _grideditor(self): @@ -535,10 +539,8 @@ class ConsoleAddon:              raise exceptions.CommandError("Invalid flowview mode.")          try: -            self.master.commands.call_strings( -                "view.settings.setval", -                ["@focus", "flowview_mode_%s" % idx, mode] -            ) +            cmd = 'view.settings.setval @focus flowview_mode_%s %s' % (idx, mode) +            self.master.commands.execute(cmd)          except exceptions.CommandError as e:              signals.status_message.send(message=str(e)) @@ -558,14 +560,9 @@ class ConsoleAddon:          if not fv:              raise exceptions.CommandError("Not viewing a flow.")          idx = fv.body.tab_offset -        return self.master.commands.call_strings( -            "view.settings.getval", -            [ -                "@focus", -                "flowview_mode_%s" % idx, -                self.master.options.console_default_contentview, -            ] -        ) + +        cmd = 'view.settings.getval @focus flowview_mode_%s %s' % (idx, self.master.options.console_default_contentview) +        return self.master.commands.execute(cmd)      @command.command("console.key.contexts")      def key_contexts(self) -> typing.Sequence[str]: @@ -576,11 +573,11 @@ class ConsoleAddon:      @command.command("console.key.bind")      def key_bind( -        self, -        contexts: typing.Sequence[str], -        key: str, -        cmd: mitmproxy.types.Cmd, -        *args: mitmproxy.types.Arg +            self, +            contexts: typing.Sequence[str], +            key: str, +            cmd: mitmproxy.types.Cmd, +            *args: mitmproxy.types.CmdArgs      ) -> None:          """              Bind a shortcut key. diff --git a/mitmproxy/tools/console/defaultkeys.py b/mitmproxy/tools/console/defaultkeys.py index 0a6c5561..a0f27625 100644 --- a/mitmproxy/tools/console/defaultkeys.py +++ b/mitmproxy/tools/console/defaultkeys.py @@ -26,7 +26,7 @@ def map(km):      km.add("ctrl f", "console.nav.pagedown", ["global"], "Page down")      km.add("ctrl b", "console.nav.pageup", ["global"], "Page up") -    km.add("I", "set intercept_active=toggle", ["global"], "Toggle intercept") +    km.add("I", "set intercept_active toggle", ["global"], "Toggle intercept")      km.add("i", "console.command.set intercept", ["global"], "Set intercept")      km.add("W", "console.command.set save_stream_file", ["global"], "Stream to file")      km.add("A", "flow.resume @all", ["flowlist", "flowview"], "Resume all intercepted flows") @@ -48,14 +48,14 @@ def map(km):          "Export this flow to file"      )      km.add("f", "console.command.set view_filter", ["flowlist"], "Set view filter") -    km.add("F", "set console_focus_follow=toggle", ["flowlist"], "Set focus follow") +    km.add("F", "set console_focus_follow toggle", ["flowlist"], "Set focus follow")      km.add(          "ctrl l",          "console.command cut.clip ",          ["flowlist", "flowview"],          "Send cuts to clipboard"      ) -    km.add("L", "console.command view.load ", ["flowlist"], "Load flows from file") +    km.add("L", "console.command view.flows.load ", ["flowlist"], "Load flows from file")      km.add("m", "flow.mark.toggle @focus", ["flowlist"], "Toggle mark on this flow")      km.add("M", "view.properties.marked.toggle", ["flowlist"], "Toggle viewing marked flows")      km.add( @@ -68,14 +68,14 @@ def map(km):          "o",          """          console.choose.cmd Order view.order.options -        set view_order={choice} +        set view_order {choice}          """,          ["flowlist"],          "Set flow list order"      )      km.add("r", "replay.client @focus", ["flowlist", "flowview"], "Replay this flow")      km.add("S", "console.command replay.server ", ["flowlist"], "Start server replay") -    km.add("v", "set view_order_reversed=toggle", ["flowlist"], "Reverse flow list order") +    km.add("v", "set view_order_reversed toggle", ["flowlist"], "Reverse flow list order")      km.add("U", "flow.mark @all false", ["flowlist"], "Un-set all marks")      km.add("w", "console.command save.file @shown ", ["flowlist"], "Save listed flows to file")      km.add("V", "flow.revert @focus", ["flowlist", "flowview"], "Revert changes to this flow") diff --git a/mitmproxy/tools/console/statusbar.py b/mitmproxy/tools/console/statusbar.py index 56f0674f..43f5170d 100644 --- a/mitmproxy/tools/console/statusbar.py +++ b/mitmproxy/tools/console/statusbar.py @@ -1,4 +1,5 @@  import os.path +from typing import Optional  import urwid @@ -98,10 +99,15 @@ class ActionBar(urwid.WidgetWrap):          self._w = urwid.Edit(self.prep_prompt(prompt), text or "")          self.prompting = PromptStub(callback, args) -    def sig_prompt_command(self, sender, partial=""): +    def sig_prompt_command(self, sender, partial: str = "", cursor: Optional[int] = None):          signals.focus.send(self, section="footer") -        self._w = commander.CommandEdit(self.master, partial, -                                        self.command_history) +        self._w = commander.CommandEdit( +            self.master, +            partial, +            self.command_history, +        ) +        if cursor is not None: +            self._w.cbuf.cursor = cursor          self.prompting = commandexecutor.CommandExecutor(self.master)      def sig_prompt_onekey(self, sender, prompt, keys, callback, args=()): diff --git a/mitmproxy/types.py b/mitmproxy/types.py index 0634e4d7..ac992217 100644 --- a/mitmproxy/types.py +++ b/mitmproxy/types.py @@ -5,6 +5,9 @@ import typing  from mitmproxy import exceptions  from mitmproxy import flow +if typing.TYPE_CHECKING:  # pragma: no cover +    from mitmproxy.command import CommandManager +  class Path(str):      pass @@ -14,7 +17,7 @@ class Cmd(str):      pass -class Arg(str): +class CmdArgs(str):      pass @@ -22,6 +25,10 @@ class Unknown(str):      pass +class Space(str): +    pass + +  class CutSpec(typing.Sequence[str]):      pass @@ -40,27 +47,11 @@ class Choice:          return False -# One of the many charming things about mypy is that introducing type -# annotations can cause circular dependencies where there were none before. -# Rather than putting types and the CommandManger in the same file, we introduce -# a stub type with the signature we use. -class _CommandBase: -    commands: typing.MutableMapping[str, typing.Any] = {} - -    def call_strings(self, path: str, args: typing.Sequence[str]) -> typing.Any: -        raise NotImplementedError - -    def execute(self, cmd: str) -> typing.Any: -        raise NotImplementedError - -  class _BaseType:      typ: typing.Type = object      display: str = "" -    def completion( -        self, manager: _CommandBase, t: typing.Any, s: str -    ) -> typing.Sequence[str]: +    def completion(self, manager: "CommandManager", t: typing.Any, s: str) -> typing.Sequence[str]:          """              Returns a list of completion strings for a given prefix. The strings              returned don't necessarily need to be suffixes of the prefix, since @@ -68,9 +59,7 @@ class _BaseType:          """          raise NotImplementedError -    def parse( -        self, manager: _CommandBase, typ: typing.Any, s: str -    ) -> typing.Any: +    def parse(self, manager: "CommandManager", typ: typing.Any, s: str) -> typing.Any:          """              Parse a string, given the specific type instance (to allow rich type annotations like Choice) and a string. @@ -78,7 +67,7 @@ class _BaseType:          """          raise NotImplementedError -    def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: +    def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:          """              Check if data is valid for this type.          """ @@ -89,10 +78,10 @@ class _BoolType(_BaseType):      typ = bool      display = "bool" -    def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: +    def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:          return ["false", "true"] -    def parse(self, manager: _CommandBase, t: type, s: str) -> bool: +    def parse(self, manager: "CommandManager", t: type, s: str) -> bool:          if s == "true":              return True          elif s == "false": @@ -102,7 +91,7 @@ class _BoolType(_BaseType):                  "Booleans are 'true' or 'false', got %s" % s              ) -    def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: +    def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:          return val in [True, False] @@ -110,13 +99,13 @@ class _StrType(_BaseType):      typ = str      display = "str" -    def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: +    def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:          return [] -    def parse(self, manager: _CommandBase, t: type, s: str) -> str: +    def parse(self, manager: "CommandManager", t: type, s: str) -> str:          return s -    def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: +    def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:          return isinstance(val, str) @@ -124,13 +113,13 @@ class _UnknownType(_BaseType):      typ = Unknown      display = "unknown" -    def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: +    def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:          return [] -    def parse(self, manager: _CommandBase, t: type, s: str) -> str: +    def parse(self, manager: "CommandManager", t: type, s: str) -> str:          return s -    def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: +    def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:          return False @@ -138,16 +127,16 @@ class _IntType(_BaseType):      typ = int      display = "int" -    def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: +    def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:          return [] -    def parse(self, manager: _CommandBase, t: type, s: str) -> int: +    def parse(self, manager: "CommandManager", t: type, s: str) -> int:          try:              return int(s)          except ValueError as e:              raise exceptions.TypeError from e -    def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: +    def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:          return isinstance(val, int) @@ -155,7 +144,7 @@ class _PathType(_BaseType):      typ = Path      display = "path" -    def completion(self, manager: _CommandBase, t: type, start: str) -> typing.Sequence[str]: +    def completion(self, manager: "CommandManager", t: type, start: str) -> typing.Sequence[str]:          if not start:              start = "./"          path = os.path.expanduser(start) @@ -177,10 +166,10 @@ class _PathType(_BaseType):          ret.sort()          return ret -    def parse(self, manager: _CommandBase, t: type, s: str) -> str: +    def parse(self, manager: "CommandManager", t: type, s: str) -> str:          return os.path.expanduser(s) -    def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: +    def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:          return isinstance(val, str) @@ -188,43 +177,43 @@ class _CmdType(_BaseType):      typ = Cmd      display = "cmd" -    def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: +    def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:          return list(manager.commands.keys()) -    def parse(self, manager: _CommandBase, t: type, s: str) -> str: +    def parse(self, manager: "CommandManager", t: type, s: str) -> str:          if s not in manager.commands:              raise exceptions.TypeError("Unknown command: %s" % s)          return s -    def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: +    def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:          return val in manager.commands  class _ArgType(_BaseType): -    typ = Arg +    typ = CmdArgs      display = "arg" -    def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: +    def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:          return [] -    def parse(self, manager: _CommandBase, t: type, s: str) -> str: +    def parse(self, manager: "CommandManager", t: type, s: str) -> str:          return s -    def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: +    def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:          return isinstance(val, str)  class _StrSeqType(_BaseType):      typ = typing.Sequence[str] -    display = "[str]" +    display = "str[]" -    def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: +    def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:          return [] -    def parse(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: +    def parse(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:          return [x.strip() for x in s.split(",")] -    def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: +    def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:          if isinstance(val, str) or isinstance(val, bytes):              return False          try: @@ -238,7 +227,7 @@ class _StrSeqType(_BaseType):  class _CutSpecType(_BaseType):      typ = CutSpec -    display = "[cut]" +    display = "cut[]"      valid_prefixes = [          "request.method",          "request.scheme", @@ -277,7 +266,7 @@ class _CutSpecType(_BaseType):          "server_conn.tls_established",      ] -    def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: +    def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:          spec = s.split(",")          opts = []          for pref in self.valid_prefixes: @@ -285,11 +274,11 @@ class _CutSpecType(_BaseType):              opts.append(",".join(spec))          return opts -    def parse(self, manager: _CommandBase, t: type, s: str) -> CutSpec: +    def parse(self, manager: "CommandManager", t: type, s: str) -> CutSpec:          parts: typing.Any = s.split(",")          return parts -    def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: +    def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:          if not isinstance(val, str):              return False          parts = [x.strip() for x in val.split(",")] @@ -327,7 +316,7 @@ class _BaseFlowType(_BaseType):          "~c",      ] -    def completion(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[str]: +    def completion(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[str]:          return self.valid_prefixes @@ -335,9 +324,9 @@ class _FlowType(_BaseFlowType):      typ = flow.Flow      display = "flow" -    def parse(self, manager: _CommandBase, t: type, s: str) -> flow.Flow: +    def parse(self, manager: "CommandManager", t: type, s: str) -> flow.Flow:          try: -            flows = manager.call_strings("view.flows.resolve", [s]) +            flows = manager.execute("view.flows.resolve %s" % (s))          except exceptions.CommandError as e:              raise exceptions.TypeError from e          if len(flows) != 1: @@ -346,21 +335,21 @@ class _FlowType(_BaseFlowType):              )          return flows[0] -    def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: +    def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:          return isinstance(val, flow.Flow)  class _FlowsType(_BaseFlowType):      typ = typing.Sequence[flow.Flow] -    display = "[flow]" +    display = "flow[]" -    def parse(self, manager: _CommandBase, t: type, s: str) -> typing.Sequence[flow.Flow]: +    def parse(self, manager: "CommandManager", t: type, s: str) -> typing.Sequence[flow.Flow]:          try: -            return manager.call_strings("view.flows.resolve", [s]) +            return manager.execute("view.flows.resolve %s" % (s))          except exceptions.CommandError as e:              raise exceptions.TypeError from e -    def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: +    def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:          try:              for v in val:                  if not isinstance(v, flow.Flow): @@ -372,19 +361,19 @@ class _FlowsType(_BaseFlowType):  class _DataType(_BaseType):      typ = Data -    display = "[data]" +    display = "data[][]"      def completion( -        self, manager: _CommandBase, t: type, s: str +        self, manager: "CommandManager", t: type, s: str      ) -> typing.Sequence[str]:  # pragma: no cover          raise exceptions.TypeError("data cannot be passed as argument")      def parse( -        self, manager: _CommandBase, t: type, s: str +        self, manager: "CommandManager", t: type, s: str      ) -> typing.Any:  # pragma: no cover          raise exceptions.TypeError("data cannot be passed as argument") -    def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: +    def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:          # FIXME: validate that all rows have equal length, and all columns have equal types          try:              for row in val: @@ -400,16 +389,16 @@ class _ChoiceType(_BaseType):      typ = Choice      display = "choice" -    def completion(self, manager: _CommandBase, t: Choice, s: str) -> typing.Sequence[str]: +    def completion(self, manager: "CommandManager", t: Choice, s: str) -> typing.Sequence[str]:          return manager.execute(t.options_command) -    def parse(self, manager: _CommandBase, t: Choice, s: str) -> str: +    def parse(self, manager: "CommandManager", t: Choice, s: str) -> str:          opts = manager.execute(t.options_command)          if s not in opts:              raise exceptions.TypeError("Invalid choice.")          return s -    def is_valid(self, manager: _CommandBase, typ: typing.Any, val: typing.Any) -> bool: +    def is_valid(self, manager: "CommandManager", typ: typing.Any, val: typing.Any) -> bool:          try:              opts = manager.execute(typ.options_command)          except exceptions.CommandError: @@ -423,7 +412,7 @@ class TypeManager:          for t in types:              self.typemap[t.typ] = t() -    def get(self, t: typing.Optional[typing.Type], default=None) -> _BaseType: +    def get(self, t: typing.Optional[typing.Type], default=None) -> typing.Optional[_BaseType]:          if type(t) in self.typemap:              return self.typemap[type(t)]          return self.typemap.get(t, default) @@ -18,6 +18,8 @@ show_missing = True  exclude_lines =      pragma: no cover      raise NotImplementedError() +    if typing.TYPE_CHECKING: +    if TYPE_CHECKING:  [mypy]  ignore_missing_imports = True diff --git a/test/mitmproxy/addons/test_core.py b/test/mitmproxy/addons/test_core.py index 59875c2b..e6924ead 100644 --- a/test/mitmproxy/addons/test_core.py +++ b/test/mitmproxy/addons/test_core.py @@ -11,7 +11,7 @@ def test_set():      sa = core.Core()      with taddons.context(loadcore=False) as tctx:          assert tctx.master.options.server -        tctx.command(sa.set, "server=false") +        tctx.command(sa.set, "server", "false")          assert not tctx.master.options.server          with pytest.raises(exceptions.CommandError): diff --git a/test/mitmproxy/addons/test_save.py b/test/mitmproxy/addons/test_save.py index 4aa1f648..6727a96f 100644 --- a/test/mitmproxy/addons/test_save.py +++ b/test/mitmproxy/addons/test_save.py @@ -73,7 +73,7 @@ def test_save_command(tmpdir):          v = view.View()          tctx.master.addons.add(v)          tctx.master.addons.add(sa) -        tctx.master.commands.call_strings("save.file", ["@shown", p]) +        tctx.master.commands.execute("save.file @shown %s" % p)  def test_simple(tmpdir): diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py index d9dcf5f9..a432f9e3 100644 --- a/test/mitmproxy/test_command.py +++ b/test/mitmproxy/test_command.py @@ -1,13 +1,15 @@ -import typing  import inspect +import io +import typing + +import pytest + +import mitmproxy.types  from mitmproxy import command -from mitmproxy import flow  from mitmproxy import exceptions -from mitmproxy.test import tflow +from mitmproxy import flow  from mitmproxy.test import taddons -import mitmproxy.types -import io -import pytest +from mitmproxy.test import tflow  class TAddon: @@ -29,7 +31,7 @@ class TAddon:          return "ok"      @command.command("subcommand") -    def subcommand(self, cmd: mitmproxy.types.Cmd, *args: mitmproxy.types.Arg) -> str: +    def subcommand(self, cmd: mitmproxy.types.Cmd, *args: mitmproxy.types.CmdArgs) -> str:          return "ok"      @command.command("empty") @@ -83,17 +85,15 @@ class TestCommand:              with pytest.raises(exceptions.CommandError):                  command.Command(cm, "invalidret", a.invalidret)              with pytest.raises(exceptions.CommandError): -                command.Command(cm, "invalidarg", a.invalidarg) +                assert command.Command(cm, "invalidarg", a.invalidarg)      def test_varargs(self):          with taddons.context() as tctx:              cm = command.CommandManager(tctx.master)              a = TAddon()              c = command.Command(cm, "varargs", a.varargs) -            assert c.signature_help() == "varargs str *str -> [str]" +            assert c.signature_help() == "varargs one *var -> str[]"              assert c.call(["one", "two", "three"]) == ["two", "three"] -            with pytest.raises(exceptions.CommandError): -                c.call(["one", "two", 3])      def test_call(self):          with taddons.context() as tctx: @@ -101,7 +101,7 @@ class TestCommand:              a = TAddon()              c = command.Command(cm, "cmd.path", a.cmd1)              assert c.call(["foo"]) == "ret foo" -            assert c.signature_help() == "cmd.path str -> str" +            assert c.signature_help() == "cmd.path foo -> str"              c = command.Command(cm, "cmd.two", a.cmd2)              with pytest.raises(exceptions.CommandError): @@ -115,154 +115,305 @@ class TestCommand:              [                  "foo bar",                  [ -                    command.ParseResult( -                        value = "foo", type = mitmproxy.types.Cmd, valid = False -                    ), -                    command.ParseResult( -                        value = "bar", type = mitmproxy.types.Unknown, valid = False -                    ) +                    command.ParseResult(value="foo", type=mitmproxy.types.Cmd, valid=False), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), +                    command.ParseResult(value="bar", type=mitmproxy.types.Unknown, valid=False)                  ],                  [],              ],              [                  "cmd1 'bar",                  [ -                    command.ParseResult(value = "cmd1", type = mitmproxy.types.Cmd, valid = True), -                    command.ParseResult(value = "'bar", type = str, valid = True) +                    command.ParseResult(value="cmd1", type=mitmproxy.types.Cmd, valid=True), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), +                    command.ParseResult(value="'bar", type=str, valid=True)                  ],                  [],              ],              [                  "a", -                [command.ParseResult(value = "a", type = mitmproxy.types.Cmd, valid = False)], +                [command.ParseResult(value="a", type=mitmproxy.types.Cmd, valid=False)],                  [],              ],              [                  "", -                [command.ParseResult(value = "", type = mitmproxy.types.Cmd, valid = False)], -                [] +                [], +                [ +                    command.CommandParameter("", mitmproxy.types.Cmd), +                    command.CommandParameter("", mitmproxy.types.CmdArgs) +                ]              ],              [                  "cmd3 1",                  [ -                    command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True), -                    command.ParseResult(value = "1", type = int, valid = True), +                    command.ParseResult(value="cmd3", type=mitmproxy.types.Cmd, valid=True), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), +                    command.ParseResult(value="1", type=int, valid=True),                  ],                  []              ],              [                  "cmd3 ",                  [ -                    command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True), -                    command.ParseResult(value = "", type = int, valid = False), +                    command.ParseResult(value="cmd3", type=mitmproxy.types.Cmd, valid=True), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),                  ], -                [] +                [command.CommandParameter('foo', int)]              ],              [                  "subcommand ",                  [ -                    command.ParseResult( -                        value = "subcommand", type = mitmproxy.types.Cmd, valid = True, -                    ), -                    command.ParseResult(value = "", type = mitmproxy.types.Cmd, valid = False), +                    command.ParseResult(value="subcommand", type=mitmproxy.types.Cmd, valid=True, ), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), +                ], +                [ +                    command.CommandParameter('cmd', mitmproxy.types.Cmd), +                    command.CommandParameter('args', mitmproxy.types.CmdArgs, kind=inspect.Parameter.VAR_POSITIONAL),                  ], -                ["arg"],              ],              [ -                "subcommand cmd3 ", +                "varargs one",                  [ -                    command.ParseResult(value = "subcommand", type = mitmproxy.types.Cmd, valid = True), -                    command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd, valid = True), -                    command.ParseResult(value = "", type = int, valid = False), +                    command.ParseResult(value="varargs", type=mitmproxy.types.Cmd, valid=True), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), +                    command.ParseResult(value="one", type=str, valid=True),                  ], -                [] +                [command.CommandParameter('var', str, kind=inspect.Parameter.VAR_POSITIONAL)]              ],              [ -                "cmd4", +                "varargs one two three",                  [ -                    command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True), +                    command.ParseResult(value="varargs", type=mitmproxy.types.Cmd, valid=True), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), +                    command.ParseResult(value="one", type=str, valid=True), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), +                    command.ParseResult(value="two", type=str, valid=True), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), +                    command.ParseResult(value="three", type=str, valid=True),                  ], -                ["int", "str", "path"] +                [],              ],              [ -                "cmd4 ", +                "subcommand cmd3 ",                  [ -                    command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True), -                    command.ParseResult(value = "", type = int, valid = False), +                    command.ParseResult(value="subcommand", type=mitmproxy.types.Cmd, valid=True), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), +                    command.ParseResult(value="cmd3", type=mitmproxy.types.Cmd, valid=True), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),                  ], -                ["str", "path"] +                [command.CommandParameter('foo', int)]              ],              [ -                "cmd4 1", +                "cmd4",                  [ -                    command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True), -                    command.ParseResult(value = "1", type = int, valid = True), +                    command.ParseResult(value="cmd4", type=mitmproxy.types.Cmd, valid=True),                  ], -                ["str", "path"] +                [ +                    command.CommandParameter('a', int), +                    command.CommandParameter('b', str), +                    command.CommandParameter('c', mitmproxy.types.Path), +                ] +            ], +            [ +                "cmd4 ", +                [ +                    command.ParseResult(value="cmd4", type=mitmproxy.types.Cmd, valid=True), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), +                ], +                [ +                    command.CommandParameter('a', int), +                    command.CommandParameter('b', str), +                    command.CommandParameter('c', mitmproxy.types.Path), +                ]              ],              [                  "cmd4 1",                  [ -                    command.ParseResult(value = "cmd4", type = mitmproxy.types.Cmd, valid = True), -                    command.ParseResult(value = "1", type = int, valid = True), +                    command.ParseResult(value="cmd4", type=mitmproxy.types.Cmd, valid=True), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), +                    command.ParseResult(value="1", type=int, valid=True),                  ], -                ["str", "path"] +                [ +                    command.CommandParameter('b', str), +                    command.CommandParameter('c', mitmproxy.types.Path), +                ]              ],              [                  "flow",                  [ -                    command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True), +                    command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True),                  ], -                ["flow", "str"] +                [ +                    command.CommandParameter('f', flow.Flow), +                    command.CommandParameter('s', str), +                ]              ],              [                  "flow ",                  [ -                    command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True), -                    command.ParseResult(value = "", type = flow.Flow, valid = False), +                    command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),                  ], -                ["str"] +                [ +                    command.CommandParameter('f', flow.Flow), +                    command.CommandParameter('s', str), +                ]              ],              [                  "flow x",                  [ -                    command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True), -                    command.ParseResult(value = "x", type = flow.Flow, valid = False), +                    command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), +                    command.ParseResult(value="x", type=flow.Flow, valid=False),                  ], -                ["str"] +                [ +                    command.CommandParameter('s', str), +                ]              ],              [                  "flow x ",                  [ -                    command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True), -                    command.ParseResult(value = "x", type = flow.Flow, valid = False), -                    command.ParseResult(value = "", type = str, valid = True), +                    command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), +                    command.ParseResult(value="x", type=flow.Flow, valid=False), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True),                  ], -                [] +                [ +                    command.CommandParameter('s', str), +                ]              ],              [                  "flow \"one two",                  [ -                    command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True), -                    command.ParseResult(value = "\"one two", type = flow.Flow, valid = False), +                    command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), +                    command.ParseResult(value="\"one two", type=flow.Flow, valid=False),                  ], -                ["str"] +                [ +                    command.CommandParameter('s', str), +                ]              ],              [ -                "flow \"one two\"", +                "flow \"three four\"",                  [ -                    command.ParseResult(value = "flow", type = mitmproxy.types.Cmd, valid = True), -                    command.ParseResult(value = "one two", type = flow.Flow, valid = False), +                    command.ParseResult(value="flow", type=mitmproxy.types.Cmd, valid=True), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), +                    command.ParseResult(value='"three four"', type=flow.Flow, valid=False),                  ], -                ["str"] +                [ +                    command.CommandParameter('s', str), +                ]              ], +            [ +                "spaces '    '", +                [ +                    command.ParseResult(value="spaces", type=mitmproxy.types.Cmd, valid=False), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), +                    command.ParseResult(value="'    '", type=mitmproxy.types.Unknown, valid=False) +                ], +                [], +            ], +            [ +                'spaces2 "    "', +                [ +                    command.ParseResult(value="spaces2", type=mitmproxy.types.Cmd, valid=False), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), +                    command.ParseResult(value='"    "', type=mitmproxy.types.Unknown, valid=False) +                ], +                [], +            ], +            [ +                '"abc"', +                [ +                    command.ParseResult(value='"abc"', type=mitmproxy.types.Cmd, valid=False), +                ], +                [], +            ], +            [ +                "'def'", +                [ +                    command.ParseResult(value="'def'", type=mitmproxy.types.Cmd, valid=False), +                ], +                [], +            ], +            [ +                "cmd10 'a' \"b\" c", +                [ +                    command.ParseResult(value="cmd10", type=mitmproxy.types.Cmd, valid=False), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), +                    command.ParseResult(value="'a'", type=mitmproxy.types.Unknown, valid=False), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), +                    command.ParseResult(value='"b"', type=mitmproxy.types.Unknown, valid=False), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), +                    command.ParseResult(value="c", type=mitmproxy.types.Unknown, valid=False), +                ], +                [], +            ], +            [ +                "cmd11 'a \"b\" c'", +                [ +                    command.ParseResult(value="cmd11", type=mitmproxy.types.Cmd, valid=False), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), +                    command.ParseResult(value="'a \"b\" c'", type=mitmproxy.types.Unknown, valid=False), +                ], +                [], +            ], +            [ +                'cmd12 "a \'b\' c"', +                [ +                    command.ParseResult(value="cmd12", type=mitmproxy.types.Cmd, valid=False), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), +                    command.ParseResult(value='"a \'b\' c"', type=mitmproxy.types.Unknown, valid=False), +                ], +                [], +            ], +            [ +                r'cmd13 "a \"b\" c"', +                [ +                    command.ParseResult(value="cmd13", type=mitmproxy.types.Cmd, valid=False), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), +                    command.ParseResult(value=r'"a \"b\" c"', type=mitmproxy.types.Unknown, valid=False), +                ], +                [], +            ], +            [ +                r"cmd14 'a \'b\' c'", +                [ +                    command.ParseResult(value="cmd14", type=mitmproxy.types.Cmd, valid=False), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), +                    command.ParseResult(value=r"'a \'b\' c'", type=mitmproxy.types.Unknown, valid=False), +                ], +                [], +            ], +            [ +                "    spaces_at_the_begining_are_not_stripped", +                [ +                    command.ParseResult(value="    ", type=mitmproxy.types.Space, valid=True), +                    command.ParseResult(value="spaces_at_the_begining_are_not_stripped", type=mitmproxy.types.Cmd, +                                        valid=False), +                ], +                [], +            ], +            [ +                "    spaces_at_the_begining_are_not_stripped neither_at_the_end      ", +                [ +                    command.ParseResult(value="    ", type=mitmproxy.types.Space, valid=True), +                    command.ParseResult(value="spaces_at_the_begining_are_not_stripped", type=mitmproxy.types.Cmd, +                                        valid=False), +                    command.ParseResult(value=" ", type=mitmproxy.types.Space, valid=True), +                    command.ParseResult(value="neither_at_the_end", type=mitmproxy.types.Unknown, valid=False), +                    command.ParseResult(value="      ", type=mitmproxy.types.Space, valid=True), +                ], +                [], +            ], +          ] +          with taddons.context() as tctx:              tctx.master.addons.add(TAddon())              for s, expected, expectedremain in tests:                  current, remain = tctx.master.commands.parse_partial(s) -                assert current == expected -                assert expectedremain == remain +                assert (s, current, expectedremain) == (s, expected, remain)  def test_simple(): @@ -270,9 +421,11 @@ def test_simple():          c = command.CommandManager(tctx.master)          a = TAddon()          c.add("one.two", a.cmd1) -        assert c.commands["one.two"].help == "cmd1 help" -        assert(c.execute("one.two foo") == "ret foo") -        assert(c.call("one.two", "foo") == "ret foo") +        assert (c.commands["one.two"].help == "cmd1 help") +        assert (c.execute("one.two foo") == "ret foo") +        assert (c.execute("one.two \"foo\"") == "ret foo") +        assert (c.execute("one.two 'foo bar'") == "ret foo bar") +        assert (c.call("one.two", "foo") == "ret foo")          with pytest.raises(exceptions.CommandError, match="Unknown"):              c.execute("nonexistent")          with pytest.raises(exceptions.CommandError, match="Invalid"): @@ -281,8 +434,14 @@ def test_simple():              c.execute("one.two too many args")          with pytest.raises(exceptions.CommandError, match="Unknown"):              c.call("nonexistent") -        with pytest.raises(exceptions.CommandError, match="No escaped"): +        with pytest.raises(exceptions.CommandError, match="Unknown"):              c.execute("\\") +        with pytest.raises(exceptions.CommandError, match="Unknown"): +            c.execute(r"\'") +        with pytest.raises(exceptions.CommandError, match="Unknown"): +            c.execute(r"\"") +        with pytest.raises(exceptions.CommandError, match="Unknown"): +            c.execute(r"\"")          c.add("empty", a.empty)          c.execute("empty") @@ -294,13 +453,13 @@ def test_simple():  def test_typename():      assert command.typename(str) == "str" -    assert command.typename(typing.Sequence[flow.Flow]) == "[flow]" +    assert command.typename(typing.Sequence[flow.Flow]) == "flow[]" -    assert command.typename(mitmproxy.types.Data) == "[data]" -    assert command.typename(mitmproxy.types.CutSpec) == "[cut]" +    assert command.typename(mitmproxy.types.Data) == "data[][]" +    assert command.typename(mitmproxy.types.CutSpec) == "cut[]"      assert command.typename(flow.Flow) == "flow" -    assert command.typename(typing.Sequence[str]) == "[str]" +    assert command.typename(typing.Sequence[str]) == "str[]"      assert command.typename(mitmproxy.types.Choice("foo")) == "choice"      assert command.typename(mitmproxy.types.Path) == "path" diff --git a/test/mitmproxy/test_command_lexer.py b/test/mitmproxy/test_command_lexer.py new file mode 100644 index 00000000..3f009f88 --- /dev/null +++ b/test/mitmproxy/test_command_lexer.py @@ -0,0 +1,38 @@ +import pyparsing +import pytest + +from mitmproxy import command_lexer + + +@pytest.mark.parametrize( +    "test_input,valid", [ +        ("'foo'", True), +        ('"foo"', True), +        ("'foo' bar'", False), +        ("'foo\\' bar'", True), +        ("'foo' 'bar'", False), +        ("'foo'x", False), +        ('''"foo    ''', True), +        ('''"foo 'bar'   ''', True), +    ] +) +def test_partial_quoted_string(test_input, valid): +    if valid: +        assert command_lexer.PartialQuotedString.parseString(test_input, parseAll=True)[0] == test_input +    else: +        with pytest.raises(pyparsing.ParseException): +            command_lexer.PartialQuotedString.parseString(test_input, parseAll=True) + + +@pytest.mark.parametrize( +    "test_input,expected", [ +        ("'foo'", ["'foo'"]), +        ('"foo"', ['"foo"']), +        ("'foo' 'bar'", ["'foo'", ' ', "'bar'"]), +        ("'foo'x", ["'foo'", 'x']), +        ('''"foo''', ['"foo']), +        ('''"foo 'bar' ''', ['''"foo 'bar' ''']), +    ] +) +def test_expr(test_input, expected): +    assert list(command_lexer.expr.parseString(test_input, parseAll=True)) == expected diff --git a/test/mitmproxy/test_types.py b/test/mitmproxy/test_types.py index 571985fb..2cd17d87 100644 --- a/test/mitmproxy/test_types.py +++ b/test/mitmproxy/test_types.py @@ -2,7 +2,6 @@ import pytest  import os  import typing  import contextlib -from unittest import mock  import mitmproxy.exceptions  import mitmproxy.types @@ -64,13 +63,14 @@ def test_int():              b.parse(tctx.master.commands, int, "foo") -def test_path(tdata): +def test_path(tdata, monkeypatch):      with taddons.context() as tctx:          b = mitmproxy.types._PathType()          assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/foo") == "/foo"          assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/bar") == "/bar" -        with mock.patch.dict("os.environ", {"HOME": "/home/test"}): -            assert b.parse(tctx.master.commands, mitmproxy.types.Path, "~/mitm") == "/home/test/mitm" +        monkeypatch.setenv("HOME", "/home/test") +        monkeypatch.setenv("USERPROFILE", "/home/test") +        assert b.parse(tctx.master.commands, mitmproxy.types.Path, "~/mitm") == "/home/test/mitm"          assert b.is_valid(tctx.master.commands, mitmproxy.types.Path, "foo") is True          assert b.is_valid(tctx.master.commands, mitmproxy.types.Path, "~/mitm") is True          assert b.is_valid(tctx.master.commands, mitmproxy.types.Path, 3) is False @@ -127,10 +127,9 @@ def test_cutspec():  def test_arg():      with taddons.context() as tctx:          b = mitmproxy.types._ArgType() -        assert b.completion(tctx.master.commands, mitmproxy.types.Arg, "") == [] -        assert b.parse(tctx.master.commands, mitmproxy.types.Arg, "foo") == "foo" -        assert b.is_valid(tctx.master.commands, mitmproxy.types.Arg, "foo") is True -        assert b.is_valid(tctx.master.commands, mitmproxy.types.Arg, 1) is False +        assert b.completion(tctx.master.commands, mitmproxy.types.CmdArgs, "") == [] +        assert b.parse(tctx.master.commands, mitmproxy.types.CmdArgs, "foo") == "foo" +        assert b.is_valid(tctx.master.commands, mitmproxy.types.CmdArgs, 1) is False  def test_strseq(): diff --git a/test/mitmproxy/tools/console/test_commander.py b/test/mitmproxy/tools/console/test_commander.py index b5e226fe..a77be043 100644 --- a/test/mitmproxy/tools/console/test_commander.py +++ b/test/mitmproxy/tools/console/test_commander.py @@ -1,6 +1,7 @@ +import pytest -from mitmproxy.tools.console.commander import commander  from mitmproxy.test import taddons +from mitmproxy.tools.console.commander import commander  class TestListCompleter: @@ -28,6 +29,112 @@ class TestListCompleter:                  assert c.cycle() == expected +class TestCommandEdit: +    def test_open_command_bar(self): +        with taddons.context() as tctx: +            history = commander.CommandHistory(tctx.master, size=3) +            edit = commander.CommandEdit(tctx.master, '', history) + +            try: +                edit.update() +            except IndexError: +                pytest.faied("Unexpected IndexError") + +    def test_insert(self): +        with taddons.context() as tctx: +            history = commander.CommandHistory(tctx.master, size=3) +            edit = commander.CommandEdit(tctx.master, '', history) +            edit.keypress(1, 'a') +            assert edit.get_edit_text() == 'a' + +            # Don't let users type a space before starting a command +            # as a usability feature +            history = commander.CommandHistory(tctx.master, size=3) +            edit = commander.CommandEdit(tctx.master, '', history) +            edit.keypress(1, ' ') +            assert edit.get_edit_text() == '' + +    def test_backspace(self): +        with taddons.context() as tctx: +            history = commander.CommandHistory(tctx.master, size=3) +            edit = commander.CommandEdit(tctx.master, '', history) +            edit.keypress(1, 'a') +            edit.keypress(1, 'b') +            assert edit.get_edit_text() == 'ab' +            edit.keypress(1, 'backspace') +            assert edit.get_edit_text() == 'a' + +    def test_left(self): +        with taddons.context() as tctx: +            history = commander.CommandHistory(tctx.master, size=3) +            edit = commander.CommandEdit(tctx.master, '', history) +            edit.keypress(1, 'a') +            assert edit.cbuf.cursor == 1 +            edit.keypress(1, 'left') +            assert edit.cbuf.cursor == 0 + +            # Do it again to make sure it won't go negative +            edit.keypress(1, 'left') +            assert edit.cbuf.cursor == 0 + +    def test_right(self): +        with taddons.context() as tctx: +            history = commander.CommandHistory(tctx.master, size=3) +            edit = commander.CommandEdit(tctx.master, '', history) +            edit.keypress(1, 'a') +            assert edit.cbuf.cursor == 1 + +            # Make sure cursor won't go past the text +            edit.keypress(1, 'right') +            assert edit.cbuf.cursor == 1 + +            # Make sure cursor goes left and then back right +            edit.keypress(1, 'left') +            assert edit.cbuf.cursor == 0 +            edit.keypress(1, 'right') +            assert edit.cbuf.cursor == 1 + +    def test_up_and_down(self): +        with taddons.context() as tctx: +            history = commander.CommandHistory(tctx.master, size=3) +            edit = commander.CommandEdit(tctx.master, '', history) + +            buf = commander.CommandBuffer(tctx.master, 'cmd1') +            history.add_command(buf) +            buf = commander.CommandBuffer(tctx.master, 'cmd2') +            history.add_command(buf) + +            edit.keypress(1, 'up') +            assert edit.get_edit_text() == 'cmd2' +            edit.keypress(1, 'up') +            assert edit.get_edit_text() == 'cmd1' +            edit.keypress(1, 'up') +            assert edit.get_edit_text() == 'cmd1' + +            history = commander.CommandHistory(tctx.master, size=5) +            edit = commander.CommandEdit(tctx.master, '', history) +            edit.keypress(1, 'a') +            edit.keypress(1, 'b') +            edit.keypress(1, 'c') +            assert edit.get_edit_text() == 'abc' +            edit.keypress(1, 'up') +            assert edit.get_edit_text() == '' +            edit.keypress(1, 'down') +            assert edit.get_edit_text() == 'abc' +            edit.keypress(1, 'down') +            assert edit.get_edit_text() == 'abc' + +            history = commander.CommandHistory(tctx.master, size=5) +            edit = commander.CommandEdit(tctx.master, '', history) +            buf = commander.CommandBuffer(tctx.master, 'cmd3') +            history.add_command(buf) +            edit.keypress(1, 'z') +            edit.keypress(1, 'up') +            assert edit.get_edit_text() == 'cmd3' +            edit.keypress(1, 'down') +            assert edit.get_edit_text() == 'z' + +  class TestCommandHistory:      def fill_history(self, commands):          with taddons.context() as tctx: @@ -148,13 +255,39 @@ class TestCommandBuffer:              cb.cursor = len(cb.text)              cb.cycle_completion() +            ch = commander.CommandHistory(tctx.master, 30) +            ce = commander.CommandEdit(tctx.master, "se", ch) +            ce.keypress(1, 'tab') +            ce.update() +            ret = ce.cbuf.render() +            assert ret == [ +                ('commander_command', 'set'), +                ('text', ' '), +                ('commander_hint', 'option '), +                ('commander_hint', 'value '), +            ] +      def test_render(self):          with taddons.context() as tctx:              cb = commander.CommandBuffer(tctx.master)              cb.text = "foo"              assert cb.render() -    def test_flatten(self): -        with taddons.context() as tctx: -            cb = commander.CommandBuffer(tctx.master) -            assert cb.flatten("foo  bar") == "foo bar" +            cb.text = "set view_filter '~bq test'" +            ret = cb.render() +            assert ret == [ +                ('commander_command', 'set'), +                ('text', ' '), +                ('text', 'view_filter'), +                ('text', ' '), +                ('text', "'~bq test'"), +            ] + +            cb.text = "set" +            ret = cb.render() +            assert ret == [ +                ('commander_command', 'set'), +                ('text', ' '), +                ('commander_hint', 'option '), +                ('commander_hint', 'value '), +            ] diff --git a/test/mitmproxy/tools/console/test_defaultkeys.py b/test/mitmproxy/tools/console/test_defaultkeys.py index 52075c84..58a0a585 100644 --- a/test/mitmproxy/tools/console/test_defaultkeys.py +++ b/test/mitmproxy/tools/console/test_defaultkeys.py @@ -1,14 +1,18 @@ +import pytest + +import mitmproxy.types +from mitmproxy import command +from mitmproxy import ctx  from mitmproxy.test.tflow import tflow  from mitmproxy.tools.console import defaultkeys  from mitmproxy.tools.console import keymap  from mitmproxy.tools.console import master -from mitmproxy import command - -import pytest  @pytest.mark.asyncio  async def test_commands_exist(): +    command_manager = command.CommandManager(ctx) +      km = keymap.Keymap(None)      defaultkeys.map(km)      assert km.bindings @@ -16,7 +20,14 @@ async def test_commands_exist():      await m.load_flow(tflow())      for binding in km.bindings: -        cmd, *args = command.lexer(binding.command) +        parsed, _ = command_manager.parse_partial(binding.command.strip()) + +        cmd = parsed[0].value +        args = [ +            a.value for a in parsed[1:] +            if a.type != mitmproxy.types.Space +        ] +          assert cmd in m.commands.commands          cmd_obj = m.commands.commands[cmd] | 
