From b4e0be905213cdcd8c764fbbe87b63f411e765b4 Mon Sep 17 00:00:00 2001 From: Chris Czub Date: Wed, 2 Sep 2015 14:28:25 -0400 Subject: PR #700 Start moving the contentview up a level Started shifting the contentview class up to libmproxy --- libmproxy/console/contentview.py | 543 --------------------------------------- libmproxy/console/flowview.py | 4 +- libmproxy/console/options.py | 3 +- libmproxy/contentview.py | 543 +++++++++++++++++++++++++++++++++++++++ test/test_console_contentview.py | 2 +- test/test_console_import.py | 5 + 6 files changed, 553 insertions(+), 547 deletions(-) delete mode 100644 libmproxy/console/contentview.py create mode 100644 libmproxy/contentview.py create mode 100644 test/test_console_import.py diff --git a/libmproxy/console/contentview.py b/libmproxy/console/contentview.py deleted file mode 100644 index 95ea7b17..00000000 --- a/libmproxy/console/contentview.py +++ /dev/null @@ -1,543 +0,0 @@ -from __future__ import absolute_import -import cStringIO -import json -import logging -import lxml.html -import lxml.etree -from PIL import Image -from PIL.ExifTags import TAGS -import subprocess -import traceback -import urwid -import html2text - -import netlib.utils -from netlib import odict, encoding - -from . import common, signals -from .. import utils -from ..contrib import jsbeautifier -from ..contrib.wbxml.ASCommandResponse import ASCommandResponse - -try: - import pyamf - from pyamf import remoting, flex -except ImportError: # pragma nocover - pyamf = None - -try: - import cssutils -except ImportError: # pragma nocover - cssutils = None -else: - cssutils.log.setLevel(logging.CRITICAL) - - cssutils.ser.prefs.keepComments = True - cssutils.ser.prefs.omitLastSemicolon = False - cssutils.ser.prefs.indentClosingBrace = False - cssutils.ser.prefs.validOnly = False - -VIEW_CUTOFF = 1024 * 50 - - -def _view_text(content, total, limit): - """ - Generates a body for a chunk of text. - """ - txt = [] - for i in netlib.utils.cleanBin(content).splitlines(): - txt.append( - urwid.Text(("text", i), wrap="any") - ) - trailer(total, txt, limit) - return txt - - -def trailer(clen, txt, limit): - rem = clen - limit - if rem > 0: - txt.append(urwid.Text("")) - txt.append( - urwid.Text( - [ - ("highlight", "... %s of data not shown. Press " % netlib.utils.pretty_size(rem)), - ("key", "f"), - ("highlight", " to load all data.") - ] - ) - ) - - -class ViewAuto: - name = "Auto" - prompt = ("auto", "a") - content_types = [] - - def __call__(self, hdrs, content, limit): - ctype = hdrs.get_first("content-type") - if ctype: - ct = netlib.utils.parse_content_type(ctype) if ctype else None - ct = "%s/%s" % (ct[0], ct[1]) - if ct in content_types_map: - return content_types_map[ct][0](hdrs, content, limit) - elif utils.isXML(content): - return get("XML")(hdrs, content, limit) - return get("Raw")(hdrs, content, limit) - - -class ViewRaw: - name = "Raw" - prompt = ("raw", "r") - content_types = [] - - def __call__(self, hdrs, content, limit): - txt = _view_text(content[:limit], len(content), limit) - return "Raw", txt - - -class ViewHex: - name = "Hex" - prompt = ("hex", "e") - content_types = [] - - def __call__(self, hdrs, content, limit): - txt = [] - for offset, hexa, s in netlib.utils.hexdump(content[:limit]): - txt.append(urwid.Text([ - ("offset", offset), - " ", - ("text", hexa), - " ", - ("text", s), - ])) - trailer(len(content), txt, limit) - return "Hex", txt - - -class ViewXML: - name = "XML" - prompt = ("xml", "x") - content_types = ["text/xml"] - - def __call__(self, hdrs, content, limit): - parser = lxml.etree.XMLParser( - remove_blank_text=True, - resolve_entities=False, - strip_cdata=False, - recover=False - ) - try: - document = lxml.etree.fromstring(content, parser) - except lxml.etree.XMLSyntaxError: - return None - docinfo = document.getroottree().docinfo - - prev = [] - p = document.getroottree().getroot().getprevious() - while p is not None: - prev.insert( - 0, - lxml.etree.tostring(p) - ) - p = p.getprevious() - doctype = docinfo.doctype - if prev: - doctype += "\n".join(prev).strip() - doctype = doctype.strip() - - s = lxml.etree.tostring( - document, - pretty_print=True, - xml_declaration=True, - doctype=doctype or None, - encoding = docinfo.encoding - ) - - txt = [] - for i in s[:limit].strip().split("\n"): - txt.append( - urwid.Text(("text", i)), - ) - trailer(len(content), txt, limit) - return "XML-like data", txt - - -class ViewJSON: - name = "JSON" - prompt = ("json", "s") - content_types = ["application/json"] - - def __call__(self, hdrs, content, limit): - lines = utils.pretty_json(content) - if lines: - txt = [] - sofar = 0 - for i in lines: - sofar += len(i) - txt.append( - urwid.Text(("text", i)), - ) - if sofar > limit: - break - trailer(sum(len(i) for i in lines), txt, limit) - return "JSON", txt - - -class ViewHTML: - name = "HTML" - prompt = ("html", "h") - content_types = ["text/html"] - - def __call__(self, hdrs, content, limit): - if utils.isXML(content): - parser = lxml.etree.HTMLParser( - strip_cdata=True, - remove_blank_text=True - ) - d = lxml.html.fromstring(content, parser=parser) - docinfo = d.getroottree().docinfo - s = lxml.etree.tostring( - d, - pretty_print=True, - doctype=docinfo.doctype - ) - return "HTML", _view_text(s[:limit], len(s), limit) - - -class ViewHTMLOutline: - name = "HTML Outline" - prompt = ("html outline", "o") - content_types = ["text/html"] - - def __call__(self, hdrs, content, limit): - content = content.decode("utf-8") - h = html2text.HTML2Text(baseurl="") - h.ignore_images = True - h.body_width = 0 - content = h.handle(content) - txt = _view_text(content[:limit], len(content), limit) - return "HTML Outline", txt - - -class ViewURLEncoded: - name = "URL-encoded" - prompt = ("urlencoded", "u") - content_types = ["application/x-www-form-urlencoded"] - - def __call__(self, hdrs, content, limit): - lines = netlib.utils.urldecode(content) - if lines: - body = common.format_keyvals( - [(k + ":", v) for (k, v) in lines], - key = "header", - val = "text" - ) - return "URLEncoded form", body - - -class ViewMultipart: - name = "Multipart Form" - prompt = ("multipart", "m") - content_types = ["multipart/form-data"] - - def __call__(self, hdrs, content, limit): - v = netlib.utils.multipartdecode(hdrs, content) - if v: - r = [ - urwid.Text(("highlight", "Form data:\n")), - ] - r.extend(common.format_keyvals( - v, - key = "header", - val = "text" - )) - return "Multipart form", r - - -if pyamf: - class DummyObject(dict): - def __init__(self, alias): - dict.__init__(self) - - def __readamf__(self, input): - data = input.readObject() - self["data"] = data - - def pyamf_class_loader(s): - for i in pyamf.CLASS_LOADERS: - if i != pyamf_class_loader: - v = i(s) - if v: - return v - return DummyObject - - pyamf.register_class_loader(pyamf_class_loader) - - class ViewAMF: - name = "AMF" - prompt = ("amf", "f") - content_types = ["application/x-amf"] - - def unpack(self, b, seen=set([])): - if hasattr(b, "body"): - return self.unpack(b.body, seen) - if isinstance(b, DummyObject): - if id(b) in seen: - return "" - else: - seen.add(id(b)) - for k, v in b.items(): - b[k] = self.unpack(v, seen) - return b - elif isinstance(b, dict): - for k, v in b.items(): - b[k] = self.unpack(v, seen) - return b - elif isinstance(b, list): - return [self.unpack(i) for i in b] - elif isinstance(b, flex.ArrayCollection): - return [self.unpack(i, seen) for i in b] - else: - return b - - def __call__(self, hdrs, content, limit): - envelope = remoting.decode(content, strict=False) - if not envelope: - return None - - txt = [] - for target, message in iter(envelope): - if isinstance(message, pyamf.remoting.Request): - txt.append(urwid.Text([ - ("header", "Request: "), - ("text", str(target)), - ])) - else: - txt.append(urwid.Text([ - ("header", "Response: "), - ("text", "%s, code %s" % (target, message.status)), - ])) - - s = json.dumps(self.unpack(message), indent=4) - txt.extend(_view_text(s[:limit], len(s), limit)) - - return "AMF v%s" % envelope.amfVersion, txt - - -class ViewJavaScript: - name = "JavaScript" - prompt = ("javascript", "j") - content_types = [ - "application/x-javascript", - "application/javascript", - "text/javascript" - ] - - def __call__(self, hdrs, content, limit): - opts = jsbeautifier.default_options() - opts.indent_size = 2 - res = jsbeautifier.beautify(content[:limit], opts) - return "JavaScript", _view_text(res, len(res), limit) - - -class ViewCSS: - name = "CSS" - prompt = ("css", "c") - content_types = [ - "text/css" - ] - - def __call__(self, hdrs, content, limit): - if cssutils: - sheet = cssutils.parseString(content) - beautified = sheet.cssText - else: - beautified = content - - return "CSS", _view_text(beautified, len(beautified), limit) - - -class ViewImage: - name = "Image" - prompt = ("image", "i") - content_types = [ - "image/png", - "image/jpeg", - "image/gif", - "image/vnd.microsoft.icon", - "image/x-icon", - ] - - def __call__(self, hdrs, content, limit): - try: - img = Image.open(cStringIO.StringIO(content)) - except IOError: - return None - parts = [ - ("Format", str(img.format_description)), - ("Size", "%s x %s px" % img.size), - ("Mode", str(img.mode)), - ] - for i in sorted(img.info.keys()): - if i != "exif": - parts.append( - (str(i), str(img.info[i])) - ) - if hasattr(img, "_getexif"): - ex = img._getexif() - if ex: - for i in sorted(ex.keys()): - tag = TAGS.get(i, i) - parts.append( - (str(tag), str(ex[i])) - ) - clean = [] - for i in parts: - clean.append( - [netlib.utils.cleanBin(i[0]), netlib.utils.cleanBin(i[1])] - ) - fmt = common.format_keyvals( - clean, - key = "header", - val = "text" - ) - return "%s image" % img.format, fmt - - -class ViewProtobuf: - """Human friendly view of protocol buffers - The view uses the protoc compiler to decode the binary - """ - - name = "Protocol Buffer" - prompt = ("protobuf", "p") - content_types = [ - "application/x-protobuf", - "application/x-protobuffer", - ] - - @staticmethod - def is_available(): - try: - p = subprocess.Popen( - ["protoc", "--version"], - stdout=subprocess.PIPE - ) - out, _ = p.communicate() - return out.startswith("libprotoc") - except: - return False - - def decode_protobuf(self, content): - # if Popen raises OSError, it will be caught in - # get_content_view and fall back to Raw - p = subprocess.Popen(['protoc', '--decode_raw'], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - out, err = p.communicate(input=content) - if out: - return out - else: - return err - - def __call__(self, hdrs, content, limit): - decoded = self.decode_protobuf(content) - txt = _view_text(decoded[:limit], len(decoded), limit) - return "Protobuf", txt - - -class ViewWBXML: - name = "WBXML" - prompt = ("wbxml", "w") - content_types = [ - "application/vnd.wap.wbxml", - "application/vnd.ms-sync.wbxml" - ] - - def __call__(self, hdrs, content, limit): - - try: - parser = ASCommandResponse(content) - parsedContent = parser.xmlString - txt = _view_text(parsedContent, len(parsedContent), limit) - return "WBXML", txt - except: - return None - -views = [ - ViewAuto(), - ViewRaw(), - ViewHex(), - ViewJSON(), - ViewXML(), - ViewWBXML(), - ViewHTML(), - ViewHTMLOutline(), - ViewJavaScript(), - ViewCSS(), - ViewURLEncoded(), - ViewMultipart(), - ViewImage(), -] -if pyamf: - views.append(ViewAMF()) - -if ViewProtobuf.is_available(): - views.append(ViewProtobuf()) - -content_types_map = {} -for i in views: - for ct in i.content_types: - l = content_types_map.setdefault(ct, []) - l.append(i) - - -view_prompts = [i.prompt for i in views] - - -def get_by_shortcut(c): - for i in views: - if i.prompt[1] == c: - return i - - -def get(name): - for i in views: - if i.name == name: - return i - - -def get_content_view(viewmode, hdrItems, content, limit, is_request): - """ - Returns a (msg, body) tuple. - """ - if not content: - if is_request: - return "No request content (press tab to view response)", "" - else: - return "No content", "" - msg = [] - - hdrs = odict.ODictCaseless([list(i) for i in hdrItems]) - - enc = hdrs.get_first("content-encoding") - if enc and enc != "identity": - decoded = encoding.decode(enc, content) - if decoded: - content = decoded - msg.append("[decoded %s]" % enc) - try: - ret = viewmode(hdrs, content, limit) - # Third-party viewers can fail in unexpected ways... - except Exception: - s = traceback.format_exc() - s = "Content viewer failed: \n" + s - signals.add_event(s, "error") - ret = None - if not ret: - ret = get("Raw")(hdrs, content, limit) - msg.append("Couldn't parse: falling back to Raw") - else: - msg.append(ret[0]) - return " ".join(msg), ret[1] diff --git a/libmproxy/console/flowview.py b/libmproxy/console/flowview.py index 8b828653..8cec3c36 100644 --- a/libmproxy/console/flowview.py +++ b/libmproxy/console/flowview.py @@ -6,9 +6,9 @@ import urwid from netlib import odict from netlib.http.semantics import CONTENT_MISSING -from . import common, grideditor, contentview, signals, searchable, tabs +from . import common, grideditor, signals, searchable, tabs from . import flowdetailview -from .. import utils, controller +from .. import utils, controller, contentview from ..models import HTTPRequest, HTTPResponse, decoded diff --git a/libmproxy/console/options.py b/libmproxy/console/options.py index 58a4d469..0948e96d 100644 --- a/libmproxy/console/options.py +++ b/libmproxy/console/options.py @@ -1,6 +1,7 @@ import urwid -from . import common, signals, grideditor, contentview +from .. import contentview +from . import common, signals, grideditor from . import select, palettes footer = [ diff --git a/libmproxy/contentview.py b/libmproxy/contentview.py new file mode 100644 index 00000000..3345f5b3 --- /dev/null +++ b/libmproxy/contentview.py @@ -0,0 +1,543 @@ +from __future__ import absolute_import +import cStringIO +import json +import logging +import lxml.html +import lxml.etree +from PIL import Image +from PIL.ExifTags import TAGS +import subprocess +import traceback +import urwid +import html2text + +import netlib.utils +from netlib import odict, encoding + +from .console import common, signals +from . import utils +from .contrib import jsbeautifier +from .contrib.wbxml.ASCommandResponse import ASCommandResponse + +try: + import pyamf + from pyamf import remoting, flex +except ImportError: # pragma nocover + pyamf = None + +try: + import cssutils +except ImportError: # pragma nocover + cssutils = None +else: + cssutils.log.setLevel(logging.CRITICAL) + + cssutils.ser.prefs.keepComments = True + cssutils.ser.prefs.omitLastSemicolon = False + cssutils.ser.prefs.indentClosingBrace = False + cssutils.ser.prefs.validOnly = False + +VIEW_CUTOFF = 1024 * 50 + + +def _view_text(content, total, limit): + """ + Generates a body for a chunk of text. + """ + txt = [] + for i in netlib.utils.cleanBin(content).splitlines(): + txt.append( + urwid.Text(("text", i), wrap="any") + ) + trailer(total, txt, limit) + return txt + + +def trailer(clen, txt, limit): + rem = clen - limit + if rem > 0: + txt.append(urwid.Text("")) + txt.append( + urwid.Text( + [ + ("highlight", "... %s of data not shown. Press " % netlib.utils.pretty_size(rem)), + ("key", "f"), + ("highlight", " to load all data.") + ] + ) + ) + + +class ViewAuto: + name = "Auto" + prompt = ("auto", "a") + content_types = [] + + def __call__(self, hdrs, content, limit): + ctype = hdrs.get_first("content-type") + if ctype: + ct = netlib.utils.parse_content_type(ctype) if ctype else None + ct = "%s/%s" % (ct[0], ct[1]) + if ct in content_types_map: + return content_types_map[ct][0](hdrs, content, limit) + elif utils.isXML(content): + return get("XML")(hdrs, content, limit) + return get("Raw")(hdrs, content, limit) + + +class ViewRaw: + name = "Raw" + prompt = ("raw", "r") + content_types = [] + + def __call__(self, hdrs, content, limit): + txt = _view_text(content[:limit], len(content), limit) + return "Raw", txt + + +class ViewHex: + name = "Hex" + prompt = ("hex", "e") + content_types = [] + + def __call__(self, hdrs, content, limit): + txt = [] + for offset, hexa, s in netlib.utils.hexdump(content[:limit]): + txt.append(urwid.Text([ + ("offset", offset), + " ", + ("text", hexa), + " ", + ("text", s), + ])) + trailer(len(content), txt, limit) + return "Hex", txt + + +class ViewXML: + name = "XML" + prompt = ("xml", "x") + content_types = ["text/xml"] + + def __call__(self, hdrs, content, limit): + parser = lxml.etree.XMLParser( + remove_blank_text=True, + resolve_entities=False, + strip_cdata=False, + recover=False + ) + try: + document = lxml.etree.fromstring(content, parser) + except lxml.etree.XMLSyntaxError: + return None + docinfo = document.getroottree().docinfo + + prev = [] + p = document.getroottree().getroot().getprevious() + while p is not None: + prev.insert( + 0, + lxml.etree.tostring(p) + ) + p = p.getprevious() + doctype = docinfo.doctype + if prev: + doctype += "\n".join(prev).strip() + doctype = doctype.strip() + + s = lxml.etree.tostring( + document, + pretty_print=True, + xml_declaration=True, + doctype=doctype or None, + encoding = docinfo.encoding + ) + + txt = [] + for i in s[:limit].strip().split("\n"): + txt.append( + urwid.Text(("text", i)), + ) + trailer(len(content), txt, limit) + return "XML-like data", txt + + +class ViewJSON: + name = "JSON" + prompt = ("json", "s") + content_types = ["application/json"] + + def __call__(self, hdrs, content, limit): + lines = utils.pretty_json(content) + if lines: + txt = [] + sofar = 0 + for i in lines: + sofar += len(i) + txt.append( + urwid.Text(("text", i)), + ) + if sofar > limit: + break + trailer(sum(len(i) for i in lines), txt, limit) + return "JSON", txt + + +class ViewHTML: + name = "HTML" + prompt = ("html", "h") + content_types = ["text/html"] + + def __call__(self, hdrs, content, limit): + if utils.isXML(content): + parser = lxml.etree.HTMLParser( + strip_cdata=True, + remove_blank_text=True + ) + d = lxml.html.fromstring(content, parser=parser) + docinfo = d.getroottree().docinfo + s = lxml.etree.tostring( + d, + pretty_print=True, + doctype=docinfo.doctype + ) + return "HTML", _view_text(s[:limit], len(s), limit) + + +class ViewHTMLOutline: + name = "HTML Outline" + prompt = ("html outline", "o") + content_types = ["text/html"] + + def __call__(self, hdrs, content, limit): + content = content.decode("utf-8") + h = html2text.HTML2Text(baseurl="") + h.ignore_images = True + h.body_width = 0 + content = h.handle(content) + txt = _view_text(content[:limit], len(content), limit) + return "HTML Outline", txt + + +class ViewURLEncoded: + name = "URL-encoded" + prompt = ("urlencoded", "u") + content_types = ["application/x-www-form-urlencoded"] + + def __call__(self, hdrs, content, limit): + lines = netlib.utils.urldecode(content) + if lines: + body = common.format_keyvals( + [(k + ":", v) for (k, v) in lines], + key = "header", + val = "text" + ) + return "URLEncoded form", body + + +class ViewMultipart: + name = "Multipart Form" + prompt = ("multipart", "m") + content_types = ["multipart/form-data"] + + def __call__(self, hdrs, content, limit): + v = netlib.utils.multipartdecode(hdrs, content) + if v: + r = [ + urwid.Text(("highlight", "Form data:\n")), + ] + r.extend(common.format_keyvals( + v, + key = "header", + val = "text" + )) + return "Multipart form", r + + +if pyamf: + class DummyObject(dict): + def __init__(self, alias): + dict.__init__(self) + + def __readamf__(self, input): + data = input.readObject() + self["data"] = data + + def pyamf_class_loader(s): + for i in pyamf.CLASS_LOADERS: + if i != pyamf_class_loader: + v = i(s) + if v: + return v + return DummyObject + + pyamf.register_class_loader(pyamf_class_loader) + + class ViewAMF: + name = "AMF" + prompt = ("amf", "f") + content_types = ["application/x-amf"] + + def unpack(self, b, seen=set([])): + if hasattr(b, "body"): + return self.unpack(b.body, seen) + if isinstance(b, DummyObject): + if id(b) in seen: + return "" + else: + seen.add(id(b)) + for k, v in b.items(): + b[k] = self.unpack(v, seen) + return b + elif isinstance(b, dict): + for k, v in b.items(): + b[k] = self.unpack(v, seen) + return b + elif isinstance(b, list): + return [self.unpack(i) for i in b] + elif isinstance(b, flex.ArrayCollection): + return [self.unpack(i, seen) for i in b] + else: + return b + + def __call__(self, hdrs, content, limit): + envelope = remoting.decode(content, strict=False) + if not envelope: + return None + + txt = [] + for target, message in iter(envelope): + if isinstance(message, pyamf.remoting.Request): + txt.append(urwid.Text([ + ("header", "Request: "), + ("text", str(target)), + ])) + else: + txt.append(urwid.Text([ + ("header", "Response: "), + ("text", "%s, code %s" % (target, message.status)), + ])) + + s = json.dumps(self.unpack(message), indent=4) + txt.extend(_view_text(s[:limit], len(s), limit)) + + return "AMF v%s" % envelope.amfVersion, txt + + +class ViewJavaScript: + name = "JavaScript" + prompt = ("javascript", "j") + content_types = [ + "application/x-javascript", + "application/javascript", + "text/javascript" + ] + + def __call__(self, hdrs, content, limit): + opts = jsbeautifier.default_options() + opts.indent_size = 2 + res = jsbeautifier.beautify(content[:limit], opts) + return "JavaScript", _view_text(res, len(res), limit) + + +class ViewCSS: + name = "CSS" + prompt = ("css", "c") + content_types = [ + "text/css" + ] + + def __call__(self, hdrs, content, limit): + if cssutils: + sheet = cssutils.parseString(content) + beautified = sheet.cssText + else: + beautified = content + + return "CSS", _view_text(beautified, len(beautified), limit) + + +class ViewImage: + name = "Image" + prompt = ("image", "i") + content_types = [ + "image/png", + "image/jpeg", + "image/gif", + "image/vnd.microsoft.icon", + "image/x-icon", + ] + + def __call__(self, hdrs, content, limit): + try: + img = Image.open(cStringIO.StringIO(content)) + except IOError: + return None + parts = [ + ("Format", str(img.format_description)), + ("Size", "%s x %s px" % img.size), + ("Mode", str(img.mode)), + ] + for i in sorted(img.info.keys()): + if i != "exif": + parts.append( + (str(i), str(img.info[i])) + ) + if hasattr(img, "_getexif"): + ex = img._getexif() + if ex: + for i in sorted(ex.keys()): + tag = TAGS.get(i, i) + parts.append( + (str(tag), str(ex[i])) + ) + clean = [] + for i in parts: + clean.append( + [netlib.utils.cleanBin(i[0]), netlib.utils.cleanBin(i[1])] + ) + fmt = common.format_keyvals( + clean, + key = "header", + val = "text" + ) + return "%s image" % img.format, fmt + + +class ViewProtobuf: + """Human friendly view of protocol buffers + The view uses the protoc compiler to decode the binary + """ + + name = "Protocol Buffer" + prompt = ("protobuf", "p") + content_types = [ + "application/x-protobuf", + "application/x-protobuffer", + ] + + @staticmethod + def is_available(): + try: + p = subprocess.Popen( + ["protoc", "--version"], + stdout=subprocess.PIPE + ) + out, _ = p.communicate() + return out.startswith("libprotoc") + except: + return False + + def decode_protobuf(self, content): + # if Popen raises OSError, it will be caught in + # get_content_view and fall back to Raw + p = subprocess.Popen(['protoc', '--decode_raw'], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + out, err = p.communicate(input=content) + if out: + return out + else: + return err + + def __call__(self, hdrs, content, limit): + decoded = self.decode_protobuf(content) + txt = _view_text(decoded[:limit], len(decoded), limit) + return "Protobuf", txt + + +class ViewWBXML: + name = "WBXML" + prompt = ("wbxml", "w") + content_types = [ + "application/vnd.wap.wbxml", + "application/vnd.ms-sync.wbxml" + ] + + def __call__(self, hdrs, content, limit): + + try: + parser = ASCommandResponse(content) + parsedContent = parser.xmlString + txt = _view_text(parsedContent, len(parsedContent), limit) + return "WBXML", txt + except: + return None + +views = [ + ViewAuto(), + ViewRaw(), + ViewHex(), + ViewJSON(), + ViewXML(), + ViewWBXML(), + ViewHTML(), + ViewHTMLOutline(), + ViewJavaScript(), + ViewCSS(), + ViewURLEncoded(), + ViewMultipart(), + ViewImage(), +] +if pyamf: + views.append(ViewAMF()) + +if ViewProtobuf.is_available(): + views.append(ViewProtobuf()) + +content_types_map = {} +for i in views: + for ct in i.content_types: + l = content_types_map.setdefault(ct, []) + l.append(i) + + +view_prompts = [i.prompt for i in views] + + +def get_by_shortcut(c): + for i in views: + if i.prompt[1] == c: + return i + + +def get(name): + for i in views: + if i.name == name: + return i + + +def get_content_view(viewmode, hdrItems, content, limit, is_request): + """ + Returns a (msg, body) tuple. + """ + if not content: + if is_request: + return "No request content (press tab to view response)", "" + else: + return "No content", "" + msg = [] + + hdrs = odict.ODictCaseless([list(i) for i in hdrItems]) + + enc = hdrs.get_first("content-encoding") + if enc and enc != "identity": + decoded = encoding.decode(enc, content) + if decoded: + content = decoded + msg.append("[decoded %s]" % enc) + try: + ret = viewmode(hdrs, content, limit) + # Third-party viewers can fail in unexpected ways... + except Exception: + s = traceback.format_exc() + s = "Content viewer failed: \n" + s + signals.add_event(s, "error") + ret = None + if not ret: + ret = get("Raw")(hdrs, content, limit) + msg.append("Couldn't parse: falling back to Raw") + else: + msg.append(ret[0]) + return " ".join(msg), ret[1] diff --git a/test/test_console_contentview.py b/test/test_console_contentview.py index d1a6180f..50c3f766 100644 --- a/test/test_console_contentview.py +++ b/test/test_console_contentview.py @@ -7,7 +7,7 @@ import sys import netlib.utils from netlib import odict, encoding -import libmproxy.console.contentview as cv +import libmproxy.contentview as cv from libmproxy import utils, flow import tutils diff --git a/test/test_console_import.py b/test/test_console_import.py new file mode 100644 index 00000000..c99faae8 --- /dev/null +++ b/test/test_console_import.py @@ -0,0 +1,5 @@ +import libmproxy.contentview as cv + + +def test_pass(): + assert True -- cgit v1.2.3 From cc2a6a39198aee1694d8e368c03d4a0055f3bb92 Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Wed, 2 Sep 2015 20:56:19 +0200 Subject: fix circular imports --- libmproxy/console/flowview.py | 3 ++- libmproxy/contentview.py | 26 +++++++++++++++++--------- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/libmproxy/console/flowview.py b/libmproxy/console/flowview.py index 8cec3c36..958ab176 100644 --- a/libmproxy/console/flowview.py +++ b/libmproxy/console/flowview.py @@ -185,7 +185,8 @@ class FlowView(tabs.Tabs): tuple(tuple(i) for i in conn.headers.lst), conn.content, limit, - isinstance(conn, HTTPRequest) + isinstance(conn, HTTPRequest), + signals.add_event ) return (description, text_objects) diff --git a/libmproxy/contentview.py b/libmproxy/contentview.py index 3345f5b3..3e0ef6ab 100644 --- a/libmproxy/contentview.py +++ b/libmproxy/contentview.py @@ -14,7 +14,6 @@ import html2text import netlib.utils from netlib import odict, encoding -from .console import common, signals from . import utils from .contrib import jsbeautifier from .contrib.wbxml.ASCommandResponse import ASCommandResponse @@ -40,6 +39,10 @@ else: VIEW_CUTOFF = 1024 * 50 +def format_keyvals(lst, key="key", val="text", indent=0): + raise NotImplementedError() + + def _view_text(content, total, limit): """ Generates a body for a chunk of text. @@ -227,7 +230,7 @@ class ViewURLEncoded: def __call__(self, hdrs, content, limit): lines = netlib.utils.urldecode(content) if lines: - body = common.format_keyvals( + body = format_keyvals( [(k + ":", v) for (k, v) in lines], key = "header", val = "text" @@ -246,7 +249,7 @@ class ViewMultipart: r = [ urwid.Text(("highlight", "Form data:\n")), ] - r.extend(common.format_keyvals( + r.extend(format_keyvals( v, key = "header", val = "text" @@ -396,7 +399,7 @@ class ViewImage: clean.append( [netlib.utils.cleanBin(i[0]), netlib.utils.cleanBin(i[1])] ) - fmt = common.format_keyvals( + fmt = format_keyvals( clean, key = "header", val = "text" @@ -508,9 +511,13 @@ def get(name): return i -def get_content_view(viewmode, hdrItems, content, limit, is_request): +def get_content_view(viewmode, hdrItems, content, limit, is_request, log=None): """ - Returns a (msg, body) tuple. + Returns: + A (msg, body) tuple. + + Raises: + ContentViewException, if the content view threw an error. """ if not content: if is_request: @@ -531,9 +538,10 @@ def get_content_view(viewmode, hdrItems, content, limit, is_request): ret = viewmode(hdrs, content, limit) # Third-party viewers can fail in unexpected ways... except Exception: - s = traceback.format_exc() - s = "Content viewer failed: \n" + s - signals.add_event(s, "error") + if log: + s = traceback.format_exc() + s = "Content viewer failed: \n" + s + log(s, "error") ret = None if not ret: ret = get("Raw")(hdrs, content, limit) -- cgit v1.2.3 From 018c693dee2ce0184a0fa02012d7063725803df0 Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Fri, 4 Sep 2015 17:07:14 +0200 Subject: remove urwid from contentviews (wip) --- libmproxy/contentview.py | 144 +++++++++++++++++++++++++++++------------------ 1 file changed, 89 insertions(+), 55 deletions(-) diff --git a/libmproxy/contentview.py b/libmproxy/contentview.py index 3e0ef6ab..ec3603c7 100644 --- a/libmproxy/contentview.py +++ b/libmproxy/contentview.py @@ -2,18 +2,18 @@ from __future__ import absolute_import import cStringIO import json import logging +import subprocess +import traceback + import lxml.html import lxml.etree from PIL import Image from PIL.ExifTags import TAGS -import subprocess -import traceback import urwid import html2text import netlib.utils from netlib import odict, encoding - from . import utils from .contrib import jsbeautifier from .contrib.wbxml.ASCommandResponse import ASCommandResponse @@ -37,16 +37,54 @@ else: cssutils.ser.prefs.validOnly = False VIEW_CUTOFF = 1024 * 50 +KEY_MAX = 30 -def format_keyvals(lst, key="key", val="text", indent=0): - raise NotImplementedError() +def format_dict(d): + """ + Transforms the given dictionary into a list of + ("key", key ) + ("value", value) + tuples, where key is padded to a uniform width. + """ + max_key_len = max(len(k) for k in d.keys()) + max_key_len = min(max_key_len, KEY_MAX) + for key, value in d.items(): + key += ":" + key = key.ljust(max_key_len + 2) + yield ( + ("key", key), + ("value", value) + ) -def _view_text(content, total, limit): +def format_text(content, limit): """ - Generates a body for a chunk of text. + Transforms the given content into """ + content = netlib.utils.cleanBin(content) + + for line in content[:limit].splitlines(): + yield ("text", line) + + for msg in trailer(content, limit): + yield msg + + +def trailer(content, limit): + bytes_removed = len(content) - limit + if bytes_removed > 0: + yield ( + "cutoff", + "... {} of data not shown.".format(netlib.utils.pretty_size(bytes_removed)) + ) + + +""" +def _view_text(content, total, limit): + "" + Generates a body for a chunk of text. + "" txt = [] for i in netlib.utils.cleanBin(content).splitlines(): txt.append( @@ -69,9 +107,19 @@ def trailer(clen, txt, limit): ] ) ) +""" -class ViewAuto: +class View: + name = None + prompt = () + content_types = [] + + def __call__(self, hdrs, content, limit): + raise NotImplementedError() + + +class ViewAuto(View): name = "Auto" prompt = ("auto", "a") content_types = [] @@ -84,40 +132,40 @@ class ViewAuto: if ct in content_types_map: return content_types_map[ct][0](hdrs, content, limit) elif utils.isXML(content): - return get("XML")(hdrs, content, limit) - return get("Raw")(hdrs, content, limit) + return ViewXML(hdrs, content, limit) + return ViewRaw(hdrs, content, limit) -class ViewRaw: +class ViewRaw(View): name = "Raw" prompt = ("raw", "r") content_types = [] def __call__(self, hdrs, content, limit): - txt = _view_text(content[:limit], len(content), limit) - return "Raw", txt + return "Raw", format_text(content, limit) -class ViewHex: +class ViewHex(View): name = "Hex" prompt = ("hex", "e") content_types = [] - def __call__(self, hdrs, content, limit): - txt = [] + @staticmethod + def _format(content, limit): for offset, hexa, s in netlib.utils.hexdump(content[:limit]): - txt.append(urwid.Text([ - ("offset", offset), - " ", - ("text", hexa), - " ", + yield ( + ("offset", offset + " "), + ("text", hexa + " "), ("text", s), - ])) - trailer(len(content), txt, limit) - return "Hex", txt + ) + for msg in trailer(content, limit): + yield msg + + def __call__(self, hdrs, content, limit): + return "Hex", self._format(content, limit) -class ViewXML: +class ViewXML(View): name = "XML" prompt = ("xml", "x") content_types = ["text/xml"] @@ -153,37 +201,20 @@ class ViewXML: pretty_print=True, xml_declaration=True, doctype=doctype or None, - encoding = docinfo.encoding + encoding=docinfo.encoding ) - txt = [] - for i in s[:limit].strip().split("\n"): - txt.append( - urwid.Text(("text", i)), - ) - trailer(len(content), txt, limit) - return "XML-like data", txt + return "XML-like data", format_text(s, limit) -class ViewJSON: +class ViewJSON(View): name = "JSON" prompt = ("json", "s") content_types = ["application/json"] def __call__(self, hdrs, content, limit): - lines = utils.pretty_json(content) - if lines: - txt = [] - sofar = 0 - for i in lines: - sofar += len(i) - txt.append( - urwid.Text(("text", i)), - ) - if sofar > limit: - break - trailer(sum(len(i) for i in lines), txt, limit) - return "JSON", txt + pretty_json = utils.pretty_json(content) + return "JSON", format_text(pretty_json, limit) class ViewHTML: @@ -204,7 +235,7 @@ class ViewHTML: pretty_print=True, doctype=docinfo.doctype ) - return "HTML", _view_text(s[:limit], len(s), limit) + return "HTML", format_text(s, limit) class ViewHTMLOutline: @@ -232,8 +263,8 @@ class ViewURLEncoded: if lines: body = format_keyvals( [(k + ":", v) for (k, v) in lines], - key = "header", - val = "text" + key="header", + val="text" ) return "URLEncoded form", body @@ -251,8 +282,8 @@ class ViewMultipart: ] r.extend(format_keyvals( v, - key = "header", - val = "text" + key="header", + val="text" )) return "Multipart form", r @@ -266,6 +297,7 @@ if pyamf: data = input.readObject() self["data"] = data + def pyamf_class_loader(s): for i in pyamf.CLASS_LOADERS: if i != pyamf_class_loader: @@ -274,8 +306,10 @@ if pyamf: return v return DummyObject + pyamf.register_class_loader(pyamf_class_loader) + class ViewAMF: name = "AMF" prompt = ("amf", "f") @@ -401,8 +435,8 @@ class ViewImage: ) fmt = format_keyvals( clean, - key = "header", - val = "text" + key="header", + val="text" ) return "%s image" % img.format, fmt @@ -468,6 +502,7 @@ class ViewWBXML: except: return None + views = [ ViewAuto(), ViewRaw(), @@ -495,7 +530,6 @@ for i in views: l = content_types_map.setdefault(ct, []) l.append(i) - view_prompts = [i.prompt for i in views] -- cgit v1.2.3 From b62498e125191beca3b49841eb5f1fb9a93a868a Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Fri, 4 Sep 2015 17:33:21 +0200 Subject: remove urwid from contentviews --- libmproxy/contentview.py | 105 ++++++++++++++++++++++------------------------- 1 file changed, 48 insertions(+), 57 deletions(-) diff --git a/libmproxy/contentview.py b/libmproxy/contentview.py index ec3603c7..45c1f2f1 100644 --- a/libmproxy/contentview.py +++ b/libmproxy/contentview.py @@ -53,8 +53,8 @@ def format_dict(d): key += ":" key = key.ljust(max_key_len + 2) yield ( - ("key", key), - ("value", value) + ("header", key), + ("text", value) ) @@ -110,12 +110,16 @@ def trailer(clen, txt, limit): """ -class View: +class View(object): name = None prompt = () content_types = [] def __call__(self, hdrs, content, limit): + """ + Returns: + A (mode name, content generator) tuple. + """ raise NotImplementedError() @@ -132,8 +136,8 @@ class ViewAuto(View): if ct in content_types_map: return content_types_map[ct][0](hdrs, content, limit) elif utils.isXML(content): - return ViewXML(hdrs, content, limit) - return ViewRaw(hdrs, content, limit) + return get("XML")(hdrs, content, limit) + return get("Raw")(hdrs, content, limit) class ViewRaw(View): @@ -217,7 +221,7 @@ class ViewJSON(View): return "JSON", format_text(pretty_json, limit) -class ViewHTML: +class ViewHTML(View): name = "HTML" prompt = ("html", "h") content_types = ["text/html"] @@ -238,7 +242,7 @@ class ViewHTML: return "HTML", format_text(s, limit) -class ViewHTMLOutline: +class ViewHTMLOutline(View): name = "HTML Outline" prompt = ("html outline", "o") content_types = ["text/html"] @@ -249,43 +253,34 @@ class ViewHTMLOutline: h.ignore_images = True h.body_width = 0 content = h.handle(content) - txt = _view_text(content[:limit], len(content), limit) - return "HTML Outline", txt + return "HTML Outline", format_text(content, limit) -class ViewURLEncoded: +class ViewURLEncoded(View): name = "URL-encoded" prompt = ("urlencoded", "u") content_types = ["application/x-www-form-urlencoded"] def __call__(self, hdrs, content, limit): - lines = netlib.utils.urldecode(content) - if lines: - body = format_keyvals( - [(k + ":", v) for (k, v) in lines], - key="header", - val="text" - ) - return "URLEncoded form", body + d = netlib.utils.urldecode(content) + return "URLEncoded form", format_dict(d) -class ViewMultipart: +class ViewMultipart(View): name = "Multipart Form" prompt = ("multipart", "m") content_types = ["multipart/form-data"] + @staticmethod + def _format(v): + yield (("highlight", "Form data:\n")) + for message in format_dict({key:val for key,val in v}): + yield message + def __call__(self, hdrs, content, limit): v = netlib.utils.multipartdecode(hdrs, content) if v: - r = [ - urwid.Text(("highlight", "Form data:\n")), - ] - r.extend(format_keyvals( - v, - key="header", - val="text" - )) - return "Multipart form", r + return "Multipart form", self._format(v) if pyamf: @@ -310,7 +305,7 @@ if pyamf: pyamf.register_class_loader(pyamf_class_loader) - class ViewAMF: + class ViewAMF(View): name = "AMF" prompt = ("amf", "f") content_types = ["application/x-amf"] @@ -337,31 +332,32 @@ if pyamf: else: return b - def __call__(self, hdrs, content, limit): - envelope = remoting.decode(content, strict=False) - if not envelope: - return None - - txt = [] + def _format(self, envelope, limit): for target, message in iter(envelope): if isinstance(message, pyamf.remoting.Request): - txt.append(urwid.Text([ + yield ( ("header", "Request: "), ("text", str(target)), - ])) + ) else: - txt.append(urwid.Text([ + yield ( ("header", "Response: "), ("text", "%s, code %s" % (target, message.status)), - ])) + ) s = json.dumps(self.unpack(message), indent=4) - txt.extend(_view_text(s[:limit], len(s), limit)) + for msg in format_text(s, limit): + yield msg + + def __call__(self, hdrs, content, limit): + envelope = remoting.decode(content, strict=False) + if not envelope: + return None - return "AMF v%s" % envelope.amfVersion, txt + return "AMF v%s" % envelope.amfVersion, self._format(envelope, limit) -class ViewJavaScript: +class ViewJavaScript(View): name = "JavaScript" prompt = ("javascript", "j") content_types = [ @@ -374,10 +370,11 @@ class ViewJavaScript: opts = jsbeautifier.default_options() opts.indent_size = 2 res = jsbeautifier.beautify(content[:limit], opts) - return "JavaScript", _view_text(res, len(res), limit) + cutoff = max(0, len(content) - limit) + return "JavaScript", format_text(res, limit - cutoff) -class ViewCSS: +class ViewCSS(View): name = "CSS" prompt = ("css", "c") content_types = [ @@ -391,10 +388,10 @@ class ViewCSS: else: beautified = content - return "CSS", _view_text(beautified, len(beautified), limit) + return "CSS", format_text(beautified, limit) -class ViewImage: +class ViewImage(View): name = "Image" prompt = ("image", "i") content_types = [ @@ -433,15 +430,11 @@ class ViewImage: clean.append( [netlib.utils.cleanBin(i[0]), netlib.utils.cleanBin(i[1])] ) - fmt = format_keyvals( - clean, - key="header", - val="text" - ) + fmt = format_dict({k:v for k,v in clean}) return "%s image" % img.format, fmt -class ViewProtobuf: +class ViewProtobuf(View): """Human friendly view of protocol buffers The view uses the protoc compiler to decode the binary """ @@ -480,11 +473,10 @@ class ViewProtobuf: def __call__(self, hdrs, content, limit): decoded = self.decode_protobuf(content) - txt = _view_text(decoded[:limit], len(decoded), limit) - return "Protobuf", txt + return "Protobuf", format_text(decoded, limit) -class ViewWBXML: +class ViewWBXML(View): name = "WBXML" prompt = ("wbxml", "w") content_types = [ @@ -497,8 +489,7 @@ class ViewWBXML: try: parser = ASCommandResponse(content) parsedContent = parser.xmlString - txt = _view_text(parsedContent, len(parsedContent), limit) - return "WBXML", txt + return "WBXML", format_text(parsedContent, limit) except: return None -- cgit v1.2.3 From 625a719eb1237556d4aa2ed3e0088634324c0ad9 Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Fri, 11 Sep 2015 12:26:52 +0200 Subject: completely remove console from contentviews --- libmproxy/console/flowview.py | 21 ++++++++++++++++--- libmproxy/contentview.py | 48 +++++++++---------------------------------- libmproxy/exceptions.py | 4 ++++ 3 files changed, 32 insertions(+), 41 deletions(-) diff --git a/libmproxy/console/flowview.py b/libmproxy/console/flowview.py index e33d4c43..4946ed9c 100644 --- a/libmproxy/console/flowview.py +++ b/libmproxy/console/flowview.py @@ -1,6 +1,7 @@ from __future__ import absolute_import import os import sys +import traceback import urwid from netlib import odict @@ -10,6 +11,7 @@ from . import common, grideditor, signals, searchable, tabs from . import flowdetailview from .. import utils, controller, contentview from ..models import HTTPRequest, HTTPResponse, decoded +from ..exceptions import ContentViewException class SearchError(Exception): @@ -180,16 +182,29 @@ class FlowView(tabs.Tabs): else: limit = contentview.VIEW_CUTOFF description, text_objects = cache.get( - contentview.get_content_view, + self._get_content_view, viewmode, conn.headers, conn.content, limit, - isinstance(conn, HTTPRequest), - signals.add_event + isinstance(conn, HTTPRequest) ) return (description, text_objects) + def _get_content_view(self, viewmode, headers, content, limit, is_request): + try: + return contentview.get_content_view( + viewmode, headers, content, limit, is_request + ) + except ContentViewException: + s = "Content viewer failed: \n" + traceback.format_exc() + signals.add_event(s, "error") + msg, view = contentview.get_content_view( + viewmode, headers, content, limit, is_request + ) + msg = msg.replace("Raw", "Couldn't parse: falling back to Raw") + return msg, view + def viewmode_get(self): override = self.state.get_flow_setting( self.flow, diff --git a/libmproxy/contentview.py b/libmproxy/contentview.py index a9b6cf95..1b41066b 100644 --- a/libmproxy/contentview.py +++ b/libmproxy/contentview.py @@ -3,13 +3,15 @@ import cStringIO import json import logging import subprocess -import traceback import lxml.html import lxml.etree from PIL import Image from PIL.ExifTags import TAGS import html2text +import six +import sys +from libmproxy.exceptions import ContentViewException import netlib.utils from . import utils @@ -79,36 +81,6 @@ def trailer(content, limit): ) -""" -def _view_text(content, total, limit): - "" - Generates a body for a chunk of text. - "" - txt = [] - for i in netlib.utils.cleanBin(content).splitlines(): - txt.append( - urwid.Text(("text", i), wrap="any") - ) - trailer(total, txt, limit) - return txt - - -def trailer(clen, txt, limit): - rem = clen - limit - if rem > 0: - txt.append(urwid.Text("")) - txt.append( - urwid.Text( - [ - ("highlight", "... %s of data not shown. Press " % netlib.utils.pretty_size(rem)), - ("key", "f"), - ("highlight", " to load all data.") - ] - ) - ) -""" - - class View(object): name = None prompt = () @@ -535,7 +507,7 @@ def get(name): return i -def get_content_view(viewmode, headers, content, limit, is_request, log=None): +def get_content_view(viewmode, headers, content, limit, is_request): """ Returns: A (msg, body) tuple. @@ -559,12 +531,12 @@ def get_content_view(viewmode, headers, content, limit, is_request, log=None): try: ret = viewmode(headers, content, limit) # Third-party viewers can fail in unexpected ways... - except Exception: - if log: - s = traceback.format_exc() - s = "Content viewer failed: \n" + s - log(s, "error") - ret = None + except Exception as e: + six.reraise( + ContentViewException, + ContentViewException(str(e)), + sys.exc_info()[2] + ) if not ret: ret = get("Raw")(headers, content, limit) msg.append("Couldn't parse: falling back to Raw") diff --git a/libmproxy/exceptions.py b/libmproxy/exceptions.py index 0e11c136..03ddcb3d 100644 --- a/libmproxy/exceptions.py +++ b/libmproxy/exceptions.py @@ -51,3 +51,7 @@ class InvalidCredentials(HttpException): class ServerException(ProxyException): pass + + +class ContentViewException(ProxyException): + pass -- cgit v1.2.3 From 960f62f3630b48ee3b43ae9289e8e0b33659fe64 Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Fri, 11 Sep 2015 13:37:52 +0200 Subject: fix bugs --- libmproxy/console/__init__.py | 4 +- libmproxy/console/flowview.py | 12 ++--- libmproxy/contentview.py | 66 +++++++++++++++------------- libmproxy/contrib/wbxml/ASCommandResponse.py | 5 +-- libmproxy/utils.py | 2 +- test/test_console_contentview.py | 29 ++++++------ 6 files changed, 59 insertions(+), 59 deletions(-) diff --git a/libmproxy/console/__init__.py b/libmproxy/console/__init__.py index 2133f97f..b75fa5d8 100644 --- a/libmproxy/console/__init__.py +++ b/libmproxy/console/__init__.py @@ -14,9 +14,9 @@ import traceback import urwid import weakref -from .. import controller, flow, script +from .. import controller, flow, script, contentview from . import flowlist, flowview, help, window, signals, options -from . import grideditor, palettes, contentview, statusbar, palettepicker +from . import grideditor, palettes, statusbar, palettepicker EVENTLOG_SIZE = 500 diff --git a/libmproxy/console/flowview.py b/libmproxy/console/flowview.py index 4946ed9c..c0720652 100644 --- a/libmproxy/console/flowview.py +++ b/libmproxy/console/flowview.py @@ -181,7 +181,7 @@ class FlowView(tabs.Tabs): limit = sys.maxsize else: limit = contentview.VIEW_CUTOFF - description, text_objects = cache.get( + return cache.get( self._get_content_view, viewmode, conn.headers, @@ -189,21 +189,21 @@ class FlowView(tabs.Tabs): limit, isinstance(conn, HTTPRequest) ) - return (description, text_objects) def _get_content_view(self, viewmode, headers, content, limit, is_request): try: - return contentview.get_content_view( + description, lines = contentview.get_content_view( viewmode, headers, content, limit, is_request ) except ContentViewException: s = "Content viewer failed: \n" + traceback.format_exc() signals.add_event(s, "error") - msg, view = contentview.get_content_view( + description, lines = contentview.get_content_view( viewmode, headers, content, limit, is_request ) - msg = msg.replace("Raw", "Couldn't parse: falling back to Raw") - return msg, view + description = description.replace("Raw", "Couldn't parse: falling back to Raw") + text_objects = [urwid.Text(l) for l in lines] + return description, text_objects def viewmode_get(self): override = self.state.get_flow_setting( diff --git a/libmproxy/contentview.py b/libmproxy/contentview.py index 1b41066b..219adfb7 100644 --- a/libmproxy/contentview.py +++ b/libmproxy/contentview.py @@ -3,21 +3,23 @@ import cStringIO import json import logging import subprocess +import sys import lxml.html import lxml.etree from PIL import Image + from PIL.ExifTags import TAGS import html2text import six -import sys -from libmproxy.exceptions import ContentViewException +from netlib.odict import ODict +from netlib import encoding import netlib.utils from . import utils +from .exceptions import ContentViewException from .contrib import jsbeautifier from .contrib.wbxml.ASCommandResponse import ASCommandResponse -from netlib import encoding try: import pyamf @@ -53,10 +55,10 @@ def format_dict(d): for key, value in d.items(): key += ":" key = key.ljust(max_key_len + 2) - yield ( + yield [ ("header", key), ("text", value) - ) + ] def format_text(content, limit): @@ -66,7 +68,7 @@ def format_text(content, limit): content = netlib.utils.cleanBin(content) for line in content[:limit].splitlines(): - yield ("text", line) + yield [("text", line)] for msg in trailer(content, limit): yield msg @@ -75,10 +77,9 @@ def format_text(content, limit): def trailer(content, limit): bytes_removed = len(content) - limit if bytes_removed > 0: - yield ( - "cutoff", - "... {} of data not shown.".format(netlib.utils.pretty_size(bytes_removed)) - ) + yield [ + ("cutoff", "... {} of data not shown.".format(netlib.utils.pretty_size(bytes_removed))) + ] class View(object): @@ -89,7 +90,10 @@ class View(object): def __call__(self, hdrs, content, limit): """ Returns: - A (mode name, content generator) tuple. + A (description, content generator) tuple. + + The content generator yields lists of (style, text) tuples. + Iit must not yield tuples of tuples, because urwid cannot process that. """ raise NotImplementedError() @@ -128,11 +132,11 @@ class ViewHex(View): @staticmethod def _format(content, limit): for offset, hexa, s in netlib.utils.hexdump(content[:limit]): - yield ( + yield [ ("offset", offset + " "), ("text", hexa + " "), - ("text", s), - ) + ("text", s) + ] for msg in trailer(content, limit): yield msg @@ -189,7 +193,8 @@ class ViewJSON(View): def __call__(self, hdrs, content, limit): pretty_json = utils.pretty_json(content) - return "JSON", format_text(pretty_json, limit) + if pretty_json: + return "JSON", format_text(pretty_json, limit) class ViewHTML(View): @@ -234,7 +239,7 @@ class ViewURLEncoded(View): def __call__(self, hdrs, content, limit): d = netlib.utils.urldecode(content) - return "URLEncoded form", format_dict(d) + return "URLEncoded form", format_dict(ODict(d)) class ViewMultipart(View): @@ -244,8 +249,8 @@ class ViewMultipart(View): @staticmethod def _format(v): - yield (("highlight", "Form data:\n")) - for message in format_dict({key:val for key,val in v}): + yield [("highlight", "Form data:\n")] + for message in format_dict(ODict(v)): yield message def __call__(self, hdrs, content, limit): @@ -306,15 +311,15 @@ if pyamf: def _format(self, envelope, limit): for target, message in iter(envelope): if isinstance(message, pyamf.remoting.Request): - yield ( + yield [ ("header", "Request: "), ("text", str(target)), - ) + ] else: - yield ( + yield [ ("header", "Response: "), ("text", "%s, code %s" % (target, message.status)), - ) + ] s = json.dumps(self.unpack(message), indent=4) for msg in format_text(s, limit): @@ -322,10 +327,8 @@ if pyamf: def __call__(self, hdrs, content, limit): envelope = remoting.decode(content, strict=False) - if not envelope: - return None - - return "AMF v%s" % envelope.amfVersion, self._format(envelope, limit) + if envelope: + return "AMF v%s" % envelope.amfVersion, self._format(envelope, limit) class ViewJavaScript(View): @@ -401,7 +404,7 @@ class ViewImage(View): clean.append( [netlib.utils.cleanBin(i[0]), netlib.utils.cleanBin(i[1])] ) - fmt = format_dict({k:v for k,v in clean}) + fmt = format_dict(ODict(clean)) return "%s image" % img.format, fmt @@ -460,7 +463,8 @@ class ViewWBXML(View): try: parser = ASCommandResponse(content) parsedContent = parser.xmlString - return "WBXML", format_text(parsedContent, limit) + if parsedContent: + return "WBXML", format_text(parsedContent, limit) except: return None @@ -510,16 +514,16 @@ def get(name): def get_content_view(viewmode, headers, content, limit, is_request): """ Returns: - A (msg, body) tuple. + A (description, content generator) tuple. Raises: ContentViewException, if the content view threw an error. """ if not content: if is_request: - return "No request content (press tab to view response)", "" + return "No request content (press tab to view response)", [] else: - return "No content", "" + return "No content", [] msg = [] enc = headers.get("content-encoding") diff --git a/libmproxy/contrib/wbxml/ASCommandResponse.py b/libmproxy/contrib/wbxml/ASCommandResponse.py index 7bd31409..08d03445 100644 --- a/libmproxy/contrib/wbxml/ASCommandResponse.py +++ b/libmproxy/contrib/wbxml/ASCommandResponse.py @@ -38,10 +38,10 @@ class ASCommandResponse: if ( len(response) > 0): self.xmlString = self.decodeWBXML(self.wbxmlBody) else: - logging.error("Empty WBXML body passed") + raise ValueError("Empty WBXML body passed") except Exception as e: - logging.error("Error: {0}".format(e.message)) self.xmlString = None + raise ValueError("Error: {0}".format(e.message)) def getWBXMLBytes(self): return self.wbxmlBytes @@ -70,4 +70,3 @@ if __name__ == "__main__": logging.info("-"*100) instance = ASCommandResponse(byteWBXML) logging.info(instance.xmlString) - \ No newline at end of file diff --git a/libmproxy/utils.py b/libmproxy/utils.py index a6ca55f7..4b591250 100644 --- a/libmproxy/utils.py +++ b/libmproxy/utils.py @@ -54,7 +54,7 @@ def pretty_json(s): p = json.loads(s) except ValueError: return None - return json.dumps(p, sort_keys=True, indent=4).split("\n") + return json.dumps(p, sort_keys=True, indent=4) def pretty_duration(secs): diff --git a/test/test_console_contentview.py b/test/test_console_contentview.py index d44a3cf4..ec1b4930 100644 --- a/test/test_console_contentview.py +++ b/test/test_console_contentview.py @@ -1,16 +1,12 @@ -import os -from nose.plugins.skip import SkipTest +from libmproxy.exceptions import ContentViewException from netlib.http import Headers -if os.name == "nt": - raise SkipTest("Skipped on Windows.") import sys import netlib.utils from netlib import encoding import libmproxy.contentview as cv -from libmproxy import utils, flow import tutils try: @@ -26,11 +22,11 @@ except: class TestContentView: def test_trailer(self): - txt = [] - cv.trailer(5, txt, 1000) - assert not txt - cv.trailer(cv.VIEW_CUTOFF + 10, txt, cv.VIEW_CUTOFF) - assert txt + txt = "X"*10 + lines = cv.trailer(txt, 1000) + assert not list(lines) + lines = cv.trailer(txt, 5) + assert list(lines) def test_view_auto(self): v = cv.ViewAuto() @@ -124,16 +120,16 @@ class TestContentView: result = v([], 'a', 100) if cssutils: - assert len(result[1]) == 0 + assert len(list(result[1])) == 0 else: - assert len(result[1]) == 1 + assert len(list(result[1])) == 1 result = v([], fixture_1, 100) if cssutils: - assert len(result[1]) > 1 + assert len(list(result[1])) > 1 else: - assert len(result[1]) == 1 + assert len(list(result[1])) == 1 def test_view_hex(self): v = cv.ViewHex() @@ -204,14 +200,15 @@ Larry ) assert "Raw" in r[0] - r = cv.get_content_view( + tutils.raises( + ContentViewException, + cv.get_content_view, cv.get("AMF"), Headers(), "[1, 2", 1000, False ) - assert "Raw" in r[0] r = cv.get_content_view( cv.get("Auto"), -- cgit v1.2.3 From 47602dc1a5949a41535bc562adb83279f33f0b73 Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Fri, 11 Sep 2015 13:41:16 +0200 Subject: clean up tests --- test/test_console_common.py | 2 - test/test_console_contentview.py | 266 --------------------------------------- test/test_console_import.py | 5 - test/test_contentview.py | 266 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 266 insertions(+), 273 deletions(-) delete mode 100644 test/test_console_contentview.py delete mode 100644 test/test_console_import.py create mode 100644 test/test_contentview.py diff --git a/test/test_console_common.py b/test/test_console_common.py index 57cbef98..459539c5 100644 --- a/test/test_console_common.py +++ b/test/test_console_common.py @@ -3,10 +3,8 @@ from nose.plugins.skip import SkipTest if os.name == "nt": raise SkipTest("Skipped on Windows.") -from netlib import encoding import libmproxy.console.common as common -from libmproxy import utils, flow import tutils diff --git a/test/test_console_contentview.py b/test/test_console_contentview.py deleted file mode 100644 index ec1b4930..00000000 --- a/test/test_console_contentview.py +++ /dev/null @@ -1,266 +0,0 @@ -from libmproxy.exceptions import ContentViewException -from netlib.http import Headers - -import sys - -import netlib.utils -from netlib import encoding - -import libmproxy.contentview as cv -import tutils - -try: - import pyamf -except ImportError: - pyamf = None - -try: - import cssutils -except: - cssutils = None - - -class TestContentView: - def test_trailer(self): - txt = "X"*10 - lines = cv.trailer(txt, 1000) - assert not list(lines) - lines = cv.trailer(txt, 5) - assert list(lines) - - def test_view_auto(self): - v = cv.ViewAuto() - f = v( - Headers(), - "foo", - 1000 - ) - assert f[0] == "Raw" - - f = v( - Headers(content_type="text/html"), - "", - 1000 - ) - assert f[0] == "HTML" - - f = v( - Headers(content_type="text/flibble"), - "foo", - 1000 - ) - assert f[0] == "Raw" - - f = v( - Headers(content_type="text/flibble"), - "", - 1000 - ) - assert f[0].startswith("XML") - - def test_view_urlencoded(self): - d = netlib.utils.urlencode([("one", "two"), ("three", "four")]) - v = cv.ViewURLEncoded() - assert v([], d, 100) - d = netlib.utils.urlencode([("adsfa", "")]) - v = cv.ViewURLEncoded() - assert v([], d, 100) - - def test_view_html(self): - v = cv.ViewHTML() - s = "


