path: root/mitmproxy/contentviews
diff options
authorMaximilian Hils <git@maximilianhils.com>2016-12-09 19:02:55 +0100
committerMaximilian Hils <git@maximilianhils.com>2016-12-09 19:02:55 +0100
commitf53f079f917603a37fa92718e22af1c1c25988fa (patch)
tree36c78e49c63c65a44b180c71861a341d990a39aa /mitmproxy/contentviews
parentd1c72574d5f0e83de9bdfa7c921134052b74ae44 (diff)
split contentviews.py into mitmproxy.contentviews
Diffstat (limited to 'mitmproxy/contentviews')
16 files changed, 626 insertions, 0 deletions
diff --git a/mitmproxy/contentviews/__init__.py b/mitmproxy/contentviews/__init__.py
new file mode 100644
index 00000000..4c3cb99b
--- /dev/null
+++ b/mitmproxy/contentviews/__init__.py
@@ -0,0 +1,177 @@
+Mitmproxy Content Views
+mitmproxy includes a set of content views which can be used to
+format/decode/highlight data. While they are currently used for HTTP message
+bodies only, the may be used in other contexts in the future, e.g. to decode
+protobuf messages sent as WebSocket frames.
+Thus, the View API is very minimalistic. The only arguments are `data` and
+`**metadata`, where `data` is the actual content (as bytes). The contents on
+metadata depend on the protocol in use. For HTTP, the message headers are
+passed as the ``headers`` keyword argument. For HTTP requests, the query
+parameters are passed as the ``query`` keyword argument.
+import traceback
+from mitmproxy import exceptions
+from mitmproxy.net import http
+from mitmproxy.utils import strutils
+from .base import VIEW_CUTOFF, KEY_MAX
+views = []
+content_types_map = {}
+view_prompts = []
+def get(name):
+ for i in views:
+ if i.name.lower() == name.lower():
+ return i
+def get_by_shortcut(c):
+ for i in views:
+ if i.prompt[1] == c:
+ return i
+def add(view):
+ # TODO: auto-select a different name (append an integer?)
+ for i in views:
+ if i.name == view.name:
+ raise exceptions.ContentViewException("Duplicate view: " + view.name)
+ # TODO: the UI should auto-prompt for a replacement shortcut
+ for prompt in view_prompts:
+ if prompt[1] == view.prompt[1]:
+ raise exceptions.ContentViewException("Duplicate view shortcut: " + view.prompt[1])
+ views.append(view)
+ for ct in view.content_types:
+ l = content_types_map.setdefault(ct, [])
+ l.append(view)
+ view_prompts.append(view.prompt)
+def remove(view):
+ for ct in view.content_types:
+ l = content_types_map.setdefault(ct, [])
+ l.remove(view)
+ if not len(l):
+ del content_types_map[ct]
+ view_prompts.remove(view.prompt)
+ views.remove(view)
+def safe_to_print(lines, encoding="utf8"):
+ """
+ Wraps a content generator so that each text portion is a *safe to print* unicode string.
+ """
+ for line in lines:
+ clean_line = []
+ for (style, text) in line:
+ if isinstance(text, bytes):
+ text = text.decode(encoding, "replace")
+ text = strutils.escape_control_characters(text)
+ clean_line.append((style, text))
+ yield clean_line
+def get_message_content_view(viewname, message):
+ """
+ Like get_content_view, but also handles message encoding.
+ """
+ viewmode = get(viewname)
+ if not viewmode:
+ viewmode = get("auto")
+ try:
+ content = message.content
+ except ValueError:
+ content = message.raw_content
+ enc = "[cannot decode]"
+ else:
+ if isinstance(message, http.Message) and content != message.raw_content:
+ enc = "[decoded {}]".format(
+ message.headers.get("content-encoding")
+ )
+ else:
+ enc = None
+ if content is None:
+ return "", iter([[("error", "content missing")]]), None
+ metadata = {}
+ if isinstance(message, http.Request):
+ metadata["query"] = message.query
+ if isinstance(message, http.Message):
+ metadata["headers"] = message.headers
+ description, lines, error = get_content_view(
+ viewmode, content, **metadata
+ )
+ if enc:
+ description = "{} {}".format(enc, description)
+ return description, lines, error
+def get_content_view(viewmode, data, **metadata):
+ """
+ Args:
+ viewmode: the view to use.
+ data, **metadata: arguments passed to View instance.
+ Returns:
+ A (description, content generator, error) tuple.
+ If the content view raised an exception generating the view,
+ the exception is returned in error and the flow is formatted in raw mode.
+ In contrast to calling the views directly, text is always safe-to-print unicode.
+ """
+ try:
+ ret = viewmode(data, **metadata)
+ if ret is None:
+ ret = "Couldn't parse: falling back to Raw", get("Raw")(data, **metadata)[1]
+ desc, content = ret
+ error = None
+ # Third-party viewers can fail in unexpected ways...
+ except Exception:
+ desc = "Couldn't parse: falling back to Raw"
+ _, content = get("Raw")(data, **metadata)
+ error = "{} Content viewer failed: \n{}".format(
+ getattr(viewmode, "name"),
+ traceback.format_exc()
+ )
+ return desc, safe_to_print(content), error
+from . import (
+ auto, raw, hex, json, xml, wbxml, html, javascript, css,
+ urlencoded, multipart, image, query, protobuf
+if protobuf.ViewProtobuf.is_available():
+ add(protobuf.ViewProtobuf()) \ No newline at end of file
diff --git a/mitmproxy/contentviews/auto.py b/mitmproxy/contentviews/auto.py
new file mode 100644
index 00000000..2b08f165
--- /dev/null
+++ b/mitmproxy/contentviews/auto.py
@@ -0,0 +1,27 @@
+from mitmproxy.net import http
+from mitmproxy.utils import strutils
+from . import base
+from mitmproxy.contentviews import get, content_types_map
+class ViewAuto(base.View):
+ name = "Auto"
+ prompt = ("auto", "a")
+ content_types = []
+ def __call__(self, data, **metadata):
+ headers = metadata.get("headers", {})
+ ctype = headers.get("content-type")
+ if data and ctype:
+ ct = http.parse_content_type(ctype) if ctype else None
+ ct = "%s/%s" % (ct[0], ct[1])
+ if ct in content_types_map:
+ return content_types_map[ct][0](data, **metadata)
+ elif strutils.is_xml(data):
+ return get("XML")(data, **metadata)
+ if metadata.get("query"):
+ return get("Query")(data, **metadata)
+ if data and strutils.is_mostly_bin(data):
+ return get("Hex")(data)
+ if not data:
+ return "No content", []
+ return get("Raw")(data)
diff --git a/mitmproxy/contentviews/base.py b/mitmproxy/contentviews/base.py
new file mode 100644
index 00000000..b1a51ffe
--- /dev/null
+++ b/mitmproxy/contentviews/base.py
@@ -0,0 +1,66 @@
+# Default view cutoff *in lines*
+from typing import Iterable
+from typing import Mapping
+from typing import Tuple
+from typing import Union
+KEY_MAX = 30
+class View:
+ name = None
+ prompt = ()
+ content_types = []
+ def __call__(self, data: bytes, **metadata):
+ """
+ Transform raw data into human-readable output.
+ Args:
+ data: the data to decode/format.
+ metadata: optional keyword-only arguments for metadata. Implementations must not
+ rely on a given argument being present.
+ Returns:
+ A (description, content generator) tuple.
+ The content generator yields lists of (style, text) tuples, where each list represents
+ a single line. ``text`` is a unfiltered byte string which may need to be escaped,
+ depending on the used output.
+ Caveats:
+ The content generator must not yield tuples of tuples,
+ because urwid cannot process that. You have to yield a *list* of tuples per line.
+ """
+ raise NotImplementedError()
+def format_dict(
+ d: Mapping[Union[str, bytes], Union[str, bytes]]
+) -> Iterable[Tuple[Union[str, bytes], Union[str, bytes]]]:
+ """
+ Helper function that transforms the given dictionary into a list of
+ ("key", key )
+ ("value", value)
+ tuples, where key is padded to a uniform width.
+ """
+ max_key_len = max(len(k) for k in d.keys())
+ max_key_len = min(max_key_len, KEY_MAX)
+ for key, value in d.items():
+ key += b":" if isinstance(key, bytes) else u":"
+ key = key.ljust(max_key_len + 2)
+ yield [
+ ("header", key),
+ ("text", value)
+ ]
+def format_text(text):
+ """
+ Helper function that transforms bytes into the view output format.
+ """
+ for line in text.splitlines():
+ yield [("text", line)]
diff --git a/mitmproxy/contentviews/css.py b/mitmproxy/contentviews/css.py
new file mode 100644
index 00000000..353a3257
--- /dev/null
+++ b/mitmproxy/contentviews/css.py
@@ -0,0 +1,25 @@
+import logging
+import cssutils
+from . import base
+class ViewCSS(base.View):
+ name = "CSS"
+ prompt = ("css", "c")
+ content_types = [
+ "text/css"
+ ]
+ def __call__(self, data, **metadata):
+ cssutils.log.setLevel(logging.CRITICAL)
+ cssutils.ser.prefs.keepComments = True
+ cssutils.ser.prefs.omitLastSemicolon = False
+ cssutils.ser.prefs.indentClosingBrace = False
+ cssutils.ser.prefs.validOnly = False
+ sheet = cssutils.parseString(data)
+ beautified = sheet.cssText
+ return "CSS", base.format_text(beautified)
diff --git a/mitmproxy/contentviews/hex.py b/mitmproxy/contentviews/hex.py
new file mode 100644
index 00000000..116ed600
--- /dev/null
+++ b/mitmproxy/contentviews/hex.py
@@ -0,0 +1,20 @@
+from mitmproxy.utils import strutils
+from . import base
+class ViewHex(base.View):
+ name = "Hex"
+ prompt = ("hex", "e")
+ content_types = []
+ @staticmethod
+ def _format(data):
+ for offset, hexa, s in strutils.hexdump(data):
+ yield [
+ ("offset", offset + " "),
+ ("text", hexa + " "),
+ ("text", s)
+ ]
+ def __call__(self, data, **metadata):
+ return "Hex", self._format(data)
diff --git a/mitmproxy/contentviews/html.py b/mitmproxy/contentviews/html.py
new file mode 100644
index 00000000..c625beef
--- /dev/null
+++ b/mitmproxy/contentviews/html.py
@@ -0,0 +1,42 @@
+import html2text
+import lxml.etree
+import lxml.html
+from mitmproxy.contentviews.base import View, format_text
+from mitmproxy.utils import strutils
+class ViewHTML(View):
+ name = "HTML"
+ prompt = ("html", "h")
+ content_types = ["text/html"]
+ def __call__(self, data, **metadata):
+ if strutils.is_xml(data):
+ parser = lxml.etree.HTMLParser(
+ strip_cdata=True,
+ remove_blank_text=True
+ )
+ d = lxml.html.fromstring(data, parser=parser)
+ docinfo = d.getroottree().docinfo
+ s = lxml.etree.tostring(
+ d,
+ pretty_print=True,
+ doctype=docinfo.doctype,
+ encoding='utf8'
+ )
+ return "HTML", format_text(s)
+class ViewHTMLOutline(View):
+ name = "HTML Outline"
+ prompt = ("html outline", "o")
+ content_types = ["text/html"]
+ def __call__(self, data, **metadata):
+ data = data.decode("utf-8", "replace")
+ h = html2text.HTML2Text(baseurl="")
+ h.ignore_images = True
+ h.body_width = 0
+ outline = h.handle(data)
+ return "HTML Outline", format_text(outline)
diff --git a/mitmproxy/contentviews/image.py b/mitmproxy/contentviews/image.py
new file mode 100644
index 00000000..57b1fffb
--- /dev/null
+++ b/mitmproxy/contentviews/image.py
@@ -0,0 +1,45 @@
+import io
+from PIL import ExifTags
+from PIL import Image
+from mitmproxy.types import multidict
+from . import base
+class ViewImage(base.View):
+ name = "Image"
+ prompt = ("image", "i")
+ content_types = [
+ "image/png",
+ "image/jpeg",
+ "image/gif",
+ "image/vnd.microsoft.icon",
+ "image/x-icon",
+ ]
+ def __call__(self, data, **metadata):
+ try:
+ img = Image.open(io.BytesIO(data))
+ except IOError:
+ return None
+ parts = [
+ ("Format", str(img.format_description)),
+ ("Size", "%s x %s px" % img.size),
+ ("Mode", str(img.mode)),
+ ]
+ for i in sorted(img.info.keys()):
+ if i != "exif":
+ parts.append(
+ (str(i), str(img.info[i]))
+ )
+ if hasattr(img, "_getexif"):
+ ex = img._getexif()
+ if ex:
+ for i in sorted(ex.keys()):
+ tag = ExifTags.TAGS.get(i, i)
+ parts.append(
+ (str(tag), str(ex[i]))
+ )
+ fmt = base.format_dict(multidict.MultiDict(parts))
+ return "%s image" % img.format, fmt
diff --git a/mitmproxy/contentviews/javascript.py b/mitmproxy/contentviews/javascript.py
new file mode 100644
index 00000000..c2fab875
--- /dev/null
+++ b/mitmproxy/contentviews/javascript.py
@@ -0,0 +1,20 @@
+import jsbeautifier
+from . import base
+class ViewJavaScript(base.View):
+ name = "JavaScript"
+ prompt = ("javascript", "j")
+ content_types = [
+ "application/x-javascript",
+ "application/javascript",
+ "text/javascript"
+ ]
+ def __call__(self, data, **metadata):
+ opts = jsbeautifier.default_options()
+ opts.indent_size = 2
+ data = data.decode("utf-8", "replace")
+ res = jsbeautifier.beautify(data, opts)
+ return "JavaScript", base.format_text(res)
diff --git a/mitmproxy/contentviews/json.py b/mitmproxy/contentviews/json.py
new file mode 100644
index 00000000..7c128d02
--- /dev/null
+++ b/mitmproxy/contentviews/json.py
@@ -0,0 +1,32 @@
+import json
+from typing import Optional
+from mitmproxy.contentviews.base import format_text, View
+def pretty_json(s: bytes) -> Optional[bytes]:
+ try:
+ p = json.loads(s.decode('utf-8'))
+ except ValueError:
+ return None
+ pretty = json.dumps(p, sort_keys=True, indent=4, ensure_ascii=False)
+ if isinstance(pretty, str):
+ # json.dumps _may_ decide to return unicode, if the JSON object is not ascii.
+ # From limited testing this is always valid utf8 (otherwise json.loads will fail earlier),
+ # so we can just re-encode it here.
+ return pretty.encode("utf8", "strict")
+ return pretty
+class ViewJSON(View):
+ name = "JSON"
+ prompt = ("json", "s")
+ content_types = [
+ "application/json",
+ "application/vnd.api+json"
+ ]
+ def __call__(self, data, **metadata):
+ pj = pretty_json(data)
+ if pj:
+ return "JSON", format_text(pj)
diff --git a/mitmproxy/contentviews/multipart.py b/mitmproxy/contentviews/multipart.py
new file mode 100644
index 00000000..640896ab
--- /dev/null
+++ b/mitmproxy/contentviews/multipart.py
@@ -0,0 +1,20 @@
+from mitmproxy.net import http
+from mitmproxy.types import multidict
+from . import base
+class ViewMultipart(base.View):
+ name = "Multipart Form"
+ prompt = ("multipart", "m")
+ content_types = ["multipart/form-data"]
+ @staticmethod
+ def _format(v):
+ yield [("highlight", "Form data:\n")]
+ for message in base.format_dict(multidict.MultiDict(v)):
+ yield message
+ def __call__(self, data, **metadata):
+ headers = metadata.get("headers", {})
+ v = http.multipart.decode(headers, data)
+ if v:
+ return "Multipart form", self._format(v)
diff --git a/mitmproxy/contentviews/protobuf.py b/mitmproxy/contentviews/protobuf.py
new file mode 100644
index 00000000..620d9444
--- /dev/null
+++ b/mitmproxy/contentviews/protobuf.py
@@ -0,0 +1,45 @@
+import subprocess
+from . import base
+class ViewProtobuf(base.View):
+ """Human friendly view of protocol buffers
+ The view uses the protoc compiler to decode the binary
+ """
+ name = "Protocol Buffer"
+ prompt = ("protobuf", "p")
+ content_types = [
+ "application/x-protobuf",
+ "application/x-protobuffer",
+ ]
+ @staticmethod
+ def is_available():
+ try:
+ p = subprocess.Popen(
+ ["protoc", "--version"],
+ stdout=subprocess.PIPE
+ )
+ out, _ = p.communicate()
+ return out.startswith("libprotoc")
+ except:
+ return False
+ def decode_protobuf(self, content):
+ # if Popen raises OSError, it will be caught in
+ # get_content_view and fall back to Raw
+ p = subprocess.Popen(['protoc', '--decode_raw'],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ out, err = p.communicate(input=content)
+ if out:
+ return out
+ else:
+ return err
+ def __call__(self, data, **metadata):
+ decoded = self.decode_protobuf(data)
+ return "Protobuf", base.format_text(decoded)
diff --git a/mitmproxy/contentviews/query.py b/mitmproxy/contentviews/query.py
new file mode 100644
index 00000000..c4ce0faf
--- /dev/null
+++ b/mitmproxy/contentviews/query.py
@@ -0,0 +1,14 @@
+from . import base
+class ViewQuery(base.View):
+ name = "Query"
+ prompt = ("query", "q")
+ content_types = []
+ def __call__(self, data, **metadata):
+ query = metadata.get("query")
+ if query:
+ return "Query", base.format_dict(query)
+ else:
+ return "Query", base.format_text("")
diff --git a/mitmproxy/contentviews/raw.py b/mitmproxy/contentviews/raw.py
new file mode 100644
index 00000000..c504a461
--- /dev/null
+++ b/mitmproxy/contentviews/raw.py
@@ -0,0 +1,11 @@
+from mitmproxy.utils import strutils
+from . import base
+class ViewRaw(base.View):
+ name = "Raw"
+ prompt = ("raw", "r")
+ content_types = []
+ def __call__(self, data, **metadata):
+ return "Raw", base.format_text(strutils.bytes_to_escaped_str(data, True))
diff --git a/mitmproxy/contentviews/urlencoded.py b/mitmproxy/contentviews/urlencoded.py
new file mode 100644
index 00000000..79fe9c1c
--- /dev/null
+++ b/mitmproxy/contentviews/urlencoded.py
@@ -0,0 +1,17 @@
+from mitmproxy.net.http import url
+from mitmproxy.types import multidict
+from . import base
+class ViewURLEncoded(base.View):
+ name = "URL-encoded"
+ prompt = ("urlencoded", "u")
+ content_types = ["application/x-www-form-urlencoded"]
+ def __call__(self, data, **metadata):
+ try:
+ data = data.decode("ascii", "strict")
+ except ValueError:
+ return None
+ d = url.decode(data)
+ return "URLEncoded form", base.format_dict(multidict.MultiDict(d))
diff --git a/mitmproxy/contentviews/wbxml.py b/mitmproxy/contentviews/wbxml.py
new file mode 100644
index 00000000..d626e188
--- /dev/null
+++ b/mitmproxy/contentviews/wbxml.py
@@ -0,0 +1,20 @@
+from mitmproxy.contrib.wbxml import ASCommandResponse
+from . import base
+class ViewWBXML(base.View):
+ name = "WBXML"
+ prompt = ("wbxml", "w")
+ content_types = [
+ "application/vnd.wap.wbxml",
+ "application/vnd.ms-sync.wbxml"
+ ]
+ def __call__(self, data, **metadata):
+ try:
+ parser = ASCommandResponse.ASCommandResponse(data)
+ parsedContent = parser.xmlString
+ if parsedContent:
+ return "WBXML", base.format_text(parsedContent)
+ except:
+ return None
diff --git a/mitmproxy/contentviews/xml.py b/mitmproxy/contentviews/xml.py
new file mode 100644
index 00000000..a382b09d
--- /dev/null
+++ b/mitmproxy/contentviews/xml.py
@@ -0,0 +1,45 @@
+import lxml.etree
+from . import base
+class ViewXML(base.View):
+ name = "XML"
+ prompt = ("xml", "x")
+ content_types = ["text/xml"]
+ def __call__(self, data, **metadata):
+ parser = lxml.etree.XMLParser(
+ remove_blank_text=True,
+ resolve_entities=False,
+ strip_cdata=False,
+ recover=False
+ )
+ try:
+ document = lxml.etree.fromstring(data, parser)
+ except lxml.etree.XMLSyntaxError:
+ return None
+ docinfo = document.getroottree().docinfo
+ prev = []
+ p = document.getroottree().getroot().getprevious()
+ while p is not None:
+ prev.insert(
+ 0,
+ lxml.etree.tostring(p)
+ )
+ p = p.getprevious()
+ doctype = docinfo.doctype
+ if prev:
+ doctype += "\n".join(p.decode() for p in prev).strip()
+ doctype = doctype.strip()
+ s = lxml.etree.tostring(
+ document,
+ pretty_print=True,
+ xml_declaration=True,
+ doctype=doctype or None,
+ encoding=docinfo.encoding
+ )
+ return "XML-like data", base.format_text(s)