aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy/console
diff options
context:
space:
mode:
authorMaximilian Hils <git@maximilianhils.com>2016-02-18 23:10:47 +0100
committerMaximilian Hils <git@maximilianhils.com>2016-02-18 23:10:47 +0100
commit7c6bf7abb3c0e94f9c4dfa77fe0690fe11c6d4d3 (patch)
tree3f583d91ff97924068f7017f770b710da2768abe /mitmproxy/console
parentbe02dd105b7803b7b2b6942f9d254539dfd6ba26 (diff)
parent61cde30ef8410dc5400039eea5d312fabf3779a9 (diff)
downloadmitmproxy-7c6bf7abb3c0e94f9c4dfa77fe0690fe11c6d4d3.tar.gz
mitmproxy-7c6bf7abb3c0e94f9c4dfa77fe0690fe11c6d4d3.tar.bz2
mitmproxy-7c6bf7abb3c0e94f9c4dfa77fe0690fe11c6d4d3.zip
Merge pull request #964 from mitmproxy/flat-structure
Flat structure
Diffstat (limited to 'mitmproxy/console')
-rw-r--r--mitmproxy/console/__init__.py744
-rw-r--r--mitmproxy/console/common.py444
-rw-r--r--mitmproxy/console/flowdetailview.py153
-rw-r--r--mitmproxy/console/flowlist.py397
-rw-r--r--mitmproxy/console/flowview.py714
-rw-r--r--mitmproxy/console/grideditor.py716
-rw-r--r--mitmproxy/console/help.py117
-rw-r--r--mitmproxy/console/options.py271
-rw-r--r--mitmproxy/console/palettepicker.py82
-rw-r--r--mitmproxy/console/palettes.py326
-rw-r--r--mitmproxy/console/pathedit.py71
-rw-r--r--mitmproxy/console/searchable.py93
-rw-r--r--mitmproxy/console/select.py120
-rw-r--r--mitmproxy/console/signals.py43
-rw-r--r--mitmproxy/console/statusbar.py258
-rw-r--r--mitmproxy/console/tabs.py70
-rw-r--r--mitmproxy/console/window.py90
17 files changed, 4709 insertions, 0 deletions
diff --git a/mitmproxy/console/__init__.py b/mitmproxy/console/__init__.py
new file mode 100644
index 00000000..e739ec61
--- /dev/null
+++ b/mitmproxy/console/__init__.py
@@ -0,0 +1,744 @@
+from __future__ import absolute_import
+
+import mailcap
+import mimetypes
+import tempfile
+import os
+import os.path
+import shlex
+import signal
+import stat
+import subprocess
+import sys
+import traceback
+import urwid
+import weakref
+
+from .. import controller, flow, script, contentviews
+from . import flowlist, flowview, help, window, signals, options
+from . import grideditor, palettes, statusbar, palettepicker
+
+EVENTLOG_SIZE = 500
+
+
+class ConsoleState(flow.State):
+
+ def __init__(self):
+ flow.State.__init__(self)
+ self.focus = None
+ self.follow_focus = None
+ self.default_body_view = contentviews.get("Auto")
+ self.flowsettings = weakref.WeakKeyDictionary()
+ self.last_search = None
+
+ def __setattr__(self, name, value):
+ self.__dict__[name] = value
+ signals.update_settings.send(self)
+
+ def add_flow_setting(self, flow, key, value):
+ d = self.flowsettings.setdefault(flow, {})
+ d[key] = value
+
+ def get_flow_setting(self, flow, key, default=None):
+ d = self.flowsettings.get(flow, {})
+ return d.get(key, default)
+
+ def add_flow(self, f):
+ super(ConsoleState, self).add_flow(f)
+ if self.focus is None:
+ self.set_focus(0)
+ elif self.follow_focus:
+ self.set_focus(len(self.view) - 1)
+ self.set_flow_marked(f, False)
+ return f
+
+ def update_flow(self, f):
+ super(ConsoleState, self).update_flow(f)
+ if self.focus is None:
+ self.set_focus(0)
+ return f
+
+ def set_limit(self, limit):
+ ret = flow.State.set_limit(self, limit)
+ self.set_focus(self.focus)
+ return ret
+
+ def get_focus(self):
+ if not self.view or self.focus is None:
+ return None, None
+ return self.view[self.focus], self.focus
+
+ def set_focus(self, idx):
+ if self.view:
+ if idx >= len(self.view):
+ idx = len(self.view) - 1
+ elif idx < 0:
+ idx = 0
+ self.focus = idx
+ else:
+ self.focus = None
+
+ def set_focus_flow(self, f):
+ self.set_focus(self.view.index(f))
+
+ def get_from_pos(self, pos):
+ if len(self.view) <= pos or pos < 0:
+ return None, None
+ return self.view[pos], pos
+
+ def get_next(self, pos):
+ return self.get_from_pos(pos + 1)
+
+ def get_prev(self, pos):
+ return self.get_from_pos(pos - 1)
+
+ def delete_flow(self, f):
+ if f in self.view and self.view.index(f) <= self.focus:
+ self.focus -= 1
+ if self.focus < 0:
+ self.focus = None
+ ret = flow.State.delete_flow(self, f)
+ self.set_focus(self.focus)
+ return ret
+
+ def clear(self):
+ marked_flows = []
+ for f in self.flows:
+ if self.flow_marked(f):
+ marked_flows.append(f)
+
+ super(ConsoleState, self).clear()
+
+ for f in marked_flows:
+ self.add_flow(f)
+ self.set_flow_marked(f, True)
+
+ if len(self.flows.views) == 0:
+ self.focus = None
+ else:
+ self.focus = 0
+ self.set_focus(self.focus)
+
+ def flow_marked(self, flow):
+ return self.get_flow_setting(flow, "marked", False)
+
+ def set_flow_marked(self, flow, marked):
+ self.add_flow_setting(flow, "marked", marked)
+
+
+class Options(object):
+ attributes = [
+ "app",
+ "app_domain",
+ "app_ip",
+ "anticache",
+ "anticomp",
+ "client_replay",
+ "eventlog",
+ "follow",
+ "keepserving",
+ "kill",
+ "intercept",
+ "limit",
+ "no_server",
+ "refresh_server_playback",
+ "rfile",
+ "scripts",
+ "showhost",
+ "replacements",
+ "rheaders",
+ "setheaders",
+ "server_replay",
+ "stickycookie",
+ "stickyauth",
+ "stream_large_bodies",
+ "verbosity",
+ "wfile",
+ "nopop",
+ "palette",
+ "palette_transparent",
+ "no_mouse"
+ ]
+
+ def __init__(self, **kwargs):
+ for k, v in kwargs.items():
+ setattr(self, k, v)
+ for i in self.attributes:
+ if not hasattr(self, i):
+ setattr(self, i, None)
+
+
+class ConsoleMaster(flow.FlowMaster):
+ palette = []
+
+ def __init__(self, server, options):
+ flow.FlowMaster.__init__(self, server, ConsoleState())
+ self.stream_path = None
+ self.options = options
+
+ for i in options.replacements:
+ self.replacehooks.add(*i)
+
+ for i in options.setheaders:
+ self.setheaders.add(*i)
+
+ r = self.set_intercept(options.intercept)
+ if r:
+ print >> sys.stderr, "Intercept error:", r
+ sys.exit(1)
+
+ if options.limit:
+ self.set_limit(options.limit)
+
+ r = self.set_stickycookie(options.stickycookie)
+ if r:
+ print >> sys.stderr, "Sticky cookies error:", r
+ sys.exit(1)
+
+ r = self.set_stickyauth(options.stickyauth)
+ if r:
+ print >> sys.stderr, "Sticky auth error:", r
+ sys.exit(1)
+
+ self.set_stream_large_bodies(options.stream_large_bodies)
+
+ self.refresh_server_playback = options.refresh_server_playback
+ self.anticache = options.anticache
+ self.anticomp = options.anticomp
+ self.killextra = options.kill
+ self.rheaders = options.rheaders
+ self.nopop = options.nopop
+ self.showhost = options.showhost
+ self.palette = options.palette
+ self.palette_transparent = options.palette_transparent
+
+ self.eventlog = options.eventlog
+ self.eventlist = urwid.SimpleListWalker([])
+ self.follow = options.follow
+
+ if options.client_replay:
+ self.client_playback_path(options.client_replay)
+
+ if options.server_replay:
+ self.server_playback_path(options.server_replay)
+
+ if options.scripts:
+ for i in options.scripts:
+ err = self.load_script(i)
+ if err:
+ print >> sys.stderr, "Script load error:", err
+ sys.exit(1)
+
+ if options.outfile:
+ err = self.start_stream_to_path(
+ options.outfile[0],
+ options.outfile[1]
+ )
+ if err:
+ print >> sys.stderr, "Stream file error:", err
+ sys.exit(1)
+
+ self.view_stack = []
+
+ if options.app:
+ self.start_app(self.options.app_host, self.options.app_port)
+ signals.call_in.connect(self.sig_call_in)
+ signals.pop_view_state.connect(self.sig_pop_view_state)
+ signals.push_view_state.connect(self.sig_push_view_state)
+ signals.sig_add_event.connect(self.sig_add_event)
+
+ def __setattr__(self, name, value):
+ self.__dict__[name] = value
+ signals.update_settings.send(self)
+
+ def load_script(self, command, use_reloader=True):
+ # We default to using the reloader in the console ui.
+ super(ConsoleMaster, self).load_script(command, use_reloader)
+
+ def sig_add_event(self, sender, e, level):
+ needed = dict(error=0, info=1, debug=2).get(level, 1)
+ if self.options.verbosity < needed:
+ return
+
+ if level == "error":
+ e = urwid.Text(("error", str(e)))
+ else:
+ e = urwid.Text(str(e))
+ self.eventlist.append(e)
+ if len(self.eventlist) > EVENTLOG_SIZE:
+ self.eventlist.pop(0)
+ self.eventlist.set_focus(len(self.eventlist) - 1)
+
+ def add_event(self, e, level):
+ signals.add_event(e, level)
+
+ def sig_call_in(self, sender, seconds, callback, args=()):
+ def cb(*_):
+ return callback(*args)
+ self.loop.set_alarm_in(seconds, cb)
+
+ def sig_pop_view_state(self, sender):
+ if len(self.view_stack) > 1:
+ self.view_stack.pop()
+ self.loop.widget = self.view_stack[-1]
+ else:
+ signals.status_prompt_onekey.send(
+ self,
+ prompt = "Quit",
+ keys = (
+ ("yes", "y"),
+ ("no", "n"),
+ ),
+ callback = self.quit,
+ )
+
+ def sig_push_view_state(self, sender, window):
+ self.view_stack.append(window)
+ self.loop.widget = window
+ self.loop.draw_screen()
+
+ def _run_script_method(self, method, s, f):
+ status, val = s.run(method, f)
+ if val:
+ if status:
+ signals.add_event("Method %s return: %s" % (method, val), "debug")
+ else:
+ signals.add_event(
+ "Method %s error: %s" %
+ (method, val[1]), "error")
+
+ def run_script_once(self, command, f):
+ if not command:
+ return
+ signals.add_event("Running script on flow: %s" % command, "debug")
+
+ try:
+ s = script.Script(command, script.ScriptContext(self))
+ except script.ScriptException as v:
+ signals.status_message.send(
+ message = "Error loading script."
+ )
+ signals.add_event("Error loading script:\n%s" % v.args[0], "error")
+ return
+
+ if f.request:
+ self._run_script_method("request", s, f)
+ if f.response:
+ self._run_script_method("response", s, f)
+ if f.error:
+ self._run_script_method("error", s, f)
+ s.unload()
+ signals.flow_change.send(self, flow = f)
+
+ def set_script(self, command):
+ if not command:
+ return
+ ret = self.load_script(command)
+ if ret:
+ signals.status_message.send(message=ret)
+
+ def toggle_eventlog(self):
+ self.eventlog = not self.eventlog
+ signals.pop_view_state.send(self)
+ self.view_flowlist()
+
+ def _readflows(self, path):
+ """
+ Utitility function that reads a list of flows
+ or prints an error to the UI if that fails.
+ Returns
+ - None, if there was an error.
+ - a list of flows, otherwise.
+ """
+ try:
+ return flow.read_flows_from_paths(path)
+ except flow.FlowReadError as e:
+ signals.status_message.send(message=e.strerror)
+
+ def client_playback_path(self, path):
+ if not isinstance(path, list):
+ path = [path]
+ flows = self._readflows(path)
+ if flows:
+ self.start_client_playback(flows, False)
+
+ def server_playback_path(self, path):
+ if not isinstance(path, list):
+ path = [path]
+ flows = self._readflows(path)
+ if flows:
+ self.start_server_playback(
+ flows,
+ self.killextra, self.rheaders,
+ False, self.nopop,
+ self.options.replay_ignore_params,
+ self.options.replay_ignore_content,
+ self.options.replay_ignore_payload_params,
+ self.options.replay_ignore_host
+ )
+
+ def spawn_editor(self, data):
+ fd, name = tempfile.mkstemp('', "mproxy")
+ os.write(fd, data)
+ os.close(fd)
+ c = os.environ.get("EDITOR")
+ # if no EDITOR is set, assume 'vi'
+ if not c:
+ c = "vi"
+ cmd = shlex.split(c)
+ cmd.append(name)
+ self.ui.stop()
+ try:
+ subprocess.call(cmd)
+ except:
+ signals.status_message.send(
+ message = "Can't start editor: %s" % " ".join(c)
+ )
+ else:
+ data = open(name, "rb").read()
+ self.ui.start()
+ os.unlink(name)
+ return data
+
+ def spawn_external_viewer(self, data, contenttype):
+ if contenttype:
+ contenttype = contenttype.split(";")[0]
+ ext = mimetypes.guess_extension(contenttype) or ""
+ else:
+ ext = ""
+ fd, name = tempfile.mkstemp(ext, "mproxy")
+ os.write(fd, data)
+ os.close(fd)
+
+ # read-only to remind the user that this is a view function
+ os.chmod(name, stat.S_IREAD)
+
+ cmd = None
+ shell = False
+
+ if contenttype:
+ c = mailcap.getcaps()
+ cmd, _ = mailcap.findmatch(c, contenttype, filename=name)
+ if cmd:
+ shell = True
+ if not cmd:
+ # hm which one should get priority?
+ c = os.environ.get("PAGER") or os.environ.get("EDITOR")
+ if not c:
+ c = "less"
+ cmd = shlex.split(c)
+ cmd.append(name)
+ self.ui.stop()
+ try:
+ subprocess.call(cmd, shell=shell)
+ except:
+ signals.status_message.send(
+ message="Can't start external viewer: %s" % " ".join(c)
+ )
+ self.ui.start()
+ os.unlink(name)
+
+ def set_palette(self, name):
+ self.palette = name
+ self.ui.register_palette(
+ palettes.palettes[name].palette(self.palette_transparent)
+ )
+ self.ui.clear()
+
+ def ticker(self, *userdata):
+ changed = self.tick(self.masterq, timeout=0)
+ if changed:
+ self.loop.draw_screen()
+ signals.update_settings.send()
+ self.loop.set_alarm_in(0.01, self.ticker)
+
+ def run(self):
+ self.ui = urwid.raw_display.Screen()
+ self.ui.set_terminal_properties(256)
+ self.set_palette(self.palette)
+ self.loop = urwid.MainLoop(
+ urwid.SolidFill("x"),
+ screen = self.ui,
+ handle_mouse = not self.options.no_mouse,
+ )
+
+ self.server.start_slave(
+ controller.Slave,
+ controller.Channel(self.masterq, self.should_exit)
+ )
+
+ if self.options.rfile:
+ ret = self.load_flows_path(self.options.rfile)
+ if ret and self.state.flow_count():
+ signals.add_event(
+ "File truncated or corrupted. "
+ "Loaded as many flows as possible.",
+ "error"
+ )
+ elif ret and not self.state.flow_count():
+ self.shutdown()
+ print >> sys.stderr, "Could not load file:", ret
+ sys.exit(1)
+
+ self.loop.set_alarm_in(0.01, self.ticker)
+
+ # It's not clear why we need to handle this explicitly - without this,
+ # mitmproxy hangs on keyboard interrupt. Remove if we ever figure it
+ # out.
+ def exit(s, f):
+ raise urwid.ExitMainLoop
+ signal.signal(signal.SIGINT, exit)
+
+ self.loop.set_alarm_in(
+ 0.0001,
+ lambda *args: self.view_flowlist()
+ )
+
+ try:
+ self.loop.run()
+ except Exception:
+ self.loop.stop()
+ sys.stdout.flush()
+ print >> sys.stderr, traceback.format_exc()
+ print >> sys.stderr, "mitmproxy has crashed!"
+ print >> sys.stderr, "Please lodge a bug report at:"
+ print >> sys.stderr, "\thttps://github.com/mitmproxy/mitmproxy"
+ print >> sys.stderr, "Shutting down..."
+ sys.stderr.flush()
+ self.shutdown()
+
+ def view_help(self, helpctx):
+ signals.push_view_state.send(
+ self,
+ window = window.Window(
+ self,
+ help.HelpView(helpctx),
+ None,
+ statusbar.StatusBar(self, help.footer),
+ None
+ )
+ )
+
+ def view_options(self):
+ for i in self.view_stack:
+ if isinstance(i["body"], options.Options):
+ return
+ signals.push_view_state.send(
+ self,
+ window = window.Window(
+ self,
+ options.Options(self),
+ None,
+ statusbar.StatusBar(self, options.footer),
+ options.help_context,
+ )
+ )
+
+ def view_palette_picker(self):
+ signals.push_view_state.send(
+ self,
+ window = window.Window(
+ self,
+ palettepicker.PalettePicker(self),
+ None,
+ statusbar.StatusBar(self, palettepicker.footer),
+ palettepicker.help_context,
+ )
+ )
+
+ def view_grideditor(self, ge):
+ signals.push_view_state.send(
+ self,
+ window = window.Window(
+ self,
+ ge,
+ None,
+ statusbar.StatusBar(self, grideditor.FOOTER),
+ ge.make_help()
+ )
+ )
+
+ def view_flowlist(self):
+ if self.ui.started:
+ self.ui.clear()
+ if self.state.follow_focus:
+ self.state.set_focus(self.state.flow_count())
+
+ if self.eventlog:
+ body = flowlist.BodyPile(self)
+ else:
+ body = flowlist.FlowListBox(self)
+
+ if self.follow:
+ self.toggle_follow_flows()
+
+ signals.push_view_state.send(
+ self,
+ window = window.Window(
+ self,
+ body,
+ None,
+ statusbar.StatusBar(self, flowlist.footer),
+ flowlist.help_context
+ )
+ )
+
+ def view_flow(self, flow, tab_offset=0):
+ self.state.set_focus_flow(flow)
+ signals.push_view_state.send(
+ self,
+ window = window.Window(
+ self,
+ flowview.FlowView(self, self.state, flow, tab_offset),
+ flowview.FlowViewHeader(self, flow),
+ statusbar.StatusBar(self, flowview.footer),
+ flowview.help_context
+ )
+ )
+
+ def _write_flows(self, path, flows):
+ if not path:
+ return
+ path = os.path.expanduser(path)
+ try:
+ f = file(path, "wb")
+ fw = flow.FlowWriter(f)
+ for i in flows:
+ fw.add(i)
+ f.close()
+ except IOError as v:
+ signals.status_message.send(message=v.strerror)
+
+ def save_one_flow(self, path, flow):
+ return self._write_flows(path, [flow])
+
+ def save_flows(self, path):
+ return self._write_flows(path, self.state.view)
+
+ def save_marked_flows(self, path):
+ marked_flows = []
+ for f in self.state.view:
+ if self.state.flow_marked(f):
+ marked_flows.append(f)
+ return self._write_flows(path, marked_flows)
+
+ def load_flows_callback(self, path):
+ if not path:
+ return
+ ret = self.load_flows_path(path)
+ return ret or "Flows loaded from %s" % path
+
+ def load_flows_path(self, path):
+ reterr = None
+ try:
+ flow.FlowMaster.load_flows_file(self, path)
+ except flow.FlowReadError as v:
+ reterr = str(v)
+ signals.flowlist_change.send(self)
+ return reterr
+
+ def accept_all(self):
+ self.state.accept_all(self)
+
+ def set_limit(self, txt):
+ v = self.state.set_limit(txt)
+ signals.flowlist_change.send(self)
+ return v
+
+ def set_intercept(self, txt):
+ return self.state.set_intercept(txt)
+
+ def change_default_display_mode(self, t):
+ v = contentviews.get_by_shortcut(t)
+ self.state.default_body_view = v
+ self.refresh_focus()
+
+ def edit_scripts(self, scripts):
+ commands = [x[0] for x in scripts] # remove outer array
+ if commands == [s.command for s in self.scripts]:
+ return
+
+ self.unload_scripts()
+ for command in commands:
+ self.load_script(command)
+ signals.update_settings.send(self)
+
+ def stop_client_playback_prompt(self, a):
+ if a != "n":
+ self.stop_client_playback()
+
+ def stop_server_playback_prompt(self, a):
+ if a != "n":
+ self.stop_server_playback()
+
+ def quit(self, a):
+ if a != "n":
+ raise urwid.ExitMainLoop
+
+ def shutdown(self):
+ self.state.killall(self)
+ flow.FlowMaster.shutdown(self)
+
+ def clear_flows(self):
+ self.state.clear()
+ signals.flowlist_change.send(self)
+
+ def toggle_follow_flows(self):
+ # toggle flow follow
+ self.state.follow_focus = not self.state.follow_focus
+ # jump to most recent flow if follow is now on
+ if self.state.follow_focus:
+ self.state.set_focus(self.state.flow_count())
+ signals.flowlist_change.send(self)
+
+ def delete_flow(self, f):
+ self.state.delete_flow(f)
+ signals.flowlist_change.send(self)
+
+ def refresh_focus(self):
+ if self.state.view:
+ signals.flow_change.send(
+ self,
+ flow = self.state.view[self.state.focus]
+ )
+
+ def process_flow(self, f):
+ if self.state.intercept and f.match(
+ self.state.intercept) and not f.request.is_replay:
+ f.intercept(self)
+ else:
+ # check if flow was intercepted within an inline script by flow.intercept()
+ if f.intercepted:
+ f.intercept(self)
+ else:
+ f.reply()
+ signals.flowlist_change.send(self)
+ signals.flow_change.send(self, flow = f)
+
+ def clear_events(self):
+ self.eventlist[:] = []
+
+ # Handlers
+ def handle_error(self, f):
+ f = flow.FlowMaster.handle_error(self, f)
+ if f:
+ self.process_flow(f)
+ return f
+
+ def handle_request(self, f):
+ f = flow.FlowMaster.handle_request(self, f)
+ if f:
+ self.process_flow(f)
+ return f
+
+ def handle_response(self, f):
+ f = flow.FlowMaster.handle_response(self, f)
+ if f:
+ self.process_flow(f)
+ return f
+
+ def handle_script_change(self, script):
+ if super(ConsoleMaster, self).handle_script_change(script):
+ signals.status_message.send(message='"{}" reloaded.'.format(script.filename))
+ else:
+ signals.status_message.send(message='Error reloading "{}".'.format(script.filename))
diff --git a/mitmproxy/console/common.py b/mitmproxy/console/common.py
new file mode 100644
index 00000000..c29ffddc
--- /dev/null
+++ b/mitmproxy/console/common.py
@@ -0,0 +1,444 @@
+from __future__ import absolute_import
+
+import urwid
+import urwid.util
+import os
+
+from netlib.http import CONTENT_MISSING
+import netlib.utils
+
+from .. import utils
+from .. import flow_export
+from ..models import decoded
+from . import signals
+
+
+try:
+ import pyperclip
+except:
+ pyperclip = False
+
+
+VIEW_FLOW_REQUEST = 0
+VIEW_FLOW_RESPONSE = 1
+
+METHOD_OPTIONS = [
+ ("get", "g"),
+ ("post", "p"),
+ ("put", "u"),
+ ("head", "h"),
+ ("trace", "t"),
+ ("delete", "d"),
+ ("options", "o"),
+ ("edit raw", "e"),
+]
+
+
+def is_keypress(k):
+ """
+ Is this input event a keypress?
+ """
+ if isinstance(k, basestring):
+ return True
+
+
+def highlight_key(str, key, textattr="text", keyattr="key"):
+ l = []
+ parts = str.split(key, 1)
+ if parts[0]:
+ l.append((textattr, parts[0]))
+ l.append((keyattr, key))
+ if parts[1]:
+ l.append((textattr, parts[1]))
+ return l
+
+
+KEY_MAX = 30
+
+
+def format_keyvals(lst, key="key", val="text", indent=0):
+ """
+ Format a list of (key, value) tuples.
+
+ If key is None, it's treated specially:
+ - We assume a sub-value, and add an extra indent.
+ - The value is treated as a pre-formatted list of directives.
+ """
+ ret = []
+ if lst:
+ maxk = min(max(len(i[0]) for i in lst if i and i[0]), KEY_MAX)
+ for i, kv in enumerate(lst):
+ if kv is None:
+ ret.append(urwid.Text(""))
+ else:
+ if isinstance(kv[1], urwid.Widget):
+ v = kv[1]
+ elif kv[1] is None:
+ v = urwid.Text("")
+ else:
+ v = urwid.Text([(val, kv[1])])
+ ret.append(
+ urwid.Columns(
+ [
+ ("fixed", indent, urwid.Text("")),
+ (
+ "fixed",
+ maxk,
+ urwid.Text([(key, kv[0] or "")])
+ ),
+ v
+ ],
+ dividechars = 2
+ )
+ )
+ return ret
+
+
+def shortcuts(k):
+ if k == " ":
+ k = "page down"
+ elif k == "ctrl f":
+ k = "page down"
+ elif k == "ctrl b":
+ k = "page up"
+ elif k == "j":
+ k = "down"
+ elif k == "k":
+ k = "up"
+ return k
+
+
+def fcol(s, attr):
+ s = unicode(s)
+ return (
+ "fixed",
+ len(s),
+ urwid.Text(
+ [
+ (attr, s)
+ ]
+ )
+ )
+
+if urwid.util.detected_encoding:
+ SYMBOL_REPLAY = u"\u21ba"
+ SYMBOL_RETURN = u"\u2190"
+ SYMBOL_MARK = u"\u25cf"
+else:
+ SYMBOL_REPLAY = u"[r]"
+ SYMBOL_RETURN = u"<-"
+ SYMBOL_MARK = "[m]"
+
+
+def raw_format_flow(f, focus, extended):
+ f = dict(f)
+ pile = []
+ req = []
+ if extended:
+ req.append(
+ fcol(
+ utils.format_timestamp(f["req_timestamp"]),
+ "highlight"
+ )
+ )
+ else:
+ req.append(fcol(">>" if focus else " ", "focus"))
+
+ if f["marked"]:
+ req.append(fcol(SYMBOL_MARK, "mark"))
+
+ if f["req_is_replay"]:
+ req.append(fcol(SYMBOL_REPLAY, "replay"))
+ req.append(fcol(f["req_method"], "method"))
+
+ preamble = sum(i[1] for i in req) + len(req) - 1
+
+ if f["intercepted"] and not f["acked"]:
+ uc = "intercept"
+ elif f["resp_code"] or f["err_msg"]:
+ uc = "text"
+ else:
+ uc = "title"
+
+ url = f["req_url"]
+ if f["req_http_version"] not in ("HTTP/1.0", "HTTP/1.1"):
+ url += " " + f["req_http_version"]
+ req.append(
+ urwid.Text([(uc, url)])
+ )
+
+ pile.append(urwid.Columns(req, dividechars=1))
+
+ resp = []
+ resp.append(
+ ("fixed", preamble, urwid.Text(""))
+ )
+
+ if f["resp_code"]:
+ codes = {
+ 2: "code_200",
+ 3: "code_300",
+ 4: "code_400",
+ 5: "code_500",
+ }
+ ccol = codes.get(f["resp_code"] / 100, "code_other")
+ resp.append(fcol(SYMBOL_RETURN, ccol))
+ if f["resp_is_replay"]:
+ resp.append(fcol(SYMBOL_REPLAY, "replay"))
+ resp.append(fcol(f["resp_code"], ccol))
+ if f["intercepted"] and f["resp_code"] and not f["acked"]:
+ rc = "intercept"
+ else:
+ rc = "text"
+
+ if f["resp_ctype"]:
+ resp.append(fcol(f["resp_ctype"], rc))
+ resp.append(fcol(f["resp_clen"], rc))
+ resp.append(fcol(f["roundtrip"], rc))
+
+ elif f["err_msg"]:
+ resp.append(fcol(SYMBOL_RETURN, "error"))
+ resp.append(
+ urwid.Text([
+ (
+ "error",
+ f["err_msg"]
+ )
+ ])
+ )
+ pile.append(urwid.Columns(resp, dividechars=1))
+ return urwid.Pile(pile)
+
+
+# Save file to disk
+def save_data(path, data):
+ if not path:
+ return
+ try:
+ with file(path, "wb") as f:
+ f.write(data)
+ except IOError as v:
+ signals.status_message.send(message=v.strerror)
+
+
+def ask_save_overwrite(path, data):
+ if not path:
+ return
+ path = os.path.expanduser(path)
+ if os.path.exists(path):
+ def save_overwrite(k):
+ if k == "y":
+ save_data(path, data)
+
+ signals.status_prompt_onekey.send(
+ prompt = "'" + path + "' already exists. Overwrite?",
+ keys = (
+ ("yes", "y"),
+ ("no", "n"),
+ ),
+ callback = save_overwrite
+ )
+ else:
+ save_data(path, data)
+
+
+def ask_save_path(prompt, data):
+ signals.status_prompt_path.send(
+ prompt = prompt,
+ callback = ask_save_overwrite,
+ args = (data, )
+ )
+
+
+def copy_flow_format_data(part, scope, flow):
+ if part == "u":
+ data = flow.request.url
+ else:
+ data = ""
+ if scope in ("q", "a"):
+ if flow.request.content is None or flow.request.content == CONTENT_MISSING:
+ return None, "Request content is missing"
+ with decoded(flow.request):
+ if part == "h":
+ data += netlib.http.http1.assemble_request(flow.request)
+ elif part == "c":
+ data += flow.request.content
+ else:
+ raise ValueError("Unknown part: {}".format(part))
+ if scope == "a" and flow.request.content and flow.response:
+ # Add padding between request and response
+ data += "\r\n" * 2
+ if scope in ("s", "a") and flow.response:
+ if flow.response.content is None or flow.response.content == CONTENT_MISSING:
+ return None, "Response content is missing"
+ with decoded(flow.response):
+ if part == "h":
+ data += netlib.http.http1.assemble_response(flow.response)
+ elif part == "c":
+ data += flow.response.content
+ else:
+ raise ValueError("Unknown part: {}".format(part))
+ return data, False
+
+
+def export_prompt(k, flow):
+ exporters = {
+ "c": flow_export.curl_command,
+ "p": flow_export.python_code,
+ "r": flow_export.raw_request,
+ }
+ if k in exporters:
+ copy_to_clipboard_or_prompt(exporters[k](flow))
+
+
+def copy_to_clipboard_or_prompt(data):
+ # pyperclip calls encode('utf-8') on data to be copied without checking.
+ # if data are already encoded that way UnicodeDecodeError is thrown.
+ toclip = ""
+ try:
+ toclip = data.decode('utf-8')
+ except (UnicodeDecodeError):
+ toclip = data
+
+ try:
+ pyperclip.copy(toclip)
+ except (RuntimeError, UnicodeDecodeError, AttributeError):
+ def save(k):
+ if k == "y":
+ ask_save_path("Save data", data)
+ signals.status_prompt_onekey.send(
+ prompt = "Cannot copy data to clipboard. Save as file?",
+ keys = (
+ ("yes", "y"),
+ ("no", "n"),
+ ),
+ callback = save
+ )
+
+
+def copy_flow(part, scope, flow, master, state):
+ """
+ part: _c_ontent, _h_eaders+content, _u_rl
+ scope: _a_ll, re_q_uest, re_s_ponse
+ """
+ data, err = copy_flow_format_data(part, scope, flow)
+
+ if err:
+ signals.status_message.send(message=err)
+ return
+
+ if not data:
+ if scope == "q":
+ signals.status_message.send(message="No request content to copy.")
+ elif scope == "s":
+ signals.status_message.send(message="No response content to copy.")
+ else:
+ signals.status_message.send(message="No contents to copy.")
+ return
+
+ copy_to_clipboard_or_prompt(data)
+
+
+def ask_copy_part(scope, flow, master, state):
+ choices = [
+ ("content", "c"),
+ ("headers+content", "h")
+ ]
+ if scope != "s":
+ choices.append(("url", "u"))
+
+ signals.status_prompt_onekey.send(
+ prompt = "Copy",
+ keys = choices,
+ callback = copy_flow,
+ args = (scope, flow, master, state)
+ )
+
+
+def ask_save_body(part, master, state, flow):
+ """
+ Save either the request or the response body to disk. part can either be
+ "q" (request), "s" (response) or None (ask user if necessary).
+ """
+
+ request_has_content = flow.request and flow.request.content
+ response_has_content = flow.response and flow.response.content
+
+ if part is None:
+ # We first need to determine whether we want to save the request or the
+ # response content.
+ if request_has_content and response_has_content:
+ signals.status_prompt_onekey.send(
+ prompt = "Save",
+ keys = (
+ ("request", "q"),
+ ("response", "s"),
+ ),
+ callback = ask_save_body,
+ args = (master, state, flow)
+ )
+ elif response_has_content:
+ ask_save_body("s", master, state, flow)
+ else:
+ ask_save_body("q", master, state, flow)
+
+ elif part == "q" and request_has_content:
+ ask_save_path(
+ "Save request content",
+ flow.request.get_decoded_content()
+ )
+ elif part == "s" and response_has_content:
+ ask_save_path(
+ "Save response content",
+ flow.response.get_decoded_content()
+ )
+ else:
+ signals.status_message.send(message="No content to save.")
+
+
+flowcache = utils.LRUCache(800)
+
+
+def format_flow(f, focus, extended=False, hostheader=False, marked=False):
+ d = dict(
+ intercepted = f.intercepted,
+ acked = f.reply.acked,
+
+ req_timestamp = f.request.timestamp_start,
+ req_is_replay = f.request.is_replay,
+ req_method = f.request.method,
+ req_url = f.request.pretty_url if hostheader else f.request.url,
+ req_http_version = f.request.http_version,
+
+ err_msg = f.error.msg if f.error else None,
+ resp_code = f.response.status_code if f.response else None,
+
+ marked = marked,
+ )
+ if f.response:
+ if f.response.content:
+ contentdesc = netlib.utils.pretty_size(len(f.response.content))
+ elif f.response.content == CONTENT_MISSING:
+ contentdesc = "[content missing]"
+ else:
+ contentdesc = "[no content]"
+ duration = 0
+ if f.response.timestamp_end and f.request.timestamp_start:
+ duration = f.response.timestamp_end - f.request.timestamp_start
+ roundtrip = utils.pretty_duration(duration)
+
+ d.update(dict(
+ resp_code = f.response.status_code,
+ resp_is_replay = f.response.is_replay,
+ resp_clen = contentdesc,
+ roundtrip = roundtrip,
+ ))
+ t = f.response.headers.get("content-type")
+ if t:
+ d["resp_ctype"] = t.split(";")[0]
+ else:
+ d["resp_ctype"] = ""
+ return flowcache.get(
+ raw_format_flow,
+ tuple(sorted(d.items())), focus, extended
+ )
diff --git a/mitmproxy/console/flowdetailview.py b/mitmproxy/console/flowdetailview.py
new file mode 100644
index 00000000..f4b4262e
--- /dev/null
+++ b/mitmproxy/console/flowdetailview.py
@@ -0,0 +1,153 @@
+from __future__ import absolute_import
+import urwid
+from . import common, searchable
+from .. import utils
+
+
+def maybe_timestamp(base, attr):
+ if base and getattr(base, attr):
+ return utils.format_timestamp_with_milli(getattr(base, attr))
+ else:
+ return "active"
+
+
+def flowdetails(state, flow):
+ text = []
+
+ cc = flow.client_conn
+ sc = flow.server_conn
+ req = flow.request
+ resp = flow.response
+
+ if sc is not None:
+ text.append(urwid.Text([("head", "Server Connection:")]))
+ parts = [
+ ["Address", "%s:%s" % sc.address()],
+ ]
+
+ text.extend(
+ common.format_keyvals(parts, key="key", val="text", indent=4)
+ )
+
+ c = sc.cert
+ if c:
+ text.append(urwid.Text([("head", "Server Certificate:")]))
+ parts = [
+ ["Type", "%s, %s bits" % c.keyinfo],
+ ["SHA1 digest", c.digest("sha1")],
+ ["Valid to", str(c.notafter)],
+ ["Valid from", str(c.notbefore)],
+ ["Serial", str(c.serial)],
+ [
+ "Subject",
+ urwid.BoxAdapter(
+ urwid.ListBox(
+ common.format_keyvals(
+ c.subject,
+ key="highlight",
+ val="text"
+ )
+ ),
+ len(c.subject)
+ )
+ ],
+ [
+ "Issuer",
+ urwid.BoxAdapter(
+ urwid.ListBox(
+ common.format_keyvals(
+ c.issuer, key="highlight", val="text"
+ )
+ ),
+ len(c.issuer)
+ )
+ ]
+ ]
+
+ if c.altnames:
+ parts.append(
+ [
+ "Alt names",
+ ", ".join(c.altnames)
+ ]
+ )
+ text.extend(
+ common.format_keyvals(parts, key="key", val="text", indent=4)
+ )
+
+ if cc is not None:
+ text.append(urwid.Text([("head", "Client Connection:")]))
+
+ parts = [
+ ["Address", "%s:%s" % cc.address()],
+ # ["Requests", "%s"%cc.requestcount],
+ ]
+
+ text.extend(
+ common.format_keyvals(parts, key="key", val="text", indent=4)
+ )
+
+ parts = []
+
+ parts.append(
+ [
+ "Client conn. established",
+ maybe_timestamp(cc, "timestamp_start")
+ ]
+ )
+ parts.append(
+ [
+ "Server conn. initiated",
+ maybe_timestamp(sc, "timestamp_start")
+ ]
+ )
+ parts.append(
+ [
+ "Server conn. TCP handshake",
+ maybe_timestamp(sc, "timestamp_tcp_setup")
+ ]
+ )
+ if sc.ssl_established:
+ parts.append(
+ [
+ "Server conn. SSL handshake",
+ maybe_timestamp(sc, "timestamp_ssl_setup")
+ ]
+ )
+ parts.append(
+ [
+ "Client conn. SSL handshake",
+ maybe_timestamp(cc, "timestamp_ssl_setup")
+ ]
+ )
+ parts.append(
+ [
+ "First request byte",
+ maybe_timestamp(req, "timestamp_start")
+ ]
+ )
+ parts.append(
+ [
+ "Request complete",
+ maybe_timestamp(req, "timestamp_end")
+ ]
+ )
+ parts.append(
+ [
+ "First response byte",
+ maybe_timestamp(resp, "timestamp_start")
+ ]
+ )
+ parts.append(
+ [
+ "Response complete",
+ maybe_timestamp(resp, "timestamp_end")
+ ]
+ )
+
+ # sort operations by timestamp
+ parts = sorted(parts, key=lambda p: p[1])
+
+ text.append(urwid.Text([("head", "Timing:")]))
+ text.extend(common.format_keyvals(parts, key="key", val="text", indent=4))
+ return searchable.Searchable(state, text)
diff --git a/mitmproxy/console/flowlist.py b/mitmproxy/console/flowlist.py
new file mode 100644
index 00000000..c2201055
--- /dev/null
+++ b/mitmproxy/console/flowlist.py
@@ -0,0 +1,397 @@
+from __future__ import absolute_import
+import urwid
+
+import netlib.utils
+
+from . import common, signals
+
+
+def _mkhelp():
+ text = []
+ keys = [
+ ("A", "accept all intercepted flows"),
+ ("a", "accept this intercepted flow"),
+ ("b", "save request/response body"),
+ ("C", "clear flow list or eventlog"),
+ ("d", "delete flow"),
+ ("D", "duplicate flow"),
+ ("E", "export"),
+ ("e", "toggle eventlog"),
+ ("F", "toggle follow flow list"),
+ ("l", "set limit filter pattern"),
+ ("L", "load saved flows"),
+ ("m", "toggle flow mark"),
+ ("n", "create a new request"),
+ ("P", "copy flow to clipboard"),
+ ("r", "replay request"),
+ ("U", "unmark all marked flows"),
+ ("V", "revert changes to request"),
+ ("w", "save flows "),
+ ("W", "stream flows to file"),
+ ("X", "kill and delete flow, even if it's mid-intercept"),
+ ("tab", "tab between eventlog and flow list"),
+ ("enter", "view flow"),
+ ("|", "run script on this flow"),
+ ]
+ text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
+ return text
+help_context = _mkhelp()
+
+footer = [
+ ('heading_key', "?"), ":help ",
+]
+
+
+class EventListBox(urwid.ListBox):
+
+ def __init__(self, master):
+ self.master = master
+ urwid.ListBox.__init__(self, master.eventlist)
+
+ def keypress(self, size, key):
+ key = common.shortcuts(key)
+ if key == "C":
+ self.master.clear_events()
+ key = None
+ elif key == "G":
+ self.set_focus(len(self.master.eventlist) - 1)
+ elif key == "g":
+ self.set_focus(0)
+ return urwid.ListBox.keypress(self, size, key)
+
+
+class BodyPile(urwid.Pile):
+
+ def __init__(self, master):
+ h = urwid.Text("Event log")
+ h = urwid.Padding(h, align="left", width=("relative", 100))
+
+ self.inactive_header = urwid.AttrWrap(h, "heading_inactive")
+ self.active_header = urwid.AttrWrap(h, "heading")
+
+ urwid.Pile.__init__(
+ self,
+ [
+ FlowListBox(master),
+ urwid.Frame(
+ EventListBox(master),
+ header = self.inactive_header
+ )
+ ]
+ )
+ self.master = master
+
+ def keypress(self, size, key):
+ if key == "tab":
+ self.focus_position = (
+ self.focus_position + 1) % len(self.widget_list)
+ if self.focus_position == 1:
+ self.widget_list[1].header = self.active_header
+ else:
+ self.widget_list[1].header = self.inactive_header
+ key = None
+ elif key == "e":
+ self.master.toggle_eventlog()
+ key = None
+
+ # This is essentially a copypasta from urwid.Pile's keypress handler.
+ # So much for "closed for modification, but open for extension".
+ item_rows = None
+ if len(size) == 2:
+ item_rows = self.get_item_rows(size, focus = True)
+ i = self.widget_list.index(self.focus_item)
+ tsize = self.get_item_size(size, i, True, item_rows)
+ return self.focus_item.keypress(tsize, key)
+
+
+class ConnectionItem(urwid.WidgetWrap):
+
+ def __init__(self, master, state, flow, focus):
+ self.master, self.state, self.flow = master, state, flow
+ self.f = focus
+ w = self.get_text()
+ urwid.WidgetWrap.__init__(self, w)
+
+ def get_text(self):
+ return common.format_flow(
+ self.flow,
+ self.f,
+ hostheader = self.master.showhost,
+ marked=self.state.flow_marked(self.flow)
+ )
+
+ def selectable(self):
+ return True
+
+ def save_flows_prompt(self, k):
+ if k == "a":
+ signals.status_prompt_path.send(
+ prompt = "Save all flows to",
+ callback = self.master.save_flows
+ )
+ elif k == "m":
+ signals.status_prompt_path.send(
+ prompt = "Save marked flows to",
+ callback = self.master.save_marked_flows
+ )
+ else:
+ signals.status_prompt_path.send(
+ prompt = "Save this flow to",
+ callback = self.master.save_one_flow,
+ args = (self.flow,)
+ )
+
+ def stop_server_playback_prompt(self, a):
+ if a != "n":
+ self.master.stop_server_playback()
+
+ def server_replay_prompt(self, k):
+ if k == "a":
+ self.master.start_server_playback(
+ [i.copy() for i in self.master.state.view],
+ self.master.killextra, self.master.rheaders,
+ False, self.master.nopop,
+ self.master.options.replay_ignore_params,
+ self.master.options.replay_ignore_content,
+ self.master.options.replay_ignore_payload_params,
+ self.master.options.replay_ignore_host
+ )
+ elif k == "t":
+ self.master.start_server_playback(
+ [self.flow.copy()],
+ self.master.killextra, self.master.rheaders,
+ False, self.master.nopop,
+ self.master.options.replay_ignore_params,
+ self.master.options.replay_ignore_content,
+ self.master.options.replay_ignore_payload_params,
+ self.master.options.replay_ignore_host
+ )
+ else:
+ signals.status_prompt_path.send(
+ prompt = "Server replay path",
+ callback = self.master.server_playback_path
+ )
+
+ def mouse_event(self, size, event, button, col, row, focus):
+ if event == "mouse press" and button == 1:
+ if self.flow.request:
+ self.master.view_flow(self.flow)
+ return True
+
+ def keypress(self, xxx_todo_changeme, key):
+ (maxcol,) = xxx_todo_changeme
+ key = common.shortcuts(key)
+ if key == "a":
+ self.flow.accept_intercept(self.master)
+ signals.flowlist_change.send(self)
+ elif key == "d":
+ self.flow.kill(self.master)
+ self.state.delete_flow(self.flow)
+ signals.flowlist_change.send(self)
+ elif key == "D":
+ f = self.master.duplicate_flow(self.flow)
+ self.master.view_flow(f)
+ elif key == "m":
+ if self.state.flow_marked(self.flow):
+ self.state.set_flow_marked(self.flow, False)
+ else:
+ self.state.set_flow_marked(self.flow, True)
+ signals.flowlist_change.send(self)
+ elif key == "r":
+ r = self.master.replay_request(self.flow)
+ if r:
+ signals.status_message.send(message=r)
+ signals.flowlist_change.send(self)
+ elif key == "S":
+ if not self.master.server_playback:
+ signals.status_prompt_onekey.send(
+ prompt = "Server Replay",
+ keys = (
+ ("all flows", "a"),
+ ("this flow", "t"),
+ ("file", "f"),
+ ),
+ callback = self.server_replay_prompt,
+ )
+ else:
+ signals.status_prompt_onekey.send(
+ prompt = "Stop current server replay?",
+ keys = (
+ ("yes", "y"),
+ ("no", "n"),
+ ),
+ callback = self.stop_server_playback_prompt,
+ )
+ elif key == "U":
+ for f in self.state.flows:
+ self.state.set_flow_marked(f, False)
+ signals.flowlist_change.send(self)
+ elif key == "V":
+ if not self.flow.modified():
+ signals.status_message.send(message="Flow not modified.")
+ return
+ self.state.revert(self.flow)
+ signals.flowlist_change.send(self)
+ signals.status_message.send(message="Reverted.")
+ elif key == "w":
+ signals.status_prompt_onekey.send(
+ self,
+ prompt = "Save",
+ keys = (
+ ("all flows", "a"),
+ ("this flow", "t"),
+ ("marked flows", "m"),
+ ),
+ callback = self.save_flows_prompt,
+ )
+ elif key == "X":
+ self.flow.kill(self.master)
+ elif key == "enter":
+ if self.flow.request:
+ self.master.view_flow(self.flow)
+ elif key == "|":
+ signals.status_prompt_path.send(
+ prompt = "Send flow to script",
+ callback = self.master.run_script_once,
+ args = (self.flow,)
+ )
+ elif key == "P":
+ common.ask_copy_part("a", self.flow, self.master, self.state)
+ elif key == "E":
+ signals.status_prompt_onekey.send(
+ self,
+ prompt = "Export",
+ keys = (
+ ("as curl command", "c"),
+ ("as python code", "p"),
+ ("as raw request", "r"),
+ ),
+ callback = common.export_prompt,
+ args = (self.flow,)
+ )
+ elif key == "b":
+ common.ask_save_body(None, self.master, self.state, self.flow)
+ else:
+ return key
+
+
+class FlowListWalker(urwid.ListWalker):
+
+ def __init__(self, master, state):
+ self.master, self.state = master, state
+ signals.flowlist_change.connect(self.sig_flowlist_change)
+
+ def sig_flowlist_change(self, sender):
+ self._modified()
+
+ def get_focus(self):
+ f, i = self.state.get_focus()
+ f = ConnectionItem(self.master, self.state, f, True) if f else None
+ return f, i
+
+ def set_focus(self, focus):
+ ret = self.state.set_focus(focus)
+ return ret
+
+ def get_next(self, pos):
+ f, i = self.state.get_next(pos)
+ f = ConnectionItem(self.master, self.state, f, False) if f else None
+ return f, i
+
+ def get_prev(self, pos):
+ f, i = self.state.get_prev(pos)
+ f = ConnectionItem(self.master, self.state, f, False) if f else None
+ return f, i
+
+
+class FlowListBox(urwid.ListBox):
+
+ def __init__(self, master):
+ self.master = master
+ urwid.ListBox.__init__(
+ self,
+ FlowListWalker(master, master.state)
+ )
+
+ def get_method_raw(self, k):
+ if k:
+ self.get_url(k)
+
+ def get_method(self, k):
+ if k == "e":
+ signals.status_prompt.send(
+ self,
+ prompt = "Method",
+ text = "",
+ callback = self.get_method_raw
+ )
+ else:
+ method = ""
+ for i in common.METHOD_OPTIONS:
+ if i[1] == k:
+ method = i[0].upper()
+ self.get_url(method)
+
+ def get_url(self, method):
+ signals.status_prompt.send(
+ prompt = "URL",
+ text = "http://www.example.com/",
+ callback = self.new_request,
+ args = (method,)
+ )
+
+ def new_request(self, url, method):
+ parts = netlib.utils.parse_url(str(url))
+ if not parts:
+ signals.status_message.send(message="Invalid Url")
+ return
+ scheme, host, port, path = parts
+ f = self.master.create_request(method, scheme, host, port, path)
+ self.master.view_flow(f)
+
+ def keypress(self, size, key):
+ key = common.shortcuts(key)
+ if key == "A":
+ self.master.accept_all()
+ signals.flowlist_change.send(self)
+ elif key == "C":
+ self.master.clear_flows()
+ elif key == "e":
+ self.master.toggle_eventlog()
+ elif key == "g":
+ self.master.state.set_focus(0)
+ signals.flowlist_change.send(self)
+ elif key == "G":
+ self.master.state.set_focus(self.master.state.flow_count())
+ signals.flowlist_change.send(self)
+ elif key == "l":
+ signals.status_prompt.send(
+ prompt = "Limit",
+ text = self.master.state.limit_txt,
+ callback = self.master.set_limit
+ )
+ elif key == "L":
+ signals.status_prompt_path.send(
+ self,
+ prompt = "Load flows",
+ callback = self.master.load_flows_callback
+ )
+ elif key == "n":
+ signals.status_prompt_onekey.send(
+ prompt = "Method",
+ keys = common.METHOD_OPTIONS,
+ callback = self.get_method
+ )
+ elif key == "F":
+ self.master.toggle_follow_flows()
+ elif key == "W":
+ if self.master.stream:
+ self.master.stop_stream()
+ else:
+ signals.status_prompt_path.send(
+ self,
+ prompt = "Stream flows to",
+ callback = self.master.start_stream_to_path
+ )
+ else:
+ return urwid.ListBox.keypress(self, size, key)
diff --git a/mitmproxy/console/flowview.py b/mitmproxy/console/flowview.py
new file mode 100644
index 00000000..f74ab140
--- /dev/null
+++ b/mitmproxy/console/flowview.py
@@ -0,0 +1,714 @@
+from __future__ import absolute_import, division
+import os
+import traceback
+import sys
+
+import math
+import urwid
+
+from netlib import odict
+from netlib.http import CONTENT_MISSING, Headers
+from . import common, grideditor, signals, searchable, tabs
+from . import flowdetailview
+from .. import utils, controller, contentviews
+from ..models import HTTPRequest, HTTPResponse, decoded
+from ..exceptions import ContentViewException
+
+
+class SearchError(Exception):
+ pass
+
+
+def _mkhelp():
+ text = []
+ keys = [
+ ("A", "accept all intercepted flows"),
+ ("a", "accept this intercepted flow"),
+ ("b", "save request/response body"),
+ ("D", "duplicate flow"),
+ ("d", "delete flow"),
+ ("E", "export"),
+ ("e", "edit request/response"),
+ ("f", "load full body data"),
+ ("m", "change body display mode for this entity"),
+ (None,
+ common.highlight_key("automatic", "a") +
+ [("text", ": automatic detection")]
+ ),
+ (None,
+ common.highlight_key("hex", "e") +
+ [("text", ": Hex")]
+ ),
+ (None,
+ common.highlight_key("html", "h") +
+ [("text", ": HTML")]
+ ),
+ (None,
+ common.highlight_key("image", "i") +
+ [("text", ": Image")]
+ ),
+ (None,
+ common.highlight_key("javascript", "j") +
+ [("text", ": JavaScript")]
+ ),
+ (None,
+ common.highlight_key("json", "s") +
+ [("text", ": JSON")]
+ ),
+ (None,
+ common.highlight_key("urlencoded", "u") +
+ [("text", ": URL-encoded data")]
+ ),
+ (None,
+ common.highlight_key("raw", "r") +
+ [("text", ": raw data")]
+ ),
+ (None,
+ common.highlight_key("xml", "x") +
+ [("text", ": XML")]
+ ),
+ ("M", "change default body display mode"),
+ ("p", "previous flow"),
+ ("P", "copy request/response (content/headers) to clipboard"),
+ ("r", "replay request"),
+ ("V", "revert changes to request"),
+ ("v", "view body in external viewer"),
+ ("w", "save all flows matching current limit"),
+ ("W", "save this flow"),
+ ("x", "delete body"),
+ ("z", "encode/decode a request/response"),
+ ("tab", "next tab"),
+ ("h, l", "previous tab, next tab"),
+ ("space", "next flow"),
+ ("|", "run script on this flow"),
+ ("/", "search (case sensitive)"),
+ ("n", "repeat search forward"),
+ ("N", "repeat search backwards"),
+ ]
+ text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
+ return text
+help_context = _mkhelp()
+
+footer = [
+ ('heading_key', "?"), ":help ",
+ ('heading_key', "q"), ":back ",
+]
+
+
+class FlowViewHeader(urwid.WidgetWrap):
+
+ def __init__(self, master, f):
+ self.master, self.flow = master, f
+ self._w = common.format_flow(
+ f,
+ False,
+ extended=True,
+ hostheader=self.master.showhost
+ )
+ signals.flow_change.connect(self.sig_flow_change)
+
+ def sig_flow_change(self, sender, flow):
+ if flow == self.flow:
+ self._w = common.format_flow(
+ flow,
+ False,
+ extended=True,
+ hostheader=self.master.showhost
+ )
+
+
+cache = utils.LRUCache(200)
+
+TAB_REQ = 0
+TAB_RESP = 1
+
+
+class FlowView(tabs.Tabs):
+ highlight_color = "focusfield"
+
+ def __init__(self, master, state, flow, tab_offset):
+ self.master, self.state, self.flow = master, state, flow
+ tabs.Tabs.__init__(self,
+ [
+ (self.tab_request, self.view_request),
+ (self.tab_response, self.view_response),
+ (self.tab_details, self.view_details),
+ ],
+ tab_offset
+ )
+ self.show()
+ self.last_displayed_body = None
+ signals.flow_change.connect(self.sig_flow_change)
+
+ def tab_request(self):
+ if self.flow.intercepted and not self.flow.reply.acked and not self.flow.response:
+ return "Request intercepted"
+ else:
+ return "Request"
+
+ def tab_response(self):
+ if self.flow.intercepted and not self.flow.reply.acked and self.flow.response:
+ return "Response intercepted"
+ else:
+ return "Response"
+
+ def tab_details(self):
+ return "Detail"
+
+ def view_request(self):
+ return self.conn_text(self.flow.request)
+
+ def view_response(self):
+ return self.conn_text(self.flow.response)
+
+ def view_details(self):
+ return flowdetailview.flowdetails(self.state, self.flow)
+
+ def sig_flow_change(self, sender, flow):
+ if flow == self.flow:
+ self.show()
+
+ def content_view(self, viewmode, message):
+ if message.content == CONTENT_MISSING:
+ msg, body = "", [urwid.Text([("error", "[content missing]")])]
+ return msg, body
+ else:
+ full = self.state.get_flow_setting(
+ self.flow,
+ (self.tab_offset, "fullcontents"),
+ False
+ )
+ if full:
+ limit = sys.maxsize
+ else:
+ limit = contentviews.VIEW_CUTOFF
+ return cache.get(
+ self._get_content_view,
+ viewmode,
+ message,
+ limit,
+ (bytes(message.headers), message.content) # Cache invalidation
+ )
+
+ def _get_content_view(self, viewmode, message, max_lines, _):
+
+ try:
+ query = None
+ if isinstance(message, HTTPRequest):
+ query = message.query
+ description, lines = contentviews.get_content_view(
+ viewmode, message.content, headers=message.headers, query=query
+ )
+ except ContentViewException:
+ s = "Content viewer failed: \n" + traceback.format_exc()
+ signals.add_event(s, "error")
+ description, lines = contentviews.get_content_view(
+ contentviews.get("Raw"), message.content, headers=message.headers
+ )
+ description = description.replace("Raw", "Couldn't parse: falling back to Raw")
+
+ # Give hint that you have to tab for the response.
+ if description == "No content" and isinstance(message, HTTPRequest):
+ description = "No request content (press tab to view response)"
+
+ # If the users has a wide terminal, he gets fewer lines; this should not be an issue.
+ chars_per_line = 80
+ max_chars = max_lines * chars_per_line
+ total_chars = 0
+ text_objects = []
+ for line in lines:
+ txt = []
+ for (style, text) in line:
+ if total_chars + len(text) > max_chars:
+ text = text[:max_chars - total_chars]
+ txt.append((style, text))
+ total_chars += len(text)
+ if total_chars == max_chars:
+ break
+
+ # round up to the next line.
+ total_chars = int(math.ceil(total_chars / chars_per_line) * chars_per_line)
+
+ text_objects.append(urwid.Text(txt))
+ if total_chars == max_chars:
+ text_objects.append(urwid.Text([
+ ("highlight", "Stopped displaying data after %d lines. Press " % max_lines),
+ ("key", "f"),
+ ("highlight", " to load all data.")
+ ]))
+ break
+
+ return description, text_objects
+
+ def viewmode_get(self):
+ override = self.state.get_flow_setting(
+ self.flow,
+ (self.tab_offset, "prettyview")
+ )
+ return self.state.default_body_view if override is None else override
+
+ def conn_text(self, conn):
+ if conn:
+ txt = common.format_keyvals(
+ [(h + ":", v) for (h, v) in conn.headers.fields],
+ key = "header",
+ val = "text"
+ )
+ viewmode = self.viewmode_get()
+ msg, body = self.content_view(viewmode, conn)
+
+ cols = [
+ urwid.Text(
+ [
+ ("heading", msg),
+ ]
+ ),
+ urwid.Text(
+ [
+ " ",
+ ('heading', "["),
+ ('heading_key', "m"),
+ ('heading', (":%s]" % viewmode.name)),
+ ],
+ align="right"
+ )
+ ]
+ title = urwid.AttrWrap(urwid.Columns(cols), "heading")
+
+ txt.append(title)
+ txt.extend(body)
+ else:
+ txt = [
+ urwid.Text(""),
+ urwid.Text(
+ [
+ ("highlight", "No response. Press "),
+ ("key", "e"),
+ ("highlight", " and edit any aspect to add one."),
+ ]
+ )
+ ]
+ return searchable.Searchable(self.state, txt)
+
+ def set_method_raw(self, m):
+ if m:
+ self.flow.request.method = m
+ signals.flow_change.send(self, flow = self.flow)
+
+ def edit_method(self, m):
+ if m == "e":
+ signals.status_prompt.send(
+ prompt = "Method",
+ text = self.flow.request.method,
+ callback = self.set_method_raw
+ )
+ else:
+ for i in common.METHOD_OPTIONS:
+ if i[1] == m:
+ self.flow.request.method = i[0].upper()
+ signals.flow_change.send(self, flow = self.flow)
+
+ def set_url(self, url):
+ request = self.flow.request
+ try:
+ request.url = str(url)
+ except ValueError:
+ return "Invalid URL."
+ signals.flow_change.send(self, flow = self.flow)
+
+ def set_resp_code(self, code):
+ response = self.flow.response
+ try:
+ response.status_code = int(code)
+ except ValueError:
+ return None
+ import BaseHTTPServer
+ if int(code) in BaseHTTPServer.BaseHTTPRequestHandler.responses:
+ response.msg = BaseHTTPServer.BaseHTTPRequestHandler.responses[
+ int(code)][0]
+ signals.flow_change.send(self, flow = self.flow)
+
+ def set_resp_msg(self, msg):
+ response = self.flow.response
+ response.msg = msg
+ signals.flow_change.send(self, flow = self.flow)
+
+ def set_headers(self, fields, conn):
+ conn.headers = Headers(fields)
+ signals.flow_change.send(self, flow = self.flow)
+
+ def set_query(self, lst, conn):
+ conn.set_query(odict.ODict(lst))
+ signals.flow_change.send(self, flow = self.flow)
+
+ def set_path_components(self, lst, conn):
+ conn.set_path_components(lst)
+ signals.flow_change.send(self, flow = self.flow)
+
+ def set_form(self, lst, conn):
+ conn.set_form_urlencoded(odict.ODict(lst))
+ signals.flow_change.send(self, flow = self.flow)
+
+ def edit_form(self, conn):
+ self.master.view_grideditor(
+ grideditor.URLEncodedFormEditor(
+ self.master,
+ conn.get_form_urlencoded().lst,
+ self.set_form,
+ conn
+ )
+ )
+
+ def edit_form_confirm(self, key, conn):
+ if key == "y":
+ self.edit_form(conn)
+
+ def set_cookies(self, lst, conn):
+ od = odict.ODict(lst)
+ conn.set_cookies(od)
+ signals.flow_change.send(self, flow = self.flow)
+
+ def set_setcookies(self, data, conn):
+ conn.set_cookies(data)
+ signals.flow_change.send(self, flow = self.flow)
+
+ def edit(self, part):
+ if self.tab_offset == TAB_REQ:
+ message = self.flow.request
+ else:
+ if not self.flow.response:
+ self.flow.response = HTTPResponse(
+ self.flow.request.http_version,
+ 200, "OK", Headers(), ""
+ )
+ self.flow.response.reply = controller.DummyReply()
+ message = self.flow.response
+
+ self.flow.backup()
+ if message == self.flow.request and part == "c":
+ self.master.view_grideditor(
+ grideditor.CookieEditor(
+ self.master,
+ message.get_cookies().lst,
+ self.set_cookies,
+ message
+ )
+ )
+ if message == self.flow.response and part == "c":
+ self.master.view_grideditor(
+ grideditor.SetCookieEditor(
+ self.master,
+ message.get_cookies(),
+ self.set_setcookies,
+ message
+ )
+ )
+ if part == "r":
+ with decoded(message):
+ # Fix an issue caused by some editors when editing a
+ # request/response body. Many editors make it hard to save a
+ # file without a terminating newline on the last line. When
+ # editing message bodies, this can cause problems. For now, I
+ # just strip the newlines off the end of the body when we return
+ # from an editor.
+ c = self.master.spawn_editor(message.content or "")
+ message.content = c.rstrip("\n")
+ elif part == "f":
+ if not message.get_form_urlencoded() and message.content:
+ signals.status_prompt_onekey.send(
+ prompt = "Existing body is not a URL-encoded form. Clear and edit?",
+ keys = [
+ ("yes", "y"),
+ ("no", "n"),
+ ],
+ callback = self.edit_form_confirm,
+ args = (message,)
+ )
+ else:
+ self.edit_form(message)
+ elif part == "h":
+ self.master.view_grideditor(
+ grideditor.HeaderEditor(
+ self.master,
+ message.headers.fields,
+ self.set_headers,
+ message
+ )
+ )
+ elif part == "p":
+ p = message.get_path_components()
+ self.master.view_grideditor(
+ grideditor.PathEditor(
+ self.master,
+ p,
+ self.set_path_components,
+ message
+ )
+ )
+ elif part == "q":
+ self.master.view_grideditor(
+ grideditor.QueryEditor(
+ self.master,
+ message.get_query().lst,
+ self.set_query, message
+ )
+ )
+ elif part == "u":
+ signals.status_prompt.send(
+ prompt = "URL",
+ text = message.url,
+ callback = self.set_url
+ )
+ elif part == "m":
+ signals.status_prompt_onekey.send(
+ prompt = "Method",
+ keys = common.METHOD_OPTIONS,
+ callback = self.edit_method
+ )
+ elif part == "o":
+ signals.status_prompt.send(
+ prompt = "Code",
+ text = str(message.status_code),
+ callback = self.set_resp_code
+ )
+ elif part == "m":
+ signals.status_prompt.send(
+ prompt = "Message",
+ text = message.msg,
+ callback = self.set_resp_msg
+ )
+ signals.flow_change.send(self, flow = self.flow)
+
+ def _view_nextprev_flow(self, np, flow):
+ try:
+ idx = self.state.view.index(flow)
+ except IndexError:
+ return
+ if np == "next":
+ new_flow, new_idx = self.state.get_next(idx)
+ else:
+ new_flow, new_idx = self.state.get_prev(idx)
+ if new_flow is None:
+ signals.status_message.send(message="No more flows!")
+ else:
+ signals.pop_view_state.send(self)
+ self.master.view_flow(new_flow, self.tab_offset)
+
+ def view_next_flow(self, flow):
+ return self._view_nextprev_flow("next", flow)
+
+ def view_prev_flow(self, flow):
+ return self._view_nextprev_flow("prev", flow)
+
+ def change_this_display_mode(self, t):
+ self.state.add_flow_setting(
+ self.flow,
+ (self.tab_offset, "prettyview"),
+ contentviews.get_by_shortcut(t)
+ )
+ signals.flow_change.send(self, flow = self.flow)
+
+ def delete_body(self, t):
+ if t == "m":
+ val = CONTENT_MISSING
+ else:
+ val = None
+ if self.tab_offset == TAB_REQ:
+ self.flow.request.content = val
+ else:
+ self.flow.response.content = val
+ signals.flow_change.send(self, flow = self.flow)
+
+ def keypress(self, size, key):
+ key = super(self.__class__, self).keypress(size, key)
+
+ if key == " ":
+ self.view_next_flow(self.flow)
+ return
+
+ key = common.shortcuts(key)
+ if self.tab_offset == TAB_REQ:
+ conn = self.flow.request
+ elif self.tab_offset == TAB_RESP:
+ conn = self.flow.response
+ else:
+ conn = None
+
+ if key in ("up", "down", "page up", "page down"):
+ # Why doesn't this just work??
+ self._w.keypress(size, key)
+ elif key == "a":
+ self.flow.accept_intercept(self.master)
+ signals.flow_change.send(self, flow = self.flow)
+ elif key == "A":
+ self.master.accept_all()
+ signals.flow_change.send(self, flow = self.flow)
+ elif key == "d":
+ if self.state.flow_count() == 1:
+ self.master.view_flowlist()
+ elif self.state.view.index(self.flow) == len(self.state.view) - 1:
+ self.view_prev_flow(self.flow)
+ else:
+ self.view_next_flow(self.flow)
+ f = self.flow
+ f.kill(self.master)
+ self.state.delete_flow(f)
+ elif key == "D":
+ f = self.master.duplicate_flow(self.flow)
+ self.master.view_flow(f)
+ signals.status_message.send(message="Duplicated.")
+ elif key == "p":
+ self.view_prev_flow(self.flow)
+ elif key == "r":
+ r = self.master.replay_request(self.flow)
+ if r:
+ signals.status_message.send(message=r)
+ signals.flow_change.send(self, flow = self.flow)
+ elif key == "V":
+ if not self.flow.modified():
+ signals.status_message.send(message="Flow not modified.")
+ return
+ self.state.revert(self.flow)
+ signals.flow_change.send(self, flow = self.flow)
+ signals.status_message.send(message="Reverted.")
+ elif key == "W":
+ signals.status_prompt_path.send(
+ prompt = "Save this flow",
+ callback = self.master.save_one_flow,
+ args = (self.flow,)
+ )
+ elif key == "E":
+ signals.status_prompt_onekey.send(
+ self,
+ prompt = "Export",
+ keys = (
+ ("as curl command", "c"),
+ ("as python code", "p"),
+ ("as raw request", "r"),
+ ),
+ callback = common.export_prompt,
+ args = (self.flow,)
+ )
+ elif key == "|":
+ signals.status_prompt_path.send(
+ prompt = "Send flow to script",
+ callback = self.master.run_script_once,
+ args = (self.flow,)
+ )
+
+ if not conn and key in set(list("befgmxvz")):
+ signals.status_message.send(
+ message = "Tab to the request or response",
+ expire = 1
+ )
+ elif conn:
+ if key == "b":
+ if self.tab_offset == TAB_REQ:
+ common.ask_save_body(
+ "q", self.master, self.state, self.flow
+ )
+ else:
+ common.ask_save_body(
+ "s", self.master, self.state, self.flow
+ )
+ elif key == "e":
+ if self.tab_offset == TAB_REQ:
+ signals.status_prompt_onekey.send(
+ prompt = "Edit request",
+ keys = (
+ ("cookies", "c"),
+ ("query", "q"),
+ ("path", "p"),
+ ("url", "u"),
+ ("header", "h"),
+ ("form", "f"),
+ ("raw body", "r"),
+ ("method", "m"),
+ ),
+ callback = self.edit
+ )
+ else:
+ signals.status_prompt_onekey.send(
+ prompt = "Edit response",
+ keys = (
+ ("cookies", "c"),
+ ("code", "o"),
+ ("message", "m"),
+ ("header", "h"),
+ ("raw body", "r"),
+ ),
+ callback = self.edit
+ )
+ key = None
+ elif key == "f":
+ signals.status_message.send(message="Loading all body data...")
+ self.state.add_flow_setting(
+ self.flow,
+ (self.tab_offset, "fullcontents"),
+ True
+ )
+ signals.flow_change.send(self, flow = self.flow)
+ signals.status_message.send(message="")
+ elif key == "P":
+ if self.tab_offset == TAB_REQ:
+ scope = "q"
+ else:
+ scope = "s"
+ common.ask_copy_part(scope, self.flow, self.master, self.state)
+ elif key == "m":
+ p = list(contentviews.view_prompts)
+ p.insert(0, ("Clear", "C"))
+ signals.status_prompt_onekey.send(
+ self,
+ prompt = "Display mode",
+ keys = p,
+ callback = self.change_this_display_mode
+ )
+ key = None
+ elif key == "x":
+ signals.status_prompt_onekey.send(
+ prompt = "Delete body",
+ keys = (
+ ("completely", "c"),
+ ("mark as missing", "m"),
+ ),
+ callback = self.delete_body
+ )
+ key = None
+ elif key == "v":
+ if conn.content:
+ t = conn.headers.get("content-type")
+ if "EDITOR" in os.environ or "PAGER" in os.environ:
+ self.master.spawn_external_viewer(conn.content, t)
+ else:
+ signals.status_message.send(
+ message = "Error! Set $EDITOR or $PAGER."
+ )
+ elif key == "z":
+ self.flow.backup()
+ e = conn.headers.get("content-encoding", "identity")
+ if e != "identity":
+ if not conn.decode():
+ signals.status_message.send(
+ message = "Could not decode - invalid data?"
+ )
+ else:
+ signals.status_prompt_onekey.send(
+ prompt = "Select encoding: ",
+ keys = (
+ ("gzip", "z"),
+ ("deflate", "d"),
+ ),
+ callback = self.encode_callback,
+ args = (conn,)
+ )
+ signals.flow_change.send(self, flow = self.flow)
+ return key
+
+ def encode_callback(self, key, conn):
+ encoding_map = {
+ "z": "gzip",
+ "d": "deflate",
+ }
+ conn.encode(encoding_map[key])
+ signals.flow_change.send(self, flow = self.flow)
diff --git a/mitmproxy/console/grideditor.py b/mitmproxy/console/grideditor.py
new file mode 100644
index 00000000..a11c962c
--- /dev/null
+++ b/mitmproxy/console/grideditor.py
@@ -0,0 +1,716 @@
+from __future__ import absolute_import
+
+import copy
+import re
+import os
+import urwid
+
+from netlib import odict
+from netlib.http import user_agents
+
+from . import common, signals
+from .. import utils, filt, script
+
+
+FOOTER = [
+ ('heading_key', "enter"), ":edit ",
+ ('heading_key', "q"), ":back ",
+]
+FOOTER_EDITING = [
+ ('heading_key', "esc"), ":stop editing ",
+]
+
+
+class TextColumn:
+ subeditor = None
+
+ def __init__(self, heading):
+ self.heading = heading
+
+ def text(self, obj):
+ return SEscaped(obj or "")
+
+ def blank(self):
+ return ""
+
+ def keypress(self, key, editor):
+ if key == "r":
+ if editor.walker.get_current_value() is not None:
+ signals.status_prompt_path.send(
+ self,
+ prompt = "Read file",
+ callback = editor.read_file
+ )
+ elif key == "R":
+ if editor.walker.get_current_value() is not None:
+ signals.status_prompt_path.send(
+ editor,
+ prompt = "Read unescaped file",
+ callback = editor.read_file,
+ args = (True,)
+ )
+ elif key == "e":
+ o = editor.walker.get_current_value()
+ if o is not None:
+ n = editor.master.spawn_editor(o.encode("string-escape"))
+ n = utils.clean_hanging_newline(n)
+ editor.walker.set_current_value(n, False)
+ editor.walker._modified()
+ elif key in ["enter"]:
+ editor.walker.start_edit()
+ else:
+ return key
+
+
+class SubgridColumn:
+
+ def __init__(self, heading, subeditor):
+ self.heading = heading
+ self.subeditor = subeditor
+
+ def text(self, obj):
+ p = http_cookies._format_pairs(obj, sep="\n")
+ return urwid.Text(p)
+
+ def blank(self):
+ return []
+
+ def keypress(self, key, editor):
+ if key in "rRe":
+ signals.status_message.send(
+ self,
+ message = "Press enter to edit this field.",
+ expire = 1000
+ )
+ return
+ elif key in ["enter"]:
+ editor.master.view_grideditor(
+ self.subeditor(
+ editor.master,
+ editor.walker.get_current_value(),
+ editor.set_subeditor_value,
+ editor.walker.focus,
+ editor.walker.focus_col
+ )
+ )
+ else:
+ return key
+
+
+class SEscaped(urwid.WidgetWrap):
+
+ def __init__(self, txt):
+ txt = txt.encode("string-escape")
+ w = urwid.Text(txt, wrap="any")
+ urwid.WidgetWrap.__init__(self, w)
+
+ def get_text(self):
+ return self._w.get_text()[0]
+
+ def keypress(self, size, key):
+ return key
+
+ def selectable(self):
+ return True
+
+
+class SEdit(urwid.WidgetWrap):
+
+ def __init__(self, txt):
+ txt = txt.encode("string-escape")
+ w = urwid.Edit(edit_text=txt, wrap="any", multiline=True)
+ w = urwid.AttrWrap(w, "editfield")
+ urwid.WidgetWrap.__init__(self, w)
+
+ def get_text(self):
+ return self._w.get_text()[0].strip()
+
+ def selectable(self):
+ return True
+
+
+class GridRow(urwid.WidgetWrap):
+
+ def __init__(self, focused, editing, editor, values):
+ self.focused, self.editing, self.editor = focused, editing, editor
+
+ errors = values[1]
+ self.fields = []
+ for i, v in enumerate(values[0]):
+ if focused == i and editing:
+ self.editing = SEdit(v)
+ self.fields.append(self.editing)
+ else:
+ w = self.editor.columns[i].text(v)
+ if focused == i:
+ if i in errors:
+ w = urwid.AttrWrap(w, "focusfield_error")
+ else:
+ w = urwid.AttrWrap(w, "focusfield")
+ elif i in errors:
+ w = urwid.AttrWrap(w, "field_error")
+ self.fields.append(w)
+
+ fspecs = self.fields[:]
+ if len(self.fields) > 1:
+ fspecs[0] = ("fixed", self.editor.first_width + 2, fspecs[0])
+ w = urwid.Columns(
+ fspecs,
+ dividechars = 2
+ )
+ if focused is not None:
+ w.set_focus_column(focused)
+ urwid.WidgetWrap.__init__(self, w)
+
+ def get_edit_value(self):
+ return self.editing.get_text()
+
+ def keypress(self, s, k):
+ if self.editing:
+ w = self._w.column_widths(s)[self.focused]
+ k = self.editing.keypress((w,), k)
+ return k
+
+ def selectable(self):
+ return True
+
+
+class GridWalker(urwid.ListWalker):
+
+ """
+ Stores rows as a list of (rows, errors) tuples, where rows is a list
+ and errors is a set with an entry of each offset in rows that is an
+ error.
+ """
+
+ def __init__(self, lst, editor):
+ self.lst = [(i, set([])) for i in lst]
+ self.editor = editor
+ self.focus = 0
+ self.focus_col = 0
+ self.editing = False
+
+ def _modified(self):
+ self.editor.show_empty_msg()
+ return urwid.ListWalker._modified(self)
+
+ def add_value(self, lst):
+ self.lst.append((lst[:], set([])))
+ self._modified()
+
+ def get_current_value(self):
+ if self.lst:
+ return self.lst[self.focus][0][self.focus_col]
+
+ def set_current_value(self, val, unescaped):
+ if not unescaped:
+ try:
+ val = val.decode("string-escape")
+ except ValueError:
+ signals.status_message.send(
+ self,
+ message = "Invalid Python-style string encoding.",
+ expire = 1000
+ )
+ return
+ errors = self.lst[self.focus][1]
+ emsg = self.editor.is_error(self.focus_col, val)
+ if emsg:
+ signals.status_message.send(message = emsg, expire = 1)
+ errors.add(self.focus_col)
+ else:
+ errors.discard(self.focus_col)
+ self.set_value(val, self.focus, self.focus_col, errors)
+
+ def set_value(self, val, focus, focus_col, errors=None):
+ if not errors:
+ errors = set([])
+ row = list(self.lst[focus][0])
+ row[focus_col] = val
+ self.lst[focus] = [tuple(row), errors]
+ self._modified()
+
+ def delete_focus(self):
+ if self.lst:
+ del self.lst[self.focus]
+ self.focus = min(len(self.lst) - 1, self.focus)
+ self._modified()
+
+ def _insert(self, pos):
+ self.focus = pos
+ self.lst.insert(
+ self.focus,
+ [
+ [c.blank() for c in self.editor.columns], set([])
+ ]
+ )
+ self.focus_col = 0
+ self.start_edit()
+
+ def insert(self):
+ return self._insert(self.focus)
+
+ def add(self):
+ return self._insert(min(self.focus + 1, len(self.lst)))
+
+ def start_edit(self):
+ col = self.editor.columns[self.focus_col]
+ if self.lst and not col.subeditor:
+ self.editing = GridRow(
+ self.focus_col, True, self.editor, self.lst[self.focus]
+ )
+ self.editor.master.loop.widget.footer.update(FOOTER_EDITING)
+ self._modified()
+
+ def stop_edit(self):
+ if self.editing:
+ self.editor.master.loop.widget.footer.update(FOOTER)
+ self.set_current_value(self.editing.get_edit_value(), False)
+ self.editing = False
+ self._modified()
+
+ def left(self):
+ self.focus_col = max(self.focus_col - 1, 0)
+ self._modified()
+
+ def right(self):
+ self.focus_col = min(self.focus_col + 1, len(self.editor.columns) - 1)
+ self._modified()
+
+ def tab_next(self):
+ self.stop_edit()
+ if self.focus_col < len(self.editor.columns) - 1:
+ self.focus_col += 1
+ elif self.focus != len(self.lst) - 1:
+ self.focus_col = 0
+ self.focus += 1
+ self._modified()
+
+ def get_focus(self):
+ if self.editing:
+ return self.editing, self.focus
+ elif self.lst:
+ return GridRow(
+ self.focus_col,
+ False,
+ self.editor,
+ self.lst[self.focus]
+ ), self.focus
+ else:
+ return None, None
+
+ def set_focus(self, focus):
+ self.stop_edit()
+ self.focus = focus
+ self._modified()
+
+ def get_next(self, pos):
+ if pos + 1 >= len(self.lst):
+ return None, None
+ return GridRow(None, False, self.editor, self.lst[pos + 1]), pos + 1
+
+ def get_prev(self, pos):
+ if pos - 1 < 0:
+ return None, None
+ return GridRow(None, False, self.editor, self.lst[pos - 1]), pos - 1
+
+
+class GridListBox(urwid.ListBox):
+
+ def __init__(self, lw):
+ urwid.ListBox.__init__(self, lw)
+
+
+FIRST_WIDTH_MAX = 40
+FIRST_WIDTH_MIN = 20
+
+
+class GridEditor(urwid.WidgetWrap):
+ title = None
+ columns = None
+
+ def __init__(self, master, value, callback, *cb_args, **cb_kwargs):
+ value = self.data_in(copy.deepcopy(value))
+ self.master, self.value, self.callback = master, value, callback
+ self.cb_args, self.cb_kwargs = cb_args, cb_kwargs
+
+ first_width = 20
+ if value:
+ for r in value:
+ assert len(r) == len(self.columns)
+ first_width = max(len(r), first_width)
+ self.first_width = min(first_width, FIRST_WIDTH_MAX)
+
+ title = urwid.Text(self.title)
+ title = urwid.Padding(title, align="left", width=("relative", 100))
+ title = urwid.AttrWrap(title, "heading")
+
+ headings = []
+ for i, col in enumerate(self.columns):
+ c = urwid.Text(col.heading)
+ if i == 0 and len(self.columns) > 1:
+ headings.append(("fixed", first_width + 2, c))
+ else:
+ headings.append(c)
+ h = urwid.Columns(
+ headings,
+ dividechars = 2
+ )
+ h = urwid.AttrWrap(h, "heading")
+
+ self.walker = GridWalker(self.value, self)
+ self.lb = GridListBox(self.walker)
+ self._w = urwid.Frame(
+ self.lb,
+ header = urwid.Pile([title, h])
+ )
+ self.master.loop.widget.footer.update("")
+ self.show_empty_msg()
+
+ def show_empty_msg(self):
+ if self.walker.lst:
+ self._w.set_footer(None)
+ else:
+ self._w.set_footer(
+ urwid.Text(
+ [
+ ("highlight", "No values. Press "),
+ ("key", "a"),
+ ("highlight", " to add some."),
+ ]
+ )
+ )
+
+ def encode(self, s):
+ if not self.encoding:
+ return s
+ try:
+ return s.encode(self.encoding)
+ except ValueError:
+ return None
+
+ def read_file(self, p, unescaped=False):
+ if p:
+ try:
+ p = os.path.expanduser(p)
+ d = file(p, "rb").read()
+ self.walker.set_current_value(d, unescaped)
+ self.walker._modified()
+ except IOError as v:
+ return str(v)
+
+ def set_subeditor_value(self, val, focus, focus_col):
+ self.walker.set_value(val, focus, focus_col)
+
+ def keypress(self, size, key):
+ if self.walker.editing:
+ if key in ["esc"]:
+ self.walker.stop_edit()
+ elif key == "tab":
+ pf, pfc = self.walker.focus, self.walker.focus_col
+ self.walker.tab_next()
+ if self.walker.focus == pf and self.walker.focus_col != pfc:
+ self.walker.start_edit()
+ else:
+ self._w.keypress(size, key)
+ return None
+
+ key = common.shortcuts(key)
+ column = self.columns[self.walker.focus_col]
+ if key in ["q", "esc"]:
+ res = []
+ for i in self.walker.lst:
+ if not i[1] and any([x for x in i[0]]):
+ res.append(i[0])
+ self.callback(self.data_out(res), *self.cb_args, **self.cb_kwargs)
+ signals.pop_view_state.send(self)
+ elif key == "g":
+ self.walker.set_focus(0)
+ elif key == "G":
+ self.walker.set_focus(len(self.walker.lst) - 1)
+ elif key in ["h", "left"]:
+ self.walker.left()
+ elif key in ["l", "right"]:
+ self.walker.right()
+ elif key == "tab":
+ self.walker.tab_next()
+ elif key == "a":
+ self.walker.add()
+ elif key == "A":
+ self.walker.insert()
+ elif key == "d":
+ self.walker.delete_focus()
+ elif column.keypress(key, self) and not self.handle_key(key):
+ return self._w.keypress(size, key)
+
+ def data_out(self, data):
+ """
+ Called on raw list data, before data is returned through the
+ callback.
+ """
+ return data
+
+ def data_in(self, data):
+ """
+ Called to prepare provided data.
+ """
+ return data
+
+ def is_error(self, col, val):
+ """
+ Return False, or a string error message.
+ """
+ return False
+
+ def handle_key(self, key):
+ return False
+
+ def make_help(self):
+ text = []
+ text.append(urwid.Text([("text", "Editor control:\n")]))
+ keys = [
+ ("A", "insert row before cursor"),
+ ("a", "add row after cursor"),
+ ("d", "delete row"),
+ ("e", "spawn external editor on current field"),
+ ("q", "save changes and exit editor"),
+ ("r", "read value from file"),
+ ("R", "read unescaped value from file"),
+ ("esc", "save changes and exit editor"),
+ ("tab", "next field"),
+ ("enter", "edit field"),
+ ]
+ text.extend(
+ common.format_keyvals(keys, key="key", val="text", indent=4)
+ )
+ text.append(
+ urwid.Text(
+ [
+ "\n",
+ ("text", "Values are escaped Python-style strings.\n"),
+ ]
+ )
+ )
+ return text
+
+
+class QueryEditor(GridEditor):
+ title = "Editing query"
+ columns = [
+ TextColumn("Key"),
+ TextColumn("Value")
+ ]
+
+
+class HeaderEditor(GridEditor):
+ title = "Editing headers"
+ columns = [
+ TextColumn("Key"),
+ TextColumn("Value")
+ ]
+
+ def make_help(self):
+ h = GridEditor.make_help(self)
+ text = []
+ text.append(urwid.Text([("text", "Special keys:\n")]))
+ keys = [
+ ("U", "add User-Agent header"),
+ ]
+ text.extend(
+ common.format_keyvals(keys, key="key", val="text", indent=4)
+ )
+ text.append(urwid.Text([("text", "\n")]))
+ text.extend(h)
+ return text
+
+ def set_user_agent(self, k):
+ ua = user_agents.get_by_shortcut(k)
+ if ua:
+ self.walker.add_value(
+ [
+ "User-Agent",
+ ua[2]
+ ]
+ )
+
+ def handle_key(self, key):
+ if key == "U":
+ signals.status_prompt_onekey.send(
+ prompt = "Add User-Agent header:",
+ keys = [(i[0], i[1]) for i in user_agents.UASTRINGS],
+ callback = self.set_user_agent,
+ )
+ return True
+
+
+class URLEncodedFormEditor(GridEditor):
+ title = "Editing URL-encoded form"
+ columns = [
+ TextColumn("Key"),
+ TextColumn("Value")
+ ]
+
+
+class ReplaceEditor(GridEditor):
+ title = "Editing replacement patterns"
+ columns = [
+ TextColumn("Filter"),
+ TextColumn("Regex"),
+ TextColumn("Replacement"),
+ ]
+
+ def is_error(self, col, val):
+ if col == 0:
+ if not filt.parse(val):
+ return "Invalid filter specification."
+ elif col == 1:
+ try:
+ re.compile(val)
+ except re.error:
+ return "Invalid regular expression."
+ return False
+
+
+class SetHeadersEditor(GridEditor):
+ title = "Editing header set patterns"
+ columns = [
+ TextColumn("Filter"),
+ TextColumn("Header"),
+ TextColumn("Value"),
+ ]
+
+ def is_error(self, col, val):
+ if col == 0:
+ if not filt.parse(val):
+ return "Invalid filter specification"
+ return False
+
+ def make_help(self):
+ h = GridEditor.make_help(self)
+ text = []
+ text.append(urwid.Text([("text", "Special keys:\n")]))
+ keys = [
+ ("U", "add User-Agent header"),
+ ]
+ text.extend(
+ common.format_keyvals(keys, key="key", val="text", indent=4)
+ )
+ text.append(urwid.Text([("text", "\n")]))
+ text.extend(h)
+ return text
+
+ def set_user_agent(self, k):
+ ua = user_agents.get_by_shortcut(k)
+ if ua:
+ self.walker.add_value(
+ [
+ ".*",
+ "User-Agent",
+ ua[2]
+ ]
+ )
+
+ def handle_key(self, key):
+ if key == "U":
+ signals.status_prompt_onekey.send(
+ prompt = "Add User-Agent header:",
+ keys = [(i[0], i[1]) for i in user_agents.UASTRINGS],
+ callback = self.set_user_agent,
+ )
+ return True
+
+
+class PathEditor(GridEditor):
+ title = "Editing URL path components"
+ columns = [
+ TextColumn("Component"),
+ ]
+
+ def data_in(self, data):
+ return [[i] for i in data]
+
+ def data_out(self, data):
+ return [i[0] for i in data]
+
+
+class ScriptEditor(GridEditor):
+ title = "Editing scripts"
+ columns = [
+ TextColumn("Command"),
+ ]
+
+ def is_error(self, col, val):
+ try:
+ script.Script.parse_command(val)
+ except script.ScriptException as v:
+ return str(v)
+
+
+class HostPatternEditor(GridEditor):
+ title = "Editing host patterns"
+ columns = [
+ TextColumn("Regex (matched on hostname:port / ip:port)")
+ ]
+
+ def is_error(self, col, val):
+ try:
+ re.compile(val, re.IGNORECASE)
+ except re.error as e:
+ return "Invalid regex: %s" % str(e)
+
+ def data_in(self, data):
+ return [[i] for i in data]
+
+ def data_out(self, data):
+ return [i[0] for i in data]
+
+
+class CookieEditor(GridEditor):
+ title = "Editing request Cookie header"
+ columns = [
+ TextColumn("Name"),
+ TextColumn("Value"),
+ ]
+
+
+class CookieAttributeEditor(GridEditor):
+ title = "Editing Set-Cookie attributes"
+ columns = [
+ TextColumn("Name"),
+ TextColumn("Value"),
+ ]
+
+ def data_out(self, data):
+ ret = []
+ for i in data:
+ if not i[1]:
+ ret.append([i[0], None])
+ else:
+ ret.append(i)
+ return ret
+
+
+class SetCookieEditor(GridEditor):
+ title = "Editing response SetCookie header"
+ columns = [
+ TextColumn("Name"),
+ TextColumn("Value"),
+ SubgridColumn("Attributes", CookieAttributeEditor),
+ ]
+
+ def data_in(self, data):
+ flattened = []
+ for k, v in data.items():
+ flattened.append([k, v[0], v[1].lst])
+ return flattened
+
+ def data_out(self, data):
+ vals = []
+ for i in data:
+ vals.append(
+ [
+ i[0],
+ [i[1], odict.ODictCaseless(i[2])]
+ ]
+ )
+ return odict.ODict(vals)
diff --git a/mitmproxy/console/help.py b/mitmproxy/console/help.py
new file mode 100644
index 00000000..0c264ebf
--- /dev/null
+++ b/mitmproxy/console/help.py
@@ -0,0 +1,117 @@
+from __future__ import absolute_import
+
+import urwid
+
+from . import common, signals
+from .. import filt, version
+
+footer = [
+ ("heading", 'mitmproxy v%s ' % version.VERSION),
+ ('heading_key', "q"), ":back ",
+]
+
+
+class HelpView(urwid.ListBox):
+
+ def __init__(self, help_context):
+ self.help_context = help_context or []
+ urwid.ListBox.__init__(
+ self,
+ self.helptext()
+ )
+
+ def helptext(self):
+ text = []
+ text.append(urwid.Text([("head", "This view:\n")]))
+ text.extend(self.help_context)
+
+ text.append(urwid.Text([("head", "\n\nMovement:\n")]))
+ keys = [
+ ("j, k", "down, up"),
+ ("h, l", "left, right (in some contexts)"),
+ ("g, G", "go to beginning, end"),
+ ("space", "page down"),
+ ("pg up/down", "page up/down"),
+ ("ctrl+b/ctrl+f", "page up/down"),
+ ("arrows", "up, down, left, right"),
+ ]
+ text.extend(
+ common.format_keyvals(
+ keys,
+ key="key",
+ val="text",
+ indent=4))
+
+ text.append(urwid.Text([("head", "\n\nGlobal keys:\n")]))
+ keys = [
+ ("c", "client replay of HTTP requests"),
+ ("i", "set interception pattern"),
+ ("o", "options"),
+ ("q", "quit / return to previous page"),
+ ("Q", "quit without confirm prompt"),
+ ("S", "server replay of HTTP responses"),
+ ]
+ text.extend(
+ common.format_keyvals(keys, key="key", val="text", indent=4)
+ )
+
+ text.append(urwid.Text([("head", "\n\nFilter expressions:\n")]))
+ f = []
+ for i in filt.filt_unary:
+ f.append(
+ ("~%s" % i.code, i.help)
+ )
+ for i in filt.filt_rex:
+ f.append(
+ ("~%s regex" % i.code, i.help)
+ )
+ for i in filt.filt_int:
+ f.append(
+ ("~%s int" % i.code, i.help)
+ )
+ f.sort()
+ f.extend(
+ [
+ ("!", "unary not"),
+ ("&", "and"),
+ ("|", "or"),
+ ("(...)", "grouping"),
+ ]
+ )
+ text.extend(common.format_keyvals(f, key="key", val="text", indent=4))
+
+ text.append(
+ urwid.Text(
+ [
+ "\n",
+ ("text", " Regexes are Python-style.\n"),
+ ("text", " Regexes can be specified as quoted strings.\n"),
+ ("text", " Header matching (~h, ~hq, ~hs) is against a string of the form \"name: value\".\n"),
+ ("text", " Expressions with no operators are regex matches against URL.\n"),
+ ("text", " Default binary operator is &.\n"),
+ ("head", "\n Examples:\n"),
+ ]
+ )
+ )
+ examples = [
+ ("google\.com", "Url containing \"google.com"),
+ ("~q ~b test", "Requests where body contains \"test\""),
+ ("!(~q & ~t \"text/html\")", "Anything but requests with a text/html content type."),
+ ]
+ text.extend(
+ common.format_keyvals(examples, key="key", val="text", indent=4)
+ )
+ return text
+
+ def keypress(self, size, key):
+ key = common.shortcuts(key)
+ if key == "q":
+ signals.pop_view_state.send(self)
+ return None
+ elif key == "?":
+ key = None
+ elif key == "g":
+ self.set_focus(0)
+ elif key == "G":
+ self.set_focus(len(self.body.contents))
+ return urwid.ListBox.keypress(self, size, key)
diff --git a/mitmproxy/console/options.py b/mitmproxy/console/options.py
new file mode 100644
index 00000000..5c9e0cc9
--- /dev/null
+++ b/mitmproxy/console/options.py
@@ -0,0 +1,271 @@
+import urwid
+
+from .. import contentviews
+from . import common, signals, grideditor
+from . import select, palettes
+
+footer = [
+ ('heading_key', "enter/space"), ":toggle ",
+ ('heading_key', "C"), ":clear all ",
+]
+
+
+def _mkhelp():
+ text = []
+ keys = [
+ ("enter/space", "activate option"),
+ ("C", "clear all options"),
+ ]
+ text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
+ return text
+help_context = _mkhelp()
+
+
+class Options(urwid.WidgetWrap):
+
+ def __init__(self, master):
+ self.master = master
+ self.lb = select.Select(
+ [
+ select.Heading("Traffic Manipulation"),
+ select.Option(
+ "Header Set Patterns",
+ "H",
+ lambda: master.setheaders.count(),
+ self.setheaders
+ ),
+ select.Option(
+ "Ignore Patterns",
+ "I",
+ lambda: master.server.config.check_ignore,
+ self.ignorepatterns
+ ),
+ select.Option(
+ "Replacement Patterns",
+ "R",
+ lambda: master.replacehooks.count(),
+ self.replacepatterns
+ ),
+ select.Option(
+ "Scripts",
+ "S",
+ lambda: master.scripts,
+ self.scripts
+ ),
+
+ select.Heading("Interface"),
+ select.Option(
+ "Default Display Mode",
+ "M",
+ self.has_default_displaymode,
+ self.default_displaymode
+ ),
+ select.Option(
+ "Palette",
+ "P",
+ lambda: self.master.palette != palettes.DEFAULT,
+ self.palette
+ ),
+ select.Option(
+ "Show Host",
+ "w",
+ lambda: master.showhost,
+ self.toggle_showhost
+ ),
+
+ select.Heading("Network"),
+ select.Option(
+ "No Upstream Certs",
+ "U",
+ lambda: master.server.config.no_upstream_cert,
+ self.toggle_upstream_cert
+ ),
+ select.Option(
+ "TCP Proxying",
+ "T",
+ lambda: master.server.config.check_tcp,
+ self.tcp_proxy
+ ),
+
+ select.Heading("Utility"),
+ select.Option(
+ "Anti-Cache",
+ "a",
+ lambda: master.anticache,
+ self.toggle_anticache
+ ),
+ select.Option(
+ "Anti-Compression",
+ "o",
+ lambda: master.anticomp,
+ self.toggle_anticomp
+ ),
+ select.Option(
+ "Kill Extra",
+ "x",
+ lambda: master.killextra,
+ self.toggle_killextra
+ ),
+ select.Option(
+ "No Refresh",
+ "f",
+ lambda: not master.refresh_server_playback,
+ self.toggle_refresh_server_playback
+ ),
+ select.Option(
+ "Sticky Auth",
+ "A",
+ lambda: master.stickyauth_txt,
+ self.sticky_auth
+ ),
+ select.Option(
+ "Sticky Cookies",
+ "t",
+ lambda: master.stickycookie_txt,
+ self.sticky_cookie
+ ),
+ ]
+ )
+ title = urwid.Text("Options")
+ title = urwid.Padding(title, align="left", width=("relative", 100))
+ title = urwid.AttrWrap(title, "heading")
+ self._w = urwid.Frame(
+ self.lb,
+ header = title
+ )
+ self.master.loop.widget.footer.update("")
+ signals.update_settings.connect(self.sig_update_settings)
+
+ def sig_update_settings(self, sender):
+ self.lb.walker._modified()
+
+ def keypress(self, size, key):
+ if key == "C":
+ self.clearall()
+ return None
+ return super(self.__class__, self).keypress(size, key)
+
+ def clearall(self):
+ self.master.anticache = False
+ self.master.anticomp = False
+ self.master.killextra = False
+ self.master.showhost = False
+ self.master.refresh_server_playback = True
+ self.master.server.config.no_upstream_cert = False
+ self.master.setheaders.clear()
+ self.master.replacehooks.clear()
+ self.master.set_ignore_filter([])
+ self.master.set_tcp_filter([])
+ self.master.scripts = []
+ self.master.set_stickyauth(None)
+ self.master.set_stickycookie(None)
+ self.master.state.default_body_view = contentviews.get("Auto")
+
+ signals.update_settings.send(self)
+ signals.status_message.send(
+ message = "All select.Options cleared",
+ expire = 1
+ )
+
+ def toggle_anticache(self):
+ self.master.anticache = not self.master.anticache
+
+ def toggle_anticomp(self):
+ self.master.anticomp = not self.master.anticomp
+
+ def toggle_killextra(self):
+ self.master.killextra = not self.master.killextra
+
+ def toggle_showhost(self):
+ self.master.showhost = not self.master.showhost
+
+ def toggle_refresh_server_playback(self):
+ self.master.refresh_server_playback = not self.master.refresh_server_playback
+
+ def toggle_upstream_cert(self):
+ self.master.server.config.no_upstream_cert = not self.master.server.config.no_upstream_cert
+ signals.update_settings.send(self)
+
+ def setheaders(self):
+ def _set(*args, **kwargs):
+ self.master.setheaders.set(*args, **kwargs)
+ signals.update_settings.send(self)
+ self.master.view_grideditor(
+ grideditor.SetHeadersEditor(
+ self.master,
+ self.master.setheaders.get_specs(),
+ _set
+ )
+ )
+
+ def ignorepatterns(self):
+ def _set(ignore):
+ self.master.set_ignore_filter(ignore)
+ signals.update_settings.send(self)
+ self.master.view_grideditor(
+ grideditor.HostPatternEditor(
+ self.master,
+ self.master.get_ignore_filter(),
+ _set
+ )
+ )
+
+ def replacepatterns(self):
+ def _set(*args, **kwargs):
+ self.master.replacehooks.set(*args, **kwargs)
+ signals.update_settings.send(self)
+ self.master.view_grideditor(
+ grideditor.ReplaceEditor(
+ self.master,
+ self.master.replacehooks.get_specs(),
+ _set
+ )
+ )
+
+ def scripts(self):
+ self.master.view_grideditor(
+ grideditor.ScriptEditor(
+ self.master,
+ [[i.command] for i in self.master.scripts],
+ self.master.edit_scripts
+ )
+ )
+
+ def default_displaymode(self):
+ signals.status_prompt_onekey.send(
+ prompt = "Global default display mode",
+ keys = contentviews.view_prompts,
+ callback = self.master.change_default_display_mode
+ )
+
+ def has_default_displaymode(self):
+ return self.master.state.default_body_view.name != "Auto"
+
+ def tcp_proxy(self):
+ def _set(tcp):
+ self.master.set_tcp_filter(tcp)
+ signals.update_settings.send(self)
+ self.master.view_grideditor(
+ grideditor.HostPatternEditor(
+ self.master,
+ self.master.get_tcp_filter(),
+ _set
+ )
+ )
+
+ def sticky_auth(self):
+ signals.status_prompt.send(
+ prompt = "Sticky auth filter",
+ text = self.master.stickyauth_txt,
+ callback = self.master.set_stickyauth
+ )
+
+ def sticky_cookie(self):
+ signals.status_prompt.send(
+ prompt = "Sticky cookie filter",
+ text = self.master.stickycookie_txt,
+ callback = self.master.set_stickycookie
+ )
+
+ def palette(self):
+ self.master.view_palette_picker()
diff --git a/mitmproxy/console/palettepicker.py b/mitmproxy/console/palettepicker.py
new file mode 100644
index 00000000..51ad0606
--- /dev/null
+++ b/mitmproxy/console/palettepicker.py
@@ -0,0 +1,82 @@
+import urwid
+
+from . import select, common, palettes, signals
+
+footer = [
+ ('heading_key', "enter/space"), ":select",
+]
+
+
+def _mkhelp():
+ text = []
+ keys = [
+ ("enter/space", "select"),
+ ]
+ text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
+ return text
+help_context = _mkhelp()
+
+
+class PalettePicker(urwid.WidgetWrap):
+
+ def __init__(self, master):
+ self.master = master
+ low, high = [], []
+ for k, v in palettes.palettes.items():
+ if v.high:
+ high.append(k)
+ else:
+ low.append(k)
+ high.sort()
+ low.sort()
+
+ options = [
+ select.Heading("High Colour")
+ ]
+
+ def mkopt(name):
+ return select.Option(
+ i,
+ None,
+ lambda: self.master.palette == name,
+ lambda: self.select(name)
+ )
+
+ for i in high:
+ options.append(mkopt(i))
+ options.append(select.Heading("Low Colour"))
+ for i in low:
+ options.append(mkopt(i))
+
+ options.extend(
+ [
+ select.Heading("Options"),
+ select.Option(
+ "Transparent",
+ "T",
+ lambda: master.palette_transparent,
+ self.toggle_palette_transparent
+ )
+ ]
+ )
+
+ self.lb = select.Select(options)
+ title = urwid.Text("Palettes")
+ title = urwid.Padding(title, align="left", width=("relative", 100))
+ title = urwid.AttrWrap(title, "heading")
+ self._w = urwid.Frame(
+ self.lb,
+ header = title
+ )
+ signals.update_settings.connect(self.sig_update_settings)
+
+ def sig_update_settings(self, sender):
+ self.lb.walker._modified()
+
+ def select(self, name):
+ self.master.set_palette(name)
+
+ def toggle_palette_transparent(self):
+ self.master.palette_transparent = not self.master.palette_transparent
+ self.master.set_palette(self.master.palette)
+ signals.update_settings.send(self)
diff --git a/mitmproxy/console/palettes.py b/mitmproxy/console/palettes.py
new file mode 100644
index 00000000..bd370181
--- /dev/null
+++ b/mitmproxy/console/palettes.py
@@ -0,0 +1,326 @@
+# Low-color themes should ONLY use the standard foreground and background
+# colours listed here:
+#
+# http://urwid.org/manual/displayattributes.html
+#
+
+
+class Palette:
+ _fields = [
+ 'background',
+ 'title',
+
+ # Status bar & heading
+ 'heading', 'heading_key', 'heading_inactive',
+
+ # Help
+ 'key', 'head', 'text',
+
+ # Options
+ 'option_selected', 'option_active', 'option_active_selected',
+ 'option_selected_key',
+
+ # List and Connections
+ 'method', 'focus',
+ 'code_200', 'code_300', 'code_400', 'code_500', 'code_other',
+ 'error',
+ 'header', 'highlight', 'intercept', 'replay', 'mark',
+
+ # Hex view
+ 'offset',
+
+ # Grid Editor
+ 'focusfield', 'focusfield_error', 'field_error', 'editfield',
+ ]
+ high = None
+
+ def palette(self, transparent):
+ l = []
+ highback, lowback = None, None
+ if not transparent:
+ if self.high and self.high.get("background"):
+ highback = self.high["background"][1]
+ lowback = self.low["background"][1]
+
+ for i in self._fields:
+ if transparent and i == "background":
+ l.append(["background", "default", "default"])
+ else:
+ v = [i]
+ low = list(self.low[i])
+ if lowback and low[1] == "default":
+ low[1] = lowback
+ v.extend(low)
+ if self.high and i in self.high:
+ v.append(None)
+ high = list(self.high[i])
+ if highback and high[1] == "default":
+ high[1] = highback
+ v.extend(high)
+ elif highback and self.low[i][1] == "default":
+ high = [None, low[0], highback]
+ v.extend(high)
+ l.append(tuple(v))
+ return l
+
+
+class LowDark(Palette):
+
+ """
+ Low-color dark background
+ """
+ low = dict(
+ background = ('white', 'black'),
+ title = ('white,bold', 'default'),
+
+ # Status bar & heading
+ heading = ('white', 'dark blue'),
+ heading_key = ('light cyan', 'dark blue'),
+ heading_inactive = ('dark gray', 'light gray'),
+
+ # Help
+ key = ('light cyan', 'default'),
+ head = ('white,bold', 'default'),
+ text = ('light gray', 'default'),
+
+ # Options
+ option_selected = ('black', 'light gray'),
+ option_selected_key = ('light cyan', 'light gray'),
+ option_active = ('light red', 'default'),
+ option_active_selected = ('light red', 'light gray'),
+
+ # List and Connections
+ method = ('dark cyan', 'default'),
+ focus = ('yellow', 'default'),
+
+ code_200 = ('dark green', 'default'),
+ code_300 = ('light blue', 'default'),
+ code_400 = ('light red', 'default'),
+ code_500 = ('light red', 'default'),
+ code_other = ('dark red', 'default'),
+
+ error = ('light red', 'default'),
+
+ header = ('dark cyan', 'default'),
+ highlight = ('white,bold', 'default'),
+ intercept = ('brown', 'default'),
+ replay = ('light green', 'default'),
+ mark = ('light red', 'default'),
+
+ # Hex view
+ offset = ('dark cyan', 'default'),
+
+ # Grid Editor
+ focusfield = ('black', 'light gray'),
+ focusfield_error = ('dark red', 'light gray'),
+ field_error = ('dark red', 'default'),
+ editfield = ('white', 'default'),
+ )
+
+
+class Dark(LowDark):
+ high = dict(
+ heading_inactive = ('g58', 'g11'),
+ intercept = ('#f60', 'default'),
+
+ option_selected = ('g85', 'g45'),
+ option_selected_key = ('light cyan', 'g50'),
+ option_active_selected = ('light red', 'g50'),
+ )
+
+
+class LowLight(Palette):
+
+ """
+ Low-color light background
+ """
+ low = dict(
+ background = ('black', 'white'),
+ title = ('dark magenta', 'default'),
+
+ # Status bar & heading
+ heading = ('white', 'black'),
+ heading_key = ('dark blue', 'black'),
+ heading_inactive = ('black', 'light gray'),
+
+ # Help
+ key = ('dark blue', 'default'),
+ head = ('black', 'default'),
+ text = ('dark gray', 'default'),
+
+ # Options
+ option_selected = ('black', 'light gray'),
+ option_selected_key = ('dark blue', 'light gray'),
+ option_active = ('light red', 'default'),
+ option_active_selected = ('light red', 'light gray'),
+
+ # List and Connections
+ method = ('dark cyan', 'default'),
+ focus = ('black', 'default'),
+
+ code_200 = ('dark green', 'default'),
+ code_300 = ('light blue', 'default'),
+ code_400 = ('dark red', 'default'),
+ code_500 = ('dark red', 'default'),
+ code_other = ('light red', 'default'),
+
+ error = ('light red', 'default'),
+
+ header = ('dark blue', 'default'),
+ highlight = ('black,bold', 'default'),
+ intercept = ('brown', 'default'),
+ replay = ('dark green', 'default'),
+ mark = ('dark red', 'default'),
+
+ # Hex view
+ offset = ('dark blue', 'default'),
+
+ # Grid Editor
+ focusfield = ('black', 'light gray'),
+ focusfield_error = ('dark red', 'light gray'),
+ field_error = ('dark red', 'black'),
+ editfield = ('black', 'default'),
+ )
+
+
+class Light(LowLight):
+ high = dict(
+ background = ('black', 'g100'),
+ heading = ('g99', '#08f'),
+ heading_key = ('#0ff,bold', '#08f'),
+ heading_inactive = ('g35', 'g85'),
+ replay = ('#0a0,bold', 'default'),
+
+ option_selected = ('black', 'g85'),
+ option_selected_key = ('dark blue', 'g85'),
+ option_active_selected = ('light red', 'g85'),
+ )
+
+
+# Solarized palette in Urwid-style terminal high-colour offsets
+# See: http://ethanschoonover.com/solarized
+sol_base03 = "h234"
+sol_base02 = "h235"
+sol_base01 = "h240"
+sol_base00 = "h241"
+sol_base0 = "h244"
+sol_base1 = "h245"
+sol_base2 = "h254"
+sol_base3 = "h230"
+sol_yellow = "h136"
+sol_orange = "h166"
+sol_red = "h160"
+sol_magenta = "h125"
+sol_violet = "h61"
+sol_blue = "h33"
+sol_cyan = "h37"
+sol_green = "h64"
+
+
+class SolarizedLight(LowLight):
+ high = dict(
+ background = (sol_base00, sol_base3),
+ title = (sol_cyan, 'default'),
+ text = (sol_base00, 'default'),
+
+ # Status bar & heading
+ heading = (sol_base2, sol_base02),
+ heading_key = (sol_blue, sol_base03),
+ heading_inactive = (sol_base03, sol_base1),
+
+ # Help
+ key = (sol_blue, 'default',),
+ head = (sol_base00, 'default'),
+
+ # Options
+ option_selected = (sol_base03, sol_base2),
+ option_selected_key = (sol_blue, sol_base2),
+ option_active = (sol_orange, 'default'),
+ option_active_selected = (sol_orange, sol_base2),
+
+ # List and Connections
+ method = (sol_cyan, 'default'),
+ focus = (sol_base01, 'default'),
+
+ code_200 = (sol_green, 'default'),
+ code_300 = (sol_blue, 'default'),
+ code_400 = (sol_orange, 'default',),
+ code_500 = (sol_red, 'default'),
+ code_other = (sol_magenta, 'default'),
+
+ error = (sol_red, 'default'),
+
+ header = (sol_blue, 'default'),
+ highlight = (sol_base01, 'default'),
+ intercept = (sol_red, 'default',),
+ replay = (sol_green, 'default',),
+
+ # Hex view
+ offset = (sol_cyan, 'default'),
+
+ # Grid Editor
+ focusfield = (sol_base00, sol_base2),
+ focusfield_error = (sol_red, sol_base2),
+ field_error = (sol_red, 'default'),
+ editfield = (sol_base01, 'default'),
+ )
+
+
+class SolarizedDark(LowDark):
+ high = dict(
+ background = (sol_base2, sol_base03),
+ title = (sol_blue, 'default'),
+ text = (sol_base1, 'default'),
+
+ # Status bar & heading
+ heading = (sol_base2, sol_base01),
+ heading_key = (sol_blue + ",bold", sol_base01),
+ heading_inactive = (sol_base1, sol_base02),
+
+ # Help
+ key = (sol_blue, 'default',),
+ head = (sol_base2, 'default'),
+
+ # Options
+ option_selected = (sol_base03, sol_base00),
+ option_selected_key = (sol_blue, sol_base00),
+ option_active = (sol_orange, 'default'),
+ option_active_selected = (sol_orange, sol_base00),
+
+ # List and Connections
+ method = (sol_cyan, 'default'),
+ focus = (sol_base1, 'default'),
+
+ code_200 = (sol_green, 'default'),
+ code_300 = (sol_blue, 'default'),
+ code_400 = (sol_orange, 'default',),
+ code_500 = (sol_red, 'default'),
+ code_other = (sol_magenta, 'default'),
+
+ error = (sol_red, 'default'),
+
+ header = (sol_blue, 'default'),
+ highlight = (sol_base01, 'default'),
+ intercept = (sol_red, 'default',),
+ replay = (sol_green, 'default',),
+
+ # Hex view
+ offset = (sol_cyan, 'default'),
+
+ # Grid Editor
+ focusfield = (sol_base0, sol_base02),
+ focusfield_error = (sol_red, sol_base02),
+ field_error = (sol_red, 'default'),
+ editfield = (sol_base1, 'default'),
+ )
+
+
+DEFAULT = "dark"
+palettes = {
+ "lowlight": LowLight(),
+ "lowdark": LowDark(),
+ "light": Light(),
+ "dark": Dark(),
+ "solarized_light": SolarizedLight(),
+ "solarized_dark": SolarizedDark(),
+}
diff --git a/mitmproxy/console/pathedit.py b/mitmproxy/console/pathedit.py
new file mode 100644
index 00000000..4447070b
--- /dev/null
+++ b/mitmproxy/console/pathedit.py
@@ -0,0 +1,71 @@
+import glob
+import os.path
+
+import urwid
+
+
+class _PathCompleter:
+
+ def __init__(self, _testing=False):
+ """
+ _testing: disables reloading of the lookup table to make testing
+ possible.
+ """
+ self.lookup, self.offset = None, None
+ self.final = None
+ self._testing = _testing
+
+ def reset(self):
+ self.lookup = None
+ self.offset = -1
+
+ def complete(self, txt):
+ """
+ Returns the next completion for txt, or None if there is no
+ completion.
+ """
+ path = os.path.expanduser(txt)
+ if not self.lookup:
+ if not self._testing:
+ # Lookup is a set of (display value, actual value) tuples.
+ self.lookup = []
+ if os.path.isdir(path):
+ files = glob.glob(os.path.join(path, "*"))
+ prefix = txt
+ else:
+ files = glob.glob(path + "*")
+ prefix = os.path.dirname(txt)
+ prefix = prefix or "./"
+ for f in files:
+ display = os.path.join(prefix, os.path.basename(f))
+ if os.path.isdir(f):
+ display += "/"
+ self.lookup.append((display, f))
+ if not self.lookup:
+ self.final = path
+ return path
+ self.lookup.sort()
+ self.offset = -1
+ self.lookup.append((txt, txt))
+ self.offset += 1
+ if self.offset >= len(self.lookup):
+ self.offset = 0
+ ret = self.lookup[self.offset]
+ self.final = ret[1]
+ return ret[0]
+
+
+class PathEdit(urwid.Edit, _PathCompleter):
+
+ def __init__(self, *args, **kwargs):
+ urwid.Edit.__init__(self, *args, **kwargs)
+ _PathCompleter.__init__(self)
+
+ def keypress(self, size, key):
+ if key == "tab":
+ comp = self.complete(self.get_edit_text())
+ self.set_edit_text(comp)
+ self.set_edit_pos(len(comp))
+ else:
+ self.reset()
+ return urwid.Edit.keypress(self, size, key)
diff --git a/mitmproxy/console/searchable.py b/mitmproxy/console/searchable.py
new file mode 100644
index 00000000..cff1f0a1
--- /dev/null
+++ b/mitmproxy/console/searchable.py
@@ -0,0 +1,93 @@
+import urwid
+
+from . import signals
+
+
+class Highlight(urwid.AttrMap):
+
+ def __init__(self, t):
+ urwid.AttrMap.__init__(
+ self,
+ urwid.Text(t.text),
+ "focusfield",
+ )
+ self.backup = t
+
+
+class Searchable(urwid.ListBox):
+
+ def __init__(self, state, contents):
+ self.walker = urwid.SimpleFocusListWalker(contents)
+ urwid.ListBox.__init__(self, self.walker)
+ self.state = state
+ self.search_offset = 0
+ self.current_highlight = None
+ self.search_term = None
+
+ def keypress(self, size, key):
+ if key == "/":
+ signals.status_prompt.send(
+ prompt = "Search for",
+ text = "",
+ callback = self.set_search
+ )
+ elif key == "n":
+ self.find_next(False)
+ elif key == "N":
+ self.find_next(True)
+ elif key == "g":
+ self.set_focus(0)
+ self.walker._modified()
+ elif key == "G":
+ self.set_focus(len(self.walker) - 1)
+ self.walker._modified()
+ else:
+ return super(self.__class__, self).keypress(size, key)
+
+ def set_search(self, text):
+ self.state.last_search = text
+ self.search_term = text or None
+ self.find_next(False)
+
+ def set_highlight(self, offset):
+ if self.current_highlight is not None:
+ old = self.body[self.current_highlight]
+ self.body[self.current_highlight] = old.backup
+ if offset is None:
+ self.current_highlight = None
+ else:
+ self.body[offset] = Highlight(self.body[offset])
+ self.current_highlight = offset
+
+ def get_text(self, w):
+ if isinstance(w, urwid.Text):
+ return w.text
+ elif isinstance(w, Highlight):
+ return w.backup.text
+ else:
+ return None
+
+ def find_next(self, backwards):
+ if not self.search_term:
+ if self.state.last_search:
+ self.search_term = self.state.last_search
+ else:
+ self.set_highlight(None)
+ return
+ # Start search at focus + 1
+ if backwards:
+ rng = xrange(len(self.body) - 1, -1, -1)
+ else:
+ rng = xrange(1, len(self.body) + 1)
+ for i in rng:
+ off = (self.focus_position + i) % len(self.body)
+ w = self.body[off]
+ txt = self.get_text(w)
+ if txt and self.search_term in txt:
+ self.set_highlight(off)
+ self.set_focus(off, coming_from="above")
+ self.body._modified()
+ return
+ else:
+ self.set_highlight(None)
+ signals.status_message.send(message="Search not found.", expire=1)
diff --git a/mitmproxy/console/select.py b/mitmproxy/console/select.py
new file mode 100644
index 00000000..928a7ca5
--- /dev/null
+++ b/mitmproxy/console/select.py
@@ -0,0 +1,120 @@
+import urwid
+
+from . import common
+
+
+class _OptionWidget(urwid.WidgetWrap):
+
+ def __init__(self, option, text, shortcut, active, focus):
+ self.option = option
+ textattr = "text"
+ keyattr = "key"
+ if focus and active:
+ textattr = "option_active_selected"
+ keyattr = "option_selected_key"
+ elif focus:
+ textattr = "option_selected"
+ keyattr = "option_selected_key"
+ elif active:
+ textattr = "option_active"
+ if shortcut:
+ text = common.highlight_key(
+ text,
+ shortcut,
+ textattr = textattr,
+ keyattr = keyattr
+ )
+ opt = urwid.Text(text, align="left")
+ opt = urwid.AttrWrap(opt, textattr)
+ opt = urwid.Padding(opt, align = "center", width = 40)
+ urwid.WidgetWrap.__init__(self, opt)
+
+ def keypress(self, size, key):
+ return key
+
+ def selectable(self):
+ return True
+
+
+class OptionWalker(urwid.ListWalker):
+
+ def __init__(self, options):
+ urwid.ListWalker.__init__(self)
+ self.options = options
+ self.focus = 0
+
+ def set_focus(self, pos):
+ self.focus = pos
+
+ def get_focus(self):
+ return self.options[self.focus].render(True), self.focus
+
+ def get_next(self, pos):
+ if pos >= len(self.options) - 1:
+ return None, None
+ return self.options[pos + 1].render(False), pos + 1
+
+ def get_prev(self, pos):
+ if pos <= 0:
+ return None, None
+ return self.options[pos - 1].render(False), pos - 1
+
+
+class Heading:
+
+ def __init__(self, text):
+ self.text = text
+
+ def render(self, focus):
+ opt = urwid.Text("\n" + self.text, align="left")
+ opt = urwid.AttrWrap(opt, "title")
+ opt = urwid.Padding(opt, align = "center", width = 40)
+ return opt
+
+
+_neg = lambda: False
+
+
+class Option:
+
+ def __init__(self, text, shortcut, getstate=None, activate=None):
+ self.text = text
+ self.shortcut = shortcut
+ self.getstate = getstate or _neg
+ self.activate = activate or _neg
+
+ def render(self, focus):
+ return _OptionWidget(
+ self,
+ self.text,
+ self.shortcut,
+ self.getstate(),
+ focus)
+
+
+class Select(urwid.ListBox):
+
+ def __init__(self, options):
+ self.walker = OptionWalker(options)
+ urwid.ListBox.__init__(
+ self,
+ self.walker
+ )
+ self.options = options
+ self.keymap = {}
+ for i in options:
+ if hasattr(i, "shortcut") and i.shortcut:
+ if i.shortcut in self.keymap:
+ raise ValueError("Duplicate shortcut key: %s" % i.shortcut)
+ self.keymap[i.shortcut] = i
+
+ def keypress(self, size, key):
+ if key == "enter" or key == " ":
+ self.get_focus()[0].option.activate()
+ return None
+ key = common.shortcuts(key)
+ if key in self.keymap:
+ self.keymap[key].activate()
+ self.set_focus(self.options.index(self.keymap[key]))
+ return None
+ return super(self.__class__, self).keypress(size, key)
diff --git a/mitmproxy/console/signals.py b/mitmproxy/console/signals.py
new file mode 100644
index 00000000..6a439bf3
--- /dev/null
+++ b/mitmproxy/console/signals.py
@@ -0,0 +1,43 @@
+import blinker
+
+# Show a status message in the action bar
+sig_add_event = blinker.Signal()
+
+
+def add_event(e, level):
+ sig_add_event.send(
+ None,
+ e=e,
+ level=level
+ )
+
+# Show a status message in the action bar
+status_message = blinker.Signal()
+
+# Prompt for input
+status_prompt = blinker.Signal()
+
+# Prompt for a path
+status_prompt_path = blinker.Signal()
+
+# Prompt for a single keystroke
+status_prompt_onekey = blinker.Signal()
+
+# Call a callback in N seconds
+call_in = blinker.Signal()
+
+# Focus the body, footer or header of the main window
+focus = blinker.Signal()
+
+# Fired when settings change
+update_settings = blinker.Signal()
+
+# Fired when a flow changes
+flow_change = blinker.Signal()
+
+# Fired when the flow list or focus changes
+flowlist_change = blinker.Signal()
+
+# Pop and push view state onto a stack
+pop_view_state = blinker.Signal()
+push_view_state = blinker.Signal()
diff --git a/mitmproxy/console/statusbar.py b/mitmproxy/console/statusbar.py
new file mode 100644
index 00000000..4cc63a54
--- /dev/null
+++ b/mitmproxy/console/statusbar.py
@@ -0,0 +1,258 @@
+import os.path
+
+import urwid
+
+import netlib.utils
+from . import pathedit, signals, common
+
+
+class ActionBar(urwid.WidgetWrap):
+
+ def __init__(self):
+ urwid.WidgetWrap.__init__(self, None)
+ self.clear()
+ signals.status_message.connect(self.sig_message)
+ signals.status_prompt.connect(self.sig_prompt)
+ signals.status_prompt_path.connect(self.sig_path_prompt)
+ signals.status_prompt_onekey.connect(self.sig_prompt_onekey)
+
+ self.last_path = ""
+
+ self.prompting = False
+ self.onekey = False
+ self.pathprompt = False
+
+ def sig_message(self, sender, message, expire=None):
+ w = urwid.Text(message)
+ self._w = w
+ self.prompting = False
+ if expire:
+ def cb(*args):
+ if w == self._w:
+ self.clear()
+ signals.call_in.send(seconds=expire, callback=cb)
+
+ def prep_prompt(self, p):
+ return p.strip() + ": "
+
+ def sig_prompt(self, sender, prompt, text, callback, args=()):
+ signals.focus.send(self, section="footer")
+ self._w = urwid.Edit(self.prep_prompt(prompt), text or "")
+ self.prompting = (callback, args)
+
+ def sig_path_prompt(self, sender, prompt, callback, args=()):
+ signals.focus.send(self, section="footer")
+ self._w = pathedit.PathEdit(
+ self.prep_prompt(prompt),
+ os.path.dirname(self.last_path)
+ )
+ self.pathprompt = True
+ self.prompting = (callback, args)
+
+ def sig_prompt_onekey(self, sender, prompt, keys, callback, args=()):
+ """
+ Keys are a set of (word, key) tuples. The appropriate key in the
+ word is highlighted.
+ """
+ signals.focus.send(self, section="footer")
+ prompt = [prompt, " ("]
+ mkup = []
+ for i, e in enumerate(keys):
+ mkup.extend(common.highlight_key(e[0], e[1]))
+ if i < len(keys) - 1:
+ mkup.append(",")
+ prompt.extend(mkup)
+ prompt.append(")? ")
+ self.onekey = set(i[1] for i in keys)
+ self._w = urwid.Edit(prompt, "")
+ self.prompting = (callback, args)
+
+ def selectable(self):
+ return True
+
+ def keypress(self, size, k):
+ if self.prompting:
+ if k == "esc":
+ self.prompt_done()
+ elif self.onekey:
+ if k == "enter":
+ self.prompt_done()
+ elif k in self.onekey:
+ self.prompt_execute(k)
+ elif k == "enter":
+ self.prompt_execute(self._w.get_edit_text())
+ else:
+ if common.is_keypress(k):
+ self._w.keypress(size, k)
+ else:
+ return k
+
+ def clear(self):
+ self._w = urwid.Text("")
+ self.prompting = False
+
+ def prompt_done(self):
+ self.prompting = False
+ self.onekey = False
+ self.pathprompt = False
+ signals.status_message.send(message="")
+ signals.focus.send(self, section="body")
+
+ def prompt_execute(self, txt):
+ if self.pathprompt:
+ self.last_path = txt
+ p, args = self.prompting
+ self.prompt_done()
+ msg = p(txt, *args)
+ if msg:
+ signals.status_message.send(message=msg, expire=1)
+
+
+class StatusBar(urwid.WidgetWrap):
+
+ def __init__(self, master, helptext):
+ self.master, self.helptext = master, helptext
+ self.ab = ActionBar()
+ self.ib = urwid.WidgetWrap(urwid.Text(""))
+ self._w = urwid.Pile([self.ib, self.ab])
+ signals.update_settings.connect(self.sig_update_settings)
+ signals.flowlist_change.connect(self.sig_update_settings)
+ self.redraw()
+
+ def sig_update_settings(self, sender):
+ self.redraw()
+
+ def keypress(self, *args, **kwargs):
+ return self.ab.keypress(*args, **kwargs)
+
+ def get_status(self):
+ r = []
+
+ if self.master.setheaders.count():
+ r.append("[")
+ r.append(("heading_key", "H"))
+ r.append("eaders]")
+ if self.master.replacehooks.count():
+ r.append("[")
+ r.append(("heading_key", "R"))
+ r.append("eplacing]")
+ if self.master.client_playback:
+ r.append("[")
+ r.append(("heading_key", "cplayback"))
+ r.append(":%s to go]" % self.master.client_playback.count())
+ if self.master.server_playback:
+ r.append("[")
+ r.append(("heading_key", "splayback"))
+ if self.master.nopop:
+ r.append(":%s in file]" % self.master.server_playback.count())
+ else:
+ r.append(":%s to go]" % self.master.server_playback.count())
+ if self.master.get_ignore_filter():
+ r.append("[")
+ r.append(("heading_key", "I"))
+ r.append("gnore:%d]" % len(self.master.get_ignore_filter()))
+ if self.master.get_tcp_filter():
+ r.append("[")
+ r.append(("heading_key", "T"))
+ r.append("CP:%d]" % len(self.master.get_tcp_filter()))
+ if self.master.state.intercept_txt:
+ r.append("[")
+ r.append(("heading_key", "i"))
+ r.append(":%s]" % self.master.state.intercept_txt)
+ if self.master.state.limit_txt:
+ r.append("[")
+ r.append(("heading_key", "l"))
+ r.append(":%s]" % self.master.state.limit_txt)
+ if self.master.stickycookie_txt:
+ r.append("[")
+ r.append(("heading_key", "t"))
+ r.append(":%s]" % self.master.stickycookie_txt)
+ if self.master.stickyauth_txt:
+ r.append("[")
+ r.append(("heading_key", "u"))
+ r.append(":%s]" % self.master.stickyauth_txt)
+ if self.master.state.default_body_view.name != "Auto":
+ r.append("[")
+ r.append(("heading_key", "M"))
+ r.append(":%s]" % self.master.state.default_body_view.name)
+
+ opts = []
+ if self.master.anticache:
+ opts.append("anticache")
+ if self.master.anticomp:
+ opts.append("anticomp")
+ if self.master.showhost:
+ opts.append("showhost")
+ if not self.master.refresh_server_playback:
+ opts.append("norefresh")
+ if self.master.killextra:
+ opts.append("killextra")
+ if self.master.server.config.no_upstream_cert:
+ opts.append("no-upstream-cert")
+ if self.master.state.follow_focus:
+ opts.append("following")
+ if self.master.stream_large_bodies:
+ opts.append(
+ "stream:%s" % netlib.utils.pretty_size(
+ self.master.stream_large_bodies.max_size
+ )
+ )
+
+ if opts:
+ r.append("[%s]" % (":".join(opts)))
+
+ if self.master.server.config.mode in ["reverse", "upstream"]:
+ dst = self.master.server.config.upstream_server
+ r.append("[dest:%s]" % netlib.utils.unparse_url(
+ dst.scheme,
+ dst.address.host,
+ dst.address.port
+ ))
+ if self.master.scripts:
+ r.append("[")
+ r.append(("heading_key", "s"))
+ r.append("cripts:%s]" % len(self.master.scripts))
+ # r.append("[lt:%0.3f]"%self.master.looptime)
+
+ if self.master.stream:
+ r.append("[W:%s]" % self.master.stream_path)
+
+ return r
+
+ def redraw(self):
+ fc = self.master.state.flow_count()
+ if self.master.state.focus is None:
+ offset = 0
+ else:
+ offset = min(self.master.state.focus + 1, fc)
+ t = [
+ ('heading', ("[%s/%s]" % (offset, fc)).ljust(9))
+ ]
+
+ if self.master.server.bound:
+ host = self.master.server.address.host
+ if host == "0.0.0.0":
+ host = "*"
+ boundaddr = "[%s:%s]" % (host, self.master.server.address.port)
+ else:
+ boundaddr = ""
+ t.extend(self.get_status())
+ status = urwid.AttrWrap(urwid.Columns([
+ urwid.Text(t),
+ urwid.Text(
+ [
+ self.helptext,
+ boundaddr
+ ],
+ align="right"
+ ),
+ ]), "heading")
+ self.ib._w = status
+
+ def update(self, text):
+ self.helptext = text
+ self.redraw()
+ self.master.loop.draw_screen()
+
+ def selectable(self):
+ return True
diff --git a/mitmproxy/console/tabs.py b/mitmproxy/console/tabs.py
new file mode 100644
index 00000000..b5423038
--- /dev/null
+++ b/mitmproxy/console/tabs.py
@@ -0,0 +1,70 @@
+import urwid
+
+
+class Tab(urwid.WidgetWrap):
+
+ def __init__(self, offset, content, attr, onclick):
+ """
+ onclick is called on click with the tab offset as argument
+ """
+ p = urwid.Text(content, align="center")
+ p = urwid.Padding(p, align="center", width=("relative", 100))
+ p = urwid.AttrWrap(p, attr)
+ urwid.WidgetWrap.__init__(self, p)
+ self.offset = offset
+ self.onclick = onclick
+
+ def mouse_event(self, size, event, button, col, row, focus):
+ if event == "mouse press" and button == 1:
+ self.onclick(self.offset)
+ return True
+
+
+class Tabs(urwid.WidgetWrap):
+
+ def __init__(self, tabs, tab_offset=0):
+ urwid.WidgetWrap.__init__(self, "")
+ self.tab_offset = tab_offset
+ self.tabs = tabs
+ self.show()
+
+ def change_tab(self, offset):
+ self.tab_offset = offset
+ self.show()
+
+ def keypress(self, size, key):
+ n = len(self.tabs)
+ if key in ["tab", "l"]:
+ self.change_tab((self.tab_offset + 1) % n)
+ elif key == "h":
+ self.change_tab((self.tab_offset - 1) % n)
+ return self._w.keypress(size, key)
+
+ def show(self):
+ headers = []
+ for i in range(len(self.tabs)):
+ txt = self.tabs[i][0]()
+ if i == self.tab_offset:
+ headers.append(
+ Tab(
+ i,
+ txt,
+ "heading",
+ self.change_tab
+ )
+ )
+ else:
+ headers.append(
+ Tab(
+ i,
+ txt,
+ "heading_inactive",
+ self.change_tab
+ )
+ )
+ headers = urwid.Columns(headers, dividechars=1)
+ self._w = urwid.Frame(
+ body = self.tabs[self.tab_offset][1](),
+ header = headers
+ )
+ self._w.set_focus("body")
diff --git a/mitmproxy/console/window.py b/mitmproxy/console/window.py
new file mode 100644
index 00000000..47c284e4
--- /dev/null
+++ b/mitmproxy/console/window.py
@@ -0,0 +1,90 @@
+import urwid
+from . import signals
+
+
+class Window(urwid.Frame):
+
+ def __init__(self, master, body, header, footer, helpctx):
+ urwid.Frame.__init__(
+ self,
+ urwid.AttrWrap(body, "background"),
+ header = urwid.AttrWrap(header, "background") if header else None,
+ footer = urwid.AttrWrap(footer, "background") if footer else None
+ )
+ self.master = master
+ self.helpctx = helpctx
+ signals.focus.connect(self.sig_focus)
+
+ def sig_focus(self, sender, section):
+ self.focus_position = section
+
+ def mouse_event(self, *args, **kwargs):
+ # args: (size, event, button, col, row)
+ k = super(self.__class__, self).mouse_event(*args, **kwargs)
+ if not k:
+ if args[1] == "mouse drag":
+ signals.status_message.send(
+ message = "Hold down shift, alt or ctrl to select text.",
+ expire = 1
+ )
+ elif args[1] == "mouse press" and args[2] == 4:
+ self.keypress(args[0], "up")
+ elif args[1] == "mouse press" and args[2] == 5:
+ self.keypress(args[0], "down")
+ else:
+ return False
+ return True
+
+ def keypress(self, size, k):
+ k = super(self.__class__, self).keypress(size, k)
+ if k == "?":
+ self.master.view_help(self.helpctx)
+ elif k == "c":
+ if not self.master.client_playback:
+ signals.status_prompt_path.send(
+ self,
+ prompt = "Client replay",
+ callback = self.master.client_playback_path
+ )
+ else:
+ signals.status_prompt_onekey.send(
+ self,
+ prompt = "Stop current client replay?",
+ keys = (
+ ("yes", "y"),
+ ("no", "n"),
+ ),
+ callback = self.master.stop_client_playback_prompt,
+ )
+ elif k == "i":
+ signals.status_prompt.send(
+ self,
+ prompt = "Intercept filter",
+ text = self.master.state.intercept_txt,
+ callback = self.master.set_intercept
+ )
+ elif k == "o":
+ self.master.view_options()
+ elif k == "Q":
+ raise urwid.ExitMainLoop
+ elif k == "q":
+ signals.pop_view_state.send(self)
+ elif k == "S":
+ if not self.master.server_playback:
+ signals.status_prompt_path.send(
+ self,
+ prompt = "Server replay path",
+ callback = self.master.server_playback_path
+ )
+ else:
+ signals.status_prompt_onekey.send(
+ self,
+ prompt = "Stop current server replay?",
+ keys = (
+ ("yes", "y"),
+ ("no", "n"),
+ ),
+ callback = self.master.stop_server_playback_prompt,
+ )
+ else:
+ return k