From 9c31669211dd0b81e3f7f5325a3564e827a8d6d0 Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Fri, 11 Sep 2015 19:03:50 +0200 Subject: mitmdump: colorize output, add content views --- libmproxy/dump.py | 207 ++++++++++++++++++++++++++++++++++++------------------ setup.py | 8 ++- test/test_dump.py | 58 ++++++++++++--- 3 files changed, 192 insertions(+), 81 deletions(-) diff --git a/libmproxy/dump.py b/libmproxy/dump.py index 17b47dd2..dd44dc69 100644 --- a/libmproxy/dump.py +++ b/libmproxy/dump.py @@ -1,14 +1,16 @@ from __future__ import absolute_import, print_function -import json import sys import os +import traceback -from netlib.http.semantics import CONTENT_MISSING -import netlib.utils +import click -from . import flow, filt, utils -from .protocol import http +from netlib.http.semantics import CONTENT_MISSING +import netlib.utils +from . import flow, filt, contentview +from .exceptions import ContentViewException +from .models import HTTPRequest class DumpError(Exception): pass @@ -55,24 +57,6 @@ class Options(object): setattr(self, i, None) -def str_response(resp): - r = "%s %s" % (resp.code, resp.msg) - if resp.is_replay: - r = "[replay] " + r - return r - - -def str_request(f, showhost): - if f.client_conn: - c = f.client_conn.address.host - else: - c = "[replay]" - r = "%s %s %s" % (c, f.request.method, f.request.pretty_url(showhost)) - if f.request.stickycookie: - r = "[stickycookie] " + r - return r - - class DumpMaster(flow.FlowMaster): def __init__(self, server, options, outfile=sys.stdout): flow.FlowMaster.__init__(self, server, flow.State()) @@ -163,73 +147,162 @@ class DumpMaster(flow.FlowMaster): def add_event(self, e, level="info"): needed = dict(error=0, info=1, debug=2).get(level, 1) if self.o.verbosity >= needed: - print(e, file=self.outfile) - self.outfile.flush() + self.echo( + e, + fg="red" if level == "error" else None, + dim=(level == "debug") + ) @staticmethod - def indent(n, t): - l = str(t).strip().splitlines() + def indent(n, 
text): + l = str(text).strip().splitlines() pad = " " * n return "\n".join(pad + i for i in l) - def _print_message(self, message): + def echo(self, text, indent=None, **style): + if indent: + text = self.indent(indent, text) + click.secho(text, file=self.outfile, **style) + + def _echo_message(self, message): if self.o.flow_detail >= 2: - print(self.indent(4, str(message.headers)), file=self.outfile) + headers = "\r\n".join( + "{}: {}".format( + click.style(k, fg="blue", bold=True), + click.style(v, fg="blue")) + for k, v in message.headers.fields + ) + self.echo(headers, indent=4) if self.o.flow_detail >= 3: if message.content == CONTENT_MISSING: - print(self.indent(4, "(content missing)"), file=self.outfile) + self.echo("(content missing)", indent=4) elif message.content: - print("", file=self.outfile) - content = message.get_decoded_content() - if not utils.isBin(content): - try: - jsn = json.loads(content) - print( - self.indent( - 4, - json.dumps( - jsn, - indent=2)), - file=self.outfile) - except ValueError: - print(self.indent(4, content), file=self.outfile) - else: - d = netlib.utils.hexdump(content) - d = "\n".join("%s\t%s %s" % i for i in d) - print(self.indent(4, d), file=self.outfile) + self.echo("") + cutoff = sys.maxsize if self.o.flow_detail >= 4 else contentview.VIEW_CUTOFF + try: + type, lines = contentview.get_content_view( + contentview.get("Auto"), + message.headers, + message.body, + cutoff, + isinstance(message, HTTPRequest) + ) + except ContentViewException: + s = "Content viewer failed: \n" + traceback.format_exc() + self.add_event(s, "debug") + type, lines = contentview.get_content_view( + contentview.get("Raw"), + message.headers, + message.body, + cutoff, + isinstance(message, HTTPRequest) + ) + + styles = dict( + highlight=dict(bold=True), + offset=dict(fg="blue"), + header=dict(fg="green", bold=True), + text=dict(fg="green") + ) + + def colorful(line): + yield " " # we can already indent here + for (style, text) in line: + yield 
click.style(text, **styles.get(style, {})) + + content = "\r\n".join( + "".join(colorful(line)) for line in lines + ) + self.echo(content) + if self.o.flow_detail >= 2: - print("", file=self.outfile) + self.echo("") - def _process_flow(self, f): - self.state.delete_flow(f) - if self.filt and not f.match(self.filt): - return + def _echo_request_line(self, flow): + if flow.request.stickycookie: + stickycookie = click.style("[stickycookie] ", fg="yellow", bold=True) + else: + stickycookie = "" + + if flow.client_conn: + client = click.style(flow.client_conn.address.host, bold=True) + else: + client = click.style("[replay]", fg="yellow", bold=True) + + method = flow.request.method + method_color=dict( + GET="green", + DELETE="red" + ).get(method.upper(), "magenta") + method = click.style(method, fg=method_color, bold=True) + url = click.style(flow.request.pretty_url(self.showhost), bold=True) + + line = "{stickycookie}{client} {method} {url}".format( + stickycookie=stickycookie, + client=client, + method=method, + url=url + ) + self.echo(line) + + def _echo_response_line(self, flow): + if flow.response.is_replay: + replay = click.style("[replay] ", fg="yellow", bold=True) + else: + replay = "" + + code = flow.response.status_code + code_color = None + if 200 <= code < 300: + code_color = "green" + elif 300 <= code < 400: + code_color = "magenta" + elif 400 <= code < 600: + code_color = "red" + code = click.style(str(code), fg=code_color, bold=True, blink=(code == 418)) + msg = click.style(flow.response.msg, fg=code_color, bold=True) + + if flow.response.content == CONTENT_MISSING: + size = "(content missing)" + else: + size = netlib.utils.pretty_size(len(flow.response.content)) + size = click.style(size, bold=True) + + arrows = click.style("<<", bold=True) + line = "{replay} {arrows} {code} {msg} {size}".format( + replay=replay, + arrows=arrows, + code=code, + msg=msg, + size=size + ) + self.echo(line) + + def echo_flow(self, f): if self.o.flow_detail == 0: return if 
f.request: - print(str_request(f, self.showhost), file=self.outfile) - self._print_message(f.request) + self._echo_request_line(f) + self._echo_message(f.request) if f.response: - if f.response.content == CONTENT_MISSING: - sz = "(content missing)" - else: - sz = netlib.utils.pretty_size(len(f.response.content)) - print( - " << %s %s" % - (str_response( - f.response), - sz), - file=self.outfile) - self._print_message(f.response) + self._echo_response_line(f) + self._echo_message(f.response) if f.error: - print(" << {}".format(f.error.msg), file=self.outfile) + self.echo(" << {}".format(f.error.msg), bold=True, fg="red") self.outfile.flush() + def _process_flow(self, f): + self.state.delete_flow(f) + if self.filt and not f.match(self.filt): + return + + self.echo_flow(f) + def handle_request(self, f): flow.FlowMaster.handle_request(self, f) if f: diff --git a/setup.py b/setup.py index 896d0248..50d4604f 100644 --- a/setup.py +++ b/setup.py @@ -23,15 +23,17 @@ deps = { "html2text>=2015.4.14", "construct>=2.5.2", "six>=1.9.0", + "lxml>=3.3.6", + "Pillow>=2.3.0", } # A script -> additional dependencies dict. 
scripts = { "mitmproxy": { "urwid>=1.3", - "lxml>=3.3.6", - "Pillow>=2.3.0", }, - "mitmdump": set(), + "mitmdump": { + "click>=5.1", + }, "mitmweb": set() } # Developer dependencies diff --git a/test/test_dump.py b/test/test_dump.py index c76f555f..0fc4cd4d 100644 --- a/test/test_dump.py +++ b/test/test_dump.py @@ -1,5 +1,7 @@ import os from cStringIO import StringIO +from libmproxy.contentview import ViewAuto +from libmproxy.exceptions import ContentViewException from libmproxy.models import HTTPResponse import netlib.tutils @@ -12,17 +14,51 @@ import mock def test_strfuncs(): - t = HTTPResponse.wrap(netlib.tutils.tresp()) - t.is_replay = True - dump.str_response(t) - - f = tutils.tflow() - f.client_conn = None - f.request.stickycookie = True - assert "stickycookie" in dump.str_request(f, False) - assert "stickycookie" in dump.str_request(f, True) - assert "replay" in dump.str_request(f, False) - assert "replay" in dump.str_request(f, True) + o = dump.Options() + m = dump.DumpMaster(None, o) + + m.outfile = StringIO() + m.o.flow_detail = 0 + m.echo_flow(tutils.tflow()) + assert not m.outfile.getvalue() + + m.o.flow_detail = 4 + m.echo_flow(tutils.tflow()) + assert m.outfile.getvalue() + + m.outfile = StringIO() + m.echo_flow(tutils.tflow(resp=True)) + assert "<<" in m.outfile.getvalue() + + m.outfile = StringIO() + m.echo_flow(tutils.tflow(err=True)) + assert "<<" in m.outfile.getvalue() + + flow = tutils.tflow() + flow.request = netlib.tutils.treq() + flow.request.stickycookie = True + flow.client_conn = mock.MagicMock() + flow.client_conn.address.host = "foo" + flow.response = netlib.tutils.tresp(content=CONTENT_MISSING) + flow.response.is_replay = True + flow.response.code = 300 + m.echo_flow(flow) + + + flow = tutils.tflow(resp=netlib.tutils.tresp("{")) + flow.response.headers["content-type"] = "application/json" + flow.response.code = 400 + m.echo_flow(flow) + + +@mock.patch("libmproxy.contentview.get_content_view") +def test_contentview(get_content_view): + 
get_content_view.side_effect = ContentViewException(""), ("x", []) + + o = dump.Options(flow_detail=4, verbosity=3) + m = dump.DumpMaster(None, o, StringIO()) + m.echo_flow(tutils.tflow()) + assert "Content viewer failed" in m.outfile.getvalue() class TestDumpMaster: -- cgit v1.2.3 From 049d253a83e18116340670cb86528b4ac1d3b215 Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Sat, 12 Sep 2015 13:49:16 +0200 Subject: simplify contentview api --- libmproxy/console/__init__.py | 6 +- libmproxy/console/flowview.py | 58 +++-- libmproxy/console/options.py | 6 +- libmproxy/contentview.py | 549 ----------------------------------------- libmproxy/contentviews.py | 559 ++++++++++++++++++++++++++++++++++++++++++ libmproxy/dump.py | 44 ++-- test/test_contentview.py | 8 +- test/test_dump.py | 3 +- 8 files changed, 628 insertions(+), 605 deletions(-) delete mode 100644 libmproxy/contentview.py create mode 100644 libmproxy/contentviews.py diff --git a/libmproxy/console/__init__.py b/libmproxy/console/__init__.py index b75fa5d8..3bc0c091 100644 --- a/libmproxy/console/__init__.py +++ b/libmproxy/console/__init__.py @@ -14,7 +14,7 @@ import traceback import urwid import weakref -from .. import controller, flow, script, contentview +from .. import controller, flow, script, contentviews from . import flowlist, flowview, help, window, signals, options from . 
import grideditor, palettes, statusbar, palettepicker @@ -26,7 +26,7 @@ class ConsoleState(flow.State): flow.State.__init__(self) self.focus = None self.follow_focus = None - self.default_body_view = contentview.get("Auto") + self.default_body_view = contentviews.get("Auto") self.flowsettings = weakref.WeakKeyDictionary() self.last_search = None @@ -648,7 +648,7 @@ class ConsoleMaster(flow.FlowMaster): return self.state.set_intercept(txt) def change_default_display_mode(self, t): - v = contentview.get_by_shortcut(t) + v = contentviews.get_by_shortcut(t) self.state.default_body_view = v self.refresh_focus() diff --git a/libmproxy/console/flowview.py b/libmproxy/console/flowview.py index 192b1e5b..3e13fab4 100644 --- a/libmproxy/console/flowview.py +++ b/libmproxy/console/flowview.py @@ -1,15 +1,15 @@ from __future__ import absolute_import import os -import sys import traceback +import sys + import urwid from netlib import odict from netlib.http.semantics import CONTENT_MISSING, Headers - from . import common, grideditor, signals, searchable, tabs from . import flowdetailview -from .. import utils, controller, contentview +from .. 
import utils, controller, contentviews from ..models import HTTPRequest, HTTPResponse, decoded from ..exceptions import ContentViewException @@ -167,10 +167,10 @@ class FlowView(tabs.Tabs): if flow == self.flow: self.show() - def content_view(self, viewmode, conn): - if conn.content == CONTENT_MISSING: + def content_view(self, viewmode, message): + if message.body == CONTENT_MISSING: msg, body = "", [urwid.Text([("error", "[content missing]")])] - return (msg, body) + return msg, body else: full = self.state.get_flow_setting( self.flow, @@ -180,29 +180,43 @@ class FlowView(tabs.Tabs): if full: limit = sys.maxsize else: - limit = contentview.VIEW_CUTOFF + limit = contentviews.VIEW_CUTOFF return cache.get( self._get_content_view, viewmode, - conn.headers, - conn.content, - limit, - isinstance(conn, HTTPRequest) + message, + limit ) - def _get_content_view(self, viewmode, headers, content, limit, is_request): + def _get_content_view(self, viewmode, message, max_lines): + try: - description, lines = contentview.get_content_view( - viewmode, headers, content, limit, is_request + description, lines = contentviews.get_content_view( + viewmode, message.body, headers=message.headers ) except ContentViewException: s = "Content viewer failed: \n" + traceback.format_exc() signals.add_event(s, "error") - description, lines = contentview.get_content_view( - contentview.get("Raw"), headers, content, limit, is_request + description, lines = contentviews.get_content_view( + contentviews.get("Raw"), message.body, headers=message.headers ) description = description.replace("Raw", "Couldn't parse: falling back to Raw") - text_objects = [urwid.Text(l) for l in lines] + + # Give hint that you have to tab for the response. 
+ if description == "No content" and isinstance(message, HTTPRequest): + description = "No request content (press tab to view response)" + + text_objects = [] + for line in lines: + text_objects.append(urwid.Text(line)) + if len(text_objects) == max_lines: + text_objects.append(urwid.Text([ + ("highlight", "Stopped displaying data after %d lines. Press " % max_lines), + ("key", "f"), + ("highlight", " to load all data.") + ])) + break + return description, text_objects def viewmode_get(self): @@ -227,9 +241,7 @@ class FlowView(tabs.Tabs): [ ("heading", msg), ] - ) - ] - cols.append( + ), urwid.Text( [ " ", @@ -239,7 +251,7 @@ class FlowView(tabs.Tabs): ], align="right" ) - ) + ] title = urwid.AttrWrap(urwid.Columns(cols), "heading") txt.append(title) @@ -471,7 +483,7 @@ class FlowView(tabs.Tabs): self.state.add_flow_setting( self.flow, (self.tab_offset, "prettyview"), - contentview.get_by_shortcut(t) + contentviews.get_by_shortcut(t) ) signals.flow_change.send(self, flow = self.flow) @@ -611,7 +623,7 @@ class FlowView(tabs.Tabs): scope = "s" common.ask_copy_part(scope, self.flow, self.master, self.state) elif key == "m": - p = list(contentview.view_prompts) + p = list(contentviews.view_prompts) p.insert(0, ("Clear", "C")) signals.status_prompt_onekey.send( self, diff --git a/libmproxy/console/options.py b/libmproxy/console/options.py index 0948e96d..a365a78c 100644 --- a/libmproxy/console/options.py +++ b/libmproxy/console/options.py @@ -1,6 +1,6 @@ import urwid -from .. import contentview +from .. import contentviews from . import common, signals, grideditor from . 
import select, palettes @@ -158,7 +158,7 @@ class Options(urwid.WidgetWrap): self.master.scripts = [] self.master.set_stickyauth(None) self.master.set_stickycookie(None) - self.master.state.default_body_view = contentview.get("Auto") + self.master.state.default_body_view = contentviews.get("Auto") signals.update_settings.send(self) signals.status_message.send( @@ -233,7 +233,7 @@ class Options(urwid.WidgetWrap): def default_displaymode(self): signals.status_prompt_onekey.send( prompt = "Global default display mode", - keys = contentview.view_prompts, + keys = contentviews.view_prompts, callback = self.master.change_default_display_mode ) diff --git a/libmproxy/contentview.py b/libmproxy/contentview.py deleted file mode 100644 index 219adfb7..00000000 --- a/libmproxy/contentview.py +++ /dev/null @@ -1,549 +0,0 @@ -from __future__ import absolute_import -import cStringIO -import json -import logging -import subprocess -import sys - -import lxml.html -import lxml.etree -from PIL import Image - -from PIL.ExifTags import TAGS -import html2text -import six - -from netlib.odict import ODict -from netlib import encoding -import netlib.utils -from . import utils -from .exceptions import ContentViewException -from .contrib import jsbeautifier -from .contrib.wbxml.ASCommandResponse import ASCommandResponse - -try: - import pyamf - from pyamf import remoting, flex -except ImportError: # pragma nocover - pyamf = None - -try: - import cssutils -except ImportError: # pragma nocover - cssutils = None -else: - cssutils.log.setLevel(logging.CRITICAL) - - cssutils.ser.prefs.keepComments = True - cssutils.ser.prefs.omitLastSemicolon = False - cssutils.ser.prefs.indentClosingBrace = False - cssutils.ser.prefs.validOnly = False - -VIEW_CUTOFF = 1024 * 50 -KEY_MAX = 30 - - -def format_dict(d): - """ - Transforms the given dictionary into a list of - ("key", key ) - ("value", value) - tuples, where key is padded to a uniform width. 
- """ - max_key_len = max(len(k) for k in d.keys()) - max_key_len = min(max_key_len, KEY_MAX) - for key, value in d.items(): - key += ":" - key = key.ljust(max_key_len + 2) - yield [ - ("header", key), - ("text", value) - ] - - -def format_text(content, limit): - """ - Transforms the given content into - """ - content = netlib.utils.cleanBin(content) - - for line in content[:limit].splitlines(): - yield [("text", line)] - - for msg in trailer(content, limit): - yield msg - - -def trailer(content, limit): - bytes_removed = len(content) - limit - if bytes_removed > 0: - yield [ - ("cutoff", "... {} of data not shown.".format(netlib.utils.pretty_size(bytes_removed))) - ] - - -class View(object): - name = None - prompt = () - content_types = [] - - def __call__(self, hdrs, content, limit): - """ - Returns: - A (description, content generator) tuple. - - The content generator yields lists of (style, text) tuples. - Iit must not yield tuples of tuples, because urwid cannot process that. - """ - raise NotImplementedError() - - -class ViewAuto(View): - name = "Auto" - prompt = ("auto", "a") - content_types = [] - - def __call__(self, hdrs, content, limit): - ctype = hdrs.get("content-type") - if ctype: - ct = netlib.utils.parse_content_type(ctype) if ctype else None - ct = "%s/%s" % (ct[0], ct[1]) - if ct in content_types_map: - return content_types_map[ct][0](hdrs, content, limit) - elif utils.isXML(content): - return get("XML")(hdrs, content, limit) - return get("Raw")(hdrs, content, limit) - - -class ViewRaw(View): - name = "Raw" - prompt = ("raw", "r") - content_types = [] - - def __call__(self, hdrs, content, limit): - return "Raw", format_text(content, limit) - - -class ViewHex(View): - name = "Hex" - prompt = ("hex", "e") - content_types = [] - - @staticmethod - def _format(content, limit): - for offset, hexa, s in netlib.utils.hexdump(content[:limit]): - yield [ - ("offset", offset + " "), - ("text", hexa + " "), - ("text", s) - ] - for msg in trailer(content, 
limit): - yield msg - - def __call__(self, hdrs, content, limit): - return "Hex", self._format(content, limit) - - -class ViewXML(View): - name = "XML" - prompt = ("xml", "x") - content_types = ["text/xml"] - - def __call__(self, hdrs, content, limit): - parser = lxml.etree.XMLParser( - remove_blank_text=True, - resolve_entities=False, - strip_cdata=False, - recover=False - ) - try: - document = lxml.etree.fromstring(content, parser) - except lxml.etree.XMLSyntaxError: - return None - docinfo = document.getroottree().docinfo - - prev = [] - p = document.getroottree().getroot().getprevious() - while p is not None: - prev.insert( - 0, - lxml.etree.tostring(p) - ) - p = p.getprevious() - doctype = docinfo.doctype - if prev: - doctype += "\n".join(prev).strip() - doctype = doctype.strip() - - s = lxml.etree.tostring( - document, - pretty_print=True, - xml_declaration=True, - doctype=doctype or None, - encoding=docinfo.encoding - ) - - return "XML-like data", format_text(s, limit) - - -class ViewJSON(View): - name = "JSON" - prompt = ("json", "s") - content_types = ["application/json"] - - def __call__(self, hdrs, content, limit): - pretty_json = utils.pretty_json(content) - if pretty_json: - return "JSON", format_text(pretty_json, limit) - - -class ViewHTML(View): - name = "HTML" - prompt = ("html", "h") - content_types = ["text/html"] - - def __call__(self, hdrs, content, limit): - if utils.isXML(content): - parser = lxml.etree.HTMLParser( - strip_cdata=True, - remove_blank_text=True - ) - d = lxml.html.fromstring(content, parser=parser) - docinfo = d.getroottree().docinfo - s = lxml.etree.tostring( - d, - pretty_print=True, - doctype=docinfo.doctype - ) - return "HTML", format_text(s, limit) - - -class ViewHTMLOutline(View): - name = "HTML Outline" - prompt = ("html outline", "o") - content_types = ["text/html"] - - def __call__(self, hdrs, content, limit): - content = content.decode("utf-8") - h = html2text.HTML2Text(baseurl="") - h.ignore_images = True - 
h.body_width = 0 - content = h.handle(content) - return "HTML Outline", format_text(content, limit) - - -class ViewURLEncoded(View): - name = "URL-encoded" - prompt = ("urlencoded", "u") - content_types = ["application/x-www-form-urlencoded"] - - def __call__(self, hdrs, content, limit): - d = netlib.utils.urldecode(content) - return "URLEncoded form", format_dict(ODict(d)) - - -class ViewMultipart(View): - name = "Multipart Form" - prompt = ("multipart", "m") - content_types = ["multipart/form-data"] - - @staticmethod - def _format(v): - yield [("highlight", "Form data:\n")] - for message in format_dict(ODict(v)): - yield message - - def __call__(self, hdrs, content, limit): - v = netlib.utils.multipartdecode(hdrs, content) - if v: - return "Multipart form", self._format(v) - - -if pyamf: - class DummyObject(dict): - def __init__(self, alias): - dict.__init__(self) - - def __readamf__(self, input): - data = input.readObject() - self["data"] = data - - - def pyamf_class_loader(s): - for i in pyamf.CLASS_LOADERS: - if i != pyamf_class_loader: - v = i(s) - if v: - return v - return DummyObject - - - pyamf.register_class_loader(pyamf_class_loader) - - - class ViewAMF(View): - name = "AMF" - prompt = ("amf", "f") - content_types = ["application/x-amf"] - - def unpack(self, b, seen=set([])): - if hasattr(b, "body"): - return self.unpack(b.body, seen) - if isinstance(b, DummyObject): - if id(b) in seen: - return "" - else: - seen.add(id(b)) - for k, v in b.items(): - b[k] = self.unpack(v, seen) - return b - elif isinstance(b, dict): - for k, v in b.items(): - b[k] = self.unpack(v, seen) - return b - elif isinstance(b, list): - return [self.unpack(i) for i in b] - elif isinstance(b, flex.ArrayCollection): - return [self.unpack(i, seen) for i in b] - else: - return b - - def _format(self, envelope, limit): - for target, message in iter(envelope): - if isinstance(message, pyamf.remoting.Request): - yield [ - ("header", "Request: "), - ("text", str(target)), - ] - else: - 
yield [ - ("header", "Response: "), - ("text", "%s, code %s" % (target, message.status)), - ] - - s = json.dumps(self.unpack(message), indent=4) - for msg in format_text(s, limit): - yield msg - - def __call__(self, hdrs, content, limit): - envelope = remoting.decode(content, strict=False) - if envelope: - return "AMF v%s" % envelope.amfVersion, self._format(envelope, limit) - - -class ViewJavaScript(View): - name = "JavaScript" - prompt = ("javascript", "j") - content_types = [ - "application/x-javascript", - "application/javascript", - "text/javascript" - ] - - def __call__(self, hdrs, content, limit): - opts = jsbeautifier.default_options() - opts.indent_size = 2 - res = jsbeautifier.beautify(content[:limit], opts) - cutoff = max(0, len(content) - limit) - return "JavaScript", format_text(res, limit - cutoff) - - -class ViewCSS(View): - name = "CSS" - prompt = ("css", "c") - content_types = [ - "text/css" - ] - - def __call__(self, hdrs, content, limit): - if cssutils: - sheet = cssutils.parseString(content) - beautified = sheet.cssText - else: - beautified = content - - return "CSS", format_text(beautified, limit) - - -class ViewImage(View): - name = "Image" - prompt = ("image", "i") - content_types = [ - "image/png", - "image/jpeg", - "image/gif", - "image/vnd.microsoft.icon", - "image/x-icon", - ] - - def __call__(self, hdrs, content, limit): - try: - img = Image.open(cStringIO.StringIO(content)) - except IOError: - return None - parts = [ - ("Format", str(img.format_description)), - ("Size", "%s x %s px" % img.size), - ("Mode", str(img.mode)), - ] - for i in sorted(img.info.keys()): - if i != "exif": - parts.append( - (str(i), str(img.info[i])) - ) - if hasattr(img, "_getexif"): - ex = img._getexif() - if ex: - for i in sorted(ex.keys()): - tag = TAGS.get(i, i) - parts.append( - (str(tag), str(ex[i])) - ) - clean = [] - for i in parts: - clean.append( - [netlib.utils.cleanBin(i[0]), netlib.utils.cleanBin(i[1])] - ) - fmt = format_dict(ODict(clean)) - return 
"%s image" % img.format, fmt - - -class ViewProtobuf(View): - """Human friendly view of protocol buffers - The view uses the protoc compiler to decode the binary - """ - - name = "Protocol Buffer" - prompt = ("protobuf", "p") - content_types = [ - "application/x-protobuf", - "application/x-protobuffer", - ] - - @staticmethod - def is_available(): - try: - p = subprocess.Popen( - ["protoc", "--version"], - stdout=subprocess.PIPE - ) - out, _ = p.communicate() - return out.startswith("libprotoc") - except: - return False - - def decode_protobuf(self, content): - # if Popen raises OSError, it will be caught in - # get_content_view and fall back to Raw - p = subprocess.Popen(['protoc', '--decode_raw'], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - out, err = p.communicate(input=content) - if out: - return out - else: - return err - - def __call__(self, hdrs, content, limit): - decoded = self.decode_protobuf(content) - return "Protobuf", format_text(decoded, limit) - - -class ViewWBXML(View): - name = "WBXML" - prompt = ("wbxml", "w") - content_types = [ - "application/vnd.wap.wbxml", - "application/vnd.ms-sync.wbxml" - ] - - def __call__(self, hdrs, content, limit): - - try: - parser = ASCommandResponse(content) - parsedContent = parser.xmlString - if parsedContent: - return "WBXML", format_text(parsedContent, limit) - except: - return None - - -views = [ - ViewAuto(), - ViewRaw(), - ViewHex(), - ViewJSON(), - ViewXML(), - ViewWBXML(), - ViewHTML(), - ViewHTMLOutline(), - ViewJavaScript(), - ViewCSS(), - ViewURLEncoded(), - ViewMultipart(), - ViewImage(), -] -if pyamf: - views.append(ViewAMF()) - -if ViewProtobuf.is_available(): - views.append(ViewProtobuf()) - -content_types_map = {} -for i in views: - for ct in i.content_types: - l = content_types_map.setdefault(ct, []) - l.append(i) - -view_prompts = [i.prompt for i in views] - - -def get_by_shortcut(c): - for i in views: - if i.prompt[1] == c: - return i - - -def get(name): - for i 
in views: - if i.name == name: - return i - - -def get_content_view(viewmode, headers, content, limit, is_request): - """ - Returns: - A (description, content generator) tuple. - - Raises: - ContentViewException, if the content view threw an error. - """ - if not content: - if is_request: - return "No request content (press tab to view response)", [] - else: - return "No content", [] - msg = [] - - enc = headers.get("content-encoding") - if enc and enc != "identity": - decoded = encoding.decode(enc, content) - if decoded: - content = decoded - msg.append("[decoded %s]" % enc) - try: - ret = viewmode(headers, content, limit) - # Third-party viewers can fail in unexpected ways... - except Exception as e: - six.reraise( - ContentViewException, - ContentViewException(str(e)), - sys.exc_info()[2] - ) - if not ret: - ret = get("Raw")(headers, content, limit) - msg.append("Couldn't parse: falling back to Raw") - else: - msg.append(ret[0]) - return " ".join(msg), ret[1] diff --git a/libmproxy/contentviews.py b/libmproxy/contentviews.py new file mode 100644 index 00000000..a356b35d --- /dev/null +++ b/libmproxy/contentviews.py @@ -0,0 +1,559 @@ +""" +Mitmproxy Content Views +======================= + +mitmproxy includes a set of content views which can be used to format/decode/highlight data. +While they are currently used for HTTP message bodies only, they may be used in other contexts +in the future, e.g. to decode protobuf messages sent as WebSocket frames. + +Thus, the View API is very minimalistic. The only arguments are `data` and `**metadata`, +where `data` is the actual content (as bytes). The contents of metadata depend on the protocol in +use. For HTTP, the message headers are passed as the ``headers`` keyword argument. 
+ +""" +from __future__ import (absolute_import, print_function, division) +import cStringIO +import json +import logging +import subprocess +import sys + +import lxml.html +import lxml.etree +from PIL import Image +from PIL.ExifTags import TAGS +import html2text +import six + +from netlib.odict import ODict +from netlib import encoding +import netlib.utils + +from . import utils +from .exceptions import ContentViewException +from .contrib import jsbeautifier +from .contrib.wbxml.ASCommandResponse import ASCommandResponse + +try: + import pyamf + from pyamf import remoting, flex +except ImportError: # pragma nocover + pyamf = None + +try: + import cssutils +except ImportError: # pragma nocover + cssutils = None +else: + cssutils.log.setLevel(logging.CRITICAL) + + cssutils.ser.prefs.keepComments = True + cssutils.ser.prefs.omitLastSemicolon = False + cssutils.ser.prefs.indentClosingBrace = False + cssutils.ser.prefs.validOnly = False + +# Default view cutoff *in lines* +VIEW_CUTOFF = 512 + +KEY_MAX = 30 + + +def format_dict(d): + """ + Helper function that transforms the given dictionary into a list of + ("key", key ) + ("value", value) + tuples, where key is padded to a uniform width. + """ + max_key_len = max(len(k) for k in d.keys()) + max_key_len = min(max_key_len, KEY_MAX) + for key, value in d.items(): + key += ":" + key = key.ljust(max_key_len + 2) + yield [ + ("header", key), + ("text", value) + ] + + +def format_text(text): + """ + Helper function that transforms bytes into the view output format. + """ + for line in text.splitlines(): + yield [("text", line)] + + +class View(object): + name = None + prompt = () + content_types = [] + + def __call__(self, data, **metadata): + """ + Transform raw data into human-readable output. + + Args: + data: the data to decode/format as bytes. + metadata: optional keyword-only arguments for metadata. Implementations must not + rely on a given argument being present. 
+ + Returns: + A (description, content generator) tuple. + + The content generator yields lists of (style, text) tuples, where each list represents + a single line. ``text`` is an unfiltered byte string which may need to be escaped, + depending on the used output. + + Caveats: + The content generator must not yield tuples of tuples, + because urwid cannot process that. You have to yield a *list* of tuples per line. + """ + raise NotImplementedError() + + +class ViewAuto(View): + name = "Auto" + prompt = ("auto", "a") + content_types = [] + + def __call__(self, data, **metadata): + headers = metadata.get("headers", {}) + ctype = headers.get("content-type") + if ctype: + ct = netlib.utils.parse_content_type(ctype) if ctype else None + ct = "%s/%s" % (ct[0], ct[1]) + if ct in content_types_map: + return content_types_map[ct][0](data, **metadata) + elif utils.isXML(data): + return get("XML")(data, **metadata) + return get("Raw")(data) + + +class ViewRaw(View): + name = "Raw" + prompt = ("raw", "r") + content_types = [] + + def __call__(self, data, **metadata): + return "Raw", format_text(data) + + +class ViewHex(View): + name = "Hex" + prompt = ("hex", "e") + content_types = [] + + @staticmethod + def _format(data): + for offset, hexa, s in netlib.utils.hexdump(data): + yield [ + ("offset", offset + " "), + ("text", hexa + " "), + ("text", s) + ] + + def __call__(self, data, **metadata): + return "Hex", self._format(data) + + +class ViewXML(View): + name = "XML" + prompt = ("xml", "x") + content_types = ["text/xml"] + + def __call__(self, data, **metadata): + parser = lxml.etree.XMLParser( + remove_blank_text=True, + resolve_entities=False, + strip_cdata=False, + recover=False + ) + try: + document = lxml.etree.fromstring(data, parser) + except lxml.etree.XMLSyntaxError: + return None + docinfo = document.getroottree().docinfo + + prev = [] + p = document.getroottree().getroot().getprevious() + while p is not None: + prev.insert( + 0, + lxml.etree.tostring(p) + ) + p = 
p.getprevious() + doctype = docinfo.doctype + if prev: + doctype += "\n".join(prev).strip() + doctype = doctype.strip() + + s = lxml.etree.tostring( + document, + pretty_print=True, + xml_declaration=True, + doctype=doctype or None, + encoding=docinfo.encoding + ) + + return "XML-like data", format_text(s) + + +class ViewJSON(View): + name = "JSON" + prompt = ("json", "s") + content_types = ["application/json"] + + def __call__(self, data, **metadata): + pretty_json = utils.pretty_json(data) + if pretty_json: + return "JSON", format_text(pretty_json) + + +class ViewHTML(View): + name = "HTML" + prompt = ("html", "h") + content_types = ["text/html"] + + def __call__(self, data, **metadata): + if utils.isXML(data): + parser = lxml.etree.HTMLParser( + strip_cdata=True, + remove_blank_text=True + ) + d = lxml.html.fromstring(data, parser=parser) + docinfo = d.getroottree().docinfo + s = lxml.etree.tostring( + d, + pretty_print=True, + doctype=docinfo.doctype + ) + return "HTML", format_text(s) + + +class ViewHTMLOutline(View): + name = "HTML Outline" + prompt = ("html outline", "o") + content_types = ["text/html"] + + def __call__(self, data, **metadata): + data = data.decode("utf-8") + h = html2text.HTML2Text(baseurl="") + h.ignore_images = True + h.body_width = 0 + outline = h.handle(data) + return "HTML Outline", format_text(outline) + + +class ViewURLEncoded(View): + name = "URL-encoded" + prompt = ("urlencoded", "u") + content_types = ["application/x-www-form-urlencoded"] + + def __call__(self, data, **metadata): + d = netlib.utils.urldecode(data) + return "URLEncoded form", format_dict(ODict(d)) + + +class ViewMultipart(View): + name = "Multipart Form" + prompt = ("multipart", "m") + content_types = ["multipart/form-data"] + + @staticmethod + def _format(v): + yield [("highlight", "Form data:\n")] + for message in format_dict(ODict(v)): + yield message + + def __call__(self, data, **metadata): + headers = metadata.get("headers", {}) + v = 
netlib.utils.multipartdecode(headers, data) + if v: + return "Multipart form", self._format(v) + + +if pyamf: + class DummyObject(dict): + def __init__(self, alias): + dict.__init__(self) + + def __readamf__(self, input): + data = input.readObject() + self["data"] = data + + + def pyamf_class_loader(s): + for i in pyamf.CLASS_LOADERS: + if i != pyamf_class_loader: + v = i(s) + if v: + return v + return DummyObject + + + pyamf.register_class_loader(pyamf_class_loader) + + + class ViewAMF(View): + name = "AMF" + prompt = ("amf", "f") + content_types = ["application/x-amf"] + + def unpack(self, b, seen=set([])): + if hasattr(b, "body"): + return self.unpack(b.body, seen) + if isinstance(b, DummyObject): + if id(b) in seen: + return "" + else: + seen.add(id(b)) + for k, v in b.items(): + b[k] = self.unpack(v, seen) + return b + elif isinstance(b, dict): + for k, v in b.items(): + b[k] = self.unpack(v, seen) + return b + elif isinstance(b, list): + return [self.unpack(i) for i in b] + elif isinstance(b, flex.ArrayCollection): + return [self.unpack(i, seen) for i in b] + else: + return b + + def _format(self, envelope): + for target, message in iter(envelope): + if isinstance(message, pyamf.remoting.Request): + yield [ + ("header", "Request: "), + ("text", str(target)), + ] + else: + yield [ + ("header", "Response: "), + ("text", "%s, code %s" % (target, message.status)), + ] + + s = json.dumps(self.unpack(message), indent=4) + for msg in format_text(s): + yield msg + + def __call__(self, data, **metadata): + envelope = remoting.decode(data, strict=False) + if envelope: + return "AMF v%s" % envelope.amfVersion, self._format(envelope) + + +class ViewJavaScript(View): + name = "JavaScript" + prompt = ("javascript", "j") + content_types = [ + "application/x-javascript", + "application/javascript", + "text/javascript" + ] + + def __call__(self, data, **metadata): + opts = jsbeautifier.default_options() + opts.indent_size = 2 + res = jsbeautifier.beautify(data, opts) + return 
"JavaScript", format_text(res) + + +class ViewCSS(View): + name = "CSS" + prompt = ("css", "c") + content_types = [ + "text/css" + ] + + def __call__(self, data, **metadata): + if cssutils: + sheet = cssutils.parseString(data) + beautified = sheet.cssText + else: + beautified = data + + return "CSS", format_text(beautified) + + +class ViewImage(View): + name = "Image" + prompt = ("image", "i") + content_types = [ + "image/png", + "image/jpeg", + "image/gif", + "image/vnd.microsoft.icon", + "image/x-icon", + ] + + def __call__(self, data, **metadata): + try: + img = Image.open(cStringIO.StringIO(data)) + except IOError: + return None + parts = [ + ("Format", str(img.format_description)), + ("Size", "%s x %s px" % img.size), + ("Mode", str(img.mode)), + ] + for i in sorted(img.info.keys()): + if i != "exif": + parts.append( + (str(i), str(img.info[i])) + ) + if hasattr(img, "_getexif"): + ex = img._getexif() + if ex: + for i in sorted(ex.keys()): + tag = TAGS.get(i, i) + parts.append( + (str(tag), str(ex[i])) + ) + fmt = format_dict(ODict(parts)) + return "%s image" % img.format, fmt + + +class ViewProtobuf(View): + """Human friendly view of protocol buffers + The view uses the protoc compiler to decode the binary + """ + + name = "Protocol Buffer" + prompt = ("protobuf", "p") + content_types = [ + "application/x-protobuf", + "application/x-protobuffer", + ] + + @staticmethod + def is_available(): + try: + p = subprocess.Popen( + ["protoc", "--version"], + stdout=subprocess.PIPE + ) + out, _ = p.communicate() + return out.startswith("libprotoc") + except: + return False + + def decode_protobuf(self, content): + # if Popen raises OSError, it will be caught in + # get_content_view and fall back to Raw + p = subprocess.Popen(['protoc', '--decode_raw'], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + out, err = p.communicate(input=content) + if out: + return out + else: + return err + + def __call__(self, data, **metadata): + decoded = 
self.decode_protobuf(data) + return "Protobuf", format_text(decoded) + + +class ViewWBXML(View): + name = "WBXML" + prompt = ("wbxml", "w") + content_types = [ + "application/vnd.wap.wbxml", + "application/vnd.ms-sync.wbxml" + ] + + def __call__(self, data, **metadata): + + try: + parser = ASCommandResponse(data) + parsedContent = parser.xmlString + if parsedContent: + return "WBXML", format_text(parsedContent) + except: + return None + + +views = [ + ViewAuto(), + ViewRaw(), + ViewHex(), + ViewJSON(), + ViewXML(), + ViewWBXML(), + ViewHTML(), + ViewHTMLOutline(), + ViewJavaScript(), + ViewCSS(), + ViewURLEncoded(), + ViewMultipart(), + ViewImage(), +] +if pyamf: + views.append(ViewAMF()) + +if ViewProtobuf.is_available(): + views.append(ViewProtobuf()) + +content_types_map = {} +for i in views: + for ct in i.content_types: + l = content_types_map.setdefault(ct, []) + l.append(i) + +view_prompts = [i.prompt for i in views] + + +def get_by_shortcut(c): + for i in views: + if i.prompt[1] == c: + return i + + +def get(name): + for i in views: + if i.name == name: + return i + + +def get_content_view(viewmode, data, **metadata): + """ + Args: + viewmode: the view to use. + data, **metadata: arguments passed to View instance. + + Returns: + A (description, content generator) tuple. + + Raises: + ContentViewException, if the content view threw an error. + """ + if not data: + return "No content", [] + msg = [] + + headers = metadata.get("headers", {}) + enc = headers.get("content-encoding") + if enc and enc != "identity": + decoded = encoding.decode(enc, data) + if decoded: + data = decoded + msg.append("[decoded %s]" % enc) + try: + ret = viewmode(data, **metadata) + # Third-party viewers can fail in unexpected ways... 
+ except Exception as e: + six.reraise( + ContentViewException, + ContentViewException(str(e)), + sys.exc_info()[2] + ) + if not ret: + ret = get("Raw")(data, **metadata) + msg.append("Couldn't parse: falling back to Raw") + else: + msg.append(ret[0]) + return " ".join(msg), ret[1] diff --git a/libmproxy/dump.py b/libmproxy/dump.py index dd44dc69..d477e032 100644 --- a/libmproxy/dump.py +++ b/libmproxy/dump.py @@ -4,11 +4,11 @@ import os import traceback import click - +import itertools from netlib.http.semantics import CONTENT_MISSING import netlib.utils -from . import flow, filt, contentview +from . import flow, filt, contentviews from .exceptions import ContentViewException from .models import HTTPRequest @@ -57,6 +57,10 @@ class Options(object): setattr(self, i, None) +_contentview_auto = contentviews.get("Auto") +_contentview_raw = contentviews.get("Raw") + + class DumpMaster(flow.FlowMaster): def __init__(self, server, options, outfile=sys.stdout): flow.FlowMaster.__init__(self, server, flow.State()) @@ -174,28 +178,24 @@ class DumpMaster(flow.FlowMaster): ) self.echo(headers, indent=4) if self.o.flow_detail >= 3: - if message.content == CONTENT_MISSING: + if message.body == CONTENT_MISSING: self.echo("(content missing)", indent=4) - elif message.content: + elif message.body: self.echo("") - cutoff = sys.maxsize if self.o.flow_detail >= 4 else contentview.VIEW_CUTOFF + try: - type, lines = contentview.get_content_view( - contentview.get("Auto"), - message.headers, - message.body, - cutoff, - isinstance(message, HTTPRequest) + type, lines = contentviews.get_content_view( + _contentview_auto, + message.body, + headers=message.headers ) except ContentViewException: s = "Content viewer failed: \n" + traceback.format_exc() self.add_event(s, "debug") - type, lines = contentview.get_content_view( - contentview.get("Raw"), - message.headers, - message.body, - cutoff, - isinstance(message, HTTPRequest) + type, lines = contentviews.get_content_view( + _contentview_raw, 
+ message.body, + headers=message.headers ) styles = dict( @@ -210,10 +210,18 @@ class DumpMaster(flow.FlowMaster): for (style, text) in line: yield click.style(text, **styles.get(style, {})) + if self.o.flow_detail == 3: + lines_to_echo = itertools.islice(lines, contentviews.VIEW_CUTOFF) + else: + lines_to_echo = lines + content = "\r\n".join( - "".join(colorful(line)) for line in lines + "".join(colorful(line)) for line in lines_to_echo ) + self.echo(content) + if next(lines, None): + self.echo("(cut off)", indent=4, dim=True) if self.o.flow_detail >= 2: self.echo("") diff --git a/test/test_contentview.py b/test/test_contentview.py index ec1b4930..2089b3ea 100644 --- a/test/test_contentview.py +++ b/test/test_contentview.py @@ -6,7 +6,7 @@ import sys import netlib.utils from netlib import encoding -import libmproxy.contentview as cv +import libmproxy.contentviews as cv import tutils try: @@ -21,12 +21,6 @@ except: class TestContentView: - def test_trailer(self): - txt = "X"*10 - lines = cv.trailer(txt, 1000) - assert not list(lines) - lines = cv.trailer(txt, 5) - assert list(lines) def test_view_auto(self): v = cv.ViewAuto() diff --git a/test/test_dump.py b/test/test_dump.py index 0fc4cd4d..88f1a6fd 100644 --- a/test/test_dump.py +++ b/test/test_dump.py @@ -1,6 +1,5 @@ import os from cStringIO import StringIO -from libmproxy.contentview import ViewAuto from libmproxy.exceptions import ContentViewException from libmproxy.models import HTTPResponse @@ -51,7 +50,7 @@ def test_strfuncs(): m.echo_flow(flow) -@mock.patch("libmproxy.contentview.get_content_view") +@mock.patch("libmproxy.contentviews.get_content_view") def test_contentview(get_content_view): get_content_view.side_effect = ContentViewException(""), ("x", []) -- cgit v1.2.3 From eb2264e91a7fef4170eade4bc6af9c0c4fe9694a Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Sat, 12 Sep 2015 17:10:38 +0200 Subject: improve display of non-ascii contents fixes #283 --- libmproxy/contentviews.py | 30 
++++++++++++++++++++++++------ libmproxy/dump.py | 29 ++++++++++++++--------------- libmproxy/protocol/http.py | 17 ++++++++++++----- libmproxy/protocol/rawtcp.py | 4 ++-- libmproxy/utils.py | 13 +++++++------ 5 files changed, 59 insertions(+), 34 deletions(-) diff --git a/libmproxy/contentviews.py b/libmproxy/contentviews.py index a356b35d..9af08033 100644 --- a/libmproxy/contentviews.py +++ b/libmproxy/contentviews.py @@ -27,7 +27,7 @@ import six from netlib.odict import ODict from netlib import encoding -import netlib.utils +from netlib.utils import clean_bin, hexdump, urldecode, multipartdecode, parse_content_type from . import utils from .exceptions import ContentViewException @@ -121,12 +121,14 @@ class ViewAuto(View): headers = metadata.get("headers", {}) ctype = headers.get("content-type") if ctype: - ct = netlib.utils.parse_content_type(ctype) if ctype else None + ct = parse_content_type(ctype) if ctype else None ct = "%s/%s" % (ct[0], ct[1]) if ct in content_types_map: return content_types_map[ct][0](data, **metadata) elif utils.isXML(data): return get("XML")(data, **metadata) + if utils.isMostlyBin(data): + return get("Hex")(data) return get("Raw")(data) @@ -146,7 +148,7 @@ class ViewHex(View): @staticmethod def _format(data): - for offset, hexa, s in netlib.utils.hexdump(data): + for offset, hexa, s in hexdump(data): yield [ ("offset", offset + " "), ("text", hexa + " "), @@ -251,7 +253,7 @@ class ViewURLEncoded(View): content_types = ["application/x-www-form-urlencoded"] def __call__(self, data, **metadata): - d = netlib.utils.urldecode(data) + d = urldecode(data) return "URLEncoded form", format_dict(ODict(d)) @@ -268,7 +270,7 @@ class ViewMultipart(View): def __call__(self, data, **metadata): headers = metadata.get("headers", {}) - v = netlib.utils.multipartdecode(headers, data) + v = multipartdecode(headers, data) if v: return "Multipart form", self._format(v) @@ -519,6 +521,21 @@ def get(name): return i +def safe_to_print(lines, encoding="utf8"): + 
""" + Wraps a content generator so that each text portion is a *safe to print* unicode string. + """ + for line in lines: + clean_line = [] + for (style, text) in line: + try: + text = clean_bin(text.decode(encoding, "strict")) + except UnicodeDecodeError: + text = clean_bin(text).decode(encoding, "strict") + clean_line.append((style, text)) + yield clean_line + + def get_content_view(viewmode, data, **metadata): """ Args: @@ -527,6 +544,7 @@ def get_content_view(viewmode, data, **metadata): Returns: A (description, content generator) tuple. + In contrast to calling the views directly, text is always safe-to-print unicode. Raises: ContentViewException, if the content view threw an error. @@ -556,4 +574,4 @@ def get_content_view(viewmode, data, **metadata): msg.append("Couldn't parse: falling back to Raw") else: msg.append(ret[0]) - return " ".join(msg), ret[1] + return " ".join(msg), safe_to_print(ret[1]) diff --git a/libmproxy/dump.py b/libmproxy/dump.py index d477e032..9fc9e1b8 100644 --- a/libmproxy/dump.py +++ b/libmproxy/dump.py @@ -57,12 +57,8 @@ class Options(object): setattr(self, i, None) -_contentview_auto = contentviews.get("Auto") -_contentview_raw = contentviews.get("Raw") - - class DumpMaster(flow.FlowMaster): - def __init__(self, server, options, outfile=sys.stdout): + def __init__(self, server, options, outfile=None): flow.FlowMaster.__init__(self, server, flow.State()) self.outfile = outfile self.o = options @@ -91,7 +87,7 @@ class DumpMaster(flow.FlowMaster): if options.outfile: path = os.path.expanduser(options.outfile[0]) try: - f = file(path, options.outfile[1]) + f = open(path, options.outfile[1]) self.start_stream(f, self.filt) except IOError as v: raise DumpError(v.strerror) @@ -185,16 +181,16 @@ class DumpMaster(flow.FlowMaster): try: type, lines = contentviews.get_content_view( - _contentview_auto, - message.body, + contentviews.get("Auto"), + message.body, headers=message.headers ) except ContentViewException: s = "Content viewer failed: 
\n" + traceback.format_exc() self.add_event(s, "debug") type, lines = contentviews.get_content_view( - _contentview_raw, - message.body, + contentviews.get("Raw"), + message.body, headers=message.headers ) @@ -206,17 +202,19 @@ class DumpMaster(flow.FlowMaster): ) def colorful(line): - yield " " # we can already indent here + yield u" " # we can already indent here for (style, text) in line: yield click.style(text, **styles.get(style, {})) if self.o.flow_detail == 3: - lines_to_echo = itertools.islice(lines, contentviews.VIEW_CUTOFF) + lines_to_echo = itertools.islice(lines, 70) else: lines_to_echo = lines - content = "\r\n".join( - "".join(colorful(line)) for line in lines_to_echo + lines_to_echo = list(lines_to_echo) + + content = u"\r\n".join( + u"".join(colorful(line)) for line in lines_to_echo ) self.echo(content) @@ -302,7 +300,8 @@ class DumpMaster(flow.FlowMaster): if f.error: self.echo(" << {}".format(f.error.msg), bold=True, fg="red") - self.outfile.flush() + if self.outfile: + self.outfile.flush() def _process_flow(self, f): self.state.delete_flow(f) diff --git a/libmproxy/protocol/http.py b/libmproxy/protocol/http.py index 3a415320..230f2be9 100644 --- a/libmproxy/protocol/http.py +++ b/libmproxy/protocol/http.py @@ -1,6 +1,7 @@ from __future__ import (absolute_import, print_function, division) import itertools import sys +import traceback import six @@ -384,9 +385,13 @@ class HttpLayer(Layer): return except (HttpErrorConnClosed, NetLibError, HttpError, ProtocolException) as e: + error_propagated = False if flow.request and not flow.response: - flow.error = Error(repr(e)) + flow.error = Error(str(e)) self.channel.ask("error", flow) + self.log(traceback.format_exc(), "debug") + error_propagated = True + try: self.send_response(make_error_response( getattr(e, "code", 502), @@ -394,10 +399,12 @@ class HttpLayer(Layer): )) except NetLibError: pass - if isinstance(e, ProtocolException): - six.reraise(ProtocolException, e, sys.exc_info()[2]) - else: - 
six.reraise(ProtocolException, ProtocolException("Error in HTTP connection: %s" % repr(e)), sys.exc_info()[2]) + + if not error_propagated: + if isinstance(e, ProtocolException): + six.reraise(ProtocolException, e, sys.exc_info()[2]) + else: + six.reraise(ProtocolException, ProtocolException("Error in HTTP connection: %s" % repr(e)), sys.exc_info()[2]) finally: flow.live = False diff --git a/libmproxy/protocol/rawtcp.py b/libmproxy/protocol/rawtcp.py index 9b155412..24c19523 100644 --- a/libmproxy/protocol/rawtcp.py +++ b/libmproxy/protocol/rawtcp.py @@ -7,7 +7,7 @@ import sys from OpenSSL import SSL from netlib.tcp import NetLibError, ssl_read_select -from netlib.utils import cleanBin +from netlib.utils import clean_bin from ..exceptions import ProtocolException from .base import Layer @@ -58,7 +58,7 @@ class RawTCPLayer(Layer): direction = "-> tcp -> {}".format(repr(self.server_conn.address)) else: direction = "<- tcp <- {}".format(repr(self.server_conn.address)) - data = cleanBin(buf[:size].tobytes()) + data = clean_bin(buf[:size].tobytes()) self.log( "{}\r\n{}".format(direction, data), "info" diff --git a/libmproxy/utils.py b/libmproxy/utils.py index 4b591250..8bd843a0 100644 --- a/libmproxy/utils.py +++ b/libmproxy/utils.py @@ -1,4 +1,4 @@ -from __future__ import absolute_import +from __future__ import (absolute_import, print_function, division) import os import datetime import re @@ -30,15 +30,16 @@ def isBin(s): """ for i in s: i = ord(i) - if i < 9: - return True - elif i > 13 and i < 32: - return True - elif i > 126: + if i < 9 or 13 < i < 32 or 126 < i: return True return False +def isMostlyBin(s): + s = s[:100] + return sum(isBin(ch) for ch in s)/len(s) > 0.3 + + def isXML(s): for i in s: if i in "\n \t": -- cgit v1.2.3 From 5fe12a467f37bfba2f4f663274cacbc6ecc770f7 Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Sat, 12 Sep 2015 17:40:30 +0200 Subject: fix tests --- test/test_contentview.py | 110 +++++++++++++++++++---------------------------- 
test/test_dump.py | 2 +- 2 files changed, 46 insertions(+), 66 deletions(-) diff --git a/test/test_contentview.py b/test/test_contentview.py index 2089b3ea..97608520 100644 --- a/test/test_contentview.py +++ b/test/test_contentview.py @@ -1,8 +1,5 @@ from libmproxy.exceptions import ContentViewException from netlib.http import Headers - -import sys - import netlib.utils from netlib import encoding @@ -25,66 +22,61 @@ class TestContentView: def test_view_auto(self): v = cv.ViewAuto() f = v( - Headers(), "foo", - 1000 + headers=Headers() ) assert f[0] == "Raw" f = v( - Headers(content_type="text/html"), "", - 1000 + headers=Headers(content_type="text/html") ) assert f[0] == "HTML" f = v( - Headers(content_type="text/flibble"), "foo", - 1000 + headers=Headers(content_type="text/flibble") ) assert f[0] == "Raw" f = v( - Headers(content_type="text/flibble"), "", - 1000 + headers=Headers(content_type="text/flibble") ) assert f[0].startswith("XML") def test_view_urlencoded(self): d = netlib.utils.urlencode([("one", "two"), ("three", "four")]) v = cv.ViewURLEncoded() - assert v([], d, 100) + assert v(d) d = netlib.utils.urlencode([("adsfa", "")]) v = cv.ViewURLEncoded() - assert v([], d, 100) + assert v(d) def test_view_html(self): v = cv.ViewHTML() s = "


one

" - assert v([], s, 1000) + assert v(s) s = "gobbledygook" - assert not v([], s, 1000) + assert not v(s) def test_view_html_outline(self): v = cv.ViewHTMLOutline() s = "


one

" - assert v([], s, 1000) + assert v(s) def test_view_json(self): cv.VIEW_CUTOFF = 100 v = cv.ViewJSON() - assert v([], "{}", 1000) - assert not v([], "{", 1000) - assert v([], "[" + ",".join(["0"] * cv.VIEW_CUTOFF) + "]", 1000) - assert v([], "[1, 2, 3, 4, 5]", 5) + assert v("{}") + assert not v("{") + assert v("[1, 2, 3, 4, 5]") def test_view_xml(self): v = cv.ViewXML() - assert v([], "", 1000) - assert not v([], "", 1000) + assert v("") + assert not v("") s = """ """ - assert v([], s, 1000) + assert v(s) def test_view_raw(self): v = cv.ViewRaw() - assert v([], "foo", 1000) + assert v("foo") def test_view_javascript(self): v = cv.ViewJavaScript() - assert v([], "[1, 2, 3]", 100) - assert v([], "[1, 2, 3", 100) - assert v([], "function(a){[1, 2, 3]}", 100) + assert v("[1, 2, 3]") + assert v("[1, 2, 3") + assert v("function(a){[1, 2, 3]}") def test_view_css(self): v = cv.ViewCSS() @@ -111,14 +103,14 @@ class TestContentView: with open(tutils.test_data.path('data/1.css'), 'r') as fp: fixture_1 = fp.read() - result = v([], 'a', 100) + result = v('a') if cssutils: assert len(list(result[1])) == 0 else: assert len(list(result[1])) == 1 - result = v([], fixture_1, 100) + result = v(fixture_1) if cssutils: assert len(list(result[1])) > 1 @@ -127,23 +119,23 @@ class TestContentView: def test_view_hex(self): v = cv.ViewHex() - assert v([], "foo", 1000) + assert v("foo") def test_view_image(self): v = cv.ViewImage() p = tutils.test_data.path("data/image.png") - assert v([], file(p, "rb").read(), sys.maxsize) + assert v(file(p, "rb").read()) p = tutils.test_data.path("data/image.gif") - assert v([], file(p, "rb").read(), sys.maxsize) + assert v(file(p, "rb").read()) p = tutils.test_data.path("data/image-err1.jpg") - assert v([], file(p, "rb").read(), sys.maxsize) + assert v(file(p, "rb").read()) p = tutils.test_data.path("data/image.ico") - assert v([], file(p, "rb").read(), sys.maxsize) + assert v(file(p, "rb").read()) - assert not v([], "flibble", sys.maxsize) + assert not 
v("flibble") def test_view_multipart(self): view = cv.ViewMultipart() @@ -155,42 +147,36 @@ Larry --AaB03x """.strip() h = Headers(content_type="multipart/form-data; boundary=AaB03x") - assert view(h, v, 1000) + assert view(v, headers=h) h = Headers() - assert not view(h, v, 1000) + assert not view(v, headers=h) h = Headers(content_type="multipart/form-data") - assert not view(h, v, 1000) + assert not view(v, headers=h) h = Headers(content_type="unparseable") - assert not view(h, v, 1000) + assert not view(v, headers=h) def test_get_content_view(self): r = cv.get_content_view( cv.get("Raw"), - Headers(content_type="application/json"), "[1, 2, 3]", - 1000, - False + headers=Headers(content_type="application/json") ) assert "Raw" in r[0] r = cv.get_content_view( cv.get("Auto"), - Headers(content_type="application/json"), "[1, 2, 3]", - 1000, - False + headers=Headers(content_type="application/json") ) assert r[0] == "JSON" r = cv.get_content_view( cv.get("Auto"), - Headers(content_type="application/json"), "[1, 2", - 1000, - False + headers=Headers(content_type="application/json") ) assert "Raw" in r[0] @@ -198,34 +184,28 @@ Larry ContentViewException, cv.get_content_view, cv.get("AMF"), - Headers(), "[1, 2", - 1000, - False + headers=Headers() ) r = cv.get_content_view( cv.get("Auto"), - Headers( + encoding.encode('gzip', "[1, 2, 3]"), + headers=Headers( content_type="application/json", content_encoding="gzip" - ), - encoding.encode('gzip', "[1, 2, 3]"), - 1000, - False + ) ) assert "decoded gzip" in r[0] assert "JSON" in r[0] r = cv.get_content_view( cv.get("XML"), - Headers( + encoding.encode('gzip', "[1, 2, 3]"), + headers=Headers( content_type="application/json", content_encoding="gzip" - ), - encoding.encode('gzip', "[1, 2, 3]"), - 1000, - False + ) ) assert "decoded gzip" in r[0] assert "Raw" in r[0] @@ -236,22 +216,22 @@ if pyamf: v = cv.ViewAMF() p = tutils.test_data.path("data/amf01") - assert v([], file(p, "rb").read(), sys.maxsize) + assert v(file(p, 
"rb").read()) p = tutils.test_data.path("data/amf02") - assert v([], file(p, "rb").read(), sys.maxsize) + assert v(file(p, "rb").read()) def test_view_amf_response(): v = cv.ViewAMF() p = tutils.test_data.path("data/amf03") - assert v([], file(p, "rb").read(), sys.maxsize) + assert v(file(p, "rb").read()) if cv.ViewProtobuf.is_available(): def test_view_protobuf_request(): v = cv.ViewProtobuf() p = tutils.test_data.path("data/protobuf01") - content_type, output = v([], file(p, "rb").read(), sys.maxsize) + content_type, output = v(file(p, "rb").read()) assert content_type == "Protobuf" assert output[0].text == '1: "3bbc333c-e61c-433b-819a-0b9a8cc103b8"' diff --git a/test/test_dump.py b/test/test_dump.py index 88f1a6fd..29931759 100644 --- a/test/test_dump.py +++ b/test/test_dump.py @@ -52,7 +52,7 @@ def test_strfuncs(): @mock.patch("libmproxy.contentviews.get_content_view") def test_contentview(get_content_view): - get_content_view.side_effect = ContentViewException(""), ("x", []) + get_content_view.side_effect = ContentViewException(""), ("x", iter([])) o = dump.Options(flow_detail=4, verbosity=3) m = dump.DumpMaster(None, o, StringIO()) -- cgit v1.2.3