aboutsummaryrefslogtreecommitdiffstats
path: root/libmproxy
diff options
context:
space:
mode:
authorAldo Cortesi <aldo@nullcube.com>2015-03-20 11:08:04 +1300
committerAldo Cortesi <aldo@nullcube.com>2015-03-20 11:08:04 +1300
commit2f8ebfdce2165f1bd9196954a1d3bcdfec463494 (patch)
tree0e4f5ae4ce12509245642a0467bd77eeba8efb97 /libmproxy
parent241530eb0aa69c5a69bed979a1a2a3a23d473112 (diff)
downloadmitmproxy-2f8ebfdce2165f1bd9196954a1d3bcdfec463494.tar.gz
mitmproxy-2f8ebfdce2165f1bd9196954a1d3bcdfec463494.tar.bz2
mitmproxy-2f8ebfdce2165f1bd9196954a1d3bcdfec463494.zip
Pull console StatusBar into its own file.
Diffstat (limited to 'libmproxy')
-rw-r--r--libmproxy/console/__init__.py189
-rw-r--r--libmproxy/console/statusbar.py180
2 files changed, 187 insertions, 182 deletions
diff --git a/libmproxy/console/__init__.py b/libmproxy/console/__init__.py
index f3c8ee12..5f564a20 100644
--- a/libmproxy/console/__init__.py
+++ b/libmproxy/console/__init__.py
@@ -1,6 +1,5 @@
from __future__ import absolute_import
-import glob
import mailcap
import mimetypes
import tempfile
@@ -11,191 +10,17 @@ import signal
import stat
import subprocess
import sys
-import time
import traceback
import urwid
import weakref
-from .. import controller, utils, flow, script
+from .. import controller, flow, script
from . import flowlist, flowview, help, common
-from . import grideditor, palettes, contentview, flowdetailview, pathedit
+from . import grideditor, palettes, contentview, flowdetailview, statusbar
EVENTLOG_SIZE = 500
-class ActionBar(urwid.WidgetWrap):
- def __init__(self):
- self.message("")
-
- def selectable(self):
- return True
-
- def path_prompt(self, prompt, text):
- self.expire = None
- self._w = pathedit.PathEdit(prompt, text)
-
- def prompt(self, prompt, text = ""):
- self.expire = None
- self._w = urwid.Edit(prompt, text or "")
-
- def message(self, message, expire=None):
- self.expire = expire
- self._w = urwid.Text(message)
-
-
-class StatusBar(urwid.WidgetWrap):
- def __init__(self, master, helptext):
- self.master, self.helptext = master, helptext
- self.ab = ActionBar()
- self.ib = urwid.WidgetWrap(urwid.Text(""))
- self._w = urwid.Pile([self.ib, self.ab])
-
- def get_status(self):
- r = []
-
- if self.master.setheaders.count():
- r.append("[")
- r.append(("heading_key", "H"))
- r.append("eaders]")
- if self.master.replacehooks.count():
- r.append("[")
- r.append(("heading_key", "R"))
- r.append("eplacing]")
- if self.master.client_playback:
- r.append("[")
- r.append(("heading_key", "cplayback"))
- r.append(":%s to go]"%self.master.client_playback.count())
- if self.master.server_playback:
- r.append("[")
- r.append(("heading_key", "splayback"))
- if self.master.nopop:
- r.append(":%s in file]"%self.master.server_playback.count())
- else:
- r.append(":%s to go]"%self.master.server_playback.count())
- if self.master.get_ignore_filter():
- r.append("[")
- r.append(("heading_key", "I"))
- r.append("gnore:%d]" % len(self.master.get_ignore_filter()))
- if self.master.get_tcp_filter():
- r.append("[")
- r.append(("heading_key", "T"))
- r.append("CP:%d]" % len(self.master.get_tcp_filter()))
- if self.master.state.intercept_txt:
- r.append("[")
- r.append(("heading_key", "i"))
- r.append(":%s]"%self.master.state.intercept_txt)
- if self.master.state.limit_txt:
- r.append("[")
- r.append(("heading_key", "l"))
- r.append(":%s]"%self.master.state.limit_txt)
- if self.master.stickycookie_txt:
- r.append("[")
- r.append(("heading_key", "t"))
- r.append(":%s]"%self.master.stickycookie_txt)
- if self.master.stickyauth_txt:
- r.append("[")
- r.append(("heading_key", "u"))
- r.append(":%s]"%self.master.stickyauth_txt)
- if self.master.state.default_body_view.name != "Auto":
- r.append("[")
- r.append(("heading_key", "M"))
- r.append(":%s]"%self.master.state.default_body_view.name)
-
- opts = []
- if self.master.anticache:
- opts.append("anticache")
- if self.master.anticomp:
- opts.append("anticomp")
- if self.master.showhost:
- opts.append("showhost")
- if not self.master.refresh_server_playback:
- opts.append("norefresh")
- if self.master.killextra:
- opts.append("killextra")
- if self.master.server.config.no_upstream_cert:
- opts.append("no-upstream-cert")
- if self.master.state.follow_focus:
- opts.append("following")
- if self.master.stream_large_bodies:
- opts.append("stream:%s" % utils.pretty_size(self.master.stream_large_bodies.max_size))
-
- if opts:
- r.append("[%s]"%(":".join(opts)))
-
- if self.master.server.config.mode in ["reverse", "upstream"]:
- dst = self.master.server.config.mode.dst
- scheme = "https" if dst[0] else "http"
- if dst[1] != dst[0]:
- scheme += "2https" if dst[1] else "http"
- r.append("[dest:%s]"%utils.unparse_url(scheme, *dst[2:]))
- if self.master.scripts:
- r.append("[")
- r.append(("heading_key", "s"))
- r.append("cripts:%s]"%len(self.master.scripts))
- # r.append("[lt:%0.3f]"%self.master.looptime)
-
- if self.master.stream:
- r.append("[W:%s]"%self.master.stream_path)
-
- return r
-
- def redraw(self):
- if self.ab.expire and time.time() > self.ab.expire:
- self.message("")
-
- fc = self.master.state.flow_count()
- if self.master.state.focus is None:
- offset = 0
- else:
- offset = min(self.master.state.focus + 1, fc)
- t = [
- ('heading', ("[%s/%s]"%(offset, fc)).ljust(9))
- ]
-
- if self.master.server.bound:
- host = self.master.server.address.host
- if host == "0.0.0.0":
- host = "*"
- boundaddr = "[%s:%s]"%(host, self.master.server.address.port)
- else:
- boundaddr = ""
- t.extend(self.get_status())
- status = urwid.AttrWrap(urwid.Columns([
- urwid.Text(t),
- urwid.Text(
- [
- self.helptext,
- boundaddr
- ],
- align="right"
- ),
- ]), "heading")
- self.ib._w = status
-
- def update(self, text):
- self.helptext = text
- self.redraw()
- self.master.loop.draw_screen()
-
- def selectable(self):
- return True
-
- def get_edit_text(self):
- return self.ab._w.get_edit_text()
-
- def path_prompt(self, prompt, text):
- return self.ab.path_prompt(prompt, text)
-
- def prompt(self, prompt, text = ""):
- self.ab.prompt(prompt, text)
-
- def message(self, msg, expire=None):
- if expire:
- expire = time.time() + float(expire)/1000
- self.ab.message(msg, expire)
- self.master.loop.draw_screen()
-
-
class ConsoleState(flow.State):
def __init__(self):
flow.State.__init__(self)
@@ -763,7 +588,7 @@ class ConsoleMaster(flow.FlowMaster):
self.help_context,
(self.statusbar, self.body, self.header)
)
- self.statusbar = StatusBar(self, help.footer)
+ self.statusbar = statusbar.StatusBar(self, help.footer)
self.body = h
self.header = None
self.loop.widget = self.make_view()
@@ -774,7 +599,7 @@ class ConsoleMaster(flow.FlowMaster):
flow,
(self.statusbar, self.body, self.header)
)
- self.statusbar = StatusBar(self, flowdetailview.footer)
+ self.statusbar = statusbar.StatusBar(self, flowdetailview.footer)
self.body = h
self.header = None
self.loop.widget = self.make_view()
@@ -783,7 +608,7 @@ class ConsoleMaster(flow.FlowMaster):
self.body = ge
self.header = None
self.help_context = ge.make_help()
- self.statusbar = StatusBar(self, grideditor.footer)
+ self.statusbar = statusbar.StatusBar(self, grideditor.footer)
self.loop.widget = self.make_view()
def view_flowlist(self):
@@ -796,7 +621,7 @@ class ConsoleMaster(flow.FlowMaster):
self.body = flowlist.BodyPile(self)
else:
self.body = flowlist.FlowListBox(self)
- self.statusbar = StatusBar(self, flowlist.footer)
+ self.statusbar = statusbar.StatusBar(self, flowlist.footer)
self.header = None
self.state.view_mode = common.VIEW_LIST
@@ -806,7 +631,7 @@ class ConsoleMaster(flow.FlowMaster):
def view_flow(self, flow):
self.body = flowview.FlowView(self, self.state, flow)
self.header = flowview.FlowViewHeader(self, flow)
- self.statusbar = StatusBar(self, flowview.footer)
+ self.statusbar = statusbar.StatusBar(self, flowview.footer)
self.state.set_focus_flow(flow)
self.state.view_mode = common.VIEW_FLOW
self.loop.widget = self.make_view()
diff --git a/libmproxy/console/statusbar.py b/libmproxy/console/statusbar.py
new file mode 100644
index 00000000..4fb717cd
--- /dev/null
+++ b/libmproxy/console/statusbar.py
@@ -0,0 +1,180 @@
+
+import time
+
+import urwid
+
+from . import pathedit
+from .. import utils
+
+
+class ActionBar(urwid.WidgetWrap):
+ def __init__(self):
+ self.message("")
+
+ def selectable(self):
+ return True
+
+ def path_prompt(self, prompt, text):
+ self.expire = None
+ self._w = pathedit.PathEdit(prompt, text)
+
+ def prompt(self, prompt, text = ""):
+ self.expire = None
+ self._w = urwid.Edit(prompt, text or "")
+
+ def message(self, message, expire=None):
+ self.expire = expire
+ self._w = urwid.Text(message)
+
+
+class StatusBar(urwid.WidgetWrap):
+ def __init__(self, master, helptext):
+ self.master, self.helptext = master, helptext
+ self.ab = ActionBar()
+ self.ib = urwid.WidgetWrap(urwid.Text(""))
+ self._w = urwid.Pile([self.ib, self.ab])
+
+ def get_status(self):
+ r = []
+
+ if self.master.setheaders.count():
+ r.append("[")
+ r.append(("heading_key", "H"))
+ r.append("eaders]")
+ if self.master.replacehooks.count():
+ r.append("[")
+ r.append(("heading_key", "R"))
+ r.append("eplacing]")
+ if self.master.client_playback:
+ r.append("[")
+ r.append(("heading_key", "cplayback"))
+ r.append(":%s to go]"%self.master.client_playback.count())
+ if self.master.server_playback:
+ r.append("[")
+ r.append(("heading_key", "splayback"))
+ if self.master.nopop:
+ r.append(":%s in file]"%self.master.server_playback.count())
+ else:
+ r.append(":%s to go]"%self.master.server_playback.count())
+ if self.master.get_ignore_filter():
+ r.append("[")
+ r.append(("heading_key", "I"))
+ r.append("gnore:%d]" % len(self.master.get_ignore_filter()))
+ if self.master.get_tcp_filter():
+ r.append("[")
+ r.append(("heading_key", "T"))
+ r.append("CP:%d]" % len(self.master.get_tcp_filter()))
+ if self.master.state.intercept_txt:
+ r.append("[")
+ r.append(("heading_key", "i"))
+ r.append(":%s]"%self.master.state.intercept_txt)
+ if self.master.state.limit_txt:
+ r.append("[")
+ r.append(("heading_key", "l"))
+ r.append(":%s]"%self.master.state.limit_txt)
+ if self.master.stickycookie_txt:
+ r.append("[")
+ r.append(("heading_key", "t"))
+ r.append(":%s]"%self.master.stickycookie_txt)
+ if self.master.stickyauth_txt:
+ r.append("[")
+ r.append(("heading_key", "u"))
+ r.append(":%s]"%self.master.stickyauth_txt)
+ if self.master.state.default_body_view.name != "Auto":
+ r.append("[")
+ r.append(("heading_key", "M"))
+ r.append(":%s]"%self.master.state.default_body_view.name)
+
+ opts = []
+ if self.master.anticache:
+ opts.append("anticache")
+ if self.master.anticomp:
+ opts.append("anticomp")
+ if self.master.showhost:
+ opts.append("showhost")
+ if not self.master.refresh_server_playback:
+ opts.append("norefresh")
+ if self.master.killextra:
+ opts.append("killextra")
+ if self.master.server.config.no_upstream_cert:
+ opts.append("no-upstream-cert")
+ if self.master.state.follow_focus:
+ opts.append("following")
+ if self.master.stream_large_bodies:
+ opts.append("stream:%s" % utils.pretty_size(self.master.stream_large_bodies.max_size))
+
+ if opts:
+ r.append("[%s]"%(":".join(opts)))
+
+ if self.master.server.config.mode in ["reverse", "upstream"]:
+ dst = self.master.server.config.mode.dst
+ scheme = "https" if dst[0] else "http"
+ if dst[1] != dst[0]:
+ scheme += "2https" if dst[1] else "http"
+ r.append("[dest:%s]"%utils.unparse_url(scheme, *dst[2:]))
+ if self.master.scripts:
+ r.append("[")
+ r.append(("heading_key", "s"))
+ r.append("cripts:%s]"%len(self.master.scripts))
+ # r.append("[lt:%0.3f]"%self.master.looptime)
+
+ if self.master.stream:
+ r.append("[W:%s]"%self.master.stream_path)
+
+ return r
+
+ def redraw(self):
+ if self.ab.expire and time.time() > self.ab.expire:
+ self.message("")
+
+ fc = self.master.state.flow_count()
+ if self.master.state.focus is None:
+ offset = 0
+ else:
+ offset = min(self.master.state.focus + 1, fc)
+ t = [
+ ('heading', ("[%s/%s]"%(offset, fc)).ljust(9))
+ ]
+
+ if self.master.server.bound:
+ host = self.master.server.address.host
+ if host == "0.0.0.0":
+ host = "*"
+ boundaddr = "[%s:%s]"%(host, self.master.server.address.port)
+ else:
+ boundaddr = ""
+ t.extend(self.get_status())
+ status = urwid.AttrWrap(urwid.Columns([
+ urwid.Text(t),
+ urwid.Text(
+ [
+ self.helptext,
+ boundaddr
+ ],
+ align="right"
+ ),
+ ]), "heading")
+ self.ib._w = status
+
+ def update(self, text):
+ self.helptext = text
+ self.redraw()
+ self.master.loop.draw_screen()
+
+ def selectable(self):
+ return True
+
+ def get_edit_text(self):
+ return self.ab._w.get_edit_text()
+
+ def path_prompt(self, prompt, text):
+ return self.ab.path_prompt(prompt, text)
+
+ def prompt(self, prompt, text = ""):
+ self.ab.prompt(prompt, text)
+
+ def message(self, msg, expire=None):
+ if expire:
+ expire = time.time() + float(expire)/1000
+ self.ab.message(msg, expire)
+ self.master.loop.draw_screen()