diff options
| author | Maximilian Hils <git@maximilianhils.com> | 2016-02-18 23:10:47 +0100 |
|---|---|---|
| committer | Maximilian Hils <git@maximilianhils.com> | 2016-02-18 23:10:47 +0100 |
| commit | 7c6bf7abb3c0e94f9c4dfa77fe0690fe11c6d4d3 (patch) | |
| tree | 3f583d91ff97924068f7017f770b710da2768abe /mitmproxy/console | |
| parent | be02dd105b7803b7b2b6942f9d254539dfd6ba26 (diff) | |
| parent | 61cde30ef8410dc5400039eea5d312fabf3779a9 (diff) | |
| download | mitmproxy-7c6bf7abb3c0e94f9c4dfa77fe0690fe11c6d4d3.tar.gz mitmproxy-7c6bf7abb3c0e94f9c4dfa77fe0690fe11c6d4d3.tar.bz2 mitmproxy-7c6bf7abb3c0e94f9c4dfa77fe0690fe11c6d4d3.zip | |
Merge pull request #964 from mitmproxy/flat-structure
Flat structure
Diffstat (limited to 'mitmproxy/console')
| -rw-r--r-- | mitmproxy/console/__init__.py | 744 | ||||
| -rw-r--r-- | mitmproxy/console/common.py | 444 | ||||
| -rw-r--r-- | mitmproxy/console/flowdetailview.py | 153 | ||||
| -rw-r--r-- | mitmproxy/console/flowlist.py | 397 | ||||
| -rw-r--r-- | mitmproxy/console/flowview.py | 714 | ||||
| -rw-r--r-- | mitmproxy/console/grideditor.py | 716 | ||||
| -rw-r--r-- | mitmproxy/console/help.py | 117 | ||||
| -rw-r--r-- | mitmproxy/console/options.py | 271 | ||||
| -rw-r--r-- | mitmproxy/console/palettepicker.py | 82 | ||||
| -rw-r--r-- | mitmproxy/console/palettes.py | 326 | ||||
| -rw-r--r-- | mitmproxy/console/pathedit.py | 71 | ||||
| -rw-r--r-- | mitmproxy/console/searchable.py | 93 | ||||
| -rw-r--r-- | mitmproxy/console/select.py | 120 | ||||
| -rw-r--r-- | mitmproxy/console/signals.py | 43 | ||||
| -rw-r--r-- | mitmproxy/console/statusbar.py | 258 | ||||
| -rw-r--r-- | mitmproxy/console/tabs.py | 70 | ||||
| -rw-r--r-- | mitmproxy/console/window.py | 90 |
17 files changed, 4709 insertions, 0 deletions
diff --git a/mitmproxy/console/__init__.py b/mitmproxy/console/__init__.py new file mode 100644 index 00000000..e739ec61 --- /dev/null +++ b/mitmproxy/console/__init__.py @@ -0,0 +1,744 @@ +from __future__ import absolute_import + +import mailcap +import mimetypes +import tempfile +import os +import os.path +import shlex +import signal +import stat +import subprocess +import sys +import traceback +import urwid +import weakref + +from .. import controller, flow, script, contentviews +from . import flowlist, flowview, help, window, signals, options +from . import grideditor, palettes, statusbar, palettepicker + +EVENTLOG_SIZE = 500 + + +class ConsoleState(flow.State): + + def __init__(self): + flow.State.__init__(self) + self.focus = None + self.follow_focus = None + self.default_body_view = contentviews.get("Auto") + self.flowsettings = weakref.WeakKeyDictionary() + self.last_search = None + + def __setattr__(self, name, value): + self.__dict__[name] = value + signals.update_settings.send(self) + + def add_flow_setting(self, flow, key, value): + d = self.flowsettings.setdefault(flow, {}) + d[key] = value + + def get_flow_setting(self, flow, key, default=None): + d = self.flowsettings.get(flow, {}) + return d.get(key, default) + + def add_flow(self, f): + super(ConsoleState, self).add_flow(f) + if self.focus is None: + self.set_focus(0) + elif self.follow_focus: + self.set_focus(len(self.view) - 1) + self.set_flow_marked(f, False) + return f + + def update_flow(self, f): + super(ConsoleState, self).update_flow(f) + if self.focus is None: + self.set_focus(0) + return f + + def set_limit(self, limit): + ret = flow.State.set_limit(self, limit) + self.set_focus(self.focus) + return ret + + def get_focus(self): + if not self.view or self.focus is None: + return None, None + return self.view[self.focus], self.focus + + def set_focus(self, idx): + if self.view: + if idx >= len(self.view): + idx = len(self.view) - 1 + elif idx < 0: + idx = 0 + self.focus = idx + else: + self.focus = None + + def set_focus_flow(self, f): + self.set_focus(self.view.index(f)) + + def get_from_pos(self, pos): + if len(self.view) <= pos or pos < 0: + return None, None + return self.view[pos], pos + + def get_next(self, pos): + return self.get_from_pos(pos + 1) + + def get_prev(self, pos): + return self.get_from_pos(pos - 1) + + def delete_flow(self, f): + if f in self.view and self.view.index(f) <= self.focus: + self.focus -= 1 + if self.focus < 0: + self.focus = None + ret = flow.State.delete_flow(self, f) + self.set_focus(self.focus) + return ret + + def clear(self): + marked_flows = [] + for f in self.flows: + if self.flow_marked(f): + marked_flows.append(f) + + super(ConsoleState, self).clear() + + for f in marked_flows: + self.add_flow(f) + self.set_flow_marked(f, True) + + if len(self.flows.views) == 0: + self.focus = None + else: + self.focus = 0 + self.set_focus(self.focus) + + def flow_marked(self, flow): + return self.get_flow_setting(flow, "marked", False) + + def set_flow_marked(self, flow, marked): + self.add_flow_setting(flow, "marked", marked) + + +class Options(object): + attributes = [ + "app", + "app_domain", + "app_ip", + "anticache", + "anticomp", + "client_replay", + "eventlog", + "follow", + "keepserving", + "kill", + "intercept", + "limit", + "no_server", + "refresh_server_playback", + "rfile", + "scripts", + "showhost", + "replacements", + "rheaders", + "setheaders", + "server_replay", + "stickycookie", + "stickyauth", + "stream_large_bodies", + "verbosity", + "wfile", + "nopop", + "palette", + "palette_transparent", + "no_mouse" + ] + + def __init__(self, **kwargs): + for k, v in kwargs.items(): + setattr(self, k, v) + for i in self.attributes: + if not hasattr(self, i): + setattr(self, i, None) + + +class ConsoleMaster(flow.FlowMaster): + palette = [] + + def __init__(self, server, options): + flow.FlowMaster.__init__(self, server, ConsoleState()) + self.stream_path = None + self.options = options + + for i in options.replacements: + self.replacehooks.add(*i) + + for i in options.setheaders: + self.setheaders.add(*i) + + r = self.set_intercept(options.intercept) + if r: + print >> sys.stderr, "Intercept error:", r + sys.exit(1) + + if options.limit: + self.set_limit(options.limit) + + r = self.set_stickycookie(options.stickycookie) + if r: + print >> sys.stderr, "Sticky cookies error:", r + sys.exit(1) + + r = self.set_stickyauth(options.stickyauth) + if r: + print >> sys.stderr, "Sticky auth error:", r + sys.exit(1) + + self.set_stream_large_bodies(options.stream_large_bodies) + + self.refresh_server_playback = options.refresh_server_playback + self.anticache = options.anticache + self.anticomp = options.anticomp + self.killextra = options.kill + self.rheaders = options.rheaders + self.nopop = options.nopop + self.showhost = options.showhost + self.palette = options.palette + self.palette_transparent = options.palette_transparent + + self.eventlog = options.eventlog + self.eventlist = urwid.SimpleListWalker([]) + self.follow = options.follow + + if options.client_replay: + self.client_playback_path(options.client_replay) + + if options.server_replay: + self.server_playback_path(options.server_replay) + + if options.scripts: + for i in options.scripts: + err = self.load_script(i) + if err: + print >> sys.stderr, "Script load error:", err + sys.exit(1) + + if options.outfile: + err = self.start_stream_to_path( + options.outfile[0], + options.outfile[1] + ) + if err: + print >> sys.stderr, "Stream file error:", err + sys.exit(1) + + self.view_stack = [] + + if options.app: + self.start_app(self.options.app_host, self.options.app_port) + signals.call_in.connect(self.sig_call_in) + signals.pop_view_state.connect(self.sig_pop_view_state) + signals.push_view_state.connect(self.sig_push_view_state) + signals.sig_add_event.connect(self.sig_add_event) + + def __setattr__(self, name, value): + self.__dict__[name] = value + signals.update_settings.send(self) + + def load_script(self, command, use_reloader=True): + # We default to using the reloader in the console ui. + super(ConsoleMaster, self).load_script(command, use_reloader) + + def sig_add_event(self, sender, e, level): + needed = dict(error=0, info=1, debug=2).get(level, 1) + if self.options.verbosity < needed: + return + + if level == "error": + e = urwid.Text(("error", str(e))) + else: + e = urwid.Text(str(e)) + self.eventlist.append(e) + if len(self.eventlist) > EVENTLOG_SIZE: + self.eventlist.pop(0) + self.eventlist.set_focus(len(self.eventlist) - 1) + + def add_event(self, e, level): + signals.add_event(e, level) + + def sig_call_in(self, sender, seconds, callback, args=()): + def cb(*_): + return callback(*args) + self.loop.set_alarm_in(seconds, cb) + + def sig_pop_view_state(self, sender): + if len(self.view_stack) > 1: + self.view_stack.pop() + self.loop.widget = self.view_stack[-1] + else: + signals.status_prompt_onekey.send( + self, + prompt = "Quit", + keys = ( + ("yes", "y"), + ("no", "n"), + ), + callback = self.quit, + ) + + def sig_push_view_state(self, sender, window): + self.view_stack.append(window) + self.loop.widget = window + self.loop.draw_screen() + + def _run_script_method(self, method, s, f): + status, val = s.run(method, f) + if val: + if status: + signals.add_event("Method %s return: %s" % (method, val), "debug") + else: + signals.add_event( + "Method %s error: %s" % + (method, val[1]), "error") + + def run_script_once(self, command, f): + if not command: + return + signals.add_event("Running script on flow: %s" % command, "debug") + + try: + s = script.Script(command, script.ScriptContext(self)) + except script.ScriptException as v: + signals.status_message.send( + message = "Error loading script." + ) + signals.add_event("Error loading script:\n%s" % v.args[0], "error") + return + + if f.request: + self._run_script_method("request", s, f) + if f.response: + self._run_script_method("response", s, f) + if f.error: + self._run_script_method("error", s, f) + s.unload() + signals.flow_change.send(self, flow = f) + + def set_script(self, command): + if not command: + return + ret = self.load_script(command) + if ret: + signals.status_message.send(message=ret) + + def toggle_eventlog(self): + self.eventlog = not self.eventlog + signals.pop_view_state.send(self) + self.view_flowlist() + + def _readflows(self, path): + """ + Utitility function that reads a list of flows + or prints an error to the UI if that fails. + Returns + - None, if there was an error. + - a list of flows, otherwise. + """ + try: + return flow.read_flows_from_paths(path) + except flow.FlowReadError as e: + signals.status_message.send(message=e.strerror) + + def client_playback_path(self, path): + if not isinstance(path, list): + path = [path] + flows = self._readflows(path) + if flows: + self.start_client_playback(flows, False) + + def server_playback_path(self, path): + if not isinstance(path, list): + path = [path] + flows = self._readflows(path) + if flows: + self.start_server_playback( + flows, + self.killextra, self.rheaders, + False, self.nopop, + self.options.replay_ignore_params, + self.options.replay_ignore_content, + self.options.replay_ignore_payload_params, + self.options.replay_ignore_host + ) + + def spawn_editor(self, data): + fd, name = tempfile.mkstemp('', "mproxy") + os.write(fd, data) + os.close(fd) + c = os.environ.get("EDITOR") + # if no EDITOR is set, assume 'vi' + if not c: + c = "vi" + cmd = shlex.split(c) + cmd.append(name) + self.ui.stop() + try: + subprocess.call(cmd) + except: + signals.status_message.send( + message = "Can't start editor: %s" % " ".join(c) + ) + else: + data = open(name, "rb").read() + self.ui.start() + os.unlink(name) + return data + + def spawn_external_viewer(self, data, contenttype): + if contenttype: + contenttype = contenttype.split(";")[0] + ext = mimetypes.guess_extension(contenttype) or "" + else: + ext = "" + fd, name = tempfile.mkstemp(ext, "mproxy") + os.write(fd, data) + os.close(fd) + + # read-only to remind the user that this is a view function + os.chmod(name, stat.S_IREAD) + + cmd = None + shell = False + + if contenttype: + c = mailcap.getcaps() + cmd, _ = mailcap.findmatch(c, contenttype, filename=name) + if cmd: + shell = True + if not cmd: + # hm which one should get priority? + c = os.environ.get("PAGER") or os.environ.get("EDITOR") + if not c: + c = "less" + cmd = shlex.split(c) + cmd.append(name) + self.ui.stop() + try: + subprocess.call(cmd, shell=shell) + except: + signals.status_message.send( + message="Can't start external viewer: %s" % " ".join(c) + ) + self.ui.start() + os.unlink(name) + + def set_palette(self, name): + self.palette = name + self.ui.register_palette( + palettes.palettes[name].palette(self.palette_transparent) + ) + self.ui.clear() + + def ticker(self, *userdata): + changed = self.tick(self.masterq, timeout=0) + if changed: + self.loop.draw_screen() + signals.update_settings.send() + self.loop.set_alarm_in(0.01, self.ticker) + + def run(self): + self.ui = urwid.raw_display.Screen() + self.ui.set_terminal_properties(256) + self.set_palette(self.palette) + self.loop = urwid.MainLoop( + urwid.SolidFill("x"), + screen = self.ui, + handle_mouse = not self.options.no_mouse, + ) + + self.server.start_slave( + controller.Slave, + controller.Channel(self.masterq, self.should_exit) + ) + + if self.options.rfile: + ret = self.load_flows_path(self.options.rfile) + if ret and self.state.flow_count(): + signals.add_event( + "File truncated or corrupted. " + "Loaded as many flows as possible.", + "error" + ) + elif ret and not self.state.flow_count(): + self.shutdown() + print >> sys.stderr, "Could not load file:", ret + sys.exit(1) + + self.loop.set_alarm_in(0.01, self.ticker) + + # It's not clear why we need to handle this explicitly - without this, + # mitmproxy hangs on keyboard interrupt. Remove if we ever figure it + # out. + def exit(s, f): + raise urwid.ExitMainLoop + signal.signal(signal.SIGINT, exit) + + self.loop.set_alarm_in( + 0.0001, + lambda *args: self.view_flowlist() + ) + + try: + self.loop.run() + except Exception: + self.loop.stop() + sys.stdout.flush() + print >> sys.stderr, traceback.format_exc() + print >> sys.stderr, "mitmproxy has crashed!" + print >> sys.stderr, "Please lodge a bug report at:" + print >> sys.stderr, "\thttps://github.com/mitmproxy/mitmproxy" + print >> sys.stderr, "Shutting down..." + sys.stderr.flush() + self.shutdown() + + def view_help(self, helpctx): + signals.push_view_state.send( + self, + window = window.Window( + self, + help.HelpView(helpctx), + None, + statusbar.StatusBar(self, help.footer), + None + ) + ) + + def view_options(self): + for i in self.view_stack: + if isinstance(i["body"], options.Options): + return + signals.push_view_state.send( + self, + window = window.Window( + self, + options.Options(self), + None, + statusbar.StatusBar(self, options.footer), + options.help_context, + ) + ) + + def view_palette_picker(self): + signals.push_view_state.send( + self, + window = window.Window( + self, + palettepicker.PalettePicker(self), + None, + statusbar.StatusBar(self, palettepicker.footer), + palettepicker.help_context, + ) + ) + + def view_grideditor(self, ge): + signals.push_view_state.send( + self, + window = window.Window( + self, + ge, + None, + statusbar.StatusBar(self, grideditor.FOOTER), + ge.make_help() + ) + ) + + def view_flowlist(self): + if self.ui.started: + self.ui.clear() + if self.state.follow_focus: + self.state.set_focus(self.state.flow_count()) + + if self.eventlog: + body = flowlist.BodyPile(self) + else: + body = flowlist.FlowListBox(self) + + if self.follow: + self.toggle_follow_flows() + + signals.push_view_state.send( + self, + window = window.Window( + self, + body, + None, + statusbar.StatusBar(self, flowlist.footer), + flowlist.help_context + ) + ) + + def view_flow(self, flow, tab_offset=0): + self.state.set_focus_flow(flow) + signals.push_view_state.send( + self, + window = window.Window( + self, + flowview.FlowView(self, self.state, flow, tab_offset), + flowview.FlowViewHeader(self, flow), + statusbar.StatusBar(self, flowview.footer), + flowview.help_context + ) + ) + + def _write_flows(self, path, flows): + if not path: + return + path = os.path.expanduser(path) + try: + f = file(path, "wb") + fw = flow.FlowWriter(f) + for i in flows: + fw.add(i) + f.close() + except IOError as v: + signals.status_message.send(message=v.strerror) + + def save_one_flow(self, path, flow): + return self._write_flows(path, [flow]) + + def save_flows(self, path): + return self._write_flows(path, self.state.view) + + def save_marked_flows(self, path): + marked_flows = [] + for f in self.state.view: + if self.state.flow_marked(f): + marked_flows.append(f) + return self._write_flows(path, marked_flows) + + def load_flows_callback(self, path): + if not path: + return + ret = self.load_flows_path(path) + return ret or "Flows loaded from %s" % path + + def load_flows_path(self, path): + reterr = None + try: + flow.FlowMaster.load_flows_file(self, path) + except flow.FlowReadError as v: + reterr = str(v) + signals.flowlist_change.send(self) + return reterr + + def accept_all(self): + self.state.accept_all(self) + + def set_limit(self, txt): + v = self.state.set_limit(txt) + signals.flowlist_change.send(self) + return v + + def set_intercept(self, txt): + return self.state.set_intercept(txt) + + def change_default_display_mode(self, t): + v = contentviews.get_by_shortcut(t) + self.state.default_body_view = v + self.refresh_focus() + + def edit_scripts(self, scripts): + commands = [x[0] for x in scripts] # remove outer array + if commands == [s.command for s in self.scripts]: + return + + self.unload_scripts() + for command in commands: + self.load_script(command) + signals.update_settings.send(self) + + def stop_client_playback_prompt(self, a): + if a != "n": + self.stop_client_playback() + + def stop_server_playback_prompt(self, a): + if a != "n": + self.stop_server_playback() + + def quit(self, a): + if a != "n": + raise urwid.ExitMainLoop + + def shutdown(self): + self.state.killall(self) + flow.FlowMaster.shutdown(self) + + def clear_flows(self): + self.state.clear() + signals.flowlist_change.send(self) + + def toggle_follow_flows(self): + # toggle flow follow + self.state.follow_focus = not self.state.follow_focus + # jump to most recent flow if follow is now on + if self.state.follow_focus: + self.state.set_focus(self.state.flow_count()) + signals.flowlist_change.send(self) + + def delete_flow(self, f): + self.state.delete_flow(f) + signals.flowlist_change.send(self) + + def refresh_focus(self): + if self.state.view: + signals.flow_change.send( + self, + flow = self.state.view[self.state.focus] + ) + + def process_flow(self, f): + if self.state.intercept and f.match( + self.state.intercept) and not f.request.is_replay: + f.intercept(self) + else: + # check if flow was intercepted within an inline script by flow.intercept() + if f.intercepted: + f.intercept(self) + else: + f.reply() + signals.flowlist_change.send(self) + signals.flow_change.send(self, flow = f) + + def clear_events(self): + self.eventlist[:] = [] + + # Handlers + def handle_error(self, f): + f = flow.FlowMaster.handle_error(self, f) + if f: + self.process_flow(f) + return f + + def handle_request(self, f): + f = flow.FlowMaster.handle_request(self, f) + if f: + self.process_flow(f) + return f + + def handle_response(self, f): + f = flow.FlowMaster.handle_response(self, f) + if f: + self.process_flow(f) + return f + + def handle_script_change(self, script): + if super(ConsoleMaster, self).handle_script_change(script): + signals.status_message.send(message='"{}" reloaded.'.format(script.filename)) + else: + signals.status_message.send(message='Error reloading "{}".'.format(script.filename)) diff --git a/mitmproxy/console/common.py b/mitmproxy/console/common.py new file mode 100644 index 00000000..c29ffddc --- /dev/null +++ b/mitmproxy/console/common.py @@ -0,0 +1,444 @@ +from __future__ import absolute_import + +import urwid +import urwid.util +import os + +from netlib.http import CONTENT_MISSING +import netlib.utils + +from .. import utils +from .. import flow_export +from ..models import decoded +from . import signals + + +try: + import pyperclip +except: + pyperclip = False + + +VIEW_FLOW_REQUEST = 0 +VIEW_FLOW_RESPONSE = 1 + +METHOD_OPTIONS = [ + ("get", "g"), + ("post", "p"), + ("put", "u"), + ("head", "h"), + ("trace", "t"), + ("delete", "d"), + ("options", "o"), + ("edit raw", "e"), +] + + +def is_keypress(k): + """ + Is this input event a keypress? + """ + if isinstance(k, basestring): + return True + + +def highlight_key(str, key, textattr="text", keyattr="key"): + l = [] + parts = str.split(key, 1) + if parts[0]: + l.append((textattr, parts[0])) + l.append((keyattr, key)) + if parts[1]: + l.append((textattr, parts[1])) + return l + + +KEY_MAX = 30 + + +def format_keyvals(lst, key="key", val="text", indent=0): + """ + Format a list of (key, value) tuples. + + If key is None, it's treated specially: + - We assume a sub-value, and add an extra indent. + - The value is treated as a pre-formatted list of directives. + """ + ret = [] + if lst: + maxk = min(max(len(i[0]) for i in lst if i and i[0]), KEY_MAX) + for i, kv in enumerate(lst): + if kv is None: + ret.append(urwid.Text("")) + else: + if isinstance(kv[1], urwid.Widget): + v = kv[1] + elif kv[1] is None: + v = urwid.Text("") + else: + v = urwid.Text([(val, kv[1])]) + ret.append( + urwid.Columns( + [ + ("fixed", indent, urwid.Text("")), + ( + "fixed", + maxk, + urwid.Text([(key, kv[0] or "")]) + ), + v + ], + dividechars = 2 + ) + ) + return ret + + +def shortcuts(k): + if k == " ": + k = "page down" + elif k == "ctrl f": + k = "page down" + elif k == "ctrl b": + k = "page up" + elif k == "j": + k = "down" + elif k == "k": + k = "up" + return k + + +def fcol(s, attr): + s = unicode(s) + return ( + "fixed", + len(s), + urwid.Text( + [ + (attr, s) + ] + ) + ) + +if urwid.util.detected_encoding: + SYMBOL_REPLAY = u"\u21ba" + SYMBOL_RETURN = u"\u2190" + SYMBOL_MARK = u"\u25cf" +else: + SYMBOL_REPLAY = u"[r]" + SYMBOL_RETURN = u"<-" + SYMBOL_MARK = "[m]" + + +def raw_format_flow(f, focus, extended): + f = dict(f) + pile = [] + req = [] + if extended: + req.append( + fcol( + utils.format_timestamp(f["req_timestamp"]), + "highlight" + ) + ) + else: + req.append(fcol(">>" if focus else " ", "focus")) + + if f["marked"]: + req.append(fcol(SYMBOL_MARK, "mark")) + + if f["req_is_replay"]: + req.append(fcol(SYMBOL_REPLAY, "replay")) + req.append(fcol(f["req_method"], "method")) + + preamble = sum(i[1] for i in req) + len(req) - 1 + + if f["intercepted"] and not f["acked"]: + uc = "intercept" + elif f["resp_code"] or f["err_msg"]: + uc = "text" + else: + uc = "title" + + url = f["req_url"] + if f["req_http_version"] not in ("HTTP/1.0", "HTTP/1.1"): + url += " " + f["req_http_version"] + req.append( + urwid.Text([(uc, url)]) + ) + + pile.append(urwid.Columns(req, dividechars=1)) + + resp = [] + resp.append( + ("fixed", preamble, urwid.Text("")) + ) + + if f["resp_code"]: + codes = { + 2: "code_200", + 3: "code_300", + 4: "code_400", + 5: "code_500", + } + ccol = codes.get(f["resp_code"] / 100, "code_other") + resp.append(fcol(SYMBOL_RETURN, ccol)) + if f["resp_is_replay"]: + resp.append(fcol(SYMBOL_REPLAY, "replay")) + resp.append(fcol(f["resp_code"], ccol)) + if f["intercepted"] and f["resp_code"] and not f["acked"]: + rc = "intercept" + else: + rc = "text" + + if f["resp_ctype"]: + resp.append(fcol(f["resp_ctype"], rc)) + resp.append(fcol(f["resp_clen"], rc)) + resp.append(fcol(f["roundtrip"], rc)) + + elif f["err_msg"]: + resp.append(fcol(SYMBOL_RETURN, "error")) + resp.append( + urwid.Text([ + ( + "error", + f["err_msg"] + ) + ]) + ) + pile.append(urwid.Columns(resp, dividechars=1)) + return urwid.Pile(pile) + + +# Save file to disk +def save_data(path, data): + if not path: + return + try: + with file(path, "wb") as f: + f.write(data) + except IOError as v: + signals.status_message.send(message=v.strerror) + + +def ask_save_overwrite(path, data): + if not path: + return + path = os.path.expanduser(path) + if os.path.exists(path): + def save_overwrite(k): + if k == "y": + save_data(path, data) + + signals.status_prompt_onekey.send( + prompt = "'" + path + "' already exists. Overwrite?", + keys = ( + ("yes", "y"), + ("no", "n"), + ), + callback = save_overwrite + ) + else: + save_data(path, data) + + +def ask_save_path(prompt, data): + signals.status_prompt_path.send( + prompt = prompt, + callback = ask_save_overwrite, + args = (data, ) + ) + + +def copy_flow_format_data(part, scope, flow): + if part == "u": + data = flow.request.url + else: + data = "" + if scope in ("q", "a"): + if flow.request.content is None or flow.request.content == CONTENT_MISSING: + return None, "Request content is missing" + with decoded(flow.request): + if part == "h": + data += netlib.http.http1.assemble_request(flow.request) + elif part == "c": + data += flow.request.content + else: + raise ValueError("Unknown part: {}".format(part)) + if scope == "a" and flow.request.content and flow.response: + # Add padding between request and response + data += "\r\n" * 2 + if scope in ("s", "a") and flow.response: + if flow.response.content is None or flow.response.content == CONTENT_MISSING: + return None, "Response content is missing" + with decoded(flow.response): + if part == "h": + data += netlib.http.http1.assemble_response(flow.response) + elif part == "c": + data += flow.response.content + else: + raise ValueError("Unknown part: {}".format(part)) + return data, False + + +def export_prompt(k, flow): + exporters = { + "c": flow_export.curl_command, + "p": flow_export.python_code, + "r": flow_export.raw_request, + } + if k in exporters: + copy_to_clipboard_or_prompt(exporters[k](flow)) + + +def copy_to_clipboard_or_prompt(data): + # pyperclip calls encode('utf-8') on data to be copied without checking. + # if data are already encoded that way UnicodeDecodeError is thrown. + toclip = "" + try: + toclip = data.decode('utf-8') + except (UnicodeDecodeError): + toclip = data + + try: + pyperclip.copy(toclip) + except (RuntimeError, UnicodeDecodeError, AttributeError): + def save(k): + if k == "y": + ask_save_path("Save data", data) + signals.status_prompt_onekey.send( + prompt = "Cannot copy data to clipboard. Save as file?", + keys = ( + ("yes", "y"), + ("no", "n"), + ), + callback = save + ) + + +def copy_flow(part, scope, flow, master, state): + """ + part: _c_ontent, _h_eaders+content, _u_rl + scope: _a_ll, re_q_uest, re_s_ponse + """ + data, err = copy_flow_format_data(part, scope, flow) + + if err: + signals.status_message.send(message=err) + return + + if not data: + if scope == "q": + signals.status_message.send(message="No request content to copy.") + elif scope == "s": + signals.status_message.send(message="No response content to copy.") + else: + signals.status_message.send(message="No contents to copy.") + return + + copy_to_clipboard_or_prompt(data) + + +def ask_copy_part(scope, flow, master, state): + choices = [ + ("content", "c"), + ("headers+content", "h") + ] + if scope != "s": + choices.append(("url", "u")) + + signals.status_prompt_onekey.send( + prompt = "Copy", + keys = choices, + callback = copy_flow, + args = (scope, flow, master, state) + ) + + +def ask_save_body(part, master, state, flow): + """ + Save either the request or the response body to disk. part can either be + "q" (request), "s" (response) or None (ask user if necessary). + """ + + request_has_content = flow.request and flow.request.content + response_has_content = flow.response and flow.response.content + + if part is None: + # We first need to determine whether we want to save the request or the + # response content. + if request_has_content and response_has_content: + signals.status_prompt_onekey.send( + prompt = "Save", + keys = ( + ("request", "q"), + ("response", "s"), + ), + callback = ask_save_body, + args = (master, state, flow) + ) + elif response_has_content: + ask_save_body("s", master, state, flow) + else: + ask_save_body("q", master, state, flow) + + elif part == "q" and request_has_content: + ask_save_path( + "Save request content", + flow.request.get_decoded_content() + ) + elif part == "s" and response_has_content: + ask_save_path( + "Save response content", + flow.response.get_decoded_content() + ) + else: + signals.status_message.send(message="No content to save.") + + +flowcache = utils.LRUCache(800) + + +def format_flow(f, focus, extended=False, hostheader=False, marked=False): + d = dict( + intercepted = f.intercepted, + acked = f.reply.acked, + + req_timestamp = f.request.timestamp_start, + req_is_replay = f.request.is_replay, + req_method = f.request.method, + req_url = f.request.pretty_url if hostheader else f.request.url, + req_http_version = f.request.http_version, + + err_msg = f.error.msg if f.error else None, + resp_code = f.response.status_code if f.response else None, + + marked = marked, + ) + if f.response: + if f.response.content: + contentdesc = netlib.utils.pretty_size(len(f.response.content)) + elif f.response.content == CONTENT_MISSING: + contentdesc = "[content missing]" + else: + contentdesc = "[no content]" + duration = 0 + if f.response.timestamp_end and f.request.timestamp_start: + duration = f.response.timestamp_end - f.request.timestamp_start + roundtrip = utils.pretty_duration(duration) + + d.update(dict( + resp_code = f.response.status_code, + resp_is_replay = f.response.is_replay, + resp_clen = contentdesc, + roundtrip = roundtrip, + )) + t = f.response.headers.get("content-type") + if t: + d["resp_ctype"] = t.split(";")[0] + else: + d["resp_ctype"] = "" + return flowcache.get( + raw_format_flow, + tuple(sorted(d.items())), focus, extended + ) diff --git a/mitmproxy/console/flowdetailview.py b/mitmproxy/console/flowdetailview.py new file mode 100644 index 00000000..f4b4262e --- /dev/null +++ b/mitmproxy/console/flowdetailview.py @@ -0,0 +1,153 @@ +from __future__ import absolute_import +import urwid +from . import common, searchable +from .. import utils + + +def maybe_timestamp(base, attr): + if base and getattr(base, attr): + return utils.format_timestamp_with_milli(getattr(base, attr)) + else: + return "active" + + +def flowdetails(state, flow): + text = [] + + cc = flow.client_conn + sc = flow.server_conn + req = flow.request + resp = flow.response + + if sc is not None: + text.append(urwid.Text([("head", "Server Connection:")])) + parts = [ + ["Address", "%s:%s" % sc.address()], + ] + + text.extend( + common.format_keyvals(parts, key="key", val="text", indent=4) + ) + + c = sc.cert + if c: + text.append(urwid.Text([("head", "Server Certificate:")])) + parts = [ + ["Type", "%s, %s bits" % c.keyinfo], + ["SHA1 digest", c.digest("sha1")], + ["Valid to", str(c.notafter)], + ["Valid from", str(c.notbefore)], + ["Serial", str(c.serial)], + [ + "Subject", + urwid.BoxAdapter( + urwid.ListBox( + common.format_keyvals( + c.subject, + key="highlight", + val="text" + ) + ), + len(c.subject) + ) + ], + [ + "Issuer", + urwid.BoxAdapter( + urwid.ListBox( + common.format_keyvals( + c.issuer, key="highlight", val="text" + ) + ), + len(c.issuer) + ) + ] + ] + + if c.altnames: + parts.append( + [ + "Alt names", + ", ".join(c.altnames) + ] + ) + text.extend( + common.format_keyvals(parts, key="key", val="text", indent=4) + ) + + if cc is not None: + text.append(urwid.Text([("head", "Client Connection:")])) + + parts = [ + ["Address", "%s:%s" % cc.address()], + # ["Requests", "%s"%cc.requestcount], + ] + + text.extend( + common.format_keyvals(parts, key="key", val="text", indent=4) + ) + + parts = [] + + parts.append( + [ + "Client conn. established", + maybe_timestamp(cc, "timestamp_start") + ] + ) + parts.append( + [ + "Server conn. initiated", + maybe_timestamp(sc, "timestamp_start") + ] + ) + parts.append( + [ + "Server conn. TCP handshake", + maybe_timestamp(sc, "timestamp_tcp_setup") + ] + ) + if sc.ssl_established: + parts.append( + [ + "Server conn. SSL handshake", + maybe_timestamp(sc, "timestamp_ssl_setup") + ] + ) + parts.append( + [ + "Client conn. SSL handshake", + maybe_timestamp(cc, "timestamp_ssl_setup") + ] + ) + parts.append( + [ + "First request byte", + maybe_timestamp(req, "timestamp_start") + ] + ) + parts.append( + [ + "Request complete", + maybe_timestamp(req, "timestamp_end") + ] + ) + parts.append( + [ + "First response byte", + maybe_timestamp(resp, "timestamp_start") + ] + ) + parts.append( + [ + "Response complete", + maybe_timestamp(resp, "timestamp_end") + ] + ) + + # sort operations by timestamp + parts = sorted(parts, key=lambda p: p[1]) + + text.append(urwid.Text([("head", "Timing:")])) + text.extend(common.format_keyvals(parts, key="key", val="text", indent=4)) + return searchable.Searchable(state, text) diff --git a/mitmproxy/console/flowlist.py b/mitmproxy/console/flowlist.py new file mode 100644 index 00000000..c2201055 --- /dev/null +++ b/mitmproxy/console/flowlist.py @@ -0,0 +1,397 @@ +from __future__ import absolute_import +import urwid + +import netlib.utils + +from . import common, signals + + +def _mkhelp(): + text = [] + keys = [ + ("A", "accept all intercepted flows"), + ("a", "accept this intercepted flow"), + ("b", "save request/response body"), + ("C", "clear flow list or eventlog"), + ("d", "delete flow"), + ("D", "duplicate flow"), + ("E", "export"), + ("e", "toggle eventlog"), + ("F", "toggle follow flow list"), + ("l", "set limit filter pattern"), + ("L", "load saved flows"), + ("m", "toggle flow mark"), + ("n", "create a new request"), + ("P", "copy flow to clipboard"), + ("r", "replay request"), + ("U", "unmark all marked flows"), + ("V", "revert changes to request"), + ("w", "save flows "), + ("W", "stream flows to file"), + ("X", "kill and delete flow, even if it's mid-intercept"), + ("tab", "tab between eventlog and flow list"), + ("enter", "view flow"), + ("|", "run script on this flow"), + ] + text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) + return text +help_context = _mkhelp() + +footer = [ + ('heading_key', "?"), ":help ", +] + + +class EventListBox(urwid.ListBox): + + def __init__(self, master): + self.master = master + urwid.ListBox.__init__(self, master.eventlist) + + def keypress(self, size, key): + key = common.shortcuts(key) + if key == "C": + self.master.clear_events() + key = None + elif key == "G": + self.set_focus(len(self.master.eventlist) - 1) + elif key == "g": + self.set_focus(0) + return urwid.ListBox.keypress(self, size, key) + + +class BodyPile(urwid.Pile): + + def __init__(self, master): + h = urwid.Text("Event log") + h = urwid.Padding(h, align="left", width=("relative", 100)) + + self.inactive_header = urwid.AttrWrap(h, "heading_inactive") + self.active_header = urwid.AttrWrap(h, "heading") + + urwid.Pile.__init__( + self, + [ + FlowListBox(master), + urwid.Frame( + EventListBox(master), + header = self.inactive_header + ) + ] + ) + self.master = master + + def keypress(self, size, key): + if key == "tab": + self.focus_position = ( + self.focus_position + 1) % len(self.widget_list) + if self.focus_position == 1: + self.widget_list[1].header = self.active_header + else: + self.widget_list[1].header = self.inactive_header + key = None + elif key == "e": + self.master.toggle_eventlog() + key = None + + # This is essentially a copypasta from urwid.Pile's keypress handler. + # So much for "closed for modification, but open for extension". + item_rows = None + if len(size) == 2: + item_rows = self.get_item_rows(size, focus = True) + i = self.widget_list.index(self.focus_item) + tsize = self.get_item_size(size, i, True, item_rows) + return self.focus_item.keypress(tsize, key) + + +class ConnectionItem(urwid.WidgetWrap): + + def __init__(self, master, state, flow, focus): + self.master, self.state, self.flow = master, state, flow + self.f = focus + w = self.get_text() + urwid.WidgetWrap.__init__(self, w) + + def get_text(self): + return common.format_flow( + self.flow, + self.f, + hostheader = self.master.showhost, + marked=self.state.flow_marked(self.flow) + ) + + def selectable(self): + return True + + def save_flows_prompt(self, k): + if k == "a": + signals.status_prompt_path.send( + prompt = "Save all flows to", + callback = self.master.save_flows + ) + elif k == "m": + signals.status_prompt_path.send( + prompt = "Save marked flows to", + callback = self.master.save_marked_flows + ) + else: + signals.status_prompt_path.send( + prompt = "Save this flow to", + callback = self.master.save_one_flow, + args = (self.flow,) + ) + + def stop_server_playback_prompt(self, a): + if a != "n": + self.master.stop_server_playback() + + def server_replay_prompt(self, k): + if k == "a": + self.master.start_server_playback( + [i.copy() for i in self.master.state.view], + self.master.killextra, self.master.rheaders, + False, self.master.nopop, + self.master.options.replay_ignore_params, + self.master.options.replay_ignore_content, + self.master.options.replay_ignore_payload_params, + self.master.options.replay_ignore_host + ) + elif k == "t": + self.master.start_server_playback( + [self.flow.copy()], + self.master.killextra, self.master.rheaders, + False, self.master.nopop, + self.master.options.replay_ignore_params, + self.master.options.replay_ignore_content, + self.master.options.replay_ignore_payload_params, + self.master.options.replay_ignore_host + ) + else: + signals.status_prompt_path.send( + prompt = "Server replay path", + callback = self.master.server_playback_path + ) + + def mouse_event(self, size, event, button, col, row, focus): + if event == "mouse press" and button == 1: + if self.flow.request: + self.master.view_flow(self.flow) + return True + + def keypress(self, xxx_todo_changeme, key): + (maxcol,) = xxx_todo_changeme + key = common.shortcuts(key) + if key == "a": + self.flow.accept_intercept(self.master) + signals.flowlist_change.send(self) + elif key == "d": + self.flow.kill(self.master) + self.state.delete_flow(self.flow) + signals.flowlist_change.send(self) + elif key == "D": + f = self.master.duplicate_flow(self.flow) + self.master.view_flow(f) + elif key == "m": + if self.state.flow_marked(self.flow): + self.state.set_flow_marked(self.flow, False) + else: + self.state.set_flow_marked(self.flow, True) + signals.flowlist_change.send(self) + elif key == "r": + r = self.master.replay_request(self.flow) + if r: + signals.status_message.send(message=r) + signals.flowlist_change.send(self) + elif key == "S": + if not self.master.server_playback: + signals.status_prompt_onekey.send( + prompt = "Server Replay", + keys = ( + ("all flows", "a"), + ("this flow", "t"), + ("file", "f"), + ), + callback = self.server_replay_prompt, + ) + else: + signals.status_prompt_onekey.send( + prompt = "Stop current server replay?", + keys = ( + ("yes", "y"), + ("no", "n"), + ), + callback = self.stop_server_playback_prompt, + ) + elif key == "U": + for f in self.state.flows: + self.state.set_flow_marked(f, False) + signals.flowlist_change.send(self) + elif key == "V": + if not self.flow.modified(): + signals.status_message.send(message="Flow not modified.") + return + self.state.revert(self.flow) + signals.flowlist_change.send(self) + signals.status_message.send(message="Reverted.") + elif key == "w": + signals.status_prompt_onekey.send( + self, + prompt = "Save", + keys = ( + ("all flows", "a"), + ("this flow", "t"), + ("marked flows", "m"), + ), + callback = self.save_flows_prompt, + ) + elif key == "X": + self.flow.kill(self.master) + elif key == "enter": + if self.flow.request: + self.master.view_flow(self.flow) + elif key == "|": + signals.status_prompt_path.send( + prompt = "Send flow to script", + callback = self.master.run_script_once, + args = (self.flow,) + ) + elif key == "P": + common.ask_copy_part("a", self.flow, self.master, self.state) + elif key == "E": + signals.status_prompt_onekey.send( + self, + prompt = "Export", + keys = ( + ("as curl command", "c"), + ("as python code", "p"), + ("as raw request", "r"), + ), + callback = common.export_prompt, + args = (self.flow,) + ) + elif key == "b": + common.ask_save_body(None, self.master, self.state, self.flow) + else: + return key + + +class FlowListWalker(urwid.ListWalker): + + def __init__(self, master, state): + self.master, self.state = master, state + signals.flowlist_change.connect(self.sig_flowlist_change) + + def sig_flowlist_change(self, sender): + self._modified() + + def get_focus(self): + f, i = self.state.get_focus() + f = ConnectionItem(self.master, self.state, f, True) if f else None + return f, i + + def set_focus(self, focus): + ret = self.state.set_focus(focus) + return ret + + def get_next(self, pos): + f, i = self.state.get_next(pos) + f = ConnectionItem(self.master, self.state, f, False) if f else None + return f, i + + def get_prev(self, pos): + f, i = self.state.get_prev(pos) + f = ConnectionItem(self.master, self.state, f, False) if f else None + return f, i + + +class FlowListBox(urwid.ListBox): + + def __init__(self, master): + self.master = master + urwid.ListBox.__init__( + self, + FlowListWalker(master, master.state) + ) + + def get_method_raw(self, k): + if k: + self.get_url(k) + + def get_method(self, k): + if k == "e": + signals.status_prompt.send( + self, + prompt = "Method", + text = "", + callback = self.get_method_raw + ) + else: + method = "" + for i in common.METHOD_OPTIONS: + if i[1] == k: + method = i[0].upper() + self.get_url(method) + + def get_url(self, method): + signals.status_prompt.send( + prompt = "URL", + text = "http://www.example.com/", + callback = self.new_request, + args = (method,) + ) + + def new_request(self, url, method): + parts = netlib.utils.parse_url(str(url)) + if not parts: + signals.status_message.send(message="Invalid Url") + return + scheme, host, port, path = parts + f = self.master.create_request(method, scheme, host, port, path) + self.master.view_flow(f) + + def keypress(self, size, key): + key = common.shortcuts(key) + if key == "A": + self.master.accept_all() + signals.flowlist_change.send(self) + elif key == "C": + self.master.clear_flows() + elif key == "e": + self.master.toggle_eventlog() + elif key == "g": + self.master.state.set_focus(0) + signals.flowlist_change.send(self) + elif key == "G": + self.master.state.set_focus(self.master.state.flow_count()) + signals.flowlist_change.send(self) + elif key == "l": + signals.status_prompt.send( + prompt = "Limit", + text = self.master.state.limit_txt, + callback = self.master.set_limit + ) + elif key == "L": + signals.status_prompt_path.send( + self, + prompt = "Load flows", + callback = self.master.load_flows_callback + ) + elif key == "n": + signals.status_prompt_onekey.send( + prompt = "Method", + keys = common.METHOD_OPTIONS, + callback = self.get_method + ) + elif key == "F": + self.master.toggle_follow_flows() + elif key == "W": + if self.master.stream: + self.master.stop_stream() + else: + signals.status_prompt_path.send( + self, + prompt = "Stream flows to", + callback = self.master.start_stream_to_path + ) + else: + return urwid.ListBox.keypress(self, size, key) diff --git a/mitmproxy/console/flowview.py b/mitmproxy/console/flowview.py new file mode 100644 index 00000000..f74ab140 --- /dev/null +++ b/mitmproxy/console/flowview.py @@ -0,0 +1,714 @@ +from __future__ import absolute_import, division +import os +import traceback +import sys + +import math +import urwid + +from netlib import odict +from netlib.http import CONTENT_MISSING, Headers +from . import common, grideditor, signals, searchable, tabs +from . import flowdetailview +from .. import utils, controller, contentviews +from ..models import HTTPRequest, HTTPResponse, decoded +from ..exceptions import ContentViewException + + +class SearchError(Exception): + pass + + +def _mkhelp(): + text = [] + keys = [ + ("A", "accept all intercepted flows"), + ("a", "accept this intercepted flow"), + ("b", "save request/response body"), + ("D", "duplicate flow"), + ("d", "delete flow"), + ("E", "export"), + ("e", "edit request/response"), + ("f", "load full body data"), + ("m", "change body display mode for this entity"), + (None, + common.highlight_key("automatic", "a") + + [("text", ": automatic detection")] + ), + (None, + common.highlight_key("hex", "e") + + [("text", ": Hex")] + ), + (None, + common.highlight_key("html", "h") + + [("text", ": HTML")] + ), + (None, + common.highlight_key("image", "i") + + [("text", ": Image")] + ), + (None, + common.highlight_key("javascript", "j") + + [("text", ": JavaScript")] + ), + (None, + common.highlight_key("json", "s") + + [("text", ": JSON")] + ), + (None, + common.highlight_key("urlencoded", "u") + + [("text", ": URL-encoded data")] + ), + (None, + common.highlight_key("raw", "r") + + [("text", ": raw data")] + ), + (None, + common.highlight_key("xml", "x") + + [("text", ": XML")] + ), + ("M", "change default body display mode"), + ("p", "previous flow"), + ("P", "copy request/response (content/headers) to clipboard"), + ("r", "replay request"), + ("V", "revert changes to request"), + ("v", "view body in external viewer"), + ("w", "save all flows matching current limit"), + ("W", "save this flow"), + ("x", "delete body"), + ("z", "encode/decode a request/response"), + ("tab", "next tab"), + ("h, l", "previous tab, next tab"), + ("space", "next flow"), + ("|", "run script on this flow"), + ("/", "search (case sensitive)"), + ("n", "repeat search forward"), + ("N", "repeat search backwards"), + ] + text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) + return text +help_context = _mkhelp() + +footer = [ + ('heading_key', "?"), ":help ", + ('heading_key', "q"), ":back ", +] + + +class FlowViewHeader(urwid.WidgetWrap): + + def __init__(self, master, f): + self.master, self.flow = master, f + self._w = common.format_flow( + f, + False, + extended=True, + hostheader=self.master.showhost + ) + signals.flow_change.connect(self.sig_flow_change) + + def sig_flow_change(self, sender, flow): + if flow == self.flow: + self._w = common.format_flow( + flow, + False, + extended=True, + hostheader=self.master.showhost + ) + + +cache = utils.LRUCache(200) + +TAB_REQ = 0 +TAB_RESP = 1 + + +class FlowView(tabs.Tabs): + highlight_color = "focusfield" + + def __init__(self, master, state, flow, tab_offset): + self.master, self.state, self.flow = master, state, flow + tabs.Tabs.__init__(self, + [ + (self.tab_request, self.view_request), + (self.tab_response, self.view_response), + (self.tab_details, self.view_details), + ], + tab_offset + ) + self.show() + self.last_displayed_body = None + signals.flow_change.connect(self.sig_flow_change) + + def tab_request(self): + if self.flow.intercepted and not self.flow.reply.acked and not self.flow.response: + return "Request intercepted" + else: + return "Request" + + def tab_response(self): + if self.flow.intercepted and not self.flow.reply.acked and self.flow.response: + return "Response intercepted" + else: + return "Response" + + def tab_details(self): + return "Detail" + + def view_request(self): + return self.conn_text(self.flow.request) + + def view_response(self): + return self.conn_text(self.flow.response) + + def view_details(self): + return flowdetailview.flowdetails(self.state, self.flow) + + def sig_flow_change(self, sender, flow): + if flow == self.flow: + self.show() + + def content_view(self, viewmode, message): + if message.content == CONTENT_MISSING: + msg, body = "", [urwid.Text([("error", "[content missing]")])] + return msg, body + else: + full = self.state.get_flow_setting( + self.flow, + (self.tab_offset, "fullcontents"), + False + ) + if full: + limit = sys.maxsize + else: + limit = contentviews.VIEW_CUTOFF + return cache.get( + self._get_content_view, + viewmode, + message, + limit, + (bytes(message.headers), message.content) # Cache invalidation + ) + + def _get_content_view(self, viewmode, message, max_lines, _): + + try: + query = None + if isinstance(message, HTTPRequest): + query = message.query + description, lines = contentviews.get_content_view( + viewmode, message.content, headers=message.headers, query=query + ) + except ContentViewException: + s = "Content viewer failed: \n" + traceback.format_exc() + signals.add_event(s, "error") + description, lines = contentviews.get_content_view( + contentviews.get("Raw"), message.content, headers=message.headers + ) + description = description.replace("Raw", "Couldn't parse: falling back to Raw") + + # Give hint that you have to tab for the response. + if description == "No content" and isinstance(message, HTTPRequest): + description = "No request content (press tab to view response)" + + # If the users has a wide terminal, he gets fewer lines; this should not be an issue. + chars_per_line = 80 + max_chars = max_lines * chars_per_line + total_chars = 0 + text_objects = [] + for line in lines: + txt = [] + for (style, text) in line: + if total_chars + len(text) > max_chars: + text = text[:max_chars - total_chars] + txt.append((style, text)) + total_chars += len(text) + if total_chars == max_chars: + break + + # round up to the next line. + total_chars = int(math.ceil(total_chars / chars_per_line) * chars_per_line) + + text_objects.append(urwid.Text(txt)) + if total_chars == max_chars: + text_objects.append(urwid.Text([ + ("highlight", "Stopped displaying data after %d lines. Press " % max_lines), + ("key", "f"), + ("highlight", " to load all data.") + ])) + break + + return description, text_objects + + def viewmode_get(self): + override = self.state.get_flow_setting( + self.flow, + (self.tab_offset, "prettyview") + ) + return self.state.default_body_view if override is None else override + + def conn_text(self, conn): + if conn: + txt = common.format_keyvals( + [(h + ":", v) for (h, v) in conn.headers.fields], + key = "header", + val = "text" + ) + viewmode = self.viewmode_get() + msg, body = self.content_view(viewmode, conn) + + cols = [ + urwid.Text( + [ + ("heading", msg), + ] + ), + urwid.Text( + [ + " ", + ('heading', "["), + ('heading_key', "m"), + ('heading', (":%s]" % viewmode.name)), + ], + align="right" + ) + ] + title = urwid.AttrWrap(urwid.Columns(cols), "heading") + + txt.append(title) + txt.extend(body) + else: + txt = [ + urwid.Text(""), + urwid.Text( + [ + ("highlight", "No response. Press "), + ("key", "e"), + ("highlight", " and edit any aspect to add one."), + ] + ) + ] + return searchable.Searchable(self.state, txt) + + def set_method_raw(self, m): + if m: + self.flow.request.method = m + signals.flow_change.send(self, flow = self.flow) + + def edit_method(self, m): + if m == "e": + signals.status_prompt.send( + prompt = "Method", + text = self.flow.request.method, + callback = self.set_method_raw + ) + else: + for i in common.METHOD_OPTIONS: + if i[1] == m: + self.flow.request.method = i[0].upper() + signals.flow_change.send(self, flow = self.flow) + + def set_url(self, url): + request = self.flow.request + try: + request.url = str(url) + except ValueError: + return "Invalid URL." + signals.flow_change.send(self, flow = self.flow) + + def set_resp_code(self, code): + response = self.flow.response + try: + response.status_code = int(code) + except ValueError: + return None + import BaseHTTPServer + if int(code) in BaseHTTPServer.BaseHTTPRequestHandler.responses: + response.msg = BaseHTTPServer.BaseHTTPRequestHandler.responses[ + int(code)][0] + signals.flow_change.send(self, flow = self.flow) + + def set_resp_msg(self, msg): + response = self.flow.response + response.msg = msg + signals.flow_change.send(self, flow = self.flow) + + def set_headers(self, fields, conn): + conn.headers = Headers(fields) + signals.flow_change.send(self, flow = self.flow) + + def set_query(self, lst, conn): + conn.set_query(odict.ODict(lst)) + signals.flow_change.send(self, flow = self.flow) + + def set_path_components(self, lst, conn): + conn.set_path_components(lst) + signals.flow_change.send(self, flow = self.flow) + + def set_form(self, lst, conn): + conn.set_form_urlencoded(odict.ODict(lst)) + signals.flow_change.send(self, flow = self.flow) + + def edit_form(self, conn): + self.master.view_grideditor( + grideditor.URLEncodedFormEditor( + self.master, + conn.get_form_urlencoded().lst, + self.set_form, + conn + ) + ) + + def edit_form_confirm(self, key, conn): + if key == "y": + self.edit_form(conn) + + def set_cookies(self, lst, conn): + od = odict.ODict(lst) + conn.set_cookies(od) + signals.flow_change.send(self, flow = self.flow) + + def set_setcookies(self, data, conn): + conn.set_cookies(data) + signals.flow_change.send(self, flow = self.flow) + + def edit(self, part): + if self.tab_offset == TAB_REQ: + message = self.flow.request + else: + if not self.flow.response: + self.flow.response = HTTPResponse( + self.flow.request.http_version, + 200, "OK", Headers(), "" + ) + self.flow.response.reply = controller.DummyReply() + message = self.flow.response + + self.flow.backup() + if message == self.flow.request and part == "c": + self.master.view_grideditor( + grideditor.CookieEditor( + self.master, + message.get_cookies().lst, + self.set_cookies, + message + ) + ) + if message == self.flow.response and part == "c": + self.master.view_grideditor( + grideditor.SetCookieEditor( + self.master, + message.get_cookies(), + self.set_setcookies, + message + ) + ) + if part == "r": + with decoded(message): + # Fix an issue caused by some editors when editing a + # request/response body. Many editors make it hard to save a + # file without a terminating newline on the last line. When + # editing message bodies, this can cause problems. For now, I + # just strip the newlines off the end of the body when we return + # from an editor. + c = self.master.spawn_editor(message.content or "") + message.content = c.rstrip("\n") + elif part == "f": + if not message.get_form_urlencoded() and message.content: + signals.status_prompt_onekey.send( + prompt = "Existing body is not a URL-encoded form. Clear and edit?", + keys = [ + ("yes", "y"), + ("no", "n"), + ], + callback = self.edit_form_confirm, + args = (message,) + ) + else: + self.edit_form(message) + elif part == "h": + self.master.view_grideditor( + grideditor.HeaderEditor( + self.master, + message.headers.fields, + self.set_headers, + message + ) + ) + elif part == "p": + p = message.get_path_components() + self.master.view_grideditor( + grideditor.PathEditor( + self.master, + p, + self.set_path_components, + message + ) + ) + elif part == "q": + self.master.view_grideditor( + grideditor.QueryEditor( + self.master, + message.get_query().lst, + self.set_query, message + ) + ) + elif part == "u": + signals.status_prompt.send( + prompt = "URL", + text = message.url, + callback = self.set_url + ) + elif part == "m": + signals.status_prompt_onekey.send( + prompt = "Method", + keys = common.METHOD_OPTIONS, + callback = self.edit_method + ) + elif part == "o": + signals.status_prompt.send( + prompt = "Code", + text = str(message.status_code), + callback = self.set_resp_code + ) + elif part == "m": + signals.status_prompt.send( + prompt = "Message", + text = message.msg, + callback = self.set_resp_msg + ) + signals.flow_change.send(self, flow = self.flow) + + def _view_nextprev_flow(self, np, flow): + try: + idx = self.state.view.index(flow) + except IndexError: + return + if np == "next": + new_flow, new_idx = self.state.get_next(idx) + else: + new_flow, new_idx = self.state.get_prev(idx) + if new_flow is None: + signals.status_message.send(message="No more flows!") + else: + signals.pop_view_state.send(self) + self.master.view_flow(new_flow, self.tab_offset) + + def view_next_flow(self, flow): + return self._view_nextprev_flow("next", flow) + + def view_prev_flow(self, flow): + return self._view_nextprev_flow("prev", flow) + + def change_this_display_mode(self, t): + self.state.add_flow_setting( + self.flow, + (self.tab_offset, "prettyview"), + contentviews.get_by_shortcut(t) + ) + signals.flow_change.send(self, flow = self.flow) + + def delete_body(self, t): + if t == "m": + val = CONTENT_MISSING + else: + val = None + if self.tab_offset == TAB_REQ: + self.flow.request.content = val + else: + self.flow.response.content = val + signals.flow_change.send(self, flow = self.flow) + + def keypress(self, size, key): + key = super(self.__class__, self).keypress(size, key) + + if key == " ": + self.view_next_flow(self.flow) + return + + key = common.shortcuts(key) + if self.tab_offset == TAB_REQ: + conn = self.flow.request + elif self.tab_offset == TAB_RESP: + conn = self.flow.response + else: + conn = None + + if key in ("up", "down", "page up", "page down"): + # Why doesn't this just work?? + self._w.keypress(size, key) + elif key == "a": + self.flow.accept_intercept(self.master) + signals.flow_change.send(self, flow = self.flow) + elif key == "A": + self.master.accept_all() + signals.flow_change.send(self, flow = self.flow) + elif key == "d": + if self.state.flow_count() == 1: + self.master.view_flowlist() + elif self.state.view.index(self.flow) == len(self.state.view) - 1: + self.view_prev_flow(self.flow) + else: + self.view_next_flow(self.flow) + f = self.flow + f.kill(self.master) + self.state.delete_flow(f) + elif key == "D": + f = self.master.duplicate_flow(self.flow) + self.master.view_flow(f) + signals.status_message.send(message="Duplicated.") + elif key == "p": + self.view_prev_flow(self.flow) + elif key == "r": + r = self.master.replay_request(self.flow) + if r: + signals.status_message.send(message=r) + signals.flow_change.send(self, flow = self.flow) + elif key == "V": + if not self.flow.modified(): + signals.status_message.send(message="Flow not modified.") + return + self.state.revert(self.flow) + signals.flow_change.send(self, flow = self.flow) + signals.status_message.send(message="Reverted.") + elif key == "W": + signals.status_prompt_path.send( + prompt = "Save this flow", + callback = self.master.save_one_flow, + args = (self.flow,) + ) + elif key == "E": + signals.status_prompt_onekey.send( + self, + prompt = "Export", + keys = ( + ("as curl command", "c"), + ("as python code", "p"), + ("as raw request", "r"), + ), + callback = common.export_prompt, + args = (self.flow,) + ) + elif key == "|": + signals.status_prompt_path.send( + prompt = "Send flow to script", + callback = self.master.run_script_once, + args = (self.flow,) + ) + + if not conn and key in set(list("befgmxvz")): + signals.status_message.send( + message = "Tab to the request or response", + expire = 1 + ) + elif conn: + if key == "b": + if self.tab_offset == TAB_REQ: + common.ask_save_body( + "q", self.master, self.state, self.flow + ) + else: + common.ask_save_body( + "s", self.master, self.state, self.flow + ) + elif key == "e": + if self.tab_offset == TAB_REQ: + signals.status_prompt_onekey.send( + prompt = "Edit request", + keys = ( + ("cookies", "c"), + ("query", "q"), + ("path", "p"), + ("url", "u"), + ("header", "h"), + ("form", "f"), + ("raw body", "r"), + ("method", "m"), + ), + callback = self.edit + ) + else: + signals.status_prompt_onekey.send( + prompt = "Edit response", + keys = ( + ("cookies", "c"), + ("code", "o"), + ("message", "m"), + ("header", "h"), + ("raw body", "r"), + ), + callback = self.edit + ) + key = None + elif key == "f": + signals.status_message.send(message="Loading all body data...") + self.state.add_flow_setting( + self.flow, + (self.tab_offset, "fullcontents"), + True + ) + signals.flow_change.send(self, flow = self.flow) + signals.status_message.send(message="") + elif key == "P": + if self.tab_offset == TAB_REQ: + scope = "q" + else: + scope = "s" + common.ask_copy_part(scope, self.flow, self.master, self.state) + elif key == "m": + p = list(contentviews.view_prompts) + p.insert(0, ("Clear", "C")) + signals.status_prompt_onekey.send( + self, + prompt = "Display mode", + keys = p, + callback = self.change_this_display_mode + ) + key = None + elif key == "x": + signals.status_prompt_onekey.send( + prompt = "Delete body", + keys = ( + ("completely", "c"), + ("mark as missing", "m"), + ), + callback = self.delete_body + ) + key = None + elif key == "v": + if conn.content: + t = conn.headers.get("content-type") + if "EDITOR" in os.environ or "PAGER" in os.environ: + self.master.spawn_external_viewer(conn.content, t) + else: + signals.status_message.send( + message = "Error! Set $EDITOR or $PAGER." + ) + elif key == "z": + self.flow.backup() + e = conn.headers.get("content-encoding", "identity") + if e != "identity": + if not conn.decode(): + signals.status_message.send( + message = "Could not decode - invalid data?" + ) + else: + signals.status_prompt_onekey.send( + prompt = "Select encoding: ", + keys = ( + ("gzip", "z"), + ("deflate", "d"), + ), + callback = self.encode_callback, + args = (conn,) + ) + signals.flow_change.send(self, flow = self.flow) + return key + + def encode_callback(self, key, conn): + encoding_map = { + "z": "gzip", + "d": "deflate", + } + conn.encode(encoding_map[key]) + signals.flow_change.send(self, flow = self.flow) diff --git a/mitmproxy/console/grideditor.py b/mitmproxy/console/grideditor.py new file mode 100644 index 00000000..a11c962c --- /dev/null +++ b/mitmproxy/console/grideditor.py @@ -0,0 +1,716 @@ +from __future__ import absolute_import + +import copy +import re +import os +import urwid + +from netlib import odict +from netlib.http import user_agents + +from . import common, signals +from .. import utils, filt, script + + +FOOTER = [ + ('heading_key', "enter"), ":edit ", + ('heading_key', "q"), ":back ", +] +FOOTER_EDITING = [ + ('heading_key', "esc"), ":stop editing ", +] + + +class TextColumn: + subeditor = None + + def __init__(self, heading): + self.heading = heading + + def text(self, obj): + return SEscaped(obj or "") + + def blank(self): + return "" + + def keypress(self, key, editor): + if key == "r": + if editor.walker.get_current_value() is not None: + signals.status_prompt_path.send( + self, + prompt = "Read file", + callback = editor.read_file + ) + elif key == "R": + if editor.walker.get_current_value() is not None: + signals.status_prompt_path.send( + editor, + prompt = "Read unescaped file", + callback = editor.read_file, + args = (True,) + ) + elif key == "e": + o = editor.walker.get_current_value() + if o is not None: + n = editor.master.spawn_editor(o.encode("string-escape")) + n = utils.clean_hanging_newline(n) + editor.walker.set_current_value(n, False) + editor.walker._modified() + elif key in ["enter"]: + editor.walker.start_edit() + else: + return key + + +class SubgridColumn: + + def __init__(self, heading, subeditor): + self.heading = heading + self.subeditor = subeditor + + def text(self, obj): + p = http_cookies._format_pairs(obj, sep="\n") + return urwid.Text(p) + + def blank(self): + return [] + + def keypress(self, key, editor): + if key in "rRe": + signals.status_message.send( + self, + message = "Press enter to edit this field.", + expire = 1000 + ) + return + elif key in ["enter"]: + editor.master.view_grideditor( + self.subeditor( + editor.master, + editor.walker.get_current_value(), + editor.set_subeditor_value, + editor.walker.focus, + editor.walker.focus_col + ) + ) + else: + return key + + +class SEscaped(urwid.WidgetWrap): + + def __init__(self, txt): + txt = txt.encode("string-escape") + w = urwid.Text(txt, wrap="any") + urwid.WidgetWrap.__init__(self, w) + + def get_text(self): + return self._w.get_text()[0] + + def keypress(self, size, key): + return key + + def selectable(self): + return True + + +class SEdit(urwid.WidgetWrap): + + def __init__(self, txt): + txt = txt.encode("string-escape") + w = urwid.Edit(edit_text=txt, wrap="any", multiline=True) + w = urwid.AttrWrap(w, "editfield") + urwid.WidgetWrap.__init__(self, w) + + def get_text(self): + return self._w.get_text()[0].strip() + + def selectable(self): + return True + + +class GridRow(urwid.WidgetWrap): + + def __init__(self, focused, editing, editor, values): + self.focused, self.editing, self.editor = focused, editing, editor + + errors = values[1] + self.fields = [] + for i, v in enumerate(values[0]): + if focused == i and editing: + self.editing = SEdit(v) + self.fields.append(self.editing) + else: + w = self.editor.columns[i].text(v) + if focused == i: + if i in errors: + w = urwid.AttrWrap(w, "focusfield_error") + else: + w = urwid.AttrWrap(w, "focusfield") + elif i in errors: + w = urwid.AttrWrap(w, "field_error") + self.fields.append(w) + + fspecs = self.fields[:] + if len(self.fields) > 1: + fspecs[0] = ("fixed", self.editor.first_width + 2, fspecs[0]) + w = urwid.Columns( + fspecs, + dividechars = 2 + ) + if focused is not None: + w.set_focus_column(focused) + urwid.WidgetWrap.__init__(self, w) + + def get_edit_value(self): + return self.editing.get_text() + + def keypress(self, s, k): + if self.editing: + w = self._w.column_widths(s)[self.focused] + k = self.editing.keypress((w,), k) + return k + + def selectable(self): + return True + + +class GridWalker(urwid.ListWalker): + + """ + Stores rows as a list of (rows, errors) tuples, where rows is a list + and errors is a set with an entry of each offset in rows that is an + error. + """ + + def __init__(self, lst, editor): + self.lst = [(i, set([])) for i in lst] + self.editor = editor + self.focus = 0 + self.focus_col = 0 + self.editing = False + + def _modified(self): + self.editor.show_empty_msg() + return urwid.ListWalker._modified(self) + + def add_value(self, lst): + self.lst.append((lst[:], set([]))) + self._modified() + + def get_current_value(self): + if self.lst: + return self.lst[self.focus][0][self.focus_col] + + def set_current_value(self, val, unescaped): + if not unescaped: + try: + val = val.decode("string-escape") + except ValueError: + signals.status_message.send( + self, + message = "Invalid Python-style string encoding.", + expire = 1000 + ) + return + errors = self.lst[self.focus][1] + emsg = self.editor.is_error(self.focus_col, val) + if emsg: + signals.status_message.send(message = emsg, expire = 1) + errors.add(self.focus_col) + else: + errors.discard(self.focus_col) + self.set_value(val, self.focus, self.focus_col, errors) + + def set_value(self, val, focus, focus_col, errors=None): + if not errors: + errors = set([]) + row = list(self.lst[focus][0]) + row[focus_col] = val + self.lst[focus] = [tuple(row), errors] + self._modified() + + def delete_focus(self): + if self.lst: + del self.lst[self.focus] + self.focus = min(len(self.lst) - 1, self.focus) + self._modified() + + def _insert(self, pos): + self.focus = pos + self.lst.insert( + self.focus, + [ + [c.blank() for c in self.editor.columns], set([]) + ] + ) + self.focus_col = 0 + self.start_edit() + + def insert(self): + return self._insert(self.focus) + + def add(self): + return self._insert(min(self.focus + 1, len(self.lst))) + + def start_edit(self): + col = self.editor.columns[self.focus_col] + if self.lst and not col.subeditor: + self.editing = GridRow( + self.focus_col, True, self.editor, self.lst[self.focus] + ) + self.editor.master.loop.widget.footer.update(FOOTER_EDITING) + self._modified() + + def stop_edit(self): + if self.editing: + self.editor.master.loop.widget.footer.update(FOOTER) + self.set_current_value(self.editing.get_edit_value(), False) + self.editing = False + self._modified() + + def left(self): + self.focus_col = max(self.focus_col - 1, 0) + self._modified() + + def right(self): + self.focus_col = min(self.focus_col + 1, len(self.editor.columns) - 1) + self._modified() + + def tab_next(self): + self.stop_edit() + if self.focus_col < len(self.editor.columns) - 1: + self.focus_col += 1 + elif self.focus != len(self.lst) - 1: + self.focus_col = 0 + self.focus += 1 + self._modified() + + def get_focus(self): + if self.editing: + return self.editing, self.focus + elif self.lst: + return GridRow( + self.focus_col, + False, + self.editor, + self.lst[self.focus] + ), self.focus + else: + return None, None + + def set_focus(self, focus): + self.stop_edit() + self.focus = focus + self._modified() + + def get_next(self, pos): + if pos + 1 >= len(self.lst): + return None, None + return GridRow(None, False, self.editor, self.lst[pos + 1]), pos + 1 + + def get_prev(self, pos): + if pos - 1 < 0: + return None, None + return GridRow(None, False, self.editor, self.lst[pos - 1]), pos - 1 + + +class GridListBox(urwid.ListBox): + + def __init__(self, lw): + urwid.ListBox.__init__(self, lw) + + +FIRST_WIDTH_MAX = 40 +FIRST_WIDTH_MIN = 20 + + +class GridEditor(urwid.WidgetWrap): + title = None + columns = None + + def __init__(self, master, value, callback, *cb_args, **cb_kwargs): + value = self.data_in(copy.deepcopy(value)) + self.master, self.value, self.callback = master, value, callback + self.cb_args, self.cb_kwargs = cb_args, cb_kwargs + + first_width = 20 + if value: + for r in value: + assert len(r) == len(self.columns) + first_width = max(len(r), first_width) + self.first_width = min(first_width, FIRST_WIDTH_MAX) + + title = urwid.Text(self.title) + title = urwid.Padding(title, align="left", width=("relative", 100)) + title = urwid.AttrWrap(title, "heading") + + headings = [] + for i, col in enumerate(self.columns): + c = urwid.Text(col.heading) + if i == 0 and len(self.columns) > 1: + headings.append(("fixed", first_width + 2, c)) + else: + headings.append(c) + h = urwid.Columns( + headings, + dividechars = 2 + ) + h = urwid.AttrWrap(h, "heading") + + self.walker = GridWalker(self.value, self) + self.lb = GridListBox(self.walker) + self._w = urwid.Frame( + self.lb, + header = urwid.Pile([title, h]) + ) + self.master.loop.widget.footer.update("") + self.show_empty_msg() + + def show_empty_msg(self): + if self.walker.lst: + self._w.set_footer(None) + else: + self._w.set_footer( + urwid.Text( + [ + ("highlight", "No values. Press "), + ("key", "a"), + ("highlight", " to add some."), + ] + ) + ) + + def encode(self, s): + if not self.encoding: + return s + try: + return s.encode(self.encoding) + except ValueError: + return None + + def read_file(self, p, unescaped=False): + if p: + try: + p = os.path.expanduser(p) + d = file(p, "rb").read() + self.walker.set_current_value(d, unescaped) + self.walker._modified() + except IOError as v: + return str(v) + + def set_subeditor_value(self, val, focus, focus_col): + self.walker.set_value(val, focus, focus_col) + + def keypress(self, size, key): + if self.walker.editing: + if key in ["esc"]: + self.walker.stop_edit() + elif key == "tab": + pf, pfc = self.walker.focus, self.walker.focus_col + self.walker.tab_next() + if self.walker.focus == pf and self.walker.focus_col != pfc: + self.walker.start_edit() + else: + self._w.keypress(size, key) + return None + + key = common.shortcuts(key) + column = self.columns[self.walker.focus_col] + if key in ["q", "esc"]: + res = [] + for i in self.walker.lst: + if not i[1] and any([x for x in i[0]]): + res.append(i[0]) + self.callback(self.data_out(res), *self.cb_args, **self.cb_kwargs) + signals.pop_view_state.send(self) + elif key == "g": + self.walker.set_focus(0) + elif key == "G": + self.walker.set_focus(len(self.walker.lst) - 1) + elif key in ["h", "left"]: + self.walker.left() + elif key in ["l", "right"]: + self.walker.right() + elif key == "tab": + self.walker.tab_next() + elif key == "a": + self.walker.add() + elif key == "A": + self.walker.insert() + elif key == "d": + self.walker.delete_focus() + elif column.keypress(key, self) and not self.handle_key(key): + return self._w.keypress(size, key) + + def data_out(self, data): + """ + Called on raw list data, before data is returned through the + callback. + """ + return data + + def data_in(self, data): + """ + Called to prepare provided data. + """ + return data + + def is_error(self, col, val): + """ + Return False, or a string error message. + """ + return False + + def handle_key(self, key): + return False + + def make_help(self): + text = [] + text.append(urwid.Text([("text", "Editor control:\n")])) + keys = [ + ("A", "insert row before cursor"), + ("a", "add row after cursor"), + ("d", "delete row"), + ("e", "spawn external editor on current field"), + ("q", "save changes and exit editor"), + ("r", "read value from file"), + ("R", "read unescaped value from file"), + ("esc", "save changes and exit editor"), + ("tab", "next field"), + ("enter", "edit field"), + ] + text.extend( + common.format_keyvals(keys, key="key", val="text", indent=4) + ) + text.append( + urwid.Text( + [ + "\n", + ("text", "Values are escaped Python-style strings.\n"), + ] + ) + ) + return text + + +class QueryEditor(GridEditor): + title = "Editing query" + columns = [ + TextColumn("Key"), + TextColumn("Value") + ] + + +class HeaderEditor(GridEditor): + title = "Editing headers" + columns = [ + TextColumn("Key"), + TextColumn("Value") + ] + + def make_help(self): + h = GridEditor.make_help(self) + text = [] + text.append(urwid.Text([("text", "Special keys:\n")])) + keys = [ + ("U", "add User-Agent header"), + ] + text.extend( + common.format_keyvals(keys, key="key", val="text", indent=4) + ) + text.append(urwid.Text([("text", "\n")])) + text.extend(h) + return text + + def set_user_agent(self, k): + ua = user_agents.get_by_shortcut(k) + if ua: + self.walker.add_value( + [ + "User-Agent", + ua[2] + ] + ) + + def handle_key(self, key): + if key == "U": + signals.status_prompt_onekey.send( + prompt = "Add User-Agent header:", + keys = [(i[0], i[1]) for i in user_agents.UASTRINGS], + callback = self.set_user_agent, + ) + return True + + +class URLEncodedFormEditor(GridEditor): + title = "Editing URL-encoded form" + columns = [ + TextColumn("Key"), + TextColumn("Value") + ] + + +class ReplaceEditor(GridEditor): + title = "Editing replacement patterns" + columns = [ + TextColumn("Filter"), + TextColumn("Regex"), + TextColumn("Replacement"), + ] + + def is_error(self, col, val): + if col == 0: + if not filt.parse(val): + return "Invalid filter specification." + elif col == 1: + try: + re.compile(val) + except re.error: + return "Invalid regular expression." + return False + + +class SetHeadersEditor(GridEditor): + title = "Editing header set patterns" + columns = [ + TextColumn("Filter"), + TextColumn("Header"), + TextColumn("Value"), + ] + + def is_error(self, col, val): + if col == 0: + if not filt.parse(val): + return "Invalid filter specification" + return False + + def make_help(self): + h = GridEditor.make_help(self) + text = [] + text.append(urwid.Text([("text", "Special keys:\n")])) + keys = [ + ("U", "add User-Agent header"), + ] + text.extend( + common.format_keyvals(keys, key="key", val="text", indent=4) + ) + text.append(urwid.Text([("text", "\n")])) + text.extend(h) + return text + + def set_user_agent(self, k): + ua = user_agents.get_by_shortcut(k) + if ua: + self.walker.add_value( + [ + ".*", + "User-Agent", + ua[2] + ] + ) + + def handle_key(self, key): + if key == "U": + signals.status_prompt_onekey.send( + prompt = "Add User-Agent header:", + keys = [(i[0], i[1]) for i in user_agents.UASTRINGS], + callback = self.set_user_agent, + ) + return True + + +class PathEditor(GridEditor): + title = "Editing URL path components" + columns = [ + TextColumn("Component"), + ] + + def data_in(self, data): + return [[i] for i in data] + + def data_out(self, data): + return [i[0] for i in data] + + +class ScriptEditor(GridEditor): + title = "Editing scripts" + columns = [ + TextColumn("Command"), + ] + + def is_error(self, col, val): + try: + script.Script.parse_command(val) + except script.ScriptException as v: + return str(v) + + +class HostPatternEditor(GridEditor): + title = "Editing host patterns" + columns = [ + TextColumn("Regex (matched on hostname:port / ip:port)") + ] + + def is_error(self, col, val): + try: + re.compile(val, re.IGNORECASE) + except re.error as e: + return "Invalid regex: %s" % str(e) + + def data_in(self, data): + return [[i] for i in data] + + def data_out(self, data): + return [i[0] for i in data] + + +class CookieEditor(GridEditor): + title = "Editing request Cookie header" + columns = [ + TextColumn("Name"), + TextColumn("Value"), + ] + + +class CookieAttributeEditor(GridEditor): + title = "Editing Set-Cookie attributes" + columns = [ + TextColumn("Name"), + TextColumn("Value"), + ] + + def data_out(self, data): + ret = [] + for i in data: + if not i[1]: + ret.append([i[0], None]) + else: + ret.append(i) + return ret + + +class SetCookieEditor(GridEditor): + title = "Editing response SetCookie header" + columns = [ + TextColumn("Name"), + TextColumn("Value"), + SubgridColumn("Attributes", CookieAttributeEditor), + ] + + def data_in(self, data): + flattened = [] + for k, v in data.items(): + flattened.append([k, v[0], v[1].lst]) + return flattened + + def data_out(self, data): + vals = [] + for i in data: + vals.append( + [ + i[0], + [i[1], odict.ODictCaseless(i[2])] + ] + ) + return odict.ODict(vals) diff --git a/mitmproxy/console/help.py b/mitmproxy/console/help.py new file mode 100644 index 00000000..0c264ebf --- /dev/null +++ b/mitmproxy/console/help.py @@ -0,0 +1,117 @@ +from __future__ import absolute_import + +import urwid + +from . import common, signals +from .. import filt, version + +footer = [ + ("heading", 'mitmproxy v%s ' % version.VERSION), + ('heading_key', "q"), ":back ", +] + + +class HelpView(urwid.ListBox): + + def __init__(self, help_context): + self.help_context = help_context or [] + urwid.ListBox.__init__( + self, + self.helptext() + ) + + def helptext(self): + text = [] + text.append(urwid.Text([("head", "This view:\n")])) + text.extend(self.help_context) + + text.append(urwid.Text([("head", "\n\nMovement:\n")])) + keys = [ + ("j, k", "down, up"), + ("h, l", "left, right (in some contexts)"), + ("g, G", "go to beginning, end"), + ("space", "page down"), + ("pg up/down", "page up/down"), + ("ctrl+b/ctrl+f", "page up/down"), + ("arrows", "up, down, left, right"), + ] + text.extend( + common.format_keyvals( + keys, + key="key", + val="text", + indent=4)) + + text.append(urwid.Text([("head", "\n\nGlobal keys:\n")])) + keys = [ + ("c", "client replay of HTTP requests"), + ("i", "set interception pattern"), + ("o", "options"), + ("q", "quit / return to previous page"), + ("Q", "quit without confirm prompt"), + ("S", "server replay of HTTP responses"), + ] + text.extend( + common.format_keyvals(keys, key="key", val="text", indent=4) + ) + + text.append(urwid.Text([("head", "\n\nFilter expressions:\n")])) + f = [] + for i in filt.filt_unary: + f.append( + ("~%s" % i.code, i.help) + ) + for i in filt.filt_rex: + f.append( + ("~%s regex" % i.code, i.help) + ) + for i in filt.filt_int: + f.append( + ("~%s int" % i.code, i.help) + ) + f.sort() + f.extend( + [ + ("!", "unary not"), + ("&", "and"), + ("|", "or"), + ("(...)", "grouping"), + ] + ) + text.extend(common.format_keyvals(f, key="key", val="text", indent=4)) + + text.append( + urwid.Text( + [ + "\n", + ("text", " Regexes are Python-style.\n"), + ("text", " Regexes can be specified as quoted strings.\n"), + ("text", " Header matching (~h, ~hq, ~hs) is against a string of the form \"name: value\".\n"), + ("text", " Expressions with no operators are regex matches against URL.\n"), + ("text", " Default binary operator is &.\n"), + ("head", "\n Examples:\n"), + ] + ) + ) + examples = [ + ("google\.com", "Url containing \"google.com"), + ("~q ~b test", "Requests where body contains \"test\""), + ("!(~q & ~t \"text/html\")", "Anything but requests with a text/html content type."), + ] + text.extend( + common.format_keyvals(examples, key="key", val="text", indent=4) + ) + return text + + def keypress(self, size, key): + key = common.shortcuts(key) + if key == "q": + signals.pop_view_state.send(self) + return None + elif key == "?": + key = None + elif key == "g": + self.set_focus(0) + elif key == "G": + self.set_focus(len(self.body.contents)) + return urwid.ListBox.keypress(self, size, key) diff --git a/mitmproxy/console/options.py b/mitmproxy/console/options.py new file mode 100644 index 00000000..5c9e0cc9 --- /dev/null +++ b/mitmproxy/console/options.py @@ -0,0 +1,271 @@ +import urwid + +from .. import contentviews +from . import common, signals, grideditor +from . import select, palettes + +footer = [ + ('heading_key', "enter/space"), ":toggle ", + ('heading_key', "C"), ":clear all ", +] + + +def _mkhelp(): + text = [] + keys = [ + ("enter/space", "activate option"), + ("C", "clear all options"), + ] + text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) + return text +help_context = _mkhelp() + + +class Options(urwid.WidgetWrap): + + def __init__(self, master): + self.master = master + self.lb = select.Select( + [ + select.Heading("Traffic Manipulation"), + select.Option( + "Header Set Patterns", + "H", + lambda: master.setheaders.count(), + self.setheaders + ), + select.Option( + "Ignore Patterns", + "I", + lambda: master.server.config.check_ignore, + self.ignorepatterns + ), + select.Option( + "Replacement Patterns", + "R", + lambda: master.replacehooks.count(), + self.replacepatterns + ), + select.Option( + "Scripts", + "S", + lambda: master.scripts, + self.scripts + ), + + select.Heading("Interface"), + select.Option( + "Default Display Mode", + "M", + self.has_default_displaymode, + self.default_displaymode + ), + select.Option( + "Palette", + "P", + lambda: self.master.palette != palettes.DEFAULT, + self.palette + ), + select.Option( + "Show Host", + "w", + lambda: master.showhost, + self.toggle_showhost + ), + + select.Heading("Network"), + select.Option( + "No Upstream Certs", + "U", + lambda: master.server.config.no_upstream_cert, + self.toggle_upstream_cert + ), + select.Option( + "TCP Proxying", + "T", + lambda: master.server.config.check_tcp, + self.tcp_proxy + ), + + select.Heading("Utility"), + select.Option( + "Anti-Cache", + "a", + lambda: master.anticache, + self.toggle_anticache + ), + select.Option( + "Anti-Compression", + "o", + lambda: master.anticomp, + self.toggle_anticomp + ), + select.Option( + "Kill Extra", + "x", + lambda: master.killextra, + self.toggle_killextra + ), + select.Option( + "No Refresh", + "f", + lambda: not master.refresh_server_playback, + self.toggle_refresh_server_playback + ), + select.Option( + "Sticky Auth", + "A", + lambda: master.stickyauth_txt, + self.sticky_auth + ), + select.Option( + "Sticky Cookies", + "t", + lambda: master.stickycookie_txt, + self.sticky_cookie + ), + ] + ) + title = urwid.Text("Options") + title = urwid.Padding(title, align="left", width=("relative", 100)) + title = urwid.AttrWrap(title, "heading") + self._w = urwid.Frame( + self.lb, + header = title + ) + self.master.loop.widget.footer.update("") + signals.update_settings.connect(self.sig_update_settings) + + def sig_update_settings(self, sender): + self.lb.walker._modified() + + def keypress(self, size, key): + if key == "C": + self.clearall() + return None + return super(self.__class__, self).keypress(size, key) + + def clearall(self): + self.master.anticache = False + self.master.anticomp = False + self.master.killextra = False + self.master.showhost = False + self.master.refresh_server_playback = True + self.master.server.config.no_upstream_cert = False + self.master.setheaders.clear() + self.master.replacehooks.clear() + self.master.set_ignore_filter([]) + self.master.set_tcp_filter([]) + self.master.scripts = [] + self.master.set_stickyauth(None) + self.master.set_stickycookie(None) + self.master.state.default_body_view = contentviews.get("Auto") + + signals.update_settings.send(self) + signals.status_message.send( + message = "All select.Options cleared", + expire = 1 + ) + + def toggle_anticache(self): + self.master.anticache = not self.master.anticache + + def toggle_anticomp(self): + self.master.anticomp = not self.master.anticomp + + def toggle_killextra(self): + self.master.killextra = not self.master.killextra + + def toggle_showhost(self): + self.master.showhost = not self.master.showhost + + def toggle_refresh_server_playback(self): + self.master.refresh_server_playback = not self.master.refresh_server_playback + + def toggle_upstream_cert(self): + self.master.server.config.no_upstream_cert = not self.master.server.config.no_upstream_cert + signals.update_settings.send(self) + + def setheaders(self): + def _set(*args, **kwargs): + self.master.setheaders.set(*args, **kwargs) + signals.update_settings.send(self) + self.master.view_grideditor( + grideditor.SetHeadersEditor( + self.master, + self.master.setheaders.get_specs(), + _set + ) + ) + + def ignorepatterns(self): + def _set(ignore): + self.master.set_ignore_filter(ignore) + signals.update_settings.send(self) + self.master.view_grideditor( + grideditor.HostPatternEditor( + self.master, + self.master.get_ignore_filter(), + _set + ) + ) + + def replacepatterns(self): + def _set(*args, **kwargs): + self.master.replacehooks.set(*args, **kwargs) + signals.update_settings.send(self) + self.master.view_grideditor( + grideditor.ReplaceEditor( + self.master, + self.master.replacehooks.get_specs(), + _set + ) + ) + + def scripts(self): + self.master.view_grideditor( + grideditor.ScriptEditor( + self.master, + [[i.command] for i in self.master.scripts], + self.master.edit_scripts + ) + ) + + def default_displaymode(self): + signals.status_prompt_onekey.send( + prompt = "Global default display mode", + keys = contentviews.view_prompts, + callback = self.master.change_default_display_mode + ) + + def has_default_displaymode(self): + return self.master.state.default_body_view.name != "Auto" + + def tcp_proxy(self): + def _set(tcp): + self.master.set_tcp_filter(tcp) + signals.update_settings.send(self) + self.master.view_grideditor( + grideditor.HostPatternEditor( + self.master, + self.master.get_tcp_filter(), + _set + ) + ) + + def sticky_auth(self): + signals.status_prompt.send( + prompt = "Sticky auth filter", + text = self.master.stickyauth_txt, + callback = self.master.set_stickyauth + ) + + def sticky_cookie(self): + signals.status_prompt.send( + prompt = "Sticky cookie filter", + text = self.master.stickycookie_txt, + callback = self.master.set_stickycookie + ) + + def palette(self): + self.master.view_palette_picker() diff --git a/mitmproxy/console/palettepicker.py b/mitmproxy/console/palettepicker.py new file mode 100644 index 00000000..51ad0606 --- /dev/null +++ b/mitmproxy/console/palettepicker.py @@ -0,0 +1,82 @@ +import urwid + +from . import select, common, palettes, signals + +footer = [ + ('heading_key', "enter/space"), ":select", +] + + +def _mkhelp(): + text = [] + keys = [ + ("enter/space", "select"), + ] + text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) + return text +help_context = _mkhelp() + + +class PalettePicker(urwid.WidgetWrap): + + def __init__(self, master): + self.master = master + low, high = [], [] + for k, v in palettes.palettes.items(): + if v.high: + high.append(k) + else: + low.append(k) + high.sort() + low.sort() + + options = [ + select.Heading("High Colour") + ] + + def mkopt(name): + return select.Option( + i, + None, + lambda: self.master.palette == name, + lambda: self.select(name) + ) + + for i in high: + options.append(mkopt(i)) + options.append(select.Heading("Low Colour")) + for i in low: + options.append(mkopt(i)) + + options.extend( + [ + select.Heading("Options"), + select.Option( + "Transparent", + "T", + lambda: master.palette_transparent, + self.toggle_palette_transparent + ) + ] + ) + + self.lb = select.Select(options) + title = urwid.Text("Palettes") + title = urwid.Padding(title, align="left", width=("relative", 100)) + title = urwid.AttrWrap(title, "heading") + self._w = urwid.Frame( + self.lb, + header = title + ) + signals.update_settings.connect(self.sig_update_settings) + + def sig_update_settings(self, sender): + self.lb.walker._modified() + + def select(self, name): + self.master.set_palette(name) + + def toggle_palette_transparent(self): + self.master.palette_transparent = not self.master.palette_transparent + self.master.set_palette(self.master.palette) + signals.update_settings.send(self) diff --git a/mitmproxy/console/palettes.py b/mitmproxy/console/palettes.py new file mode 100644 index 00000000..bd370181 --- /dev/null +++ b/mitmproxy/console/palettes.py @@ -0,0 +1,326 @@ +# Low-color themes should ONLY use the standard foreground and background +# colours listed here: +# +# http://urwid.org/manual/displayattributes.html +# + + +class Palette: + _fields = [ + 'background', + 'title', + + # Status bar & heading + 'heading', 'heading_key', 'heading_inactive', + + # Help + 'key', 'head', 'text', + + # Options + 'option_selected', 'option_active', 'option_active_selected', + 'option_selected_key', + + # List and Connections + 'method', 'focus', + 'code_200', 'code_300', 'code_400', 'code_500', 'code_other', + 'error', + 'header', 'highlight', 'intercept', 'replay', 'mark', + + # Hex view + 'offset', + + # Grid Editor + 'focusfield', 'focusfield_error', 'field_error', 'editfield', + ] + high = None + + def palette(self, transparent): + l = [] + highback, lowback = None, None + if not transparent: + if self.high and self.high.get("background"): + highback = self.high["background"][1] + lowback = self.low["background"][1] + + for i in self._fields: + if transparent and i == "background": + l.append(["background", "default", "default"]) + else: + v = [i] + low = list(self.low[i]) + if lowback and low[1] == "default": + low[1] = lowback + v.extend(low) + if self.high and i in self.high: + v.append(None) + high = list(self.high[i]) + if highback and high[1] == "default": + high[1] = highback + v.extend(high) + elif highback and self.low[i][1] == "default": + high = [None, low[0], highback] + v.extend(high) + l.append(tuple(v)) + return l + + +class LowDark(Palette): + + """ + Low-color dark background + """ + low = dict( + background = ('white', 'black'), + title = ('white,bold', 'default'), + + # Status bar & heading + heading = ('white', 'dark blue'), + heading_key = ('light cyan', 'dark blue'), + heading_inactive = ('dark gray', 'light gray'), + + # Help + key = ('light cyan', 'default'), + head = ('white,bold', 'default'), + text = ('light gray', 'default'), + + # Options + option_selected = ('black', 'light gray'), + option_selected_key = ('light cyan', 'light gray'), + option_active = ('light red', 'default'), + option_active_selected = ('light red', 'light gray'), + + # List and Connections + method = ('dark cyan', 'default'), + focus = ('yellow', 'default'), + + code_200 = ('dark green', 'default'), + code_300 = ('light blue', 'default'), + code_400 = ('light red', 'default'), + code_500 = ('light red', 'default'), + code_other = ('dark red', 'default'), + + error = ('light red', 'default'), + + header = ('dark cyan', 'default'), + highlight = ('white,bold', 'default'), + intercept = ('brown', 'default'), + replay = ('light green', 'default'), + mark = ('light red', 'default'), + + # Hex view + offset = ('dark cyan', 'default'), + + # Grid Editor + focusfield = ('black', 'light gray'), + focusfield_error = ('dark red', 'light gray'), + field_error = ('dark red', 'default'), + editfield = ('white', 'default'), + ) + + +class Dark(LowDark): + high = dict( + heading_inactive = ('g58', 'g11'), + intercept = ('#f60', 'default'), + + option_selected = ('g85', 'g45'), + option_selected_key = ('light cyan', 'g50'), + option_active_selected = ('light red', 'g50'), + ) + + +class LowLight(Palette): + + """ + Low-color light background + """ + low = dict( + background = ('black', 'white'), + title = ('dark magenta', 'default'), + + # Status bar & heading + heading = ('white', 'black'), + heading_key = ('dark blue', 'black'), + heading_inactive = ('black', 'light gray'), + + # Help + key = ('dark blue', 'default'), + head = ('black', 'default'), + text = ('dark gray', 'default'), + + # Options + option_selected = ('black', 'light gray'), + option_selected_key = ('dark blue', 'light gray'), + option_active = ('light red', 'default'), + option_active_selected = ('light red', 'light gray'), + + # List and Connections + method = ('dark cyan', 'default'), + focus = ('black', 'default'), + + code_200 = ('dark green', 'default'), + code_300 = ('light blue', 'default'), + code_400 = ('dark red', 'default'), + code_500 = ('dark red', 'default'), + code_other = ('light red', 'default'), + + error = ('light red', 'default'), + + header = ('dark blue', 'default'), + highlight = ('black,bold', 'default'), + intercept = ('brown', 'default'), + replay = ('dark green', 'default'), + mark = ('dark red', 'default'), + + # Hex view + offset = ('dark blue', 'default'), + + # Grid Editor + focusfield = ('black', 'light gray'), + focusfield_error = ('dark red', 'light gray'), + field_error = ('dark red', 'black'), + editfield = ('black', 'default'), + ) + + +class Light(LowLight): + high = dict( + background = ('black', 'g100'), + heading = ('g99', '#08f'), + heading_key = ('#0ff,bold', '#08f'), + heading_inactive = ('g35', 'g85'), + replay = ('#0a0,bold', 'default'), + + option_selected = ('black', 'g85'), + option_selected_key = ('dark blue', 'g85'), + option_active_selected = ('light red', 'g85'), + ) + + +# Solarized palette in Urwid-style terminal high-colour offsets +# See: http://ethanschoonover.com/solarized +sol_base03 = "h234" +sol_base02 = "h235" +sol_base01 = "h240" +sol_base00 = "h241" +sol_base0 = "h244" +sol_base1 = "h245" +sol_base2 = "h254" +sol_base3 = "h230" +sol_yellow = "h136" +sol_orange = "h166" +sol_red = "h160" +sol_magenta = "h125" +sol_violet = "h61" +sol_blue = "h33" +sol_cyan = "h37" +sol_green = "h64" + + +class SolarizedLight(LowLight): + high = dict( + background = (sol_base00, sol_base3), + title = (sol_cyan, 'default'), + text = (sol_base00, 'default'), + + # Status bar & heading + heading = (sol_base2, sol_base02), + heading_key = (sol_blue, sol_base03), + heading_inactive = (sol_base03, sol_base1), + + # Help + key = (sol_blue, 'default',), + head = (sol_base00, 'default'), + + # Options + option_selected = (sol_base03, sol_base2), + option_selected_key = (sol_blue, sol_base2), + option_active = (sol_orange, 'default'), + option_active_selected = (sol_orange, sol_base2), + + # List and Connections + method = (sol_cyan, 'default'), + focus = (sol_base01, 'default'), + + code_200 = (sol_green, 'default'), + code_300 = (sol_blue, 'default'), + code_400 = (sol_orange, 'default',), + code_500 = (sol_red, 'default'), + code_other = (sol_magenta, 'default'), + + error = (sol_red, 'default'), + + header = (sol_blue, 'default'), + highlight = (sol_base01, 'default'), + intercept = (sol_red, 'default',), + replay = (sol_green, 'default',), + + # Hex view + offset = (sol_cyan, 'default'), + + # Grid Editor + focusfield = (sol_base00, sol_base2), + focusfield_error = (sol_red, sol_base2), + field_error = (sol_red, 'default'), + editfield = (sol_base01, 'default'), + ) + + +class SolarizedDark(LowDark): + high = dict( + background = (sol_base2, sol_base03), + title = (sol_blue, 'default'), + text = (sol_base1, 'default'), + + # Status bar & heading + heading = (sol_base2, sol_base01), + heading_key = (sol_blue + ",bold", sol_base01), + heading_inactive = (sol_base1, sol_base02), + + # Help + key = (sol_blue, 'default',), + head = (sol_base2, 'default'), + + # Options + option_selected = (sol_base03, sol_base00), + option_selected_key = (sol_blue, sol_base00), + option_active = (sol_orange, 'default'), + option_active_selected = (sol_orange, sol_base00), + + # List and Connections + method = (sol_cyan, 'default'), + focus = (sol_base1, 'default'), + + code_200 = (sol_green, 'default'), + code_300 = (sol_blue, 'default'), + code_400 = (sol_orange, 'default',), + code_500 = (sol_red, 'default'), + code_other = (sol_magenta, 'default'), + + error = (sol_red, 'default'), + + header = (sol_blue, 'default'), + highlight = (sol_base01, 'default'), + intercept = (sol_red, 'default',), + replay = (sol_green, 'default',), + + # Hex view + offset = (sol_cyan, 'default'), + + # Grid Editor + focusfield = (sol_base0, sol_base02), + focusfield_error = (sol_red, sol_base02), + field_error = (sol_red, 'default'), + editfield = (sol_base1, 'default'), + ) + + +DEFAULT = "dark" +palettes = { + "lowlight": LowLight(), + "lowdark": LowDark(), + "light": Light(), + "dark": Dark(), + "solarized_light": SolarizedLight(), + "solarized_dark": SolarizedDark(), +} diff --git a/mitmproxy/console/pathedit.py b/mitmproxy/console/pathedit.py new file mode 100644 index 00000000..4447070b --- /dev/null +++ b/mitmproxy/console/pathedit.py @@ -0,0 +1,71 @@ +import glob +import os.path + +import urwid + + +class _PathCompleter: + + def __init__(self, _testing=False): + """ + _testing: disables reloading of the lookup table to make testing + possible. + """ + self.lookup, self.offset = None, None + self.final = None + self._testing = _testing + + def reset(self): + self.lookup = None + self.offset = -1 + + def complete(self, txt): + """ + Returns the next completion for txt, or None if there is no + completion. + """ + path = os.path.expanduser(txt) + if not self.lookup: + if not self._testing: + # Lookup is a set of (display value, actual value) tuples. + self.lookup = [] + if os.path.isdir(path): + files = glob.glob(os.path.join(path, "*")) + prefix = txt + else: + files = glob.glob(path + "*") + prefix = os.path.dirname(txt) + prefix = prefix or "./" + for f in files: + display = os.path.join(prefix, os.path.basename(f)) + if os.path.isdir(f): + display += "/" + self.lookup.append((display, f)) + if not self.lookup: + self.final = path + return path + self.lookup.sort() + self.offset = -1 + self.lookup.append((txt, txt)) + self.offset += 1 + if self.offset >= len(self.lookup): + self.offset = 0 + ret = self.lookup[self.offset] + self.final = ret[1] + return ret[0] + + +class PathEdit(urwid.Edit, _PathCompleter): + + def __init__(self, *args, **kwargs): + urwid.Edit.__init__(self, *args, **kwargs) + _PathCompleter.__init__(self) + + def keypress(self, size, key): + if key == "tab": + comp = self.complete(self.get_edit_text()) + self.set_edit_text(comp) + self.set_edit_pos(len(comp)) + else: + self.reset() + return urwid.Edit.keypress(self, size, key) diff --git a/mitmproxy/console/searchable.py b/mitmproxy/console/searchable.py new file mode 100644 index 00000000..cff1f0a1 --- /dev/null +++ b/mitmproxy/console/searchable.py @@ -0,0 +1,93 @@ +import urwid + +from . import signals + + +class Highlight(urwid.AttrMap): + + def __init__(self, t): + urwid.AttrMap.__init__( + self, + urwid.Text(t.text), + "focusfield", + ) + self.backup = t + + +class Searchable(urwid.ListBox): + + def __init__(self, state, contents): + self.walker = urwid.SimpleFocusListWalker(contents) + urwid.ListBox.__init__(self, self.walker) + self.state = state + self.search_offset = 0 + self.current_highlight = None + self.search_term = None + + def keypress(self, size, key): + if key == "/": + signals.status_prompt.send( + prompt = "Search for", + text = "", + callback = self.set_search + ) + elif key == "n": + self.find_next(False) + elif key == "N": + self.find_next(True) + elif key == "g": + self.set_focus(0) + self.walker._modified() + elif key == "G": + self.set_focus(len(self.walker) - 1) + self.walker._modified() + else: + return super(self.__class__, self).keypress(size, key) + + def set_search(self, text): + self.state.last_search = text + self.search_term = text or None + self.find_next(False) + + def set_highlight(self, offset): + if self.current_highlight is not None: + old = self.body[self.current_highlight] + self.body[self.current_highlight] = old.backup + if offset is None: + self.current_highlight = None + else: + self.body[offset] = Highlight(self.body[offset]) + self.current_highlight = offset + + def get_text(self, w): + if isinstance(w, urwid.Text): + return w.text + elif isinstance(w, Highlight): + return w.backup.text + else: + return None + + def find_next(self, backwards): + if not self.search_term: + if self.state.last_search: + self.search_term = self.state.last_search + else: + self.set_highlight(None) + return + # Start search at focus + 1 + if backwards: + rng = xrange(len(self.body) - 1, -1, -1) + else: + rng = xrange(1, len(self.body) + 1) + for i in rng: + off = (self.focus_position + i) % len(self.body) + w = self.body[off] + txt = self.get_text(w) + if txt and self.search_term in txt: + self.set_highlight(off) + self.set_focus(off, coming_from="above") + self.body._modified() + return + else: + self.set_highlight(None) + signals.status_message.send(message="Search not found.", expire=1) diff --git a/mitmproxy/console/select.py b/mitmproxy/console/select.py new file mode 100644 index 00000000..928a7ca5 --- /dev/null +++ b/mitmproxy/console/select.py @@ -0,0 +1,120 @@ +import urwid + +from . import common + + +class _OptionWidget(urwid.WidgetWrap): + + def __init__(self, option, text, shortcut, active, focus): + self.option = option + textattr = "text" + keyattr = "key" + if focus and active: + textattr = "option_active_selected" + keyattr = "option_selected_key" + elif focus: + textattr = "option_selected" + keyattr = "option_selected_key" + elif active: + textattr = "option_active" + if shortcut: + text = common.highlight_key( + text, + shortcut, + textattr = textattr, + keyattr = keyattr + ) + opt = urwid.Text(text, align="left") + opt = urwid.AttrWrap(opt, textattr) + opt = urwid.Padding(opt, align = "center", width = 40) + urwid.WidgetWrap.__init__(self, opt) + + def keypress(self, size, key): + return key + + def selectable(self): + return True + + +class OptionWalker(urwid.ListWalker): + + def __init__(self, options): + urwid.ListWalker.__init__(self) + self.options = options + self.focus = 0 + + def set_focus(self, pos): + self.focus = pos + + def get_focus(self): + return self.options[self.focus].render(True), self.focus + + def get_next(self, pos): + if pos >= len(self.options) - 1: + return None, None + return self.options[pos + 1].render(False), pos + 1 + + def get_prev(self, pos): + if pos <= 0: + return None, None + return self.options[pos - 1].render(False), pos - 1 + + +class Heading: + + def __init__(self, text): + self.text = text + + def render(self, focus): + opt = urwid.Text("\n" + self.text, align="left") + opt = urwid.AttrWrap(opt, "title") + opt = urwid.Padding(opt, align = "center", width = 40) + return opt + + +_neg = lambda: False + + +class Option: + + def __init__(self, text, shortcut, getstate=None, activate=None): + self.text = text + self.shortcut = shortcut + self.getstate = getstate or _neg + self.activate = activate or _neg + + def render(self, focus): + return _OptionWidget( + self, + self.text, + self.shortcut, + self.getstate(), + focus) + + +class Select(urwid.ListBox): + + def __init__(self, options): + self.walker = OptionWalker(options) + urwid.ListBox.__init__( + self, + self.walker + ) + self.options = options + self.keymap = {} + for i in options: + if hasattr(i, "shortcut") and i.shortcut: + if i.shortcut in self.keymap: + raise ValueError("Duplicate shortcut key: %s" % i.shortcut) + self.keymap[i.shortcut] = i + + def keypress(self, size, key): + if key == "enter" or key == " ": + self.get_focus()[0].option.activate() + return None + key = common.shortcuts(key) + if key in self.keymap: + self.keymap[key].activate() + self.set_focus(self.options.index(self.keymap[key])) + return None + return super(self.__class__, self).keypress(size, key) diff --git a/mitmproxy/console/signals.py b/mitmproxy/console/signals.py new file mode 100644 index 00000000..6a439bf3 --- /dev/null +++ b/mitmproxy/console/signals.py @@ -0,0 +1,43 @@ +import blinker + +# Show a status message in the action bar +sig_add_event = blinker.Signal() + + +def add_event(e, level): + sig_add_event.send( + None, + e=e, + level=level + ) + +# Show a status message in the action bar +status_message = blinker.Signal() + +# Prompt for input +status_prompt = blinker.Signal() + +# Prompt for a path +status_prompt_path = blinker.Signal() + +# Prompt for a single keystroke +status_prompt_onekey = blinker.Signal() + +# Call a callback in N seconds +call_in = blinker.Signal() + +# Focus the body, footer or header of the main window +focus = blinker.Signal() + +# Fired when settings change +update_settings = blinker.Signal() + +# Fired when a flow changes +flow_change = blinker.Signal() + +# Fired when the flow list or focus changes +flowlist_change = blinker.Signal() + +# Pop and push view state onto a stack +pop_view_state = blinker.Signal() +push_view_state = blinker.Signal() diff --git a/mitmproxy/console/statusbar.py b/mitmproxy/console/statusbar.py new file mode 100644 index 00000000..4cc63a54 --- /dev/null +++ b/mitmproxy/console/statusbar.py @@ -0,0 +1,258 @@ +import os.path + +import urwid + +import netlib.utils +from . import pathedit, signals, common + + +class ActionBar(urwid.WidgetWrap): + + def __init__(self): + urwid.WidgetWrap.__init__(self, None) + self.clear() + signals.status_message.connect(self.sig_message) + signals.status_prompt.connect(self.sig_prompt) + signals.status_prompt_path.connect(self.sig_path_prompt) + signals.status_prompt_onekey.connect(self.sig_prompt_onekey) + + self.last_path = "" + + self.prompting = False + self.onekey = False + self.pathprompt = False + + def sig_message(self, sender, message, expire=None): + w = urwid.Text(message) + self._w = w + self.prompting = False + if expire: + def cb(*args): + if w == self._w: + self.clear() + signals.call_in.send(seconds=expire, callback=cb) + + def prep_prompt(self, p): + return p.strip() + ": " + + def sig_prompt(self, sender, prompt, text, callback, args=()): + signals.focus.send(self, section="footer") + self._w = urwid.Edit(self.prep_prompt(prompt), text or "") + self.prompting = (callback, args) + + def sig_path_prompt(self, sender, prompt, callback, args=()): + signals.focus.send(self, section="footer") + self._w = pathedit.PathEdit( + self.prep_prompt(prompt), + os.path.dirname(self.last_path) + ) + self.pathprompt = True + self.prompting = (callback, args) + + def sig_prompt_onekey(self, sender, prompt, keys, callback, args=()): + """ + Keys are a set of (word, key) tuples. The appropriate key in the + word is highlighted. + """ + signals.focus.send(self, section="footer") + prompt = [prompt, " ("] + mkup = [] + for i, e in enumerate(keys): + mkup.extend(common.highlight_key(e[0], e[1])) + if i < len(keys) - 1: + mkup.append(",") + prompt.extend(mkup) + prompt.append(")? ") + self.onekey = set(i[1] for i in keys) + self._w = urwid.Edit(prompt, "") + self.prompting = (callback, args) + + def selectable(self): + return True + + def keypress(self, size, k): + if self.prompting: + if k == "esc": + self.prompt_done() + elif self.onekey: + if k == "enter": + self.prompt_done() + elif k in self.onekey: + self.prompt_execute(k) + elif k == "enter": + self.prompt_execute(self._w.get_edit_text()) + else: + if common.is_keypress(k): + self._w.keypress(size, k) + else: + return k + + def clear(self): + self._w = urwid.Text("") + self.prompting = False + + def prompt_done(self): + self.prompting = False + self.onekey = False + self.pathprompt = False + signals.status_message.send(message="") + signals.focus.send(self, section="body") + + def prompt_execute(self, txt): + if self.pathprompt: + self.last_path = txt + p, args = self.prompting + self.prompt_done() + msg = p(txt, *args) + if msg: + signals.status_message.send(message=msg, expire=1) + + +class StatusBar(urwid.WidgetWrap): + + def __init__(self, master, helptext): + self.master, self.helptext = master, helptext + self.ab = ActionBar() + self.ib = urwid.WidgetWrap(urwid.Text("")) + self._w = urwid.Pile([self.ib, self.ab]) + signals.update_settings.connect(self.sig_update_settings) + signals.flowlist_change.connect(self.sig_update_settings) + self.redraw() + + def sig_update_settings(self, sender): + self.redraw() + + def keypress(self, *args, **kwargs): + return self.ab.keypress(*args, **kwargs) + + def get_status(self): + r = [] + + if self.master.setheaders.count(): + r.append("[") + r.append(("heading_key", "H")) + r.append("eaders]") + if self.master.replacehooks.count(): + r.append("[") + r.append(("heading_key", "R")) + r.append("eplacing]") + if self.master.client_playback: + r.append("[") + r.append(("heading_key", "cplayback")) + r.append(":%s to go]" % self.master.client_playback.count()) + if self.master.server_playback: + r.append("[") + r.append(("heading_key", "splayback")) + if self.master.nopop: + r.append(":%s in file]" % self.master.server_playback.count()) + else: + r.append(":%s to go]" % self.master.server_playback.count()) + if self.master.get_ignore_filter(): + r.append("[") + r.append(("heading_key", "I")) + r.append("gnore:%d]" % len(self.master.get_ignore_filter())) + if self.master.get_tcp_filter(): + r.append("[") + r.append(("heading_key", "T")) + r.append("CP:%d]" % len(self.master.get_tcp_filter())) + if self.master.state.intercept_txt: + r.append("[") + r.append(("heading_key", "i")) + r.append(":%s]" % self.master.state.intercept_txt) + if self.master.state.limit_txt: + r.append("[") + r.append(("heading_key", "l")) + r.append(":%s]" % self.master.state.limit_txt) + if self.master.stickycookie_txt: + r.append("[") + r.append(("heading_key", "t")) + r.append(":%s]" % self.master.stickycookie_txt) + if self.master.stickyauth_txt: + r.append("[") + r.append(("heading_key", "u")) + r.append(":%s]" % self.master.stickyauth_txt) + if self.master.state.default_body_view.name != "Auto": + r.append("[") + r.append(("heading_key", "M")) + r.append(":%s]" % self.master.state.default_body_view.name) + + opts = [] + if self.master.anticache: + opts.append("anticache") + if self.master.anticomp: + opts.append("anticomp") + if self.master.showhost: + opts.append("showhost") + if not self.master.refresh_server_playback: + opts.append("norefresh") + if self.master.killextra: + opts.append("killextra") + if self.master.server.config.no_upstream_cert: + opts.append("no-upstream-cert") + if self.master.state.follow_focus: + opts.append("following") + if self.master.stream_large_bodies: + opts.append( + "stream:%s" % netlib.utils.pretty_size( + self.master.stream_large_bodies.max_size + ) + ) + + if opts: + r.append("[%s]" % (":".join(opts))) + + if self.master.server.config.mode in ["reverse", "upstream"]: + dst = self.master.server.config.upstream_server + r.append("[dest:%s]" % netlib.utils.unparse_url( + dst.scheme, + dst.address.host, + dst.address.port + )) + if self.master.scripts: + r.append("[") + r.append(("heading_key", "s")) + r.append("cripts:%s]" % len(self.master.scripts)) + # r.append("[lt:%0.3f]"%self.master.looptime) + + if self.master.stream: + r.append("[W:%s]" % self.master.stream_path) + + return r + + def redraw(self): + fc = self.master.state.flow_count() + if self.master.state.focus is None: + offset = 0 + else: + offset = min(self.master.state.focus + 1, fc) + t = [ + ('heading', ("[%s/%s]" % (offset, fc)).ljust(9)) + ] + + if self.master.server.bound: + host = self.master.server.address.host + if host == "0.0.0.0": + host = "*" + boundaddr = "[%s:%s]" % (host, self.master.server.address.port) + else: + boundaddr = "" + t.extend(self.get_status()) + status = urwid.AttrWrap(urwid.Columns([ + urwid.Text(t), + urwid.Text( + [ + self.helptext, + boundaddr + ], + align="right" + ), + ]), "heading") + self.ib._w = status + + def update(self, text): + self.helptext = text + self.redraw() + self.master.loop.draw_screen() + + def selectable(self): + return True diff --git a/mitmproxy/console/tabs.py b/mitmproxy/console/tabs.py new file mode 100644 index 00000000..b5423038 --- /dev/null +++ b/mitmproxy/console/tabs.py @@ -0,0 +1,70 @@ +import urwid + + +class Tab(urwid.WidgetWrap): + + def __init__(self, offset, content, attr, onclick): + """ + onclick is called on click with the tab offset as argument + """ + p = urwid.Text(content, align="center") + p = urwid.Padding(p, align="center", width=("relative", 100)) + p = urwid.AttrWrap(p, attr) + urwid.WidgetWrap.__init__(self, p) + self.offset = offset + self.onclick = onclick + + def mouse_event(self, size, event, button, col, row, focus): + if event == "mouse press" and button == 1: + self.onclick(self.offset) + return True + + +class Tabs(urwid.WidgetWrap): + + def __init__(self, tabs, tab_offset=0): + urwid.WidgetWrap.__init__(self, "") + self.tab_offset = tab_offset + self.tabs = tabs + self.show() + + def change_tab(self, offset): + self.tab_offset = offset + self.show() + + def keypress(self, size, key): + n = len(self.tabs) + if key in ["tab", "l"]: + self.change_tab((self.tab_offset + 1) % n) + elif key == "h": + self.change_tab((self.tab_offset - 1) % n) + return self._w.keypress(size, key) + + def show(self): + headers = [] + for i in range(len(self.tabs)): + txt = self.tabs[i][0]() + if i == self.tab_offset: + headers.append( + Tab( + i, + txt, + "heading", + self.change_tab + ) + ) + else: + headers.append( + Tab( + i, + txt, + "heading_inactive", + self.change_tab + ) + ) + headers = urwid.Columns(headers, dividechars=1) + self._w = urwid.Frame( + body = self.tabs[self.tab_offset][1](), + header = headers + ) + self._w.set_focus("body") diff --git a/mitmproxy/console/window.py b/mitmproxy/console/window.py new file mode 100644 index 00000000..47c284e4 --- /dev/null +++ b/mitmproxy/console/window.py @@ -0,0 +1,90 @@ +import urwid +from . import signals + + +class Window(urwid.Frame): + + def __init__(self, master, body, header, footer, helpctx): + urwid.Frame.__init__( + self, + urwid.AttrWrap(body, "background"), + header = urwid.AttrWrap(header, "background") if header else None, + footer = urwid.AttrWrap(footer, "background") if footer else None + ) + self.master = master + self.helpctx = helpctx + signals.focus.connect(self.sig_focus) + + def sig_focus(self, sender, section): + self.focus_position = section + + def mouse_event(self, *args, **kwargs): + # args: (size, event, button, col, row) + k = super(self.__class__, self).mouse_event(*args, **kwargs) + if not k: + if args[1] == "mouse drag": + signals.status_message.send( + message = "Hold down shift, alt or ctrl to select text.", + expire = 1 + ) + elif args[1] == "mouse press" and args[2] == 4: + self.keypress(args[0], "up") + elif args[1] == "mouse press" and args[2] == 5: + self.keypress(args[0], "down") + else: + return False + return True + + def keypress(self, size, k): + k = super(self.__class__, self).keypress(size, k) + if k == "?": + self.master.view_help(self.helpctx) + elif k == "c": + if not self.master.client_playback: + signals.status_prompt_path.send( + self, + prompt = "Client replay", + callback = self.master.client_playback_path + ) + else: + signals.status_prompt_onekey.send( + self, + prompt = "Stop current client replay?", + keys = ( + ("yes", "y"), + ("no", "n"), + ), + callback = self.master.stop_client_playback_prompt, + ) + elif k == "i": + signals.status_prompt.send( + self, + prompt = "Intercept filter", + text = self.master.state.intercept_txt, + callback = self.master.set_intercept + ) + elif k == "o": + self.master.view_options() + elif k == "Q": + raise urwid.ExitMainLoop + elif k == "q": + signals.pop_view_state.send(self) + elif k == "S": + if not self.master.server_playback: + signals.status_prompt_path.send( + self, + prompt = "Server replay path", + callback = self.master.server_playback_path + ) + else: + signals.status_prompt_onekey.send( + self, + prompt = "Stop current server replay?", + keys = ( + ("yes", "y"), + ("no", "n"), + ), + callback = self.master.stop_server_playback_prompt, + ) + else: + return k |
