aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorShadab Zafar <dufferzafar0@gmail.com>2016-07-10 20:23:58 +0530
committerShadab Zafar <dufferzafar0@gmail.com>2016-07-20 10:03:00 +0530
commit5377d5a61dfdad9790c7b64118622c40ad405e73 (patch)
treea6e4d96d83e9a3aa7dbc3a785055ef748d384dbf
parent4f04dd618c438b6884515a62996daf487e6903c0 (diff)
downloadmitmproxy-5377d5a61dfdad9790c7b64118622c40ad405e73.tar.gz
mitmproxy-5377d5a61dfdad9790c7b64118622c40ad405e73.tar.bz2
mitmproxy-5377d5a61dfdad9790c7b64118622c40ad405e73.zip
Refactor
-rw-r--r--mitmproxy/console/common.py223
1 files changed, 112 insertions, 111 deletions
diff --git a/mitmproxy/console/common.py b/mitmproxy/console/common.py
index d84c4620..03343914 100644
--- a/mitmproxy/console/common.py
+++ b/mitmproxy/console/common.py
@@ -134,88 +134,6 @@ else:
SYMBOL_MARK = "[m]"
-def raw_format_flow(f, focus, extended):
- f = dict(f)
- pile = []
- req = []
- if extended:
- req.append(
- fcol(
- human.format_timestamp(f["req_timestamp"]),
- "highlight"
- )
- )
- else:
- req.append(fcol(">>" if focus else " ", "focus"))
-
- if f["marked"]:
- req.append(fcol(SYMBOL_MARK, "mark"))
-
- if f["req_is_replay"]:
- req.append(fcol(SYMBOL_REPLAY, "replay"))
- req.append(fcol(f["req_method"], "method"))
-
- preamble = sum(i[1] for i in req) + len(req) - 1
-
- if f["intercepted"] and not f["acked"]:
- uc = "intercept"
- elif "resp_code" in f or "err_msg" in f:
- uc = "text"
- else:
- uc = "title"
-
- url = f["req_url"]
- if f["req_http_version"] not in ("HTTP/1.0", "HTTP/1.1"):
- url += " " + f["req_http_version"]
- req.append(
- urwid.Text([(uc, url)])
- )
-
- pile.append(urwid.Columns(req, dividechars=1))
-
- resp = []
- resp.append(
- ("fixed", preamble, urwid.Text(""))
- )
-
- if "resp_code" in f:
- codes = {
- 2: "code_200",
- 3: "code_300",
- 4: "code_400",
- 5: "code_500",
- }
- ccol = codes.get(f["resp_code"] / 100, "code_other")
- resp.append(fcol(SYMBOL_RETURN, ccol))
- if f["resp_is_replay"]:
- resp.append(fcol(SYMBOL_REPLAY, "replay"))
- resp.append(fcol(f["resp_code"], ccol))
- if extended:
- resp.append(fcol(f["resp_reason"], ccol))
- if f["intercepted"] and f["resp_code"] and not f["acked"]:
- rc = "intercept"
- else:
- rc = "text"
-
- if f["resp_ctype"]:
- resp.append(fcol(f["resp_ctype"], rc))
- resp.append(fcol(f["resp_clen"], rc))
- resp.append(fcol(f["roundtrip"], rc))
-
- elif f["err_msg"]:
- resp.append(fcol(SYMBOL_RETURN, "error"))
- resp.append(
- urwid.Text([
- (
- "error",
- f["err_msg"]
- )
- ])
- )
- pile.append(urwid.Columns(resp, dividechars=1))
- return urwid.Pile(pile)
-
-
# Save file to disk
def save_data(path, data):
if not path:
@@ -256,7 +174,32 @@ def ask_save_path(data, prompt="File path"):
)
-def copy_flow_format_data(part, scope, flow):
+def copy_to_clipboard_or_prompt(data):
+ # pyperclip calls encode('utf-8') on data to be copied without checking.
+ # if data are already encoded that way UnicodeDecodeError is thrown.
+ toclip = ""
+ try:
+ toclip = data.decode('utf-8')
+ except (UnicodeDecodeError):
+ toclip = data
+
+ try:
+ pyperclip.copy(toclip)
+ except (RuntimeError, UnicodeDecodeError, AttributeError, TypeError):
+ def save(k):
+ if k == "y":
+ ask_save_path(data, "Save data")
+ signals.status_prompt_onekey.send(
+ prompt = "Cannot copy data to clipboard. Save as file?",
+ keys = (
+ ("yes", "y"),
+ ("no", "n"),
+ ),
+ callback = save
+ )
+
+
+def flow_format_data(part, scope, flow):
if part == "u":
data = flow.request.url
else:
@@ -289,37 +232,12 @@ def copy_flow_format_data(part, scope, flow):
return data, False
-def copy_to_clipboard_or_prompt(data):
- # pyperclip calls encode('utf-8') on data to be copied without checking.
- # if data are already encoded that way UnicodeDecodeError is thrown.
- toclip = ""
- try:
- toclip = data.decode('utf-8')
- except (UnicodeDecodeError):
- toclip = data
-
- try:
- pyperclip.copy(toclip)
- except (RuntimeError, UnicodeDecodeError, AttributeError, TypeError):
- def save(k):
- if k == "y":
- ask_save_path(data, "Save data")
- signals.status_prompt_onekey.send(
- prompt = "Cannot copy data to clipboard. Save as file?",
- keys = (
- ("yes", "y"),
- ("no", "n"),
- ),
- callback = save
- )
-
-
def copy_flow(part, scope, flow, master, state):
"""
part: _c_ontent, _h_eaders+content, _u_rl
scope: _a_ll, re_q_uest, re_s_ponse
"""
- data, err = copy_flow_format_data(part, scope, flow)
+ data, err = flow_format_data(part, scope, flow)
if err:
signals.status_message.send(message=err)
@@ -355,8 +273,9 @@ def ask_copy_part(scope, flow, master, state):
def ask_save_body(part, master, state, flow):
"""
- Save either the request or the response body to disk. part can either be
- "q" (request), "s" (response) or None (ask user if necessary).
+ Save either the request or the response body to disk.
+
+ 'part' can either be "q" (request), "s" (response) or None (ask user if necessary).
"""
request_has_content = flow.request and flow.request.raw_content
@@ -408,6 +327,88 @@ def export_to_clip_or_file(key, flow, writer):
flowcache = utils.LRUCache(800)
+def raw_format_flow(f, focus, extended):
+ f = dict(f)
+ pile = []
+ req = []
+ if extended:
+ req.append(
+ fcol(
+ human.format_timestamp(f["req_timestamp"]),
+ "highlight"
+ )
+ )
+ else:
+ req.append(fcol(">>" if focus else " ", "focus"))
+
+ if f["marked"]:
+ req.append(fcol(SYMBOL_MARK, "mark"))
+
+ if f["req_is_replay"]:
+ req.append(fcol(SYMBOL_REPLAY, "replay"))
+ req.append(fcol(f["req_method"], "method"))
+
+ preamble = sum(i[1] for i in req) + len(req) - 1
+
+ if f["intercepted"] and not f["acked"]:
+ uc = "intercept"
+ elif "resp_code" in f or "err_msg" in f:
+ uc = "text"
+ else:
+ uc = "title"
+
+ url = f["req_url"]
+ if f["req_http_version"] not in ("HTTP/1.0", "HTTP/1.1"):
+ url += " " + f["req_http_version"]
+ req.append(
+ urwid.Text([(uc, url)])
+ )
+
+ pile.append(urwid.Columns(req, dividechars=1))
+
+ resp = []
+ resp.append(
+ ("fixed", preamble, urwid.Text(""))
+ )
+
+ if "resp_code" in f:
+ codes = {
+ 2: "code_200",
+ 3: "code_300",
+ 4: "code_400",
+ 5: "code_500",
+ }
+ ccol = codes.get(f["resp_code"] / 100, "code_other")
+ resp.append(fcol(SYMBOL_RETURN, ccol))
+ if f["resp_is_replay"]:
+ resp.append(fcol(SYMBOL_REPLAY, "replay"))
+ resp.append(fcol(f["resp_code"], ccol))
+ if extended:
+ resp.append(fcol(f["resp_reason"], ccol))
+ if f["intercepted"] and f["resp_code"] and not f["acked"]:
+ rc = "intercept"
+ else:
+ rc = "text"
+
+ if f["resp_ctype"]:
+ resp.append(fcol(f["resp_ctype"], rc))
+ resp.append(fcol(f["resp_clen"], rc))
+ resp.append(fcol(f["roundtrip"], rc))
+
+ elif f["err_msg"]:
+ resp.append(fcol(SYMBOL_RETURN, "error"))
+ resp.append(
+ urwid.Text([
+ (
+ "error",
+ f["err_msg"]
+ )
+ ])
+ )
+ pile.append(urwid.Columns(resp, dividechars=1))
+ return urwid.Pile(pile)
+
+
def format_flow(f, focus, extended=False, hostheader=False, marked=False):
d = dict(
intercepted = f.intercepted,