diff options
| author | Aldo Cortesi <aldo@nullcube.com> | 2017-12-18 07:20:31 +1300 |
|---|---|---|
| committer | Aldo Cortesi <aldo@nullcube.com> | 2017-12-18 17:02:48 +1300 |
| commit | b0b67fe2a7a7e8d220f6917f91248c0ba8a7d64e (patch) | |
| tree | bd46fe963f29e11691143aad5ae82ea7f974f3eb /mitmproxy | |
| parent | b1f923e1482bf95418c955a5867dcbd30e1a00ec (diff) | |
| download | mitmproxy-b0b67fe2a7a7e8d220f6917f91248c0ba8a7d64e.tar.gz mitmproxy-b0b67fe2a7a7e8d220f6917f91248c0ba8a7d64e.tar.bz2 mitmproxy-b0b67fe2a7a7e8d220f6917f91248c0ba8a7d64e.zip | |
commands: refactor types
The type system was scattered over a number of places, making it hard to
follow. This collects all command types in types.py, and completion, validation
and parsing for each type is centralised. We should use the same mechanism for
options.
Diffstat (limited to 'mitmproxy')
| -rw-r--r-- | mitmproxy/addons/clientplayback.py | 3 | ||||
| -rw-r--r-- | mitmproxy/addons/core.py | 9 | ||||
| -rw-r--r-- | mitmproxy/addons/cut.py | 17 | ||||
| -rw-r--r-- | mitmproxy/addons/export.py | 3 | ||||
| -rw-r--r-- | mitmproxy/addons/save.py | 3 | ||||
| -rw-r--r-- | mitmproxy/addons/serverplayback.py | 3 | ||||
| -rw-r--r-- | mitmproxy/addons/view.py | 2 | ||||
| -rw-r--r-- | mitmproxy/command.py | 177 | ||||
| -rw-r--r-- | mitmproxy/exceptions.py | 4 | ||||
| -rw-r--r-- | mitmproxy/tools/console/commander/commander.py | 69 | ||||
| -rw-r--r-- | mitmproxy/tools/console/consoleaddons.py | 22 | ||||
| -rw-r--r-- | mitmproxy/types.py | 330 |
12 files changed, 391 insertions, 251 deletions
diff --git a/mitmproxy/addons/clientplayback.py b/mitmproxy/addons/clientplayback.py index fcc3209b..bed06e82 100644 --- a/mitmproxy/addons/clientplayback.py +++ b/mitmproxy/addons/clientplayback.py @@ -3,6 +3,7 @@ from mitmproxy import ctx from mitmproxy import io from mitmproxy import flow from mitmproxy import command +import mitmproxy.types import typing @@ -37,7 +38,7 @@ class ClientPlayback: ctx.master.addons.trigger("update", []) @command.command("replay.client.file") - def load_file(self, path: command.Path) -> None: + def load_file(self, path: mitmproxy.types.Path) -> None: try: flows = io.read_flows_from_paths([path]) except exceptions.FlowReadException as e: diff --git a/mitmproxy/addons/core.py b/mitmproxy/addons/core.py index 4191d490..2b0b2f14 100644 --- a/mitmproxy/addons/core.py +++ b/mitmproxy/addons/core.py @@ -6,6 +6,7 @@ from mitmproxy import command from mitmproxy import flow from mitmproxy import optmanager from mitmproxy.net.http import status_codes +import mitmproxy.types class Core: @@ -96,7 +97,7 @@ class Core: ] @command.command("flow.set") - @command.argument("spec", type=command.Choice("flow.set.options")) + @command.argument("spec", type=mitmproxy.types.Choice("flow.set.options")) def flow_set( self, flows: typing.Sequence[flow.Flow], @@ -187,7 +188,7 @@ class Core: ctx.log.alert("Toggled encoding on %s flows." % len(updated)) @command.command("flow.encode") - @command.argument("enc", type=command.Choice("flow.encode.options")) + @command.argument("enc", type=mitmproxy.types.Choice("flow.encode.options")) def encode( self, flows: typing.Sequence[flow.Flow], @@ -216,7 +217,7 @@ class Core: return ["gzip", "deflate", "br"] @command.command("options.load") - def options_load(self, path: command.Path) -> None: + def options_load(self, path: mitmproxy.types.Path) -> None: """ Load options from a file. """ @@ -228,7 +229,7 @@ class Core: ) from e @command.command("options.save") - def options_save(self, path: command.Path) -> None: + def options_save(self, path: mitmproxy.types.Path) -> None: """ Save options to a file. """ diff --git a/mitmproxy/addons/cut.py b/mitmproxy/addons/cut.py index efc9e5df..b90df549 100644 --- a/mitmproxy/addons/cut.py +++ b/mitmproxy/addons/cut.py @@ -7,6 +7,7 @@ from mitmproxy import flow from mitmproxy import ctx from mitmproxy import certs from mitmproxy.utils import strutils +import mitmproxy.types import pyperclip @@ -51,8 +52,8 @@ class Cut: def cut( self, flows: typing.Sequence[flow.Flow], - cuts: typing.Sequence[command.Cut] - ) -> command.Cuts: + cuts: mitmproxy.types.CutSpec, + ) -> mitmproxy.types.Data: """ Cut data from a set of flows. Cut specifications are attribute paths from the base of the flow object, with a few conveniences - "port" @@ -62,17 +63,17 @@ class Cut: or "false", "bytes" are preserved, and all other values are converted to strings. """ - ret = [] + ret = [] # type:typing.List[typing.List[typing.Union[str, bytes]]] for f in flows: ret.append([extract(c, f) for c in cuts]) - return ret + return ret # type: ignore @command.command("cut.save") def save( self, flows: typing.Sequence[flow.Flow], - cuts: typing.Sequence[command.Cut], - path: command.Path + cuts: mitmproxy.types.CutSpec, + path: mitmproxy.types.Path ) -> None: """ Save cuts to file. If there are multiple flows or cuts, the format @@ -84,7 +85,7 @@ class Cut: append = False if path.startswith("+"): append = True - path = command.Path(path[1:]) + path = mitmproxy.types.Path(path[1:]) if len(cuts) == 1 and len(flows) == 1: with open(path, "ab" if append else "wb") as fp: if fp.tell() > 0: @@ -110,7 +111,7 @@ class Cut: def clip( self, flows: typing.Sequence[flow.Flow], - cuts: typing.Sequence[command.Cut], + cuts: mitmproxy.types.CutSpec, ) -> None: """ Send cuts to the clipboard. If there are multiple flows or cuts, the diff --git a/mitmproxy/addons/export.py b/mitmproxy/addons/export.py index 5388a0e8..0169f5b1 100644 --- a/mitmproxy/addons/export.py +++ b/mitmproxy/addons/export.py @@ -5,6 +5,7 @@ from mitmproxy import flow from mitmproxy import exceptions from mitmproxy.utils import strutils from mitmproxy.net.http.http1 import assemble +import mitmproxy.types import pyperclip @@ -49,7 +50,7 @@ class Export(): return list(sorted(formats.keys())) @command.command("export.file") - def file(self, fmt: str, f: flow.Flow, path: command.Path) -> None: + def file(self, fmt: str, f: flow.Flow, path: mitmproxy.types.Path) -> None: """ Export a flow to path. """ diff --git a/mitmproxy/addons/save.py b/mitmproxy/addons/save.py index 40cd6f82..1778855d 100644 --- a/mitmproxy/addons/save.py +++ b/mitmproxy/addons/save.py @@ -7,6 +7,7 @@ from mitmproxy import flowfilter from mitmproxy import io from mitmproxy import ctx from mitmproxy import flow +import mitmproxy.types class Save: @@ -50,7 +51,7 @@ class Save: self.start_stream_to_path(ctx.options.save_stream_file, self.filt) @command.command("save.file") - def save(self, flows: typing.Sequence[flow.Flow], path: command.Path) -> None: + def save(self, flows: typing.Sequence[flow.Flow], path: mitmproxy.types.Path) -> None: """ Save flows to a file. If the path starts with a +, flows are appended to the file, otherwise it is over-written. diff --git a/mitmproxy/addons/serverplayback.py b/mitmproxy/addons/serverplayback.py index 46968a8d..20fcfc2a 100644 --- a/mitmproxy/addons/serverplayback.py +++ b/mitmproxy/addons/serverplayback.py @@ -9,6 +9,7 @@ from mitmproxy import flow from mitmproxy import exceptions from mitmproxy import io from mitmproxy import command +import mitmproxy.types class ServerPlayback: @@ -31,7 +32,7 @@ class ServerPlayback: ctx.master.addons.trigger("update", []) @command.command("replay.server.file") - def load_file(self, path: command.Path) -> None: + def load_file(self, path: mitmproxy.types.Path) -> None: try: flows = io.read_flows_from_paths([path]) except exceptions.FlowReadException as e: diff --git a/mitmproxy/addons/view.py b/mitmproxy/addons/view.py index e45f2baf..3a15fd3e 100644 --- a/mitmproxy/addons/view.py +++ b/mitmproxy/addons/view.py @@ -351,7 +351,7 @@ class View(collections.Sequence): ctx.master.addons.trigger("update", updated) @command.command("view.load") - def load_file(self, path: command.Path) -> None: + def load_file(self, path: mitmproxy.types.Path) -> None: """ Load flows into the view, without processing them with addons. """ diff --git a/mitmproxy/command.py b/mitmproxy/command.py index c86d9792..a77658fd 100644 --- a/mitmproxy/command.py +++ b/mitmproxy/command.py @@ -12,7 +12,7 @@ import sys from mitmproxy.utils import typecheck from mitmproxy import exceptions -from mitmproxy import flow +import mitmproxy.types def lexer(s): @@ -24,113 +24,14 @@ def lexer(s): return lex -# This is an awkward location for these values, but it's better than having -# the console core import and depend on an addon. FIXME: Add a way for -# addons to add custom types and manage their completion and validation. -valid_flow_prefixes = [ - "@all", - "@focus", - "@shown", - "@hidden", - "@marked", - "@unmarked", - "~q", - "~s", - "~a", - "~hq", - "~hs", - "~b", - "~bq", - "~bs", - "~t", - "~d", - "~m", - "~u", - "~c", -] - - -Cuts = typing.Sequence[ - typing.Sequence[typing.Union[str, bytes]] -] - - -class Cut(str): - # This is an awkward location for these values, but it's better than having - # the console core import and depend on an addon. FIXME: Add a way for - # addons to add custom types and manage their completion and validation. - valid_prefixes = [ - "request.method", - "request.scheme", - "request.host", - "request.http_version", - "request.port", - "request.path", - "request.url", - "request.text", - "request.content", - "request.raw_content", - "request.timestamp_start", - "request.timestamp_end", - "request.header[", - - "response.status_code", - "response.reason", - "response.text", - "response.content", - "response.timestamp_start", - "response.timestamp_end", - "response.raw_content", - "response.header[", - - "client_conn.address.port", - "client_conn.address.host", - "client_conn.tls_version", - "client_conn.sni", - "client_conn.ssl_established", - - "server_conn.address.port", - "server_conn.address.host", - "server_conn.ip_address.host", - "server_conn.tls_version", - "server_conn.sni", - "server_conn.ssl_established", - ] - - -class Path(str): - pass - - -class Cmd(str): - pass - - -class Arg(str): - pass - - def typename(t: type) -> str: """ - Translates a type to an explanatory string. If ret is True, we're - looking at a return type, else we're looking at a parameter type. + Translates a type to an explanatory string. """ - if isinstance(t, Choice): - return "choice" - elif t == typing.Sequence[flow.Flow]: - return "[flow]" - elif t == typing.Sequence[str]: - return "[str]" - elif t == typing.Sequence[Cut]: - return "[cut]" - elif t == Cuts: - return "[cuts]" - elif t == flow.Flow: - return "flow" - elif issubclass(t, (str, int, bool)): - return t.__name__.lower() - else: # pragma: no cover + to = mitmproxy.types.CommandTypes.get(t, None) + if not to: raise NotImplementedError(t) + return to.display class Command: @@ -168,7 +69,7 @@ class Command: ret = " -> " + ret return "%s %s%s" % (self.path, params, ret) - def call(self, args: typing.Sequence[str]): + def call(self, args: typing.Sequence[str]) -> typing.Any: """ Call the command with a list of arguments. At this point, all arguments are strings. @@ -255,13 +156,13 @@ class CommandManager: typ = None # type: typing.Type for i in range(len(parts)): if i == 0: - typ = Cmd + typ = mitmproxy.types.Cmd if parts[i] in self.commands: params.extend(self.commands[parts[i]].paramtypes) elif params: typ = params.pop(0) # FIXME: Do we need to check that Arg is positional? - if typ == Cmd and params and params[0] == Arg: + if typ == mitmproxy.types.Cmd and params and params[0] == mitmproxy.types.Arg: if parts[i] in self.commands: params[:] = self.commands[parts[i]].paramtypes else: @@ -269,7 +170,7 @@ class CommandManager: parse.append(ParseResult(value=parts[i], type=typ)) return parse - def call_args(self, path, args): + def call_args(self, path: str, args: typing.Sequence[str]) -> typing.Any: """ Call a command using a list of string arguments. May raise CommandError. """ @@ -300,45 +201,13 @@ def parsearg(manager: CommandManager, spec: str, argtype: type) -> typing.Any: """ Convert a string to a argument to the appropriate type. """ - if isinstance(argtype, Choice): - cmd = argtype.options_command - opts = manager.call(cmd) - if spec not in opts: - raise exceptions.CommandError( - "Invalid choice: see %s for options" % cmd - ) - return spec - elif issubclass(argtype, str): - return spec - elif argtype == bool: - if spec == "true": - return True - elif spec == "false": - return False - else: - raise exceptions.CommandError( - "Booleans are 'true' or 'false', got %s" % spec - ) - elif issubclass(argtype, int): - try: - return int(spec) - except ValueError as e: - raise exceptions.CommandError("Expected an integer, got %s." % spec) - elif argtype == typing.Sequence[flow.Flow]: - return manager.call_args("view.resolve", [spec]) - elif argtype == Cuts: - return manager.call_args("cut", [spec]) - elif argtype == flow.Flow: - flows = manager.call_args("view.resolve", [spec]) - if len(flows) != 1: - raise exceptions.CommandError( - "Command requires one flow, specification matched %s." % len(flows) - ) - return flows[0] - elif argtype in (typing.Sequence[str], typing.Sequence[Cut]): - return [i.strip() for i in spec.split(",")] - else: + t = mitmproxy.types.CommandTypes.get(argtype, None) + if not t: raise exceptions.CommandError("Unsupported argument type: %s" % argtype) + try: + return t.parse(manager, argtype, spec) # type: ignore + except exceptions.TypeError as e: + raise exceptions.CommandError from e def verify_arg_signature(f: typing.Callable, args: list, kwargs: dict) -> None: @@ -360,21 +229,11 @@ def command(path): return decorator -class Choice: - def __init__(self, options_command): - self.options_command = options_command - - def __instancecheck__(self, instance): - # return false here so that arguments are piped through parsearg, - # which does extended validation. - return False - - def argument(name, type): """ - Set the type of a command argument at runtime. - This is useful for more specific types such as command.Choice, which we cannot annotate - directly as mypy does not like that. + Set the type of a command argument at runtime. This is useful for more + specific types such as mitmproxy.types.Choice, which we cannot annotate + directly as mypy does not like that. """ def decorator(f: types.FunctionType) -> types.FunctionType: assert name in f.__annotations__ diff --git a/mitmproxy/exceptions.py b/mitmproxy/exceptions.py index 71517480..d568898b 100644 --- a/mitmproxy/exceptions.py +++ b/mitmproxy/exceptions.py @@ -112,6 +112,10 @@ class AddonHalt(MitmproxyException): pass +class TypeError(MitmproxyException): + pass + + """ Net-layer exceptions """ diff --git a/mitmproxy/tools/console/commander/commander.py b/mitmproxy/tools/console/commander/commander.py index ef32b953..13c80092 100644 --- a/mitmproxy/tools/console/commander/commander.py +++ b/mitmproxy/tools/console/commander/commander.py @@ -1,6 +1,4 @@ import abc -import glob -import os import typing import urwid @@ -9,6 +7,7 @@ from urwid.text_layout import calc_coords import mitmproxy.flow import mitmproxy.master import mitmproxy.command +import mitmproxy.types class Completer: # pragma: no cover @@ -39,30 +38,6 @@ class ListCompleter(Completer): return ret -# Generates the completion options for a specific starting input -def pathOptions(start: str) -> typing.Sequence[str]: - if not start: - start = "./" - path = os.path.expanduser(start) - ret = [] - if os.path.isdir(path): - files = glob.glob(os.path.join(path, "*")) - prefix = start - else: - files = glob.glob(path + "*") - prefix = os.path.dirname(start) - prefix = prefix or "./" - for f in files: - display = os.path.join(prefix, os.path.normpath(os.path.basename(f))) - if os.path.isdir(f): - display += "/" - ret.append(display) - if not ret: - ret = [start] - ret.sort() - return ret - - CompletionState = typing.NamedTuple( "CompletionState", [ @@ -106,48 +81,12 @@ class CommandBuffer(): if not self.completion: parts = self.master.commands.parse_partial(self.buf[:self.cursor]) last = parts[-1] - if last.type == mitmproxy.command.Cmd: - self.completion = CompletionState( - completer = ListCompleter( - parts[-1].value, - self.master.commands.commands.keys(), - ), - parse = parts, - ) - if last.type == typing.Sequence[mitmproxy.command.Cut]: - spec = parts[-1].value.split(",") - opts = [] - for pref in mitmproxy.command.Cut.valid_prefixes: - spec[-1] = pref - opts.append(",".join(spec)) - self.completion = CompletionState( - completer = ListCompleter( - parts[-1].value, - opts, - ), - parse = parts, - ) - elif isinstance(last.type, mitmproxy.command.Choice): + ct = mitmproxy.types.CommandTypes.get(last.type, None) + if ct: self.completion = CompletionState( completer = ListCompleter( parts[-1].value, - self.master.commands.call(last.type.options_command), - ), - parse = parts, - ) - elif last.type == mitmproxy.command.Path: - self.completion = CompletionState( - completer = ListCompleter( - "", - pathOptions(parts[1].value) - ), - parse = parts, - ) - elif last.type in (typing.Sequence[mitmproxy.flow.Flow], mitmproxy.flow.Flow): - self.completion = CompletionState( - completer = ListCompleter( - "", - mitmproxy.command.valid_flow_prefixes, + ct.completion(self.master.commands, last.type, parts[-1].value) ), parse = parts, ) diff --git a/mitmproxy/tools/console/consoleaddons.py b/mitmproxy/tools/console/consoleaddons.py index 453e9e1c..37647e60 100644 --- a/mitmproxy/tools/console/consoleaddons.py +++ b/mitmproxy/tools/console/consoleaddons.py @@ -7,6 +7,8 @@ from mitmproxy import exceptions from mitmproxy import flow from mitmproxy import contentviews from mitmproxy.utils import strutils +import mitmproxy.types + from mitmproxy.tools.console import overlay from mitmproxy.tools.console import signals @@ -218,8 +220,8 @@ class ConsoleAddon: self, prompt: str, choices: typing.Sequence[str], - cmd: command.Cmd, - *args: command.Arg + cmd: mitmproxy.types.Cmd, + *args: mitmproxy.types.Arg ) -> None: """ Prompt the user to choose from a specified list of strings, then @@ -241,7 +243,7 @@ class ConsoleAddon: @command.command("console.choose.cmd") def console_choose_cmd( - self, prompt: str, choicecmd: command.Cmd, *cmd: command.Arg + self, prompt: str, choicecmd: mitmproxy.types.Cmd, *cmd: mitmproxy.types.Arg ) -> None: """ Prompt the user to choose from a list of strings returned by a @@ -352,7 +354,7 @@ class ConsoleAddon: ] @command.command("console.edit.focus") - @command.argument("part", type=command.Choice("console.edit.focus.options")) + @command.argument("part", type=mitmproxy.types.Choice("console.edit.focus.options")) def edit_focus(self, part: str) -> None: """ Edit a component of the currently focused flow. @@ -404,14 +406,14 @@ class ConsoleAddon: self._grideditor().cmd_delete() @command.command("console.grideditor.load") - def grideditor_load(self, path: command.Path) -> None: + def grideditor_load(self, path: mitmproxy.types.Path) -> None: """ Read a file into the currrent cell. """ self._grideditor().cmd_read_file(path) @command.command("console.grideditor.load_escaped") - def grideditor_load_escaped(self, path: command.Path) -> None: + def grideditor_load_escaped(self, path: mitmproxy.types.Path) -> None: """ Read a file containing a Python-style escaped string into the currrent cell. @@ -419,7 +421,7 @@ class ConsoleAddon: self._grideditor().cmd_read_file_escaped(path) @command.command("console.grideditor.save") - def grideditor_save(self, path: command.Path) -> None: + def grideditor_save(self, path: mitmproxy.types.Path) -> None: """ Save data to file as a CSV. """ @@ -440,7 +442,7 @@ class ConsoleAddon: self._grideditor().cmd_spawn_editor() @command.command("console.flowview.mode.set") - @command.argument("mode", type=command.Choice("console.flowview.mode.options")) + @command.argument("mode", type=mitmproxy.types.Choice("console.flowview.mode.options")) def flowview_mode_set(self, mode: str) -> None: """ Set the display mode for the current flow view. @@ -498,8 +500,8 @@ class ConsoleAddon: self, contexts: typing.Sequence[str], key: str, - cmd: command.Cmd, - *args: command.Arg + cmd: mitmproxy.types.Cmd, + *args: mitmproxy.types.Arg ) -> None: """ Bind a shortcut key. diff --git a/mitmproxy/types.py b/mitmproxy/types.py new file mode 100644 index 00000000..e1b6f95d --- /dev/null +++ b/mitmproxy/types.py @@ -0,0 +1,330 @@ +import os +import glob +import typing + +from mitmproxy import exceptions +from mitmproxy import flow + + +class Path(str): + pass + + +class Cmd(str): + pass + + +class Arg(str): + pass + + +class CutSpec(typing.Sequence[str]): + pass + + +class Data(typing.Sequence[typing.Sequence[typing.Union[str, bytes]]]): + pass + + +class Choice: + def __init__(self, options_command): + self.options_command = options_command + + def __instancecheck__(self, instance): # pragma: no cover + # return false here so that arguments are piped through parsearg, + # which does extended validation. + return False + + +# One of the many charming things about mypy is that introducing type +# annotations can cause circular dependencies where there were none before. +# Rather than putting types and the CommandManger in the same file, we introduce +# a stub type with the signature we use. +class _CommandStub: + commands = {} # type: typing.Mapping[str, typing.Any] + + def call_args(self, path: str, args: typing.Sequence[str]) -> typing.Any: # pragma: no cover + pass + + def call(self, args: typing.Sequence[str]) -> typing.Any: # pragma: no cover + pass + + +class BaseType: + typ = object # type: typing.Type + display = "" # type: str + + def completion( + self, manager: _CommandStub, t: type, s: str + ) -> typing.Sequence[str]: # pragma: no cover + pass + + def parse( + self, manager: _CommandStub, t: type, s: str + ) -> typing.Any: # pragma: no cover + pass + + +class Bool(BaseType): + typ = bool + display = "bool" + + def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + return ["false", "true"] + + def parse(self, manager: _CommandStub, t: type, s: str) -> bool: + if s == "true": + return True + elif s == "false": + return False + else: + raise exceptions.TypeError( + "Booleans are 'true' or 'false', got %s" % s + ) + + +class Str(BaseType): + typ = str + display = "str" + + def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + return [] + + def parse(self, manager: _CommandStub, t: type, s: str) -> str: + return s + + +class Int(BaseType): + typ = int + display = "int" + + def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + return [] + + def parse(self, manager: _CommandStub, t: type, s: str) -> int: + try: + return int(s) + except ValueError as e: + raise exceptions.TypeError from e + + +class PathType(BaseType): + typ = Path + display = "path" + + def completion(self, manager: _CommandStub, t: type, start: str) -> typing.Sequence[str]: + if not start: + start = "./" + path = os.path.expanduser(start) + ret = [] + if os.path.isdir(path): + files = glob.glob(os.path.join(path, "*")) + prefix = start + else: + files = glob.glob(path + "*") + prefix = os.path.dirname(start) + prefix = prefix or "./" + for f in files: + display = os.path.join(prefix, os.path.normpath(os.path.basename(f))) + if os.path.isdir(f): + display += "/" + ret.append(display) + if not ret: + ret = [start] + ret.sort() + return ret + + def parse(self, manager: _CommandStub, t: type, s: str) -> str: + return s + + +class CmdType(BaseType): + typ = Cmd + display = "cmd" + + def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + return list(manager.commands.keys()) + + def parse(self, manager: _CommandStub, t: type, s: str) -> str: + return s + + +class ArgType(BaseType): + typ = Arg + display = "arg" + + def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + return [] + + def parse(self, manager: _CommandStub, t: type, s: str) -> str: + return s + + +class StrSeq(BaseType): + typ = typing.Sequence[str] + display = "[str]" + + def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + return [] + + def parse(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + return [x.strip() for x in s.split(",")] + + +class CutSpecType(BaseType): + typ = CutSpec + display = "[cut]" + valid_prefixes = [ + "request.method", + "request.scheme", + "request.host", + "request.http_version", + "request.port", + "request.path", + "request.url", + "request.text", + "request.content", + "request.raw_content", + "request.timestamp_start", + "request.timestamp_end", + "request.header[", + + "response.status_code", + "response.reason", + "response.text", + "response.content", + "response.timestamp_start", + "response.timestamp_end", + "response.raw_content", + "response.header[", + + "client_conn.address.port", + "client_conn.address.host", + "client_conn.tls_version", + "client_conn.sni", + "client_conn.ssl_established", + + "server_conn.address.port", + "server_conn.address.host", + "server_conn.ip_address.host", + "server_conn.tls_version", + "server_conn.sni", + "server_conn.ssl_established", + ] + + def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + spec = s.split(",") + opts = [] + for pref in self.valid_prefixes: + spec[-1] = pref + opts.append(",".join(spec)) + return opts + + def parse(self, manager: _CommandStub, t: type, s: str) -> CutSpec: + parts = s.split(",") # type: typing.Any + return parts + + +class BaseFlowType(BaseType): + valid_prefixes = [ + "@all", + "@focus", + "@shown", + "@hidden", + "@marked", + "@unmarked", + "~q", + "~s", + "~a", + "~hq", + "~hs", + "~b", + "~bq", + "~bs", + "~t", + "~d", + "~m", + "~u", + "~c", + ] + + def completion(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[str]: + return self.valid_prefixes + + +class FlowType(BaseFlowType): + typ = flow.Flow + display = "flow" + + def parse(self, manager: _CommandStub, t: type, s: str) -> flow.Flow: + flows = manager.call_args("view.resolve", [s]) + if len(flows) != 1: + raise exceptions.TypeError( + "Command requires one flow, specification matched %s." % len(flows) + ) + return flows[0] + + +class FlowsType(BaseFlowType): + typ = typing.Sequence[flow.Flow] + display = "[flow]" + + def parse(self, manager: _CommandStub, t: type, s: str) -> typing.Sequence[flow.Flow]: + return manager.call_args("view.resolve", [s]) + + +class DataType: + typ = Data + display = "[data]" + + def completion( + self, manager: _CommandStub, t: type, s: str + ) -> typing.Sequence[str]: # pragma: no cover + raise exceptions.TypeError("data cannot be passed as argument") + + def parse( + self, manager: _CommandStub, t: type, s: str + ) -> typing.Any: # pragma: no cover + raise exceptions.TypeError("data cannot be passed as argument") + + +class ChoiceType: + typ = Choice + display = "choice" + + def completion(self, manager: _CommandStub, t: Choice, s: str) -> typing.Sequence[str]: + return manager.call(t.options_command) + + def parse(self, manager: _CommandStub, t: Choice, s: str) -> str: + opts = manager.call(t.options_command) + if s not in opts: + raise exceptions.TypeError("Invalid choice.") + return s + + +class TypeManager: + def __init__(self, *types): + self.typemap = {} + for t in types: + self.typemap[t.typ] = t() + + def get(self, t: type, default=None) -> BaseType: + if type(t) in self.typemap: + return self.typemap[type(t)] + return self.typemap.get(t, default) + + +CommandTypes = TypeManager( + ArgType, + Bool, + ChoiceType, + CmdType, + CutSpecType, + DataType, + FlowType, + FlowsType, + Int, + PathType, + Str, + StrSeq, +) |
