From 8cd5e2d25b6db818518c195af9fcaa62bc46a63a Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Sun, 12 Apr 2020 01:22:44 +0200 Subject: lint, mypy, tests++ --- mitmproxy/addons/view.py | 2 +- mitmproxy/tools/console/common.py | 37 ++++++++++++++++++------------- mitmproxy/tools/console/flowdetailview.py | 3 +++ 3 files changed, 25 insertions(+), 17 deletions(-) (limited to 'mitmproxy') diff --git a/mitmproxy/addons/view.py b/mitmproxy/addons/view.py index 4b7e076b..4d0a7ef9 100644 --- a/mitmproxy/addons/view.py +++ b/mitmproxy/addons/view.py @@ -575,7 +575,7 @@ class View(collections.abc.Sequence): def kill(self, f): self.update([f]) - def tcp_start(self,f): + def tcp_start(self, f): self.add([f]) def tcp_message(self, f): diff --git a/mitmproxy/tools/console/common.py b/mitmproxy/tools/console/common.py index ccfda186..cba3a355 100644 --- a/mitmproxy/tools/console/common.py +++ b/mitmproxy/tools/console/common.py @@ -1,8 +1,6 @@ import enum import platform import typing -import datetime -import time import math from functools import lru_cache from publicsuffix2 import get_sld, get_tld @@ -343,7 +341,7 @@ def format_left_indicators( intercepted: bool, timestamp: float ): - indicators = [] + indicators: typing.List[typing.Union[str, typing.Tuple[str, str]]] = [] if focused: indicators.append(("focus", ">>")) else: @@ -361,7 +359,7 @@ def format_right_indicators( replay: bool, marked: bool ): - indicators = [] + indicators: typing.List[typing.Union[str, typing.Tuple[str, str]]] = [] if replay: indicators.append(("replay", SYMBOL_REPLAY)) else: @@ -440,14 +438,14 @@ def format_http_flow_list( if intercepted: style = "intercept" else: - style = None + style = "" status_style = style or HTTP_RESPONSE_CODE_STYLE.get(response_code // 100, "code_other") resp.append(fcol(SYMBOL_RETURN, status_style)) if response_is_replay: resp.append(fcol(SYMBOL_REPLAY, "replay")) resp.append(fcol(str(response_code), status_style)) - if render_mode is RenderMode.DETAILVIEW: + if response_reason and render_mode is RenderMode.DETAILVIEW: resp.append(fcol(response_reason, status_style)) if response_content_type: @@ -512,7 +510,7 @@ def format_http_flow_table( if intercepted and not response_code: request_style = "intercept" else: - request_style = None + request_style = "" scheme_style = request_style or SCHEME_STYLES.get(request_scheme, "scheme_other") items.append(fcol(fixlen(request_scheme.upper(), 5), scheme_style)) @@ -529,7 +527,7 @@ def format_http_flow_table( if intercepted and response_code: response_style = "intercept" else: - response_style = None + response_style = "" if response_code: @@ -584,16 +582,17 @@ def format_http_flow_table( @lru_cache(maxsize=800) -def raw_format_tcp_flow( +def format_tcp_flow( *, render_mode: RenderMode, - focused: typing.Optional[bool], + focused: bool, timestamp_start: float, marked: bool, client_address, server_address, total_size: int, duration: typing.Optional[float], + error_message: typing.Optional[str], ): conn = f"{human.format_address(client_address)} <-> {human.format_address(server_address)}" @@ -615,6 +614,8 @@ def raw_format_tcp_flow( items.append(fcol("TCP", SCHEME_STYLES["tcp"])) items.append(('weight', 1.0, truncated_plain(conn, "text", 'left'))) + if error_message: + items.append(('weight', 1.0, truncated_plain(error_message, "error", 'left'))) if total_size: size, size_style = format_size(total_size) @@ -648,6 +649,12 @@ def format_flow( relevant for display and call the render with only that. This assures that rows are updated if the flow is changed. """ + duration: typing.Optional[float] + error_message: typing.Optional[str] + if f.error: + error_message = f.error.msg + else: + error_message = None if isinstance(f, TCPFlow): total_size = 0 @@ -657,7 +664,7 @@ def format_flow( duration = f.messages[-1].timestamp - f.timestamp_start else: duration = None - return raw_format_tcp_flow( + return format_tcp_flow( render_mode=render_mode, focused=focused, timestamp_start=f.timestamp_start, @@ -666,11 +673,13 @@ def format_flow( server_address=f.server_conn.address, total_size=total_size, duration=duration, + error_message=error_message, ) elif isinstance(f, HTTPFlow): intercepted = ( f.intercepted and not (f.reply and f.reply.state == "committed") ) + response_content_length: typing.Optional[int] if f.response: if f.response.raw_content is not None: response_content_length = len(f.response.raw_content) @@ -691,10 +700,6 @@ def format_flow( response_content_type = None response_is_replay = False duration = None - if f.error: - error_message = f.error.msg - else: - error_message = None if render_mode in (RenderMode.LIST, RenderMode.DETAILVIEW): render_func = format_http_flow_list @@ -720,7 +725,7 @@ def format_flow( response_content_type=response_content_type, response_is_replay=response_is_replay, duration=duration, - error_message=error_message + error_message=error_message, ) else: diff --git a/mitmproxy/tools/console/flowdetailview.py b/mitmproxy/tools/console/flowdetailview.py index ec716936..fb2494e8 100644 --- a/mitmproxy/tools/console/flowdetailview.py +++ b/mitmproxy/tools/console/flowdetailview.py @@ -1,3 +1,4 @@ +import typing import urwid import mitmproxy.flow @@ -19,6 +20,8 @@ def flowdetails(state, flow: mitmproxy.flow.Flow): sc = flow.server_conn cc = flow.client_conn + req: typing.Optional[http.HTTPRequest] + resp: typing.Optional[http.HTTPResponse] if isinstance(flow, http.HTTPFlow): req = flow.request resp = flow.response -- cgit v1.2.3