diff options
Diffstat (limited to 'libmproxy/console/flowview.py')
-rw-r--r-- | libmproxy/console/flowview.py | 711 |
1 files changed, 0 insertions, 711 deletions
diff --git a/libmproxy/console/flowview.py b/libmproxy/console/flowview.py deleted file mode 100644 index d2b98b68..00000000 --- a/libmproxy/console/flowview.py +++ /dev/null @@ -1,711 +0,0 @@ -from __future__ import absolute_import, division -import os -import traceback -import sys - -import math -import urwid - -from netlib import odict -from netlib.http import CONTENT_MISSING, Headers -from . import common, grideditor, signals, searchable, tabs -from . import flowdetailview -from .. import utils, controller, contentviews -from ..models import HTTPRequest, HTTPResponse, decoded -from ..exceptions import ContentViewException - - -class SearchError(Exception): - pass - - -def _mkhelp(): - text = [] - keys = [ - ("A", "accept all intercepted flows"), - ("a", "accept this intercepted flow"), - ("b", "save request/response body"), - ("D", "duplicate flow"), - ("d", "delete flow"), - ("E", "export"), - ("e", "edit request/response"), - ("f", "load full body data"), - ("m", "change body display mode for this entity"), - (None, - common.highlight_key("automatic", "a") + - [("text", ": automatic detection")] - ), - (None, - common.highlight_key("hex", "e") + - [("text", ": Hex")] - ), - (None, - common.highlight_key("html", "h") + - [("text", ": HTML")] - ), - (None, - common.highlight_key("image", "i") + - [("text", ": Image")] - ), - (None, - common.highlight_key("javascript", "j") + - [("text", ": JavaScript")] - ), - (None, - common.highlight_key("json", "s") + - [("text", ": JSON")] - ), - (None, - common.highlight_key("urlencoded", "u") + - [("text", ": URL-encoded data")] - ), - (None, - common.highlight_key("raw", "r") + - [("text", ": raw data")] - ), - (None, - common.highlight_key("xml", "x") + - [("text", ": XML")] - ), - ("M", "change default body display mode"), - ("p", "previous flow"), - ("P", "copy response(content/headers) to clipboard"), - ("r", "replay request"), - ("V", "revert changes to request"), - ("v", "view body in external viewer"), - ("w", "save all flows matching current limit"), - ("W", "save this flow"), - ("x", "delete body"), - ("z", "encode/decode a request/response"), - ("tab", "next tab"), - ("h, l", "previous tab, next tab"), - ("space", "next flow"), - ("|", "run script on this flow"), - ("/", "search (case sensitive)"), - ("n", "repeat search forward"), - ("N", "repeat search backwards"), - ] - text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) - return text -help_context = _mkhelp() - -footer = [ - ('heading_key', "?"), ":help ", - ('heading_key', "q"), ":back ", -] - - -class FlowViewHeader(urwid.WidgetWrap): - - def __init__(self, master, f): - self.master, self.flow = master, f - self._w = common.format_flow( - f, - False, - extended=True, - hostheader=self.master.showhost - ) - signals.flow_change.connect(self.sig_flow_change) - - def sig_flow_change(self, sender, flow): - if flow == self.flow: - self._w = common.format_flow( - flow, - False, - extended=True, - hostheader=self.master.showhost - ) - - -cache = utils.LRUCache(200) - -TAB_REQ = 0 -TAB_RESP = 1 - - -class FlowView(tabs.Tabs): - highlight_color = "focusfield" - - def __init__(self, master, state, flow, tab_offset): - self.master, self.state, self.flow = master, state, flow - tabs.Tabs.__init__(self, - [ - (self.tab_request, self.view_request), - (self.tab_response, self.view_response), - (self.tab_details, self.view_details), - ], - tab_offset - ) - self.show() - self.last_displayed_body = None - signals.flow_change.connect(self.sig_flow_change) - - def tab_request(self): - if self.flow.intercepted and not self.flow.reply.acked and not self.flow.response: - return "Request intercepted" - else: - return "Request" - - def tab_response(self): - if self.flow.intercepted and not self.flow.reply.acked and self.flow.response: - return "Response intercepted" - else: - return "Response" - - def tab_details(self): - return "Detail" - - def view_request(self): - return self.conn_text(self.flow.request) - - def view_response(self): - return self.conn_text(self.flow.response) - - def view_details(self): - return flowdetailview.flowdetails(self.state, self.flow) - - def sig_flow_change(self, sender, flow): - if flow == self.flow: - self.show() - - def content_view(self, viewmode, message): - if message.content == CONTENT_MISSING: - msg, body = "", [urwid.Text([("error", "[content missing]")])] - return msg, body - else: - full = self.state.get_flow_setting( - self.flow, - (self.tab_offset, "fullcontents"), - False - ) - if full: - limit = sys.maxsize - else: - limit = contentviews.VIEW_CUTOFF - return cache.get( - self._get_content_view, - viewmode, - message, - limit, - (bytes(message.headers), message.content) # Cache invalidation - ) - - def _get_content_view(self, viewmode, message, max_lines, _): - - try: - description, lines = contentviews.get_content_view( - viewmode, message.content, headers=message.headers - ) - except ContentViewException: - s = "Content viewer failed: \n" + traceback.format_exc() - signals.add_event(s, "error") - description, lines = contentviews.get_content_view( - contentviews.get("Raw"), message.content, headers=message.headers - ) - description = description.replace("Raw", "Couldn't parse: falling back to Raw") - - # Give hint that you have to tab for the response. - if description == "No content" and isinstance(message, HTTPRequest): - description = "No request content (press tab to view response)" - - # If the users has a wide terminal, he gets fewer lines; this should not be an issue. - chars_per_line = 80 - max_chars = max_lines * chars_per_line - total_chars = 0 - text_objects = [] - for line in lines: - txt = [] - for (style, text) in line: - if total_chars + len(text) > max_chars: - text = text[:max_chars - total_chars] - txt.append((style, text)) - total_chars += len(text) - if total_chars == max_chars: - break - - # round up to the next line. - total_chars = int(math.ceil(total_chars / chars_per_line) * chars_per_line) - - text_objects.append(urwid.Text(txt)) - if total_chars == max_chars: - text_objects.append(urwid.Text([ - ("highlight", "Stopped displaying data after %d lines. Press " % max_lines), - ("key", "f"), - ("highlight", " to load all data.") - ])) - break - - return description, text_objects - - def viewmode_get(self): - override = self.state.get_flow_setting( - self.flow, - (self.tab_offset, "prettyview") - ) - return self.state.default_body_view if override is None else override - - def conn_text(self, conn): - if conn: - txt = common.format_keyvals( - [(h + ":", v) for (h, v) in conn.headers.fields], - key = "header", - val = "text" - ) - viewmode = self.viewmode_get() - msg, body = self.content_view(viewmode, conn) - - cols = [ - urwid.Text( - [ - ("heading", msg), - ] - ), - urwid.Text( - [ - " ", - ('heading', "["), - ('heading_key', "m"), - ('heading', (":%s]" % viewmode.name)), - ], - align="right" - ) - ] - title = urwid.AttrWrap(urwid.Columns(cols), "heading") - - txt.append(title) - txt.extend(body) - else: - txt = [ - urwid.Text(""), - urwid.Text( - [ - ("highlight", "No response. Press "), - ("key", "e"), - ("highlight", " and edit any aspect to add one."), - ] - ) - ] - return searchable.Searchable(self.state, txt) - - def set_method_raw(self, m): - if m: - self.flow.request.method = m - signals.flow_change.send(self, flow = self.flow) - - def edit_method(self, m): - if m == "e": - signals.status_prompt.send( - prompt = "Method", - text = self.flow.request.method, - callback = self.set_method_raw - ) - else: - for i in common.METHOD_OPTIONS: - if i[1] == m: - self.flow.request.method = i[0].upper() - signals.flow_change.send(self, flow = self.flow) - - def set_url(self, url): - request = self.flow.request - try: - request.url = str(url) - except ValueError: - return "Invalid URL." - signals.flow_change.send(self, flow = self.flow) - - def set_resp_code(self, code): - response = self.flow.response - try: - response.status_code = int(code) - except ValueError: - return None - import BaseHTTPServer - if int(code) in BaseHTTPServer.BaseHTTPRequestHandler.responses: - response.msg = BaseHTTPServer.BaseHTTPRequestHandler.responses[ - int(code)][0] - signals.flow_change.send(self, flow = self.flow) - - def set_resp_msg(self, msg): - response = self.flow.response - response.msg = msg - signals.flow_change.send(self, flow = self.flow) - - def set_headers(self, fields, conn): - conn.headers = Headers(fields) - signals.flow_change.send(self, flow = self.flow) - - def set_query(self, lst, conn): - conn.set_query(odict.ODict(lst)) - signals.flow_change.send(self, flow = self.flow) - - def set_path_components(self, lst, conn): - conn.set_path_components(lst) - signals.flow_change.send(self, flow = self.flow) - - def set_form(self, lst, conn): - conn.set_form_urlencoded(odict.ODict(lst)) - signals.flow_change.send(self, flow = self.flow) - - def edit_form(self, conn): - self.master.view_grideditor( - grideditor.URLEncodedFormEditor( - self.master, - conn.get_form_urlencoded().lst, - self.set_form, - conn - ) - ) - - def edit_form_confirm(self, key, conn): - if key == "y": - self.edit_form(conn) - - def set_cookies(self, lst, conn): - od = odict.ODict(lst) - conn.set_cookies(od) - signals.flow_change.send(self, flow = self.flow) - - def set_setcookies(self, data, conn): - conn.set_cookies(data) - signals.flow_change.send(self, flow = self.flow) - - def edit(self, part): - if self.tab_offset == TAB_REQ: - message = self.flow.request - else: - if not self.flow.response: - self.flow.response = HTTPResponse( - self.flow.request.http_version, - 200, "OK", Headers(), "" - ) - self.flow.response.reply = controller.DummyReply() - message = self.flow.response - - self.flow.backup() - if message == self.flow.request and part == "c": - self.master.view_grideditor( - grideditor.CookieEditor( - self.master, - message.get_cookies().lst, - self.set_cookies, - message - ) - ) - if message == self.flow.response and part == "c": - self.master.view_grideditor( - grideditor.SetCookieEditor( - self.master, - message.get_cookies(), - self.set_setcookies, - message - ) - ) - if part == "r": - with decoded(message): - # Fix an issue caused by some editors when editing a - # request/response body. Many editors make it hard to save a - # file without a terminating newline on the last line. When - # editing message bodies, this can cause problems. For now, I - # just strip the newlines off the end of the body when we return - # from an editor. - c = self.master.spawn_editor(message.content or "") - message.content = c.rstrip("\n") - elif part == "f": - if not message.get_form_urlencoded() and message.content: - signals.status_prompt_onekey.send( - prompt = "Existing body is not a URL-encoded form. Clear and edit?", - keys = [ - ("yes", "y"), - ("no", "n"), - ], - callback = self.edit_form_confirm, - args = (message,) - ) - else: - self.edit_form(message) - elif part == "h": - self.master.view_grideditor( - grideditor.HeaderEditor( - self.master, - message.headers.fields, - self.set_headers, - message - ) - ) - elif part == "p": - p = message.get_path_components() - self.master.view_grideditor( - grideditor.PathEditor( - self.master, - p, - self.set_path_components, - message - ) - ) - elif part == "q": - self.master.view_grideditor( - grideditor.QueryEditor( - self.master, - message.get_query().lst, - self.set_query, message - ) - ) - elif part == "u": - signals.status_prompt.send( - prompt = "URL", - text = message.url, - callback = self.set_url - ) - elif part == "m": - signals.status_prompt_onekey.send( - prompt = "Method", - keys = common.METHOD_OPTIONS, - callback = self.edit_method - ) - elif part == "o": - signals.status_prompt.send( - prompt = "Code", - text = str(message.status_code), - callback = self.set_resp_code - ) - elif part == "m": - signals.status_prompt.send( - prompt = "Message", - text = message.msg, - callback = self.set_resp_msg - ) - signals.flow_change.send(self, flow = self.flow) - - def _view_nextprev_flow(self, np, flow): - try: - idx = self.state.view.index(flow) - except IndexError: - return - if np == "next": - new_flow, new_idx = self.state.get_next(idx) - else: - new_flow, new_idx = self.state.get_prev(idx) - if new_flow is None: - signals.status_message.send(message="No more flows!") - else: - signals.pop_view_state.send(self) - self.master.view_flow(new_flow, self.tab_offset) - - def view_next_flow(self, flow): - return self._view_nextprev_flow("next", flow) - - def view_prev_flow(self, flow): - return self._view_nextprev_flow("prev", flow) - - def change_this_display_mode(self, t): - self.state.add_flow_setting( - self.flow, - (self.tab_offset, "prettyview"), - contentviews.get_by_shortcut(t) - ) - signals.flow_change.send(self, flow = self.flow) - - def delete_body(self, t): - if t == "m": - val = CONTENT_MISSING - else: - val = None - if self.tab_offset == TAB_REQ: - self.flow.request.content = val - else: - self.flow.response.content = val - signals.flow_change.send(self, flow = self.flow) - - def keypress(self, size, key): - key = super(self.__class__, self).keypress(size, key) - - if key == " ": - self.view_next_flow(self.flow) - return - - key = common.shortcuts(key) - if self.tab_offset == TAB_REQ: - conn = self.flow.request - elif self.tab_offset == TAB_RESP: - conn = self.flow.response - else: - conn = None - - if key in ("up", "down", "page up", "page down"): - # Why doesn't this just work?? - self._w.keypress(size, key) - elif key == "a": - self.flow.accept_intercept(self.master) - signals.flow_change.send(self, flow = self.flow) - elif key == "A": - self.master.accept_all() - signals.flow_change.send(self, flow = self.flow) - elif key == "d": - if self.state.flow_count() == 1: - self.master.view_flowlist() - elif self.state.view.index(self.flow) == len(self.state.view) - 1: - self.view_prev_flow(self.flow) - else: - self.view_next_flow(self.flow) - f = self.flow - f.kill(self.master) - self.state.delete_flow(f) - elif key == "D": - f = self.master.duplicate_flow(self.flow) - self.master.view_flow(f) - signals.status_message.send(message="Duplicated.") - elif key == "p": - self.view_prev_flow(self.flow) - elif key == "r": - r = self.master.replay_request(self.flow) - if r: - signals.status_message.send(message=r) - signals.flow_change.send(self, flow = self.flow) - elif key == "V": - if not self.flow.modified(): - signals.status_message.send(message="Flow not modified.") - return - self.state.revert(self.flow) - signals.flow_change.send(self, flow = self.flow) - signals.status_message.send(message="Reverted.") - elif key == "W": - signals.status_prompt_path.send( - prompt = "Save this flow", - callback = self.master.save_one_flow, - args = (self.flow,) - ) - elif key == "E": - signals.status_prompt_onekey.send( - self, - prompt = "Export", - keys = ( - ("as curl command", "c"), - ("as python code", "p"), - ("as raw request", "r"), - ), - callback = common.export_prompt, - args = (self.flow,) - ) - elif key == "|": - signals.status_prompt_path.send( - prompt = "Send flow to script", - callback = self.master.run_script_once, - args = (self.flow,) - ) - - if not conn and key in set(list("befgmxvz")): - signals.status_message.send( - message = "Tab to the request or response", - expire = 1 - ) - elif conn: - if key == "b": - if self.tab_offset == TAB_REQ: - common.ask_save_body( - "q", self.master, self.state, self.flow - ) - else: - common.ask_save_body( - "s", self.master, self.state, self.flow - ) - elif key == "e": - if self.tab_offset == TAB_REQ: - signals.status_prompt_onekey.send( - prompt = "Edit request", - keys = ( - ("cookies", "c"), - ("query", "q"), - ("path", "p"), - ("url", "u"), - ("header", "h"), - ("form", "f"), - ("raw body", "r"), - ("method", "m"), - ), - callback = self.edit - ) - else: - signals.status_prompt_onekey.send( - prompt = "Edit response", - keys = ( - ("cookies", "c"), - ("code", "o"), - ("message", "m"), - ("header", "h"), - ("raw body", "r"), - ), - callback = self.edit - ) - key = None - elif key == "f": - signals.status_message.send(message="Loading all body data...") - self.state.add_flow_setting( - self.flow, - (self.tab_offset, "fullcontents"), - True - ) - signals.flow_change.send(self, flow = self.flow) - signals.status_message.send(message="") - elif key == "P": - if self.tab_offset == TAB_REQ: - scope = "q" - else: - scope = "s" - common.ask_copy_part(scope, self.flow, self.master, self.state) - elif key == "m": - p = list(contentviews.view_prompts) - p.insert(0, ("Clear", "C")) - signals.status_prompt_onekey.send( - self, - prompt = "Display mode", - keys = p, - callback = self.change_this_display_mode - ) - key = None - elif key == "x": - signals.status_prompt_onekey.send( - prompt = "Delete body", - keys = ( - ("completely", "c"), - ("mark as missing", "m"), - ), - callback = self.delete_body - ) - key = None - elif key == "v": - if conn.content: - t = conn.headers.get("content-type") - if "EDITOR" in os.environ or "PAGER" in os.environ: - self.master.spawn_external_viewer(conn.content, t) - else: - signals.status_message.send( - message = "Error! Set $EDITOR or $PAGER." - ) - elif key == "z": - self.flow.backup() - e = conn.headers.get("content-encoding", "identity") - if e != "identity": - if not conn.decode(): - signals.status_message.send( - message = "Could not decode - invalid data?" - ) - else: - signals.status_prompt_onekey.send( - prompt = "Select encoding: ", - keys = ( - ("gzip", "z"), - ("deflate", "d"), - ), - callback = self.encode_callback, - args = (conn,) - ) - signals.flow_change.send(self, flow = self.flow) - return key - - def encode_callback(self, key, conn): - encoding_map = { - "z": "gzip", - "d": "deflate", - } - conn.encode(encoding_map[key]) - signals.flow_change.send(self, flow = self.flow) |