aboutsummaryrefslogtreecommitdiffstats
path: root/libmproxy/console
diff options
context:
space:
mode:
Diffstat (limited to 'libmproxy/console')
-rw-r--r--libmproxy/console/__init__.py445
-rw-r--r--libmproxy/console/common.py56
-rw-r--r--libmproxy/console/contentview.py90
-rw-r--r--libmproxy/console/flowlist.py54
-rw-r--r--libmproxy/console/flowview.py18
-rw-r--r--libmproxy/console/grideditor.py76
-rw-r--r--libmproxy/console/help.py11
7 files changed, 428 insertions, 322 deletions
diff --git a/libmproxy/console/__init__.py b/libmproxy/console/__init__.py
index cc8a0c1f..70bd46d3 100644
--- a/libmproxy/console/__init__.py
+++ b/libmproxy/console/__init__.py
@@ -1,14 +1,25 @@
from __future__ import absolute_import
-import mailcap, mimetypes, tempfile, os, subprocess, glob, time, shlex, stat
-import os.path, sys, weakref, traceback
-import urwid
-from .. import controller, utils, flow, script, proxy
-from . import flowlist, flowview, help, common, grideditor, palettes, contentview, flowdetailview
-EVENTLOG_SIZE = 500
+import glob
+import mailcap
+import mimetypes
+import tempfile
+import os
+import os.path
+import shlex
+import stat
+import subprocess
+import sys
+import time
+import traceback
+import urwid
+import weakref
+from .. import controller, utils, flow, script
+from . import flowlist, flowview, help, common
+from . import grideditor, palettes, contentview, flowdetailview
-class Stop(Exception): pass
+EVENTLOG_SIZE = 500
class _PathCompleter:
@@ -58,7 +69,6 @@ class _PathCompleter:
self.final = ret[1]
return ret[0]
-#begin nocover
class PathEdit(urwid.Edit, _PathCompleter):
def __init__(self, *args, **kwargs):
@@ -75,7 +85,7 @@ class PathEdit(urwid.Edit, _PathCompleter):
return urwid.Edit.keypress(self, size, key)
-class ActionBar(common.WWrap):
+class ActionBar(urwid.WidgetWrap):
def __init__(self):
self.message("")
@@ -84,7 +94,7 @@ class ActionBar(common.WWrap):
def path_prompt(self, prompt, text):
self.expire = None
- self.w = PathEdit(prompt, text)
+ self._w = PathEdit(prompt, text)
def prompt(self, prompt, text = ""):
self.expire = None
@@ -93,19 +103,19 @@ class ActionBar(common.WWrap):
# We can remove it once veryone is beyond 1.0.1
if isinstance(prompt, basestring):
prompt = unicode(prompt)
- self.w = urwid.Edit(prompt, text or "")
+ self._w = urwid.Edit(prompt, text or "")
def message(self, message, expire=None):
self.expire = expire
- self.w = urwid.Text(message)
+ self._w = urwid.Text(message)
-class StatusBar(common.WWrap):
+class StatusBar(urwid.WidgetWrap):
def __init__(self, master, helptext):
self.master, self.helptext = master, helptext
self.ab = ActionBar()
- self.ib = common.WWrap(urwid.Text(""))
- self.w = urwid.Pile([self.ib, self.ab])
+ self.ib = urwid.WidgetWrap(urwid.Text(""))
+ self._w = urwid.Pile([self.ib, self.ab])
def get_status(self):
r = []
@@ -227,12 +237,12 @@ class StatusBar(common.WWrap):
align="right"
),
]), "heading")
- self.ib.set_w(status)
+ self.ib._w = status
def update(self, text):
self.helptext = text
self.redraw()
- self.master.drawscreen()
+ self.master.loop.draw_screen()
def selectable(self):
return True
@@ -250,11 +260,9 @@ class StatusBar(common.WWrap):
if expire:
expire = time.time() + float(expire)/1000
self.ab.message(msg, expire)
- self.master.drawscreen()
+ self.master.loop.draw_screen()
-#end nocover
-
class ConsoleState(flow.State):
def __init__(self):
flow.State.__init__(self)
@@ -337,7 +345,6 @@ class ConsoleState(flow.State):
super(ConsoleState, self).clear()
-
class Options(object):
attributes = [
"app",
@@ -367,6 +374,7 @@ class Options(object):
"nopop",
"palette",
]
+
def __init__(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
@@ -375,11 +383,9 @@ class Options(object):
setattr(self, i, None)
-#begin nocover
-
-
class ConsoleMaster(flow.FlowMaster):
palette = []
+
def __init__(self, server, options):
flow.FlowMaster.__init__(self, server, ConsoleState())
self.looptime = 0
@@ -439,7 +445,10 @@ class ConsoleMaster(flow.FlowMaster):
sys.exit(1)
if options.outfile:
- err = self.start_stream_to_path(options.outfile[0], options.outfile[1])
+ err = self.start_stream_to_path(
+ options.outfile[0],
+ options.outfile[1]
+ )
if err:
print >> sys.stderr, "Stream file error:", err
sys.exit(1)
@@ -550,7 +559,7 @@ class ConsoleMaster(flow.FlowMaster):
except:
self.statusbar.message("Can't start editor: %s" % " ".join(c))
else:
- data = open(name,"rb").read()
+ data = open(name, "rb").read()
self.ui.start()
os.unlink(name)
return data
@@ -587,13 +596,172 @@ class ConsoleMaster(flow.FlowMaster):
try:
subprocess.call(cmd, shell=shell)
except:
- self.statusbar.message("Can't start external viewer: %s" % " ".join(c))
+ self.statusbar.message(
+ "Can't start external viewer: %s" % " ".join(c)
+ )
self.ui.start()
os.unlink(name)
def set_palette(self, name):
self.palette = palettes.palettes[name]
+ def input_filter(self, keys, raw):
+ for k in keys:
+ if self.prompting:
+ if k == "esc":
+ self.prompt_cancel()
+ elif self.onekey:
+ if k == "enter":
+ self.prompt_cancel()
+ elif k in self.onekey:
+ self.prompt_execute(k)
+ elif k == "enter":
+ self.prompt_execute()
+ else:
+ self.view.keypress(self.loop.screen_size, k)
+ else:
+ k = self.view.keypress(self.loop.screen_size, k)
+ if k:
+ self.statusbar.message("")
+ if k == "?":
+ self.view_help()
+ elif k == "c":
+ if not self.client_playback:
+ self.path_prompt(
+ "Client replay: ",
+ self.state.last_saveload,
+ self.client_playback_path
+ )
+ else:
+ self.prompt_onekey(
+ "Stop current client replay?",
+ (
+ ("yes", "y"),
+ ("no", "n"),
+ ),
+ self.stop_client_playback_prompt,
+ )
+ elif k == "H":
+ self.view_grideditor(
+ grideditor.SetHeadersEditor(
+ self,
+ self.setheaders.get_specs(),
+ self.setheaders.set
+ )
+ )
+ elif k == "I":
+ self.view_grideditor(
+ grideditor.HostPatternEditor(
+ self,
+ [[x] for x in self.get_ignore_filter()],
+ self.edit_ignore_filter
+ )
+ )
+ elif k == "T":
+ self.view_grideditor(
+ grideditor.HostPatternEditor(
+ self,
+ [[x] for x in self.get_tcp_filter()],
+ self.edit_tcp_filter
+ )
+ )
+ elif k == "i":
+ self.prompt(
+ "Intercept filter: ",
+ self.state.intercept_txt,
+ self.set_intercept
+ )
+ elif k == "Q":
+ raise urwid.ExitMainLoop
+ elif k == "q":
+ self.prompt_onekey(
+ "Quit",
+ (
+ ("yes", "y"),
+ ("no", "n"),
+ ),
+ self.quit,
+ )
+ elif k == "M":
+ self.prompt_onekey(
+ "Global default display mode",
+ contentview.view_prompts,
+ self.change_default_display_mode
+ )
+ elif k == "R":
+ self.view_grideditor(
+ grideditor.ReplaceEditor(
+ self,
+ self.replacehooks.get_specs(),
+ self.replacehooks.set
+ )
+ )
+ elif k == "s":
+ self.view_grideditor(
+ grideditor.ScriptEditor(
+ self,
+ [[i.command] for i in self.scripts],
+ self.edit_scripts
+ )
+ )
+ #if self.scripts:
+ # self.load_script(None)
+ #else:
+ # self.path_prompt(
+ # "Set script: ",
+ # self.state.last_script,
+ # self.set_script
+ # )
+ elif k == "S":
+ if not self.server_playback:
+ self.path_prompt(
+ "Server replay path: ",
+ self.state.last_saveload,
+ self.server_playback_path
+ )
+ else:
+ self.prompt_onekey(
+ "Stop current server replay?",
+ (
+ ("yes", "y"),
+ ("no", "n"),
+ ),
+ self.stop_server_playback_prompt,
+ )
+ elif k == "o":
+ self.prompt_onekey(
+ "Options",
+ (
+ ("anticache", "a"),
+ ("anticomp", "c"),
+ ("showhost", "h"),
+ ("killextra", "k"),
+ ("norefresh", "n"),
+ ("no-upstream-certs", "u"),
+ ),
+ self._change_options
+ )
+ elif k == "t":
+ self.prompt(
+ "Sticky cookie filter: ",
+ self.stickycookie_txt,
+ self.set_stickycookie
+ )
+ elif k == "u":
+ self.prompt(
+ "Sticky auth filter: ",
+ self.stickyauth_txt,
+ self.set_stickyauth
+ )
+ self.statusbar.redraw()
+
+ def ticker(self, *userdata):
+ changed = self.tick(self.masterq, timeout=0)
+ if changed:
+ self.loop.draw_screen()
+ self.statusbar.redraw()
+ self.loop.set_alarm_in(0.01, self.ticker)
+
def run(self):
self.ui = urwid.raw_display.Screen()
self.ui.set_terminal_properties(256)
@@ -606,8 +774,13 @@ class ConsoleMaster(flow.FlowMaster):
self.help_context = None
self.prompting = False
self.onekey = False
-
+ self.loop = urwid.MainLoop(
+ self.view,
+ screen = self.ui,
+ input_filter = self.input_filter
+ )
self.view_flowlist()
+ self.statusbar.redraw()
self.server.start_slave(
controller.Slave,
@@ -627,46 +800,57 @@ class ConsoleMaster(flow.FlowMaster):
print >> sys.stderr, "Could not load file:", ret
sys.exit(1)
+ self.loop.set_alarm_in(0.01, self.ticker)
try:
- self.ui.run_wrapper(self.loop)
+ self.loop.run()
except Exception:
- self.ui.stop()
+ self.loop.stop()
sys.stdout.flush()
print >> sys.stderr, traceback.format_exc()
print >> sys.stderr, "mitmproxy has crashed!"
- print >> sys.stderr, "Please lodge a bug report at: https://github.com/mitmproxy/mitmproxy"
- print >> sys.stderr, "Shutting down..."
+ print >> sys.stderr, "Please lodge a bug report at:"
+ print >> sys.stderr, "\thttps://github.com/mitmproxy/mitmproxy"
+ print >> sys.stderr, "Shutting down..."
sys.stderr.flush()
self.shutdown()
def make_view(self):
self.view = urwid.Frame(
- self.body,
- header = self.header,
- footer = self.statusbar
- )
+ self.body,
+ header = self.header,
+ footer = self.statusbar
+ )
self.view.set_focus("body")
+ return self.view
def view_help(self):
- h = help.HelpView(self, self.help_context, (self.statusbar, self.body, self.header))
+ h = help.HelpView(
+ self,
+ self.help_context,
+ (self.statusbar, self.body, self.header)
+ )
self.statusbar = StatusBar(self, help.footer)
self.body = h
self.header = None
- self.make_view()
+ self.loop.widget = self.make_view()
def view_flowdetails(self, flow):
- h = flowdetailview.FlowDetailsView(self, flow, (self.statusbar, self.body, self.header))
+ h = flowdetailview.FlowDetailsView(
+ self,
+ flow,
+ (self.statusbar, self.body, self.header)
+ )
self.statusbar = StatusBar(self, flowdetailview.footer)
self.body = h
self.header = None
- self.make_view()
+ self.loop.widget = self.make_view()
def view_grideditor(self, ge):
self.body = ge
self.header = None
self.help_context = ge.make_help()
self.statusbar = StatusBar(self, grideditor.footer)
- self.make_view()
+ self.loop.widget = self.make_view()
def view_flowlist(self):
if self.ui.started:
@@ -682,7 +866,7 @@ class ConsoleMaster(flow.FlowMaster):
self.header = None
self.state.view_mode = common.VIEW_LIST
- self.make_view()
+ self.loop.widget = self.make_view()
self.help_context = flowlist.help_context
def view_flow(self, flow):
@@ -691,7 +875,7 @@ class ConsoleMaster(flow.FlowMaster):
self.statusbar = StatusBar(self, flowview.footer)
self.state.set_focus_flow(flow)
self.state.view_mode = common.VIEW_FLOW
- self.make_view()
+ self.loop.widget = self.make_view()
self.help_context = flowview.help_context
def _write_flows(self, path, flows):
@@ -796,12 +980,6 @@ class ConsoleMaster(flow.FlowMaster):
self.state.default_body_view = v
self.refresh_focus()
- def drawscreen(self):
- size = self.ui.get_cols_rows()
- canvas = self.view.render(size, focus=1)
- self.ui.draw_screen(size, canvas)
- return size
-
def pop_view(self):
if self.state.view_mode == common.VIEW_FLOW:
self.view_flow(self.state.view[self.state.focus])
@@ -825,170 +1003,6 @@ class ConsoleMaster(flow.FlowMaster):
patterns = (x[0] for x in tcp)
self.set_tcp_filter(patterns)
- def loop(self):
- changed = True
- try:
- while not self.should_exit.is_set():
- startloop = time.time()
- if changed:
- self.statusbar.redraw()
- size = self.drawscreen()
- changed = self.tick(self.masterq, timeout=0.1)
- self.ui.set_input_timeouts(max_wait=0)
- keys = self.ui.get_input()
- if keys:
- changed = True
- for k in keys:
- if self.prompting:
- if k == "esc":
- self.prompt_cancel()
- elif self.onekey:
- if k == "enter":
- self.prompt_cancel()
- elif k in self.onekey:
- self.prompt_execute(k)
- elif k == "enter":
- self.prompt_execute()
- else:
- self.view.keypress(size, k)
- else:
- k = self.view.keypress(size, k)
- if k:
- self.statusbar.message("")
- if k == "?":
- self.view_help()
- elif k == "c":
- if not self.client_playback:
- self.path_prompt(
- "Client replay: ",
- self.state.last_saveload,
- self.client_playback_path
- )
- else:
- self.prompt_onekey(
- "Stop current client replay?",
- (
- ("yes", "y"),
- ("no", "n"),
- ),
- self.stop_client_playback_prompt,
- )
- elif k == "H":
- self.view_grideditor(
- grideditor.SetHeadersEditor(
- self,
- self.setheaders.get_specs(),
- self.setheaders.set
- )
- )
- elif k == "I":
- self.view_grideditor(
- grideditor.HostPatternEditor(
- self,
- [[x] for x in self.get_ignore_filter()],
- self.edit_ignore_filter
- )
- )
- elif k == "T":
- self.view_grideditor(
- grideditor.HostPatternEditor(
- self,
- [[x] for x in self.get_tcp_filter()],
- self.edit_tcp_filter
- )
- )
- elif k == "i":
- self.prompt(
- "Intercept filter: ",
- self.state.intercept_txt,
- self.set_intercept
- )
- elif k == "Q":
- raise Stop
- elif k == "q":
- self.prompt_onekey(
- "Quit",
- (
- ("yes", "y"),
- ("no", "n"),
- ),
- self.quit,
- )
- elif k == "M":
- self.prompt_onekey(
- "Global default display mode",
- contentview.view_prompts,
- self.change_default_display_mode
- )
- elif k == "R":
- self.view_grideditor(
- grideditor.ReplaceEditor(
- self,
- self.replacehooks.get_specs(),
- self.replacehooks.set
- )
- )
- elif k == "s":
- self.view_grideditor(
- grideditor.ScriptEditor(
- self,
- [[i.command] for i in self.scripts],
- self.edit_scripts
- )
- )
- #if self.scripts:
- # self.load_script(None)
- #else:
- # self.path_prompt(
- # "Set script: ",
- # self.state.last_script,
- # self.set_script
- # )
- elif k == "S":
- if not self.server_playback:
- self.path_prompt(
- "Server replay path: ",
- self.state.last_saveload,
- self.server_playback_path
- )
- else:
- self.prompt_onekey(
- "Stop current server replay?",
- (
- ("yes", "y"),
- ("no", "n"),
- ),
- self.stop_server_playback_prompt,
- )
- elif k == "o":
- self.prompt_onekey(
- "Options",
- (
- ("anticache", "a"),
- ("anticomp", "c"),
- ("showhost", "h"),
- ("killextra", "k"),
- ("norefresh", "n"),
- ("no-upstream-certs", "u"),
- ),
- self._change_options
- )
- elif k == "t":
- self.prompt(
- "Sticky cookie filter: ",
- self.stickycookie_txt,
- self.set_stickycookie
- )
- elif k == "u":
- self.prompt(
- "Sticky auth filter: ",
- self.stickyauth_txt,
- self.set_stickyauth
- )
- self.looptime = time.time() - startloop
- except (Stop, KeyboardInterrupt):
- pass
-
def stop_client_playback_prompt(self, a):
if a != "n":
self.stop_client_playback()
@@ -999,7 +1013,7 @@ class ConsoleMaster(flow.FlowMaster):
def quit(self, a):
if a != "n":
- raise Stop
+ raise urwid.ExitMainLoop
def _change_options(self, a):
if a == "a":
@@ -1015,7 +1029,8 @@ class ConsoleMaster(flow.FlowMaster):
elif a == "n":
self.refresh_server_playback = not self.refresh_server_playback
elif a == "u":
- self.server.config.no_upstream_cert = not self.server.config.no_upstream_cert
+ self.server.config.no_upstream_cert =\
+ not self.server.config.no_upstream_cert
def shutdown(self):
self.state.killall(self)
diff --git a/libmproxy/console/common.py b/libmproxy/console/common.py
index fa21c93e..3a708c7c 100644
--- a/libmproxy/console/common.py
+++ b/libmproxy/console/common.py
@@ -1,7 +1,9 @@
from __future__ import absolute_import
+
import urwid
import urwid.util
import os
+
from .. import utils
from ..protocol.http import CONTENT_MISSING, decoded
@@ -41,6 +43,8 @@ def highlight_key(s, k):
KEY_MAX = 30
+
+
def format_keyvals(lst, key="key", val="text", indent=0):
"""
Format a list of (key, value) tuples.
@@ -103,10 +107,8 @@ else:
SYMBOL_RETURN = u"<-"
-
def raw_format_flow(f, focus, extended, padding):
f = dict(f)
-
pile = []
req = []
if extended:
@@ -122,7 +124,7 @@ def raw_format_flow(f, focus, extended, padding):
req.append(fcol(SYMBOL_REPLAY, "replay"))
req.append(fcol(f["req_method"], "method"))
- preamble = sum(i[1] for i in req) + len(req) -1
+ preamble = sum(i[1] for i in req) + len(req) - 1
if f["intercepted"] and not f["acked"]:
uc = "intercept"
@@ -284,15 +286,16 @@ def ask_copy_part(scope, flow, master, state):
def ask_save_body(part, master, state, flow):
"""
- Save either the request or the response body to disk.
- part can either be "q" (request), "s" (response) or None (ask user if necessary).
+ Save either the request or the response body to disk. part can either be
+ "q" (request), "s" (response) or None (ask user if necessary).
"""
request_has_content = flow.request and flow.request.content
response_has_content = flow.response and flow.response.content
if part is None:
- # We first need to determine whether we want to save the request or the response content.
+ # We first need to determine whether we want to save the request or the
+ # response content.
if request_has_content and response_has_content:
master.prompt_onekey(
"Save",
@@ -311,9 +314,19 @@ def ask_save_body(part, master, state, flow):
ask_save_body("q", master, state, flow)
elif part == "q" and request_has_content:
- ask_save_path("Save request content: ", flow.request.get_decoded_content(), master, state)
+ ask_save_path(
+ "Save request content: ",
+ flow.request.get_decoded_content(),
+ master,
+ state
+ )
elif part == "s" and response_has_content:
- ask_save_path("Save response content: ", flow.response.get_decoded_content(), master, state)
+ ask_save_path(
+ "Save response content: ",
+ flow.response.get_decoded_content(),
+ master,
+ state
+ )
else:
master.statusbar.message("No content to save.")
@@ -348,7 +361,6 @@ def format_flow(f, focus, extended=False, hostheader=False, padding=2):
duration = 0
if f.response.timestamp_end and f.request.timestamp_start:
duration = f.response.timestamp_end - f.request.timestamp_start
- size = f.response.size()
roundtrip = utils.pretty_duration(duration)
d.update(dict(
@@ -362,26 +374,6 @@ def format_flow(f, focus, extended=False, hostheader=False, padding=2):
d["resp_ctype"] = t[0].split(";")[0]
else:
d["resp_ctype"] = ""
- return flowcache.format_flow(tuple(sorted(d.items())), focus, extended, padding)
-
-
-def int_version(v):
- SIG = 3
- v = urwid.__version__.split("-")[0].split(".")
- x = 0
- for i in range(min(SIG, len(v))):
- x += int(v[i]) * 10**(SIG-i)
- return x
-
-
-# We have to do this to be portable over 0.9.8 and 0.9.9 If compatibility
-# becomes a pain to maintain, we'll just mandate 0.9.9 or newer.
-class WWrap(urwid.WidgetWrap):
- if int_version(urwid.__version__) >= 990:
- def set_w(self, x):
- self._w = x
- def get_w(self):
- return self._w
- w = property(get_w, set_w)
-
-
+ return flowcache.format_flow(
+ tuple(sorted(d.items())), focus, extended, padding
+ )
diff --git a/libmproxy/console/contentview.py b/libmproxy/console/contentview.py
index 582723bb..95d908a4 100644
--- a/libmproxy/console/contentview.py
+++ b/libmproxy/console/contentview.py
@@ -1,14 +1,23 @@
from __future__ import absolute_import
-import logging, subprocess, re, cStringIO, traceback, json, urwid
+import cStringIO
+import json
+import logging
+import lxml.html
+import lxml.etree
from PIL import Image
from PIL.ExifTags import TAGS
+import re
+import subprocess
+import traceback
+import urwid
-import lxml.html, lxml.etree
import netlib.utils
+
from . import common
from .. import utils, encoding, flow
from ..contrib import jsbeautifier, html2text
from ..contrib.wbxml.ASCommandResponse import ASCommandResponse
+
try:
import pyamf
from pyamf import remoting, flex
@@ -62,6 +71,7 @@ class ViewAuto:
name = "Auto"
prompt = ("auto", "a")
content_types = []
+
def __call__(self, hdrs, content, limit):
ctype = hdrs.get_first("content-type")
if ctype:
@@ -78,6 +88,7 @@ class ViewRaw:
name = "Raw"
prompt = ("raw", "r")
content_types = []
+
def __call__(self, hdrs, content, limit):
txt = _view_text(content[:limit], len(content), limit)
return "Raw", txt
@@ -87,6 +98,7 @@ class ViewHex:
name = "Hex"
prompt = ("hex", "e")
content_types = []
+
def __call__(self, hdrs, content, limit):
txt = []
for offset, hexa, s in netlib.utils.hexdump(content[:limit]):
@@ -105,8 +117,14 @@ class ViewXML:
name = "XML"
prompt = ("xml", "x")
content_types = ["text/xml"]
+
def __call__(self, hdrs, content, limit):
- parser = lxml.etree.XMLParser(remove_blank_text=True, resolve_entities=False, strip_cdata=False, recover=False)
+ parser = lxml.etree.XMLParser(
+ remove_blank_text=True,
+ resolve_entities=False,
+ strip_cdata=False,
+ recover=False
+ )
try:
document = lxml.etree.fromstring(content, parser)
except lxml.etree.XMLSyntaxError:
@@ -121,18 +139,18 @@ class ViewXML:
lxml.etree.tostring(p)
)
p = p.getprevious()
- doctype=docinfo.doctype
+ doctype = docinfo.doctype
if prev:
doctype += "\n".join(prev).strip()
doctype = doctype.strip()
s = lxml.etree.tostring(
- document,
- pretty_print=True,
- xml_declaration=True,
- doctype=doctype or None,
- encoding = docinfo.encoding
- )
+ document,
+ pretty_print=True,
+ xml_declaration=True,
+ doctype=doctype or None,
+ encoding = docinfo.encoding
+ )
txt = []
for i in s[:limit].strip().split("\n"):
@@ -147,6 +165,7 @@ class ViewJSON:
name = "JSON"
prompt = ("json", "s")
content_types = ["application/json"]
+
def __call__(self, hdrs, content, limit):
lines = utils.pretty_json(content)
if lines:
@@ -167,12 +186,20 @@ class ViewHTML:
name = "HTML"
prompt = ("html", "h")
content_types = ["text/html"]
+
def __call__(self, hdrs, content, limit):
if utils.isXML(content):
- parser = lxml.etree.HTMLParser(strip_cdata=True, remove_blank_text=True)
+ parser = lxml.etree.HTMLParser(
+ strip_cdata=True,
+ remove_blank_text=True
+ )
d = lxml.html.fromstring(content, parser=parser)
docinfo = d.getroottree().docinfo
- s = lxml.etree.tostring(d, pretty_print=True, doctype=docinfo.doctype)
+ s = lxml.etree.tostring(
+ d,
+ pretty_print=True,
+ doctype=docinfo.doctype
+ )
return "HTML", _view_text(s[:limit], len(s), limit)
@@ -180,6 +207,7 @@ class ViewHTMLOutline:
name = "HTML Outline"
prompt = ("html outline", "o")
content_types = ["text/html"]
+
def __call__(self, hdrs, content, limit):
content = content.decode("utf-8")
h = html2text.HTML2Text(baseurl="")
@@ -194,14 +222,15 @@ class ViewURLEncoded:
name = "URL-encoded"
prompt = ("urlencoded", "u")
content_types = ["application/x-www-form-urlencoded"]
+
def __call__(self, hdrs, content, limit):
lines = utils.urldecode(content)
if lines:
body = common.format_keyvals(
- [(k+":", v) for (k, v) in lines],
- key = "header",
- val = "text"
- )
+ [(k+":", v) for (k, v) in lines],
+ key = "header",
+ val = "text"
+ )
return "URLEncoded form", body
@@ -209,6 +238,7 @@ class ViewMultipart:
name = "Multipart Form"
prompt = ("multipart", "m")
content_types = ["multipart/form-data"]
+
def __call__(self, hdrs, content, limit):
v = hdrs.get_first("content-type")
if v:
@@ -322,12 +352,14 @@ class ViewJavaScript:
"application/javascript",
"text/javascript"
]
+
def __call__(self, hdrs, content, limit):
opts = jsbeautifier.default_options()
opts.indent_size = 2
res = jsbeautifier.beautify(content[:limit], opts)
return "JavaScript", _view_text(res, len(res), limit)
+
class ViewCSS:
name = "CSS"
prompt = ("css", "c")
@@ -355,6 +387,7 @@ class ViewImage:
"image/vnd.microsoft.icon",
"image/x-icon",
]
+
def __call__(self, hdrs, content, limit):
try:
img = Image.open(cStringIO.StringIO(content))
@@ -380,14 +413,17 @@ class ViewImage:
)
clean = []
for i in parts:
- clean.append([netlib.utils.cleanBin(i[0]), netlib.utils.cleanBin(i[1])])
- fmt = common.format_keyvals(
- clean,
- key = "header",
- val = "text"
+ clean.append(
+ [netlib.utils.cleanBin(i[0]), netlib.utils.cleanBin(i[1])]
)
+ fmt = common.format_keyvals(
+ clean,
+ key = "header",
+ val = "text"
+ )
return "%s image"%img.format, fmt
+
class ViewProtobuf:
"""Human friendly view of protocol buffers
The view uses the protoc compiler to decode the binary
@@ -403,7 +439,10 @@ class ViewProtobuf:
@staticmethod
def is_available():
try:
- p = subprocess.Popen(["protoc", "--version"], stdout=subprocess.PIPE)
+ p = subprocess.Popen(
+ ["protoc", "--version"],
+ stdout=subprocess.PIPE
+ )
out, _ = p.communicate()
return out.startswith("libprotoc")
except:
@@ -427,6 +466,7 @@ class ViewProtobuf:
txt = _view_text(decoded[:limit], len(decoded), limit)
return "Protobuf", txt
+
class ViewWBXML:
name = "WBXML"
prompt = ("wbxml", "w")
@@ -436,14 +476,14 @@ class ViewWBXML:
]
def __call__(self, hdrs, content, limit):
-
+
try:
parser = ASCommandResponse(content)
parsedContent = parser.xmlString
txt = _view_text(parsedContent, len(parsedContent), limit)
return "WBXML", txt
except:
- return None
+ return None
views = [
ViewAuto(),
@@ -512,7 +552,7 @@ def get_content_view(viewmode, hdrItems, content, limit, logfunc, is_request):
# Third-party viewers can fail in unexpected ways...
except Exception:
s = traceback.format_exc()
- s = "Content viewer failed: \n" + s
+ s = "Content viewer failed: \n" + s
logfunc(s, "error")
ret = None
if not ret:
diff --git a/libmproxy/console/flowlist.py b/libmproxy/console/flowlist.py
index 2a6a98c8..5d8ad942 100644
--- a/libmproxy/console/flowlist.py
+++ b/libmproxy/console/flowlist.py
@@ -3,6 +3,7 @@ import urwid
from netlib import http
from . import common
+
def _mkhelp():
text = []
keys = [
@@ -35,6 +36,7 @@ footer = [
('heading_key', "?"), ":help ",
]
+
class EventListBox(urwid.ListBox):
def __init__(self, master):
self.master = master
@@ -60,7 +62,10 @@ class BodyPile(urwid.Pile):
self,
[
FlowListBox(master),
- urwid.Frame(EventListBox(master), header = self.inactive_header)
+ urwid.Frame(
+ EventListBox(master),
+ header = self.inactive_header
+ )
]
)
self.master = master
@@ -80,22 +85,26 @@ class BodyPile(urwid.Pile):
# This is essentially a copypasta from urwid.Pile's keypress handler.
# So much for "closed for modification, but open for extension".
item_rows = None
- if len(size)==2:
- item_rows = self.get_item_rows( size, focus=True )
+ if len(size) == 2:
+ item_rows = self.get_item_rows(size, focus = True)
i = self.widget_list.index(self.focus_item)
- tsize = self.get_item_size(size,i,True,item_rows)
- return self.focus_item.keypress( tsize, key )
+ tsize = self.get_item_size(size, i, True, item_rows)
+ return self.focus_item.keypress(tsize, key)
-class ConnectionItem(common.WWrap):
+class ConnectionItem(urwid.WidgetWrap):
def __init__(self, master, state, flow, focus):
self.master, self.state, self.flow = master, state, flow
self.f = focus
w = self.get_text()
- common.WWrap.__init__(self, w)
+ urwid.WidgetWrap.__init__(self, w)
def get_text(self):
- return common.format_flow(self.flow, self.f, hostheader=self.master.showhost)
+ return common.format_flow(
+ self.flow,
+ self.f,
+ hostheader = self.master.showhost
+ )
def selectable(self):
return True
@@ -125,7 +134,8 @@ class ConnectionItem(common.WWrap):
[i.copy() for i in self.master.state.view],
self.master.killextra, self.master.rheaders,
False, self.master.nopop,
- self.master.options.replay_ignore_params, self.master.options.replay_ignore_content,
+ self.master.options.replay_ignore_params,
+ self.master.options.replay_ignore_content,
self.master.options.replay_ignore_payload_params,
self.master.options.replay_ignore_host
)
@@ -134,7 +144,8 @@ class ConnectionItem(common.WWrap):
[self.flow.copy()],
self.master.killextra, self.master.rheaders,
False, self.master.nopop,
- self.master.options.replay_ignore_params, self.master.options.replay_ignore_content,
+ self.master.options.replay_ignore_params,
+ self.master.options.replay_ignore_content,
self.master.options.replay_ignore_payload_params,
self.master.options.replay_ignore_host
)
@@ -251,7 +262,7 @@ class FlowListBox(urwid.ListBox):
def get_method_raw(self, k):
if k:
- self.get_url(k)
+ self.get_url(k)
def get_method(self, k):
if k == "e":
@@ -263,8 +274,13 @@ class FlowListBox(urwid.ListBox):
method = i[0].upper()
self.get_url(method)
- def get_url(self,method):
- self.master.prompt("URL:", "http://www.example.com/", self.new_request, method)
+ def get_url(self, method):
+ self.master.prompt(
+ "URL:",
+ "http://www.example.com/",
+ self.new_request,
+ method
+ )
def new_request(self, url, method):
parts = http.parse_url(str(url))
@@ -285,7 +301,11 @@ class FlowListBox(urwid.ListBox):
elif key == "e":
self.master.toggle_eventlog()
elif key == "l":
- self.master.prompt("Limit: ", self.master.state.limit_txt, self.master.set_limit)
+ self.master.prompt(
+ "Limit: ",
+ self.master.state.limit_txt,
+ self.master.set_limit
+ )
elif key == "L":
self.master.path_prompt(
"Load flows: ",
@@ -293,7 +313,11 @@ class FlowListBox(urwid.ListBox):
self.master.load_flows_callback
)
elif key == "n":
- self.master.prompt_onekey("Method", common.METHOD_OPTIONS, self.get_method)
+ self.master.prompt_onekey(
+ "Method",
+ common.METHOD_OPTIONS,
+ self.get_method
+ )
elif key == "F":
self.master.toggle_follow_flows()
elif key == "W":
diff --git a/libmproxy/console/flowview.py b/libmproxy/console/flowview.py
index 5c91512c..89e75aad 100644
--- a/libmproxy/console/flowview.py
+++ b/libmproxy/console/flowview.py
@@ -19,7 +19,7 @@ def _mkhelp():
("D", "duplicate flow"),
("e", "edit request/response"),
("f", "load full body data"),
- ("g", "copy response(content/headers) to clipboard"),
+ ("g", "copy response(content/headers) to clipboard"),
("m", "change body display mode for this entity"),
(None,
common.highlight_key("automatic", "a") +
@@ -84,14 +84,14 @@ footer = [
]
-class FlowViewHeader(common.WWrap):
+class FlowViewHeader(urwid.WidgetWrap):
def __init__(self, master, f):
self.master, self.flow = master, f
- self.w = common.format_flow(f, False, extended=True, padding=0, hostheader=self.master.showhost)
+ self._w = common.format_flow(f, False, extended=True, padding=0, hostheader=self.master.showhost)
def refresh_flow(self, f):
if f == self.flow:
- self.w = common.format_flow(f, False, extended=True, padding=0, hostheader=self.master.showhost)
+ self._w = common.format_flow(f, False, extended=True, padding=0, hostheader=self.master.showhost)
class CallbackCache:
@@ -106,7 +106,7 @@ class CallbackCache:
cache = CallbackCache()
-class FlowView(common.WWrap):
+class FlowView(urwid.WidgetWrap):
REQ = 0
RESP = 1
@@ -331,7 +331,7 @@ class FlowView(common.WWrap):
merged = self.conn_text_merge(headers, msg, body)
list_box = urwid.ListBox(merged)
list_box.set_focus(focus_position + 2)
- self.w = self.wrap_body(const, list_box)
+ self._w = self.wrap_body(const, list_box)
self.master.statusbar.redraw()
self.last_displayed_body = list_box
@@ -455,7 +455,7 @@ class FlowView(common.WWrap):
def view_request(self):
self.state.view_flow_mode = common.VIEW_FLOW_REQUEST
body = self.conn_text(self.flow.request)
- self.w = self.wrap_body(common.VIEW_FLOW_REQUEST, body)
+ self._w = self.wrap_body(common.VIEW_FLOW_REQUEST, body)
self.master.statusbar.redraw()
def view_response(self):
@@ -475,7 +475,7 @@ class FlowView(common.WWrap):
)
]
)
- self.w = self.wrap_body(common.VIEW_FLOW_RESPONSE, body)
+ self._w = self.wrap_body(common.VIEW_FLOW_RESPONSE, body)
self.master.statusbar.redraw()
def refresh_flow(self, c=None):
@@ -656,7 +656,7 @@ class FlowView(common.WWrap):
self.view_request()
elif key in ("up", "down", "page up", "page down"):
# Why doesn't this just work??
- self.w.keypress(size, key)
+ self._w.keypress(size, key)
elif key == "a":
self.flow.accept_intercept(self.master)
self.master.view_flow(self.flow)
diff --git a/libmproxy/console/grideditor.py b/libmproxy/console/grideditor.py
index 438d0ad7..fe3df509 100644
--- a/libmproxy/console/grideditor.py
+++ b/libmproxy/console/grideditor.py
@@ -1,6 +1,10 @@
from __future__ import absolute_import
-import copy, re, os
+
+import copy
+import re
+import os
import urwid
+
from . import common
from .. import utils, filt, script
from netlib import http_uastrings
@@ -15,7 +19,7 @@ footer_editing = [
]
-class SText(common.WWrap):
+class SText(urwid.WidgetWrap):
def __init__(self, txt, focused, error):
txt = txt.encode("string-escape")
w = urwid.Text(txt, wrap="any")
@@ -26,10 +30,10 @@ class SText(common.WWrap):
w = urwid.AttrWrap(w, "focusfield")
elif error:
w = urwid.AttrWrap(w, "field_error")
- common.WWrap.__init__(self, w)
+ urwid.WidgetWrap.__init__(self, w)
def get_text(self):
- return self.w.get_text()[0]
+ return self._w.get_text()[0]
def keypress(self, size, key):
return key
@@ -38,21 +42,21 @@ class SText(common.WWrap):
return True
-class SEdit(common.WWrap):
+class SEdit(urwid.WidgetWrap):
def __init__(self, txt):
txt = txt.encode("string-escape")
w = urwid.Edit(edit_text=txt, wrap="any", multiline=True)
w = urwid.AttrWrap(w, "editfield")
- common.WWrap.__init__(self, w)
+ urwid.WidgetWrap.__init__(self, w)
def get_text(self):
- return self.w.get_text()[0]
+ return self._w.get_text()[0]
def selectable(self):
return True
-class GridRow(common.WWrap):
+class GridRow(urwid.WidgetWrap):
def __init__(self, focused, editing, editor, values):
self.focused, self.editing, self.editor = focused, editing, editor
@@ -76,14 +80,14 @@ class GridRow(common.WWrap):
)
if focused is not None:
w.set_focus_column(focused)
- common.WWrap.__init__(self, w)
+ urwid.WidgetWrap.__init__(self, w)
def get_edit_value(self):
return self.editing.get_text()
def keypress(self, s, k):
if self.editing:
- w = self.w.column_widths(s)[self.focused]
+ w = self._w.column_widths(s)[self.focused]
k = self.editing.keypress((w,), k)
return k
@@ -121,7 +125,9 @@ class GridWalker(urwid.ListWalker):
try:
val = val.decode("string-escape")
except ValueError:
- self.editor.master.statusbar.message("Invalid Python-style string encoding.", 1000)
+ self.editor.master.statusbar.message(
+ "Invalid Python-style string encoding.", 1000
+ )
return
errors = self.lst[self.focus][1]
emsg = self.editor.is_error(self.focus_col, val)
@@ -155,7 +161,9 @@ class GridWalker(urwid.ListWalker):
def start_edit(self):
if self.lst:
- self.editing = GridRow(self.focus_col, True, self.editor, self.lst[self.focus])
+ self.editing = GridRow(
+ self.focus_col, True, self.editor, self.lst[self.focus]
+ )
self.editor.master.statusbar.update(footer_editing)
self._modified()
@@ -187,7 +195,12 @@ class GridWalker(urwid.ListWalker):
if self.editing:
return self.editing, self.focus
elif self.lst:
- return GridRow(self.focus_col, False, self.editor, self.lst[self.focus]), self.focus
+ return GridRow(
+ self.focus_col,
+ False,
+ self.editor,
+ self.lst[self.focus]
+ ), self.focus
else:
return None, None
@@ -213,10 +226,13 @@ class GridListBox(urwid.ListBox):
FIRST_WIDTH_MAX = 40
FIRST_WIDTH_MIN = 20
-class GridEditor(common.WWrap):
+
+
+class GridEditor(urwid.WidgetWrap):
title = None
columns = None
headings = None
+
def __init__(self, master, value, callback, *cb_args, **cb_kwargs):
value = copy.deepcopy(value)
self.master, self.value, self.callback = master, value, callback
@@ -248,7 +264,7 @@ class GridEditor(common.WWrap):
self.walker = GridWalker(self.value, self)
self.lb = GridListBox(self.walker)
- self.w = urwid.Frame(
+ self._w = urwid.Frame(
self.lb,
header = urwid.Pile([title, h])
)
@@ -257,9 +273,9 @@ class GridEditor(common.WWrap):
def show_empty_msg(self):
if self.walker.lst:
- self.w.set_footer(None)
+ self._w.set_footer(None)
else:
- self.w.set_footer(
+ self._w.set_footer(
urwid.Text(
[
("highlight", "No values. Press "),
@@ -297,7 +313,7 @@ class GridEditor(common.WWrap):
if self.walker.focus == pf and self.walker.focus_col != pfc:
self.walker.start_edit()
else:
- self.w.keypress(size, key)
+ self._w.keypress(size, key)
return None
key = common.shortcuts(key)
@@ -325,7 +341,9 @@ class GridEditor(common.WWrap):
self.master.path_prompt("Read file: ", "", self.read_file)
elif key == "R":
if self.walker.get_current_value() is not None:
- self.master.path_prompt("Read unescaped file: ", "", self.read_file, True)
+ self.master.path_prompt(
+ "Read unescaped file: ", "", self.read_file, True
+ )
elif key == "e":
o = self.walker.get_current_value()
if o is not None:
@@ -336,7 +354,7 @@ class GridEditor(common.WWrap):
elif key in ["enter"]:
self.walker.start_edit()
elif not self.handle_key(key):
- return self.w.keypress(size, key)
+ return self._w.keypress(size, key)
def is_error(self, col, val):
"""
@@ -362,7 +380,9 @@ class GridEditor(common.WWrap):
("tab", "next field"),
("enter", "edit field"),
]
- text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
+ text.extend(
+ common.format_keyvals(keys, key="key", val="text", indent=4)
+ )
text.append(
urwid.Text(
[
@@ -384,6 +404,7 @@ class HeaderEditor(GridEditor):
title = "Editing headers"
columns = 2
headings = ("Key", "Value")
+
def make_help(self):
h = GridEditor.make_help(self)
text = []
@@ -391,7 +412,9 @@ class HeaderEditor(GridEditor):
keys = [
("U", "add User-Agent header"),
]
- text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
+ text.extend(
+ common.format_keyvals(keys, key="key", val="text", indent=4)
+ )
text.append(urwid.Text([("text", "\n")]))
text.extend(h)
return text
@@ -426,6 +449,7 @@ class ReplaceEditor(GridEditor):
title = "Editing replacement patterns"
columns = 3
headings = ("Filter", "Regex", "Replacement")
+
def is_error(self, col, val):
if col == 0:
if not filt.parse(val):
@@ -442,6 +466,7 @@ class SetHeadersEditor(GridEditor):
title = "Editing header set patterns"
columns = 3
headings = ("Filter", "Header", "Value")
+
def is_error(self, col, val):
if col == 0:
if not filt.parse(val):
@@ -455,7 +480,9 @@ class SetHeadersEditor(GridEditor):
keys = [
("U", "add User-Agent header"),
]
- text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
+ text.extend(
+ common.format_keyvals(keys, key="key", val="text", indent=4)
+ )
text.append(urwid.Text([("text", "\n")]))
text.extend(h)
return text
@@ -491,6 +518,7 @@ class ScriptEditor(GridEditor):
title = "Editing scripts"
columns = 1
headings = ("Command",)
+
def is_error(self, col, val):
try:
script.Script.parse_command(val)
@@ -507,4 +535,4 @@ class HostPatternEditor(GridEditor):
try:
re.compile(val, re.IGNORECASE)
except re.error as e:
- return "Invalid regex: %s" % str(e) \ No newline at end of file
+ return "Invalid regex: %s" % str(e)
diff --git a/libmproxy/console/help.py b/libmproxy/console/help.py
index 27288a36..fddab537 100644
--- a/libmproxy/console/help.py
+++ b/libmproxy/console/help.py
@@ -1,5 +1,7 @@
from __future__ import absolute_import
+
import urwid
+
from . import common
from .. import filt, version
@@ -8,6 +10,7 @@ footer = [
('heading_key', "q"), ":back ",
]
+
class HelpView(urwid.ListBox):
def __init__(self, master, help_context, state):
self.master, self.state = master, state
@@ -122,7 +125,9 @@ class HelpView(urwid.ListBox):
("T", "set tcp proxying pattern"),
("u", "set sticky auth expression"),
]
- text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
+ text.extend(
+ common.format_keyvals(keys, key="key", val="text", indent=4)
+ )
text.append(urwid.Text([("head", "\n\nFilter expressions:\n")]))
f = []
@@ -167,7 +172,9 @@ class HelpView(urwid.ListBox):
("~q ~b test", "Requests where body contains \"test\""),
("!(~q & ~t \"text/html\")", "Anything but requests with a text/html content type."),
]
- text.extend(common.format_keyvals(examples, key="key", val="text", indent=4))
+ text.extend(
+ common.format_keyvals(examples, key="key", val="text", indent=4)
+ )
return text
def keypress(self, size, key):