diff options
author | Aldo Cortesi <aldo@corte.si> | 2017-06-14 11:09:26 +1200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-06-14 11:09:26 +1200 |
commit | 309274689c3d2eb910b368218706cecbfcc11044 (patch) | |
tree | e1c4946bf2319872c80df0bfaca98f1455fe2266 | |
parent | 08972c3f5bb76b6ff60ea3124c85e3e3cd6f30f0 (diff) | |
parent | e8939b8b9fd53f49cc06f1f75c88ca9e63823b9e (diff) | |
download | mitmproxy-309274689c3d2eb910b368218706cecbfcc11044.tar.gz mitmproxy-309274689c3d2eb910b368218706cecbfcc11044.tar.bz2 mitmproxy-309274689c3d2eb910b368218706cecbfcc11044.zip |
Merge pull request #2397 from cortesi/neverenoughconsole
console: keymap-related improvements
-rw-r--r-- | mitmproxy/command.py | 3 | ||||
-rw-r--r-- | mitmproxy/tools/console/consoleaddons.py | 500 | ||||
-rw-r--r-- | mitmproxy/tools/console/defaultkeys.py | 36 | ||||
-rw-r--r-- | mitmproxy/tools/console/keybindings.py | 13 | ||||
-rw-r--r-- | mitmproxy/tools/console/keymap.py | 108 | ||||
-rw-r--r-- | mitmproxy/tools/console/master.py | 430 | ||||
-rw-r--r-- | mitmproxy/tools/console/signals.py | 3 | ||||
-rw-r--r-- | test/mitmproxy/tools/console/test_keymap.py | 40 |
8 files changed, 675 insertions, 458 deletions
diff --git a/mitmproxy/command.py b/mitmproxy/command.py index 2256e4ca..c9776bc3 100644 --- a/mitmproxy/command.py +++ b/mitmproxy/command.py @@ -74,7 +74,8 @@ class Command: def call(self, args: typing.Sequence[str]): """ - Call the command with a set of arguments. At this point, all argumets are strings. + Call the command with a list of arguments. At this point, all + arguments are strings. """ if not self.has_positional and (len(self.paramtypes) != len(args)): raise exceptions.CommandError("Usage: %s" % self.signature_help()) diff --git a/mitmproxy/tools/console/consoleaddons.py b/mitmproxy/tools/console/consoleaddons.py new file mode 100644 index 00000000..a65f0afe --- /dev/null +++ b/mitmproxy/tools/console/consoleaddons.py @@ -0,0 +1,500 @@ +import typing + +from mitmproxy import ctx +from mitmproxy import command +from mitmproxy import exceptions +from mitmproxy import flow +from mitmproxy import contentviews +from mitmproxy.utils import strutils + +from mitmproxy.tools.console import overlay +from mitmproxy.tools.console import signals +from mitmproxy.tools.console import keymap + + +class Logger: + def log(self, evt): + signals.add_log(evt.msg, evt.level) + if evt.level == "alert": + signals.status_message.send( + message=str(evt.msg), + expire=2 + ) + + +class UnsupportedLog: + """ + A small addon to dump info on flow types we don't support yet. + """ + def websocket_message(self, f): + message = f.messages[-1] + signals.add_log(f.message_info(message), "info") + signals.add_log(strutils.bytes_to_escaped_str(message.content), "debug") + + def websocket_end(self, f): + signals.add_log("WebSocket connection closed by {}: {} {}, {}".format( + f.close_sender, + f.close_code, + f.close_message, + f.close_reason), "info") + + def tcp_message(self, f): + message = f.messages[-1] + direction = "->" if message.from_client else "<-" + signals.add_log("{client_host}:{client_port} {direction} tcp {direction} {server_host}:{server_port}".format( + client_host=f.client_conn.address[0], + client_port=f.client_conn.address[1], + server_host=f.server_conn.address[0], + server_port=f.server_conn.address[1], + direction=direction, + ), "info") + signals.add_log(strutils.bytes_to_escaped_str(message.content), "debug") + + +class ConsoleAddon: + """ + An addon that exposes console-specific commands, and hooks into required + events. + """ + def __init__(self, master): + self.master = master + self.started = False + + @command.command("console.layout.options") + def layout_options(self) -> typing.Sequence[str]: + """ + Returns the valid options for console layout. Use these by setting + the console_layout option. + """ + return ["single", "vertical", "horizontal"] + + @command.command("console.layout.cycle") + def layout_cycle(self) -> None: + """ + Cycle through the console layout options. + """ + opts = self.layout_options() + off = self.layout_options().index(ctx.options.console_layout) + ctx.options.update( + console_layout = opts[(off + 1) % len(opts)] + ) + + @command.command("console.panes.next") + def panes_next(self) -> None: + """ + Go to the next layout pane. + """ + self.master.window.switch() + + @command.command("console.options.reset.focus") + def options_reset_current(self) -> None: + """ + Reset the current option in the options editor. + """ + fv = self.master.window.current("options") + if not fv: + raise exceptions.CommandError("Not viewing options.") + self.master.commands.call("options.reset.one %s" % fv.current_name()) + + @command.command("console.nav.start") + def nav_start(self) -> None: + """ + Go to the start of a list or scrollable. + """ + self.master.inject_key("m_start") + + @command.command("console.nav.end") + def nav_end(self) -> None: + """ + Go to the end of a list or scrollable. + """ + self.master.inject_key("m_end") + + @command.command("console.nav.next") + def nav_next(self) -> None: + """ + Go to the next navigatable item. + """ + self.master.inject_key("m_next") + + @command.command("console.nav.select") + def nav_select(self) -> None: + """ + Select a navigable item for viewing or editing. + """ + self.master.inject_key("m_select") + + @command.command("console.nav.up") + def nav_up(self) -> None: + """ + Go up. + """ + self.master.inject_key("up") + + @command.command("console.nav.down") + def nav_down(self) -> None: + """ + Go down. + """ + self.master.inject_key("down") + + @command.command("console.nav.pageup") + def nav_pageup(self) -> None: + """ + Go up. + """ + self.master.inject_key("page up") + + @command.command("console.nav.pagedown") + def nav_pagedown(self) -> None: + """ + Go down. + """ + self.master.inject_key("page down") + + @command.command("console.nav.left") + def nav_left(self) -> None: + """ + Go left. + """ + self.master.inject_key("left") + + @command.command("console.nav.right") + def nav_right(self) -> None: + """ + Go right. + """ + self.master.inject_key("right") + + @command.command("console.choose") + def console_choose( + self, prompt: str, choices: typing.Sequence[str], *cmd: str + ) -> None: + """ + Prompt the user to choose from a specified list of strings, then + invoke another command with all occurances of {choice} replaced by + the choice the user made. + """ + def callback(opt): + # We're now outside of the call context... + repl = " ".join(cmd) + repl = repl.replace("{choice}", opt) + try: + self.master.commands.call(repl) + except exceptions.CommandError as e: + signals.status_message.send(message=str(e)) + + self.master.overlay( + overlay.Chooser(self.master, prompt, choices, "", callback) + ) + + @command.command("console.choose.cmd") + def console_choose_cmd( + self, prompt: str, choicecmd: str, *cmd: str + ) -> None: + """ + Prompt the user to choose from a list of strings returned by a + command, then invoke another command with all occurances of {choice} + replaced by the choice the user made. + """ + choices = ctx.master.commands.call_args(choicecmd, []) + + def callback(opt): + # We're now outside of the call context... + repl = " ".join(cmd) + repl = repl.replace("{choice}", opt) + try: + self.master.commands.call(repl) + except exceptions.CommandError as e: + signals.status_message.send(message=str(e)) + + self.master.overlay( + overlay.Chooser(self.master, prompt, choices, "", callback) + ) + + @command.command("console.command") + def console_command(self, *partial: str) -> None: + """ + Prompt the user to edit a command with a (possilby empty) starting value. + """ + signals.status_prompt_command.send(partial=" ".join(partial)) # type: ignore + + @command.command("console.view.keybindings") + def view_keybindings(self) -> None: + """View the commands list.""" + self.master.switch_view("keybindings") + + @command.command("console.view.commands") + def view_commands(self) -> None: + """View the commands list.""" + self.master.switch_view("commands") + + @command.command("console.view.options") + def view_options(self) -> None: + """View the options editor.""" + self.master.switch_view("options") + + @command.command("console.view.eventlog") + def view_eventlog(self) -> None: + """View the options editor.""" + self.master.switch_view("eventlog") + + @command.command("console.view.help") + def view_help(self) -> None: + """View help.""" + self.master.switch_view("help") + + @command.command("console.view.flow") + def view_flow(self, flow: flow.Flow) -> None: + """View a flow.""" + if hasattr(flow, "request"): + # FIME: Also set focus? + self.master.switch_view("flowview") + + @command.command("console.exit") + def exit(self) -> None: + """Exit mitmproxy.""" + self.master.shutdown() + + @command.command("console.view.pop") + def view_pop(self) -> None: + """ + Pop a view off the console stack. At the top level, this prompts the + user to exit mitmproxy. + """ + signals.pop_view_state.send(self) + + @command.command("console.bodyview") + def bodyview(self, f: flow.Flow, part: str) -> None: + """ + Spawn an external viewer for a flow request or response body based + on the detected MIME type. We use the mailcap system to find the + correct viewier, and fall back to the programs in $PAGER or $EDITOR + if necessary. + """ + fpart = getattr(f, part) + if not fpart: + raise exceptions.CommandError("Could not view part %s." % part) + t = fpart.headers.get("content-type") + content = fpart.get_content(strict=False) + if not content: + raise exceptions.CommandError("No content to view.") + self.master.spawn_external_viewer(content, t) + + @command.command("console.edit.focus.options") + def edit_focus_options(self) -> typing.Sequence[str]: + return [ + "cookies", + "form", + "path", + "method", + "query", + "reason", + "request-headers", + "response-headers", + "status_code", + "set-cookies", + "url", + ] + + @command.command("console.edit.focus") + def edit_focus(self, part: str) -> None: + """ + Edit the query of the current focus. + """ + if part == "cookies": + self.master.switch_view("edit_focus_cookies") + elif part == "form": + self.master.switch_view("edit_focus_form") + elif part == "path": + self.master.switch_view("edit_focus_path") + elif part == "query": + self.master.switch_view("edit_focus_query") + elif part == "request-headers": + self.master.switch_view("edit_focus_request_headers") + elif part == "response-headers": + self.master.switch_view("edit_focus_response_headers") + elif part == "set-cookies": + self.master.switch_view("edit_focus_setcookies") + elif part in ["url", "method", "status_code", "reason"]: + self.master.commands.call( + "console.command flow.set @focus %s " % part + ) + + def _grideditor(self): + gewidget = self.master.window.current("grideditor") + if not gewidget: + raise exceptions.CommandError("Not in a grideditor.") + return gewidget.key_responder() + + @command.command("console.grideditor.add") + def grideditor_add(self) -> None: + """ + Add a row after the cursor. + """ + self._grideditor().cmd_add() + + @command.command("console.grideditor.insert") + def grideditor_insert(self) -> None: + """ + Insert a row before the cursor. + """ + self._grideditor().cmd_insert() + + @command.command("console.grideditor.delete") + def grideditor_delete(self) -> None: + """ + Delete row + """ + self._grideditor().cmd_delete() + + @command.command("console.grideditor.readfile") + def grideditor_readfile(self, path: str) -> None: + """ + Read a file into the currrent cell. + """ + self._grideditor().cmd_read_file(path) + + @command.command("console.grideditor.readfile_escaped") + def grideditor_readfile_escaped(self, path: str) -> None: + """ + Read a file containing a Python-style escaped stringinto the + currrent cell. + """ + self._grideditor().cmd_read_file_escaped(path) + + @command.command("console.grideditor.editor") + def grideditor_editor(self) -> None: + """ + Spawn an external editor on the current cell. + """ + self._grideditor().cmd_spawn_editor() + + @command.command("console.flowview.mode.set") + def flowview_mode_set(self) -> None: + """ + Set the display mode for the current flow view. + """ + fv = self.master.window.current("flowview") + if not fv: + raise exceptions.CommandError("Not viewing a flow.") + idx = fv.body.tab_offset + + def callback(opt): + try: + self.master.commands.call_args( + "view.setval", + ["@focus", "flowview_mode_%s" % idx, opt] + ) + except exceptions.CommandError as e: + signals.status_message.send(message=str(e)) + + opts = [i.name.lower() for i in contentviews.views] + self.master.overlay(overlay.Chooser(self.master, "Mode", opts, "", callback)) + + @command.command("console.flowview.mode") + def flowview_mode(self) -> str: + """ + Get the display mode for the current flow view. + """ + fv = self.master.window.current_window("flowview") + if not fv: + raise exceptions.CommandError("Not viewing a flow.") + idx = fv.body.tab_offset + return self.master.commands.call_args( + "view.getval", + [ + "@focus", + "flowview_mode_%s" % idx, + self.master.options.default_contentview, + ] + ) + + @command.command("console.eventlog.clear") + def eventlog_clear(self) -> None: + """ + Clear the event log. + """ + signals.sig_clear_log.send(self) + + @command.command("console.key.contexts") + def key_contexts(self) -> typing.Sequence[str]: + """ + The available contexts for key binding. + """ + return list(sorted(keymap.Contexts)) + + @command.command("console.key.bind") + def key_bind(self, contexts: typing.Sequence[str], key: str, *command: str) -> None: + """ + Bind a shortcut key. + """ + try: + self.master.keymap.add( + key, + " ".join(command), + contexts, + "" + ) + except ValueError as v: + raise exceptions.CommandError(v) + + @command.command("console.key.unbind") + def key_unbind(self, contexts: typing.Sequence[str], key: str) -> None: + """ + Un-bind a shortcut key. + """ + try: + self.master.keymap.remove(key, contexts) + except ValueError as v: + raise exceptions.CommandError(v) + + def _keyfocus(self): + kwidget = self.master.window.current("keybindings") + if not kwidget: + raise exceptions.CommandError("Not viewing key bindings.") + f = kwidget.focus() + if not f: + raise exceptions.CommandError("No key binding focused") + return f + + @command.command("console.key.unbind.focus") + def key_unbind_focus(self) -> None: + """ + Un-bind the shortcut key currently focused in the key binding viewer. + """ + b = self._keyfocus() + try: + self.master.keymap.remove(b.key, b.contexts) + except ValueError as v: + raise exceptions.CommandError(v) + + @command.command("console.key.execute.focus") + def key_execute_focus(self) -> None: + """ + Execute the currently focused key binding. + """ + b = self._keyfocus() + self.console_command(b.command) + + @command.command("console.key.edit.focus") + def key_edit_focus(self) -> None: + """ + Execute the currently focused key binding. + """ + b = self._keyfocus() + self.console_command( + "console.key.bind", + ",".join(b.contexts), + b.key, + b.command, + ) + + def running(self): + self.started = True + + def update(self, flows): + if not flows: + signals.update_settings.send(self) + for f in flows: + signals.flow_change.send(self, flow=f) diff --git a/mitmproxy/tools/console/defaultkeys.py b/mitmproxy/tools/console/defaultkeys.py index cfefd533..105be2be 100644 --- a/mitmproxy/tools/console/defaultkeys.py +++ b/mitmproxy/tools/console/defaultkeys.py @@ -1,6 +1,6 @@ def map(km): - km.add(":", "console.command ''", ["global"], "Command prompt") + km.add(":", "console.command ", ["global"], "Command prompt") km.add("?", "console.view.help", ["global"], "View help") km.add("C", "console.view.commands", ["global"], "View commands") km.add("K", "console.view.keybindings", ["global"], "View key bindings") @@ -20,7 +20,7 @@ def map(km): km.add("h", "console.nav.left", ["global"], "Left") km.add("tab", "console.nav.next", ["global"], "Next") km.add("enter", "console.nav.select", ["global"], "Select") - km.add(" ", "console.nav.pagedown", ["global"], "Page down") + km.add("space", "console.nav.pagedown", ["global"], "Page down") km.add("ctrl f", "console.nav.pagedown", ["global"], "Page down") km.add("ctrl b", "console.nav.pageup", ["global"], "Page up") @@ -102,7 +102,7 @@ def map(km): "Toggle viewing full contents on this flow", ) km.add("w", "console.command save.file @focus ", ["flowview"], "Save flow to file") - km.add(" ", "view.focus.next", ["flowview"], "Go to next flow") + km.add("space", "view.focus.next", ["flowview"], "Go to next flow") km.add( "v", @@ -128,7 +128,7 @@ def map(km): km.add("L", "console.command options.load ", ["options"], "Load from file") km.add("S", "console.command options.save ", ["options"], "Save to file") km.add("D", "options.reset", ["options"], "Reset all options") - km.add("d", "console.options.reset.current", ["options"], "Reset this option") + km.add("d", "console.options.reset.focus", ["options"], "Reset this option") km.add("a", "console.grideditor.add", ["grideditor"], "Add a row after cursor") km.add("A", "console.grideditor.insert", ["grideditor"], "Insert a row before cursor") @@ -148,3 +148,31 @@ def map(km): km.add("e", "console.grideditor.editor", ["grideditor"], "Edit in external editor") km.add("z", "console.eventlog.clear", ["eventlog"], "Clear") + + km.add( + "a", + """ + console.choose.cmd "Context" console.key.contexts + console.command console.key.bind {choice} + """, + ["keybindings"], + "Add a key binding" + ) + km.add( + "d", + "console.key.unbind.focus", + ["keybindings"], + "Unbind the currently focused key binding" + ) + km.add( + "x", + "console.key.execute.focus", + ["keybindings"], + "Execute the currently focused key binding" + ) + km.add( + "enter", + "console.key.edit.focus", + ["keybindings"], + "Edit the currently focused key binding" + ) diff --git a/mitmproxy/tools/console/keybindings.py b/mitmproxy/tools/console/keybindings.py index 6bd13429..45f5c33c 100644 --- a/mitmproxy/tools/console/keybindings.py +++ b/mitmproxy/tools/console/keybindings.py @@ -2,6 +2,7 @@ import urwid import blinker import textwrap from mitmproxy.tools.console import layoutwidget +from mitmproxy.tools.console import signals HELP_HEIGHT = 5 @@ -43,6 +44,12 @@ class KeyListWalker(urwid.ListWalker): self.focusobj = None self.bindings = list(master.keymap.list("all")) self.set_focus(0) + signals.keybindings_change.connect(self.sig_modified) + + def sig_modified(self, sender): + self.bindings = list(self.master.keymap.list("all")) + self.set_focus(min(self.index, len(self.bindings) - 1)) + self._modified() def get_edit_text(self): return self.focus_obj.get_edit_text() @@ -128,6 +135,12 @@ class KeyBindings(urwid.Pile, layoutwidget.LayoutWidget): ) self.master = master + def focus(self): + if self.focus_position != 0: + return None + f = self.widget_list[0] + return f.walker.get_focus()[0].binding + def keypress(self, size, key): if key == "m_next": self.focus_position = ( diff --git a/mitmproxy/tools/console/keymap.py b/mitmproxy/tools/console/keymap.py index 4d8c3ec2..e406905d 100644 --- a/mitmproxy/tools/console/keymap.py +++ b/mitmproxy/tools/console/keymap.py @@ -1,9 +1,9 @@ import typing -import collections from mitmproxy.tools.console import commandeditor +from mitmproxy.tools.console import signals -SupportedContexts = { +Contexts = { "chooser", "commands", "eventlog", @@ -12,59 +12,113 @@ SupportedContexts = { "global", "grideditor", "help", + "keybindings", "options", } -Binding = collections.namedtuple( - "Binding", - ["key", "command", "contexts", "help"] -) +class Binding: + def __init__(self, key, command, contexts, help): + self.key, self.command, self.contexts = key, command, sorted(contexts) + self.help = help + + def keyspec(self): + """ + Translate the key spec from a convenient user specification to one + Urwid understands. + """ + return self.key.replace("space", " ") + + def sortkey(self): + return self.key + ",".join(self.contexts) class Keymap: def __init__(self, master): self.executor = commandeditor.CommandExecutor(master) self.keys = {} + for c in Contexts: + self.keys[c] = {} self.bindings = [] - def add(self, key: str, command: str, contexts: typing.Sequence[str], help="") -> None: - """ - Add a key to the key map. If context is empty, it's considered to be - a global binding. - """ + def _check_contexts(self, contexts): if not contexts: raise ValueError("Must specify at least one context.") for c in contexts: - if c not in SupportedContexts: + if c not in Contexts: raise ValueError("Unsupported context: %s" % c) - b = Binding(key=key, command=command, contexts=contexts, help=help) - self.bindings.append(b) - self.bind(b) + def add( + self, + key: str, + command: str, + contexts: typing.Sequence[str], + help="" + ) -> None: + """ + Add a key to the key map. + """ + self._check_contexts(contexts) + + for b in self.bindings: + if b.key == key and b.command.strip() == command.strip(): + b.contexts = sorted(list(set(b.contexts + contexts))) + if help: + b.help = help + self.bind(b) + break + else: + self.remove(key, contexts) + b = Binding(key=key, command=command, contexts=contexts, help=help) + self.bindings.append(b) + self.bind(b) + signals.keybindings_change.send(self) - def bind(self, binding): + def remove(self, key: str, contexts: typing.Sequence[str]) -> None: + """ + Remove a key from the key map. + """ + self._check_contexts(contexts) + for c in contexts: + b = self.get(c, key) + if b: + self.unbind(b) + b.contexts = [x for x in b.contexts if x != c] + if b.contexts: + self.bindings.append(b) + self.bind(b) + signals.keybindings_change.send(self) + + def bind(self, binding: Binding) -> None: + for c in binding.contexts: + self.keys[c][binding.keyspec()] = binding + + def unbind(self, binding: Binding) -> None: + """ + Unbind also removes the binding from the list. + """ for c in binding.contexts: - d = self.keys.setdefault(c, {}) - d[binding.key] = binding.command + del self.keys[c][binding.keyspec()] + self.bindings = [b for b in self.bindings if b != binding] - def get(self, context: str, key: str) -> typing.Optional[str]: + def get(self, context: str, key: str) -> typing.Optional[Binding]: if context in self.keys: return self.keys[context].get(key, None) return None def list(self, context: str) -> typing.Sequence[Binding]: - b = [b for b in self.bindings if context in b.contexts or context == "all"] - b.sort(key=lambda x: x.key) - return b + b = [x for x in self.bindings if context in x.contexts or context == "all"] + single = [x for x in b if len(x.key.split()) == 1] + multi = [x for x in b if len(x.key.split()) != 1] + single.sort(key=lambda x: x.sortkey()) + multi.sort(key=lambda x: x.sortkey()) + return single + multi def handle(self, context: str, key: str) -> typing.Optional[str]: """ Returns the key if it has not been handled, or None. """ - cmd = self.get(context, key) - if not cmd: - cmd = self.get("global", key) - if cmd: - return self.executor(cmd) + b = self.get(context, key) or self.get("global", key) + if b: + return self.executor(b.command) return key diff --git a/mitmproxy/tools/console/master.py b/mitmproxy/tools/console/master.py index 315fad94..cd29dba9 100644 --- a/mitmproxy/tools/console/master.py +++ b/mitmproxy/tools/console/master.py @@ -9,443 +9,21 @@ import subprocess import sys import tempfile import traceback -import typing import urwid -from mitmproxy import ctx from mitmproxy import addons -from mitmproxy import command -from mitmproxy import exceptions from mitmproxy import master from mitmproxy import log -from mitmproxy import flow from mitmproxy.addons import intercept from mitmproxy.addons import readfile from mitmproxy.addons import view +from mitmproxy.tools.console import consoleaddons from mitmproxy.tools.console import defaultkeys from mitmproxy.tools.console import keymap -from mitmproxy.tools.console import overlay from mitmproxy.tools.console import palettes from mitmproxy.tools.console import signals from mitmproxy.tools.console import window -from mitmproxy import contentviews -from mitmproxy.utils import strutils - - -class Logger: - def log(self, evt): - signals.add_log(evt.msg, evt.level) - if evt.level == "alert": - signals.status_message.send( - message=str(evt.msg), - expire=2 - ) - - -class UnsupportedLog: - """ - A small addon to dump info on flow types we don't support yet. - """ - def websocket_message(self, f): - message = f.messages[-1] - signals.add_log(f.message_info(message), "info") - signals.add_log(strutils.bytes_to_escaped_str(message.content), "debug") - - def websocket_end(self, f): - signals.add_log("WebSocket connection closed by {}: {} {}, {}".format( - f.close_sender, - f.close_code, - f.close_message, - f.close_reason), "info") - - def tcp_message(self, f): - message = f.messages[-1] - direction = "->" if message.from_client else "<-" - signals.add_log("{client_host}:{client_port} {direction} tcp {direction} {server_host}:{server_port}".format( - client_host=f.client_conn.address[0], - client_port=f.client_conn.address[1], - server_host=f.server_conn.address[0], - server_port=f.server_conn.address[1], - direction=direction, - ), "info") - signals.add_log(strutils.bytes_to_escaped_str(message.content), "debug") - - -class ConsoleAddon: - """ - An addon that exposes console-specific commands, and hooks into required - events. - """ - def __init__(self, master): - self.master = master - self.started = False - - @command.command("console.layout.options") - def layout_options(self) -> typing.Sequence[str]: - """ - Returns the valid options for console layout. Use these by setting - the console_layout option. - """ - return ["single", "vertical", "horizontal"] - - @command.command("console.layout.cycle") - def layout_cycle(self) -> None: - """ - Cycle through the console layout options. - """ - opts = self.layout_options() - off = self.layout_options().index(ctx.options.console_layout) - ctx.options.update( - console_layout = opts[(off + 1) % len(opts)] - ) - - @command.command("console.panes.next") - def panes_next(self) -> None: - """ - Go to the next layout pane. - """ - self.master.window.switch() - - @command.command("console.options.reset.current") - def options_reset_current(self) -> None: - """ - Reset the current option in the options editor. - """ - fv = self.master.window.current("options") - if not fv: - raise exceptions.CommandError("Not viewing options.") - self.master.commands.call("options.reset.one %s" % fv.current_name()) - - @command.command("console.nav.start") - def nav_start(self) -> None: - """ - Go to the start of a list or scrollable. - """ - self.master.inject_key("m_start") - - @command.command("console.nav.end") - def nav_end(self) -> None: - """ - Go to the end of a list or scrollable. - """ - self.master.inject_key("m_end") - - @command.command("console.nav.next") - def nav_next(self) -> None: - """ - Go to the next navigatable item. - """ - self.master.inject_key("m_next") - - @command.command("console.nav.select") - def nav_select(self) -> None: - """ - Select a navigable item for viewing or editing. - """ - self.master.inject_key("m_select") - - @command.command("console.nav.up") - def nav_up(self) -> None: - """ - Go up. - """ - self.master.inject_key("up") - - @command.command("console.nav.down") - def nav_down(self) -> None: - """ - Go down. - """ - self.master.inject_key("down") - - @command.command("console.nav.pageup") - def nav_pageup(self) -> None: - """ - Go up. - """ - self.master.inject_key("page up") - - @command.command("console.nav.pagedown") - def nav_pagedown(self) -> None: - """ - Go down. - """ - self.master.inject_key("page down") - - @command.command("console.nav.left") - def nav_left(self) -> None: - """ - Go left. - """ - self.master.inject_key("left") - - @command.command("console.nav.right") - def nav_right(self) -> None: - """ - Go right. - """ - self.master.inject_key("right") - - @command.command("console.choose") - def console_choose( - self, prompt: str, choices: typing.Sequence[str], *cmd: str - ) -> None: - """ - Prompt the user to choose from a specified list of strings, then - invoke another command with all occurances of {choice} replaced by - the choice the user made. - """ - def callback(opt): - # We're now outside of the call context... - repl = " ".join(cmd) - repl = repl.replace("{choice}", opt) - try: - self.master.commands.call(repl) - except exceptions.CommandError as e: - signals.status_message.send(message=str(e)) - - self.master.overlay( - overlay.Chooser(self.master, prompt, choices, "", callback) - ) - - @command.command("console.choose.cmd") - def console_choose_cmd( - self, prompt: str, choicecmd: str, *cmd: str - ) -> None: - """ - Prompt the user to choose from a list of strings returned by a - command, then invoke another command with all occurances of {choice} - replaced by the choice the user made. - """ - choices = ctx.master.commands.call_args(choicecmd, []) - - def callback(opt): - # We're now outside of the call context... - repl = " ".join(cmd) - repl = repl.replace("{choice}", opt) - try: - self.master.commands.call(repl) - except exceptions.CommandError as e: - signals.status_message.send(message=str(e)) - - self.master.overlay( - overlay.Chooser(self.master, prompt, choices, "", callback) - ) - - @command.command("console.command") - def console_command(self, *partial: str) -> None: - """ - Prompt the user to edit a command with a (possilby empty) starting value. - """ - signals.status_prompt_command.send(partial=" ".join(partial)) # type: ignore - - @command.command("console.view.keybindings") - def view_keybindings(self) -> None: - """View the commands list.""" - self.master.switch_view("keybindings") - - @command.command("console.view.commands") - def view_commands(self) -> None: - """View the commands list.""" - self.master.switch_view("commands") - - @command.command("console.view.options") - def view_options(self) -> None: - """View the options editor.""" - self.master.switch_view("options") - - @command.command("console.view.eventlog") - def view_eventlog(self) -> None: - """View the options editor.""" - self.master.switch_view("eventlog") - - @command.command("console.view.help") - def view_help(self) -> None: - """View help.""" - self.master.switch_view("help") - - @command.command("console.view.flow") - def view_flow(self, flow: flow.Flow) -> None: - """View a flow.""" - if hasattr(flow, "request"): - # FIME: Also set focus? - self.master.switch_view("flowview") - - @command.command("console.exit") - def exit(self) -> None: - """Exit mitmproxy.""" - raise urwid.ExitMainLoop - - @command.command("console.view.pop") - def view_pop(self) -> None: - """ - Pop a view off the console stack. At the top level, this prompts the - user to exit mitmproxy. - """ - signals.pop_view_state.send(self) - - @command.command("console.bodyview") - def bodyview(self, f: flow.Flow, part: str) -> None: - """ - Spawn an external viewer for a flow request or response body based - on the detected MIME type. We use the mailcap system to find the - correct viewier, and fall back to the programs in $PAGER or $EDITOR - if necessary. - """ - fpart = getattr(f, part) - if not fpart: - raise exceptions.CommandError("Could not view part %s." % part) - t = fpart.headers.get("content-type") - content = fpart.get_content(strict=False) - if not content: - raise exceptions.CommandError("No content to view.") - self.master.spawn_external_viewer(content, t) - - @command.command("console.edit.focus.options") - def edit_focus_options(self) -> typing.Sequence[str]: - return [ - "cookies", - "form", - "path", - "method", - "query", - "reason", - "request-headers", - "response-headers", - "status_code", - "set-cookies", - "url", - ] - - @command.command("console.edit.focus") - def edit_focus(self, part: str) -> None: - """ - Edit the query of the current focus. - """ - if part == "cookies": - self.master.switch_view("edit_focus_cookies") - elif part == "form": - self.master.switch_view("edit_focus_form") - elif part == "path": - self.master.switch_view("edit_focus_path") - elif part == "query": - self.master.switch_view("edit_focus_query") - elif part == "request-headers": - self.master.switch_view("edit_focus_request_headers") - elif part == "response-headers": - self.master.switch_view("edit_focus_response_headers") - elif part == "set-cookies": - self.master.switch_view("edit_focus_setcookies") - elif part in ["url", "method", "status_code", "reason"]: - self.master.commands.call( - "console.command flow.set @focus %s " % part - ) - - def _grideditor(self): - gewidget = self.master.window.current("grideditor") - if not gewidget: - raise exceptions.CommandError("Not in a grideditor.") - return gewidget.key_responder() - - @command.command("console.grideditor.add") - def grideditor_add(self) -> None: - """ - Add a row after the cursor. - """ - self._grideditor().cmd_add() - - @command.command("console.grideditor.insert") - def grideditor_insert(self) -> None: - """ - Insert a row before the cursor. - """ - self._grideditor().cmd_insert() - - @command.command("console.grideditor.delete") - def grideditor_delete(self) -> None: - """ - Delete row - """ - self._grideditor().cmd_delete() - - @command.command("console.grideditor.readfile") - def grideditor_readfile(self, path: str) -> None: - """ - Read a file into the currrent cell. - """ - self._grideditor().cmd_read_file(path) - - @command.command("console.grideditor.readfile_escaped") - def grideditor_readfile_escaped(self, path: str) -> None: - """ - Read a file containing a Python-style escaped stringinto the - currrent cell. - """ - self._grideditor().cmd_read_file_escaped(path) - - @command.command("console.grideditor.editor") - def grideditor_editor(self) -> None: - """ - Spawn an external editor on the current cell. - """ - self._grideditor().cmd_spawn_editor() - - @command.command("console.flowview.mode.set") - def flowview_mode_set(self) -> None: - """ - Set the display mode for the current flow view. - """ - fv = self.master.window.current("flowview") - if not fv: - raise exceptions.CommandError("Not viewing a flow.") - idx = fv.body.tab_offset - - def callback(opt): - try: - self.master.commands.call_args( - "view.setval", - ["@focus", "flowview_mode_%s" % idx, opt] - ) - except exceptions.CommandError as e: - signals.status_message.send(message=str(e)) - - opts = [i.name.lower() for i in contentviews.views] - self.master.overlay(overlay.Chooser(self.master, "Mode", opts, "", callback)) - - @command.command("console.flowview.mode") - def flowview_mode(self) -> str: - """ - Get the display mode for the current flow view. - """ - fv = self.master.window.current_window("flowview") - if not fv: - raise exceptions.CommandError("Not viewing a flow.") - idx = fv.body.tab_offset - return self.master.commands.call_args( - "view.getval", - [ - "@focus", - "flowview_mode_%s" % idx, - self.master.options.default_contentview, - ] - ) - - @command.command("console.eventlog.clear") - def eventlog_clear(self) -> None: - """ - Clear the event log. - """ - signals.sig_clear_log.send(self) - - def running(self): - self.started = True - - def update(self, flows): - if not flows: - signals.update_settings.send(self) - for f in flows: - signals.flow_change.send(self, flow=f) class ConsoleMaster(master.Master): @@ -470,14 +48,14 @@ class ConsoleMaster(master.Master): signals.call_in.connect(self.sig_call_in) signals.sig_add_log.connect(self.sig_add_log) - self.addons.add(Logger()) + self.addons.add(consoleaddons.Logger()) self.addons.add(*addons.default_addons()) self.addons.add( intercept.Intercept(), self.view, - UnsupportedLog(), + consoleaddons.UnsupportedLog(), readfile.ReadFile(), - ConsoleAddon(self), + consoleaddons.ConsoleAddon(self), ) def sigint_handler(*args, **kwargs): diff --git a/mitmproxy/tools/console/signals.py b/mitmproxy/tools/console/signals.py index 49115a5d..5d39d96a 100644 --- a/mitmproxy/tools/console/signals.py +++ b/mitmproxy/tools/console/signals.py @@ -48,3 +48,6 @@ flowlist_change = blinker.Signal() # Pop and push view state onto a stack pop_view_state = blinker.Signal() push_view_state = blinker.Signal() + +# Fired when the key bindings change +keybindings_change = blinker.Signal() diff --git a/test/mitmproxy/tools/console/test_keymap.py b/test/mitmproxy/tools/console/test_keymap.py index bbca4ac9..00e64991 100644 --- a/test/mitmproxy/tools/console/test_keymap.py +++ b/test/mitmproxy/tools/console/test_keymap.py @@ -4,6 +4,11 @@ from unittest import mock import pytest +def test_binding(): + b = keymap.Binding("space", "cmd", ["options"], "") + assert b.keyspec() == " " + + def test_bind(): with taddons.context() as tctx: km = keymap.Keymap(tctx.master) @@ -30,3 +35,38 @@ def test_bind(): assert km.executor.called assert len((km.list("global"))) == 1 + + +def test_join(): + with taddons.context() as tctx: + km = keymap.Keymap(tctx.master) + km.add("key", "str", ["options"], "help1") + km.add("key", "str", ["commands"]) + return + assert len(km.bindings) == 1 + assert len(km.bindings[0].contexts) == 2 + assert km.bindings[0].help == "help1" + km.add("key", "str", ["commands"], "help2") + assert len(km.bindings) == 1 + assert len(km.bindings[0].contexts) == 2 + assert km.bindings[0].help == "help2" + + assert km.get("commands", "key") + km.unbind(km.bindings[0]) + assert len(km.bindings) == 0 + assert not km.get("commands", "key") + + +def test_remove(): + with taddons.context() as tctx: + km = keymap.Keymap(tctx.master) + km.add("key", "str", ["options", "commands"], "help1") + assert len(km.bindings) == 1 + assert "options" in km.bindings[0].contexts + + km.remove("key", ["options"]) + assert len(km.bindings) == 1 + assert "options" not in km.bindings[0].contexts + + km.remove("key", ["commands"]) + assert len(km.bindings) == 0 |