aboutsummaryrefslogtreecommitdiffstats
path: root/libmproxy/console
diff options
context:
space:
mode:
Diffstat (limited to 'libmproxy/console')
-rw-r--r--libmproxy/console/__init__.py760
-rw-r--r--libmproxy/console/common.py168
-rw-r--r--libmproxy/console/contentview.py32
-rw-r--r--libmproxy/console/flowdetailview.py233
-rw-r--r--libmproxy/console/flowlist.py151
-rw-r--r--libmproxy/console/flowview.py900
-rw-r--r--libmproxy/console/grideditor.py337
-rw-r--r--libmproxy/console/help.py100
-rw-r--r--libmproxy/console/options.py268
-rw-r--r--libmproxy/console/palettepicker.py81
-rw-r--r--libmproxy/console/palettes.py115
-rw-r--r--libmproxy/console/pathedit.py69
-rw-r--r--libmproxy/console/searchable.py91
-rw-r--r--libmproxy/console/select.py107
-rw-r--r--libmproxy/console/signals.py32
-rw-r--r--libmproxy/console/statusbar.py254
-rw-r--r--libmproxy/console/tabs.py38
-rw-r--r--libmproxy/console/window.py72
18 files changed, 2226 insertions, 1582 deletions
diff --git a/libmproxy/console/__init__.py b/libmproxy/console/__init__.py
index 198b7bbe..527ed07d 100644
--- a/libmproxy/console/__init__.py
+++ b/libmproxy/console/__init__.py
@@ -1,281 +1,38 @@
from __future__ import absolute_import
-import glob
import mailcap
import mimetypes
import tempfile
import os
import os.path
import shlex
+import signal
import stat
import subprocess
import sys
-import time
import traceback
import urwid
import weakref
-from .. import controller, utils, flow, script
-from . import flowlist, flowview, help, common
-from . import grideditor, palettes, contentview, flowdetailview
+from .. import controller, flow, script
+from . import flowlist, flowview, help, window, signals, options
+from . import grideditor, palettes, contentview, statusbar, palettepicker
EVENTLOG_SIZE = 500
-class _PathCompleter:
- def __init__(self, _testing=False):
- """
- _testing: disables reloading of the lookup table to make testing possible.
- """
- self.lookup, self.offset = None, None
- self.final = None
- self._testing = _testing
-
- def reset(self):
- self.lookup = None
- self.offset = -1
-
- def complete(self, txt):
- """
- Returns the next completion for txt, or None if there is no completion.
- """
- path = os.path.expanduser(txt)
- if not self.lookup:
- if not self._testing:
- # Lookup is a set of (display value, actual value) tuples.
- self.lookup = []
- if os.path.isdir(path):
- files = glob.glob(os.path.join(path, "*"))
- prefix = txt
- else:
- files = glob.glob(path+"*")
- prefix = os.path.dirname(txt)
- prefix = prefix or "./"
- for f in files:
- display = os.path.join(prefix, os.path.basename(f))
- if os.path.isdir(f):
- display += "/"
- self.lookup.append((display, f))
- if not self.lookup:
- self.final = path
- return path
- self.lookup.sort()
- self.offset = -1
- self.lookup.append((txt, txt))
- self.offset += 1
- if self.offset >= len(self.lookup):
- self.offset = 0
- ret = self.lookup[self.offset]
- self.final = ret[1]
- return ret[0]
-
-
-class PathEdit(urwid.Edit, _PathCompleter):
- def __init__(self, *args, **kwargs):
- urwid.Edit.__init__(self, *args, **kwargs)
- _PathCompleter.__init__(self)
-
- def keypress(self, size, key):
- if key == "tab":
- comp = self.complete(self.get_edit_text())
- self.set_edit_text(comp)
- self.set_edit_pos(len(comp))
- else:
- self.reset()
- return urwid.Edit.keypress(self, size, key)
-
-
-class ActionBar(urwid.WidgetWrap):
- def __init__(self):
- self.message("")
-
- def selectable(self):
- return True
-
- def path_prompt(self, prompt, text):
- self.expire = None
- self._w = PathEdit(prompt, text)
-
- def prompt(self, prompt, text = ""):
- self.expire = None
- # A (partial) workaround for this Urwid issue:
- # https://github.com/Nic0/tyrs/issues/115
- # We can remove it once veryone is beyond 1.0.1
- if isinstance(prompt, basestring):
- prompt = unicode(prompt)
- self._w = urwid.Edit(prompt, text or "")
-
- def message(self, message, expire=None):
- self.expire = expire
- self._w = urwid.Text(message)
-
-
-class StatusBar(urwid.WidgetWrap):
- def __init__(self, master, helptext):
- self.master, self.helptext = master, helptext
- self.ab = ActionBar()
- self.ib = urwid.WidgetWrap(urwid.Text(""))
- self._w = urwid.Pile([self.ib, self.ab])
-
- def get_status(self):
- r = []
-
- if self.master.setheaders.count():
- r.append("[")
- r.append(("heading_key", "H"))
- r.append("eaders]")
- if self.master.replacehooks.count():
- r.append("[")
- r.append(("heading_key", "R"))
- r.append("eplacing]")
- if self.master.client_playback:
- r.append("[")
- r.append(("heading_key", "cplayback"))
- r.append(":%s to go]"%self.master.client_playback.count())
- if self.master.server_playback:
- r.append("[")
- r.append(("heading_key", "splayback"))
- if self.master.nopop:
- r.append(":%s in file]"%self.master.server_playback.count())
- else:
- r.append(":%s to go]"%self.master.server_playback.count())
- if self.master.get_ignore_filter():
- r.append("[")
- r.append(("heading_key", "I"))
- r.append("gnore:%d]" % len(self.master.get_ignore_filter()))
- if self.master.get_tcp_filter():
- r.append("[")
- r.append(("heading_key", "T"))
- r.append("CP:%d]" % len(self.master.get_tcp_filter()))
- if self.master.state.intercept_txt:
- r.append("[")
- r.append(("heading_key", "i"))
- r.append(":%s]"%self.master.state.intercept_txt)
- if self.master.state.limit_txt:
- r.append("[")
- r.append(("heading_key", "l"))
- r.append(":%s]"%self.master.state.limit_txt)
- if self.master.stickycookie_txt:
- r.append("[")
- r.append(("heading_key", "t"))
- r.append(":%s]"%self.master.stickycookie_txt)
- if self.master.stickyauth_txt:
- r.append("[")
- r.append(("heading_key", "u"))
- r.append(":%s]"%self.master.stickyauth_txt)
- if self.master.state.default_body_view.name != "Auto":
- r.append("[")
- r.append(("heading_key", "M"))
- r.append(":%s]"%self.master.state.default_body_view.name)
-
- opts = []
- if self.master.anticache:
- opts.append("anticache")
- if self.master.anticomp:
- opts.append("anticomp")
- if self.master.showhost:
- opts.append("showhost")
- if not self.master.refresh_server_playback:
- opts.append("norefresh")
- if self.master.killextra:
- opts.append("killextra")
- if self.master.server.config.no_upstream_cert:
- opts.append("no-upstream-cert")
- if self.master.state.follow_focus:
- opts.append("following")
- if self.master.stream_large_bodies:
- opts.append("stream:%s" % utils.pretty_size(self.master.stream_large_bodies.max_size))
-
- if opts:
- r.append("[%s]"%(":".join(opts)))
-
- if self.master.server.config.mode in ["reverse", "upstream"]:
- dst = self.master.server.config.mode.dst
- scheme = "https" if dst[0] else "http"
- if dst[1] != dst[0]:
- scheme += "2https" if dst[1] else "http"
- r.append("[dest:%s]"%utils.unparse_url(scheme, *dst[2:]))
- if self.master.scripts:
- r.append("[")
- r.append(("heading_key", "s"))
- r.append("cripts:%s]"%len(self.master.scripts))
- # r.append("[lt:%0.3f]"%self.master.looptime)
-
- if self.master.stream:
- r.append("[W:%s]"%self.master.stream_path)
-
- return r
-
- def redraw(self):
- if self.ab.expire and time.time() > self.ab.expire:
- self.message("")
-
- fc = self.master.state.flow_count()
- if self.master.state.focus is None:
- offset = 0
- else:
- offset = min(self.master.state.focus + 1, fc)
- t = [
- ('heading', ("[%s/%s]"%(offset, fc)).ljust(9))
- ]
-
- if self.master.server.bound:
- host = self.master.server.address.host
- if host == "0.0.0.0":
- host = "*"
- boundaddr = "[%s:%s]"%(host, self.master.server.address.port)
- else:
- boundaddr = ""
- t.extend(self.get_status())
- status = urwid.AttrWrap(urwid.Columns([
- urwid.Text(t),
- urwid.Text(
- [
- self.helptext,
- boundaddr
- ],
- align="right"
- ),
- ]), "heading")
- self.ib._w = status
-
- def update(self, text):
- self.helptext = text
- self.redraw()
- self.master.loop.draw_screen()
-
- def selectable(self):
- return True
-
- def get_edit_text(self):
- return self.ab._w.get_edit_text()
-
- def path_prompt(self, prompt, text):
- return self.ab.path_prompt(prompt, text)
-
- def prompt(self, prompt, text = ""):
- self.ab.prompt(prompt, text)
-
- def message(self, msg, expire=None):
- if expire:
- expire = time.time() + float(expire)/1000
- self.ab.message(msg, expire)
- self.master.loop.draw_screen()
-
-
class ConsoleState(flow.State):
def __init__(self):
flow.State.__init__(self)
self.focus = None
self.follow_focus = None
self.default_body_view = contentview.get("Auto")
-
- self.view_mode = common.VIEW_LIST
- self.view_flow_mode = common.VIEW_FLOW_REQUEST
-
- self.last_script = ""
- self.last_saveload = ""
self.flowsettings = weakref.WeakKeyDictionary()
+ self.last_search = None
+
+ def __setattr__(self, name, value):
+ self.__dict__[name] = value
+ signals.update_settings.send(self)
def add_flow_setting(self, flow, key, value):
d = self.flowsettings.setdefault(flow, {})
@@ -316,6 +73,8 @@ class ConsoleState(flow.State):
elif idx < 0:
idx = 0
self.focus = idx
+ else:
+ self.focus = None
def set_focus_flow(self, f):
self.set_focus(self.view.index(f))
@@ -357,6 +116,7 @@ class Options(object):
"keepserving",
"kill",
"intercept",
+ "limit",
"no_server",
"refresh_server_playback",
"rfile",
@@ -373,6 +133,7 @@ class Options(object):
"wfile",
"nopop",
"palette",
+ "palette_transparent"
]
def __init__(self, **kwargs):
@@ -388,7 +149,6 @@ class ConsoleMaster(flow.FlowMaster):
def __init__(self, server, options):
flow.FlowMaster.__init__(self, server, ConsoleState())
- self.looptime = 0
self.stream_path = None
self.options = options
@@ -398,14 +158,14 @@ class ConsoleMaster(flow.FlowMaster):
for i in options.setheaders:
self.setheaders.add(*i)
- self.flow_list_walker = None
- self.set_palette(options.palette)
-
r = self.set_intercept(options.intercept)
if r:
print >> sys.stderr, "Intercept error:", r
sys.exit(1)
+ if options.limit:
+ self.set_limit(options.limit)
+
r = self.set_stickycookie(options.stickycookie)
if r:
print >> sys.stderr, "Sticky cookies error:", r
@@ -425,12 +185,12 @@ class ConsoleMaster(flow.FlowMaster):
self.rheaders = options.rheaders
self.nopop = options.nopop
self.showhost = options.showhost
+ self.palette = options.palette
+ self.palette_transparent = options.palette_transparent
self.eventlog = options.eventlog
self.eventlist = urwid.SimpleListWalker([])
- self.statusbar = None
-
if options.client_replay:
self.client_playback_path(options.client_replay)
@@ -453,8 +213,42 @@ class ConsoleMaster(flow.FlowMaster):
print >> sys.stderr, "Stream file error:", err
sys.exit(1)
+ self.view_stack = []
+
if options.app:
self.start_app(self.options.app_host, self.options.app_port)
+ signals.call_in.connect(self.sig_call_in)
+ signals.pop_view_state.connect(self.sig_pop_view_state)
+ signals.push_view_state.connect(self.sig_push_view_state)
+
+ def __setattr__(self, name, value):
+ self.__dict__[name] = value
+ signals.update_settings.send(self)
+
+ def sig_call_in(self, sender, seconds, callback, args=()):
+ def cb(*_):
+ return callback(*args)
+ self.loop.set_alarm_in(seconds, cb)
+
+ def sig_pop_view_state(self, sender):
+ if len(self.view_stack) > 1:
+ self.view_stack.pop()
+ self.loop.widget = self.view_stack[-1]
+ else:
+ signals.status_prompt_onekey.send(
+ self,
+ prompt = "Quit",
+ keys = (
+ ("yes", "y"),
+ ("no", "n"),
+ ),
+ callback = self.quit,
+ )
+
+ def sig_push_view_state(self, sender, window):
+ self.view_stack.append(window)
+ self.loop.widget = window
+ self.loop.draw_screen()
def start_stream_to_path(self, path, mode="wb"):
path = os.path.expanduser(path)
@@ -481,7 +275,9 @@ class ConsoleMaster(flow.FlowMaster):
try:
s = script.Script(command, self)
except script.ScriptError, v:
- self.statusbar.message("Error loading script.")
+ signals.status_message.send(
+ message = "Error loading script."
+ )
self.add_event("Error loading script:\n%s"%v.args[0], "error")
return
@@ -492,22 +288,21 @@ class ConsoleMaster(flow.FlowMaster):
if f.error:
self._run_script_method("error", s, f)
s.unload()
- self.refresh_flow(f)
- self.state.last_script = command
+ signals.flow_change.send(self, flow = f)
def set_script(self, command):
if not command:
return
ret = self.load_script(command)
if ret:
- self.statusbar.message(ret)
- self.state.last_script = command
+ signals.status_message.send(message=ret)
def toggle_eventlog(self):
self.eventlog = not self.eventlog
+ signals.pop_view_state.send(self)
self.view_flowlist()
- def _readflow(self, paths):
+ def _readflows(self, path):
"""
Utitility function that reads a list of flows
or prints an error to the UI if that fails.
@@ -516,22 +311,21 @@ class ConsoleMaster(flow.FlowMaster):
- a list of flows, otherwise.
"""
try:
- return flow.read_flows_from_paths(paths)
+ return flow.read_flows_from_paths(path)
except flow.FlowReadError as e:
- if not self.statusbar:
- print >> sys.stderr, e.strerror
- sys.exit(1)
- else:
- self.statusbar.message(e.strerror)
- return None
+ signals.status_message.send(message=e.strerror)
def client_playback_path(self, path):
- flows = self._readflow(path)
+ if not isinstance(path, list):
+ path = [path]
+ flows = self._readflows(path)
if flows:
self.start_client_playback(flows, False)
def server_playback_path(self, path):
- flows = self._readflow(path)
+ if not isinstance(path, list):
+ path = [path]
+ flows = self._readflows(path)
if flows:
self.start_server_playback(
flows,
@@ -557,7 +351,9 @@ class ConsoleMaster(flow.FlowMaster):
try:
subprocess.call(cmd)
except:
- self.statusbar.message("Can't start editor: %s" % " ".join(c))
+ signals.status_message.send(
+ message = "Can't start editor: %s" % " ".join(c)
+ )
else:
data = open(name, "rb").read()
self.ui.start()
@@ -596,191 +392,34 @@ class ConsoleMaster(flow.FlowMaster):
try:
subprocess.call(cmd, shell=shell)
except:
- self.statusbar.message(
- "Can't start external viewer: %s" % " ".join(c)
+ signals.status_message.send(
+ message="Can't start external viewer: %s" % " ".join(c)
)
self.ui.start()
os.unlink(name)
def set_palette(self, name):
- self.palette = palettes.palettes[name]
-
- def input_filter(self, keys, raw):
- for k in keys:
- if self.prompting:
- if k == "esc":
- self.prompt_cancel()
- elif self.onekey:
- if k == "enter":
- self.prompt_cancel()
- elif k in self.onekey:
- self.prompt_execute(k)
- elif k == "enter":
- self.prompt_execute()
- else:
- self.view.keypress(self.loop.screen_size, k)
- else:
- k = self.view.keypress(self.loop.screen_size, k)
- if k:
- self.statusbar.message("")
- if k == "?":
- self.view_help()
- elif k == "c":
- if not self.client_playback:
- self.path_prompt(
- "Client replay: ",
- self.state.last_saveload,
- self.client_playback_path
- )
- else:
- self.prompt_onekey(
- "Stop current client replay?",
- (
- ("yes", "y"),
- ("no", "n"),
- ),
- self.stop_client_playback_prompt,
- )
- elif k == "H":
- self.view_grideditor(
- grideditor.SetHeadersEditor(
- self,
- self.setheaders.get_specs(),
- self.setheaders.set
- )
- )
- elif k == "I":
- self.view_grideditor(
- grideditor.HostPatternEditor(
- self,
- [[x] for x in self.get_ignore_filter()],
- self.edit_ignore_filter
- )
- )
- elif k == "T":
- self.view_grideditor(
- grideditor.HostPatternEditor(
- self,
- [[x] for x in self.get_tcp_filter()],
- self.edit_tcp_filter
- )
- )
- elif k == "i":
- self.prompt(
- "Intercept filter: ",
- self.state.intercept_txt,
- self.set_intercept
- )
- elif k == "Q":
- raise urwid.ExitMainLoop
- elif k == "q":
- self.prompt_onekey(
- "Quit",
- (
- ("yes", "y"),
- ("no", "n"),
- ),
- self.quit,
- )
- elif k == "M":
- self.prompt_onekey(
- "Global default display mode",
- contentview.view_prompts,
- self.change_default_display_mode
- )
- elif k == "R":
- self.view_grideditor(
- grideditor.ReplaceEditor(
- self,
- self.replacehooks.get_specs(),
- self.replacehooks.set
- )
- )
- elif k == "s":
- self.view_grideditor(
- grideditor.ScriptEditor(
- self,
- [[i.command] for i in self.scripts],
- self.edit_scripts
- )
- )
- #if self.scripts:
- # self.load_script(None)
- #else:
- # self.path_prompt(
- # "Set script: ",
- # self.state.last_script,
- # self.set_script
- # )
- elif k == "S":
- if not self.server_playback:
- self.path_prompt(
- "Server replay path: ",
- self.state.last_saveload,
- self.server_playback_path
- )
- else:
- self.prompt_onekey(
- "Stop current server replay?",
- (
- ("yes", "y"),
- ("no", "n"),
- ),
- self.stop_server_playback_prompt,
- )
- elif k == "o":
- self.prompt_onekey(
- "Options",
- (
- ("anticache", "a"),
- ("anticomp", "c"),
- ("showhost", "h"),
- ("killextra", "k"),
- ("norefresh", "n"),
- ("no-upstream-certs", "u"),
- ),
- self._change_options
- )
- elif k == "t":
- self.prompt(
- "Sticky cookie filter: ",
- self.stickycookie_txt,
- self.set_stickycookie
- )
- elif k == "u":
- self.prompt(
- "Sticky auth filter: ",
- self.stickyauth_txt,
- self.set_stickyauth
- )
- self.statusbar.redraw()
+ self.palette = name
+ self.ui.register_palette(
+ palettes.palettes[name].palette(self.palette_transparent)
+ )
+ self.ui.clear()
def ticker(self, *userdata):
changed = self.tick(self.masterq, timeout=0)
if changed:
self.loop.draw_screen()
- self.statusbar.redraw()
+ signals.update_settings.send()
self.loop.set_alarm_in(0.01, self.ticker)
def run(self):
self.ui = urwid.raw_display.Screen()
self.ui.set_terminal_properties(256)
- self.ui.register_palette(self.palette.palette())
- self.flow_list_walker = flowlist.FlowListWalker(self, self.state)
- self.view = None
- self.statusbar = None
- self.header = None
- self.body = None
- self.help_context = None
- self.prompting = False
- self.onekey = False
+ self.set_palette(self.palette)
self.loop = urwid.MainLoop(
- self.view,
+ urwid.SolidFill("x"),
screen = self.ui,
- input_filter = self.input_filter
)
- self.view_flowlist()
- self.statusbar.redraw()
self.server.start_slave(
controller.Slave,
@@ -801,6 +440,19 @@ class ConsoleMaster(flow.FlowMaster):
sys.exit(1)
self.loop.set_alarm_in(0.01, self.ticker)
+
+ # It's not clear why we need to handle this explicitly - without this,
+ # mitmproxy hangs on keyboard interrupt. Remove if we ever figure it
+ # out.
+ def exit(s, f):
+ raise urwid.ExitMainLoop
+ signal.signal(signal.SIGINT, exit)
+
+ self.loop.set_alarm_in(
+ 0.0001,
+ lambda *args: self.view_flowlist()
+ )
+
try:
self.loop.run()
except Exception:
@@ -814,43 +466,56 @@ class ConsoleMaster(flow.FlowMaster):
sys.stderr.flush()
self.shutdown()
- def make_view(self):
- self.view = urwid.Frame(
- self.body,
- header = self.header,
- footer = self.statusbar
+ def view_help(self, helpctx):
+ signals.push_view_state.send(
+ self,
+ window = window.Window(
+ self,
+ help.HelpView(helpctx),
+ None,
+ statusbar.StatusBar(self, help.footer),
+ None
+ )
)
- self.view.set_focus("body")
- return self.view
- def view_help(self):
- h = help.HelpView(
+ def view_options(self):
+ for i in self.view_stack:
+ if isinstance(i["body"], options.Options):
+ return
+ signals.push_view_state.send(
self,
- self.help_context,
- (self.statusbar, self.body, self.header)
+ window = window.Window(
+ self,
+ options.Options(self),
+ None,
+ statusbar.StatusBar(self, options.footer),
+ options.help_context,
+ )
)
- self.statusbar = StatusBar(self, help.footer)
- self.body = h
- self.header = None
- self.loop.widget = self.make_view()
- def view_flowdetails(self, flow):
- h = flowdetailview.FlowDetailsView(
+ def view_palette_picker(self):
+ signals.push_view_state.send(
self,
- flow,
- (self.statusbar, self.body, self.header)
+ window = window.Window(
+ self,
+ palettepicker.PalettePicker(self),
+ None,
+ statusbar.StatusBar(self, palettepicker.footer),
+ palettepicker.help_context,
+ )
)
- self.statusbar = StatusBar(self, flowdetailview.footer)
- self.body = h
- self.header = None
- self.loop.widget = self.make_view()
def view_grideditor(self, ge):
- self.body = ge
- self.header = None
- self.help_context = ge.make_help()
- self.statusbar = StatusBar(self, grideditor.footer)
- self.loop.widget = self.make_view()
+ signals.push_view_state.send(
+ self,
+ window = window.Window(
+ self,
+ ge,
+ None,
+ statusbar.StatusBar(self, grideditor.FOOTER),
+ ge.make_help()
+ )
+ )
def view_flowlist(self):
if self.ui.started:
@@ -859,27 +524,35 @@ class ConsoleMaster(flow.FlowMaster):
self.state.set_focus(self.state.flow_count())
if self.eventlog:
- self.body = flowlist.BodyPile(self)
+ body = flowlist.BodyPile(self)
else:
- self.body = flowlist.FlowListBox(self)
- self.statusbar = StatusBar(self, flowlist.footer)
- self.header = None
- self.state.view_mode = common.VIEW_LIST
-
- self.loop.widget = self.make_view()
- self.help_context = flowlist.help_context
-
- def view_flow(self, flow):
- self.body = flowview.FlowView(self, self.state, flow)
- self.header = flowview.FlowViewHeader(self, flow)
- self.statusbar = StatusBar(self, flowview.footer)
+ body = flowlist.FlowListBox(self)
+
+ signals.push_view_state.send(
+ self,
+ window = window.Window(
+ self,
+ body,
+ None,
+ statusbar.StatusBar(self, flowlist.footer),
+ flowlist.help_context
+ )
+ )
+
+ def view_flow(self, flow, tab_offset=0):
self.state.set_focus_flow(flow)
- self.state.view_mode = common.VIEW_FLOW
- self.loop.widget = self.make_view()
- self.help_context = flowview.help_context
+ signals.push_view_state.send(
+ self,
+ window = window.Window(
+ self,
+ flowview.FlowView(self, self.state, flow, tab_offset),
+ flowview.FlowViewHeader(self, flow),
+ statusbar.StatusBar(self, flowview.footer),
+ flowview.help_context
+ )
+ )
def _write_flows(self, path, flows):
- self.state.last_saveload = path
if not path:
return
path = os.path.expanduser(path)
@@ -890,7 +563,7 @@ class ConsoleMaster(flow.FlowMaster):
fw.add(i)
f.close()
except IOError, v:
- self.statusbar.message(v.strerror)
+ signals.status_message.send(message=v.strerror)
def save_one_flow(self, path, flow):
return self._write_flows(path, [flow])
@@ -905,71 +578,20 @@ class ConsoleMaster(flow.FlowMaster):
return ret or "Flows loaded from %s"%path
def load_flows_path(self, path):
- self.state.last_saveload = path
reterr = None
try:
flow.FlowMaster.load_flows_file(self, path)
except flow.FlowReadError, v:
reterr = str(v)
- if self.flow_list_walker:
- self.sync_list_view()
+ signals.flowlist_change.send(self)
return reterr
- def path_prompt(self, prompt, text, callback, *args):
- self.statusbar.path_prompt(prompt, text)
- self.view.set_focus("footer")
- self.prompting = (callback, args)
-
- def prompt(self, prompt, text, callback, *args):
- self.statusbar.prompt(prompt, text)
- self.view.set_focus("footer")
- self.prompting = (callback, args)
-
- def prompt_edit(self, prompt, text, callback):
- self.statusbar.prompt(prompt + ": ", text)
- self.view.set_focus("footer")
- self.prompting = (callback, [])
-
- def prompt_onekey(self, prompt, keys, callback, *args):
- """
- Keys are a set of (word, key) tuples. The appropriate key in the
- word is highlighted.
- """
- prompt = [prompt, " ("]
- mkup = []
- for i, e in enumerate(keys):
- mkup.extend(common.highlight_key(e[0], e[1]))
- if i < len(keys)-1:
- mkup.append(",")
- prompt.extend(mkup)
- prompt.append(")? ")
- self.onekey = "".join(i[1] for i in keys)
- self.prompt(prompt, "", callback, *args)
-
- def prompt_done(self):
- self.prompting = False
- self.onekey = False
- self.view.set_focus("body")
- self.statusbar.message("")
-
- def prompt_execute(self, txt=None):
- if not txt:
- txt = self.statusbar.get_edit_text()
- p, args = self.prompting
- self.prompt_done()
- msg = p(txt, *args)
- if msg:
- self.statusbar.message(msg, 1000)
-
- def prompt_cancel(self):
- self.prompt_done()
-
def accept_all(self):
self.state.accept_all(self)
def set_limit(self, txt):
v = self.state.set_limit(txt)
- self.sync_list_view()
+ signals.flowlist_change.send(self)
return v
def set_intercept(self, txt):
@@ -980,12 +602,6 @@ class ConsoleMaster(flow.FlowMaster):
self.state.default_body_view = v
self.refresh_focus()
- def pop_view(self):
- if self.state.view_mode == common.VIEW_FLOW:
- self.view_flow(self.state.view[self.state.focus])
- else:
- self.view_flowlist()
-
def edit_scripts(self, scripts):
commands = [x[0] for x in scripts] # remove outer array
if commands == [s.command for s in self.scripts]:
@@ -994,14 +610,7 @@ class ConsoleMaster(flow.FlowMaster):
self.unload_scripts()
for command in commands:
self.load_script(command)
-
- def edit_ignore_filter(self, ignore):
- patterns = (x[0] for x in ignore)
- self.set_ignore_filter(patterns)
-
- def edit_tcp_filter(self, tcp):
- patterns = (x[0] for x in tcp)
- self.set_tcp_filter(patterns)
+ signals.update_settings.send(self)
def stop_client_playback_prompt(self, a):
if a != "n":
@@ -1015,33 +624,13 @@ class ConsoleMaster(flow.FlowMaster):
if a != "n":
raise urwid.ExitMainLoop
- def _change_options(self, a):
- if a == "a":
- self.anticache = not self.anticache
- if a == "c":
- self.anticomp = not self.anticomp
- if a == "h":
- self.showhost = not self.showhost
- self.sync_list_view()
- self.refresh_focus()
- elif a == "k":
- self.killextra = not self.killextra
- elif a == "n":
- self.refresh_server_playback = not self.refresh_server_playback
- elif a == "u":
- self.server.config.no_upstream_cert =\
- not self.server.config.no_upstream_cert
-
def shutdown(self):
self.state.killall(self)
flow.FlowMaster.shutdown(self)
- def sync_list_view(self):
- self.flow_list_walker._modified()
-
def clear_flows(self):
self.state.clear()
- self.sync_list_view()
+ signals.flowlist_change.send(self)
def toggle_follow_flows(self):
# toggle flow follow
@@ -1049,31 +638,26 @@ class ConsoleMaster(flow.FlowMaster):
# jump to most recent flow if follow is now on
if self.state.follow_focus:
self.state.set_focus(self.state.flow_count())
- self.sync_list_view()
+ signals.flowlist_change.send(self)
def delete_flow(self, f):
self.state.delete_flow(f)
- self.sync_list_view()
+ signals.flowlist_change.send(self)
def refresh_focus(self):
if self.state.view:
- self.refresh_flow(self.state.view[self.state.focus])
-
- def refresh_flow(self, c):
- if hasattr(self.header, "refresh_flow"):
- self.header.refresh_flow(c)
- if hasattr(self.body, "refresh_flow"):
- self.body.refresh_flow(c)
- if hasattr(self.statusbar, "refresh_flow"):
- self.statusbar.refresh_flow(c)
+ signals.flow_change.send(
+ self,
+ flow = self.state.view[self.state.focus]
+ )
def process_flow(self, f):
if self.state.intercept and f.match(self.state.intercept) and not f.request.is_replay:
f.intercept(self)
else:
f.reply()
- self.sync_list_view()
- self.refresh_flow(f)
+ signals.flowlist_change.send(self)
+ signals.flow_change.send(self, flow = f)
def clear_events(self):
self.eventlist[:] = []
diff --git a/libmproxy/console/common.py b/libmproxy/console/common.py
index 3a708c7c..b920a11f 100644
--- a/libmproxy/console/common.py
+++ b/libmproxy/console/common.py
@@ -6,15 +6,14 @@ import os
from .. import utils
from ..protocol.http import CONTENT_MISSING, decoded
+from . import signals
+import netlib.utils
try:
import pyperclip
except:
pyperclip = False
-VIEW_LIST = 0
-VIEW_FLOW = 1
-
VIEW_FLOW_REQUEST = 0
VIEW_FLOW_RESPONSE = 1
@@ -31,14 +30,22 @@ METHOD_OPTIONS = [
]
-def highlight_key(s, k):
+def is_keypress(k):
+ """
+ Is this input event a keypress?
+ """
+ if isinstance(k, basestring):
+ return True
+
+
+def highlight_key(str, key, textattr="text", keyattr="key"):
l = []
- parts = s.split(k, 1)
+ parts = str.split(key, 1)
if parts[0]:
- l.append(("text", parts[0]))
- l.append(("key", k))
+ l.append((textattr, parts[0]))
+ l.append((keyattr, key))
if parts[1]:
- l.append(("text", parts[1]))
+ l.append((textattr, parts[1]))
return l
@@ -60,20 +67,26 @@ def format_keyvals(lst, key="key", val="text", indent=0):
if kv is None:
ret.append(urwid.Text(""))
else:
- cols = []
- # This cumbersome construction process is here for a reason:
- # Urwid < 1.0 barfs if given a fixed size column of size zero.
- if indent:
- cols.append(("fixed", indent, urwid.Text("")))
- cols.extend([
- (
- "fixed",
- maxk,
- urwid.Text([(key, kv[0] or "")])
- ),
- kv[1] if isinstance(kv[1], urwid.Widget) else urwid.Text([(val, kv[1])])
- ])
- ret.append(urwid.Columns(cols, dividechars = 2))
+ if isinstance(kv[1], urwid.Widget):
+ v = kv[1]
+ elif kv[1] is None:
+ v = urwid.Text("")
+ else:
+ v = urwid.Text([(val, kv[1])])
+ ret.append(
+ urwid.Columns(
+ [
+ ("fixed", indent, urwid.Text("")),
+ (
+ "fixed",
+ maxk,
+ urwid.Text([(key, kv[0] or "")])
+ ),
+ v
+ ],
+ dividechars = 2
+ )
+ )
return ret
@@ -184,23 +197,39 @@ def raw_format_flow(f, focus, extended, padding):
def save_data(path, data, master, state):
if not path:
return
- state.last_saveload = path
- path = os.path.expanduser(path)
try:
with file(path, "wb") as f:
f.write(data)
except IOError, v:
- master.statusbar.message(v.strerror)
+ signals.status_message.send(message=v.strerror)
+
+
+def ask_save_overwite(path, data, master, state):
+ if not path:
+ return
+ path = os.path.expanduser(path)
+ if os.path.exists(path):
+ def save_overwite(k):
+ if k == "y":
+ save_data(path, data, master, state)
+
+ signals.status_prompt_onekey.send(
+ prompt = "'"+path+"' already exists. Overwite?",
+ keys = (
+ ("yes", "y"),
+ ("no", "n"),
+ ),
+ callback = save_overwite
+ )
+ else:
+ save_data(path, data, master, state)
def ask_save_path(prompt, data, master, state):
- master.path_prompt(
- prompt,
- state.last_saveload,
- save_data,
- data,
- master,
- state
+ signals.status_prompt_path.send(
+ prompt = prompt,
+ callback = ask_save_overwite,
+ args = (data, master, state)
)
@@ -210,6 +239,8 @@ def copy_flow_format_data(part, scope, flow):
else:
data = ""
if scope in ("q", "a"):
+ if flow.request.content is None or flow.request.content == CONTENT_MISSING:
+ return None, "Request content is missing"
with decoded(flow.request):
if part == "h":
data += flow.request.assemble()
@@ -221,6 +252,8 @@ def copy_flow_format_data(part, scope, flow):
# Add padding between request and response
data += "\r\n" * 2
if scope in ("s", "a") and flow.response:
+ if flow.response.content is None or flow.response.content == CONTENT_MISSING:
+ return None, "Response content is missing"
with decoded(flow.response):
if part == "h":
data += flow.response.assemble()
@@ -228,40 +261,43 @@ def copy_flow_format_data(part, scope, flow):
data += flow.response.content
else:
raise ValueError("Unknown part: {}".format(part))
- return data
+ return data, False
def copy_flow(part, scope, flow, master, state):
"""
- part: _c_ontent, _a_ll, _u_rl
+ part: _c_ontent, _h_eaders+content, _u_rl
scope: _a_ll, re_q_uest, re_s_ponse
"""
- data = copy_flow_format_data(part, scope, flow)
+ data, err = copy_flow_format_data(part, scope, flow)
+
+ if err:
+ signals.status_message.send(message=err)
+ return
if not data:
if scope == "q":
- master.statusbar.message("No request content to copy.")
+ signals.status_message.send(message="No request content to copy.")
elif scope == "s":
- master.statusbar.message("No response content to copy.")
+ signals.status_message.send(message="No response content to copy.")
else:
- master.statusbar.message("No contents to copy.")
+ signals.status_message.send(message="No contents to copy.")
return
try:
master.add_event(str(len(data)))
pyperclip.copy(data)
- except RuntimeError:
+ except (RuntimeError, UnicodeDecodeError):
def save(k):
if k == "y":
- ask_save_path("Save data: ", data, master, state)
-
- master.prompt_onekey(
- "Cannot copy binary data to clipboard. Save as file?",
- (
+ ask_save_path("Save data", data, master, state)
+ signals.status_prompt_onekey.send(
+ prompt = "Cannot copy binary data to clipboard. Save as file?",
+ keys = (
("yes", "y"),
("no", "n"),
),
- save
+ callback = save
)
@@ -273,14 +309,11 @@ def ask_copy_part(scope, flow, master, state):
if scope != "s":
choices.append(("url", "u"))
- master.prompt_onekey(
- "Copy",
- choices,
- copy_flow,
- scope,
- flow,
- master,
- state
+ signals.status_prompt_onekey.send(
+ prompt = "Copy",
+ keys = choices,
+ callback = copy_flow,
+ args = (scope, flow, master, state)
)
@@ -297,16 +330,14 @@ def ask_save_body(part, master, state, flow):
# We first need to determine whether we want to save the request or the
# response content.
if request_has_content and response_has_content:
- master.prompt_onekey(
- "Save",
- (
+ signals.status_prompt_onekey.send(
+ prompt = "Save",
+ keys = (
("request", "q"),
("response", "s"),
),
- ask_save_body,
- master,
- state,
- flow
+ callback = ask_save_body,
+ args = (master, state, flow)
)
elif response_has_content:
ask_save_body("s", master, state, flow)
@@ -315,27 +346,23 @@ def ask_save_body(part, master, state, flow):
elif part == "q" and request_has_content:
ask_save_path(
- "Save request content: ",
+ "Save request content",
flow.request.get_decoded_content(),
master,
state
)
elif part == "s" and response_has_content:
ask_save_path(
- "Save response content: ",
+ "Save response content",
flow.response.get_decoded_content(),
master,
state
)
else:
- master.statusbar.message("No content to save.")
+ signals.status_message.send(message="No content to save.")
-class FlowCache:
- @utils.LRUCache(200)
- def format_flow(self, *args):
- return raw_format_flow(*args)
-flowcache = FlowCache()
+flowcache = utils.LRUCache(800)
def format_flow(f, focus, extended=False, hostheader=False, padding=2):
@@ -353,7 +380,7 @@ def format_flow(f, focus, extended=False, hostheader=False, padding=2):
)
if f.response:
if f.response.content:
- contentdesc = utils.pretty_size(len(f.response.content))
+ contentdesc = netlib.utils.pretty_size(len(f.response.content))
elif f.response.content == CONTENT_MISSING:
contentdesc = "[content missing]"
else:
@@ -374,6 +401,7 @@ def format_flow(f, focus, extended=False, hostheader=False, padding=2):
d["resp_ctype"] = t[0].split(";")[0]
else:
d["resp_ctype"] = ""
- return flowcache.format_flow(
+ return flowcache.get(
+ raw_format_flow,
tuple(sorted(d.items())), focus, extended, padding
)
diff --git a/libmproxy/console/contentview.py b/libmproxy/console/contentview.py
index 95d908a4..a121dfab 100644
--- a/libmproxy/console/contentview.py
+++ b/libmproxy/console/contentview.py
@@ -6,15 +6,15 @@ import lxml.html
import lxml.etree
from PIL import Image
from PIL.ExifTags import TAGS
-import re
import subprocess
import traceback
import urwid
import netlib.utils
+from netlib import odict
from . import common
-from .. import utils, encoding, flow
+from .. import utils, encoding
from ..contrib import jsbeautifier, html2text
from ..contrib.wbxml.ASCommandResponse import ASCommandResponse
@@ -59,7 +59,7 @@ def trailer(clen, txt, limit):
txt.append(
urwid.Text(
[
- ("highlight", "... %s of data not shown. Press "%utils.pretty_size(rem)),
+ ("highlight", "... %s of data not shown. Press "%netlib.utils.pretty_size(rem)),
("key", "f"),
("highlight", " to load all data.")
]
@@ -240,33 +240,13 @@ class ViewMultipart:
content_types = ["multipart/form-data"]
def __call__(self, hdrs, content, limit):
- v = hdrs.get_first("content-type")
+ v = utils.multipartdecode(hdrs, content)
if v:
- v = utils.parse_content_type(v)
- if not v:
- return
- boundary = v[2].get("boundary")
- if not boundary:
- return
-
- rx = re.compile(r'\bname="([^"]+)"')
- keys = []
- vals = []
-
- for i in content.split("--" + boundary):
- parts = i.splitlines()
- if len(parts) > 1 and parts[0][0:2] != "--":
- match = rx.search(parts[1])
- if match:
- keys.append(match.group(1) + ":")
- vals.append(netlib.utils.cleanBin(
- "\n".join(parts[3+parts[2:].index(""):])
- ))
r = [
urwid.Text(("highlight", "Form data:\n")),
]
r.extend(common.format_keyvals(
- zip(keys, vals),
+ v,
key = "header",
val = "text"
))
@@ -539,7 +519,7 @@ def get_content_view(viewmode, hdrItems, content, limit, logfunc, is_request):
return "No content", ""
msg = []
- hdrs = flow.ODictCaseless([list(i) for i in hdrItems])
+ hdrs = odict.ODictCaseless([list(i) for i in hdrItems])
enc = hdrs.get_first("content-encoding")
if enc and enc != "identity":
diff --git a/libmproxy/console/flowdetailview.py b/libmproxy/console/flowdetailview.py
index f351bff1..48845a62 100644
--- a/libmproxy/console/flowdetailview.py
+++ b/libmproxy/console/flowdetailview.py
@@ -1,113 +1,154 @@
from __future__ import absolute_import
import urwid
-from . import common
+from . import common, searchable
from .. import utils
-footer = [
- ('heading_key', "q"), ":back ",
-]
-class FlowDetailsView(urwid.ListBox):
- def __init__(self, master, flow, state):
- self.master, self.flow, self.state = master, flow, state
- urwid.ListBox.__init__(
- self,
- self.flowtext()
- )
+def maybe_timestamp(base, attr):
+ if base and getattr(base, attr):
+ return utils.format_timestamp_with_milli(getattr(base, attr))
+ else:
+ return "active"
+ pass
- def keypress(self, size, key):
- key = common.shortcuts(key)
- if key == "q":
- self.master.statusbar = self.state[0]
- self.master.body = self.state[1]
- self.master.header = self.state[2]
- self.master.loop.widget = self.master.make_view()
- return None
- elif key == "?":
- key = None
- return urwid.ListBox.keypress(self, size, key)
-
- def flowtext(self):
- text = []
-
- title = urwid.Text("Flow details")
- title = urwid.Padding(title, align="left", width=("relative", 100))
- title = urwid.AttrWrap(title, "heading")
- text.append(title)
-
- cc = self.flow.client_conn
- sc = self.flow.server_conn
- req = self.flow.request
- resp = self.flow.response
-
- if sc:
- text.append(urwid.Text([("head", "Server Connection:")]))
- parts = [
- ["Address", "%s:%s" % sc.address()],
- ]
- text.extend(common.format_keyvals(parts, key="key", val="text", indent=4))
-
- c = sc.cert
- if c:
- text.append(urwid.Text([("head", "Server Certificate:")]))
- parts = [
- ["Type", "%s, %s bits"%c.keyinfo],
- ["SHA1 digest", c.digest("sha1")],
- ["Valid to", str(c.notafter)],
- ["Valid from", str(c.notbefore)],
- ["Serial", str(c.serial)],
- [
- "Subject",
- urwid.BoxAdapter(
- urwid.ListBox(common.format_keyvals(c.subject, key="highlight", val="text")),
- len(c.subject)
- )
- ],
- [
- "Issuer",
- urwid.BoxAdapter(
- urwid.ListBox(common.format_keyvals(c.issuer, key="highlight", val="text")),
- len(c.issuer)
- )
- ]
- ]
+def flowdetails(state, flow):
+ text = []
- if c.altnames:
- parts.append(
- [
- "Alt names",
- ", ".join(c.altnames)
- ]
- )
- text.extend(common.format_keyvals(parts, key="key", val="text", indent=4))
+ cc = flow.client_conn
+ sc = flow.server_conn
+ req = flow.request
+ resp = flow.response
- if cc:
- text.append(urwid.Text([("head", "Client Connection:")]))
+ if sc:
+ text.append(urwid.Text([("head", "Server Connection:")]))
+ parts = [
+ ["Address", "%s:%s" % sc.address()],
+ ]
+
+ text.extend(
+ common.format_keyvals(parts, key="key", val="text", indent=4)
+ )
+ c = sc.cert
+ if c:
+ text.append(urwid.Text([("head", "Server Certificate:")]))
parts = [
- ["Address", "%s:%s" % cc.address()],
- # ["Requests", "%s"%cc.requestcount],
+ ["Type", "%s, %s bits"%c.keyinfo],
+ ["SHA1 digest", c.digest("sha1")],
+ ["Valid to", str(c.notafter)],
+ ["Valid from", str(c.notbefore)],
+ ["Serial", str(c.serial)],
+ [
+ "Subject",
+ urwid.BoxAdapter(
+ urwid.ListBox(
+ common.format_keyvals(
+ c.subject,
+ key="highlight",
+ val="text"
+ )
+ ),
+ len(c.subject)
+ )
+ ],
+ [
+ "Issuer",
+ urwid.BoxAdapter(
+ urwid.ListBox(
+ common.format_keyvals(
+ c.issuer, key="highlight", val="text"
+ )
+ ),
+ len(c.issuer)
+ )
+ ]
]
- text.extend(common.format_keyvals(parts, key="key", val="text", indent=4))
+ if c.altnames:
+ parts.append(
+ [
+ "Alt names",
+ ", ".join(c.altnames)
+ ]
+ )
+ text.extend(
+ common.format_keyvals(parts, key="key", val="text", indent=4)
+ )
- parts = []
+ if cc:
+ text.append(urwid.Text([("head", "Client Connection:")]))
- parts.append(["Client conn. established", utils.format_timestamp_with_milli(cc.timestamp_start) if (cc and cc.timestamp_start) else "active"])
- parts.append(["Server conn. initiated", utils.format_timestamp_with_milli(sc.timestamp_start) if sc else "active" ])
- parts.append(["Server conn. TCP handshake", utils.format_timestamp_with_milli(sc.timestamp_tcp_setup) if (sc and sc.timestamp_tcp_setup) else "active"])
- if sc.ssl_established:
- parts.append(["Server conn. SSL handshake", utils.format_timestamp_with_milli(sc.timestamp_ssl_setup) if sc.timestamp_ssl_setup else "active"])
- parts.append(["Client conn. SSL handshake", utils.format_timestamp_with_milli(cc.timestamp_ssl_setup) if (cc and cc.timestamp_ssl_setup) else "active"])
- parts.append(["First request byte", utils.format_timestamp_with_milli(req.timestamp_start)])
- parts.append(["Request complete", utils.format_timestamp_with_milli(req.timestamp_end) if req.timestamp_end else "active"])
- parts.append(["First response byte", utils.format_timestamp_with_milli(resp.timestamp_start) if resp else "active"])
- parts.append(["Response complete", utils.format_timestamp_with_milli(resp.timestamp_end) if (resp and resp.timestamp_end) else "active"])
+ parts = [
+ ["Address", "%s:%s" % cc.address()],
+ # ["Requests", "%s"%cc.requestcount],
+ ]
- # sort operations by timestamp
- parts = sorted(parts, key=lambda p: p[1])
+ text.extend(
+ common.format_keyvals(parts, key="key", val="text", indent=4)
+ )
- text.append(urwid.Text([("head", "Timing:")]))
- text.extend(common.format_keyvals(parts, key="key", val="text", indent=4))
- return text
+ parts = []
+
+ parts.append(
+ [
+ "Client conn. established",
+ maybe_timestamp(cc, "timestamp_start")
+ ]
+ )
+ parts.append(
+ [
+ "Server conn. initiated",
+ maybe_timestamp(sc, "timestamp_start")
+ ]
+ )
+ parts.append(
+ [
+ "Server conn. TCP handshake",
+ maybe_timestamp(sc, "timestamp_tcp_setup")
+ ]
+ )
+ if sc.ssl_established:
+ parts.append(
+ [
+ "Server conn. SSL handshake",
+ maybe_timestamp(sc, "timestamp_ssl_setup")
+ ]
+ )
+ parts.append(
+ [
+ "Client conn. SSL handshake",
+ maybe_timestamp(cc, "timestamp_ssl_setup")
+ ]
+ )
+ parts.append(
+ [
+ "First request byte",
+ maybe_timestamp(req, "timestamp_start")
+ ]
+ )
+ parts.append(
+ [
+ "Request complete",
+ maybe_timestamp(req, "timestamp_end")
+ ]
+ )
+ parts.append(
+ [
+ "First response byte",
+ maybe_timestamp(resp, "timestamp_start")
+ ]
+ )
+ parts.append(
+ [
+ "Response complete",
+ maybe_timestamp(resp, "timestamp_end")
+ ]
+ )
+
+ # sort operations by timestamp
+ parts = sorted(parts, key=lambda p: p[1])
+
+ text.append(urwid.Text([("head", "Timing:")]))
+ text.extend(common.format_keyvals(parts, key="key", val="text", indent=4))
+ return searchable.Searchable(state, text)
diff --git a/libmproxy/console/flowlist.py b/libmproxy/console/flowlist.py
index 5d8ad942..6ab45bad 100644
--- a/libmproxy/console/flowlist.py
+++ b/libmproxy/console/flowlist.py
@@ -1,7 +1,7 @@
from __future__ import absolute_import
import urwid
from netlib import http
-from . import common
+from . import common, signals
def _mkhelp():
@@ -15,10 +15,10 @@ def _mkhelp():
("D", "duplicate flow"),
("e", "toggle eventlog"),
("F", "toggle follow flow list"),
- ("g", "copy flow to clipboard"),
("l", "set limit filter pattern"),
("L", "load saved flows"),
("n", "create a new request"),
+ ("P", "copy flow to clipboard"),
("r", "replay request"),
("V", "revert changes to request"),
("w", "save flows "),
@@ -47,6 +47,10 @@ class EventListBox(urwid.ListBox):
if key == "C":
self.master.clear_events()
key = None
+ elif key == "G":
+ self.set_focus(0)
+ elif key == "g":
+ self.set_focus(len(self.master.eventlist)-1)
return urwid.ListBox.keypress(self, size, key)
@@ -111,17 +115,15 @@ class ConnectionItem(urwid.WidgetWrap):
def save_flows_prompt(self, k):
if k == "a":
- self.master.path_prompt(
- "Save all flows to: ",
- self.state.last_saveload,
- self.master.save_flows
+ signals.status_prompt_path.send(
+ prompt = "Save all flows to",
+ callback = self.master.save_flows
)
else:
- self.master.path_prompt(
- "Save this flow to: ",
- self.state.last_saveload,
- self.master.save_one_flow,
- self.flow
+ signals.status_prompt_path.send(
+ prompt = "Save this flow to",
+ callback = self.master.save_one_flow,
+ args = (self.flow,)
)
def stop_server_playback_prompt(self, a):
@@ -150,64 +152,64 @@ class ConnectionItem(urwid.WidgetWrap):
self.master.options.replay_ignore_host
)
else:
- self.master.path_prompt(
- "Server replay path: ",
- self.state.last_saveload,
- self.master.server_playback_path
+ signals.status_prompt_path.send(
+ prompt = "Server replay path",
+ callback = self.master.server_playback_path
)
def keypress(self, (maxcol,), key):
key = common.shortcuts(key)
if key == "a":
self.flow.accept_intercept(self.master)
- self.master.sync_list_view()
+ signals.flowlist_change.send(self)
elif key == "d":
self.flow.kill(self.master)
self.state.delete_flow(self.flow)
- self.master.sync_list_view()
+ signals.flowlist_change.send(self)
elif key == "D":
f = self.master.duplicate_flow(self.flow)
self.master.view_flow(f)
elif key == "r":
r = self.master.replay_request(self.flow)
if r:
- self.master.statusbar.message(r)
- self.master.sync_list_view()
+ signals.status_message.send(message=r)
+ signals.flowlist_change.send(self)
elif key == "S":
if not self.master.server_playback:
- self.master.prompt_onekey(
- "Server Replay",
- (
+ signals.status_prompt_onekey.send(
+ prompt = "Server Replay",
+ keys = (
("all flows", "a"),
("this flow", "t"),
("file", "f"),
),
- self.server_replay_prompt,
+ callback = self.server_replay_prompt,
)
else:
- self.master.prompt_onekey(
- "Stop current server replay?",
- (
+ signals.status_prompt_onekey.send(
+ prompt = "Stop current server replay?",
+ keys = (
("yes", "y"),
("no", "n"),
),
- self.stop_server_playback_prompt,
+ callback = self.stop_server_playback_prompt,
)
elif key == "V":
if not self.flow.modified():
- self.master.statusbar.message("Flow not modified.")
+ signals.status_message.send(message="Flow not modified.")
return
self.state.revert(self.flow)
- self.master.sync_list_view()
- self.master.statusbar.message("Reverted.")
+ signals.flowlist_change.send(self)
+ signals.status_message.send(message="Reverted.")
elif key == "w":
- self.master.prompt_onekey(
- "Save",
- (
+ signals.status_prompt_onekey.send(
+ self,
+ prompt = "Save",
+ keys = (
("all flows", "a"),
("this flow", "t"),
),
- self.save_flows_prompt,
+ callback = self.save_flows_prompt,
)
elif key == "X":
self.flow.kill(self.master)
@@ -215,13 +217,12 @@ class ConnectionItem(urwid.WidgetWrap):
if self.flow.request:
self.master.view_flow(self.flow)
elif key == "|":
- self.master.path_prompt(
- "Send flow to script: ",
- self.state.last_script,
- self.master.run_script_once,
- self.flow
+ signals.status_prompt_path.send(
+ prompt = "Send flow to script",
+ callback = self.master.run_script_once,
+ args = (self.flow,)
)
- elif key == "g":
+ elif key == "P":
common.ask_copy_part("a", self.flow, self.master, self.state)
elif key == "b":
common.ask_save_body(None, self.master, self.state, self.flow)
@@ -232,8 +233,10 @@ class ConnectionItem(urwid.WidgetWrap):
class FlowListWalker(urwid.ListWalker):
def __init__(self, master, state):
self.master, self.state = master, state
- if self.state.flow_count():
- self.set_focus(0)
+ signals.flowlist_change.connect(self.sig_flowlist_change)
+
+ def sig_flowlist_change(self, sender):
+ self._modified()
def get_focus(self):
f, i = self.state.get_focus()
@@ -258,7 +261,10 @@ class FlowListWalker(urwid.ListWalker):
class FlowListBox(urwid.ListBox):
def __init__(self, master):
self.master = master
- urwid.ListBox.__init__(self, master.flow_list_walker)
+ urwid.ListBox.__init__(
+ self,
+ FlowListWalker(master, master.state)
+ )
def get_method_raw(self, k):
if k:
@@ -266,7 +272,12 @@ class FlowListBox(urwid.ListBox):
def get_method(self, k):
if k == "e":
- self.master.prompt("Method:", "", self.get_method_raw)
+ signals.status_prompt.send(
+ self,
+ prompt = "Method",
+ text = "",
+ callback = self.get_method_raw
+ )
else:
method = ""
for i in common.METHOD_OPTIONS:
@@ -275,17 +286,17 @@ class FlowListBox(urwid.ListBox):
self.get_url(method)
def get_url(self, method):
- self.master.prompt(
- "URL:",
- "http://www.example.com/",
- self.new_request,
- method
+ signals.status_prompt.send(
+ prompt = "URL",
+ text = "http://www.example.com/",
+ callback = self.new_request,
+ args = (method,)
)
def new_request(self, url, method):
parts = http.parse_url(str(url))
if not parts:
- self.master.statusbar.message("Invalid Url")
+ signals.status_message.send(message="Invalid Url")
return
scheme, host, port, path = parts
f = self.master.create_request(method, scheme, host, port, path)
@@ -295,28 +306,34 @@ class FlowListBox(urwid.ListBox):
key = common.shortcuts(key)
if key == "A":
self.master.accept_all()
- self.master.sync_list_view()
+ signals.flowlist_change.send(self)
elif key == "C":
self.master.clear_flows()
elif key == "e":
self.master.toggle_eventlog()
+ elif key == "G":
+ self.master.state.set_focus(0)
+ signals.flowlist_change.send(self)
+ elif key == "g":
+ self.master.state.set_focus(self.master.state.flow_count())
+ signals.flowlist_change.send(self)
elif key == "l":
- self.master.prompt(
- "Limit: ",
- self.master.state.limit_txt,
- self.master.set_limit
+ signals.status_prompt.send(
+ prompt = "Limit",
+ text = self.master.state.limit_txt,
+ callback = self.master.set_limit
)
elif key == "L":
- self.master.path_prompt(
- "Load flows: ",
- self.master.state.last_saveload,
- self.master.load_flows_callback
+ signals.status_prompt_path.send(
+ self,
+ prompt = "Load flows",
+ callback = self.master.load_flows_callback
)
elif key == "n":
- self.master.prompt_onekey(
- "Method",
- common.METHOD_OPTIONS,
- self.get_method
+ signals.status_prompt_onekey.send(
+ prompt = "Method",
+ keys = common.METHOD_OPTIONS,
+ callback = self.get_method
)
elif key == "F":
self.master.toggle_follow_flows()
@@ -324,10 +341,10 @@ class FlowListBox(urwid.ListBox):
if self.master.stream:
self.master.stop_stream()
else:
- self.master.path_prompt(
- "Stream flows to: ",
- self.master.state.last_saveload,
- self.master.start_stream_to_path
+ signals.status_prompt_path.send(
+ self,
+ prompt = "Stream flows to",
+ callback = self.master.start_stream_to_path
)
else:
return urwid.ListBox.keypress(self, size, key)
diff --git a/libmproxy/console/flowview.py b/libmproxy/console/flowview.py
index 89e75aad..632b725e 100644
--- a/libmproxy/console/flowview.py
+++ b/libmproxy/console/flowview.py
@@ -1,12 +1,16 @@
from __future__ import absolute_import
-import os, sys, copy
+import os
+import sys
import urwid
-from . import common, grideditor, contentview
-from .. import utils, flow, controller
+from netlib import odict
+from . import common, grideditor, contentview, signals, searchable, tabs
+from . import flowdetailview
+from .. import utils, controller
from ..protocol.http import HTTPRequest, HTTPResponse, CONTENT_MISSING, decoded
-class SearchError(Exception): pass
+class SearchError(Exception):
+ pass
def _mkhelp():
@@ -19,7 +23,6 @@ def _mkhelp():
("D", "duplicate flow"),
("e", "edit request/response"),
("f", "load full body data"),
- ("g", "copy response(content/headers) to clipboard"),
("m", "change body display mode for this entity"),
(None,
common.highlight_key("automatic", "a") +
@@ -59,18 +62,19 @@ def _mkhelp():
),
("M", "change default body display mode"),
("p", "previous flow"),
+ ("P", "copy response(content/headers) to clipboard"),
("r", "replay request"),
("V", "revert changes to request"),
("v", "view body in external viewer"),
("w", "save all flows matching current limit"),
("W", "save this flow"),
("x", "delete body"),
- ("X", "view flow details"),
("z", "encode/decode a request/response"),
- ("tab", "toggle request/response view"),
+ ("tab", "next tab"),
+ ("h, l", "previous tab, next tab"),
("space", "next flow"),
("|", "run script on this flow"),
- ("/", "search in response body (case sensitive)"),
+ ("/", "search (case sensitive)"),
("n", "repeat search forward"),
("N", "repeat search backwards"),
]
@@ -87,109 +91,129 @@ footer = [
class FlowViewHeader(urwid.WidgetWrap):
def __init__(self, master, f):
self.master, self.flow = master, f
- self._w = common.format_flow(f, False, extended=True, padding=0, hostheader=self.master.showhost)
-
- def refresh_flow(self, f):
- if f == self.flow:
- self._w = common.format_flow(f, False, extended=True, padding=0, hostheader=self.master.showhost)
-
+ self._w = common.format_flow(
+ f,
+ False,
+ extended=True,
+ padding=0,
+ hostheader=self.master.showhost
+ )
+ signals.flow_change.connect(self.sig_flow_change)
+
+ def sig_flow_change(self, sender, flow):
+ if flow == self.flow:
+ self._w = common.format_flow(
+ flow,
+ False,
+ extended=True,
+ padding=0,
+ hostheader=self.master.showhost
+ )
-class CallbackCache:
- @utils.LRUCache(200)
- def _callback(self, method, *args, **kwargs):
- return getattr(self.obj, method)(*args, **kwargs)
- def callback(self, obj, method, *args, **kwargs):
- # obj varies!
- self.obj = obj
- return self._callback(method, *args, **kwargs)
-cache = CallbackCache()
+cache = utils.LRUCache(200)
+TAB_REQ = 0
+TAB_RESP = 1
-class FlowView(urwid.WidgetWrap):
- REQ = 0
- RESP = 1
+class FlowView(tabs.Tabs):
highlight_color = "focusfield"
- def __init__(self, master, state, flow):
+ def __init__(self, master, state, flow, tab_offset):
self.master, self.state, self.flow = master, state, flow
+ tabs.Tabs.__init__(self,
+ [
+ (self.tab_request, self.view_request),
+ (self.tab_response, self.view_response),
+ (self.tab_details, self.view_details),
+ ],
+ tab_offset
+ )
+ self.show()
self.last_displayed_body = None
- if self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE:
- self.view_response()
- else:
- self.view_request()
+ signals.flow_change.connect(self.sig_flow_change)
- def _cached_content_view(self, viewmode, hdrItems, content, limit, is_request):
- return contentview.get_content_view(viewmode, hdrItems, content, limit, self.master.add_event, is_request)
+ def tab_request(self):
+ if self.flow.intercepted and not self.flow.reply.acked and not self.flow.response:
+ return "Request intercepted"
+ else:
+ return "Request"
- def content_view(self, viewmode, conn):
- full = self.state.get_flow_setting(
- self.flow,
- (self.state.view_flow_mode, "fullcontents"),
- False
- )
- if full:
- limit = sys.maxint
+ def tab_response(self):
+ if self.flow.intercepted and not self.flow.reply.acked and self.flow.response:
+ return "Response intercepted"
else:
- limit = contentview.VIEW_CUTOFF
- description, text_objects = cache.callback(
- self, "_cached_content_view",
- viewmode,
- tuple(tuple(i) for i in conn.headers.lst),
- conn.content,
- limit,
- isinstance(conn, HTTPRequest)
- )
- return (description, text_objects)
+ return "Response"
- def cont_view_handle_missing(self, conn, viewmode):
- if conn.content == CONTENT_MISSING:
- msg, body = "", [urwid.Text([("error", "[content missing]")])]
- else:
- msg, body = self.content_view(viewmode, conn)
+ def tab_details(self):
+ return "Detail"
+
+ def view_request(self):
+ return self.conn_text(self.flow.request)
+
+ def view_response(self):
+ return self.conn_text(self.flow.response)
+ def view_details(self):
+ return flowdetailview.flowdetails(self.state, self.flow)
+
+ def sig_flow_change(self, sender, flow):
+ if flow == self.flow:
+ self.show()
+
+ def content_view(self, viewmode, conn):
+ if conn.content == CONTENT_MISSING:
+ msg, body = "", [urwid.Text([("error", "[content missing]")])]
return (msg, body)
+ else:
+ full = self.state.get_flow_setting(
+ self.flow,
+ (self.tab_offset, "fullcontents"),
+ False
+ )
+ if full:
+ limit = sys.maxint
+ else:
+ limit = contentview.VIEW_CUTOFF
+ description, text_objects = cache.get(
+ contentview.get_content_view,
+ viewmode,
+ tuple(tuple(i) for i in conn.headers.lst),
+ conn.content,
+ limit,
+ self.master.add_event,
+ isinstance(conn, HTTPRequest)
+ )
+ return (description, text_objects)
- def viewmode_get(self, override):
+ def viewmode_get(self):
+ override = self.state.get_flow_setting(
+ self.flow,
+ (self.tab_offset, "prettyview")
+ )
return self.state.default_body_view if override is None else override
- def override_get(self):
- return self.state.get_flow_setting(self.flow,
- (self.state.view_flow_mode, "prettyview"))
-
- def conn_text_raw(self, conn):
- """
- Based on a request/response, conn, returns the elements for
- display.
- """
- headers = common.format_keyvals(
+ def conn_text(self, conn):
+ if conn:
+ txt = common.format_keyvals(
[(h+":", v) for (h, v) in conn.headers.lst],
key = "header",
val = "text"
)
- override = self.override_get()
- viewmode = self.viewmode_get(override)
- msg, body = self.cont_view_handle_missing(conn, viewmode)
- return headers, msg, body
-
- def conn_text_merge(self, headers, msg, body):
- """
- Grabs what is returned by conn_text_raw and merges them all
- toghether, mainly used by conn_text and search
- """
- override = self.override_get()
- viewmode = self.viewmode_get(override)
-
- cols = [urwid.Text(
- [
- ("heading", msg),
- ]
- )
- ]
+ viewmode = self.viewmode_get()
+ msg, body = self.content_view(viewmode, conn)
- if override is not None:
- cols.append(urwid.Text([
+ cols = [
+ urwid.Text(
+ [
+ ("heading", msg),
+ ]
+ )
+ ]
+ cols.append(
+ urwid.Text(
+ [
" ",
('heading', "["),
('heading_key', "m"),
@@ -198,306 +222,40 @@ class FlowView(urwid.WidgetWrap):
align="right"
)
)
+ title = urwid.AttrWrap(urwid.Columns(cols), "heading")
- title = urwid.AttrWrap(urwid.Columns(cols), "heading")
- headers.append(title)
- headers.extend(body)
-
- return headers
-
- def conn_text(self, conn):
- """
- Same as conn_text_raw, but returns result wrapped in a listbox ready for usage.
- """
- headers, msg, body = self.conn_text_raw(conn)
- merged = self.conn_text_merge(headers, msg, body)
- return urwid.ListBox(merged)
-
- def _tab(self, content, attr):
- p = urwid.Text(content)
- p = urwid.Padding(p, align="left", width=("relative", 100))
- p = urwid.AttrWrap(p, attr)
- return p
-
- def wrap_body(self, active, body):
- parts = []
-
- if self.flow.intercepted and not self.flow.reply.acked and not self.flow.response:
- qt = "Request intercepted"
- else:
- qt = "Request"
- if active == common.VIEW_FLOW_REQUEST:
- parts.append(self._tab(qt, "heading"))
- else:
- parts.append(self._tab(qt, "heading_inactive"))
-
- if self.flow.intercepted and not self.flow.reply.acked and self.flow.response:
- st = "Response intercepted"
- else:
- st = "Response"
- if active == common.VIEW_FLOW_RESPONSE:
- parts.append(self._tab(st, "heading"))
+ txt.append(title)
+ txt.extend(body)
else:
- parts.append(self._tab(st, "heading_inactive"))
-
- h = urwid.Columns(parts)
- f = urwid.Frame(
- body,
- header=h
+ txt = [
+ urwid.Text(""),
+ urwid.Text(
+ [
+ ("highlight", "No response. Press "),
+ ("key", "e"),
+ ("highlight", " and edit any aspect to add one."),
+ ]
)
- return f
-
- def search_wrapped_around(self, last_find_line, last_search_index, backwards):
- """
- returns true if search wrapped around the bottom.
- """
-
- current_find_line = self.state.get_flow_setting(self.flow,
- "last_find_line")
- current_search_index = self.state.get_flow_setting(self.flow,
- "last_search_index")
-
- if not backwards:
- message = "search hit BOTTOM, continuing at TOP"
- if current_find_line <= last_find_line:
- return True, message
- elif current_find_line == last_find_line:
- if current_search_index <= last_search_index:
- return True, message
- else:
- message = "search hit TOP, continuing at BOTTOM"
- if current_find_line >= last_find_line:
- return True, message
- elif current_find_line == last_find_line:
- if current_search_index >= last_search_index:
- return True, message
-
- return False, ""
-
- def search_again(self, backwards=False):
- """
- runs the previous search again, forwards or backwards.
- """
- last_search_string = self.state.get_flow_setting(self.flow, "last_search_string")
- if last_search_string:
- message = self.search(last_search_string, backwards)
- if message:
- self.master.statusbar.message(message)
- else:
- message = "no previous searches have been made"
- self.master.statusbar.message(message)
-
- return message
-
- def search(self, search_string, backwards=False):
- """
- similar to view_response or view_request, but instead of just
- displaying the conn, it highlights a word that the user is
- searching for and handles all the logic surrounding that.
- """
-
- if not search_string:
- search_string = self.state.get_flow_setting(self.flow,
- "last_search_string")
- if not search_string:
- return
-
- if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
- text = self.flow.request
- const = common.VIEW_FLOW_REQUEST
- else:
- text = self.flow.response
- const = common.VIEW_FLOW_RESPONSE
- if not self.flow.response:
- return "no response to search in"
-
- last_find_line = self.state.get_flow_setting(self.flow,
- "last_find_line")
- last_search_index = self.state.get_flow_setting(self.flow,
- "last_search_index")
-
- # generate the body, highlight the words and get focus
- headers, msg, body = self.conn_text_raw(text)
- try:
- body, focus_position = self.search_highlight_text(body, search_string, backwards=backwards)
- except SearchError:
- return "Search not supported in this view."
-
- if focus_position == None:
- # no results found.
- return "no matches for '%s'" % search_string
-
- # UI stuff.
- merged = self.conn_text_merge(headers, msg, body)
- list_box = urwid.ListBox(merged)
- list_box.set_focus(focus_position + 2)
- self._w = self.wrap_body(const, list_box)
- self.master.statusbar.redraw()
-
- self.last_displayed_body = list_box
-
- wrapped, wrapped_message = self.search_wrapped_around(last_find_line, last_search_index, backwards)
-
- if wrapped:
- return wrapped_message
-
- def search_get_start(self, search_string):
- start_line = 0
- start_index = 0
- last_search_string = self.state.get_flow_setting(self.flow, "last_search_string")
- if search_string == last_search_string:
- start_line = self.state.get_flow_setting(self.flow, "last_find_line")
- start_index = self.state.get_flow_setting(self.flow,
- "last_search_index")
-
- if start_index == None:
- start_index = 0
- else:
- start_index += len(search_string)
-
- if start_line == None:
- start_line = 0
-
- else:
- self.state.add_flow_setting(self.flow, "last_search_string",
- search_string)
-
- return (start_line, start_index)
-
- def search_get_range(self, len_text_objects, start_line, backwards):
- if not backwards:
- loop_range = xrange(start_line, len_text_objects)
- else:
- loop_range = xrange(start_line, -1, -1)
-
- return loop_range
-
- def search_find(self, text, search_string, start_index, backwards):
- if backwards == False:
- find_index = text.find(search_string, start_index)
- else:
- if start_index != 0:
- start_index -= len(search_string)
- else:
- start_index = None
-
- find_index = text.rfind(search_string, 0, start_index)
-
- return find_index
-
- def search_highlight_text(self, text_objects, search_string, looping = False, backwards = False):
- start_line, start_index = self.search_get_start(search_string)
- i = start_line
-
- found = False
- text_objects = copy.deepcopy(text_objects)
- loop_range = self.search_get_range(len(text_objects), start_line, backwards)
- for i in loop_range:
- text_object = text_objects[i]
-
- try:
- text, style = text_object.get_text()
- except AttributeError:
- raise SearchError()
-
- if i != start_line:
- start_index = 0
-
- find_index = self.search_find(text, search_string, start_index, backwards)
-
- if find_index != -1:
- new_text = self.search_highlight_object(text, find_index, search_string)
- text_objects[i] = new_text
-
- found = True
- self.state.add_flow_setting(self.flow, "last_search_index",
- find_index)
- self.state.add_flow_setting(self.flow, "last_find_line", i)
-
- break
-
- # handle search WRAP
- if found:
- focus_pos = i
- else :
- if looping:
- focus_pos = None
- else:
- if not backwards:
- self.state.add_flow_setting(self.flow, "last_search_index", 0)
- self.state.add_flow_setting(self.flow, "last_find_line", 0)
- else:
- self.state.add_flow_setting(self.flow, "last_search_index", None)
- self.state.add_flow_setting(self.flow, "last_find_line", len(text_objects) - 1)
-
- text_objects, focus_pos = self.search_highlight_text(text_objects,
- search_string, looping=True, backwards=backwards)
-
- return text_objects, focus_pos
-
- def search_highlight_object(self, text_object, find_index, search_string):
- """
- just a little abstraction
- """
- before = text_object[:find_index]
- after = text_object[find_index+len(search_string):]
-
- new_text = urwid.Text(
- [
- before,
- (self.highlight_color, search_string),
- after,
]
- )
-
- return new_text
-
- def view_request(self):
- self.state.view_flow_mode = common.VIEW_FLOW_REQUEST
- body = self.conn_text(self.flow.request)
- self._w = self.wrap_body(common.VIEW_FLOW_REQUEST, body)
- self.master.statusbar.redraw()
-
- def view_response(self):
- self.state.view_flow_mode = common.VIEW_FLOW_RESPONSE
- if self.flow.response:
- body = self.conn_text(self.flow.response)
- else:
- body = urwid.ListBox(
- [
- urwid.Text(""),
- urwid.Text(
- [
- ("highlight", "No response. Press "),
- ("key", "e"),
- ("highlight", " and edit any aspect to add one."),
- ]
- )
- ]
- )
- self._w = self.wrap_body(common.VIEW_FLOW_RESPONSE, body)
- self.master.statusbar.redraw()
-
- def refresh_flow(self, c=None):
- if c == self.flow:
- if self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE and self.flow.response:
- self.view_response()
- else:
- self.view_request()
+ return searchable.Searchable(self.state, txt)
def set_method_raw(self, m):
if m:
self.flow.request.method = m
- self.master.refresh_flow(self.flow)
+ signals.flow_change.send(self, flow = self.flow)
def edit_method(self, m):
if m == "e":
- self.master.prompt_edit("Method", self.flow.request.method, self.set_method_raw)
+ signals.status_prompt.send(
+ prompt = "Method",
+ text = self.flow.request.method,
+ callback = self.set_method_raw
+ )
else:
for i in common.METHOD_OPTIONS:
if i[1] == m:
self.flow.request.method = i[0].upper()
- self.master.refresh_flow(self.flow)
+ signals.flow_change.send(self, flow = self.flow)
def set_url(self, url):
request = self.flow.request
@@ -505,7 +263,7 @@ class FlowView(urwid.WidgetWrap):
request.url = str(url)
except ValueError:
return "Invalid URL."
- self.master.refresh_flow(self.flow)
+ signals.flow_change.send(self, flow = self.flow)
def set_resp_code(self, code):
response = self.flow.response
@@ -516,85 +274,158 @@ class FlowView(urwid.WidgetWrap):
import BaseHTTPServer
if BaseHTTPServer.BaseHTTPRequestHandler.responses.has_key(int(code)):
response.msg = BaseHTTPServer.BaseHTTPRequestHandler.responses[int(code)][0]
- self.master.refresh_flow(self.flow)
+ signals.flow_change.send(self, flow = self.flow)
def set_resp_msg(self, msg):
response = self.flow.response
response.msg = msg
- self.master.refresh_flow(self.flow)
+ signals.flow_change.send(self, flow = self.flow)
def set_headers(self, lst, conn):
- conn.headers = flow.ODictCaseless(lst)
+ conn.headers = odict.ODictCaseless(lst)
+ signals.flow_change.send(self, flow = self.flow)
def set_query(self, lst, conn):
- conn.set_query(flow.ODict(lst))
+ conn.set_query(odict.ODict(lst))
+ signals.flow_change.send(self, flow = self.flow)
def set_path_components(self, lst, conn):
- conn.set_path_components([i[0] for i in lst])
+ conn.set_path_components(lst)
+ signals.flow_change.send(self, flow = self.flow)
def set_form(self, lst, conn):
- conn.set_form_urlencoded(flow.ODict(lst))
+ conn.set_form_urlencoded(odict.ODict(lst))
+ signals.flow_change.send(self, flow = self.flow)
def edit_form(self, conn):
self.master.view_grideditor(
- grideditor.URLEncodedFormEditor(self.master, conn.get_form_urlencoded().lst, self.set_form, conn)
+ grideditor.URLEncodedFormEditor(
+ self.master,
+ conn.get_form_urlencoded().lst,
+ self.set_form,
+ conn
+ )
)
def edit_form_confirm(self, key, conn):
if key == "y":
self.edit_form(conn)
+ def set_cookies(self, lst, conn):
+ od = odict.ODict(lst)
+ conn.set_cookies(od)
+ signals.flow_change.send(self, flow = self.flow)
+
+ def set_setcookies(self, data, conn):
+ conn.set_cookies(data)
+ signals.flow_change.send(self, flow = self.flow)
+
def edit(self, part):
- if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
+ if self.tab_offset == TAB_REQ:
message = self.flow.request
else:
if not self.flow.response:
self.flow.response = HTTPResponse(
self.flow.request.httpversion,
- 200, "OK", flow.ODictCaseless(), ""
+ 200, "OK", odict.ODictCaseless(), ""
)
self.flow.response.reply = controller.DummyReply()
message = self.flow.response
self.flow.backup()
+ if message == self.flow.request and part == "c":
+ self.master.view_grideditor(
+ grideditor.CookieEditor(
+ self.master,
+ message.get_cookies().lst,
+ self.set_cookies,
+ message
+ )
+ )
+ if message == self.flow.response and part == "c":
+ self.master.view_grideditor(
+ grideditor.SetCookieEditor(
+ self.master,
+ message.get_cookies(),
+ self.set_setcookies,
+ message
+ )
+ )
if part == "r":
with decoded(message):
- # Fix an issue caused by some editors when editing a request/response body.
- # Many editors make it hard to save a file without a terminating newline on the last
- # line. When editing message bodies, this can cause problems. For now, I just
- # strip the newlines off the end of the body when we return from an editor.
+ # Fix an issue caused by some editors when editing a
+ # request/response body. Many editors make it hard to save a
+ # file without a terminating newline on the last line. When
+ # editing message bodies, this can cause problems. For now, I
+ # just strip the newlines off the end of the body when we return
+ # from an editor.
c = self.master.spawn_editor(message.content or "")
message.content = c.rstrip("\n")
elif part == "f":
if not message.get_form_urlencoded() and message.content:
- self.master.prompt_onekey(
- "Existing body is not a URL-encoded form. Clear and edit?",
- [
+ signals.status_prompt_onekey.send(
+ prompt = "Existing body is not a URL-encoded form. Clear and edit?",
+ keys = [
("yes", "y"),
("no", "n"),
],
- self.edit_form_confirm,
- message
+ callback = self.edit_form_confirm,
+ args = (message,)
)
else:
self.edit_form(message)
elif part == "h":
- self.master.view_grideditor(grideditor.HeaderEditor(self.master, message.headers.lst, self.set_headers, message))
+ self.master.view_grideditor(
+ grideditor.HeaderEditor(
+ self.master,
+ message.headers.lst,
+ self.set_headers,
+ message
+ )
+ )
elif part == "p":
p = message.get_path_components()
- p = [[i] for i in p]
- self.master.view_grideditor(grideditor.PathEditor(self.master, p, self.set_path_components, message))
+ self.master.view_grideditor(
+ grideditor.PathEditor(
+ self.master,
+ p,
+ self.set_path_components,
+ message
+ )
+ )
elif part == "q":
- self.master.view_grideditor(grideditor.QueryEditor(self.master, message.get_query().lst, self.set_query, message))
- elif part == "u" and self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
- self.master.prompt_edit("URL", message.url, self.set_url)
- elif part == "m" and self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
- self.master.prompt_onekey("Method", common.METHOD_OPTIONS, self.edit_method)
- elif part == "c" and self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE:
- self.master.prompt_edit("Code", str(message.code), self.set_resp_code)
- elif part == "m" and self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE:
- self.master.prompt_edit("Message", message.msg, self.set_resp_msg)
- self.master.refresh_flow(self.flow)
+ self.master.view_grideditor(
+ grideditor.QueryEditor(
+ self.master,
+ message.get_query().lst,
+ self.set_query, message
+ )
+ )
+ elif part == "u":
+ signals.status_prompt.send(
+ prompt = "URL",
+ text = message.url,
+ callback = self.set_url
+ )
+ elif part == "m":
+ signals.status_prompt_onekey.send(
+ prompt = "Method",
+ keys = common.METHOD_OPTIONS,
+ callback = self.edit_method
+ )
+ elif part == "o":
+ signals.status_prompt.send(
+ prompt = "Code",
+ text = str(message.code),
+ callback = self.set_resp_code
+ )
+ elif part == "m":
+ signals.status_prompt.send(
+ prompt = "Message",
+ text = message.msg,
+ callback = self.set_resp_msg
+ )
+ signals.flow_change.send(self, flow = self.flow)
def _view_nextprev_flow(self, np, flow):
try:
@@ -606,9 +437,10 @@ class FlowView(urwid.WidgetWrap):
else:
new_flow, new_idx = self.state.get_prev(idx)
if new_flow is None:
- self.master.statusbar.message("No more flows!")
- return
- self.master.view_flow(new_flow)
+ signals.status_message.send(message="No more flows!")
+ else:
+ signals.pop_view_state.send(self)
+ self.master.view_flow(new_flow, self.tab_offset)
def view_next_flow(self, flow):
return self._view_nextprev_flow("next", flow)
@@ -619,42 +451,38 @@ class FlowView(urwid.WidgetWrap):
def change_this_display_mode(self, t):
self.state.add_flow_setting(
self.flow,
- (self.state.view_flow_mode, "prettyview"),
+ (self.tab_offset, "prettyview"),
contentview.get_by_shortcut(t)
)
- self.master.refresh_flow(self.flow)
+ signals.flow_change.send(self, flow = self.flow)
def delete_body(self, t):
if t == "m":
val = CONTENT_MISSING
else:
val = None
- if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
+ if self.tab_offset == TAB_REQ:
self.flow.request.content = val
else:
self.flow.response.content = val
- self.master.refresh_flow(self.flow)
+ signals.flow_change.send(self, flow = self.flow)
def keypress(self, size, key):
+ key = super(self.__class__, self).keypress(size, key)
+
if key == " ":
self.view_next_flow(self.flow)
return
key = common.shortcuts(key)
- if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
+ if self.tab_offset == TAB_REQ:
conn = self.flow.request
- else:
+ elif self.tab_offset == TAB_RESP:
conn = self.flow.response
+ else:
+ conn = None
- if key == "q":
- self.master.view_flowlist()
- key = None
- elif key == "tab":
- if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
- self.view_response()
- else:
- self.view_request()
- elif key in ("up", "down", "page up", "page down"):
+ if key in ("up", "down", "page up", "page down"):
# Why doesn't this just work??
self._w.keypress(size, key)
elif key == "a":
@@ -663,11 +491,6 @@ class FlowView(urwid.WidgetWrap):
elif key == "A":
self.master.accept_all()
self.master.view_flow(self.flow)
- elif key == "b":
- if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
- common.ask_save_body("q", self.master, self.state, self.flow)
- else:
- common.ask_save_body("s", self.master, self.state, self.flow)
elif key == "d":
if self.state.flow_count() == 1:
self.master.view_flowlist()
@@ -681,134 +504,143 @@ class FlowView(urwid.WidgetWrap):
elif key == "D":
f = self.master.duplicate_flow(self.flow)
self.master.view_flow(f)
- self.master.statusbar.message("Duplicated.")
- elif key == "e":
- if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
- self.master.prompt_onekey(
- "Edit request",
- (
- ("query", "q"),
- ("path", "p"),
- ("url", "u"),
- ("header", "h"),
- ("form", "f"),
- ("raw body", "r"),
- ("method", "m"),
- ),
- self.edit
- )
- else:
- self.master.prompt_onekey(
- "Edit response",
- (
- ("code", "c"),
- ("message", "m"),
- ("header", "h"),
- ("raw body", "r"),
- ),
- self.edit
- )
- key = None
- elif key == "f":
- self.master.statusbar.message("Loading all body data...")
- self.state.add_flow_setting(
- self.flow,
- (self.state.view_flow_mode, "fullcontents"),
- True
- )
- self.master.refresh_flow(self.flow)
- self.master.statusbar.message("")
- elif key == "g":
- if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
- scope = "q"
- else:
- scope = "s"
- common.ask_copy_part(scope, self.flow, self.master, self.state)
- elif key == "m":
- p = list(contentview.view_prompts)
- p.insert(0, ("Clear", "C"))
- self.master.prompt_onekey(
- "Display mode",
- p,
- self.change_this_display_mode
- )
- key = None
+ signals.status_message.send(message="Duplicated.")
elif key == "p":
self.view_prev_flow(self.flow)
elif key == "r":
r = self.master.replay_request(self.flow)
if r:
- self.master.statusbar.message(r)
- self.master.refresh_flow(self.flow)
+ signals.status_message.send(message=r)
+ signals.flow_change.send(self, flow = self.flow)
elif key == "V":
if not self.flow.modified():
- self.master.statusbar.message("Flow not modified.")
+ signals.status_message.send(message="Flow not modified.")
return
self.state.revert(self.flow)
- self.master.refresh_flow(self.flow)
- self.master.statusbar.message("Reverted.")
+ signals.flow_change.send(self, flow = self.flow)
+ signals.status_message.send(message="Reverted.")
elif key == "W":
- self.master.path_prompt(
- "Save this flow: ",
- self.state.last_saveload,
- self.master.save_one_flow,
- self.flow
+ signals.status_prompt_path.send(
+ prompt = "Save this flow",
+ callback = self.master.save_one_flow,
+ args = (self.flow,)
)
- elif key == "v":
- if conn and conn.content:
- t = conn.headers["content-type"] or [None]
- t = t[0]
- if os.environ.has_key("EDITOR") or os.environ.has_key("PAGER"):
- self.master.spawn_external_viewer(conn.content, t)
- else:
- self.master.statusbar.message("Error! Set $EDITOR or $PAGER.")
elif key == "|":
- self.master.path_prompt(
- "Send flow to script: ", self.state.last_script,
- self.master.run_script_once, self.flow
+ signals.status_prompt_path.send(
+ prompt = "Send flow to script",
+ callback = self.master.run_script_once,
+ args = (self.flow,)
)
- elif key == "x":
- self.master.prompt_onekey(
- "Delete body",
- (
- ("completely", "c"),
- ("mark as missing", "m"),
- ),
- self.delete_body
+
+ if not conn and key in set(list("befgmxvz")):
+ signals.status_message.send(
+ message = "Tab to the request or response",
+ expire = 1
)
- key = None
- elif key == "X":
- self.master.view_flowdetails(self.flow)
- elif key == "z":
- if conn:
+ elif conn:
+ if key == "b":
+ if self.tab_offset == TAB_REQ:
+ common.ask_save_body(
+ "q", self.master, self.state, self.flow
+ )
+ else:
+ common.ask_save_body(
+ "s", self.master, self.state, self.flow
+ )
+ elif key == "e":
+ if self.tab_offset == TAB_REQ:
+ signals.status_prompt_onekey.send(
+ prompt = "Edit request",
+ keys = (
+ ("cookies", "c"),
+ ("query", "q"),
+ ("path", "p"),
+ ("url", "u"),
+ ("header", "h"),
+ ("form", "f"),
+ ("raw body", "r"),
+ ("method", "m"),
+ ),
+ callback = self.edit
+ )
+ else:
+ signals.status_prompt_onekey.send(
+ prompt = "Edit response",
+ keys = (
+ ("cookies", "c"),
+ ("code", "o"),
+ ("message", "m"),
+ ("header", "h"),
+ ("raw body", "r"),
+ ),
+ callback = self.edit
+ )
+ key = None
+ elif key == "f":
+ signals.status_message.send(message="Loading all body data...")
+ self.state.add_flow_setting(
+ self.flow,
+ (self.tab_offset, "fullcontents"),
+ True
+ )
+ signals.flow_change.send(self, flow = self.flow)
+ signals.status_message.send(message="")
+ elif key == "P":
+ if self.tab_offset == TAB_REQ:
+ scope = "q"
+ else:
+ scope = "s"
+ common.ask_copy_part(scope, self.flow, self.master, self.state)
+ elif key == "m":
+ p = list(contentview.view_prompts)
+ p.insert(0, ("Clear", "C"))
+ signals.status_prompt_onekey.send(
+ self,
+ prompt = "Display mode",
+ keys = p,
+ callback = self.change_this_display_mode
+ )
+ key = None
+ elif key == "x":
+ signals.status_prompt_onekey.send(
+ prompt = "Delete body",
+ keys = (
+ ("completely", "c"),
+ ("mark as missing", "m"),
+ ),
+ callback = self.delete_body
+ )
+ key = None
+ elif key == "v":
+ if conn.content:
+ t = conn.headers["content-type"] or [None]
+ t = t[0]
+ if os.environ.has_key("EDITOR") or os.environ.has_key("PAGER"):
+ self.master.spawn_external_viewer(conn.content, t)
+ else:
+ signals.status_message.send(
+ message = "Error! Set $EDITOR or $PAGER."
+ )
+ elif key == "z":
self.flow.backup()
e = conn.headers.get_first("content-encoding", "identity")
if e != "identity":
if not conn.decode():
- self.master.statusbar.message("Could not decode - invalid data?")
+ signals.status_message.send(
+ message = "Could not decode - invalid data?"
+ )
else:
- self.master.prompt_onekey(
- "Select encoding: ",
- (
+ signals.status_prompt_onekey.send(
+ prompt = "Select encoding: ",
+ keys = (
("gzip", "z"),
("deflate", "d"),
),
- self.encode_callback,
- conn
+ callback = self.encode_callback,
+ args = (conn,)
)
- self.master.refresh_flow(self.flow)
- elif key == "/":
- last_search_string = self.state.get_flow_setting(self.flow, "last_search_string")
- search_prompt = "Search body ["+last_search_string+"]: " if last_search_string else "Search body: "
- self.master.prompt(search_prompt,
- None,
- self.search)
- elif key == "n":
- self.search_again(backwards=False)
- elif key == "N":
- self.search_again(backwards=True)
- else:
- return key
+ signals.flow_change.send(self, flow = self.flow)
+ return key
def encode_callback(self, key, conn):
encoding_map = {
@@ -816,4 +648,4 @@ class FlowView(urwid.WidgetWrap):
"d": "deflate",
}
conn.encode(encoding_map[key])
- self.master.refresh_flow(self.flow)
+ signals.flow_change.send(self, flow = self.flow)
diff --git a/libmproxy/console/grideditor.py b/libmproxy/console/grideditor.py
index fe3df509..5a2da59f 100644
--- a/libmproxy/console/grideditor.py
+++ b/libmproxy/console/grideditor.py
@@ -5,31 +5,99 @@ import re
import os
import urwid
-from . import common
+from . import common, signals
from .. import utils, filt, script
-from netlib import http_uastrings
+from netlib import http_uastrings, http_cookies, odict
-footer = [
+FOOTER = [
('heading_key', "enter"), ":edit ",
('heading_key', "q"), ":back ",
]
-footer_editing = [
+FOOTER_EDITING = [
('heading_key', "esc"), ":stop editing ",
]
-class SText(urwid.WidgetWrap):
- def __init__(self, txt, focused, error):
+class TextColumn:
+ subeditor = None
+
+ def __init__(self, heading):
+ self.heading = heading
+
+ def text(self, obj):
+ return SEscaped(obj or "")
+
+ def blank(self):
+ return ""
+
+ def keypress(self, key, editor):
+ if key == "r":
+ if editor.walker.get_current_value() is not None:
+ signals.status_prompt_path.send(
+ self,
+ prompt = "Read file",
+ callback = editor.read_file
+ )
+ elif key == "R":
+ if editor.walker.get_current_value() is not None:
+ signals.status_prompt_path.send(
+ editor,
+ prompt = "Read unescaped file",
+ callback = editor.read_file,
+ args = (True,)
+ )
+ elif key == "e":
+ o = editor.walker.get_current_value()
+ if o is not None:
+ n = editor.master.spawn_editor(o.encode("string-escape"))
+ n = utils.clean_hanging_newline(n)
+ editor.walker.set_current_value(n, False)
+ editor.walker._modified()
+ elif key in ["enter"]:
+ editor.walker.start_edit()
+ else:
+ return key
+
+
+class SubgridColumn:
+ def __init__(self, heading, subeditor):
+ self.heading = heading
+ self.subeditor = subeditor
+
+ def text(self, obj):
+ p = http_cookies._format_pairs(obj, sep="\n")
+ return urwid.Text(p)
+
+ def blank(self):
+ return []
+
+ def keypress(self, key, editor):
+ if key in "rRe":
+ signals.status_message.send(
+ self,
+ message = "Press enter to edit this field.",
+ expire = 1000
+ )
+ return
+ elif key in ["enter"]:
+ editor.master.view_grideditor(
+ self.subeditor(
+ editor.master,
+ editor.walker.get_current_value(),
+ editor.set_subeditor_value,
+ editor.walker.focus,
+ editor.walker.focus_col
+ )
+ )
+ else:
+ return key
+
+
+class SEscaped(urwid.WidgetWrap):
+ def __init__(self, txt):
txt = txt.encode("string-escape")
w = urwid.Text(txt, wrap="any")
- if focused:
- if error:
- w = urwid.AttrWrap(w, "focusfield_error")
- else:
- w = urwid.AttrWrap(w, "focusfield")
- elif error:
- w = urwid.AttrWrap(w, "field_error")
urwid.WidgetWrap.__init__(self, w)
def get_text(self):
@@ -50,7 +118,7 @@ class SEdit(urwid.WidgetWrap):
urwid.WidgetWrap.__init__(self, w)
def get_text(self):
- return self._w.get_text()[0]
+ return self._w.get_text()[0].strip()
def selectable(self):
return True
@@ -67,9 +135,15 @@ class GridRow(urwid.WidgetWrap):
self.editing = SEdit(v)
self.fields.append(self.editing)
else:
- self.fields.append(
- SText(v, True if focused == i else False, i in errors)
- )
+ w = self.editor.columns[i].text(v)
+ if focused == i:
+ if i in errors:
+ w = urwid.AttrWrap(w, "focusfield_error")
+ else:
+ w = urwid.AttrWrap(w, "focusfield")
+ elif i in errors:
+ w = urwid.AttrWrap(w, "field_error")
+ self.fields.append(w)
fspecs = self.fields[:]
if len(self.fields) > 1:
@@ -125,21 +199,28 @@ class GridWalker(urwid.ListWalker):
try:
val = val.decode("string-escape")
except ValueError:
- self.editor.master.statusbar.message(
- "Invalid Python-style string encoding.", 1000
+ signals.status_message.send(
+ self,
+ message = "Invalid Python-style string encoding.",
+ expire = 1000
)
return
errors = self.lst[self.focus][1]
emsg = self.editor.is_error(self.focus_col, val)
if emsg:
- self.editor.master.statusbar.message(emsg, 1000)
+ signals.status_message.send(message = emsg, expire = 1)
errors.add(self.focus_col)
else:
errors.discard(self.focus_col)
-
- row = list(self.lst[self.focus][0])
- row[self.focus_col] = val
- self.lst[self.focus] = [tuple(row), errors]
+ self.set_value(val, self.focus, self.focus_col, errors)
+
+ def set_value(self, val, focus, focus_col, errors=None):
+ if not errors:
+ errors = set([])
+ row = list(self.lst[focus][0])
+ row[focus_col] = val
+ self.lst[focus] = [tuple(row), errors]
+ self._modified()
def delete_focus(self):
if self.lst:
@@ -149,7 +230,12 @@ class GridWalker(urwid.ListWalker):
def _insert(self, pos):
self.focus = pos
- self.lst.insert(self.focus, [[""]*self.editor.columns, set([])])
+ self.lst.insert(
+ self.focus,
+ [
+ [c.blank() for c in self.editor.columns], set([])
+ ]
+ )
self.focus_col = 0
self.start_edit()
@@ -160,16 +246,17 @@ class GridWalker(urwid.ListWalker):
return self._insert(min(self.focus + 1, len(self.lst)))
def start_edit(self):
- if self.lst:
+ col = self.editor.columns[self.focus_col]
+ if self.lst and not col.subeditor:
self.editing = GridRow(
self.focus_col, True, self.editor, self.lst[self.focus]
)
- self.editor.master.statusbar.update(footer_editing)
+ self.editor.master.loop.widget.footer.update(FOOTER_EDITING)
self._modified()
def stop_edit(self):
if self.editing:
- self.editor.master.statusbar.update(footer)
+ self.editor.master.loop.widget.footer.update(FOOTER)
self.set_current_value(self.editing.get_edit_value(), False)
self.editing = False
self._modified()
@@ -179,12 +266,12 @@ class GridWalker(urwid.ListWalker):
self._modified()
def right(self):
- self.focus_col = min(self.focus_col + 1, self.editor.columns-1)
+ self.focus_col = min(self.focus_col + 1, len(self.editor.columns)-1)
self._modified()
def tab_next(self):
self.stop_edit()
- if self.focus_col < self.editor.columns-1:
+ if self.focus_col < len(self.editor.columns)-1:
self.focus_col += 1
elif self.focus != len(self.lst)-1:
self.focus_col = 0
@@ -207,6 +294,7 @@ class GridWalker(urwid.ListWalker):
def set_focus(self, focus):
self.stop_edit()
self.focus = focus
+ self._modified()
def get_next(self, pos):
if pos+1 >= len(self.lst):
@@ -231,17 +319,16 @@ FIRST_WIDTH_MIN = 20
class GridEditor(urwid.WidgetWrap):
title = None
columns = None
- headings = None
def __init__(self, master, value, callback, *cb_args, **cb_kwargs):
- value = copy.deepcopy(value)
+ value = self.data_in(copy.deepcopy(value))
self.master, self.value, self.callback = master, value, callback
self.cb_args, self.cb_kwargs = cb_args, cb_kwargs
first_width = 20
if value:
for r in value:
- assert len(r) == self.columns
+ assert len(r) == len(self.columns)
first_width = max(len(r), first_width)
self.first_width = min(first_width, FIRST_WIDTH_MAX)
@@ -250,9 +337,9 @@ class GridEditor(urwid.WidgetWrap):
title = urwid.AttrWrap(title, "heading")
headings = []
- for i, h in enumerate(self.headings):
- c = urwid.Text(h)
- if i == 0 and len(self.headings) > 1:
+ for i, col in enumerate(self.columns):
+ c = urwid.Text(col.heading)
+ if i == 0 and len(self.columns) > 1:
headings.append(("fixed", first_width + 2, c))
else:
headings.append(c)
@@ -268,7 +355,7 @@ class GridEditor(urwid.WidgetWrap):
self.lb,
header = urwid.Pile([title, h])
)
- self.master.statusbar.update("")
+ self.master.loop.widget.footer.update("")
self.show_empty_msg()
def show_empty_msg(self):
@@ -303,6 +390,9 @@ class GridEditor(urwid.WidgetWrap):
except IOError, v:
return str(v)
+ def set_subeditor_value(self, val, focus, focus_col):
+ self.walker.set_value(val, focus, focus_col)
+
def keypress(self, size, key):
if self.walker.editing:
if key in ["esc"]:
@@ -317,13 +407,18 @@ class GridEditor(urwid.WidgetWrap):
return None
key = common.shortcuts(key)
+ column = self.columns[self.walker.focus_col]
if key in ["q", "esc"]:
res = []
for i in self.walker.lst:
- if not i[1] and any([x.strip() for x in i[0]]):
+ if not i[1] and any([x for x in i[0]]):
res.append(i[0])
- self.callback(res, *self.cb_args, **self.cb_kwargs)
- self.master.pop_view()
+ self.callback(self.data_out(res), *self.cb_args, **self.cb_kwargs)
+ signals.pop_view_state.send(self)
+ elif key == "G":
+ self.walker.set_focus(0)
+ elif key == "g":
+ self.walker.set_focus(len(self.walker.lst)-1)
elif key in ["h", "left"]:
self.walker.left()
elif key in ["l", "right"]:
@@ -336,26 +431,22 @@ class GridEditor(urwid.WidgetWrap):
self.walker.insert()
elif key == "d":
self.walker.delete_focus()
- elif key == "r":
- if self.walker.get_current_value() is not None:
- self.master.path_prompt("Read file: ", "", self.read_file)
- elif key == "R":
- if self.walker.get_current_value() is not None:
- self.master.path_prompt(
- "Read unescaped file: ", "", self.read_file, True
- )
- elif key == "e":
- o = self.walker.get_current_value()
- if o is not None:
- n = self.master.spawn_editor(o.encode("string-escape"))
- n = utils.clean_hanging_newline(n)
- self.walker.set_current_value(n, False)
- self.walker._modified()
- elif key in ["enter"]:
- self.walker.start_edit()
- elif not self.handle_key(key):
+ elif column.keypress(key, self) and not self.handle_key(key):
return self._w.keypress(size, key)
+ def data_out(self, data):
+ """
+ Called on raw list data, before data is returned through the
+ callback.
+ """
+ return data
+
+ def data_in(self, data):
+ """
+ Called to prepare provided data.
+ """
+ return data
+
def is_error(self, col, val):
"""
Return False, or a string error message.
@@ -373,10 +464,10 @@ class GridEditor(urwid.WidgetWrap):
("a", "add row after cursor"),
("d", "delete row"),
("e", "spawn external editor on current field"),
- ("q", "return to flow view"),
+ ("q", "save changes and exit editor"),
("r", "read value from file"),
("R", "read unescaped value from file"),
- ("esc", "return to flow view/exit field edit mode"),
+ ("esc", "save changes and exit editor"),
("tab", "next field"),
("enter", "edit field"),
]
@@ -396,14 +487,18 @@ class GridEditor(urwid.WidgetWrap):
class QueryEditor(GridEditor):
title = "Editing query"
- columns = 2
- headings = ("Key", "Value")
+ columns = [
+ TextColumn("Key"),
+ TextColumn("Value")
+ ]
class HeaderEditor(GridEditor):
title = "Editing headers"
- columns = 2
- headings = ("Key", "Value")
+ columns = [
+ TextColumn("Key"),
+ TextColumn("Value")
+ ]
def make_help(self):
h = GridEditor.make_help(self)
@@ -431,24 +526,29 @@ class HeaderEditor(GridEditor):
def handle_key(self, key):
if key == "U":
- self.master.prompt_onekey(
- "Add User-Agent header:",
- [(i[0], i[1]) for i in http_uastrings.UASTRINGS],
- self.set_user_agent,
+ signals.status_prompt_onekey.send(
+ prompt = "Add User-Agent header:",
+ keys = [(i[0], i[1]) for i in http_uastrings.UASTRINGS],
+ callback = self.set_user_agent,
)
return True
class URLEncodedFormEditor(GridEditor):
title = "Editing URL-encoded form"
- columns = 2
- headings = ("Key", "Value")
+ columns = [
+ TextColumn("Key"),
+ TextColumn("Value")
+ ]
class ReplaceEditor(GridEditor):
title = "Editing replacement patterns"
- columns = 3
- headings = ("Filter", "Regex", "Replacement")
+ columns = [
+ TextColumn("Filter"),
+ TextColumn("Regex"),
+ TextColumn("Replacement"),
+ ]
def is_error(self, col, val):
if col == 0:
@@ -464,8 +564,11 @@ class ReplaceEditor(GridEditor):
class SetHeadersEditor(GridEditor):
title = "Editing header set patterns"
- columns = 3
- headings = ("Filter", "Header", "Value")
+ columns = [
+ TextColumn("Filter"),
+ TextColumn("Header"),
+ TextColumn("Value"),
+ ]
def is_error(self, col, val):
if col == 0:
@@ -500,24 +603,32 @@ class SetHeadersEditor(GridEditor):
def handle_key(self, key):
if key == "U":
- self.master.prompt_onekey(
- "Add User-Agent header:",
- [(i[0], i[1]) for i in http_uastrings.UASTRINGS],
- self.set_user_agent,
+ signals.status_prompt_onekey.send(
+ prompt = "Add User-Agent header:",
+ keys = [(i[0], i[1]) for i in http_uastrings.UASTRINGS],
+ callback = self.set_user_agent,
)
return True
class PathEditor(GridEditor):
title = "Editing URL path components"
- columns = 1
- headings = ("Component",)
+ columns = [
+ TextColumn("Component"),
+ ]
+
+ def data_in(self, data):
+ return [[i] for i in data]
+
+ def data_out(self, data):
+ return [i[0] for i in data]
class ScriptEditor(GridEditor):
title = "Editing scripts"
- columns = 1
- headings = ("Command",)
+ columns = [
+ TextColumn("Command"),
+ ]
def is_error(self, col, val):
try:
@@ -528,11 +639,69 @@ class ScriptEditor(GridEditor):
class HostPatternEditor(GridEditor):
title = "Editing host patterns"
- columns = 1
- headings = ("Regex (matched on hostname:port / ip:port)",)
+ columns = [
+ TextColumn("Regex (matched on hostname:port / ip:port)")
+ ]
def is_error(self, col, val):
try:
re.compile(val, re.IGNORECASE)
except re.error as e:
return "Invalid regex: %s" % str(e)
+
+ def data_in(self, data):
+ return [[i] for i in data]
+
+ def data_out(self, data):
+ return [i[0] for i in data]
+
+
+class CookieEditor(GridEditor):
+ title = "Editing request Cookie header"
+ columns = [
+ TextColumn("Name"),
+ TextColumn("Value"),
+ ]
+
+
+class CookieAttributeEditor(GridEditor):
+ title = "Editing Set-Cookie attributes"
+ columns = [
+ TextColumn("Name"),
+ TextColumn("Value"),
+ ]
+
+ def data_out(self, data):
+ ret = []
+ for i in data:
+ if not i[1]:
+ ret.append([i[0], None])
+ else:
+ ret.append(i)
+ return ret
+
+
+class SetCookieEditor(GridEditor):
+ title = "Editing response SetCookie header"
+ columns = [
+ TextColumn("Name"),
+ TextColumn("Value"),
+ SubgridColumn("Attributes", CookieAttributeEditor),
+ ]
+
+ def data_in(self, data):
+ flattened = []
+ for k, v in data.items():
+ flattened.append([k, v[0], v[1].lst])
+ return flattened
+
+ def data_out(self, data):
+ vals = []
+ for i in data:
+ vals.append(
+ [
+ i[0],
+ [i[1], odict.ODictCaseless(i[2])]
+ ]
+ )
+ return odict.ODict(vals)
diff --git a/libmproxy/console/help.py b/libmproxy/console/help.py
index 6bb49a92..cbd5bef8 100644
--- a/libmproxy/console/help.py
+++ b/libmproxy/console/help.py
@@ -2,7 +2,7 @@ from __future__ import absolute_import
import urwid
-from . import common
+from . import common, signals
from .. import filt, version
footer = [
@@ -12,8 +12,7 @@ footer = [
class HelpView(urwid.ListBox):
- def __init__(self, master, help_context, state):
- self.master, self.state = master, state
+ def __init__(self, help_context):
self.help_context = help_context or []
urwid.ListBox.__init__(
self,
@@ -29,6 +28,7 @@ class HelpView(urwid.ListBox):
keys = [
("j, k", "down, up"),
("h, l", "left, right (in some contexts)"),
+ ("g, G", "go to end, beginning"),
("space", "page down"),
("pg up/down", "page up/down"),
("arrows", "up, down, left, right"),
@@ -38,92 +38,11 @@ class HelpView(urwid.ListBox):
text.append(urwid.Text([("head", "\n\nGlobal keys:\n")]))
keys = [
("c", "client replay"),
- ("H", "edit global header set patterns"),
- ("I", "set ignore pattern"),
("i", "set interception pattern"),
- ("M", "change global default display mode"),
- (None,
- common.highlight_key("automatic", "a") +
- [("text", ": automatic detection")]
- ),
- (None,
- common.highlight_key("hex", "e") +
- [("text", ": Hex")]
- ),
- (None,
- common.highlight_key("html", "h") +
- [("text", ": HTML")]
- ),
- (None,
- common.highlight_key("image", "i") +
- [("text", ": Image")]
- ),
- (None,
- common.highlight_key("javascript", "j") +
- [("text", ": JavaScript")]
- ),
- (None,
- common.highlight_key("json", "s") +
- [("text", ": JSON")]
- ),
- (None,
- common.highlight_key("css", "c") +
- [("text", ": CSS")]
- ),
- (None,
- common.highlight_key("urlencoded", "u") +
- [("text", ": URL-encoded data")]
- ),
- (None,
- common.highlight_key("raw", "r") +
- [("text", ": raw data")]
- ),
- (None,
- common.highlight_key("xml", "x") +
- [("text", ": XML")]
- ),
- (None,
- common.highlight_key("wbxml", "w") +
- [("text", ": WBXML")]
- ),
- (None,
- common.highlight_key("amf", "f") +
- [("text", ": AMF (requires PyAMF)")]
- ),
- ("o", "toggle options:"),
- (None,
- common.highlight_key("anticache", "a") +
- [("text", ": prevent cached responses")]
- ),
- (None,
- common.highlight_key("anticomp", "c") +
- [("text", ": prevent compressed responses")]
- ),
- (None,
- common.highlight_key("showhost", "h") +
- [("text", ": use Host header for URL display")]
- ),
- (None,
- common.highlight_key("killextra", "k") +
- [("text", ": kill requests not part of server replay")]
- ),
- (None,
- common.highlight_key("norefresh", "n") +
- [("text", ": disable server replay response refresh")]
- ),
- (None,
- common.highlight_key("upstream certs", "u") +
- [("text", ": sniff cert info from upstream server")]
- ),
-
- ("q", "quit / return to flow list"),
+ ("o", "options"),
+ ("q", "quit / return to previous page"),
("Q", "quit without confirm prompt"),
- ("R", "edit replacement patterns"),
- ("s", "add/remove scripts"),
("S", "server replay"),
- ("t", "set sticky cookie expression"),
- ("T", "set tcp proxying pattern"),
- ("u", "set sticky auth expression"),
]
text.extend(
common.format_keyvals(keys, key="key", val="text", indent=4)
@@ -180,11 +99,12 @@ class HelpView(urwid.ListBox):
def keypress(self, size, key):
key = common.shortcuts(key)
if key == "q":
- self.master.statusbar = self.state[0]
- self.master.body = self.state[1]
- self.master.header = self.state[2]
- self.master.loop.widget = self.master.make_view()
+ signals.pop_view_state.send(self)
return None
elif key == "?":
key = None
+ elif key == "G":
+ self.set_focus(0)
+ elif key == "g":
+ self.set_focus(len(self.body.contents))
return urwid.ListBox.keypress(self, size, key)
diff --git a/libmproxy/console/options.py b/libmproxy/console/options.py
new file mode 100644
index 00000000..c728123f
--- /dev/null
+++ b/libmproxy/console/options.py
@@ -0,0 +1,268 @@
+import urwid
+
+from . import common, signals, grideditor, contentview
+from . import select, palettes
+
+footer = [
+ ('heading_key', "enter/space"), ":toggle ",
+ ('heading_key', "C"), ":clear all ",
+]
+
+def _mkhelp():
+ text = []
+ keys = [
+ ("enter/space", "activate option"),
+ ("C", "clear all options"),
+ ]
+ text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
+ return text
+help_context = _mkhelp()
+
+
+class Options(urwid.WidgetWrap):
+ def __init__(self, master):
+ self.master = master
+ self.lb = select.Select(
+ [
+ select.Heading("Traffic Manipulation"),
+ select.Option(
+ "Header Set Patterns",
+ "H",
+ lambda: master.setheaders.count(),
+ self.setheaders
+ ),
+ select.Option(
+ "Ignore Patterns",
+ "I",
+ lambda: master.server.config.check_ignore,
+ self.ignorepatterns
+ ),
+ select.Option(
+ "Replacement Patterns",
+ "R",
+ lambda: master.replacehooks.count(),
+ self.replacepatterns
+ ),
+ select.Option(
+ "Scripts",
+ "S",
+ lambda: master.scripts,
+ self.scripts
+ ),
+
+ select.Heading("Interface"),
+ select.Option(
+ "Default Display Mode",
+ "M",
+ self.has_default_displaymode,
+ self.default_displaymode
+ ),
+ select.Option(
+ "Palette",
+ "P",
+ lambda: self.master.palette != palettes.DEFAULT,
+ self.palette
+ ),
+ select.Option(
+ "Show Host",
+ "w",
+ lambda: master.showhost,
+ self.toggle_showhost
+ ),
+
+ select.Heading("Network"),
+ select.Option(
+ "No Upstream Certs",
+ "U",
+ lambda: master.server.config.no_upstream_cert,
+ self.toggle_upstream_cert
+ ),
+ select.Option(
+ "TCP Proxying",
+ "T",
+ lambda: master.server.config.check_tcp,
+ self.tcp_proxy
+ ),
+
+ select.Heading("Utility"),
+ select.Option(
+ "Anti-Cache",
+ "a",
+ lambda: master.anticache,
+ self.toggle_anticache
+ ),
+ select.Option(
+ "Anti-Compression",
+ "o",
+ lambda: master.anticomp,
+ self.toggle_anticomp
+ ),
+ select.Option(
+ "Kill Extra",
+ "x",
+ lambda: master.killextra,
+ self.toggle_killextra
+ ),
+ select.Option(
+ "No Refresh",
+ "f",
+ lambda: not master.refresh_server_playback,
+ self.toggle_refresh_server_playback
+ ),
+ select.Option(
+ "Sticky Auth",
+ "A",
+ lambda: master.stickyauth_txt,
+ self.sticky_auth
+ ),
+ select.Option(
+ "Sticky Cookies",
+ "t",
+ lambda: master.stickycookie_txt,
+ self.sticky_cookie
+ ),
+ ]
+ )
+ title = urwid.Text("Options")
+ title = urwid.Padding(title, align="left", width=("relative", 100))
+ title = urwid.AttrWrap(title, "heading")
+ self._w = urwid.Frame(
+ self.lb,
+ header = title
+ )
+ self.master.loop.widget.footer.update("")
+ signals.update_settings.connect(self.sig_update_settings)
+
+ def sig_update_settings(self, sender):
+ self.lb.walker._modified()
+
+ def keypress(self, size, key):
+ if key == "C":
+ self.clearall()
+ return None
+ return super(self.__class__, self).keypress(size, key)
+
+ def clearall(self):
+ self.master.anticache = False
+ self.master.anticomp = False
+ self.master.killextra = False
+ self.master.showhost = False
+ self.master.refresh_server_playback = True
+ self.master.server.config.no_upstream_cert = False
+ self.master.setheaders.clear()
+ self.master.replacehooks.clear()
+ self.master.set_ignore_filter([])
+ self.master.set_tcp_filter([])
+ self.master.scripts = []
+ self.master.set_stickyauth(None)
+ self.master.set_stickycookie(None)
+ self.master.state.default_body_view = contentview.get("Auto")
+
+ signals.update_settings.send(self)
+ signals.status_message.send(
+ message = "All select.Options cleared",
+ expire = 1
+ )
+
+ def toggle_anticache(self):
+ self.master.anticache = not self.master.anticache
+
+ def toggle_anticomp(self):
+ self.master.anticomp = not self.master.anticomp
+
+ def toggle_killextra(self):
+ self.master.killextra = not self.master.killextra
+
+ def toggle_showhost(self):
+ self.master.showhost = not self.master.showhost
+
+ def toggle_refresh_server_playback(self):
+ self.master.refresh_server_playback = not self.master.refresh_server_playback
+
+ def toggle_upstream_cert(self):
+ self.master.server.config.no_upstream_cert = not self.master.server.config.no_upstream_cert
+ signals.update_settings.send(self)
+
+ def setheaders(self):
+ def _set(*args, **kwargs):
+ self.master.setheaders.set(*args, **kwargs)
+ signals.update_settings.send(self)
+ self.master.view_grideditor(
+ grideditor.SetHeadersEditor(
+ self.master,
+ self.master.setheaders.get_specs(),
+ _set
+ )
+ )
+
+ def ignorepatterns(self):
+ def _set(ignore):
+ self.master.set_ignore_filter(ignore)
+ signals.update_settings.send(self)
+ self.master.view_grideditor(
+ grideditor.HostPatternEditor(
+ self.master,
+ self.master.get_ignore_filter(),
+ _set
+ )
+ )
+
+ def replacepatterns(self):
+ def _set(*args, **kwargs):
+ self.master.replacehooks.set(*args, **kwargs)
+ signals.update_settings.send(self)
+ self.master.view_grideditor(
+ grideditor.ReplaceEditor(
+ self.master,
+ self.master.replacehooks.get_specs(),
+ _set
+ )
+ )
+
+ def scripts(self):
+ self.master.view_grideditor(
+ grideditor.ScriptEditor(
+ self.master,
+ [[i.command] for i in self.master.scripts],
+ self.master.edit_scripts
+ )
+ )
+
+ def default_displaymode(self):
+ signals.status_prompt_onekey.send(
+ prompt = "Global default display mode",
+ keys = contentview.view_prompts,
+ callback = self.master.change_default_display_mode
+ )
+
+ def has_default_displaymode(self):
+ return self.master.state.default_body_view.name != "Auto"
+
+ def tcp_proxy(self):
+ def _set(tcp):
+ self.master.set_tcp_filter(tcp)
+ signals.update_settings.send(self)
+ self.master.view_grideditor(
+ grideditor.HostPatternEditor(
+ self.master,
+ self.master.get_tcp_filter(),
+ _set
+ )
+ )
+
+ def sticky_auth(self):
+ signals.status_prompt.send(
+ prompt = "Sticky auth filter",
+ text = self.master.stickyauth_txt,
+ callback = self.master.set_stickyauth
+ )
+
+ def sticky_cookie(self):
+ signals.status_prompt.send(
+ prompt = "Sticky cookie filter",
+ text = self.master.stickycookie_txt,
+ callback = self.master.set_stickycookie
+ )
+
+ def palette(self):
+ self.master.view_palette_picker()
diff --git a/libmproxy/console/palettepicker.py b/libmproxy/console/palettepicker.py
new file mode 100644
index 00000000..7e2c10cd
--- /dev/null
+++ b/libmproxy/console/palettepicker.py
@@ -0,0 +1,81 @@
+import urwid
+
+from . import select, common, palettes, signals
+
+footer = [
+ ('heading_key', "enter/space"), ":select",
+]
+
+
+def _mkhelp():
+ text = []
+ keys = [
+ ("enter/space", "select"),
+ ]
+ text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
+ return text
+help_context = _mkhelp()
+
+
+class PalettePicker(urwid.WidgetWrap):
+ def __init__(self, master):
+ self.master = master
+ low, high = [], []
+ for k, v in palettes.palettes.items():
+ if v.high:
+ high.append(k)
+ else:
+ low.append(k)
+ high.sort()
+ low.sort()
+
+ options = [
+ select.Heading("High Colour")
+ ]
+
+ def mkopt(name):
+ return select.Option(
+ i,
+ None,
+ lambda: self.master.palette == name,
+ lambda: self.select(name)
+ )
+
+ for i in high:
+ options.append(mkopt(i))
+ options.append(select.Heading("Low Colour"))
+ for i in low:
+ options.append(mkopt(i))
+
+ options.extend(
+ [
+ select.Heading("Options"),
+ select.Option(
+ "Transparent",
+ "T",
+ lambda: master.palette_transparent,
+ self.toggle_palette_transparent
+ )
+ ]
+ )
+
+ self.lb = select.Select(options)
+ title = urwid.Text("Palettes")
+ title = urwid.Padding(title, align="left", width=("relative", 100))
+ title = urwid.AttrWrap(title, "heading")
+ self._w = urwid.Frame(
+ self.lb,
+ header = title
+ )
+ signals.update_settings.connect(self.sig_update_settings)
+
+ def sig_update_settings(self, sender):
+ self.lb.walker._modified()
+
+ def select(self, name):
+ self.master.set_palette(name)
+
+ def toggle_palette_transparent(self):
+ self.master.palette_transparent = not self.master.palette_transparent
+ self.master.set_palette(self.master.palette)
+ signals.update_settings.send(self)
diff --git a/libmproxy/console/palettes.py b/libmproxy/console/palettes.py
index cfb2702c..6490eb73 100644
--- a/libmproxy/console/palettes.py
+++ b/libmproxy/console/palettes.py
@@ -1,4 +1,3 @@
-
# Low-color themes should ONLY use the standard foreground and background
# colours listed here:
#
@@ -6,9 +5,9 @@
#
-
class Palette:
_fields = [
+ 'background',
'title',
# Status bar & heading
@@ -17,6 +16,10 @@ class Palette:
# Help
'key', 'head', 'text',
+ # Options
+ 'option_selected', 'option_active', 'option_active_selected',
+ 'option_selected_key',
+
# List and Connections
'method', 'focus',
'code_200', 'code_300', 'code_400', 'code_500', 'code_other',
@@ -31,15 +34,33 @@ class Palette:
]
high = None
- def palette(self):
+ def palette(self, transparent):
l = []
+ highback, lowback = None, None
+ if not transparent:
+ if self.high and self.high.get("background"):
+ highback = self.high["background"][1]
+ lowback = self.low["background"][1]
+
for i in self._fields:
- v = [i]
- v.extend(self.low[i])
- if self.high and i in self.high:
- v.append(None)
- v.extend(self.high[i])
- l.append(tuple(v))
+ if transparent and i == "background":
+ l.append(["background", "default", "default"])
+ else:
+ v = [i]
+ low = list(self.low[i])
+ if lowback and low[1] == "default":
+ low[1] = lowback
+ v.extend(low)
+ if self.high and i in self.high:
+ v.append(None)
+ high = list(self.high[i])
+ if highback and high[1] == "default":
+ high[1] = highback
+ v.extend(high)
+ elif highback and self.low[i][1] == "default":
+ high = [None, low[0], highback]
+ v.extend(high)
+ l.append(tuple(v))
return l
@@ -48,18 +69,25 @@ class LowDark(Palette):
Low-color dark background
"""
low = dict(
+ background = ('white', 'black'),
title = ('white,bold', 'default'),
# Status bar & heading
- heading = ('light gray', 'dark blue'),
+ heading = ('white', 'dark blue'),
heading_key = ('light cyan', 'dark blue'),
- heading_inactive = ('white', 'dark gray'),
+ heading_inactive = ('dark gray', 'light gray'),
# Help
key = ('light cyan', 'default'),
head = ('white,bold', 'default'),
text = ('light gray', 'default'),
+ # Options
+ option_selected = ('black', 'light gray'),
+ option_selected_key = ('light cyan', 'light gray'),
+ option_active = ('light red', 'default'),
+ option_active_selected = ('light red', 'light gray'),
+
# List and Connections
method = ('dark cyan', 'default'),
focus = ('yellow', 'default'),
@@ -92,6 +120,10 @@ class Dark(LowDark):
high = dict(
heading_inactive = ('g58', 'g11'),
intercept = ('#f60', 'default'),
+
+ option_selected = ('g85', 'g45'),
+ option_selected_key = ('light cyan', 'g50'),
+ option_active_selected = ('light red', 'g50'),
)
@@ -100,18 +132,25 @@ class LowLight(Palette):
Low-color light background
"""
low = dict(
- title = ('dark magenta,bold', 'light blue'),
+ background = ('black', 'white'),
+ title = ('dark magenta', 'default'),
# Status bar & heading
- heading = ('light gray', 'dark blue'),
- heading_key = ('light cyan', 'dark blue'),
+ heading = ('white', 'black'),
+ heading_key = ('dark blue', 'black'),
heading_inactive = ('black', 'light gray'),
# Help
- key = ('dark blue,bold', 'default'),
- head = ('black,bold', 'default'),
+ key = ('dark blue', 'default'),
+ head = ('black', 'default'),
text = ('dark gray', 'default'),
+ # Options
+ option_selected = ('black', 'light gray'),
+ option_selected_key = ('dark blue', 'light gray'),
+ option_active = ('light red', 'default'),
+ option_active_selected = ('light red', 'light gray'),
+
# List and Connections
method = ('dark cyan', 'default'),
focus = ('black', 'default'),
@@ -142,10 +181,15 @@ class LowLight(Palette):
class Light(LowLight):
high = dict(
+ background = ('black', 'g100'),
heading = ('g99', '#08f'),
heading_key = ('#0ff,bold', '#08f'),
heading_inactive = ('g35', 'g85'),
replay = ('#0a0,bold', 'default'),
+
+ option_selected = ('black', 'g85'),
+ option_selected_key = ('dark blue', 'g85'),
+ option_active_selected = ('light red', 'g85'),
)
@@ -155,10 +199,10 @@ sol_base03 = "h234"
sol_base02 = "h235"
sol_base01 = "h240"
sol_base00 = "h241"
-sol_base0 = "h244"
-sol_base1 = "h245"
-sol_base2 = "h254"
-sol_base3 = "h230"
+sol_base0 = "h244"
+sol_base1 = "h245"
+sol_base2 = "h254"
+sol_base3 = "h230"
sol_yellow = "h136"
sol_orange = "h166"
sol_red = "h160"
@@ -167,9 +211,12 @@ sol_violet = "h61"
sol_blue = "h33"
sol_cyan = "h37"
sol_green = "h64"
+
+
class SolarizedLight(LowLight):
high = dict(
- title = (sol_blue, 'default'),
+ background = (sol_base00, sol_base3),
+ title = (sol_cyan, 'default'),
text = (sol_base00, 'default'),
# Status bar & heading
@@ -181,6 +228,12 @@ class SolarizedLight(LowLight):
key = (sol_blue, 'default',),
head = (sol_base00, 'default'),
+ # Options
+ option_selected = (sol_base03, sol_base2),
+ option_selected_key = (sol_blue, sol_base2),
+ option_active = (sol_orange, 'default'),
+ option_active_selected = (sol_orange, sol_base2),
+
# List and Connections
method = (sol_cyan, 'default'),
focus = (sol_base01, 'default'),
@@ -193,7 +246,7 @@ class SolarizedLight(LowLight):
error = (sol_red, 'default'),
- header = (sol_base01, 'default'),
+ header = (sol_blue, 'default'),
highlight = (sol_base01, 'default'),
intercept = (sol_red, 'default',),
replay = (sol_green, 'default',),
@@ -211,17 +264,24 @@ class SolarizedLight(LowLight):
class SolarizedDark(LowDark):
high = dict(
+ background = (sol_base2, sol_base03),
title = (sol_blue, 'default'),
- text = (sol_base0, 'default'),
+ text = (sol_base1, 'default'),
# Status bar & heading
- heading = (sol_base03, sol_base1),
- heading_key = (sol_blue+",bold", sol_base1),
+ heading = (sol_base2, sol_base01),
+ heading_key = (sol_blue+",bold", sol_base01),
heading_inactive = (sol_base1, sol_base02),
# Help
key = (sol_blue, 'default',),
- head = (sol_base00, 'default'),
+ head = (sol_base2, 'default'),
+
+ # Options
+ option_selected = (sol_base03, sol_base00),
+ option_selected_key = (sol_blue, sol_base00),
+ option_active = (sol_orange, 'default'),
+ option_active_selected = (sol_orange, sol_base00),
# List and Connections
method = (sol_cyan, 'default'),
@@ -235,7 +295,7 @@ class SolarizedDark(LowDark):
error = (sol_red, 'default'),
- header = (sol_base01, 'default'),
+ header = (sol_blue, 'default'),
highlight = (sol_base01, 'default'),
intercept = (sol_red, 'default',),
replay = (sol_green, 'default',),
@@ -251,6 +311,7 @@ class SolarizedDark(LowDark):
)
+DEFAULT = "dark"
palettes = {
"lowlight": LowLight(),
"lowdark": LowDark(),
diff --git a/libmproxy/console/pathedit.py b/libmproxy/console/pathedit.py
new file mode 100644
index 00000000..53cda3be
--- /dev/null
+++ b/libmproxy/console/pathedit.py
@@ -0,0 +1,69 @@
+import glob
+import os.path
+
+import urwid
+
+
+class _PathCompleter:
+ def __init__(self, _testing=False):
+ """
+ _testing: disables reloading of the lookup table to make testing
+ possible.
+ """
+ self.lookup, self.offset = None, None
+ self.final = None
+ self._testing = _testing
+
+ def reset(self):
+ self.lookup = None
+ self.offset = -1
+
+ def complete(self, txt):
+ """
+ Returns the next completion for txt, or None if there is no
+ completion.
+ """
+ path = os.path.expanduser(txt)
+ if not self.lookup:
+ if not self._testing:
+ # Lookup is a set of (display value, actual value) tuples.
+ self.lookup = []
+ if os.path.isdir(path):
+ files = glob.glob(os.path.join(path, "*"))
+ prefix = txt
+ else:
+ files = glob.glob(path+"*")
+ prefix = os.path.dirname(txt)
+ prefix = prefix or "./"
+ for f in files:
+ display = os.path.join(prefix, os.path.basename(f))
+ if os.path.isdir(f):
+ display += "/"
+ self.lookup.append((display, f))
+ if not self.lookup:
+ self.final = path
+ return path
+ self.lookup.sort()
+ self.offset = -1
+ self.lookup.append((txt, txt))
+ self.offset += 1
+ if self.offset >= len(self.lookup):
+ self.offset = 0
+ ret = self.lookup[self.offset]
+ self.final = ret[1]
+ return ret[0]
+
+
+class PathEdit(urwid.Edit, _PathCompleter):
+ def __init__(self, *args, **kwargs):
+ urwid.Edit.__init__(self, *args, **kwargs)
+ _PathCompleter.__init__(self)
+
+ def keypress(self, size, key):
+ if key == "tab":
+ comp = self.complete(self.get_edit_text())
+ self.set_edit_text(comp)
+ self.set_edit_pos(len(comp))
+ else:
+ self.reset()
+ return urwid.Edit.keypress(self, size, key)
diff --git a/libmproxy/console/searchable.py b/libmproxy/console/searchable.py
new file mode 100644
index 00000000..a9572ae3
--- /dev/null
+++ b/libmproxy/console/searchable.py
@@ -0,0 +1,91 @@
+import urwid
+
+from . import signals
+
+
+class Highlight(urwid.AttrMap):
+ def __init__(self, t):
+ urwid.AttrMap.__init__(
+ self,
+ urwid.Text(t.text),
+ "focusfield",
+ )
+ self.backup = t
+
+
+class Searchable(urwid.ListBox):
+ def __init__(self, state, contents):
+ self.walker = urwid.SimpleFocusListWalker(contents)
+ urwid.ListBox.__init__(self, self.walker)
+ self.state = state
+ self.search_offset = 0
+ self.current_highlight = None
+ self.search_term = None
+
+ def keypress(self, size, key):
+ if key == "/":
+ signals.status_prompt.send(
+ prompt = "Search for",
+ text = "",
+ callback = self.set_search
+ )
+ elif key == "n":
+ self.find_next(False)
+ elif key == "N":
+ self.find_next(True)
+ elif key == "G":
+ self.set_focus(0)
+ self.walker._modified()
+ elif key == "g":
+ self.set_focus(len(self.walker)-1)
+ self.walker._modified()
+ else:
+ return super(self.__class__, self).keypress(size, key)
+
+ def set_search(self, text):
+ self.state.last_search = text
+ self.search_term = text or None
+ self.find_next(False)
+
+ def set_highlight(self, offset):
+ if self.current_highlight is not None:
+ old = self.body[self.current_highlight]
+ self.body[self.current_highlight] = old.backup
+ if offset is None:
+ self.current_highlight = None
+ else:
+ self.body[offset] = Highlight(self.body[offset])
+ self.current_highlight = offset
+
+ def get_text(self, w):
+ if isinstance(w, urwid.Text):
+ return w.text
+ elif isinstance(w, Highlight):
+ return w.backup.text
+ else:
+ return None
+
+ def find_next(self, backwards):
+ if not self.search_term:
+ if self.state.last_search:
+ self.search_term = self.state.last_search
+ else:
+ self.set_highlight(None)
+ return
+ # Start search at focus + 1
+ if backwards:
+ rng = xrange(len(self.body)-1, -1, -1)
+ else:
+ rng = xrange(1, len(self.body) + 1)
+ for i in rng:
+ off = (self.focus_position + i)%len(self.body)
+ w = self.body[off]
+ txt = self.get_text(w)
+ if txt and self.search_term in txt:
+ self.set_highlight(off)
+ self.set_focus(off, coming_from="above")
+ self.body._modified()
+ return
+ else:
+ self.set_highlight(None)
+ signals.status_message.send(message="Search not found.", expire=1)
diff --git a/libmproxy/console/select.py b/libmproxy/console/select.py
new file mode 100644
index 00000000..61ee50e4
--- /dev/null
+++ b/libmproxy/console/select.py
@@ -0,0 +1,107 @@
+import urwid
+
+from . import common
+
+class _OptionWidget(urwid.WidgetWrap):
+ def __init__(self, option, text, shortcut, active, focus):
+ self.option = option
+ textattr = "text"
+ keyattr = "key"
+ if focus and active:
+ textattr = "option_active_selected"
+ keyattr = "option_selected_key"
+ elif focus:
+ textattr = "option_selected"
+ keyattr = "option_selected_key"
+ elif active:
+ textattr = "option_active"
+ if shortcut:
+ text = common.highlight_key(
+ text,
+ shortcut,
+ textattr = textattr,
+ keyattr = keyattr
+ )
+ opt = urwid.Text(text, align="left")
+ opt = urwid.AttrWrap(opt, textattr)
+ opt = urwid.Padding(opt, align = "center", width = 40)
+ urwid.WidgetWrap.__init__(self, opt)
+
+ def keypress(self, size, key):
+ return key
+
+ def selectable(self):
+ return True
+
+
+class OptionWalker(urwid.ListWalker):
+ def __init__(self, options):
+ urwid.ListWalker.__init__(self)
+ self.options = options
+ self.focus = 0
+
+ def set_focus(self, pos):
+ self.focus = pos
+
+ def get_focus(self):
+ return self.options[self.focus].render(True), self.focus
+
+ def get_next(self, pos):
+ if pos >= len(self.options)-1:
+ return None, None
+ return self.options[pos+1].render(False), pos+1
+
+ def get_prev(self, pos):
+ if pos <= 0:
+ return None, None
+ return self.options[pos-1].render(False), pos-1
+
+
+class Heading:
+ def __init__(self, text):
+ self.text = text
+
+ def render(self, focus):
+ opt = urwid.Text("\n" + self.text, align="left")
+ opt = urwid.AttrWrap(opt, "title")
+ opt = urwid.Padding(opt, align = "center", width = 40)
+ return opt
+
+
+_neg = lambda: False
+class Option:
+ def __init__(self, text, shortcut, getstate=None, activate=None):
+ self.text = text
+ self.shortcut = shortcut
+ self.getstate = getstate or _neg
+ self.activate = activate or _neg
+
+ def render(self, focus):
+ return _OptionWidget(self, self.text, self.shortcut, self.getstate(), focus)
+
+
+class Select(urwid.ListBox):
+ def __init__(self, options):
+ self.walker = OptionWalker(options)
+ urwid.ListBox.__init__(
+ self,
+ self.walker
+ )
+ self.options = options
+ self.keymap = {}
+ for i in options:
+ if hasattr(i, "shortcut") and i.shortcut:
+ if i.shortcut in self.keymap:
+ raise ValueError("Duplicate shortcut key: %s"%i.shortcut)
+ self.keymap[i.shortcut] = i
+
+ def keypress(self, size, key):
+ if key == "enter" or key == " ":
+ self.get_focus()[0].option.activate()
+ return None
+ key = common.shortcuts(key)
+ if key in self.keymap:
+ self.keymap[key].activate()
+ self.set_focus(self.options.index(self.keymap[key]))
+ return None
+ return super(self.__class__, self).keypress(size, key)
diff --git a/libmproxy/console/signals.py b/libmproxy/console/signals.py
new file mode 100644
index 00000000..c1bcf201
--- /dev/null
+++ b/libmproxy/console/signals.py
@@ -0,0 +1,32 @@
+import blinker
+
+# Show a status message in the action bar
+status_message = blinker.Signal()
+
+# Prompt for input
+status_prompt = blinker.Signal()
+
+# Prompt for a path
+status_prompt_path = blinker.Signal()
+
+# Prompt for a single keystroke
+status_prompt_onekey = blinker.Signal()
+
+# Call a callback in N seconds
+call_in = blinker.Signal()
+
+# Focus the body, footer or header of the main window
+focus = blinker.Signal()
+
+# Fired when settings change
+update_settings = blinker.Signal()
+
+# Fired when a flow changes
+flow_change = blinker.Signal()
+
+# Fired when the flow list or focus changes
+flowlist_change = blinker.Signal()
+
+# Pop and push view state onto a stack
+pop_view_state = blinker.Signal()
+push_view_state = blinker.Signal()
diff --git a/libmproxy/console/statusbar.py b/libmproxy/console/statusbar.py
new file mode 100644
index 00000000..37ceef94
--- /dev/null
+++ b/libmproxy/console/statusbar.py
@@ -0,0 +1,254 @@
+import os.path
+
+import urwid
+
+import netlib.utils
+from . import pathedit, signals, common
+from .. import utils
+
+
+class ActionBar(urwid.WidgetWrap):
+ def __init__(self):
+ urwid.WidgetWrap.__init__(self, None)
+ self.clear()
+ signals.status_message.connect(self.sig_message)
+ signals.status_prompt.connect(self.sig_prompt)
+ signals.status_prompt_path.connect(self.sig_path_prompt)
+ signals.status_prompt_onekey.connect(self.sig_prompt_onekey)
+
+ self.last_path = ""
+
+ self.prompting = False
+ self.onekey = False
+ self.pathprompt = False
+
+ def sig_message(self, sender, message, expire=None):
+ w = urwid.Text(message)
+ self._w = w
+ if expire:
+ def cb(*args):
+ if w == self._w:
+ self.clear()
+ signals.call_in.send(seconds=expire, callback=cb)
+
+ def prep_prompt(self, p):
+ return p.strip() + ": "
+
+ def sig_prompt(self, sender, prompt, text, callback, args=()):
+ signals.focus.send(self, section="footer")
+ self._w = urwid.Edit(self.prep_prompt(prompt), text or "")
+ self.prompting = (callback, args)
+
+ def sig_path_prompt(self, sender, prompt, callback, args=()):
+ signals.focus.send(self, section="footer")
+ self._w = pathedit.PathEdit(
+ self.prep_prompt(prompt),
+ os.path.dirname(self.last_path)
+ )
+ self.pathprompt = True
+ self.prompting = (callback, args)
+
+ def sig_prompt_onekey(self, sender, prompt, keys, callback, args=()):
+ """
+ Keys are a set of (word, key) tuples. The appropriate key in the
+ word is highlighted.
+ """
+ signals.focus.send(self, section="footer")
+ prompt = [prompt, " ("]
+ mkup = []
+ for i, e in enumerate(keys):
+ mkup.extend(common.highlight_key(e[0], e[1]))
+ if i < len(keys)-1:
+ mkup.append(",")
+ prompt.extend(mkup)
+ prompt.append(")? ")
+ self.onekey = set(i[1] for i in keys)
+ self._w = urwid.Edit(prompt, "")
+ self.prompting = (callback, args)
+
+ def selectable(self):
+ return True
+
+ def keypress(self, size, k):
+ if self.prompting:
+ if k == "esc":
+ self.prompt_done()
+ elif self.onekey:
+ if k == "enter":
+ self.prompt_done()
+ elif k in self.onekey:
+ self.prompt_execute(k)
+ elif k == "enter":
+ self.prompt_execute(self._w.get_edit_text())
+ else:
+ if common.is_keypress(k):
+ self._w.keypress(size, k)
+ else:
+ return k
+
+ def clear(self):
+ self._w = urwid.Text("")
+
+ def prompt_done(self):
+ self.prompting = False
+ self.onekey = False
+ self.pathprompt = False
+ signals.status_message.send(message="")
+ signals.focus.send(self, section="body")
+
+ def prompt_execute(self, txt):
+ if self.pathprompt:
+ self.last_path = txt
+ p, args = self.prompting
+ self.prompt_done()
+ msg = p(txt, *args)
+ if msg:
+ signals.status_message.send(message=msg, expire=1)
+
+
+class StatusBar(urwid.WidgetWrap):
+ def __init__(self, master, helptext):
+ self.master, self.helptext = master, helptext
+ self.ab = ActionBar()
+ self.ib = urwid.WidgetWrap(urwid.Text(""))
+ self._w = urwid.Pile([self.ib, self.ab])
+ signals.update_settings.connect(self.sig_update_settings)
+ signals.flowlist_change.connect(self.sig_update_settings)
+ self.redraw()
+
+ def sig_update_settings(self, sender):
+ self.redraw()
+
+ def keypress(self, *args, **kwargs):
+ return self.ab.keypress(*args, **kwargs)
+
+ def get_status(self):
+ r = []
+
+ if self.master.setheaders.count():
+ r.append("[")
+ r.append(("heading_key", "H"))
+ r.append("eaders]")
+ if self.master.replacehooks.count():
+ r.append("[")
+ r.append(("heading_key", "R"))
+ r.append("eplacing]")
+ if self.master.client_playback:
+ r.append("[")
+ r.append(("heading_key", "cplayback"))
+ r.append(":%s to go]"%self.master.client_playback.count())
+ if self.master.server_playback:
+ r.append("[")
+ r.append(("heading_key", "splayback"))
+ if self.master.nopop:
+ r.append(":%s in file]"%self.master.server_playback.count())
+ else:
+ r.append(":%s to go]"%self.master.server_playback.count())
+ if self.master.get_ignore_filter():
+ r.append("[")
+ r.append(("heading_key", "I"))
+ r.append("gnore:%d]" % len(self.master.get_ignore_filter()))
+ if self.master.get_tcp_filter():
+ r.append("[")
+ r.append(("heading_key", "T"))
+ r.append("CP:%d]" % len(self.master.get_tcp_filter()))
+ if self.master.state.intercept_txt:
+ r.append("[")
+ r.append(("heading_key", "i"))
+ r.append(":%s]"%self.master.state.intercept_txt)
+ if self.master.state.limit_txt:
+ r.append("[")
+ r.append(("heading_key", "l"))
+ r.append(":%s]"%self.master.state.limit_txt)
+ if self.master.stickycookie_txt:
+ r.append("[")
+ r.append(("heading_key", "t"))
+ r.append(":%s]"%self.master.stickycookie_txt)
+ if self.master.stickyauth_txt:
+ r.append("[")
+ r.append(("heading_key", "u"))
+ r.append(":%s]"%self.master.stickyauth_txt)
+ if self.master.state.default_body_view.name != "Auto":
+ r.append("[")
+ r.append(("heading_key", "M"))
+ r.append(":%s]"%self.master.state.default_body_view.name)
+
+ opts = []
+ if self.master.anticache:
+ opts.append("anticache")
+ if self.master.anticomp:
+ opts.append("anticomp")
+ if self.master.showhost:
+ opts.append("showhost")
+ if not self.master.refresh_server_playback:
+ opts.append("norefresh")
+ if self.master.killextra:
+ opts.append("killextra")
+ if self.master.server.config.no_upstream_cert:
+ opts.append("no-upstream-cert")
+ if self.master.state.follow_focus:
+ opts.append("following")
+ if self.master.stream_large_bodies:
+ opts.append(
+ "stream:%s" % netlib.utils.pretty_size(
+ self.master.stream_large_bodies.max_size
+ )
+ )
+
+ if opts:
+ r.append("[%s]"%(":".join(opts)))
+
+ if self.master.server.config.mode in ["reverse", "upstream"]:
+ dst = self.master.server.config.mode.dst
+ scheme = "https" if dst[0] else "http"
+ if dst[1] != dst[0]:
+ scheme += "2https" if dst[1] else "http"
+ r.append("[dest:%s]"%utils.unparse_url(scheme, *dst[2:]))
+ if self.master.scripts:
+ r.append("[")
+ r.append(("heading_key", "s"))
+ r.append("cripts:%s]"%len(self.master.scripts))
+ # r.append("[lt:%0.3f]"%self.master.looptime)
+
+ if self.master.stream:
+ r.append("[W:%s]"%self.master.stream_path)
+
+ return r
+
+ def redraw(self):
+ fc = self.master.state.flow_count()
+ if self.master.state.focus is None:
+ offset = 0
+ else:
+ offset = min(self.master.state.focus + 1, fc)
+ t = [
+ ('heading', ("[%s/%s]"%(offset, fc)).ljust(9))
+ ]
+
+ if self.master.server.bound:
+ host = self.master.server.address.host
+ if host == "0.0.0.0":
+ host = "*"
+ boundaddr = "[%s:%s]"%(host, self.master.server.address.port)
+ else:
+ boundaddr = ""
+ t.extend(self.get_status())
+ status = urwid.AttrWrap(urwid.Columns([
+ urwid.Text(t),
+ urwid.Text(
+ [
+ self.helptext,
+ boundaddr
+ ],
+ align="right"
+ ),
+ ]), "heading")
+ self.ib._w = status
+
+ def update(self, text):
+ self.helptext = text
+ self.redraw()
+ self.master.loop.draw_screen()
+
+ def selectable(self):
+ return True
diff --git a/libmproxy/console/tabs.py b/libmproxy/console/tabs.py
new file mode 100644
index 00000000..2c46e59e
--- /dev/null
+++ b/libmproxy/console/tabs.py
@@ -0,0 +1,38 @@
+import urwid
+
+class Tabs(urwid.WidgetWrap):
+ def __init__(self, tabs, tab_offset=0):
+ urwid.WidgetWrap.__init__(self, "")
+ self.tab_offset = tab_offset
+ self.tabs = tabs
+ self.show()
+
+ def _tab(self, content, attr):
+ p = urwid.Text(content, align="center")
+ p = urwid.Padding(p, align="center", width=("relative", 100))
+ p = urwid.AttrWrap(p, attr)
+ return p
+
+ def keypress(self, size, key):
+ if key in ["tab", "l"]:
+ self.tab_offset = (self.tab_offset + 1)%(len(self.tabs))
+ self.show()
+ elif key == "h":
+ self.tab_offset = (self.tab_offset - 1)%(len(self.tabs))
+ self.show()
+ return self._w.keypress(size, key)
+
+ def show(self):
+ headers = []
+ for i in range(len(self.tabs)):
+ txt = self.tabs[i][0]()
+ if i == self.tab_offset:
+ headers.append(self._tab(txt, "heading"))
+ else:
+ headers.append(self._tab(txt, "heading_inactive"))
+ headers = urwid.Columns(headers, dividechars=1)
+ self._w = urwid.Frame(
+ body = self.tabs[self.tab_offset][1](),
+ header = headers
+ )
+ self._w.set_focus("body")
diff --git a/libmproxy/console/window.py b/libmproxy/console/window.py
new file mode 100644
index 00000000..d64e83df
--- /dev/null
+++ b/libmproxy/console/window.py
@@ -0,0 +1,72 @@
+import urwid
+from . import signals
+
+
+class Window(urwid.Frame):
+ def __init__(self, master, body, header, footer, helpctx):
+ urwid.Frame.__init__(
+ self,
+ urwid.AttrWrap(body, "background"),
+ header = urwid.AttrWrap(header, "background") if header else None,
+ footer = urwid.AttrWrap(footer, "background") if footer else None
+ )
+ self.master = master
+ self.helpctx = helpctx
+ signals.focus.connect(self.sig_focus)
+
+ def sig_focus(self, sender, section):
+ self.focus_position = section
+
+ def keypress(self, size, k):
+ k = super(self.__class__, self).keypress(size, k)
+ if k == "?":
+ self.master.view_help(self.helpctx)
+ elif k == "c":
+ if not self.master.client_playback:
+ signals.status_prompt_path.send(
+ self,
+ prompt = "Client replay",
+ callback = self.master.client_playback_path
+ )
+ else:
+ signals.status_prompt_onekey.send(
+ self,
+ prompt = "Stop current client replay?",
+ keys = (
+ ("yes", "y"),
+ ("no", "n"),
+ ),
+ callback = self.master.stop_client_playback_prompt,
+ )
+ elif k == "i":
+ signals.status_prompt.send(
+ self,
+ prompt = "Intercept filter",
+ text = self.master.state.intercept_txt,
+ callback = self.master.set_intercept
+ )
+ elif k == "o":
+ self.master.view_options()
+ elif k == "Q":
+ raise urwid.ExitMainLoop
+ elif k == "q":
+ signals.pop_view_state.send(self)
+ elif k == "S":
+ if not self.master.server_playback:
+ signals.status_prompt_path.send(
+ self,
+ prompt = "Server replay path",
+ callback = self.master.server_playback_path
+ )
+ else:
+ signals.status_prompt_onekey.send(
+ self,
+ prompt = "Stop current server replay?",
+ keys = (
+ ("yes", "y"),
+ ("no", "n"),
+ ),
+ callback = self.master.stop_server_playback_prompt,
+ )
+ else:
+ return k