aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy/libmproxy/console/flowview.py
diff options
context:
space:
mode:
Diffstat (limited to 'mitmproxy/libmproxy/console/flowview.py')
-rw-r--r--mitmproxy/libmproxy/console/flowview.py711
1 files changed, 711 insertions, 0 deletions
diff --git a/mitmproxy/libmproxy/console/flowview.py b/mitmproxy/libmproxy/console/flowview.py
new file mode 100644
index 00000000..d2b98b68
--- /dev/null
+++ b/mitmproxy/libmproxy/console/flowview.py
@@ -0,0 +1,711 @@
+from __future__ import absolute_import, division
+import os
+import traceback
+import sys
+
+import math
+import urwid
+
+from netlib import odict
+from netlib.http import CONTENT_MISSING, Headers
+from . import common, grideditor, signals, searchable, tabs
+from . import flowdetailview
+from .. import utils, controller, contentviews
+from ..models import HTTPRequest, HTTPResponse, decoded
+from ..exceptions import ContentViewException
+
+
+class SearchError(Exception):
+ pass
+
+
+def _mkhelp():
+ text = []
+ keys = [
+ ("A", "accept all intercepted flows"),
+ ("a", "accept this intercepted flow"),
+ ("b", "save request/response body"),
+ ("D", "duplicate flow"),
+ ("d", "delete flow"),
+ ("E", "export"),
+ ("e", "edit request/response"),
+ ("f", "load full body data"),
+ ("m", "change body display mode for this entity"),
+ (None,
+ common.highlight_key("automatic", "a") +
+ [("text", ": automatic detection")]
+ ),
+ (None,
+ common.highlight_key("hex", "e") +
+ [("text", ": Hex")]
+ ),
+ (None,
+ common.highlight_key("html", "h") +
+ [("text", ": HTML")]
+ ),
+ (None,
+ common.highlight_key("image", "i") +
+ [("text", ": Image")]
+ ),
+ (None,
+ common.highlight_key("javascript", "j") +
+ [("text", ": JavaScript")]
+ ),
+ (None,
+ common.highlight_key("json", "s") +
+ [("text", ": JSON")]
+ ),
+ (None,
+ common.highlight_key("urlencoded", "u") +
+ [("text", ": URL-encoded data")]
+ ),
+ (None,
+ common.highlight_key("raw", "r") +
+ [("text", ": raw data")]
+ ),
+ (None,
+ common.highlight_key("xml", "x") +
+ [("text", ": XML")]
+ ),
+ ("M", "change default body display mode"),
+ ("p", "previous flow"),
+ ("P", "copy response(content/headers) to clipboard"),
+ ("r", "replay request"),
+ ("V", "revert changes to request"),
+ ("v", "view body in external viewer"),
+ ("w", "save all flows matching current limit"),
+ ("W", "save this flow"),
+ ("x", "delete body"),
+ ("z", "encode/decode a request/response"),
+ ("tab", "next tab"),
+ ("h, l", "previous tab, next tab"),
+ ("space", "next flow"),
+ ("|", "run script on this flow"),
+ ("/", "search (case sensitive)"),
+ ("n", "repeat search forward"),
+ ("N", "repeat search backwards"),
+ ]
+ text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
+ return text
+help_context = _mkhelp()
+
+footer = [
+ ('heading_key', "?"), ":help ",
+ ('heading_key', "q"), ":back ",
+]
+
+
+class FlowViewHeader(urwid.WidgetWrap):
+
+ def __init__(self, master, f):
+ self.master, self.flow = master, f
+ self._w = common.format_flow(
+ f,
+ False,
+ extended=True,
+ hostheader=self.master.showhost
+ )
+ signals.flow_change.connect(self.sig_flow_change)
+
+ def sig_flow_change(self, sender, flow):
+ if flow == self.flow:
+ self._w = common.format_flow(
+ flow,
+ False,
+ extended=True,
+ hostheader=self.master.showhost
+ )
+
+
+cache = utils.LRUCache(200)
+
+TAB_REQ = 0
+TAB_RESP = 1
+
+
+class FlowView(tabs.Tabs):
+ highlight_color = "focusfield"
+
+ def __init__(self, master, state, flow, tab_offset):
+ self.master, self.state, self.flow = master, state, flow
+ tabs.Tabs.__init__(self,
+ [
+ (self.tab_request, self.view_request),
+ (self.tab_response, self.view_response),
+ (self.tab_details, self.view_details),
+ ],
+ tab_offset
+ )
+ self.show()
+ self.last_displayed_body = None
+ signals.flow_change.connect(self.sig_flow_change)
+
+ def tab_request(self):
+ if self.flow.intercepted and not self.flow.reply.acked and not self.flow.response:
+ return "Request intercepted"
+ else:
+ return "Request"
+
+ def tab_response(self):
+ if self.flow.intercepted and not self.flow.reply.acked and self.flow.response:
+ return "Response intercepted"
+ else:
+ return "Response"
+
+ def tab_details(self):
+ return "Detail"
+
+ def view_request(self):
+ return self.conn_text(self.flow.request)
+
+ def view_response(self):
+ return self.conn_text(self.flow.response)
+
+ def view_details(self):
+ return flowdetailview.flowdetails(self.state, self.flow)
+
+ def sig_flow_change(self, sender, flow):
+ if flow == self.flow:
+ self.show()
+
+ def content_view(self, viewmode, message):
+ if message.content == CONTENT_MISSING:
+ msg, body = "", [urwid.Text([("error", "[content missing]")])]
+ return msg, body
+ else:
+ full = self.state.get_flow_setting(
+ self.flow,
+ (self.tab_offset, "fullcontents"),
+ False
+ )
+ if full:
+ limit = sys.maxsize
+ else:
+ limit = contentviews.VIEW_CUTOFF
+ return cache.get(
+ self._get_content_view,
+ viewmode,
+ message,
+ limit,
+ (bytes(message.headers), message.content) # Cache invalidation
+ )
+
+ def _get_content_view(self, viewmode, message, max_lines, _):
+
+ try:
+ description, lines = contentviews.get_content_view(
+ viewmode, message.content, headers=message.headers
+ )
+ except ContentViewException:
+ s = "Content viewer failed: \n" + traceback.format_exc()
+ signals.add_event(s, "error")
+ description, lines = contentviews.get_content_view(
+ contentviews.get("Raw"), message.content, headers=message.headers
+ )
+ description = description.replace("Raw", "Couldn't parse: falling back to Raw")
+
+ # Give hint that you have to tab for the response.
+ if description == "No content" and isinstance(message, HTTPRequest):
+ description = "No request content (press tab to view response)"
+
+ # If the users has a wide terminal, he gets fewer lines; this should not be an issue.
+ chars_per_line = 80
+ max_chars = max_lines * chars_per_line
+ total_chars = 0
+ text_objects = []
+ for line in lines:
+ txt = []
+ for (style, text) in line:
+ if total_chars + len(text) > max_chars:
+ text = text[:max_chars - total_chars]
+ txt.append((style, text))
+ total_chars += len(text)
+ if total_chars == max_chars:
+ break
+
+ # round up to the next line.
+ total_chars = int(math.ceil(total_chars / chars_per_line) * chars_per_line)
+
+ text_objects.append(urwid.Text(txt))
+ if total_chars == max_chars:
+ text_objects.append(urwid.Text([
+ ("highlight", "Stopped displaying data after %d lines. Press " % max_lines),
+ ("key", "f"),
+ ("highlight", " to load all data.")
+ ]))
+ break
+
+ return description, text_objects
+
+ def viewmode_get(self):
+ override = self.state.get_flow_setting(
+ self.flow,
+ (self.tab_offset, "prettyview")
+ )
+ return self.state.default_body_view if override is None else override
+
+ def conn_text(self, conn):
+ if conn:
+ txt = common.format_keyvals(
+ [(h + ":", v) for (h, v) in conn.headers.fields],
+ key = "header",
+ val = "text"
+ )
+ viewmode = self.viewmode_get()
+ msg, body = self.content_view(viewmode, conn)
+
+ cols = [
+ urwid.Text(
+ [
+ ("heading", msg),
+ ]
+ ),
+ urwid.Text(
+ [
+ " ",
+ ('heading', "["),
+ ('heading_key', "m"),
+ ('heading', (":%s]" % viewmode.name)),
+ ],
+ align="right"
+ )
+ ]
+ title = urwid.AttrWrap(urwid.Columns(cols), "heading")
+
+ txt.append(title)
+ txt.extend(body)
+ else:
+ txt = [
+ urwid.Text(""),
+ urwid.Text(
+ [
+ ("highlight", "No response. Press "),
+ ("key", "e"),
+ ("highlight", " and edit any aspect to add one."),
+ ]
+ )
+ ]
+ return searchable.Searchable(self.state, txt)
+
+ def set_method_raw(self, m):
+ if m:
+ self.flow.request.method = m
+ signals.flow_change.send(self, flow = self.flow)
+
+ def edit_method(self, m):
+ if m == "e":
+ signals.status_prompt.send(
+ prompt = "Method",
+ text = self.flow.request.method,
+ callback = self.set_method_raw
+ )
+ else:
+ for i in common.METHOD_OPTIONS:
+ if i[1] == m:
+ self.flow.request.method = i[0].upper()
+ signals.flow_change.send(self, flow = self.flow)
+
+ def set_url(self, url):
+ request = self.flow.request
+ try:
+ request.url = str(url)
+ except ValueError:
+ return "Invalid URL."
+ signals.flow_change.send(self, flow = self.flow)
+
+ def set_resp_code(self, code):
+ response = self.flow.response
+ try:
+ response.status_code = int(code)
+ except ValueError:
+ return None
+ import BaseHTTPServer
+ if int(code) in BaseHTTPServer.BaseHTTPRequestHandler.responses:
+ response.msg = BaseHTTPServer.BaseHTTPRequestHandler.responses[
+ int(code)][0]
+ signals.flow_change.send(self, flow = self.flow)
+
+ def set_resp_msg(self, msg):
+ response = self.flow.response
+ response.msg = msg
+ signals.flow_change.send(self, flow = self.flow)
+
+ def set_headers(self, fields, conn):
+ conn.headers = Headers(fields)
+ signals.flow_change.send(self, flow = self.flow)
+
+ def set_query(self, lst, conn):
+ conn.set_query(odict.ODict(lst))
+ signals.flow_change.send(self, flow = self.flow)
+
+ def set_path_components(self, lst, conn):
+ conn.set_path_components(lst)
+ signals.flow_change.send(self, flow = self.flow)
+
+ def set_form(self, lst, conn):
+ conn.set_form_urlencoded(odict.ODict(lst))
+ signals.flow_change.send(self, flow = self.flow)
+
+ def edit_form(self, conn):
+ self.master.view_grideditor(
+ grideditor.URLEncodedFormEditor(
+ self.master,
+ conn.get_form_urlencoded().lst,
+ self.set_form,
+ conn
+ )
+ )
+
+ def edit_form_confirm(self, key, conn):
+ if key == "y":
+ self.edit_form(conn)
+
+ def set_cookies(self, lst, conn):
+ od = odict.ODict(lst)
+ conn.set_cookies(od)
+ signals.flow_change.send(self, flow = self.flow)
+
+ def set_setcookies(self, data, conn):
+ conn.set_cookies(data)
+ signals.flow_change.send(self, flow = self.flow)
+
+ def edit(self, part):
+ if self.tab_offset == TAB_REQ:
+ message = self.flow.request
+ else:
+ if not self.flow.response:
+ self.flow.response = HTTPResponse(
+ self.flow.request.http_version,
+ 200, "OK", Headers(), ""
+ )
+ self.flow.response.reply = controller.DummyReply()
+ message = self.flow.response
+
+ self.flow.backup()
+ if message == self.flow.request and part == "c":
+ self.master.view_grideditor(
+ grideditor.CookieEditor(
+ self.master,
+ message.get_cookies().lst,
+ self.set_cookies,
+ message
+ )
+ )
+ if message == self.flow.response and part == "c":
+ self.master.view_grideditor(
+ grideditor.SetCookieEditor(
+ self.master,
+ message.get_cookies(),
+ self.set_setcookies,
+ message
+ )
+ )
+ if part == "r":
+ with decoded(message):
+ # Fix an issue caused by some editors when editing a
+ # request/response body. Many editors make it hard to save a
+ # file without a terminating newline on the last line. When
+ # editing message bodies, this can cause problems. For now, I
+ # just strip the newlines off the end of the body when we return
+ # from an editor.
+ c = self.master.spawn_editor(message.content or "")
+ message.content = c.rstrip("\n")
+ elif part == "f":
+ if not message.get_form_urlencoded() and message.content:
+ signals.status_prompt_onekey.send(
+ prompt = "Existing body is not a URL-encoded form. Clear and edit?",
+ keys = [
+ ("yes", "y"),
+ ("no", "n"),
+ ],
+ callback = self.edit_form_confirm,
+ args = (message,)
+ )
+ else:
+ self.edit_form(message)
+ elif part == "h":
+ self.master.view_grideditor(
+ grideditor.HeaderEditor(
+ self.master,
+ message.headers.fields,
+ self.set_headers,
+ message
+ )
+ )
+ elif part == "p":
+ p = message.get_path_components()
+ self.master.view_grideditor(
+ grideditor.PathEditor(
+ self.master,
+ p,
+ self.set_path_components,
+ message
+ )
+ )
+ elif part == "q":
+ self.master.view_grideditor(
+ grideditor.QueryEditor(
+ self.master,
+ message.get_query().lst,
+ self.set_query, message
+ )
+ )
+ elif part == "u":
+ signals.status_prompt.send(
+ prompt = "URL",
+ text = message.url,
+ callback = self.set_url
+ )
+ elif part == "m":
+ signals.status_prompt_onekey.send(
+ prompt = "Method",
+ keys = common.METHOD_OPTIONS,
+ callback = self.edit_method
+ )
+ elif part == "o":
+ signals.status_prompt.send(
+ prompt = "Code",
+ text = str(message.status_code),
+ callback = self.set_resp_code
+ )
+ elif part == "m":
+ signals.status_prompt.send(
+ prompt = "Message",
+ text = message.msg,
+ callback = self.set_resp_msg
+ )
+ signals.flow_change.send(self, flow = self.flow)
+
+ def _view_nextprev_flow(self, np, flow):
+ try:
+ idx = self.state.view.index(flow)
+ except IndexError:
+ return
+ if np == "next":
+ new_flow, new_idx = self.state.get_next(idx)
+ else:
+ new_flow, new_idx = self.state.get_prev(idx)
+ if new_flow is None:
+ signals.status_message.send(message="No more flows!")
+ else:
+ signals.pop_view_state.send(self)
+ self.master.view_flow(new_flow, self.tab_offset)
+
+ def view_next_flow(self, flow):
+ return self._view_nextprev_flow("next", flow)
+
+ def view_prev_flow(self, flow):
+ return self._view_nextprev_flow("prev", flow)
+
+ def change_this_display_mode(self, t):
+ self.state.add_flow_setting(
+ self.flow,
+ (self.tab_offset, "prettyview"),
+ contentviews.get_by_shortcut(t)
+ )
+ signals.flow_change.send(self, flow = self.flow)
+
+ def delete_body(self, t):
+ if t == "m":
+ val = CONTENT_MISSING
+ else:
+ val = None
+ if self.tab_offset == TAB_REQ:
+ self.flow.request.content = val
+ else:
+ self.flow.response.content = val
+ signals.flow_change.send(self, flow = self.flow)
+
+ def keypress(self, size, key):
+ key = super(self.__class__, self).keypress(size, key)
+
+ if key == " ":
+ self.view_next_flow(self.flow)
+ return
+
+ key = common.shortcuts(key)
+ if self.tab_offset == TAB_REQ:
+ conn = self.flow.request
+ elif self.tab_offset == TAB_RESP:
+ conn = self.flow.response
+ else:
+ conn = None
+
+ if key in ("up", "down", "page up", "page down"):
+ # Why doesn't this just work??
+ self._w.keypress(size, key)
+ elif key == "a":
+ self.flow.accept_intercept(self.master)
+ signals.flow_change.send(self, flow = self.flow)
+ elif key == "A":
+ self.master.accept_all()
+ signals.flow_change.send(self, flow = self.flow)
+ elif key == "d":
+ if self.state.flow_count() == 1:
+ self.master.view_flowlist()
+ elif self.state.view.index(self.flow) == len(self.state.view) - 1:
+ self.view_prev_flow(self.flow)
+ else:
+ self.view_next_flow(self.flow)
+ f = self.flow
+ f.kill(self.master)
+ self.state.delete_flow(f)
+ elif key == "D":
+ f = self.master.duplicate_flow(self.flow)
+ self.master.view_flow(f)
+ signals.status_message.send(message="Duplicated.")
+ elif key == "p":
+ self.view_prev_flow(self.flow)
+ elif key == "r":
+ r = self.master.replay_request(self.flow)
+ if r:
+ signals.status_message.send(message=r)
+ signals.flow_change.send(self, flow = self.flow)
+ elif key == "V":
+ if not self.flow.modified():
+ signals.status_message.send(message="Flow not modified.")
+ return
+ self.state.revert(self.flow)
+ signals.flow_change.send(self, flow = self.flow)
+ signals.status_message.send(message="Reverted.")
+ elif key == "W":
+ signals.status_prompt_path.send(
+ prompt = "Save this flow",
+ callback = self.master.save_one_flow,
+ args = (self.flow,)
+ )
+ elif key == "E":
+ signals.status_prompt_onekey.send(
+ self,
+ prompt = "Export",
+ keys = (
+ ("as curl command", "c"),
+ ("as python code", "p"),
+ ("as raw request", "r"),
+ ),
+ callback = common.export_prompt,
+ args = (self.flow,)
+ )
+ elif key == "|":
+ signals.status_prompt_path.send(
+ prompt = "Send flow to script",
+ callback = self.master.run_script_once,
+ args = (self.flow,)
+ )
+
+ if not conn and key in set(list("befgmxvz")):
+ signals.status_message.send(
+ message = "Tab to the request or response",
+ expire = 1
+ )
+ elif conn:
+ if key == "b":
+ if self.tab_offset == TAB_REQ:
+ common.ask_save_body(
+ "q", self.master, self.state, self.flow
+ )
+ else:
+ common.ask_save_body(
+ "s", self.master, self.state, self.flow
+ )
+ elif key == "e":
+ if self.tab_offset == TAB_REQ:
+ signals.status_prompt_onekey.send(
+ prompt = "Edit request",
+ keys = (
+ ("cookies", "c"),
+ ("query", "q"),
+ ("path", "p"),
+ ("url", "u"),
+ ("header", "h"),
+ ("form", "f"),
+ ("raw body", "r"),
+ ("method", "m"),
+ ),
+ callback = self.edit
+ )
+ else:
+ signals.status_prompt_onekey.send(
+ prompt = "Edit response",
+ keys = (
+ ("cookies", "c"),
+ ("code", "o"),
+ ("message", "m"),
+ ("header", "h"),
+ ("raw body", "r"),
+ ),
+ callback = self.edit
+ )
+ key = None
+ elif key == "f":
+ signals.status_message.send(message="Loading all body data...")
+ self.state.add_flow_setting(
+ self.flow,
+ (self.tab_offset, "fullcontents"),
+ True
+ )
+ signals.flow_change.send(self, flow = self.flow)
+ signals.status_message.send(message="")
+ elif key == "P":
+ if self.tab_offset == TAB_REQ:
+ scope = "q"
+ else:
+ scope = "s"
+ common.ask_copy_part(scope, self.flow, self.master, self.state)
+ elif key == "m":
+ p = list(contentviews.view_prompts)
+ p.insert(0, ("Clear", "C"))
+ signals.status_prompt_onekey.send(
+ self,
+ prompt = "Display mode",
+ keys = p,
+ callback = self.change_this_display_mode
+ )
+ key = None
+ elif key == "x":
+ signals.status_prompt_onekey.send(
+ prompt = "Delete body",
+ keys = (
+ ("completely", "c"),
+ ("mark as missing", "m"),
+ ),
+ callback = self.delete_body
+ )
+ key = None
+ elif key == "v":
+ if conn.content:
+ t = conn.headers.get("content-type")
+ if "EDITOR" in os.environ or "PAGER" in os.environ:
+ self.master.spawn_external_viewer(conn.content, t)
+ else:
+ signals.status_message.send(
+ message = "Error! Set $EDITOR or $PAGER."
+ )
+ elif key == "z":
+ self.flow.backup()
+ e = conn.headers.get("content-encoding", "identity")
+ if e != "identity":
+ if not conn.decode():
+ signals.status_message.send(
+ message = "Could not decode - invalid data?"
+ )
+ else:
+ signals.status_prompt_onekey.send(
+ prompt = "Select encoding: ",
+ keys = (
+ ("gzip", "z"),
+ ("deflate", "d"),
+ ),
+ callback = self.encode_callback,
+ args = (conn,)
+ )
+ signals.flow_change.send(self, flow = self.flow)
+ return key
+
+ def encode_callback(self, key, conn):
+ encoding_map = {
+ "z": "gzip",
+ "d": "deflate",
+ }
+ conn.encode(encoding_map[key])
+ signals.flow_change.send(self, flow = self.flow)