one

" - assert v([], s, 1000) - - s = "gobbledygook" - assert not v([], s, 1000) - - def test_view_html_outline(self): - v = cv.ViewHTMLOutline() - s = "


one

" - assert v([], s, 1000) - - def test_view_json(self): - cv.VIEW_CUTOFF = 100 - v = cv.ViewJSON() - assert v([], "{}", 1000) - assert not v([], "{", 1000) - assert v([], "[" + ",".join(["0"] * cv.VIEW_CUTOFF) + "]", 1000) - assert v([], "[1, 2, 3, 4, 5]", 5) - - def test_view_xml(self): - v = cv.ViewXML() - assert v([], "", 1000) - assert not v([], "", 1000) - s = """ - - - - """ - assert v([], s, 1000) - - def test_view_raw(self): - v = cv.ViewRaw() - assert v([], "foo", 1000) - - def test_view_javascript(self): - v = cv.ViewJavaScript() - assert v([], "[1, 2, 3]", 100) - assert v([], "[1, 2, 3", 100) - assert v([], "function(a){[1, 2, 3]}", 100) - - def test_view_css(self): - v = cv.ViewCSS() - - with open(tutils.test_data.path('data/1.css'), 'r') as fp: - fixture_1 = fp.read() - - result = v([], 'a', 100) - - if cssutils: - assert len(list(result[1])) == 0 - else: - assert len(list(result[1])) == 1 - - result = v([], fixture_1, 100) - - if cssutils: - assert len(list(result[1])) > 1 - else: - assert len(list(result[1])) == 1 - - def test_view_hex(self): - v = cv.ViewHex() - assert v([], "foo", 1000) - - def test_view_image(self): - v = cv.ViewImage() - p = tutils.test_data.path("data/image.png") - assert v([], file(p, "rb").read(), sys.maxsize) - - p = tutils.test_data.path("data/image.gif") - assert v([], file(p, "rb").read(), sys.maxsize) - - p = tutils.test_data.path("data/image-err1.jpg") - assert v([], file(p, "rb").read(), sys.maxsize) - - p = tutils.test_data.path("data/image.ico") - assert v([], file(p, "rb").read(), sys.maxsize) - - assert not v([], "flibble", sys.maxsize) - - def test_view_multipart(self): - view = cv.ViewMultipart() - v = """ ---AaB03x -Content-Disposition: form-data; name="submit-name" - -Larry ---AaB03x - """.strip() - h = Headers(content_type="multipart/form-data; boundary=AaB03x") - assert view(h, v, 1000) - - h = Headers() - assert not view(h, v, 1000) - - h = Headers(content_type="multipart/form-data") - assert not view(h, v, 1000) - - h = Headers(content_type="unparseable") - assert not view(h, v, 1000) - - def test_get_content_view(self): - r = cv.get_content_view( - cv.get("Raw"), - Headers(content_type="application/json"), - "[1, 2, 3]", - 1000, - False - ) - assert "Raw" in r[0] - - r = cv.get_content_view( - cv.get("Auto"), - Headers(content_type="application/json"), - "[1, 2, 3]", - 1000, - False - ) - assert r[0] == "JSON" - - r = cv.get_content_view( - cv.get("Auto"), - Headers(content_type="application/json"), - "[1, 2", - 1000, - False - ) - assert "Raw" in r[0] - - tutils.raises( - ContentViewException, - cv.get_content_view, - cv.get("AMF"), - Headers(), - "[1, 2", - 1000, - False - ) - - r = cv.get_content_view( - cv.get("Auto"), - Headers( - content_type="application/json", - content_encoding="gzip" - ), - encoding.encode('gzip', "[1, 2, 3]"), - 1000, - False - ) - assert "decoded gzip" in r[0] - assert "JSON" in r[0] - - r = cv.get_content_view( - cv.get("XML"), - Headers( - content_type="application/json", - content_encoding="gzip" - ), - encoding.encode('gzip', "[1, 2, 3]"), - 1000, - False - ) - assert "decoded gzip" in r[0] - assert "Raw" in r[0] - - -if pyamf: - def test_view_amf_request(): - v = cv.ViewAMF() - - p = tutils.test_data.path("data/amf01") - assert v([], file(p, "rb").read(), sys.maxsize) - - p = tutils.test_data.path("data/amf02") - assert v([], file(p, "rb").read(), sys.maxsize) - - def test_view_amf_response(): - v = cv.ViewAMF() - p = tutils.test_data.path("data/amf03") - assert v([], file(p, "rb").read(), sys.maxsize) - -if cv.ViewProtobuf.is_available(): - def test_view_protobuf_request(): - v = cv.ViewProtobuf() - - p = tutils.test_data.path("data/protobuf01") - content_type, output = v([], file(p, "rb").read(), sys.maxsize) - assert content_type == "Protobuf" - assert output[0].text == '1: "3bbc333c-e61c-433b-819a-0b9a8cc103b8"' - - -def test_get_by_shortcut(): - assert cv.get_by_shortcut("h") diff --git a/test/test_console_import.py b/test/test_console_import.py deleted file mode 100644 index c99faae8..00000000 --- a/test/test_console_import.py +++ /dev/null @@ -1,5 +0,0 @@ -import libmproxy.contentview as cv - - -def test_pass(): - assert True diff --git a/test/test_contentview.py b/test/test_contentview.py new file mode 100644 index 00000000..ec1b4930 --- /dev/null +++ b/test/test_contentview.py @@ -0,0 +1,266 @@ +from libmproxy.exceptions import ContentViewException +from netlib.http import Headers + +import sys + +import netlib.utils +from netlib import encoding + +import libmproxy.contentview as cv +import tutils + +try: + import pyamf +except ImportError: + pyamf = None + +try: + import cssutils +except: + cssutils = None + + +class TestContentView: + def test_trailer(self): + txt = "X"*10 + lines = cv.trailer(txt, 1000) + assert not list(lines) + lines = cv.trailer(txt, 5) + assert list(lines) + + def test_view_auto(self): + v = cv.ViewAuto() + f = v( + Headers(), + "foo", + 1000 + ) + assert f[0] == "Raw" + + f = v( + Headers(content_type="text/html"), + "", + 1000 + ) + assert f[0] == "HTML" + + f = v( + Headers(content_type="text/flibble"), + "foo", + 1000 + ) + assert f[0] == "Raw" + + f = v( + Headers(content_type="text/flibble"), + "", + 1000 + ) + assert f[0].startswith("XML") + + def test_view_urlencoded(self): + d = netlib.utils.urlencode([("one", "two"), ("three", "four")]) + v = cv.ViewURLEncoded() + assert v([], d, 100) + d = netlib.utils.urlencode([("adsfa", "")]) + v = cv.ViewURLEncoded() + assert v([], d, 100) + + def test_view_html(self): + v = cv.ViewHTML() + s = "


one

" + assert v([], s, 1000) + + s = "gobbledygook" + assert not v([], s, 1000) + + def test_view_html_outline(self): + v = cv.ViewHTMLOutline() + s = "


one

" + assert v([], s, 1000) + + def test_view_json(self): + cv.VIEW_CUTOFF = 100 + v = cv.ViewJSON() + assert v([], "{}", 1000) + assert not v([], "{", 1000) + assert v([], "[" + ",".join(["0"] * cv.VIEW_CUTOFF) + "]", 1000) + assert v([], "[1, 2, 3, 4, 5]", 5) + + def test_view_xml(self): + v = cv.ViewXML() + assert v([], "", 1000) + assert not v([], "", 1000) + s = """ + + + + """ + assert v([], s, 1000) + + def test_view_raw(self): + v = cv.ViewRaw() + assert v([], "foo", 1000) + + def test_view_javascript(self): + v = cv.ViewJavaScript() + assert v([], "[1, 2, 3]", 100) + assert v([], "[1, 2, 3", 100) + assert v([], "function(a){[1, 2, 3]}", 100) + + def test_view_css(self): + v = cv.ViewCSS() + + with open(tutils.test_data.path('data/1.css'), 'r') as fp: + fixture_1 = fp.read() + + result = v([], 'a', 100) + + if cssutils: + assert len(list(result[1])) == 0 + else: + assert len(list(result[1])) == 1 + + result = v([], fixture_1, 100) + + if cssutils: + assert len(list(result[1])) > 1 + else: + assert len(list(result[1])) == 1 + + def test_view_hex(self): + v = cv.ViewHex() + assert v([], "foo", 1000) + + def test_view_image(self): + v = cv.ViewImage() + p = tutils.test_data.path("data/image.png") + assert v([], file(p, "rb").read(), sys.maxsize) + + p = tutils.test_data.path("data/image.gif") + assert v([], file(p, "rb").read(), sys.maxsize) + + p = tutils.test_data.path("data/image-err1.jpg") + assert v([], file(p, "rb").read(), sys.maxsize) + + p = tutils.test_data.path("data/image.ico") + assert v([], file(p, "rb").read(), sys.maxsize) + + assert not v([], "flibble", sys.maxsize) + + def test_view_multipart(self): + view = cv.ViewMultipart() + v = """ +--AaB03x +Content-Disposition: form-data; name="submit-name" + +Larry +--AaB03x + """.strip() + h = Headers(content_type="multipart/form-data; boundary=AaB03x") + assert view(h, v, 1000) + + h = Headers() + assert not view(h, v, 1000) + + h = Headers(content_type="multipart/form-data") + assert not view(h, v, 1000) + + h = Headers(content_type="unparseable") + assert not view(h, v, 1000) + + def test_get_content_view(self): + r = cv.get_content_view( + cv.get("Raw"), + Headers(content_type="application/json"), + "[1, 2, 3]", + 1000, + False + ) + assert "Raw" in r[0] + + r = cv.get_content_view( + cv.get("Auto"), + Headers(content_type="application/json"), + "[1, 2, 3]", + 1000, + False + ) + assert r[0] == "JSON" + + r = cv.get_content_view( + cv.get("Auto"), + Headers(content_type="application/json"), + "[1, 2", + 1000, + False + ) + assert "Raw" in r[0] + + tutils.raises( + ContentViewException, + cv.get_content_view, + cv.get("AMF"), + Headers(), + "[1, 2", + 1000, + False + ) + + r = cv.get_content_view( + cv.get("Auto"), + Headers( + content_type="application/json", + content_encoding="gzip" + ), + encoding.encode('gzip', "[1, 2, 3]"), + 1000, + False + ) + assert "decoded gzip" in r[0] + assert "JSON" in r[0] + + r = cv.get_content_view( + cv.get("XML"), + Headers( + content_type="application/json", + content_encoding="gzip" + ), + encoding.encode('gzip', "[1, 2, 3]"), + 1000, + False + ) + assert "decoded gzip" in r[0] + assert "Raw" in r[0] + + +if pyamf: + def test_view_amf_request(): + v = cv.ViewAMF() + + p = tutils.test_data.path("data/amf01") + assert v([], file(p, "rb").read(), sys.maxsize) + + p = tutils.test_data.path("data/amf02") + assert v([], file(p, "rb").read(), sys.maxsize) + + def test_view_amf_response(): + v = cv.ViewAMF() + p = tutils.test_data.path("data/amf03") + assert v([], file(p, "rb").read(), sys.maxsize) + +if cv.ViewProtobuf.is_available(): + def test_view_protobuf_request(): + v = cv.ViewProtobuf() + + p = tutils.test_data.path("data/protobuf01") + content_type, output = v([], file(p, "rb").read(), sys.maxsize) + assert content_type == "Protobuf" + assert output[0].text == '1: "3bbc333c-e61c-433b-819a-0b9a8cc103b8"' + + +def test_get_by_shortcut(): + assert cv.get_by_shortcut("h") -- cgit v1.2.3