aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--libmproxy/console/contentview.py166
-rw-r--r--libmproxy/console/flowview.py8
-rw-r--r--libmproxy/flow.py6
-rw-r--r--test/test_console_contentview.py135
-rw-r--r--test/test_flow.py5
5 files changed, 235 insertions, 85 deletions
diff --git a/libmproxy/console/contentview.py b/libmproxy/console/contentview.py
index 446eb350..c6e43a27 100644
--- a/libmproxy/console/contentview.py
+++ b/libmproxy/console/contentview.py
@@ -1,6 +1,6 @@
import urwid
import common
-from .. import utils, encoding
+from .. import utils, encoding, flow
VIEW_CUTOFF = 1024*100
@@ -20,19 +20,18 @@ VIEW_CONTENT_PRETTY_TYPE_XML = 2
VIEW_CONTENT_PRETTY_TYPE_URLENCODED = 3
CONTENT_PRETTY_NAMES = {
- VIEW_CONTENT_PRETTY_TYPE_JSON: "json",
- VIEW_CONTENT_PRETTY_TYPE_XML: "xmlish",
- VIEW_CONTENT_PRETTY_TYPE_URLENCODED: "urlencoded"
+ VIEW_CONTENT_PRETTY_TYPE_JSON: "JSON",
+ VIEW_CONTENT_PRETTY_TYPE_XML: "XML",
+ VIEW_CONTENT_PRETTY_TYPE_URLENCODED: "URL-encoded"
}
-CONTENT_PRETTY_TYPES = {
+CONTENT_TYPES_MAP = {
"text/html": VIEW_CONTENT_PRETTY_TYPE_XML,
"application/json": VIEW_CONTENT_PRETTY_TYPE_JSON,
"text/xml": VIEW_CONTENT_PRETTY_TYPE_XML,
"multipart/form-data": VIEW_CONTENT_PRETTY_TYPE_URLENCODED
}
-
def trailer(clen, txt):
rem = clen - VIEW_CUTOFF
if rem > 0:
@@ -45,16 +44,18 @@ def trailer(clen, txt):
)
)
-def view_flow_raw(content):
+
+def view_raw(hdrs, content):
txt = []
for i in utils.cleanBin(content[:VIEW_CUTOFF]).splitlines():
txt.append(
urwid.Text(("text", i))
)
trailer(len(content), txt)
- return txt
+ return "Raw", txt
+
-def view_flow_binary(content):
+def view_hex(hdrs, content):
txt = []
for offset, hexa, s in utils.hexdump(content[:VIEW_CUTOFF]):
txt.append(urwid.Text([
@@ -65,31 +66,37 @@ def view_flow_binary(content):
("text", s),
]))
trailer(len(content), txt)
- return txt
+ return "HEX", txt
-def view_flow_xmlish(content):
+
+def view_xmlish(hdrs, content):
txt = []
for i in utils.pretty_xmlish(content[:VIEW_CUTOFF]):
txt.append(
urwid.Text(("text", i)),
)
trailer(len(content), txt)
- return txt
+ return "XML-like data", txt
+
+
+def view_json(hdrs, content):
+ lines = utils.pretty_json(content)
+ if lines:
+ txt = []
+ sofar = 0
+ for i in lines:
+ sofar += len(i)
+ txt.append(
+ urwid.Text(("text", i)),
+ )
+ if sofar > VIEW_CUTOFF:
+ break
+ trailer(sum(len(i) for i in lines), txt)
+ return "JSON", txt
-def view_flow_json(lines):
- txt = []
- sofar = 0
- for i in lines:
- sofar += len(i)
- txt.append(
- urwid.Text(("text", i)),
- )
- if sofar > VIEW_CUTOFF:
- break
- trailer(sum(len(i) for i in lines), txt)
- return txt
-def view_flow_formdata(content, boundary):
+# FIXME
+def view_formdata(content, boundary):
rx = re.compile(r'\bname="([^"]+)"')
keys = []
vals = []
@@ -113,65 +120,66 @@ def view_flow_formdata(content, boundary):
))
return r
-def view_flow_urlencoded(lines):
- return common.format_keyvals(
- [(k+":", v) for (k, v) in lines],
- key = "header",
- val = "text"
- )
-
-def find_pretty_view(content, hdrItems, pretty_type=VIEW_CONTENT_PRETTY_TYPE_AUTO):
- ctype = None
- if pretty_type == VIEW_CONTENT_PRETTY_TYPE_AUTO:
- pretty_type == None
- for i in hdrItems:
- if i[0].lower() == "content-type":
- ctype = i[1]
- break
- ct = utils.parse_content_type(ctype) if ctype else None
- if ct:
- pretty_type = CONTENT_PRETTY_TYPES.get("%s/%s"%(ct[0], ct[1]))
- if not pretty_type and utils.isXML(content):
- pretty_type = VIEW_CONTENT_PRETTY_TYPE_XML
-
- if pretty_type == VIEW_CONTENT_PRETTY_TYPE_URLENCODED:
- data = utils.urldecode(content)
- if data:
- return "URLEncoded form", view_flow_urlencoded(data)
-
- if pretty_type == VIEW_CONTENT_PRETTY_TYPE_XML:
- return "Indented XML-ish", view_flow_xmlish(content)
- if pretty_type == VIEW_CONTENT_PRETTY_TYPE_JSON:
- lines = utils.pretty_json(content)
- if lines:
- return "JSON", view_flow_json(lines)
+def view_urlencoded(hdrs, content):
+ lines = utils.urldecode(content)
+ if lines:
+ body = common.format_keyvals(
+ [(k+":", v) for (k, v) in lines],
+ key = "header",
+ val = "text"
+ )
+ return "URLEncoded form", body
- return "Falling back to raw.", view_flow_raw(content)
+PRETTY_FUNCTION_MAP = {
+ VIEW_CONTENT_PRETTY_TYPE_XML: view_xmlish,
+ VIEW_CONTENT_PRETTY_TYPE_JSON: view_json,
+ VIEW_CONTENT_PRETTY_TYPE_URLENCODED: view_urlencoded
+}
-def get_content_view(viewmode, pretty_type, enc, content, hdrItems):
+def get_view_func(viewmode, pretty_type, hdrs, content):
"""
- Returns a (msg, body) tuple.
+ Returns a function object.
"""
- msg = ""
if viewmode == VIEW_CONTENT_HEX:
- body = view_flow_binary(content)
- elif viewmode == VIEW_CONTENT_PRETTY:
- emsg = ""
- if enc:
- decoded = encoding.decode(enc, content)
- if decoded:
- content = decoded
- if enc and enc != "identity":
- emsg = "[decoded %s]"%enc
- msg, body = find_pretty_view(content, hdrItems, pretty_type)
- if pretty_type != VIEW_CONTENT_PRETTY_TYPE_AUTO:
- emsg += " (forced to %s)"%(CONTENT_PRETTY_NAMES[pretty_type])
- if emsg:
- msg = emsg + " " + msg
+ return view_hex
+ elif viewmode == VIEW_CONTENT_RAW:
+ return view_raw
else:
- body = view_flow_raw(content)
- return msg, body
-
-
+ if pretty_type == VIEW_CONTENT_PRETTY_TYPE_AUTO:
+ ctype = hdrs.get("content-type")
+ if ctype:
+ ctype = ctype[0]
+ ct = utils.parse_content_type(ctype) if ctype else None
+ if ct:
+ pretty_type = CONTENT_TYPES_MAP.get("%s/%s"%(ct[0], ct[1]))
+ if not pretty_type and utils.isXML(content):
+ pretty_type = VIEW_CONTENT_PRETTY_TYPE_XML
+ return PRETTY_FUNCTION_MAP.get(pretty_type, view_raw)
+
+
+def get_content_view(viewmode, pretty_type, hdrItems, content):
+ """
+ Returns a (msg, body) tuple.
+ """
+ msg = []
+
+ hdrs = flow.ODictCaseless([list(i) for i in hdrItems])
+
+ enc = hdrs.get("content-encoding")
+ if enc and enc[0] != "identity":
+ decoded = encoding.decode(enc[0], content)
+ if decoded:
+ content = decoded
+ msg.append("[decoded %s]"%enc[0])
+
+ if viewmode == VIEW_CONTENT_PRETTY and pretty_type != VIEW_CONTENT_PRETTY_TYPE_AUTO:
+ msg.append("[forced to %s]"%(CONTENT_PRETTY_NAMES[pretty_type]))
+ func = get_view_func(viewmode, pretty_type, hdrs, content)
+
+ ret = func(hdrs, content)
+ if not ret:
+ ret = view_raw(hdrs, content)
+ msg.append(ret[0])
+ return " ".join(msg), ret[1]
diff --git a/libmproxy/console/flowview.py b/libmproxy/console/flowview.py
index 95fc6d67..3a6f02d5 100644
--- a/libmproxy/console/flowview.py
+++ b/libmproxy/console/flowview.py
@@ -116,15 +116,14 @@ class FlowView(common.WWrap):
else:
self.view_request()
- def _cached_conn_text(self, e, content, hdrItems, viewmode, pretty_type):
+ def _cached_conn_text(self, content, hdrItems, viewmode, pretty_type):
txt = common.format_keyvals(
[(h+":", v) for (h, v) in hdrItems],
key = "header",
val = "text"
)
-
if content:
- msg, body = contentview.get_content_view(viewmode, pretty_type, e, content, hdrItems)
+ msg, body = contentview.get_content_view(viewmode, pretty_type, hdrItems, content)
title = urwid.AttrWrap(urwid.Columns([
urwid.Text(
[
@@ -180,11 +179,8 @@ class FlowView(common.WWrap):
return f
def _conn_text(self, conn, viewmode, pretty_type):
- e = conn.headers["content-encoding"]
- e = e[0] if e else None
return cache.callback(
self, "_cached_conn_text",
- e,
conn.content,
tuple(tuple(i) for i in conn.headers.lst),
viewmode,
diff --git a/libmproxy/flow.py b/libmproxy/flow.py
index 5ff12b5b..bb079a8d 100644
--- a/libmproxy/flow.py
+++ b/libmproxy/flow.py
@@ -181,6 +181,12 @@ class ODict:
def add(self, key, value):
self.lst.append([key, str(value)])
+ def get(self, k, d=None):
+ if k in self:
+ return self[k]
+ else:
+ return d
+
def _get_state(self):
return [tuple(i) for i in self.lst]
diff --git a/test/test_console_contentview.py b/test/test_console_contentview.py
new file mode 100644
index 00000000..070094ff
--- /dev/null
+++ b/test/test_console_contentview.py
@@ -0,0 +1,135 @@
+import libpry
+import libmproxy.console.contentview as cv
+from libmproxy import utils, flow, encoding
+
+class uContentView(libpry.AutoTree):
+ def test_trailer(self):
+ txt = []
+ cv.trailer(5, txt)
+ assert not txt
+ cv.trailer(cv.VIEW_CUTOFF + 10, txt)
+ assert txt
+
+ def test_get_view_func(self):
+ f = cv.get_view_func(
+ cv.VIEW_CONTENT_HEX,
+ cv.VIEW_CONTENT_PRETTY_TYPE_AUTO,
+ flow.ODictCaseless(),
+ "foo"
+ )
+ assert f is cv.view_hex
+
+ f = cv.get_view_func(
+ cv.VIEW_CONTENT_RAW,
+ cv.VIEW_CONTENT_PRETTY_TYPE_AUTO,
+ flow.ODictCaseless(),
+ "foo"
+ )
+ assert f is cv.view_raw
+
+ f = cv.get_view_func(
+ cv.VIEW_CONTENT_PRETTY,
+ cv.VIEW_CONTENT_PRETTY_TYPE_AUTO,
+ flow.ODictCaseless(
+ [["content-type", "text/html"]],
+ ),
+ "foo"
+ )
+ assert f is cv.view_xmlish
+
+ f = cv.get_view_func(
+ cv.VIEW_CONTENT_PRETTY,
+ cv.VIEW_CONTENT_PRETTY_TYPE_AUTO,
+ flow.ODictCaseless(
+ [["content-type", "text/flibble"]],
+ ),
+ "foo"
+ )
+ assert f is cv.view_raw
+
+ f = cv.get_view_func(
+ cv.VIEW_CONTENT_PRETTY,
+ cv.VIEW_CONTENT_PRETTY_TYPE_AUTO,
+ flow.ODictCaseless(
+ [["content-type", "text/flibble"]],
+ ),
+ "<xml></xml>"
+ )
+ assert f is cv.view_xmlish
+
+ def test_view_urlencoded(self):
+ d = utils.urlencode([("one", "two"), ("three", "four")])
+ assert cv.view_urlencoded([], d)
+ assert not cv.view_urlencoded([], "foo")
+
+ def test_view_json(self):
+ cv.VIEW_CUTOFF = 100
+ assert cv.view_json([], "{}")
+ assert not cv.view_urlencoded([], "{")
+ assert cv.view_json([], "[" + ",".join(["0"]*cv.VIEW_CUTOFF) + "]")
+
+ def test_view_xmlish(self):
+ assert cv.view_xmlish([], "<foo></foo>")
+ assert cv.view_xmlish([], "<foo>")
+
+ def test_view_raw(self):
+ assert cv.view_raw([], "foo")
+
+ def test_view_raw(self):
+ assert cv.view_hex([], "foo")
+
+ def test_get_content_view(self):
+ r = cv.get_content_view(
+ cv.VIEW_CONTENT_RAW,
+ cv.VIEW_CONTENT_PRETTY_TYPE_AUTO,
+ [["content-type", "application/json"]],
+ "[1, 2, 3]"
+ )
+ assert r[0] == "Raw"
+
+ r = cv.get_content_view(
+ cv.VIEW_CONTENT_PRETTY,
+ cv.VIEW_CONTENT_PRETTY_TYPE_AUTO,
+ [["content-type", "application/json"]],
+ "[1, 2, 3]"
+ )
+ assert r[0] == "JSON"
+
+
+ r = cv.get_content_view(
+ cv.VIEW_CONTENT_PRETTY,
+ cv.VIEW_CONTENT_PRETTY_TYPE_AUTO,
+ [["content-type", "application/json"]],
+ "[1, 2"
+ )
+ assert r[0] == "Raw"
+
+ r = cv.get_content_view(
+ cv.VIEW_CONTENT_PRETTY,
+ cv.VIEW_CONTENT_PRETTY_TYPE_AUTO,
+ [
+ ["content-type", "application/json"],
+ ["content-encoding", "gzip"]
+ ],
+ encoding.encode('gzip', "[1, 2, 3]")
+ )
+ assert "decoded gzip" in r[0]
+ assert "JSON" in r[0]
+
+
+ r = cv.get_content_view(
+ cv.VIEW_CONTENT_PRETTY,
+ cv.VIEW_CONTENT_PRETTY_TYPE_XML,
+ [
+ ["content-type", "application/json"],
+ ["content-encoding", "gzip"]
+ ],
+ encoding.encode('gzip', "[1, 2, 3]")
+ )
+ assert "decoded gzip" in r[0]
+ assert "forced" in r[0]
+
+
+tests = [
+ uContentView()
+]
diff --git a/test/test_flow.py b/test/test_flow.py
index 8a7da05c..e44e2b0d 100644
--- a/test/test_flow.py
+++ b/test/test_flow.py
@@ -1001,6 +1001,11 @@ class uODict(libpry.AutoTree):
["two", "vun"],
]
+ def test_get(self):
+ self.od.add("one", "two")
+ assert self.od.get("one") == ["two"]
+ assert self.od.get("two") == None
+
class uODictCaseless(libpry.AutoTree):
def setUp(self):