aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy/tools
diff options
context:
space:
mode:
Diffstat (limited to 'mitmproxy/tools')
-rw-r--r--mitmproxy/tools/console/commands.py10
-rw-r--r--mitmproxy/tools/console/consoleaddons.py500
-rw-r--r--mitmproxy/tools/console/defaultkeys.py67
-rw-r--r--mitmproxy/tools/console/flowlist.py2
-rw-r--r--mitmproxy/tools/console/keybindings.py159
-rw-r--r--mitmproxy/tools/console/keymap.py108
-rw-r--r--mitmproxy/tools/console/master.py425
-rw-r--r--mitmproxy/tools/console/signals.py3
-rw-r--r--mitmproxy/tools/console/window.py2
9 files changed, 803 insertions, 473 deletions
diff --git a/mitmproxy/tools/console/commands.py b/mitmproxy/tools/console/commands.py
index e4535314..20efcee3 100644
--- a/mitmproxy/tools/console/commands.py
+++ b/mitmproxy/tools/console/commands.py
@@ -6,16 +6,6 @@ from mitmproxy.tools.console import signals
HELP_HEIGHT = 5
-
-def fcol(s, width, attr):
- s = str(s)
- return (
- "fixed",
- width,
- urwid.Text((attr, s))
- )
-
-
command_focus_change = blinker.Signal()
diff --git a/mitmproxy/tools/console/consoleaddons.py b/mitmproxy/tools/console/consoleaddons.py
new file mode 100644
index 00000000..a65f0afe
--- /dev/null
+++ b/mitmproxy/tools/console/consoleaddons.py
@@ -0,0 +1,500 @@
+import typing
+
+from mitmproxy import ctx
+from mitmproxy import command
+from mitmproxy import exceptions
+from mitmproxy import flow
+from mitmproxy import contentviews
+from mitmproxy.utils import strutils
+
+from mitmproxy.tools.console import overlay
+from mitmproxy.tools.console import signals
+from mitmproxy.tools.console import keymap
+
+
+class Logger:
+ def log(self, evt):
+ signals.add_log(evt.msg, evt.level)
+ if evt.level == "alert":
+ signals.status_message.send(
+ message=str(evt.msg),
+ expire=2
+ )
+
+
+class UnsupportedLog:
+ """
+ A small addon to dump info on flow types we don't support yet.
+ """
+ def websocket_message(self, f):
+ message = f.messages[-1]
+ signals.add_log(f.message_info(message), "info")
+ signals.add_log(strutils.bytes_to_escaped_str(message.content), "debug")
+
+ def websocket_end(self, f):
+ signals.add_log("WebSocket connection closed by {}: {} {}, {}".format(
+ f.close_sender,
+ f.close_code,
+ f.close_message,
+ f.close_reason), "info")
+
+ def tcp_message(self, f):
+ message = f.messages[-1]
+ direction = "->" if message.from_client else "<-"
+ signals.add_log("{client_host}:{client_port} {direction} tcp {direction} {server_host}:{server_port}".format(
+ client_host=f.client_conn.address[0],
+ client_port=f.client_conn.address[1],
+ server_host=f.server_conn.address[0],
+ server_port=f.server_conn.address[1],
+ direction=direction,
+ ), "info")
+ signals.add_log(strutils.bytes_to_escaped_str(message.content), "debug")
+
+
+class ConsoleAddon:
+ """
+ An addon that exposes console-specific commands, and hooks into required
+ events.
+ """
+ def __init__(self, master):
+ self.master = master
+ self.started = False
+
+ @command.command("console.layout.options")
+ def layout_options(self) -> typing.Sequence[str]:
+ """
+ Returns the valid options for console layout. Use these by setting
+ the console_layout option.
+ """
+ return ["single", "vertical", "horizontal"]
+
+ @command.command("console.layout.cycle")
+ def layout_cycle(self) -> None:
+ """
+ Cycle through the console layout options.
+ """
+ opts = self.layout_options()
+ off = self.layout_options().index(ctx.options.console_layout)
+ ctx.options.update(
+ console_layout = opts[(off + 1) % len(opts)]
+ )
+
+ @command.command("console.panes.next")
+ def panes_next(self) -> None:
+ """
+ Go to the next layout pane.
+ """
+ self.master.window.switch()
+
+ @command.command("console.options.reset.focus")
+ def options_reset_current(self) -> None:
+ """
+ Reset the current option in the options editor.
+ """
+ fv = self.master.window.current("options")
+ if not fv:
+ raise exceptions.CommandError("Not viewing options.")
+ self.master.commands.call("options.reset.one %s" % fv.current_name())
+
+ @command.command("console.nav.start")
+ def nav_start(self) -> None:
+ """
+ Go to the start of a list or scrollable.
+ """
+ self.master.inject_key("m_start")
+
+ @command.command("console.nav.end")
+ def nav_end(self) -> None:
+ """
+ Go to the end of a list or scrollable.
+ """
+ self.master.inject_key("m_end")
+
+ @command.command("console.nav.next")
+ def nav_next(self) -> None:
+ """
+ Go to the next navigatable item.
+ """
+ self.master.inject_key("m_next")
+
+ @command.command("console.nav.select")
+ def nav_select(self) -> None:
+ """
+ Select a navigable item for viewing or editing.
+ """
+ self.master.inject_key("m_select")
+
+ @command.command("console.nav.up")
+ def nav_up(self) -> None:
+ """
+ Go up.
+ """
+ self.master.inject_key("up")
+
+ @command.command("console.nav.down")
+ def nav_down(self) -> None:
+ """
+ Go down.
+ """
+ self.master.inject_key("down")
+
+ @command.command("console.nav.pageup")
+ def nav_pageup(self) -> None:
+ """
+ Go up.
+ """
+ self.master.inject_key("page up")
+
+ @command.command("console.nav.pagedown")
+ def nav_pagedown(self) -> None:
+ """
+ Go down.
+ """
+ self.master.inject_key("page down")
+
+ @command.command("console.nav.left")
+ def nav_left(self) -> None:
+ """
+ Go left.
+ """
+ self.master.inject_key("left")
+
+ @command.command("console.nav.right")
+ def nav_right(self) -> None:
+ """
+ Go right.
+ """
+ self.master.inject_key("right")
+
+ @command.command("console.choose")
+ def console_choose(
+ self, prompt: str, choices: typing.Sequence[str], *cmd: str
+ ) -> None:
+ """
+ Prompt the user to choose from a specified list of strings, then
+ invoke another command with all occurances of {choice} replaced by
+ the choice the user made.
+ """
+ def callback(opt):
+ # We're now outside of the call context...
+ repl = " ".join(cmd)
+ repl = repl.replace("{choice}", opt)
+ try:
+ self.master.commands.call(repl)
+ except exceptions.CommandError as e:
+ signals.status_message.send(message=str(e))
+
+ self.master.overlay(
+ overlay.Chooser(self.master, prompt, choices, "", callback)
+ )
+
+ @command.command("console.choose.cmd")
+ def console_choose_cmd(
+ self, prompt: str, choicecmd: str, *cmd: str
+ ) -> None:
+ """
+ Prompt the user to choose from a list of strings returned by a
+ command, then invoke another command with all occurances of {choice}
+ replaced by the choice the user made.
+ """
+ choices = ctx.master.commands.call_args(choicecmd, [])
+
+ def callback(opt):
+ # We're now outside of the call context...
+ repl = " ".join(cmd)
+ repl = repl.replace("{choice}", opt)
+ try:
+ self.master.commands.call(repl)
+ except exceptions.CommandError as e:
+ signals.status_message.send(message=str(e))
+
+ self.master.overlay(
+ overlay.Chooser(self.master, prompt, choices, "", callback)
+ )
+
+ @command.command("console.command")
+ def console_command(self, *partial: str) -> None:
+ """
+ Prompt the user to edit a command with a (possilby empty) starting value.
+ """
+ signals.status_prompt_command.send(partial=" ".join(partial)) # type: ignore
+
+ @command.command("console.view.keybindings")
+ def view_keybindings(self) -> None:
+ """View the commands list."""
+ self.master.switch_view("keybindings")
+
+ @command.command("console.view.commands")
+ def view_commands(self) -> None:
+ """View the commands list."""
+ self.master.switch_view("commands")
+
+ @command.command("console.view.options")
+ def view_options(self) -> None:
+ """View the options editor."""
+ self.master.switch_view("options")
+
+ @command.command("console.view.eventlog")
+ def view_eventlog(self) -> None:
+ """View the options editor."""
+ self.master.switch_view("eventlog")
+
+ @command.command("console.view.help")
+ def view_help(self) -> None:
+ """View help."""
+ self.master.switch_view("help")
+
+ @command.command("console.view.flow")
+ def view_flow(self, flow: flow.Flow) -> None:
+ """View a flow."""
+ if hasattr(flow, "request"):
+ # FIME: Also set focus?
+ self.master.switch_view("flowview")
+
+ @command.command("console.exit")
+ def exit(self) -> None:
+ """Exit mitmproxy."""
+ self.master.shutdown()
+
+ @command.command("console.view.pop")
+ def view_pop(self) -> None:
+ """
+ Pop a view off the console stack. At the top level, this prompts the
+ user to exit mitmproxy.
+ """
+ signals.pop_view_state.send(self)
+
+ @command.command("console.bodyview")
+ def bodyview(self, f: flow.Flow, part: str) -> None:
+ """
+ Spawn an external viewer for a flow request or response body based
+ on the detected MIME type. We use the mailcap system to find the
+ correct viewier, and fall back to the programs in $PAGER or $EDITOR
+ if necessary.
+ """
+ fpart = getattr(f, part)
+ if not fpart:
+ raise exceptions.CommandError("Could not view part %s." % part)
+ t = fpart.headers.get("content-type")
+ content = fpart.get_content(strict=False)
+ if not content:
+ raise exceptions.CommandError("No content to view.")
+ self.master.spawn_external_viewer(content, t)
+
+ @command.command("console.edit.focus.options")
+ def edit_focus_options(self) -> typing.Sequence[str]:
+ return [
+ "cookies",
+ "form",
+ "path",
+ "method",
+ "query",
+ "reason",
+ "request-headers",
+ "response-headers",
+ "status_code",
+ "set-cookies",
+ "url",
+ ]
+
+ @command.command("console.edit.focus")
+ def edit_focus(self, part: str) -> None:
+ """
+ Edit the query of the current focus.
+ """
+ if part == "cookies":
+ self.master.switch_view("edit_focus_cookies")
+ elif part == "form":
+ self.master.switch_view("edit_focus_form")
+ elif part == "path":
+ self.master.switch_view("edit_focus_path")
+ elif part == "query":
+ self.master.switch_view("edit_focus_query")
+ elif part == "request-headers":
+ self.master.switch_view("edit_focus_request_headers")
+ elif part == "response-headers":
+ self.master.switch_view("edit_focus_response_headers")
+ elif part == "set-cookies":
+ self.master.switch_view("edit_focus_setcookies")
+ elif part in ["url", "method", "status_code", "reason"]:
+ self.master.commands.call(
+ "console.command flow.set @focus %s " % part
+ )
+
+ def _grideditor(self):
+ gewidget = self.master.window.current("grideditor")
+ if not gewidget:
+ raise exceptions.CommandError("Not in a grideditor.")
+ return gewidget.key_responder()
+
+ @command.command("console.grideditor.add")
+ def grideditor_add(self) -> None:
+ """
+ Add a row after the cursor.
+ """
+ self._grideditor().cmd_add()
+
+ @command.command("console.grideditor.insert")
+ def grideditor_insert(self) -> None:
+ """
+ Insert a row before the cursor.
+ """
+ self._grideditor().cmd_insert()
+
+ @command.command("console.grideditor.delete")
+ def grideditor_delete(self) -> None:
+ """
+ Delete row
+ """
+ self._grideditor().cmd_delete()
+
+ @command.command("console.grideditor.readfile")
+ def grideditor_readfile(self, path: str) -> None:
+ """
+ Read a file into the currrent cell.
+ """
+ self._grideditor().cmd_read_file(path)
+
+ @command.command("console.grideditor.readfile_escaped")
+ def grideditor_readfile_escaped(self, path: str) -> None:
+ """
+ Read a file containing a Python-style escaped stringinto the
+ currrent cell.
+ """
+ self._grideditor().cmd_read_file_escaped(path)
+
+ @command.command("console.grideditor.editor")
+ def grideditor_editor(self) -> None:
+ """
+ Spawn an external editor on the current cell.
+ """
+ self._grideditor().cmd_spawn_editor()
+
+ @command.command("console.flowview.mode.set")
+ def flowview_mode_set(self) -> None:
+ """
+ Set the display mode for the current flow view.
+ """
+ fv = self.master.window.current("flowview")
+ if not fv:
+ raise exceptions.CommandError("Not viewing a flow.")
+ idx = fv.body.tab_offset
+
+ def callback(opt):
+ try:
+ self.master.commands.call_args(
+ "view.setval",
+ ["@focus", "flowview_mode_%s" % idx, opt]
+ )
+ except exceptions.CommandError as e:
+ signals.status_message.send(message=str(e))
+
+ opts = [i.name.lower() for i in contentviews.views]
+ self.master.overlay(overlay.Chooser(self.master, "Mode", opts, "", callback))
+
+ @command.command("console.flowview.mode")
+ def flowview_mode(self) -> str:
+ """
+ Get the display mode for the current flow view.
+ """
+ fv = self.master.window.current_window("flowview")
+ if not fv:
+ raise exceptions.CommandError("Not viewing a flow.")
+ idx = fv.body.tab_offset
+ return self.master.commands.call_args(
+ "view.getval",
+ [
+ "@focus",
+ "flowview_mode_%s" % idx,
+ self.master.options.default_contentview,
+ ]
+ )
+
+ @command.command("console.eventlog.clear")
+ def eventlog_clear(self) -> None:
+ """
+ Clear the event log.
+ """
+ signals.sig_clear_log.send(self)
+
+ @command.command("console.key.contexts")
+ def key_contexts(self) -> typing.Sequence[str]:
+ """
+ The available contexts for key binding.
+ """
+ return list(sorted(keymap.Contexts))
+
+ @command.command("console.key.bind")
+ def key_bind(self, contexts: typing.Sequence[str], key: str, *command: str) -> None:
+ """
+ Bind a shortcut key.
+ """
+ try:
+ self.master.keymap.add(
+ key,
+ " ".join(command),
+ contexts,
+ ""
+ )
+ except ValueError as v:
+ raise exceptions.CommandError(v)
+
+ @command.command("console.key.unbind")
+ def key_unbind(self, contexts: typing.Sequence[str], key: str) -> None:
+ """
+ Un-bind a shortcut key.
+ """
+ try:
+ self.master.keymap.remove(key, contexts)
+ except ValueError as v:
+ raise exceptions.CommandError(v)
+
+ def _keyfocus(self):
+ kwidget = self.master.window.current("keybindings")
+ if not kwidget:
+ raise exceptions.CommandError("Not viewing key bindings.")
+ f = kwidget.focus()
+ if not f:
+ raise exceptions.CommandError("No key binding focused")
+ return f
+
+ @command.command("console.key.unbind.focus")
+ def key_unbind_focus(self) -> None:
+ """
+ Un-bind the shortcut key currently focused in the key binding viewer.
+ """
+ b = self._keyfocus()
+ try:
+ self.master.keymap.remove(b.key, b.contexts)
+ except ValueError as v:
+ raise exceptions.CommandError(v)
+
+ @command.command("console.key.execute.focus")
+ def key_execute_focus(self) -> None:
+ """
+ Execute the currently focused key binding.
+ """
+ b = self._keyfocus()
+ self.console_command(b.command)
+
+ @command.command("console.key.edit.focus")
+ def key_edit_focus(self) -> None:
+ """
+ Execute the currently focused key binding.
+ """
+ b = self._keyfocus()
+ self.console_command(
+ "console.key.bind",
+ ",".join(b.contexts),
+ b.key,
+ b.command,
+ )
+
+ def running(self):
+ self.started = True
+
+ def update(self, flows):
+ if not flows:
+ signals.update_settings.send(self)
+ for f in flows:
+ signals.flow_change.send(self, flow=f)
diff --git a/mitmproxy/tools/console/defaultkeys.py b/mitmproxy/tools/console/defaultkeys.py
index d5b868d0..105be2be 100644
--- a/mitmproxy/tools/console/defaultkeys.py
+++ b/mitmproxy/tools/console/defaultkeys.py
@@ -1,8 +1,9 @@
def map(km):
- km.add(":", "console.command ''", ["global"], "Command prompt")
+ km.add(":", "console.command ", ["global"], "Command prompt")
km.add("?", "console.view.help", ["global"], "View help")
km.add("C", "console.view.commands", ["global"], "View commands")
+ km.add("K", "console.view.keybindings", ["global"], "View key bindings")
km.add("O", "console.view.options", ["global"], "View options")
km.add("E", "console.view.eventlog", ["global"], "View event log")
km.add("Q", "console.exit", ["global"], "Exit immediately")
@@ -19,7 +20,7 @@ def map(km):
km.add("h", "console.nav.left", ["global"], "Left")
km.add("tab", "console.nav.next", ["global"], "Next")
km.add("enter", "console.nav.select", ["global"], "Select")
- km.add(" ", "console.nav.pagedown", ["global"], "Page down")
+ km.add("space", "console.nav.pagedown", ["global"], "Page down")
km.add("ctrl f", "console.nav.pagedown", ["global"], "Page down")
km.add("ctrl b", "console.nav.pageup", ["global"], "Page up")
@@ -36,8 +37,10 @@ def map(km):
km.add("D", "view.duplicate @focus", ["flowlist", "flowview"], "Duplicate flow")
km.add(
"e",
- "console.choose.cmd Format export.formats "
- "console.command export.file {choice} @focus ''",
+ """
+ console.choose.cmd Format export.formats
+ console.command export.file {choice} @focus ''
+ """,
["flowlist", "flowview"],
"Export this flow to file"
)
@@ -60,8 +63,10 @@ def map(km):
)
km.add(
"o",
- "console.choose.cmd Order view.order.options "
- "set console_order={choice}",
+ """
+ console.choose.cmd Order view.order.options
+ set console_order={choice}
+ """,
["flowlist"],
"Set flow list order"
)
@@ -83,8 +88,10 @@ def map(km):
km.add(
"e",
- "console.choose.cmd Part console.edit.focus.options "
- "console.edit.focus {choice}",
+ """
+ console.choose.cmd Part console.edit.focus.options
+ console.edit.focus {choice}
+ """,
["flowview"],
"Edit a flow component"
)
@@ -95,12 +102,14 @@ def map(km):
"Toggle viewing full contents on this flow",
)
km.add("w", "console.command save.file @focus ", ["flowview"], "Save flow to file")
- km.add(" ", "view.focus.next", ["flowview"], "Go to next flow")
+ km.add("space", "view.focus.next", ["flowview"], "Go to next flow")
km.add(
"v",
- "console.choose \"View Part\" request,response "
- "console.bodyview @focus {choice}",
+ """
+ console.choose "View Part" request,response
+ console.bodyview @focus {choice}
+ """,
["flowview"],
"View flow body in an external viewer"
)
@@ -108,8 +117,10 @@ def map(km):
km.add("m", "console.flowview.mode.set", ["flowview"], "Set flow view mode")
km.add(
"z",
- "console.choose \"Part\" request,response "
- "flow.encode.toggle @focus {choice}",
+ """
+ console.choose "Part" request,response
+ flow.encode.toggle @focus {choice}
+ """,
["flowview"],
"Encode/decode flow body"
)
@@ -117,7 +128,7 @@ def map(km):
km.add("L", "console.command options.load ", ["options"], "Load from file")
km.add("S", "console.command options.save ", ["options"], "Save to file")
km.add("D", "options.reset", ["options"], "Reset all options")
- km.add("d", "console.options.reset.current", ["options"], "Reset this option")
+ km.add("d", "console.options.reset.focus", ["options"], "Reset this option")
km.add("a", "console.grideditor.add", ["grideditor"], "Add a row after cursor")
km.add("A", "console.grideditor.insert", ["grideditor"], "Insert a row before cursor")
@@ -137,3 +148,31 @@ def map(km):
km.add("e", "console.grideditor.editor", ["grideditor"], "Edit in external editor")
km.add("z", "console.eventlog.clear", ["eventlog"], "Clear")
+
+ km.add(
+ "a",
+ """
+ console.choose.cmd "Context" console.key.contexts
+ console.command console.key.bind {choice}
+ """,
+ ["keybindings"],
+ "Add a key binding"
+ )
+ km.add(
+ "d",
+ "console.key.unbind.focus",
+ ["keybindings"],
+ "Unbind the currently focused key binding"
+ )
+ km.add(
+ "x",
+ "console.key.execute.focus",
+ ["keybindings"],
+ "Execute the currently focused key binding"
+ )
+ km.add(
+ "enter",
+ "console.key.edit.focus",
+ ["keybindings"],
+ "Edit the currently focused key binding"
+ )
diff --git a/mitmproxy/tools/console/flowlist.py b/mitmproxy/tools/console/flowlist.py
index f00ed9fa..852c5163 100644
--- a/mitmproxy/tools/console/flowlist.py
+++ b/mitmproxy/tools/console/flowlist.py
@@ -30,7 +30,7 @@ class FlowItem(urwid.WidgetWrap):
self.master.commands.call("console.view.flow @focus")
return True
- def keypress(self, xxx_todo_changeme, key):
+ def keypress(self, size, key):
return key
diff --git a/mitmproxy/tools/console/keybindings.py b/mitmproxy/tools/console/keybindings.py
new file mode 100644
index 00000000..45f5c33c
--- /dev/null
+++ b/mitmproxy/tools/console/keybindings.py
@@ -0,0 +1,159 @@
+import urwid
+import blinker
+import textwrap
+from mitmproxy.tools.console import layoutwidget
+from mitmproxy.tools.console import signals
+
+HELP_HEIGHT = 5
+
+
+keybinding_focus_change = blinker.Signal()
+
+
+class KeyItem(urwid.WidgetWrap):
+ def __init__(self, walker, binding, focused):
+ self.walker, self.binding, self.focused = walker, binding, focused
+ super().__init__(None)
+ self._w = self.get_widget()
+
+ def get_widget(self):
+ cmd = textwrap.dedent(self.binding.command).strip()
+ parts = [
+ (4, urwid.Text([("focus", ">> " if self.focused else " ")])),
+ (10, urwid.Text([("title", self.binding.key)])),
+ (12, urwid.Text([("highlight", "\n".join(self.binding.contexts))])),
+ urwid.Text([("text", cmd)]),
+ ]
+ return urwid.Columns(parts)
+
+ def get_edit_text(self):
+ return self._w[1].get_edit_text()
+
+ def selectable(self):
+ return True
+
+ def keypress(self, size, key):
+ return key
+
+
+class KeyListWalker(urwid.ListWalker):
+ def __init__(self, master):
+ self.master = master
+
+ self.index = 0
+ self.focusobj = None
+ self.bindings = list(master.keymap.list("all"))
+ self.set_focus(0)
+ signals.keybindings_change.connect(self.sig_modified)
+
+ def sig_modified(self, sender):
+ self.bindings = list(self.master.keymap.list("all"))
+ self.set_focus(min(self.index, len(self.bindings) - 1))
+ self._modified()
+
+ def get_edit_text(self):
+ return self.focus_obj.get_edit_text()
+
+ def _get(self, pos):
+ binding = self.bindings[pos]
+ return KeyItem(self, binding, pos == self.index)
+
+ def get_focus(self):
+ return self.focus_obj, self.index
+
+ def set_focus(self, index):
+ binding = self.bindings[index]
+ self.index = index
+ self.focus_obj = self._get(self.index)
+ keybinding_focus_change.send(binding.help or "")
+
+ def get_next(self, pos):
+ if pos >= len(self.bindings) - 1:
+ return None, None
+ pos = pos + 1
+ return self._get(pos), pos
+
+ def get_prev(self, pos):
+ pos = pos - 1
+ if pos < 0:
+ return None, None
+ return self._get(pos), pos
+
+
+class KeyList(urwid.ListBox):
+ def __init__(self, master):
+ self.master = master
+ self.walker = KeyListWalker(master)
+ super().__init__(self.walker)
+
+ def keypress(self, size, key):
+ if key == "m_select":
+ foc, idx = self.get_focus()
+ # Act here
+ elif key == "m_start":
+ self.set_focus(0)
+ self.walker._modified()
+ elif key == "m_end":
+ self.set_focus(len(self.walker.bindings) - 1)
+ self.walker._modified()
+ return super().keypress(size, key)
+
+
+class KeyHelp(urwid.Frame):
+ def __init__(self, master):
+ self.master = master
+ super().__init__(self.widget(""))
+ self.set_active(False)
+ keybinding_focus_change.connect(self.sig_mod)
+
+ def set_active(self, val):
+ h = urwid.Text("Key Binding Help")
+ style = "heading" if val else "heading_inactive"
+ self.header = urwid.AttrWrap(h, style)
+
+ def widget(self, txt):
+ cols, _ = self.master.ui.get_cols_rows()
+ return urwid.ListBox(
+ [urwid.Text(i) for i in textwrap.wrap(txt, cols)]
+ )
+
+ def sig_mod(self, txt):
+ self.set_body(self.widget(txt))
+
+
+class KeyBindings(urwid.Pile, layoutwidget.LayoutWidget):
+ title = "Key Bindings"
+ keyctx = "keybindings"
+
+ def __init__(self, master):
+ oh = KeyHelp(master)
+ super().__init__(
+ [
+ KeyList(master),
+ (HELP_HEIGHT, oh),
+ ]
+ )
+ self.master = master
+
+ def focus(self):
+ if self.focus_position != 0:
+ return None
+ f = self.widget_list[0]
+ return f.walker.get_focus()[0].binding
+
+ def keypress(self, size, key):
+ if key == "m_next":
+ self.focus_position = (
+ self.focus_position + 1
+ ) % len(self.widget_list)
+ self.widget_list[1].set_active(self.focus_position == 1)
+ key = None
+
+ # This is essentially a copypasta from urwid.Pile's keypress handler.
+ # So much for "closed for modification, but open for extension".
+ item_rows = None
+ if len(size) == 2:
+ item_rows = self.get_item_rows(size, focus = True)
+ i = self.widget_list.index(self.focus_item)
+ tsize = self.get_item_size(size, i, True, item_rows)
+ return self.focus_item.keypress(tsize, key)
diff --git a/mitmproxy/tools/console/keymap.py b/mitmproxy/tools/console/keymap.py
index b904f706..e406905d 100644
--- a/mitmproxy/tools/console/keymap.py
+++ b/mitmproxy/tools/console/keymap.py
@@ -1,9 +1,9 @@
import typing
-import collections
from mitmproxy.tools.console import commandeditor
+from mitmproxy.tools.console import signals
-SupportedContexts = {
+Contexts = {
"chooser",
"commands",
"eventlog",
@@ -12,59 +12,113 @@ SupportedContexts = {
"global",
"grideditor",
"help",
+ "keybindings",
"options",
}
-Binding = collections.namedtuple(
- "Binding",
- ["key", "command", "contexts", "help"]
-)
+class Binding:
+ def __init__(self, key, command, contexts, help):
+ self.key, self.command, self.contexts = key, command, sorted(contexts)
+ self.help = help
+
+ def keyspec(self):
+ """
+ Translate the key spec from a convenient user specification to one
+ Urwid understands.
+ """
+ return self.key.replace("space", " ")
+
+ def sortkey(self):
+ return self.key + ",".join(self.contexts)
class Keymap:
def __init__(self, master):
self.executor = commandeditor.CommandExecutor(master)
self.keys = {}
+ for c in Contexts:
+ self.keys[c] = {}
self.bindings = []
- def add(self, key: str, command: str, contexts: typing.Sequence[str], help="") -> None:
- """
- Add a key to the key map. If context is empty, it's considered to be
- a global binding.
- """
+ def _check_contexts(self, contexts):
if not contexts:
raise ValueError("Must specify at least one context.")
for c in contexts:
- if c not in SupportedContexts:
+ if c not in Contexts:
raise ValueError("Unsupported context: %s" % c)
- b = Binding(key=key, command=command, contexts=contexts, help=help)
- self.bindings.append(b)
- self.bind(b)
+ def add(
+ self,
+ key: str,
+ command: str,
+ contexts: typing.Sequence[str],
+ help=""
+ ) -> None:
+ """
+ Add a key to the key map.
+ """
+ self._check_contexts(contexts)
+
+ for b in self.bindings:
+ if b.key == key and b.command.strip() == command.strip():
+ b.contexts = sorted(list(set(b.contexts + contexts)))
+ if help:
+ b.help = help
+ self.bind(b)
+ break
+ else:
+ self.remove(key, contexts)
+ b = Binding(key=key, command=command, contexts=contexts, help=help)
+ self.bindings.append(b)
+ self.bind(b)
+ signals.keybindings_change.send(self)
- def bind(self, binding):
+ def remove(self, key: str, contexts: typing.Sequence[str]) -> None:
+ """
+ Remove a key from the key map.
+ """
+ self._check_contexts(contexts)
+ for c in contexts:
+ b = self.get(c, key)
+ if b:
+ self.unbind(b)
+ b.contexts = [x for x in b.contexts if x != c]
+ if b.contexts:
+ self.bindings.append(b)
+ self.bind(b)
+ signals.keybindings_change.send(self)
+
+ def bind(self, binding: Binding) -> None:
+ for c in binding.contexts:
+ self.keys[c][binding.keyspec()] = binding
+
+ def unbind(self, binding: Binding) -> None:
+ """
+ Unbind also removes the binding from the list.
+ """
for c in binding.contexts:
- d = self.keys.setdefault(c, {})
- d[binding.key] = binding.command
+ del self.keys[c][binding.keyspec()]
+ self.bindings = [b for b in self.bindings if b != binding]
- def get(self, context: str, key: str) -> typing.Optional[str]:
+ def get(self, context: str, key: str) -> typing.Optional[Binding]:
if context in self.keys:
return self.keys[context].get(key, None)
return None
def list(self, context: str) -> typing.Sequence[Binding]:
- b = [b for b in self.bindings if context in b.contexts]
- b.sort(key=lambda x: x.key)
- return b
+ b = [x for x in self.bindings if context in x.contexts or context == "all"]
+ single = [x for x in b if len(x.key.split()) == 1]
+ multi = [x for x in b if len(x.key.split()) != 1]
+ single.sort(key=lambda x: x.sortkey())
+ multi.sort(key=lambda x: x.sortkey())
+ return single + multi
def handle(self, context: str, key: str) -> typing.Optional[str]:
"""
Returns the key if it has not been handled, or None.
"""
- cmd = self.get(context, key)
- if not cmd:
- cmd = self.get("global", key)
- if cmd:
- return self.executor(cmd)
+ b = self.get(context, key) or self.get("global", key)
+ if b:
+ return self.executor(b.command)
return key
diff --git a/mitmproxy/tools/console/master.py b/mitmproxy/tools/console/master.py
index ce4e4d9d..cd29dba9 100644
--- a/mitmproxy/tools/console/master.py
+++ b/mitmproxy/tools/console/master.py
@@ -9,438 +9,21 @@ import subprocess
import sys
import tempfile
import traceback
-import typing
import urwid
-from mitmproxy import ctx
from mitmproxy import addons
-from mitmproxy import command
-from mitmproxy import exceptions
from mitmproxy import master
from mitmproxy import log
-from mitmproxy import flow
from mitmproxy.addons import intercept
from mitmproxy.addons import readfile
from mitmproxy.addons import view
+from mitmproxy.tools.console import consoleaddons
from mitmproxy.tools.console import defaultkeys
from mitmproxy.tools.console import keymap
-from mitmproxy.tools.console import overlay
from mitmproxy.tools.console import palettes
from mitmproxy.tools.console import signals
from mitmproxy.tools.console import window
-from mitmproxy import contentviews
-from mitmproxy.utils import strutils
-
-
-class Logger:
- def log(self, evt):
- signals.add_log(evt.msg, evt.level)
- if evt.level == "alert":
- signals.status_message.send(
- message=str(evt.msg),
- expire=2
- )
-
-
-class UnsupportedLog:
- """
- A small addon to dump info on flow types we don't support yet.
- """
- def websocket_message(self, f):
- message = f.messages[-1]
- signals.add_log(f.message_info(message), "info")
- signals.add_log(strutils.bytes_to_escaped_str(message.content), "debug")
-
- def websocket_end(self, f):
- signals.add_log("WebSocket connection closed by {}: {} {}, {}".format(
- f.close_sender,
- f.close_code,
- f.close_message,
- f.close_reason), "info")
-
- def tcp_message(self, f):
- message = f.messages[-1]
- direction = "->" if message.from_client else "<-"
- signals.add_log("{client_host}:{client_port} {direction} tcp {direction} {server_host}:{server_port}".format(
- client_host=f.client_conn.address[0],
- client_port=f.client_conn.address[1],
- server_host=f.server_conn.address[0],
- server_port=f.server_conn.address[1],
- direction=direction,
- ), "info")
- signals.add_log(strutils.bytes_to_escaped_str(message.content), "debug")
-
-
-class ConsoleAddon:
- """
- An addon that exposes console-specific commands, and hooks into required
- events.
- """
- def __init__(self, master):
- self.master = master
- self.started = False
-
- @command.command("console.layout.options")
- def layout_options(self) -> typing.Sequence[str]:
- """
- Returns the valid options for console layout. Use these by setting
- the console_layout option.
- """
- return ["single", "vertical", "horizontal"]
-
- @command.command("console.layout.cycle")
- def layout_cycle(self) -> None:
- """
- Cycle through the console layout options.
- """
- opts = self.layout_options()
- off = self.layout_options().index(ctx.options.console_layout)
- ctx.options.update(
- console_layout = opts[(off + 1) % len(opts)]
- )
-
- @command.command("console.panes.next")
- def panes_next(self) -> None:
- """
- Go to the next layout pane.
- """
- self.master.window.switch()
-
- @command.command("console.options.reset.current")
- def options_reset_current(self) -> None:
- """
- Reset the current option in the options editor.
- """
- fv = self.master.window.current("options")
- if not fv:
- raise exceptions.CommandError("Not viewing options.")
- self.master.commands.call("options.reset.one %s" % fv.current_name())
-
- @command.command("console.nav.start")
- def nav_start(self) -> None:
- """
- Go to the start of a list or scrollable.
- """
- self.master.inject_key("m_start")
-
- @command.command("console.nav.end")
- def nav_end(self) -> None:
- """
- Go to the end of a list or scrollable.
- """
- self.master.inject_key("m_end")
-
- @command.command("console.nav.next")
- def nav_next(self) -> None:
- """
- Go to the next navigatable item.
- """
- self.master.inject_key("m_next")
-
- @command.command("console.nav.select")
- def nav_select(self) -> None:
- """
- Select a navigable item for viewing or editing.
- """
- self.master.inject_key("m_select")
-
- @command.command("console.nav.up")
- def nav_up(self) -> None:
- """
- Go up.
- """
- self.master.inject_key("up")
-
- @command.command("console.nav.down")
- def nav_down(self) -> None:
- """
- Go down.
- """
- self.master.inject_key("down")
-
- @command.command("console.nav.pageup")
- def nav_pageup(self) -> None:
- """
- Go up.
- """
- self.master.inject_key("page up")
-
- @command.command("console.nav.pagedown")
- def nav_pagedown(self) -> None:
- """
- Go down.
- """
- self.master.inject_key("page down")
-
- @command.command("console.nav.left")
- def nav_left(self) -> None:
- """
- Go left.
- """
- self.master.inject_key("left")
-
- @command.command("console.nav.right")
- def nav_right(self) -> None:
- """
- Go right.
- """
- self.master.inject_key("right")
-
- @command.command("console.choose")
- def console_choose(
- self, prompt: str, choices: typing.Sequence[str], *cmd: typing.Sequence[str]
- ) -> None:
- """
- Prompt the user to choose from a specified list of strings, then
- invoke another command with all occurances of {choice} replaced by
- the choice the user made.
- """
- def callback(opt):
- # We're now outside of the call context...
- repl = " ".join(cmd)
- repl = repl.replace("{choice}", opt)
- try:
- self.master.commands.call(repl)
- except exceptions.CommandError as e:
- signals.status_message.send(message=str(e))
-
- self.master.overlay(
- overlay.Chooser(self.master, prompt, choices, "", callback)
- )
-
- @command.command("console.choose.cmd")
- def console_choose_cmd(
- self, prompt: str, choicecmd: str, *cmd: typing.Sequence[str]
- ) -> None:
- """
- Prompt the user to choose from a list of strings returned by a
- command, then invoke another command with all occurances of {choice}
- replaced by the choice the user made.
- """
- choices = ctx.master.commands.call_args(choicecmd, [])
-
- def callback(opt):
- # We're now outside of the call context...
- repl = " ".join(cmd)
- repl = repl.replace("{choice}", opt)
- try:
- self.master.commands.call(repl)
- except exceptions.CommandError as e:
- signals.status_message.send(message=str(e))
-
- self.master.overlay(
- overlay.Chooser(self.master, prompt, choices, "", callback)
- )
-
- @command.command("console.command")
- def console_command(self, *partial: typing.Sequence[str]) -> None:
- """
- Prompt the user to edit a command with a (possilby empty) starting value.
- """
- signals.status_prompt_command.send(partial=" ".join(partial) + " ") # type: ignore
-
- @command.command("console.view.commands")
- def view_commands(self) -> None:
- """View the commands list."""
- self.master.switch_view("commands")
-
- @command.command("console.view.options")
- def view_options(self) -> None:
- """View the options editor."""
- self.master.switch_view("options")
-
- @command.command("console.view.eventlog")
- def view_eventlog(self) -> None:
- """View the options editor."""
- self.master.switch_view("eventlog")
-
- @command.command("console.view.help")
- def view_help(self) -> None:
- """View help."""
- self.master.switch_view("help")
-
- @command.command("console.view.flow")
- def view_flow(self, flow: flow.Flow) -> None:
- """View a flow."""
- if hasattr(flow, "request"):
- # FIME: Also set focus?
- self.master.switch_view("flowview")
-
- @command.command("console.exit")
- def exit(self) -> None:
- """Exit mitmproxy."""
- raise urwid.ExitMainLoop
-
- @command.command("console.view.pop")
- def view_pop(self) -> None:
- """
- Pop a view off the console stack. At the top level, this prompts the
- user to exit mitmproxy.
- """
- signals.pop_view_state.send(self)
-
- @command.command("console.bodyview")
- def bodyview(self, f: flow.Flow, part: str) -> None:
- """
- Spawn an external viewer for a flow request or response body based
- on the detected MIME type. We use the mailcap system to find the
- correct viewier, and fall back to the programs in $PAGER or $EDITOR
- if necessary.
- """
- fpart = getattr(f, part)
- if not fpart:
- raise exceptions.CommandError("Could not view part %s." % part)
- t = fpart.headers.get("content-type")
- content = fpart.get_content(strict=False)
- if not content:
- raise exceptions.CommandError("No content to view.")
- self.master.spawn_external_viewer(content, t)
-
- @command.command("console.edit.focus.options")
- def edit_focus_options(self) -> typing.Sequence[str]:
- return [
- "cookies",
- "form",
- "path",
- "method",
- "query",
- "reason",
- "request-headers",
- "response-headers",
- "status_code",
- "set-cookies",
- "url",
- ]
-
- @command.command("console.edit.focus")
- def edit_focus(self, part: str) -> None:
- """
- Edit the query of the current focus.
- """
- if part == "cookies":
- self.master.switch_view("edit_focus_cookies")
- elif part == "form":
- self.master.switch_view("edit_focus_form")
- elif part == "path":
- self.master.switch_view("edit_focus_path")
- elif part == "query":
- self.master.switch_view("edit_focus_query")
- elif part == "request-headers":
- self.master.switch_view("edit_focus_request_headers")
- elif part == "response-headers":
- self.master.switch_view("edit_focus_response_headers")
- elif part == "set-cookies":
- self.master.switch_view("edit_focus_setcookies")
- elif part in ["url", "method", "status_code", "reason"]:
- self.master.commands.call(
- "console.command flow.set @focus %s " % part
- )
-
- def _grideditor(self):
- gewidget = self.master.window.current("grideditor")
- if not gewidget:
- raise exceptions.CommandError("Not in a grideditor.")
- return gewidget.key_responder()
-
- @command.command("console.grideditor.add")
- def grideditor_add(self) -> None:
- """
- Add a row after the cursor.
- """
- self._grideditor().cmd_add()
-
- @command.command("console.grideditor.insert")
- def grideditor_insert(self) -> None:
- """
- Insert a row before the cursor.
- """
- self._grideditor().cmd_insert()
-
- @command.command("console.grideditor.delete")
- def grideditor_delete(self) -> None:
- """
- Delete row
- """
- self._grideditor().cmd_delete()
-
- @command.command("console.grideditor.readfile")
- def grideditor_readfile(self, path: str) -> None:
- """
- Read a file into the currrent cell.
- """
- self._grideditor().cmd_read_file(path)
-
- @command.command("console.grideditor.readfile_escaped")
- def grideditor_readfile_escaped(self, path: str) -> None:
- """
- Read a file containing a Python-style escaped stringinto the
- currrent cell.
- """
- self._grideditor().cmd_read_file_escaped(path)
-
- @command.command("console.grideditor.editor")
- def grideditor_editor(self) -> None:
- """
- Spawn an external editor on the current cell.
- """
- self._grideditor().cmd_spawn_editor()
-
- @command.command("console.flowview.mode.set")
- def flowview_mode_set(self) -> None:
- """
- Set the display mode for the current flow view.
- """
- fv = self.master.window.current("flowview")
- if not fv:
- raise exceptions.CommandError("Not viewing a flow.")
- idx = fv.body.tab_offset
-
- def callback(opt):
- try:
- self.master.commands.call_args(
- "view.setval",
- ["@focus", "flowview_mode_%s" % idx, opt]
- )
- except exceptions.CommandError as e:
- signals.status_message.send(message=str(e))
-
- opts = [i.name.lower() for i in contentviews.views]
- self.master.overlay(overlay.Chooser(self.master, "Mode", opts, "", callback))
-
- @command.command("console.flowview.mode")
- def flowview_mode(self) -> str:
- """
- Get the display mode for the current flow view.
- """
- fv = self.master.window.current_window("flowview")
- if not fv:
- raise exceptions.CommandError("Not viewing a flow.")
- idx = fv.body.tab_offset
- return self.master.commands.call_args(
- "view.getval",
- [
- "@focus",
- "flowview_mode_%s" % idx,
- self.master.options.default_contentview,
- ]
- )
-
- @command.command("console.eventlog.clear")
- def eventlog_clear(self) -> None:
- """
- Clear the event log.
- """
- signals.sig_clear_log.send(self)
-
- def running(self):
- self.started = True
-
- def update(self, flows):
- if not flows:
- signals.update_settings.send(self)
- for f in flows:
- signals.flow_change.send(self, flow=f)
class ConsoleMaster(master.Master):
@@ -465,14 +48,14 @@ class ConsoleMaster(master.Master):
signals.call_in.connect(self.sig_call_in)
signals.sig_add_log.connect(self.sig_add_log)
- self.addons.add(Logger())
+ self.addons.add(consoleaddons.Logger())
self.addons.add(*addons.default_addons())
self.addons.add(
intercept.Intercept(),
self.view,
- UnsupportedLog(),
+ consoleaddons.UnsupportedLog(),
readfile.ReadFile(),
- ConsoleAddon(self),
+ consoleaddons.ConsoleAddon(self),
)
def sigint_handler(*args, **kwargs):
diff --git a/mitmproxy/tools/console/signals.py b/mitmproxy/tools/console/signals.py
index 49115a5d..5d39d96a 100644
--- a/mitmproxy/tools/console/signals.py
+++ b/mitmproxy/tools/console/signals.py
@@ -48,3 +48,6 @@ flowlist_change = blinker.Signal()
# Pop and push view state onto a stack
pop_view_state = blinker.Signal()
push_view_state = blinker.Signal()
+
+# Fired when the key bindings change
+keybindings_change = blinker.Signal()
diff --git a/mitmproxy/tools/console/window.py b/mitmproxy/tools/console/window.py
index 43e5cceb..6145b645 100644
--- a/mitmproxy/tools/console/window.py
+++ b/mitmproxy/tools/console/window.py
@@ -4,6 +4,7 @@ from mitmproxy.tools.console import statusbar
from mitmproxy.tools.console import flowlist
from mitmproxy.tools.console import flowview
from mitmproxy.tools.console import commands
+from mitmproxy.tools.console import keybindings
from mitmproxy.tools.console import options
from mitmproxy.tools.console import overlay
from mitmproxy.tools.console import help
@@ -29,6 +30,7 @@ class WindowStack:
flowlist = flowlist.FlowListBox(master),
flowview = flowview.FlowView(master),
commands = commands.Commands(master),
+ keybindings = keybindings.KeyBindings(master),
options = options.Options(master),
help = help.HelpView(master),
eventlog = eventlog.EventLog(master),