diff options
authorBkPHcgQL3V <BkPHcgQL3V@gmx.com>2018-12-19 17:52:32 +0000
committerBkPHcgQL3V <BkPHcgQL3V@gmx.com>2018-12-19 17:52:32 +0000
commit407a5d71a86b715a3c4dce34e1918c35e9dcab79 (patch)
parent889987aa0a7f4852758ed09f70fe5d30f733a6d3 (diff)
Tabular list flow
4 files changed, 356 insertions, 75 deletions
diff --git a/mitmproxy/tools/console/common.py b/mitmproxy/tools/console/common.py
index 5d7ee09d..49f5c247 100644
--- a/mitmproxy/tools/console/common.py
+++ b/mitmproxy/tools/console/common.py
@@ -1,5 +1,8 @@
import platform
import typing
+import datetime
+import time
+import math
from functools import lru_cache
import urwid
@@ -97,12 +100,171 @@ if urwid.util.detected_encoding and not IS_WSL:
SYMBOL_MARK = u"\u25cf"
SYMBOL_UP = u"\u21E7"
SYMBOL_DOWN = u"\u21E9"
+ SYMBOL_ELLIPSIS = u"\u2026"
+def fixlen(s, maxlen):
+ if len(s) <= maxlen:
+ return s.ljust(maxlen)
+ else:
+ return s[0:maxlen - len(SYMBOL_ELLIPSIS)] + SYMBOL_ELLIPSIS
+def fixlen_r(s, maxlen):
+ if len(s) <= maxlen:
+ return s.rjust(maxlen)
+ else:
+ return SYMBOL_ELLIPSIS + s[len(s) - maxlen + len(SYMBOL_ELLIPSIS):]
+class TruncatedText(urwid.Widget):
+ def __init__(self, text, attr, align='left'):
+ self.text = text
+ self.attr = attr
+ self.align = align
+ super(TruncatedText, self).__init__()
+ def pack(self, size, focus=False):
+ return (len(self.text), 1)
+ def rows(self, size, focus=False):
+ return 1
+ def render(self, size, focus=False):
+ text = self.text
+ attr = self.attr
+ if self.align == 'right':
+ text = text[::-1]
+ attr = attr[::-1]
+ text_len = len(text) # TODO: unicode?
+ if size is not None and len(size) > 0:
+ width = size[0]
+ else:
+ width = text_len
+ if width >= text_len:
+ remaining = width - text_len
+ if remaining > 0:
+ c_text = text + ' ' * remaining
+ c_attr = attr + [('text', remaining)]
+ else:
+ c_text = text
+ c_attr = attr
+ else:
+ visible_len = width - len(SYMBOL_ELLIPSIS)
+ visible_text = text[0:visible_len]
+ c_text = visible_text + SYMBOL_ELLIPSIS
+ c_attr = (urwid.util.rle_subseg(attr, 0, len(visible_text.encode())) +
+ [('focus', len(SYMBOL_ELLIPSIS.encode()))])
+ if self.align == 'right':
+ c_text = c_text[::-1]
+ c_attr = c_attr[::-1]
+ return urwid.TextCanvas([c_text.encode()], [c_attr], maxcol=width)
+def truncated_plain(text, attr, align='left'):
+ return TruncatedText(text, [(attr, len(text.encode()))], align)
+# Work around https://github.com/urwid/urwid/pull/330
+def rle_append_beginning_modify(rle, a_r):
+ """
+ Append (a, r) (unpacked from *a_r*) to BEGINNING of rle.
+ Merge with first run when possible
+ MODIFIES rle parameter contents. Returns None.
+ """
+ a, r = a_r
+ if not rle:
+ rle[:] = [(a, r)]
+ else:
+ al, run = rle[0]
+ if a == al:
+ rle[0] = (a, run + r)
+ else:
+ rle[0:0] = [(a, r)]
+def colorize_host(s):
+ if len(s) == 0 or s[0] == '[' or s.split('.')[-1].isdigit():
+ main_part = -1
+ else:
+ main_part = 1 # TODO: second-level domains (https://publicsuffix.org/list/)
+ part = 0
+ attr = []
+ for i in reversed(range(len(s))):
+ c = s[i]
+ if c == '.':
+ part += 1
+ if c in ".:[]":
+ a = 'url_punctuation'
+ elif part == main_part:
+ a = 'url_domain'
+ else:
+ a = 'text'
+ rle_append_beginning_modify(attr, (a, len(c.encode())))
+ return attr
+def colorize_req(s):
+ path = s.split('?', 2)[0]
+ i_query = len(path)
+ i_last_slash = path.rfind('/')
+ i_ext = path[i_last_slash + 1:].rfind('.')
+ i_ext = i_last_slash + i_ext if i_ext >= 0 else len(s)
+ in_val = False
+ attr = []
+ for i in range(len(s)):
+ c = s[i]
+ if ((i < i_query and c == '/') or
+ (i < i_query and i > i_last_slash and c == '.') or
+ (i == i_query)):
+ a = 'url_punctuation'
+ elif i > i_query:
+ if in_val:
+ if c == '&':
+ in_val = False
+ a = 'url_punctuation'
+ else:
+ a = 'url_query_value'
+ else:
+ if c == '=':
+ in_val = True
+ a = 'url_punctuation'
+ else:
+ a = 'url_query_key'
+ elif i > i_ext:
+ a = 'url_extension'
+ elif i > i_last_slash:
+ a = 'url_filename'
+ else:
+ a = 'text'
+ urwid.util.rle_append_modify(attr, (a, len(c.encode())))
+ return attr
+def colorize_url(url):
+ parts = url.split('/', 3)
+ if len(parts) < 4 or len(parts[1]) > 0 or parts[0][-1:] != ':':
+ return [('error', len(url))] # bad URL
+ schemes = {
+ 'http:': 'scheme_http',
+ 'https:': 'scheme_https',
+ }
+ return [
+ (schemes.get(parts[0], "scheme_other"), len(parts[0]) - 1),
+ ('url_punctuation', 3), # ://
+ ] + colorize_host(parts[2]) + colorize_req('/' + parts[3])
@@ -110,50 +272,71 @@ def raw_format_flow(f):
f = dict(f)
pile = []
req = []
- if f["extended"]:
- req.append(
- fcol(
- human.format_timestamp(f["req_timestamp"]),
- "highlight"
- )
- )
- else:
- req.append(fcol(">>" if f["focus"] else " ", "focus"))
- if f["marked"]:
- req.append(fcol(SYMBOL_MARK, "mark"))
+ cursor = [' ', 'focus']
+ if f.get('resp_is_replay', False):
+ cursor[0] = SYMBOL_REPLAY
+ cursor[1] = 'replay'
+ if f['marked']:
+ if cursor[0] == ' ':
+ cursor[0] = SYMBOL_MARK
+ cursor[1] = 'mark'
+ if f['focus']:
+ cursor[0] = '>'
- if f["req_is_replay"]:
- req.append(fcol(SYMBOL_REPLAY, "replay"))
+ req.append(fcol(*cursor))
- req.append(fcol(f["req_method"], "method"))
+ if f["two_line"]:
+ req.append(TruncatedText(f["req_url"], colorize_url(f["req_url"]), 'left'))
+ pile.append(urwid.Columns(req, dividechars=1))
- preamble = sum(i[1] for i in req) + len(req) - 1
+ req = []
+ req.append(fcol(' ', 'text'))
if f["intercepted"] and not f["acked"]:
uc = "intercept"
- elif "resp_code" in f or "err_msg" in f:
- uc = "text"
+ elif "resp_code" in f or f["err_msg"] is not None:
+ uc = "highlight"
uc = "title"
- url = f["req_url"]
- if f["max_url_len"] and len(url) > f["max_url_len"]:
- url = url[:f["max_url_len"]] + "…"
+ if f["extended"]:
+ s = human.format_timestamp(f["req_timestamp"])
+ else:
+ s = datetime.datetime.fromtimestamp(time.mktime(time.localtime(f["req_timestamp"]))).strftime("%H:%M:%S")
+ req.append(fcol(s, uc))
+ methods = {
+ 'GET': 'method_get',
+ 'POST': 'method_post',
+ }
+ uc = methods.get(f["req_method"], "method_other")
+ if f['extended']:
+ req.append(fcol(f["req_method"], uc))
+ if f["req_promise"]:
+ req.append(fcol('PUSH_PROMISE', 'method_http2_push'))
+ else:
+ if f["req_promise"]:
+ uc = 'method_http2_push'
+ req.append(("fixed", 4, truncated_plain(f["req_method"], uc)))
- if f["req_http_version"] not in ("HTTP/1.0", "HTTP/1.1"):
- url += " " + f["req_http_version"]
- req.append(
- urwid.Text([(uc, url)])
- )
+ if f["two_line"]:
+ req.append(fcol(f["req_http_version"], 'text'))
+ else:
+ schemes = {
+ 'http': 'scheme_http',
+ 'https': 'scheme_https',
+ }
+ req.append(fcol(fixlen(f["req_scheme"].upper(), 5), schemes.get(f["req_scheme"], "scheme_other")))
- pile.append(urwid.Columns(req, dividechars=1))
+ req.append(('weight', 0.25, TruncatedText(f["req_host"], colorize_host(f["req_host"]), 'right')))
+ req.append(('weight', 1.0, TruncatedText(f["req_path"], colorize_req(f["req_path"]), 'left')))
- resp = []
- resp.append(
- ("fixed", preamble, urwid.Text(""))
- )
+ ret = (' ' * len(SYMBOL_RETURN), 'text')
+ status = ('', 'text')
+ content = ('', 'text')
+ size = ('', 'text')
+ duration = ('', 'text')
if "resp_code" in f:
codes = {
@@ -163,79 +346,112 @@ def raw_format_flow(f):
5: "code_500",
ccol = codes.get(f["resp_code"] // 100, "code_other")
- resp.append(fcol(SYMBOL_RETURN, ccol))
- if f["resp_is_replay"]:
- resp.append(fcol(SYMBOL_REPLAY, "replay"))
- resp.append(fcol(f["resp_code"], ccol))
- if f["extended"]:
- resp.append(fcol(f["resp_reason"], ccol))
- if f["intercepted"] and f["resp_code"] and not f["acked"]:
- rc = "intercept"
+ ret = (SYMBOL_RETURN, ccol)
+ status = (str(f["resp_code"]), ccol)
+ if f["resp_len"] < 0:
+ if f["intercepted"] and f["resp_code"] and not f["acked"]:
+ rc = "intercept"
+ else:
+ rc = "content_none"
+ if f["resp_len"] == -1:
+ contentdesc = "[content missing]"
+ else:
+ contentdesc = "[no content]"
+ content = (contentdesc, rc)
- rc = "text"
- if f["resp_ctype"]:
- resp.append(fcol(f["resp_ctype"], rc))
- resp.append(fcol(f["resp_clen"], rc))
- resp.append(fcol(f["roundtrip"], rc))
+ if f["resp_ctype"]:
+ ctype = f["resp_ctype"].split(";")[0]
+ if ctype.endswith('/javascript'):
+ rc = 'content_script'
+ elif ctype.startswith('text/'):
+ rc = 'content_text'
+ elif (ctype.startswith('image/') or
+ ctype.startswith('video/') or
+ ctype.startswith('font/') or
+ "/x-font-" in ctype):
+ rc = 'content_media'
+ elif ctype.endswith('/json') or ctype.endswith('/xml'):
+ rc = 'content_data'
+ elif ctype.startswith('application/'):
+ rc = 'content_raw'
+ else:
+ rc = 'content_other'
+ content = (ctype, rc)
+ rc = 'gradient_%02d' % int(99 - 100 * min(math.log2(1 + f["resp_len"]) / 20, 0.99))
+ size_str = human.pretty_size(f["resp_len"])
+ if not f['extended']:
+ # shorten to 5 chars max
+ if len(size_str) > 5:
+ size_str = size_str[0:4].rstrip('.') + size_str[-1:]
+ size = (size_str, rc)
+ if f['duration'] is not None:
+ rc = 'gradient_%02d' % int(99 - 100 * min(math.log2(1 + 1000 * f['duration']) / 12, 0.99))
+ duration = (human.pretty_duration(f['duration']), rc)
elif f["err_msg"]:
- resp.append(fcol(SYMBOL_RETURN, "error"))
- resp.append(
- urwid.Text([
- (
- "error",
- f["err_msg"]
- )
- ])
- )
- pile.append(urwid.Columns(resp, dividechars=1))
+ status = ('Err', 'error')
+ content = f["err_msg"], 'error'
+ if f["two_line"]:
+ req.append(fcol(*ret))
+ req.append(fcol(fixlen(status[0], 3), status[1]))
+ req.append(('weight', 0.15, truncated_plain(content[0], content[1], 'right')))
+ if f['extended']:
+ req.append(fcol(*size))
+ else:
+ req.append(fcol(fixlen_r(size[0], 5), size[1]))
+ req.append(fcol(fixlen_r(duration[0], 5), duration[1]))
+ pile.append(urwid.Columns(req, dividechars=1, min_width=15))
return urwid.Pile(pile)
-def format_flow(f, focus, extended=False, hostheader=False, max_url_len=False):
+def format_flow(f, focus, extended=False, hostheader=False, cols=False):
acked = False
if f.reply and f.reply.state == "committed":
acked = True
- pushed = ' PUSH_PROMISE' if 'h2-pushed-stream' in f.metadata else ''
d = dict(
- max_url_len=max_url_len,
+ two_line=extended or cols < 100,
- req_method=f.request.method + pushed,
+ req_method=f.request.method,
+ req_promise='h2-pushed-stream' in f.metadata,
req_url=f.request.pretty_url if hostheader else f.request.url,
+ req_scheme=f.request.scheme,
+ req_host=f.request.pretty_host if hostheader else f.request.host,
+ req_path=f.request.path,
err_msg=f.error.msg if f.error else None,
if f.response:
if f.response.raw_content:
- contentdesc = human.pretty_size(len(f.response.raw_content))
+ content_len = len(f.response.raw_content)
elif f.response.raw_content is None:
- contentdesc = "[content missing]"
+ content_len = -1
- contentdesc = "[no content]"
- duration = 0
+ content_len = -2
+ duration = None
if f.response.timestamp_end and f.request.timestamp_start:
duration = f.response.timestamp_end - f.request.timestamp_start
- roundtrip = human.pretty_duration(duration)
- resp_clen=contentdesc,
- roundtrip=roundtrip,
+ resp_len=content_len,
+ resp_ctype=f.response.headers.get("content-type"),
+ duration=duration,
- t = f.response.headers.get("content-type")
- if t:
- d["resp_ctype"] = t.split(";")[0]
- else:
- d["resp_ctype"] = ""
return raw_format_flow(tuple(sorted(d.items())))
diff --git a/mitmproxy/tools/console/flowlist.py b/mitmproxy/tools/console/flowlist.py
index e947a582..63e67327 100644
--- a/mitmproxy/tools/console/flowlist.py
+++ b/mitmproxy/tools/console/flowlist.py
@@ -18,7 +18,7 @@ class FlowItem(urwid.WidgetWrap):
self.flow is self.master.view.focus.flow,
- max_url_len=cols,
+ cols=cols,
def selectable(self):
diff --git a/mitmproxy/tools/console/flowview.py b/mitmproxy/tools/console/flowview.py
index b4e3876f..5466319a 100644
--- a/mitmproxy/tools/console/flowview.py
+++ b/mitmproxy/tools/console/flowview.py
@@ -38,7 +38,7 @@ class FlowViewHeader(urwid.WidgetWrap):
- max_url_len=cols,
+ cols=cols,
self._w = urwid.Pile([])
diff --git a/mitmproxy/tools/console/palettes.py b/mitmproxy/tools/console/palettes.py
index 7930c4a3..405f1a6c 100644
--- a/mitmproxy/tools/console/palettes.py
+++ b/mitmproxy/tools/console/palettes.py
@@ -22,7 +22,12 @@ class Palette:
# List and Connections
- 'method', 'focus',
+ 'method',
+ 'method_get', 'method_post', 'method_other', 'method_http2_push',
+ 'scheme_http', 'scheme_https', 'scheme_other',
+ 'url_punctuation', 'url_domain', 'url_filename', 'url_extension', 'url_query_key', 'url_query_value',
+ 'content_none', 'content_text', 'content_script', 'content_media', 'content_data', 'content_raw', 'content_other',
+ 'focus',
'code_200', 'code_300', 'code_400', 'code_500', 'code_other',
'error', "warn", "alert",
'header', 'highlight', 'intercept', 'replay', 'mark',
@@ -36,6 +41,7 @@ class Palette:
# Commander
'commander_command', 'commander_invalid', 'commander_hint'
+ _fields.extend(['gradient_%02d' % i for i in range(100)])
high: typing.Mapping[str, typing.Sequence[str]] = None
def palette(self, transparent):
@@ -68,6 +74,27 @@ class Palette:
return l
+def gen_gradient(palette, cols):
+ for i in range(100):
+ palette['gradient_%02d' % i] = (cols[i * len(cols) // 100], 'default')
+def gen_rgb_gradient(palette, cols):
+ parts = len(cols) - 1
+ for i in range(100):
+ p = i / 100
+ idx = int(p * parts)
+ t0 = cols[idx]
+ t1 = cols[idx + 1]
+ pp = p * parts % 1
+ t = (
+ round(t0[0] + (t1[0] - t0[0]) * pp),
+ round(t0[1] + (t1[1] - t0[1]) * pp),
+ round(t0[2] + (t1[2] - t0[2]) * pp),
+ )
+ palette['gradient_%02d' % i] = ("#%x%x%x" % t, 'default')
class LowDark(Palette):
@@ -95,6 +122,30 @@ class LowDark(Palette):
# List and Connections
method = ('dark cyan', 'default'),
+ method_get = ('dark cyan', 'default'),
+ method_post = ('dark red', 'default'),
+ method_other = ('dark magenta', 'default'),
+ method_http2_push = ('dark gray', 'default'),
+ scheme_http = ('dark cyan', 'default'),
+ scheme_https = ('dark green', 'default'),
+ scheme_other = ('dark magenta', 'default'),
+ url_punctuation = ('dark gray', 'default'),
+ url_domain = ('white', 'default'),
+ url_filename = ('dark cyan', 'default'),
+ url_extension = ('light gray', 'default'),
+ url_query_key = ('white', 'default'),
+ url_query_value = ('light gray', 'default'),
+ content_none = ('dark gray', 'default'),
+ content_text = ('light gray', 'default'),
+ content_script = ('dark green', 'default'),
+ content_media = ('light blue', 'default'),
+ content_data = ('brown', 'default'),
+ content_raw = ('dark red', 'default'),
+ content_other = ('dark magenta', 'default'),
focus = ('yellow', 'default'),
code_200 = ('dark green', 'default'),
@@ -127,6 +178,7 @@ class LowDark(Palette):
commander_invalid = ('light red', 'default'),
commander_hint = ('dark gray', 'default'),
+ gen_gradient(low, ['light red', 'yellow', 'light green', 'dark green', 'dark cyan', 'dark blue'])
class Dark(LowDark):
@@ -312,8 +364,20 @@ class SolarizedDark(LowDark):
# List and Connections
method = (sol_cyan, 'default'),
+ method_http2_push = (sol_base01, 'default'),
focus = (sol_base1, 'default'),
+ url_punctuation = ('h242', 'default'),
+ url_domain = ('h252', 'default'),
+ url_filename = ('h132', 'default'),
+ url_extension = ('h96', 'default'),
+ url_query_key = ('h37', 'default'),
+ url_query_value = ('h30', 'default'),
+ content_none = (sol_base01, 'default'),
+ content_text = (sol_base1, 'default'),
+ content_media = (sol_blue, 'default'),
code_200 = (sol_green, 'default'),
code_300 = (sol_blue, 'default'),
code_400 = (sol_orange, 'default',),
@@ -342,6 +406,7 @@ class SolarizedDark(LowDark):
commander_invalid = (sol_orange, 'default'),
commander_hint = (sol_base00, 'default'),
+ gen_rgb_gradient(high, [(15, 0, 0), (15, 15, 0), (0, 15, 0), (0, 15, 15), (0, 0, 15)])
DEFAULT = "dark"