diff options
author | Shadab Zafar <dufferzafar0@gmail.com> | 2016-07-10 20:23:58 +0530 |
---|---|---|
committer | Shadab Zafar <dufferzafar0@gmail.com> | 2016-07-20 10:03:00 +0530 |
commit | 5377d5a61dfdad9790c7b64118622c40ad405e73 (patch) | |
tree | a6e4d96d83e9a3aa7dbc3a785055ef748d384dbf | |
parent | 4f04dd618c438b6884515a62996daf487e6903c0 (diff) | |
download | mitmproxy-5377d5a61dfdad9790c7b64118622c40ad405e73.tar.gz mitmproxy-5377d5a61dfdad9790c7b64118622c40ad405e73.tar.bz2 mitmproxy-5377d5a61dfdad9790c7b64118622c40ad405e73.zip |
Refactor
-rw-r--r-- | mitmproxy/console/common.py | 223 |
1 files changed, 112 insertions, 111 deletions
diff --git a/mitmproxy/console/common.py b/mitmproxy/console/common.py index d84c4620..03343914 100644 --- a/mitmproxy/console/common.py +++ b/mitmproxy/console/common.py @@ -134,88 +134,6 @@ else: SYMBOL_MARK = "[m]" -def raw_format_flow(f, focus, extended): - f = dict(f) - pile = [] - req = [] - if extended: - req.append( - fcol( - human.format_timestamp(f["req_timestamp"]), - "highlight" - ) - ) - else: - req.append(fcol(">>" if focus else " ", "focus")) - - if f["marked"]: - req.append(fcol(SYMBOL_MARK, "mark")) - - if f["req_is_replay"]: - req.append(fcol(SYMBOL_REPLAY, "replay")) - req.append(fcol(f["req_method"], "method")) - - preamble = sum(i[1] for i in req) + len(req) - 1 - - if f["intercepted"] and not f["acked"]: - uc = "intercept" - elif "resp_code" in f or "err_msg" in f: - uc = "text" - else: - uc = "title" - - url = f["req_url"] - if f["req_http_version"] not in ("HTTP/1.0", "HTTP/1.1"): - url += " " + f["req_http_version"] - req.append( - urwid.Text([(uc, url)]) - ) - - pile.append(urwid.Columns(req, dividechars=1)) - - resp = [] - resp.append( - ("fixed", preamble, urwid.Text("")) - ) - - if "resp_code" in f: - codes = { - 2: "code_200", - 3: "code_300", - 4: "code_400", - 5: "code_500", - } - ccol = codes.get(f["resp_code"] / 100, "code_other") - resp.append(fcol(SYMBOL_RETURN, ccol)) - if f["resp_is_replay"]: - resp.append(fcol(SYMBOL_REPLAY, "replay")) - resp.append(fcol(f["resp_code"], ccol)) - if extended: - resp.append(fcol(f["resp_reason"], ccol)) - if f["intercepted"] and f["resp_code"] and not f["acked"]: - rc = "intercept" - else: - rc = "text" - - if f["resp_ctype"]: - resp.append(fcol(f["resp_ctype"], rc)) - resp.append(fcol(f["resp_clen"], rc)) - resp.append(fcol(f["roundtrip"], rc)) - - elif f["err_msg"]: - resp.append(fcol(SYMBOL_RETURN, "error")) - resp.append( - urwid.Text([ - ( - "error", - f["err_msg"] - ) - ]) - ) - pile.append(urwid.Columns(resp, dividechars=1)) - return urwid.Pile(pile) - - # Save file to disk def save_data(path, data): if not path: @@ -256,7 +174,32 @@ def ask_save_path(data, prompt="File path"): ) -def copy_flow_format_data(part, scope, flow): +def copy_to_clipboard_or_prompt(data): + # pyperclip calls encode('utf-8') on data to be copied without checking. + # if data are already encoded that way UnicodeDecodeError is thrown. + toclip = "" + try: + toclip = data.decode('utf-8') + except (UnicodeDecodeError): + toclip = data + + try: + pyperclip.copy(toclip) + except (RuntimeError, UnicodeDecodeError, AttributeError, TypeError): + def save(k): + if k == "y": + ask_save_path(data, "Save data") + signals.status_prompt_onekey.send( + prompt = "Cannot copy data to clipboard. Save as file?", + keys = ( + ("yes", "y"), + ("no", "n"), + ), + callback = save + ) + + +def flow_format_data(part, scope, flow): if part == "u": data = flow.request.url else: @@ -289,37 +232,12 @@ def copy_flow_format_data(part, scope, flow): return data, False -def copy_to_clipboard_or_prompt(data): - # pyperclip calls encode('utf-8') on data to be copied without checking. - # if data are already encoded that way UnicodeDecodeError is thrown. - toclip = "" - try: - toclip = data.decode('utf-8') - except (UnicodeDecodeError): - toclip = data - - try: - pyperclip.copy(toclip) - except (RuntimeError, UnicodeDecodeError, AttributeError, TypeError): - def save(k): - if k == "y": - ask_save_path(data, "Save data") - signals.status_prompt_onekey.send( - prompt = "Cannot copy data to clipboard. Save as file?", - keys = ( - ("yes", "y"), - ("no", "n"), - ), - callback = save - ) - - def copy_flow(part, scope, flow, master, state): """ part: _c_ontent, _h_eaders+content, _u_rl scope: _a_ll, re_q_uest, re_s_ponse """ - data, err = copy_flow_format_data(part, scope, flow) + data, err = flow_format_data(part, scope, flow) if err: signals.status_message.send(message=err) @@ -355,8 +273,9 @@ def ask_copy_part(scope, flow, master, state): def ask_save_body(part, master, state, flow): """ - Save either the request or the response body to disk. part can either be - "q" (request), "s" (response) or None (ask user if necessary). + Save either the request or the response body to disk. + + 'part' can either be "q" (request), "s" (response) or None (ask user if necessary). """ request_has_content = flow.request and flow.request.raw_content @@ -408,6 +327,88 @@ def export_to_clip_or_file(key, flow, writer): flowcache = utils.LRUCache(800) +def raw_format_flow(f, focus, extended): + f = dict(f) + pile = [] + req = [] + if extended: + req.append( + fcol( + human.format_timestamp(f["req_timestamp"]), + "highlight" + ) + ) + else: + req.append(fcol(">>" if focus else " ", "focus")) + + if f["marked"]: + req.append(fcol(SYMBOL_MARK, "mark")) + + if f["req_is_replay"]: + req.append(fcol(SYMBOL_REPLAY, "replay")) + req.append(fcol(f["req_method"], "method")) + + preamble = sum(i[1] for i in req) + len(req) - 1 + + if f["intercepted"] and not f["acked"]: + uc = "intercept" + elif "resp_code" in f or "err_msg" in f: + uc = "text" + else: + uc = "title" + + url = f["req_url"] + if f["req_http_version"] not in ("HTTP/1.0", "HTTP/1.1"): + url += " " + f["req_http_version"] + req.append( + urwid.Text([(uc, url)]) + ) + + pile.append(urwid.Columns(req, dividechars=1)) + + resp = [] + resp.append( + ("fixed", preamble, urwid.Text("")) + ) + + if "resp_code" in f: + codes = { + 2: "code_200", + 3: "code_300", + 4: "code_400", + 5: "code_500", + } + ccol = codes.get(f["resp_code"] / 100, "code_other") + resp.append(fcol(SYMBOL_RETURN, ccol)) + if f["resp_is_replay"]: + resp.append(fcol(SYMBOL_REPLAY, "replay")) + resp.append(fcol(f["resp_code"], ccol)) + if extended: + resp.append(fcol(f["resp_reason"], ccol)) + if f["intercepted"] and f["resp_code"] and not f["acked"]: + rc = "intercept" + else: + rc = "text" + + if f["resp_ctype"]: + resp.append(fcol(f["resp_ctype"], rc)) + resp.append(fcol(f["resp_clen"], rc)) + resp.append(fcol(f["roundtrip"], rc)) + + elif f["err_msg"]: + resp.append(fcol(SYMBOL_RETURN, "error")) + resp.append( + urwid.Text([ + ( + "error", + f["err_msg"] + ) + ]) + ) + pile.append(urwid.Columns(resp, dividechars=1)) + return urwid.Pile(pile) + + def format_flow(f, focus, extended=False, hostheader=False, marked=False): d = dict( intercepted = f.intercepted, |