diff options
Diffstat (limited to 'mitmproxy/tools')
-rw-r--r-- | mitmproxy/tools/console/commander/commander.py | 157 | ||||
-rw-r--r-- | mitmproxy/tools/console/commands.py | 23 | ||||
-rw-r--r-- | mitmproxy/tools/console/consoleaddons.py | 131 | ||||
-rw-r--r-- | mitmproxy/tools/console/defaultkeys.py | 10 | ||||
-rw-r--r-- | mitmproxy/tools/console/statusbar.py | 12 |
5 files changed, 166 insertions, 167 deletions
diff --git a/mitmproxy/tools/console/commander/commander.py b/mitmproxy/tools/console/commander/commander.py index f291b8fd..d751422b 100644 --- a/mitmproxy/tools/console/commander/commander.py +++ b/mitmproxy/tools/console/commander/commander.py @@ -1,58 +1,55 @@ import abc +import collections import copy import typing -import collections import urwid from urwid.text_layout import calc_coords +import mitmproxy.command import mitmproxy.flow import mitmproxy.master -import mitmproxy.command import mitmproxy.types -class Completer: # pragma: no cover +class Completer: @abc.abstractmethod - def cycle(self) -> str: - pass + def cycle(self, forward: bool = True) -> str: + raise NotImplementedError() class ListCompleter(Completer): def __init__( - self, - start: str, - options: typing.Sequence[str], + self, + start: str, + options: typing.Sequence[str], ) -> None: self.start = start - self.options: typing.Sequence[str] = [] + self.options: typing.List[str] = [] for o in options: if o.startswith(start): self.options.append(o) self.options.sort() self.offset = 0 - def cycle(self) -> str: + def cycle(self, forward: bool = True) -> str: if not self.options: return self.start ret = self.options[self.offset] - self.offset = (self.offset + 1) % len(self.options) + delta = 1 if forward else -1 + self.offset = (self.offset + delta) % len(self.options) return ret -CompletionState = typing.NamedTuple( - "CompletionState", - [ - ("completer", Completer), - ("parse", typing.Sequence[mitmproxy.command.ParseResult]) - ] -) +class CompletionState(typing.NamedTuple): + completer: Completer + parsed: typing.Sequence[mitmproxy.command.ParseResult] class CommandBuffer: def __init__(self, master: mitmproxy.master.Master, start: str = "") -> None: self.master = master - self.text = self.flatten(start) + self.text = start # Cursor is always within the range [0:len(buffer)]. self._cursor = len(self.text) self.completion: typing.Optional[CompletionState] = None @@ -70,51 +67,30 @@ class CommandBuffer: else: self._cursor = x - def maybequote(self, value): - if " " in value and not value.startswith("\""): - return "\"%s\"" % value - return value - - def parse_quoted(self, txt): - parts, remhelp = self.master.commands.parse_partial(txt) - for i, p in enumerate(parts): - parts[i] = mitmproxy.command.ParseResult( - value = self.maybequote(p.value), - type = p.type, - valid = p.valid - ) - return parts, remhelp - def render(self): - """ - This function is somewhat tricky - in order to make the cursor - position valid, we have to make sure there is a - character-for-character offset match in the rendered output, up - to the cursor. Beyond that, we can add stuff. - """ - parts, remhelp = self.parse_quoted(self.text) + parts, remaining = self.master.commands.parse_partial(self.text) ret = [] - for p in parts: - if p.valid: - if p.type == mitmproxy.types.Cmd: - ret.append(("commander_command", p.value)) - else: - ret.append(("text", p.value)) - elif p.value: - ret.append(("commander_invalid", p.value)) - else: - ret.append(("text", "")) - ret.append(("text", " ")) - if remhelp: - ret.append(("text", " ")) - for v in remhelp: - ret.append(("commander_hint", "%s " % v)) - return ret + if not parts: + # Means we just received the leader, so we need to give a blank + # text to the widget to render or it crashes + ret.append(("text", "")) + else: + for p in parts: + if p.valid: + if p.type == mitmproxy.types.Cmd: + ret.append(("commander_command", p.value)) + else: + ret.append(("text", p.value)) + elif p.value: + ret.append(("commander_invalid", p.value)) + + if remaining: + if parts[-1].type != mitmproxy.types.Space: + ret.append(("text", " ")) + for param in remaining: + ret.append(("commander_hint", f"{param} ")) - def flatten(self, txt): - parts, _ = self.parse_quoted(txt) - ret = [x.value for x in parts] - return " ".join(ret) + return ret def left(self) -> None: self.cursor = self.cursor - 1 @@ -122,30 +98,38 @@ class CommandBuffer: def right(self) -> None: self.cursor = self.cursor + 1 - def cycle_completion(self) -> None: + def cycle_completion(self, forward: bool = True) -> None: if not self.completion: - parts, remainhelp = self.master.commands.parse_partial(self.text[:self.cursor]) - last = parts[-1] - ct = mitmproxy.types.CommandTypes.get(last.type, None) + parts, remaining = self.master.commands.parse_partial(self.text[:self.cursor]) + if parts and parts[-1].type != mitmproxy.types.Space: + type_to_complete = parts[-1].type + cycle_prefix = parts[-1].value + parsed = parts[:-1] + elif remaining: + type_to_complete = remaining[0].type + cycle_prefix = "" + parsed = parts + else: + return + ct = mitmproxy.types.CommandTypes.get(type_to_complete, None) if ct: self.completion = CompletionState( - completer = ListCompleter( - parts[-1].value, - ct.completion(self.master.commands, last.type, parts[-1].value) + completer=ListCompleter( + cycle_prefix, + ct.completion(self.master.commands, type_to_complete, cycle_prefix) ), - parse = parts, + parsed=parsed, ) if self.completion: - nxt = self.completion.completer.cycle() - buf = " ".join([i.value for i in self.completion.parse[:-1]]) + " " + nxt - buf = buf.strip() - self.text = self.flatten(buf) + nxt = self.completion.completer.cycle(forward) + buf = "".join([i.value for i in self.completion.parsed]) + nxt + self.text = buf self.cursor = len(self.text) def backspace(self) -> None: if self.cursor == 0: return - self.text = self.flatten(self.text[:self.cursor - 1] + self.text[self.cursor:]) + self.text = self.text[:self.cursor - 1] + self.text[self.cursor:] self.cursor = self.cursor - 1 self.completion = None @@ -153,13 +137,18 @@ class CommandBuffer: """ Inserts text at the cursor. """ - self.text = self.flatten(self.text[:self.cursor] + k + self.text[self.cursor:]) - self.cursor += 1 + + # We don't want to insert a space before the command + if k == ' ' and self.text[0:self.cursor].strip() == '': + return + + self.text = self.text[:self.cursor] + k + self.text[self.cursor:] + self.cursor += len(k) self.completion = None class CommandHistory: - def __init__(self, master: mitmproxy.master.Master, size: int=30) -> None: + def __init__(self, master: mitmproxy.master.Master, size: int = 30) -> None: self.saved_commands: collections.deque = collections.deque( [CommandBuffer(master, "")], maxlen=size @@ -182,7 +171,7 @@ class CommandHistory: return self.saved_commands[self.index] return None - def add_command(self, command: CommandBuffer, execution: bool=False) -> None: + def add_command(self, command: CommandBuffer, execution: bool = False) -> None: if self.index == self.last_index or execution: last_item = self.saved_commands[-1] last_item_empty = not last_item.text @@ -207,7 +196,7 @@ class CommandEdit(urwid.WidgetWrap): self.history = history self.update() - def keypress(self, size, key): + def keypress(self, size, key) -> None: if key == "backspace": self.cbuf.backspace() elif key == "left": @@ -219,27 +208,29 @@ class CommandEdit(urwid.WidgetWrap): self.cbuf = self.history.get_prev() or self.cbuf elif key == "down": self.cbuf = self.history.get_next() or self.cbuf + elif key == "shift tab": + self.cbuf.cycle_completion(False) elif key == "tab": self.cbuf.cycle_completion() elif len(key) == 1: self.cbuf.insert(key) self.update() - def update(self): + def update(self) -> None: self._w.set_text([self.leader, self.cbuf.render()]) - def render(self, size, focus=False): + def render(self, size, focus=False) -> urwid.Canvas: (maxcol,) = size canv = self._w.render((maxcol,)) canv = urwid.CompositeCanvas(canv) canv.cursor = self.get_cursor_coords((maxcol,)) return canv - def get_cursor_coords(self, size): + def get_cursor_coords(self, size) -> typing.Tuple[int, int]: p = self.cbuf.cursor + len(self.leader) trans = self._w.get_line_translation(size[0]) x, y = calc_coords(self._w.get_text()[0], trans, p) return x, y - def get_edit_text(self): + def get_edit_text(self) -> str: return self.cbuf.text diff --git a/mitmproxy/tools/console/commands.py b/mitmproxy/tools/console/commands.py index 0f35742b..26a99b14 100644 --- a/mitmproxy/tools/console/commands.py +++ b/mitmproxy/tools/console/commands.py @@ -1,6 +1,8 @@ import urwid import blinker import textwrap + +from mitmproxy import command from mitmproxy.tools.console import layoutwidget from mitmproxy.tools.console import signals @@ -10,7 +12,7 @@ command_focus_change = blinker.Signal() class CommandItem(urwid.WidgetWrap): - def __init__(self, walker, cmd, focused): + def __init__(self, walker, cmd: command.Command, focused: bool): self.walker, self.cmd, self.focused = walker, cmd, focused super().__init__(None) self._w = self.get_widget() @@ -18,15 +20,18 @@ class CommandItem(urwid.WidgetWrap): def get_widget(self): parts = [ ("focus", ">> " if self.focused else " "), - ("title", self.cmd.path), - ("text", " "), - ("text", " ".join(self.cmd.paramnames())), + ("title", self.cmd.name) ] - if self.cmd.returntype: - parts.append([ + if self.cmd.parameters: + parts += [ + ("text", " "), + ("text", " ".join(str(param) for param in self.cmd.parameters)), + ] + if self.cmd.return_type: + parts += [ ("title", " -> "), - ("text", self.cmd.retname()), - ]) + ("text", command.typename(self.cmd.return_type)), + ] return urwid.AttrMap( urwid.Padding(urwid.Text(parts)), @@ -92,7 +97,7 @@ class CommandsList(urwid.ListBox): def keypress(self, size, key): if key == "m_select": foc, idx = self.get_focus() - signals.status_prompt_command.send(partial=foc.cmd.path + " ") + signals.status_prompt_command.send(partial=foc.cmd.name + " ") elif key == "m_start": self.set_focus(0) self.walker._modified() diff --git a/mitmproxy/tools/console/consoleaddons.py b/mitmproxy/tools/console/consoleaddons.py index 9f595b42..7fcd9b48 100644 --- a/mitmproxy/tools/console/consoleaddons.py +++ b/mitmproxy/tools/console/consoleaddons.py @@ -1,21 +1,18 @@ import csv -import shlex import typing +import mitmproxy.types +from mitmproxy import command, command_lexer +from mitmproxy import contentviews from mitmproxy import ctx -from mitmproxy import command from mitmproxy import exceptions from mitmproxy import flow from mitmproxy import http from mitmproxy import log -from mitmproxy import contentviews -from mitmproxy.utils import strutils -import mitmproxy.types - - +from mitmproxy.tools.console import keymap from mitmproxy.tools.console import overlay from mitmproxy.tools.console import signals -from mitmproxy.tools.console import keymap +from mitmproxy.utils import strutils console_palettes = [ "lowlight", @@ -48,10 +45,12 @@ class UnsupportedLog: """ A small addon to dump info on flow types we don't support yet. """ + def websocket_message(self, f): message = f.messages[-1] ctx.log.info(f.message_info(message)) - ctx.log.debug(message.content if isinstance(message.content, str) else strutils.bytes_to_escaped_str(message.content)) + ctx.log.debug( + message.content if isinstance(message.content, str) else strutils.bytes_to_escaped_str(message.content)) def websocket_end(self, f): ctx.log.info("WebSocket connection closed by {}: {} {}, {}".format( @@ -78,6 +77,7 @@ class ConsoleAddon: An addon that exposes console-specific commands, and hooks into required events. """ + def __init__(self, master): self.master = master self.started = False @@ -86,7 +86,7 @@ class ConsoleAddon: loader.add_option( "console_default_contentview", str, "auto", "The default content view mode.", - choices = [i.name.lower() for i in contentviews.views] + choices=[i.name.lower() for i in contentviews.views] ) loader.add_option( "console_eventlog_verbosity", str, 'info', @@ -142,7 +142,7 @@ class ConsoleAddon: opts = self.layout_options() off = self.layout_options().index(ctx.options.console_layout) ctx.options.update( - console_layout = opts[(off + 1) % len(opts)] + console_layout=opts[(off + 1) % len(opts)] ) @command.command("console.panes.next") @@ -234,17 +234,18 @@ class ConsoleAddon: @command.command("console.choose") def console_choose( - self, - prompt: str, - choices: typing.Sequence[str], - cmd: mitmproxy.types.Cmd, - *args: mitmproxy.types.Arg + self, + prompt: str, + choices: typing.Sequence[str], + cmd: mitmproxy.types.Cmd, + *args: mitmproxy.types.CmdArgs ) -> None: """ Prompt the user to choose from a specified list of strings, then invoke another command with all occurrences of {choice} replaced by the choice the user made. """ + def callback(opt): # We're now outside of the call context... repl = cmd + " " + " ".join(args) @@ -260,22 +261,22 @@ class ConsoleAddon: @command.command("console.choose.cmd") def console_choose_cmd( - self, - prompt: str, - choicecmd: mitmproxy.types.Cmd, - subcmd: mitmproxy.types.Cmd, - *args: mitmproxy.types.Arg + self, + prompt: str, + choicecmd: mitmproxy.types.Cmd, + subcmd: mitmproxy.types.Cmd, + *args: mitmproxy.types.CmdArgs ) -> None: """ Prompt the user to choose from a list of strings returned by a command, then invoke another command with all occurrences of {choice} replaced by the choice the user made. """ - choices = ctx.master.commands.call_strings(choicecmd, []) + choices = ctx.master.commands.execute(choicecmd) def callback(opt): # We're now outside of the call context... - repl = shlex.quote(" ".join(args)) + repl = " ".join(command_lexer.quote(x) for x in args) repl = repl.replace("{choice}", opt) try: self.master.commands.execute(subcmd + " " + repl) @@ -287,21 +288,24 @@ class ConsoleAddon: ) @command.command("console.command") - def console_command(self, *partial: str) -> None: + def console_command(self, *command_str: str) -> None: """ Prompt the user to edit a command with a (possibly empty) starting value. """ - signals.status_prompt_command.send(partial=" ".join(partial)) # type: ignore + quoted = " ".join(command_lexer.quote(x) for x in command_str) + signals.status_prompt_command.send(partial=quoted) @command.command("console.command.set") - def console_command_set(self, option: str) -> None: + def console_command_set(self, option_name: str) -> None: """ - Prompt the user to set an option of the form "key[=value]". + Prompt the user to set an option. """ - option_value = getattr(self.master.options, option, None) - current_value = option_value if option_value else "" - self.master.commands.execute( - "console.command set %s=%s" % (option, current_value) + option_value = getattr(self.master.options, option_name, None) or "" + set_command = f"set {option_name} {option_value!r}" + cursor = len(set_command) - 1 + signals.status_prompt_command.send( + partial=set_command, + cursor=cursor ) @command.command("console.view.keybindings") @@ -351,14 +355,14 @@ class ConsoleAddon: @command.command("console.bodyview") @command.argument("part", type=mitmproxy.types.Choice("console.bodyview.options")) - def bodyview(self, f: flow.Flow, part: str) -> None: + def bodyview(self, flow: flow.Flow, part: str) -> None: """ Spawn an external viewer for a flow request or response body based on the detected MIME type. We use the mailcap system to find the correct viewier, and fall back to the programs in $PAGER or $EDITOR if necessary. """ - fpart = getattr(f, part, None) + fpart = getattr(flow, part, None) if not fpart: raise exceptions.CommandError("Part must be either request or response, not %s." % part) t = fpart.headers.get("content-type") @@ -397,8 +401,8 @@ class ConsoleAddon: ] @command.command("console.edit.focus") - @command.argument("part", type=mitmproxy.types.Choice("console.edit.focus.options")) - def edit_focus(self, part: str) -> None: + @command.argument("flow_part", type=mitmproxy.types.Choice("console.edit.focus.options")) + def edit_focus(self, flow_part: str) -> None: """ Edit a component of the currently focused flow. """ @@ -410,27 +414,27 @@ class ConsoleAddon: flow.backup() require_dummy_response = ( - part in ("response-headers", "response-body", "set-cookies") and - flow.response is None + flow_part in ("response-headers", "response-body", "set-cookies") and + flow.response is None ) if require_dummy_response: flow.response = http.HTTPResponse.make() - if part == "cookies": + if flow_part == "cookies": self.master.switch_view("edit_focus_cookies") - elif part == "urlencoded form": + elif flow_part == "urlencoded form": self.master.switch_view("edit_focus_urlencoded_form") - elif part == "multipart form": + elif flow_part == "multipart form": self.master.switch_view("edit_focus_multipart_form") - elif part == "path": + elif flow_part == "path": self.master.switch_view("edit_focus_path") - elif part == "query": + elif flow_part == "query": self.master.switch_view("edit_focus_query") - elif part == "request-headers": + elif flow_part == "request-headers": self.master.switch_view("edit_focus_request_headers") - elif part == "response-headers": + elif flow_part == "response-headers": self.master.switch_view("edit_focus_response_headers") - elif part in ("request-body", "response-body"): - if part == "request-body": + elif flow_part in ("request-body", "response-body"): + if flow_part == "request-body": message = flow.request else: message = flow.response @@ -442,16 +446,16 @@ class ConsoleAddon: # just strip the newlines off the end of the body when we return # from an editor. message.content = c.rstrip(b"\n") - elif part == "set-cookies": + elif flow_part == "set-cookies": self.master.switch_view("edit_focus_setcookies") - elif part == "url": + elif flow_part == "url": url = flow.request.url.encode() edited_url = self.master.spawn_editor(url) url = edited_url.rstrip(b"\n") flow.request.url = url.decode() - elif part in ["method", "status_code", "reason"]: + elif flow_part in ["method", "status_code", "reason"]: self.master.commands.execute( - "console.command flow.set @focus %s " % part + "console.command flow.set @focus %s " % flow_part ) def _grideditor(self): @@ -535,10 +539,8 @@ class ConsoleAddon: raise exceptions.CommandError("Invalid flowview mode.") try: - self.master.commands.call_strings( - "view.settings.setval", - ["@focus", "flowview_mode_%s" % idx, mode] - ) + cmd = 'view.settings.setval @focus flowview_mode_%s %s' % (idx, mode) + self.master.commands.execute(cmd) except exceptions.CommandError as e: signals.status_message.send(message=str(e)) @@ -558,14 +560,9 @@ class ConsoleAddon: if not fv: raise exceptions.CommandError("Not viewing a flow.") idx = fv.body.tab_offset - return self.master.commands.call_strings( - "view.settings.getval", - [ - "@focus", - "flowview_mode_%s" % idx, - self.master.options.console_default_contentview, - ] - ) + + cmd = 'view.settings.getval @focus flowview_mode_%s %s' % (idx, self.master.options.console_default_contentview) + return self.master.commands.execute(cmd) @command.command("console.key.contexts") def key_contexts(self) -> typing.Sequence[str]: @@ -576,11 +573,11 @@ class ConsoleAddon: @command.command("console.key.bind") def key_bind( - self, - contexts: typing.Sequence[str], - key: str, - cmd: mitmproxy.types.Cmd, - *args: mitmproxy.types.Arg + self, + contexts: typing.Sequence[str], + key: str, + cmd: mitmproxy.types.Cmd, + *args: mitmproxy.types.CmdArgs ) -> None: """ Bind a shortcut key. diff --git a/mitmproxy/tools/console/defaultkeys.py b/mitmproxy/tools/console/defaultkeys.py index 0a6c5561..a0f27625 100644 --- a/mitmproxy/tools/console/defaultkeys.py +++ b/mitmproxy/tools/console/defaultkeys.py @@ -26,7 +26,7 @@ def map(km): km.add("ctrl f", "console.nav.pagedown", ["global"], "Page down") km.add("ctrl b", "console.nav.pageup", ["global"], "Page up") - km.add("I", "set intercept_active=toggle", ["global"], "Toggle intercept") + km.add("I", "set intercept_active toggle", ["global"], "Toggle intercept") km.add("i", "console.command.set intercept", ["global"], "Set intercept") km.add("W", "console.command.set save_stream_file", ["global"], "Stream to file") km.add("A", "flow.resume @all", ["flowlist", "flowview"], "Resume all intercepted flows") @@ -48,14 +48,14 @@ def map(km): "Export this flow to file" ) km.add("f", "console.command.set view_filter", ["flowlist"], "Set view filter") - km.add("F", "set console_focus_follow=toggle", ["flowlist"], "Set focus follow") + km.add("F", "set console_focus_follow toggle", ["flowlist"], "Set focus follow") km.add( "ctrl l", "console.command cut.clip ", ["flowlist", "flowview"], "Send cuts to clipboard" ) - km.add("L", "console.command view.load ", ["flowlist"], "Load flows from file") + km.add("L", "console.command view.flows.load ", ["flowlist"], "Load flows from file") km.add("m", "flow.mark.toggle @focus", ["flowlist"], "Toggle mark on this flow") km.add("M", "view.properties.marked.toggle", ["flowlist"], "Toggle viewing marked flows") km.add( @@ -68,14 +68,14 @@ def map(km): "o", """ console.choose.cmd Order view.order.options - set view_order={choice} + set view_order {choice} """, ["flowlist"], "Set flow list order" ) km.add("r", "replay.client @focus", ["flowlist", "flowview"], "Replay this flow") km.add("S", "console.command replay.server ", ["flowlist"], "Start server replay") - km.add("v", "set view_order_reversed=toggle", ["flowlist"], "Reverse flow list order") + km.add("v", "set view_order_reversed toggle", ["flowlist"], "Reverse flow list order") km.add("U", "flow.mark @all false", ["flowlist"], "Un-set all marks") km.add("w", "console.command save.file @shown ", ["flowlist"], "Save listed flows to file") km.add("V", "flow.revert @focus", ["flowlist", "flowview"], "Revert changes to this flow") diff --git a/mitmproxy/tools/console/statusbar.py b/mitmproxy/tools/console/statusbar.py index 56f0674f..43f5170d 100644 --- a/mitmproxy/tools/console/statusbar.py +++ b/mitmproxy/tools/console/statusbar.py @@ -1,4 +1,5 @@ import os.path +from typing import Optional import urwid @@ -98,10 +99,15 @@ class ActionBar(urwid.WidgetWrap): self._w = urwid.Edit(self.prep_prompt(prompt), text or "") self.prompting = PromptStub(callback, args) - def sig_prompt_command(self, sender, partial=""): + def sig_prompt_command(self, sender, partial: str = "", cursor: Optional[int] = None): signals.focus.send(self, section="footer") - self._w = commander.CommandEdit(self.master, partial, - self.command_history) + self._w = commander.CommandEdit( + self.master, + partial, + self.command_history, + ) + if cursor is not None: + self._w.cbuf.cursor = cursor self.prompting = commandexecutor.CommandExecutor(self.master) def sig_prompt_onekey(self, sender, prompt, keys, callback, args=()): |