From b0b67fe2a7a7e8d220f6917f91248c0ba8a7d64e Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Mon, 18 Dec 2017 07:20:31 +1300 Subject: commands: refactor types The type system was scattered over a number of places, making it hard to follow. This collects all command types in types.py, and completion, validation and parsing for each type is centralised. We should use the same mechanism for options. --- mitmproxy/addons/clientplayback.py | 3 +- mitmproxy/addons/core.py | 9 +- mitmproxy/addons/cut.py | 17 +- mitmproxy/addons/export.py | 3 +- mitmproxy/addons/save.py | 3 +- mitmproxy/addons/serverplayback.py | 3 +- mitmproxy/addons/view.py | 2 +- mitmproxy/command.py | 177 ++----------- mitmproxy/exceptions.py | 4 + mitmproxy/tools/console/commander/commander.py | 69 +----- mitmproxy/tools/console/consoleaddons.py | 22 +- mitmproxy/types.py | 330 +++++++++++++++++++++++++ test/mitmproxy/test_command.py | 53 ++-- test/mitmproxy/test_typemanager.py | 0 test/mitmproxy/test_types.py | 175 +++++++++++++ test/mitmproxy/tools/console/test_commander.py | 30 --- test/mitmproxy/utils/test_typecheck.py | 4 - 17 files changed, 591 insertions(+), 313 deletions(-) create mode 100644 mitmproxy/types.py create mode 100644 test/mitmproxy/test_typemanager.py create mode 100644 test/mitmproxy/test_types.py diff --git a/mitmproxy/addons/clientplayback.py b/mitmproxy/addons/clientplayback.py index fcc3209b..bed06e82 100644 --- a/mitmproxy/addons/clientplayback.py +++ b/mitmproxy/addons/clientplayback.py @@ -3,6 +3,7 @@ from mitmproxy import ctx from mitmproxy import io from mitmproxy import flow from mitmproxy import command +import mitmproxy.types import typing @@ -37,7 +38,7 @@ class ClientPlayback: ctx.master.addons.trigger("update", []) @command.command("replay.client.file") - def load_file(self, path: command.Path) -> None: + def load_file(self, path: mitmproxy.types.Path) -> None: try: flows = io.read_flows_from_paths([path]) except exceptions.FlowReadException as e: diff --git a/mitmproxy/addons/core.py b/mitmproxy/addons/core.py index 4191d490..2b0b2f14 100644 --- a/mitmproxy/addons/core.py +++ b/mitmproxy/addons/core.py @@ -6,6 +6,7 @@ from mitmproxy import command from mitmproxy import flow from mitmproxy import optmanager from mitmproxy.net.http import status_codes +import mitmproxy.types class Core: @@ -96,7 +97,7 @@ class Core: ] @command.command("flow.set") - @command.argument("spec", type=command.Choice("flow.set.options")) + @command.argument("spec", type=mitmproxy.types.Choice("flow.set.options")) def flow_set( self, flows: typing.Sequence[flow.Flow], @@ -187,7 +188,7 @@ class Core: ctx.log.alert("Toggled encoding on %s flows." % len(updated)) @command.command("flow.encode") - @command.argument("enc", type=command.Choice("flow.encode.options")) + @command.argument("enc", type=mitmproxy.types.Choice("flow.encode.options")) def encode( self, flows: typing.Sequence[flow.Flow], @@ -216,7 +217,7 @@ class Core: return ["gzip", "deflate", "br"] @command.command("options.load") - def options_load(self, path: command.Path) -> None: + def options_load(self, path: mitmproxy.types.Path) -> None: """ Load options from a file. """ @@ -228,7 +229,7 @@ class Core: ) from e @command.command("options.save") - def options_save(self, path: command.Path) -> None: + def options_save(self, path: mitmproxy.types.Path) -> None: """ Save options to a file. """ diff --git a/mitmproxy/addons/cut.py b/mitmproxy/addons/cut.py index efc9e5df..b90df549 100644 --- a/mitmproxy/addons/cut.py +++ b/mitmproxy/addons/cut.py @@ -7,6 +7,7 @@ from mitmproxy import flow from mitmproxy import ctx from mitmproxy import certs from mitmproxy.utils import strutils +import mitmproxy.types import pyperclip @@ -51,8 +52,8 @@ class Cut: def cut( self, flows: typing.Sequence[flow.Flow], - cuts: typing.Sequence[command.Cut] - ) -> command.Cuts: + cuts: mitmproxy.types.CutSpec, + ) -> mitmproxy.types.Data: """ Cut data from a set of flows. Cut specifications are attribute paths from the base of the flow object, with a few conveniences - "port" @@ -62,17 +63,17 @@ class Cut: or "false", "bytes" are preserved, and all other values are converted to strings. """ - ret = [] + ret = [] # type:typing.List[typing.List[typing.Union[str, bytes]]] for f in flows: ret.append([extract(c, f) for c in cuts]) - return ret + return ret # type: ignore @command.command("cut.save") def save( self, flows: typing.Sequence[flow.Flow], - cuts: typing.Sequence[command.Cut], - path: command.Path + cuts: mitmproxy.types.CutSpec, + path: mitmproxy.types.Path ) -> None: """ Save cuts to file. If there are multiple flows or cuts, the format @@ -84,7 +85,7 @@ class Cut: append = False if path.startswith("+"): append = True - path = command.Path(path[1:]) + path = mitmproxy.types.Path(path[1:]) if len(cuts) == 1 and len(flows) == 1: with open(path, "ab" if append else "wb") as fp: if fp.tell() > 0: @@ -110,7 +111,7 @@ class Cut: def clip( self, flows: typing.Sequence[flow.Flow], - cuts: typing.Sequence[command.Cut], + cuts: mitmproxy.types.CutSpec, ) -> None: """ Send cuts to the clipboard. If there are multiple flows or cuts, the diff --git a/mitmproxy/addons/export.py b/mitmproxy/addons/export.py index 5388a0e8..0169f5b1 100644 --- a/mitmproxy/addons/export.py +++ b/mitmproxy/addons/export.py @@ -5,6 +5,7 @@ from mitmproxy import flow from mitmproxy import exceptions from mitmproxy.utils import strutils from mitmproxy.net.http.http1 import assemble +import mitmproxy.types import pyperclip @@ -49,7 +50,7 @@ class Export(): return list(sorted(formats.keys())) @command.command("export.file") - def file(self, fmt: str, f: flow.Flow, path: command.Path) -> None: + def file(self, fmt: str, f: flow.Flow, path: mitmproxy.types.Path) -> None: """ Export a flow to path. """ diff --git a/mitmproxy/addons/save.py b/mitmproxy/addons/save.py index 40cd6f82..1778855d 100644 --- a/mitmproxy/addons/save.py +++ b/mitmproxy/addons/save.py @@ -7,6 +7,7 @@ from mitmproxy import flowfilter from mitmproxy import io from mitmproxy import ctx from mitmproxy import flow +import mitmproxy.types class Save: @@ -50,7 +51,7 @@ class Save: self.start_stream_to_path(ctx.options.save_stream_file, self.filt) @command.command("save.file") - def save(self, flows: typing.Sequence[flow.Flow], path: command.Path) -> None: + def save(self, flows: typing.Sequence[flow.Flow], path: mitmproxy.types.Path) -> None: """ Save flows to a file. If the path starts with a +, flows are appended to the file, otherwise it is over-written. diff --git a/mitmproxy/addons/serverplayback.py b/mitmproxy/addons/serverplayback.py index 46968a8d..20fcfc2a 100644 --- a/mitmproxy/addons/serverplayback.py +++ b/mitmproxy/addons/serverplayback.py @@ -9,6 +9,7 @@ from mitmproxy import flow from mitmproxy import exceptions from mitmproxy import io from mitmproxy import command +import mitmproxy.types class ServerPlayback: @@ -31,7 +32,7 @@ class ServerPlayback: ctx.master.addons.trigger("update", []) @command.command("replay.server.file") - def load_file(self, path: command.Path) -> None: + def load_file(self, path: mitmproxy.types.Path) -> None: try: flows = io.read_flows_from_paths([path]) except exceptions.FlowReadException as e: diff --git a/mitmproxy/addons/view.py b/mitmproxy/addons/view.py index e45f2baf..3a15fd3e 100644 --- a/mitmproxy/addons/view.py +++ b/mitmproxy/addons/view.py @@ -351,7 +351,7 @@ class View(collections.Sequence): ctx.master.addons.trigger("update", updated) @command.command("view.load") - def load_file(self, path: command.Path) -> None: + def load_file(self, path: mitmproxy.types.Path) -> None: """ Load flows into the view, without processing them with addons. """ diff --git a/mitmproxy/command.py b/mitmproxy/command.py index c86d9792..a77658fd 100644 --- a/mitmproxy/command.py +++ b/mitmproxy/command.py @@ -12,7 +12,7 @@ import sys from mitmproxy.utils import typecheck from mitmproxy import exceptions -from mitmproxy import flow +import mitmproxy.types def lexer(s): @@ -24,113 +24,14 @@ def lexer(s): return lex -# This is an awkward location for these values, but it's better than having -# the console core import and depend on an addon. FIXME: Add a way for -# addons to add custom types and manage their completion and validation. -valid_flow_prefixes = [ - "@all", - "@focus", - "@shown", - "@hidden", - "@marked", - "@unmarked", - "~q", - "~s", - "~a", - "~hq", - "~hs", - "~b", - "~bq", - "~bs", - "~t", - "~d", - "~m", - "~u", - "~c", -] - - -Cuts = typing.Sequence[ - typing.Sequence[typing.Union[str, bytes]] -] - - -class Cut(str): - # This is an awkward location for these values, but it's better than having - # the console core import and depend on an addon. FIXME: Add a way for - # addons to add custom types and manage their completion and validation. - valid_prefixes = [ - "request.method", - "request.scheme", - "request.host", - "request.http_version", - "request.port", - "request.path", - "request.url", - "request.text", - "request.content", - "request.raw_content", - "request.timestamp_start", - "request.timestamp_end", - "request.header[", - - "response.status_code", - "response.reason", - "response.text", - "response.content", - "response.timestamp_start", - "response.timestamp_end", - "response.raw_content", - "response.header[", - - "client_conn.address.port", - "client_conn.address.host", - "client_conn.tls_version", - "client_conn.sni", - "client_conn.ssl_established", - - "server_conn.address.port", - "server_conn.address.host", - "server_conn.ip_address.host", - "server_conn.tls_version", - "server_conn.sni", - "server_conn.ssl_established", - ] - - -class Path(str): - pass - - -class Cmd(str): - pass - - -class Arg(str): - pass - - def typename(t: type) -> str: """ - Translates a type to an explanatory string. If ret is True, we're - looking at a return type, else we're looking at a parameter type. + Translates a type to an explanatory string. """ - if isinstance(t, Choice): - return "choice" - elif t == typing.Sequence[flow.Flow]: - return "[flow]" - elif t == typing.Sequence[str]: - return "[str]" - elif t == typing.Sequence[Cut]: - return "[cut]" - elif t == Cuts: - return "[cuts]" - elif t == flow.Flow: - return "flow" - elif issubclass(t, (str, int, bool)): - return t.__name__.lower() - else: # pragma: no cover + to = mitmproxy.types.CommandTypes.get(t, None) + if not to: raise NotImplementedError(t) + return to.display class Command: @@ -168,7 +69,7 @@ class Command: ret = " -> " + ret return "%s %s%s" % (self.path, params, ret) - def call(self, args: typing.Sequence[str]): + def call(self, args: typing.Sequence[str]) -> typing.Any: """ Call the command with a list of arguments. At this point, all arguments are strings. @@ -255,13 +156,13 @@ class CommandManager: typ = None # type: typing.Type for i in range(len(parts)): if i == 0: - typ = Cmd + typ = mitmproxy.types.Cmd if parts[i] in self.commands: params.extend(self.commands[parts[i]].paramtypes) elif params: typ = params.pop(0) # FIXME: Do we need to check that Arg is positional? - if typ == Cmd and params and params[0] == Arg: + if typ == mitmproxy.types.Cmd and params and params[0] == mitmproxy.types.Arg: if parts[i] in self.commands: params[:] = self.commands[parts[i]].paramtypes else: @@ -269,7 +170,7 @@ class CommandManager: parse.append(ParseResult(value=parts[i], type=typ)) return parse - def call_args(self, path, args): + def call_args(self, path: str, args: typing.Sequence[str]) -> typing.Any: """ Call a command using a list of string arguments. May raise CommandError. """ @@ -300,45 +201,13 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any: """ Convert a string to a argument to the appropriate type. """ - if isinstance(argtype, Choice): - cmd = argtype.options_command - opts = manager.call(cmd) - if spec not in opts: - raise exceptions.CommandError( - "Invalid choice: see %s for options" % cmd - ) - return spec - elif issubclass(argtype, str): - return spec - elif argtype == bool: - if spec == "true": - return True - elif spec == "false": - return False - else: - raise exceptions.CommandError( - "Booleans are 'true' or 'false', got %s" % spec - ) - elif issubclass(argtype, int): - try: - return int(spec) - except ValueError as e: - raise exceptions.CommandError("Expected an integer, got %s." % spec) - elif argtype == typing.Sequence[flow.Flow]: - return manager.call_args("view.resolve", [spec]) - elif argtype == Cuts: - return manager.call_args("cut", [spec]) - elif argtype == flow.Flow: - flows = manager.call_args("view.resolve", [spec]) - if len(flows) != 1: - raise exceptions.CommandError( - "Command requires one flow, specification matched %s." % len(flows) - ) - return flows[0] - elif argtype in (typing.Sequence[str], typing.Sequence[Cut]): - return [i.strip() for i in spec.split(",")] - else: + t = mitmproxy.types.CommandTypes.get(argtype, None) + if not t: raise exceptions.CommandError("Unsupported argument type: %s" % argtype) + try: + return t.parse(manager, argtype, spec) # type: ignore + except exceptions.TypeError as e: + raise exceptions.CommandError from e def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None: @@ -360,21 +229,11 @@ def command(path): return decorator -class Choice: - def __init__(self, options_command): - self.options_command = options_command - - def __instancecheck__(self, instance): - # return false here so that arguments are piped through parsearg, - # which does extended validation. - return False - - def argument(name, type): """ - Set the type of a command argument at runtime. - This is useful for more specific types such as command.Choice, which we cannot annotate - directly as mypy does not like that. + Set the type of a command argument at runtime. This is useful for more + specific types such as mitmproxy.types.Choice, which we cannot annotate + directly as mypy does not like that. """ def decorator(f: types.FunctionType) -> types.FunctionType: assert name in f.__annotations__ diff --git a/mitmproxy/exceptions.py b/mitmproxy/exceptions.py index 71517480..d568898b 100644 --- a/mitmproxy/exceptions.py +++ b/mitmproxy/exceptions.py @@ -112,6 +112,10 @@ class AddonHalt(MitmproxyException): pass +class TypeError(MitmproxyException): + pass + + """ Net-layer exceptions """ diff --git a/mitmproxy/tools/console/commander/commander.py b/mitmproxy/tools/console/commander/commander.py index ef32b953..13c80092 100644 --- a/mitmproxy/tools/console/commander/commander.py +++ b/mitmproxy/tools/console/commander/commander.py @@ -1,6 +1,4 @@ import abc -import glob -import os import typing import urwid @@ -9,6 +7,7 @@ from urwid.text_layout import calc_coords import mitmproxy.flow import mitmproxy.master import mitmproxy.command +import mitmproxy.types class Completer: # pragma: no cover @@ -39,30 +38,6 @@ class ListCompleter(Completer): return ret -# Generates the completion options for a specific starting input -def pathOptions(start: str) -> typing.Sequence[str]: - if not start: - start = "./" - path = os.path.expanduser(start) - ret = [] - if os.path.isdir(path): - files = glob.glob(os.path.join(path, "*")) - prefix = start - else: - files = glob.glob(path + "*") - prefix = os.path.dirname(start) - prefix = prefix or "./" - for f in files: - display = os.path.join(prefix, os.path.normpath(os.path.basename(f))) - if os.path.isdir(f): - display += "/" - ret.append(display) - if not ret: - ret = [start] - ret.sort() - return ret - - CompletionState = typing.NamedTuple( "CompletionState", [ @@ -106,48 +81,12 @@ class CommandBuffer(): if not self.completion: parts = self.master.commands.parse_partial(self.buf[:self.cursor]) last = parts[-1] - if last.type == mitmproxy.command.Cmd: - self.completion = CompletionState( - completer = ListCompleter( - parts[-1].value, - self.master.commands.commands.keys(), - ), - parse = parts, - ) - if last.type == typing.Sequence[mitmproxy.command.Cut]: - spec = parts[-1].value.split(",") - opts = [] - for pref in mitmproxy.command.Cut.valid_prefixes: - spec[-1] = pref - opts.append(",".join(spec)) - self.completion = CompletionState( - completer = ListCompleter( - parts[-1].value, - opts, - ), - parse = parts, - ) - elif isinstance(last.type, mitmproxy.command.Choice): + ct = mitmproxy.types.CommandTypes.get(last.type, None) + if ct: self.completion = CompletionState( completer = ListCompleter( parts[-1].value, - self.master.commands.call(last.type.options_command), - ), - parse = parts, - ) - elif last.type == mitmproxy.command.Path: - self.completion = CompletionState( - completer = ListCompleter( - "", - pathOptions(parts[1].value) - ), - parse = parts, - ) - elif last.type in (typing.Sequence[mitmproxy.flow.Flow], mitmproxy.flow.Flow): - self.completion = CompletionState( - completer = ListCompleter( - "", - mitmproxy.command.valid_flow_prefixes, + ct.completion(self.master.commands, last.type, parts[-1].value) ), parse = parts, ) diff --git a/mitmproxy/tools/console/consoleaddons.py b/mitmproxy/tools/console/consoleaddons.py index 453e9e1c..37647e60 100644 --- a/mitmproxy/tools/console/consoleaddons.py +++ b/mitmproxy/tools/console/consoleaddons.py @@ -7,6 +7,8 @@ from mitmproxy import exceptions from mitmproxy import flow from mitmproxy import contentviews from mitmproxy.utils import strutils +import mitmproxy.types + from mitmproxy.tools.console import overlay from mitmproxy.tools.console import signals @@ -218,8 +220,8 @@ class ConsoleAddon: self, prompt: str, choices: typing.Sequence[str], - cmd: command.Cmd, - *args: command.Arg + cmd: mitmproxy.types.Cmd, + *args: mitmproxy.types.Arg ) -> None: """ Prompt the user to choose from a specified list of strings, then @@ -241,7 +243,7 @@ class ConsoleAddon: @command.command("console.choose.cmd") def console_choose_cmd( - self, prompt: str, choicecmd: command.Cmd, *cmd: command.Arg + self, prompt: str, choicecmd: mitmproxy.types.Cmd, *cmd: mitmproxy.types.Arg ) -> None: """ Prompt the user to choose from a list of strings returned by a @@ -352,7 +354,7 @@ class ConsoleAddon: ] @command.command("console.edit.focus") - @command.argument("part", type=command.Choice("console.edit.focus.options")) + @command.argument("part", type=mitmproxy.types.Choice("console.edit.focus.options")) def edit_focus(self, part: str) -> None: """ Edit a component of the currently focused flow. @@ -404,14 +406,14 @@ class ConsoleAddon: self._grideditor().cmd_delete() @command.command("console.grideditor.load") - def grideditor_load(self, path: command.Path) -> None: + def grideditor_load(self, path: mitmproxy.types.Path) -> None: """ Read a file into the currrent cell. """ self._grideditor().cmd_read_file(path) @command.command("console.grideditor.load_escaped") - def grideditor_load_escaped(self, path: command.Path) -> None: + def grideditor_load_escaped(self, path: mitmproxy.types.Path) -> None: """ Read a file containing a Python-style escaped string into the currrent cell. @@ -419,7 +421,7 @@ class ConsoleAddon: self._grideditor().cmd_read_file_escaped(path) @command.command("console.grideditor.save") - def grideditor_save(self, path: command.Path) -> None: + def grideditor_save(self, path: mitmproxy.types.Path) -> None: """ Save data to file as a CSV. """ @@ -440,7 +442,7 @@ class ConsoleAddon: self._grideditor().cmd_spawn_editor() @command.command("console.flowview.mode.set") - @command.argument("mode", type=command.Choice("console.flowview.mode.options")) + @command.argument("mode", type=mitmproxy.types.Choice("console.flowview.mode.options")) def flowview_mode_set(self, mode: str) -> None: """ Set the display mode for the current flow view. @@ -498,8 +500,8 @@ class ConsoleAddon: self, contexts: typing.Sequence[str], key: str, - cmd: command.Cmd, - *args: command.Arg + cmd: mitmproxy.types.Cmd, + *args: mitmproxy.types.Arg ) -> None: """ Bind a shortcut key. diff --git a/mitmproxy/types.py b/mitmproxy/types.py new file mode 100644 index 00000000..e1b6f95d --- /dev/null +++ b/mitmproxy/types.py @@ -0,0 +1,330 @@ +import os +import glob +import typing + +from mitmproxy import exceptions +from mitmproxy import flow + + +class Path(str): + pass + + +class Cmd(str): + pass + + +class Arg(str): + pass + + +class CutSpec(typing.Sequence[str]): + pass + + +class Data(typing.Sequence[typing.Sequence[typing.Union[str, bytes]]]): + pass + + +class Choice: + def __init__(self, options_command): + self.options_command = options_command + + def __instancecheck__(self, instance): # pragma: no cover + # return false here so that arguments are piped through parsearg, + # which does extended validation. + return False + + +# One of the many charming things about mypy is that introducing type +# annotations can cause circular dependencies where there were none before. +# Rather than putting types and the CommandManger in the same file, we introduce +# a stub type with the signature we use. +class _CommandStub: + commands = {} # type: typing.Mapping[str, typing.Any] + + def call_args(self, path: str, args: typing.Sequence[str]) -> typing.Any: # pragma: no cover + pass + + def call(self, args: typing.Sequence[str]) -> typing.Any: # pragma: no cover + pass + + +class BaseType: + typ = object # type: typing.Type + display = "" # type: str + + def completion( + self, manager: _CommandStub, t: type, s: str + ) -> typing.Sequence[str]: # pragma: no cover + pass + + def parse( + self, manager: _CommandStub, t: type, s: str + ) -> typing.Any: # pragma: no cover + pass + + +class Bool(BaseType): + typ = bool + display = "bool" + + def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + return ["false", "true"] + + def parse(self, manager: _CommandStub, t: type, s: str) -> bool: + if s == "true": + return True + elif s == "false": + return False + else: + raise exceptions.TypeError( + "Booleans are 'true' or 'false', got %s" % s + ) + + +class Str(BaseType): + typ = str + display = "str" + + def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + return [] + + def parse(self, manager: _CommandStub, t: type, s: str) -> str: + return s + + +class Int(BaseType): + typ = int + display = "int" + + def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + return [] + + def parse(self, manager: _CommandStub, t: type, s: str) -> int: + try: + return int(s) + except ValueError as e: + raise exceptions.TypeError from e + + +class PathType(BaseType): + typ = Path + display = "path" + + def completion(self, manager: _CommandStub, t: type, start: str) -> typing.Sequence[str]: + if not start: + start = "./" + path = os.path.expanduser(start) + ret = [] + if os.path.isdir(path): + files = glob.glob(os.path.join(path, "*")) + prefix = start + else: + files = glob.glob(path + "*") + prefix = os.path.dirname(start) + prefix = prefix or "./" + for f in files: + display = os.path.join(prefix, os.path.normpath(os.path.basename(f))) + if os.path.isdir(f): + display += "/" + ret.append(display) + if not ret: + ret = [start] + ret.sort() + return ret + + def parse(self, manager: _CommandStub, t: type, s: str) -> str: + return s + + +class CmdType(BaseType): + typ = Cmd + display = "cmd" + + def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + return list(manager.commands.keys()) + + def parse(self, manager: _CommandStub, t: type, s: str) -> str: + return s + + +class ArgType(BaseType): + typ = Arg + display = "arg" + + def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + return [] + + def parse(self, manager: _CommandStub, t: type, s: str) -> str: + return s + + +class StrSeq(BaseType): + typ = typing.Sequence[str] + display = "[str]" + + def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + return [] + + def parse(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + return [x.strip() for x in s.split(",")] + + +class CutSpecType(BaseType): + typ = CutSpec + display = "[cut]" + valid_prefixes = [ + "request.method", + "request.scheme", + "request.host", + "request.http_version", + "request.port", + "request.path", + "request.url", + "request.text", + "request.content", + "request.raw_content", + "request.timestamp_start", + "request.timestamp_end", + "request.header[", + + "response.status_code", + "response.reason", + "response.text", + "response.content", + "response.timestamp_start", + "response.timestamp_end", + "response.raw_content", + "response.header[", + + "client_conn.address.port", + "client_conn.address.host", + "client_conn.tls_version", + "client_conn.sni", + "client_conn.ssl_established", + + "server_conn.address.port", + "server_conn.address.host", + "server_conn.ip_address.host", + "server_conn.tls_version", + "server_conn.sni", + "server_conn.ssl_established", + ] + + def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + spec = s.split(",") + opts = [] + for pref in self.valid_prefixes: + spec[-1] = pref + opts.append(",".join(spec)) + return opts + + def parse(self, manager: _CommandStub, t: type, s: str) -> CutSpec: + parts = s.split(",") # type: typing.Any + return parts + + +class BaseFlowType(BaseType): + valid_prefixes = [ + "@all", + "@focus", + "@shown", + "@hidden", + "@marked", + "@unmarked", + "~q", + "~s", + "~a", + "~hq", + "~hs", + "~b", + "~bq", + "~bs", + "~t", + "~d", + "~m", + "~u", + "~c", + ] + + def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + return self.valid_prefixes + + +class FlowType(BaseFlowType): + typ = flow.Flow + display = "flow" + + def parse(self, manager: _CommandStub, t: type, s: str) -> flow.Flow: + flows = manager.call_args("view.resolve", [s]) + if len(flows) != 1: + raise exceptions.TypeError( + "Command requires one flow, specification matched %s." % len(flows) + ) + return flows[0] + + +class FlowsType(BaseFlowType): + typ = typing.Sequence[flow.Flow] + display = "[flow]" + + def parse(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[flow.Flow]: + return manager.call_args("view.resolve", [s]) + + +class DataType: + typ = Data + display = "[data]" + + def completion( + self, manager: _CommandStub, t: type, s: str + ) -> typing.Sequence[str]: # pragma: no cover + raise exceptions.TypeError("data cannot be passed as argument") + + def parse( + self, manager: _CommandStub, t: type, s: str + ) -> typing.Any: # pragma: no cover + raise exceptions.TypeError("data cannot be passed as argument") + + +class ChoiceType: + typ = Choice + display = "choice" + + def completion(self, manager: _CommandStub, t: Choice, s: str) -> typing.Sequence[str]: + return manager.call(t.options_command) + + def parse(self, manager: _CommandStub, t: Choice, s: str) -> str: + opts = manager.call(t.options_command) + if s not in opts: + raise exceptions.TypeError("Invalid choice.") + return s + + +class TypeManager: + def __init__(self, *types): + self.typemap = {} + for t in types: + self.typemap[t.typ] = t() + + def get(self, t: type, default=None) -> BaseType: + if type(t) in self.typemap: + return self.typemap[type(t)] + return self.typemap.get(t, default) + + +CommandTypes = TypeManager( + ArgType, + Bool, + ChoiceType, + CmdType, + CutSpecType, + DataType, + FlowType, + FlowsType, + Int, + PathType, + Str, + StrSeq, +) diff --git a/test/mitmproxy/test_command.py b/test/mitmproxy/test_command.py index 50ad3d55..f9315dd2 100644 --- a/test/mitmproxy/test_command.py +++ b/test/mitmproxy/test_command.py @@ -4,6 +4,7 @@ from mitmproxy import flow from mitmproxy import exceptions from mitmproxy.test import tflow from mitmproxy.test import taddons +import mitmproxy.types import io import pytest @@ -25,7 +26,7 @@ class TAddon: return foo @command.command("subcommand") - def subcommand(self, cmd: command.Cmd, *args: command.Arg) -> str: + def subcommand(self, cmd: mitmproxy.types.Cmd, *args: mitmproxy.types.Arg) -> str: return "ok" @command.command("empty") @@ -39,12 +40,12 @@ class TAddon: def choices(self) -> typing.Sequence[str]: return ["one", "two", "three"] - @command.argument("arg", type=command.Choice("choices")) + @command.argument("arg", type=mitmproxy.types.Choice("choices")) def choose(self, arg: str) -> typing.Sequence[str]: return ["one", "two", "three"] @command.command("path") - def path(self, arg: command.Path) -> None: + def path(self, arg: mitmproxy.types.Path) -> None: pass @@ -79,45 +80,45 @@ class TestCommand: [ "foo bar", [ - command.ParseResult(value = "foo", type = command.Cmd), + command.ParseResult(value = "foo", type = mitmproxy.types.Cmd), command.ParseResult(value = "bar", type = str) ], ], [ "foo 'bar", [ - command.ParseResult(value = "foo", type = command.Cmd), + command.ParseResult(value = "foo", type = mitmproxy.types.Cmd), command.ParseResult(value = "'bar", type = str) ] ], - ["a", [command.ParseResult(value = "a", type = command.Cmd)]], - ["", [command.ParseResult(value = "", type = command.Cmd)]], + ["a", [command.ParseResult(value = "a", type = mitmproxy.types.Cmd)]], + ["", [command.ParseResult(value = "", type = mitmproxy.types.Cmd)]], [ "cmd3 1", [ - command.ParseResult(value = "cmd3", type = command.Cmd), + command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd), command.ParseResult(value = "1", type = int), ] ], [ "cmd3 ", [ - command.ParseResult(value = "cmd3", type = command.Cmd), + command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd), command.ParseResult(value = "", type = int), ] ], [ "subcommand ", [ - command.ParseResult(value = "subcommand", type = command.Cmd), - command.ParseResult(value = "", type = command.Cmd), + command.ParseResult(value = "subcommand", type = mitmproxy.types.Cmd), + command.ParseResult(value = "", type = mitmproxy.types.Cmd), ] ], [ "subcommand cmd3 ", [ - command.ParseResult(value = "subcommand", type = command.Cmd), - command.ParseResult(value = "cmd3", type = command.Cmd), + command.ParseResult(value = "subcommand", type = mitmproxy.types.Cmd), + command.ParseResult(value = "cmd3", type = mitmproxy.types.Cmd), command.ParseResult(value = "", type = int), ] ], @@ -154,15 +155,15 @@ def test_typename(): assert command.typename(str) == "str" assert command.typename(typing.Sequence[flow.Flow]) == "[flow]" - assert command.typename(command.Cuts) == "[cuts]" - assert command.typename(typing.Sequence[command.Cut]) == "[cut]" + assert command.typename(mitmproxy.types.Data) == "[data]" + assert command.typename(mitmproxy.types.CutSpec) == "[cut]" assert command.typename(flow.Flow) == "flow" assert command.typename(typing.Sequence[str]) == "[str]" - assert command.typename(command.Choice("foo")) == "choice" - assert command.typename(command.Path) == "path" - assert command.typename(command.Cmd) == "cmd" + assert command.typename(mitmproxy.types.Choice("foo")) == "choice" + assert command.typename(mitmproxy.types.Path) == "path" + assert command.typename(mitmproxy.types.Cmd) == "cmd" class DummyConsole: @@ -172,7 +173,7 @@ class DummyConsole: return [tflow.tflow(resp=True)] * n @command.command("cut") - def cut(self, spec: str) -> command.Cuts: + def cut(self, spec: str) -> mitmproxy.types.Data: return [["test"]] @@ -201,10 +202,6 @@ def test_parsearg(): with pytest.raises(exceptions.CommandError): command.parsearg(tctx.master.commands, "foo", Exception) - assert command.parsearg( - tctx.master.commands, "foo", command.Cuts - ) == [["test"]] - assert command.parsearg( tctx.master.commands, "foo", typing.Sequence[str] ) == ["foo"] @@ -215,18 +212,18 @@ def test_parsearg(): a = TAddon() tctx.master.commands.add("choices", a.choices) assert command.parsearg( - tctx.master.commands, "one", command.Choice("choices"), + tctx.master.commands, "one", mitmproxy.types.Choice("choices"), ) == "one" with pytest.raises(exceptions.CommandError): assert command.parsearg( - tctx.master.commands, "invalid", command.Choice("choices"), + tctx.master.commands, "invalid", mitmproxy.types.Choice("choices"), ) assert command.parsearg( - tctx.master.commands, "foo", command.Path + tctx.master.commands, "foo", mitmproxy.types.Path ) == "foo" assert command.parsearg( - tctx.master.commands, "foo", command.Cmd + tctx.master.commands, "foo", mitmproxy.types.Cmd ) == "foo" @@ -272,5 +269,5 @@ def test_choice(): basic typechecking for choices should fail as we cannot verify if strings are a valid choice at this point. """ - c = command.Choice("foo") + c = mitmproxy.types.Choice("foo") assert not typecheck.check_command_type("foo", c) diff --git a/test/mitmproxy/test_typemanager.py b/test/mitmproxy/test_typemanager.py new file mode 100644 index 00000000..e69de29b diff --git a/test/mitmproxy/test_types.py b/test/mitmproxy/test_types.py new file mode 100644 index 00000000..81aaed74 --- /dev/null +++ b/test/mitmproxy/test_types.py @@ -0,0 +1,175 @@ +import pytest +import os +import typing +import contextlib + +from mitmproxy.test import tutils +import mitmproxy.exceptions +import mitmproxy.types +from mitmproxy.test import taddons +from mitmproxy.test import tflow +from mitmproxy import command +from mitmproxy import flow + +from . import test_command + + +@contextlib.contextmanager +def chdir(path: str): + old_dir = os.getcwd() + os.chdir(path) + yield + os.chdir(old_dir) + + +def test_bool(): + with taddons.context() as tctx: + b = mitmproxy.types.Bool() + assert b.completion(tctx.master.commands, bool, "b") == ["false", "true"] + assert b.parse(tctx.master.commands, bool, "true") is True + assert b.parse(tctx.master.commands, bool, "false") is False + with pytest.raises(mitmproxy.exceptions.TypeError): + b.parse(tctx.master.commands, bool, "foo") + + +def test_str(): + with taddons.context() as tctx: + b = mitmproxy.types.Str() + assert b.completion(tctx.master.commands, str, "") == [] + assert b.parse(tctx.master.commands, str, "foo") == "foo" + + +def test_int(): + with taddons.context() as tctx: + b = mitmproxy.types.Int() + assert b.completion(tctx.master.commands, int, "b") == [] + assert b.parse(tctx.master.commands, int, "1") == 1 + assert b.parse(tctx.master.commands, int, "999") == 999 + with pytest.raises(mitmproxy.exceptions.TypeError): + b.parse(tctx.master.commands, int, "foo") + + +def test_path(): + with taddons.context() as tctx: + b = mitmproxy.types.PathType() + assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/foo") == "/foo" + assert b.parse(tctx.master.commands, mitmproxy.types.Path, "/bar") == "/bar" + + def normPathOpts(prefix, match): + ret = [] + for s in b.completion(tctx.master.commands, mitmproxy.types.Path, match): + s = s[len(prefix):] + s = s.replace(os.sep, "/") + ret.append(s) + return ret + + cd = os.path.normpath(tutils.test_data.path("mitmproxy/completion")) + assert normPathOpts(cd, cd) == ['/aaa', '/aab', '/aac', '/bbb/'] + assert normPathOpts(cd, os.path.join(cd, "a")) == ['/aaa', '/aab', '/aac'] + with chdir(cd): + assert normPathOpts("", "./") == ['./aaa', './aab', './aac', './bbb/'] + assert normPathOpts("", "") == ['./aaa', './aab', './aac', './bbb/'] + assert b.completion( + tctx.master.commands, mitmproxy.types.Path, "nonexistent" + ) == ["nonexistent"] + + +def test_cmd(): + with taddons.context() as tctx: + tctx.master.addons.add(test_command.TAddon()) + b = mitmproxy.types.CmdType() + assert b.parse(tctx.master.commands, mitmproxy.types.Cmd, "foo") == "foo" + assert len( + b.completion(tctx.master.commands, mitmproxy.types.Cmd, "") + ) == len(tctx.master.commands.commands.keys()) + + +def test_cutspec(): + with taddons.context() as tctx: + b = mitmproxy.types.CutSpecType() + b.parse(tctx.master.commands, mitmproxy.types.CutSpec, "foo,bar") == ["foo", "bar"] + assert b.completion( + tctx.master.commands, mitmproxy.types.CutSpec, "request.p" + ) == b.valid_prefixes + ret = b.completion(tctx.master.commands, mitmproxy.types.CutSpec, "request.port,f") + assert ret[0].startswith("request.port,") + assert len(ret) == len(b.valid_prefixes) + + +def test_arg(): + with taddons.context() as tctx: + b = mitmproxy.types.ArgType() + assert b.completion(tctx.master.commands, mitmproxy.types.Arg, "") == [] + assert b.parse(tctx.master.commands, mitmproxy.types.Arg, "foo") == "foo" + + +def test_strseq(): + with taddons.context() as tctx: + b = mitmproxy.types.StrSeq() + assert b.completion(tctx.master.commands, typing.Sequence[str], "") == [] + assert b.parse(tctx.master.commands, typing.Sequence[str], "foo") == ["foo"] + assert b.parse(tctx.master.commands, typing.Sequence[str], "foo,bar") == ["foo", "bar"] + + +class DummyConsole: + @command.command("view.resolve") + def resolve(self, spec: str) -> typing.Sequence[flow.Flow]: + n = int(spec) + return [tflow.tflow(resp=True)] * n + + @command.command("cut") + def cut(self, spec: str) -> mitmproxy.types.Data: + return [["test"]] + + @command.command("options") + def options(self) -> typing.Sequence[str]: + return ["one", "two", "three"] + + +def test_flow(): + with taddons.context() as tctx: + tctx.master.addons.add(DummyConsole()) + b = mitmproxy.types.FlowType() + assert len(b.completion(tctx.master.commands, flow.Flow, "")) == len(b.valid_prefixes) + assert b.parse(tctx.master.commands, flow.Flow, "1") + with pytest.raises(mitmproxy.exceptions.TypeError): + assert b.parse(tctx.master.commands, flow.Flow, "0") + with pytest.raises(mitmproxy.exceptions.TypeError): + assert b.parse(tctx.master.commands, flow.Flow, "2") + + +def test_flows(): + with taddons.context() as tctx: + tctx.master.addons.add(DummyConsole()) + b = mitmproxy.types.FlowsType() + assert len( + b.completion(tctx.master.commands, typing.Sequence[flow.Flow], "") + ) == len(b.valid_prefixes) + assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "0")) == 0 + assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "1")) == 1 + assert len(b.parse(tctx.master.commands, typing.Sequence[flow.Flow], "2")) == 2 + + +def test_data(): + with taddons.context() as tctx: + b = mitmproxy.types.DataType() + with pytest.raises(mitmproxy.exceptions.TypeError): + b.parse(tctx.master.commands, mitmproxy.types.Data, "foo") + with pytest.raises(mitmproxy.exceptions.TypeError): + b.parse(tctx.master.commands, mitmproxy.types.Data, "foo") + + +def test_choice(): + with taddons.context() as tctx: + tctx.master.addons.add(DummyConsole()) + b = mitmproxy.types.ChoiceType() + comp = b.completion(tctx.master.commands, mitmproxy.types.Choice("options"), "") + assert comp == ["one", "two", "three"] + assert b.parse(tctx.master.commands, mitmproxy.types.Choice("options"), "one") == "one" + with pytest.raises(mitmproxy.exceptions.TypeError): + b.parse(tctx.master.commands, mitmproxy.types.Choice("options"), "invalid") + + +def test_typemanager(): + assert mitmproxy.types.CommandTypes.get(bool, None) + assert mitmproxy.types.CommandTypes.get(mitmproxy.types.Choice("choide"), None) diff --git a/test/mitmproxy/tools/console/test_commander.py b/test/mitmproxy/tools/console/test_commander.py index 823af06d..34062dcb 100644 --- a/test/mitmproxy/tools/console/test_commander.py +++ b/test/mitmproxy/tools/console/test_commander.py @@ -1,36 +1,6 @@ -import os -import contextlib from mitmproxy.tools.console.commander import commander from mitmproxy.test import taddons -from mitmproxy.test import tutils - - -@contextlib.contextmanager -def chdir(path: str): - old_dir = os.getcwd() - os.chdir(path) - yield - os.chdir(old_dir) - - -def normPathOpts(prefix, match): - ret = [] - for s in commander.pathOptions(match): - s = s[len(prefix):] - s = s.replace(os.sep, "/") - ret.append(s) - return ret - - -def test_pathOptions(): - cd = os.path.normpath(tutils.test_data.path("mitmproxy/completion")) - assert normPathOpts(cd, cd) == ['/aaa', '/aab', '/aac', '/bbb/'] - assert normPathOpts(cd, os.path.join(cd, "a")) == ['/aaa', '/aab', '/aac'] - with chdir(cd): - assert normPathOpts("", "./") == ['./aaa', './aab', './aac', './bbb/'] - assert normPathOpts("", "") == ['./aaa', './aab', './aac', './bbb/'] - assert commander.pathOptions("nonexistent") == ["nonexistent"] class TestListCompleter: diff --git a/test/mitmproxy/utils/test_typecheck.py b/test/mitmproxy/utils/test_typecheck.py index 66b1884e..365509f1 100644 --- a/test/mitmproxy/utils/test_typecheck.py +++ b/test/mitmproxy/utils/test_typecheck.py @@ -4,7 +4,6 @@ from unittest import mock import pytest from mitmproxy.utils import typecheck -from mitmproxy import command class TBase: @@ -95,9 +94,6 @@ def test_check_command_type(): assert(typecheck.check_command_type(None, None)) assert(not typecheck.check_command_type(["foo"], typing.Sequence[int])) assert(not typecheck.check_command_type("foo", typing.Sequence[int])) - assert(typecheck.check_command_type([["foo", b"bar"]], command.Cuts)) - assert(not typecheck.check_command_type(["foo", b"bar"], command.Cuts)) - assert(not typecheck.check_command_type([["foo", 22]], command.Cuts)) # Python 3.5 only defines __parameters__ m = mock.Mock() -- cgit v1.2.3