import enum import platform import typing import math from functools import lru_cache from publicsuffix2 import get_sld, get_tld import urwid import urwid.util from mitmproxy import flow from mitmproxy.http import HTTPFlow from mitmproxy.utils import human from mitmproxy.tcp import TCPFlow # Detect Windows Subsystem for Linux IS_WSL = "Microsoft" in platform.platform() def is_keypress(k): """ Is this input event a keypress? """ if isinstance(k, str): return True def highlight_key(str, key, textattr="text", keyattr="key"): l = [] parts = str.split(key, 1) if parts[0]: l.append((textattr, parts[0])) l.append((keyattr, key)) if parts[1]: l.append((textattr, parts[1])) return l KEY_MAX = 30 def format_keyvals( entries: typing.Iterable[typing.Tuple[str, typing.Union[None, str, urwid.Widget]]], key_format: str = "key", value_format: str = "text", indent: int = 0 ) -> typing.List[urwid.Columns]: """ Format a list of (key, value) tuples. Args: entries: The list to format. keys must be strings, values can also be None or urwid widgets. The latter makes it possible to use the result of format_keyvals() as a value. key_format: The display attribute for the key. value_format: The display attribute for the value. indent: Additional indent to apply. """ max_key_len = max((len(k) for k, v in entries if k is not None), default=0) max_key_len = min(max_key_len, KEY_MAX) if indent > 2: indent -= 2 # We use dividechars=2 below, which already adds two empty spaces ret = [] for k, v in entries: if v is None: v = urwid.Text("") elif not isinstance(v, urwid.Widget): v = urwid.Text([(value_format, v)]) ret.append( urwid.Columns( [ ("fixed", indent, urwid.Text("")), ( "fixed", max_key_len, urwid.Text([(key_format, k)]) ), v ], dividechars=2 ) ) return ret def fcol(s: str, attr: str) -> typing.Tuple[str, int, urwid.Text]: s = str(s) return ( "fixed", len(s), urwid.Text( [ (attr, s) ] ) ) if urwid.util.detected_encoding: SYMBOL_REPLAY = u"\u21ba" SYMBOL_RETURN = u"\u2190" SYMBOL_MARK = u"\u25cf" SYMBOL_UP = u"\u21E7" SYMBOL_DOWN = u"\u21E9" SYMBOL_ELLIPSIS = u"\u2026" else: SYMBOL_REPLAY = u"[r]" SYMBOL_RETURN = u"<-" SYMBOL_MARK = "#" SYMBOL_UP = "^" SYMBOL_DOWN = " " SYMBOL_ELLIPSIS = "~" SCHEME_STYLES = { 'http': 'scheme_http', 'https': 'scheme_https', 'tcp': 'scheme_tcp', } HTTP_REQUEST_METHOD_STYLES = { 'GET': 'method_get', 'POST': 'method_post', 'DELETE': 'method_delete', 'HEAD': 'method_head', 'PUT': 'method_put' } HTTP_RESPONSE_CODE_STYLE = { 2: "code_200", 3: "code_300", 4: "code_400", 5: "code_500", } class RenderMode(enum.Enum): TABLE = 1 """The flow list in table format, i.e. one row per flow.""" LIST = 2 """The flow list in list format, i.e. potentially multiple rows per flow.""" DETAILVIEW = 3 """The top lines in the detail view.""" def fixlen(s: str, maxlen: int) -> str: if len(s) <= maxlen: return s.ljust(maxlen) else: return s[0:maxlen - len(SYMBOL_ELLIPSIS)] + SYMBOL_ELLIPSIS def fixlen_r(s: str, maxlen: int) -> str: if len(s) <= maxlen: return s.rjust(maxlen) else: return SYMBOL_ELLIPSIS + s[len(s) - maxlen + len(SYMBOL_ELLIPSIS):] class TruncatedText(urwid.Widget): def __init__(self, text, attr, align='left'): self.text = text self.attr = attr self.align = align super(TruncatedText, self).__init__() def pack(self, size, focus=False): return (len(self.text), 1) def rows(self, size, focus=False): return 1 def render(self, size, focus=False): text = self.text attr = self.attr if self.align == 'right': text = text[::-1] attr = attr[::-1] text_len = len(text) # TODO: unicode? if size is not None and len(size) > 0: width = size[0] else: width = text_len if width >= text_len: remaining = width - text_len if remaining > 0: c_text = text + ' ' * remaining c_attr = attr + [('text', remaining)] else: c_text = text c_attr = attr else: visible_len = width - len(SYMBOL_ELLIPSIS) visible_text = text[0:visible_len] c_text = visible_text + SYMBOL_ELLIPSIS c_attr = (urwid.util.rle_subseg(attr, 0, len(visible_text.encode())) + [('focus', len(SYMBOL_ELLIPSIS.encode()))]) if self.align == 'right': c_text = c_text[::-1] c_attr = c_attr[::-1] return urwid.TextCanvas([c_text.encode()], [c_attr], maxcol=width) def truncated_plain(text, attr, align='left'): return TruncatedText(text, [(attr, len(text.encode()))], align) # Work around https://github.com/urwid/urwid/pull/330 def rle_append_beginning_modify(rle, a_r): """ Append (a, r) (unpacked from *a_r*) to BEGINNING of rle. Merge with first run when possible MODIFIES rle parameter contents. Returns None. """ a, r = a_r if not rle: rle[:] = [(a, r)] else: al, run = rle[0] if a == al: rle[0] = (a, run + r) else: rle[0:0] = [(a, r)] def colorize_host(host): tld = get_tld(host) sld = get_sld(host) attr = [] tld_size = len(tld) sld_size = len(sld) - tld_size for letter in reversed(range(len(host))): character = host[letter] if tld_size > 0: style = 'url_domain' tld_size -= 1 elif tld_size == 0: style = 'text' tld_size -= 1 elif sld_size > 0: sld_size -= 1 style = 'url_extension' else: style = 'text' rle_append_beginning_modify(attr, (style, len(character.encode()))) return attr def colorize_req(s): path = s.split('?', 2)[0] i_query = len(path) i_last_slash = path.rfind('/') i_ext = path[i_last_slash + 1:].rfind('.') i_ext = i_last_slash + i_ext if i_ext >= 0 else len(s) in_val = False attr = [] for i in range(len(s)): c = s[i] if ((i < i_query and c == '/') or (i < i_query and i > i_last_slash and c == '.') or (i == i_query)): a = 'url_punctuation' elif i > i_query: if in_val: if c == '&': in_val = False a = 'url_punctuation' else: a = 'url_query_value' else: if c == '=': in_val = True a = 'url_punctuation' else: a = 'url_query_key' elif i > i_ext: a = 'url_extension' elif i > i_last_slash: a = 'url_filename' else: a = 'text' urwid.util.rle_append_modify(attr, (a, len(c.encode()))) return attr def colorize_url(url): parts = url.split('/', 3) if len(parts) < 4 or len(parts[1]) > 0 or parts[0][-1:] != ':': return [('error', len(url))] # bad URL schemes = { 'http:': 'scheme_http', 'https:': 'scheme_https', } return [ (schemes.get(parts[0], "scheme_other"), len(parts[0]) - 1), ('url_punctuation', 3), # :// ] + colorize_host(parts[2]) + colorize_req('/' + parts[3]) def format_http_content_type(content_type: str) -> typing.Tuple[str, str]: content_type = content_type.split(";")[0] if content_type.endswith('/javascript'): style = 'content_script' elif content_type.startswith('text/'): style = 'content_text' elif (content_type.startswith('image/') or content_type.startswith('video/') or content_type.startswith('font/') or "/x-font-" in content_type): style = 'content_media' elif content_type.endswith('/json') or content_type.endswith('/xml'): style = 'content_data' elif content_type.startswith('application/'): style = 'content_raw' else: style = 'content_other' return content_type, style def format_duration(duration: float) -> typing.Tuple[str, str]: pretty_duration = human.pretty_duration(duration) style = 'gradient_%02d' % int(99 - 100 * min(math.log2(1 + 1000 * duration) / 12, 0.99)) return pretty_duration, style def format_size(num_bytes: int) -> typing.Tuple[str, str]: pretty_size = human.pretty_size(num_bytes) style = 'gradient_%02d' % int(99 - 100 * min(math.log2(1 + num_bytes) / 20, 0.99)) return pretty_size, style def format_left_indicators( *, focused: bool, intercepted: bool, timestamp: float ): indicators: typing.List[typing.Union[str, typing.Tuple[str, str]]] = [] if focused: indicators.append(("focus", ">>")) else: indicators.append(" ") pretty_timestamp = human.format_timestamp(timestamp)[-8:] if intercepted: indicators.append(("intercept", pretty_timestamp)) else: indicators.append(("text", pretty_timestamp)) return "fixed", 10, urwid.Text(indicators) def format_right_indicators( *, replay: bool, marked: bool ): indicators: typing.List[typing.Union[str, typing.Tuple[str, str]]] = [] if replay: indicators.append(("replay", SYMBOL_REPLAY)) else: indicators.append(" ") if marked: indicators.append(("mark", SYMBOL_MARK)) else: indicators.append(" ") return "fixed", 2, urwid.Text(indicators) @lru_cache(maxsize=800) def format_http_flow_list( *, render_mode: RenderMode, focused: bool, marked: bool, request_method: str, request_scheme: str, request_host: str, request_path: str, request_url: str, request_http_version: str, request_timestamp: float, request_is_push_promise: bool, request_is_replay: bool, intercepted: bool, response_code: typing.Optional[int], response_reason: typing.Optional[str], response_content_length: typing.Optional[int], response_content_type: typing.Optional[str], response_is_replay: bool, duration: typing.Optional[float], error_message: typing.Optional[str], ) -> urwid.Widget: req = [] if render_mode is RenderMode.DETAILVIEW: req.append(fcol(human.format_timestamp(request_timestamp), "highlight")) else: if focused: req.append(fcol(">>", "focus")) else: req.append(fcol(" ", "focus")) method_style = HTTP_REQUEST_METHOD_STYLES.get(request_method, "method_other") req.append(fcol(request_method, method_style)) if request_is_push_promise: req.append(fcol('PUSH_PROMISE', 'method_http2_push')) preamble_len = sum(x[1] for x in req) + len(req) - 1 if request_http_version not in ("HTTP/1.0", "HTTP/1.1"): request_url += " " + request_http_version if intercepted and not response_code: url_style = "intercept" elif response_code or error_message: url_style = "text" else: url_style = "title" if render_mode is RenderMode.DETAILVIEW: req.append( urwid.Text([(url_style, request_url)]) ) else: req.append(truncated_plain(request_url, url_style)) req.append(format_right_indicators(replay=request_is_replay or response_is_replay, marked=marked)) resp = [ ("fixed", preamble_len, urwid.Text("")) ] if response_code: if intercepted: style = "intercept" else: style = "" status_style = style or HTTP_RESPONSE_CODE_STYLE.get(response_code // 100, "code_other") resp.append(fcol(SYMBOL_RETURN, status_style)) if response_is_replay: resp.append(fcol(SYMBOL_REPLAY, "replay")) resp.append(fcol(str(response_code), status_style)) if response_reason and render_mode is RenderMode.DETAILVIEW: resp.append(fcol(response_reason, status_style)) if response_content_type: ct, ct_style = format_http_content_type(response_content_type) resp.append(fcol(ct, style or ct_style)) if response_content_length: size, size_style = format_size(response_content_length) elif response_content_length == 0: size = "[no content]" size_style = "text" else: size = "[content missing]" size_style = "text" resp.append(fcol(size, style or size_style)) if duration: dur, dur_style = format_duration(duration) resp.append(fcol(dur, style or dur_style)) elif error_message: resp.append(fcol(SYMBOL_RETURN, "error")) resp.append(urwid.Text([("error", error_message)])) return urwid.Pile([ urwid.Columns(req, dividechars=1), urwid.Columns(resp, dividechars=1) ]) @lru_cache(maxsize=800) def format_http_flow_table( *, render_mode: RenderMode, focused: bool, marked: bool, request_method: str, request_scheme: str, request_host: str, request_path: str, request_url: str, request_http_version: str, request_timestamp: float, request_is_push_promise: bool, request_is_replay: bool, intercepted: bool, response_code: typing.Optional[int], response_reason: typing.Optional[str], response_content_length: typing.Optional[int], response_content_type: typing.Optional[str], response_is_replay: bool, duration: typing.Optional[float], error_message: typing.Optional[str], ) -> urwid.Widget: items = [ format_left_indicators( focused=focused, intercepted=intercepted, timestamp=request_timestamp ) ] if intercepted and not response_code: request_style = "intercept" else: request_style = "" scheme_style = request_style or SCHEME_STYLES.get(request_scheme, "scheme_other") items.append(fcol(fixlen(request_scheme.upper(), 5), scheme_style)) if request_is_push_promise: method_style = 'method_http2_push' else: method_style = request_style or HTTP_REQUEST_METHOD_STYLES.get(request_method, "method_other") items.append(fcol(fixlen(request_method, 4), method_style)) items.append(('weight', 0.25, TruncatedText(request_host, colorize_host(request_host), 'right'))) items.append(('weight', 1.0, TruncatedText(request_path, colorize_req(request_path), 'left'))) if intercepted and response_code: response_style = "intercept" else: response_style = "" if response_code: status = str(response_code) status_style = response_style or HTTP_RESPONSE_CODE_STYLE.get(response_code // 100, "code_other") if response_content_length and response_content_type: content, content_style = format_http_content_type(response_content_type) content_style = response_style or content_style elif response_content_length: content = '' content_style = 'content_none' elif response_content_length == 0: content = "[no content]" content_style = 'content_none' else: content = "[content missing]" content_style = 'content_none' elif error_message: status = 'err' status_style = 'error' content = error_message content_style = 'error' else: status = '' status_style = 'text' content = '' content_style = '' items.append(fcol(fixlen(status, 3), status_style)) items.append(('weight', 0.15, truncated_plain(content, content_style, 'right'))) if response_content_length: size, size_style = format_size(response_content_length) items.append(fcol(fixlen_r(size, 5), response_style or size_style)) else: items.append(("fixed", 5, urwid.Text(""))) if duration: duration_pretty, duration_style = format_duration(duration) items.append(fcol(fixlen_r(duration_pretty, 5), response_style or duration_style)) else: items.append(("fixed", 5, urwid.Text(""))) items.append(format_right_indicators( replay=request_is_replay or response_is_replay, marked=marked )) return urwid.Columns(items, dividechars=1, min_width=15) @lru_cache(maxsize=800) def format_tcp_flow( *, render_mode: RenderMode, focused: bool, timestamp_start: float, marked: bool, client_address, server_address, total_size: int, duration: typing.Optional[float], error_message: typing.Optional[str], ): conn = f"{human.format_address(client_address)} <-> {human.format_address(server_address)}" items = [] if render_mode in (RenderMode.TABLE, RenderMode.DETAILVIEW): items.append( format_left_indicators(focused=focused, intercepted=False, timestamp=timestamp_start) ) else: if focused: items.append(fcol(">>", "focus")) else: items.append(fcol(" ", "focus")) if render_mode is RenderMode.TABLE: items.append(fcol("TCP ", SCHEME_STYLES["tcp"])) else: items.append(fcol("TCP", SCHEME_STYLES["tcp"])) items.append(('weight', 1.0, truncated_plain(conn, "text", 'left'))) if error_message: items.append(('weight', 1.0, truncated_plain(error_message, "error", 'left'))) if total_size: size, size_style = format_size(total_size) items.append(fcol(fixlen_r(size, 5), size_style)) else: items.append(("fixed", 5, urwid.Text(""))) if duration: duration_pretty, duration_style = format_duration(duration) items.append(fcol(fixlen_r(duration_pretty, 5), duration_style)) else: items.append(("fixed", 5, urwid.Text(""))) items.append(format_right_indicators(replay=False, marked=marked)) return urwid.Pile([ urwid.Columns(items, dividechars=1, min_width=15) ]) def format_flow( f: flow.Flow, *, render_mode: RenderMode, hostheader: bool = False, # pass options directly if we need more stuff from them focused: bool = True, ) -> urwid.Widget: """ This functions calls the proper renderer depending on the flow type. We also want to cache the renderer output, so we extract all attributes relevant for display and call the render with only that. This assures that rows are updated if the flow is changed. """ duration: typing.Optional[float] error_message: typing.Optional[str] if f.error: error_message = f.error.msg else: error_message = None if isinstance(f, TCPFlow): total_size = 0 for message in f.messages: total_size += len(message.content) if f.messages: duration = f.messages[-1].timestamp - f.timestamp_start else: duration = None return format_tcp_flow( render_mode=render_mode, focused=focused, timestamp_start=f.timestamp_start, marked=f.marked, client_address=f.client_conn.address, server_address=f.server_conn.address, total_size=total_size, duration=duration, error_message=error_message, ) elif isinstance(f, HTTPFlow): intercepted = ( f.intercepted and not (f.reply and f.reply.state == "committed") ) response_content_length: typing.Optional[int] if f.response: if f.response.raw_content is not None: response_content_length = len(f.response.raw_content) else: response_content_length = None response_code = f.response.status_code response_reason = f.response.reason response_content_type = f.response.headers.get("content-type") response_is_replay = f.response.is_replay if f.response.timestamp_end: duration = max([f.response.timestamp_end - f.request.timestamp_start, 0]) else: duration = None else: response_content_length = None response_code = None response_reason = None response_content_type = None response_is_replay = False duration = None if render_mode in (RenderMode.LIST, RenderMode.DETAILVIEW): render_func = format_http_flow_list else: render_func = format_http_flow_table return render_func( render_mode=render_mode, focused=focused, marked=f.marked, request_method=f.request.method, request_scheme=f.request.scheme, request_host=f.request.pretty_host if hostheader else f.request.host, request_path=f.request.path, request_url=f.request.pretty_url if hostheader else f.request.url, request_http_version=f.request.http_version, request_timestamp=f.request.timestamp_start, request_is_push_promise='h2-pushed-stream' in f.metadata, request_is_replay=f.request.is_replay, intercepted=intercepted, response_code=response_code, response_reason=response_reason, response_content_length=response_content_length, response_content_type=response_content_type, response_is_replay=response_is_replay, duration=duration, error_message=error_message, ) else: raise NotImplementedError()