diff options
Diffstat (limited to 'mitmproxy/tools/console/flowview.py')
-rw-r--r-- | mitmproxy/tools/console/flowview.py | 411 |
1 files changed, 41 insertions, 370 deletions
diff --git a/mitmproxy/tools/console/flowview.py b/mitmproxy/tools/console/flowview.py index b7b7053f..50f0d176 100644 --- a/mitmproxy/tools/console/flowview.py +++ b/mitmproxy/tools/console/flowview.py @@ -1,5 +1,4 @@ import math -import os import sys from functools import lru_cache from typing import Optional, Union # noqa @@ -7,13 +6,9 @@ from typing import Optional, Union # noqa import urwid from mitmproxy import contentviews -from mitmproxy import exceptions from mitmproxy import http -from mitmproxy.net.http import Headers -from mitmproxy.net.http import status_codes from mitmproxy.tools.console import common from mitmproxy.tools.console import flowdetailview -from mitmproxy.tools.console import grideditor from mitmproxy.tools.console import overlay from mitmproxy.tools.console import searchable from mitmproxy.tools.console import signals @@ -106,49 +101,51 @@ class FlowViewHeader(urwid.WidgetWrap): def __init__( self, master: "mitmproxy.tools.console.master.ConsoleMaster", - f: http.HTTPFlow ) -> None: self.master = master - self.flow = f - self._w = common.format_flow( - f, - False, - extended=True, - hostheader=self.master.options.showhost - ) - signals.flow_change.connect(self.sig_flow_change) + self.focus_changed() - def sig_flow_change(self, sender, flow): - if flow == self.flow: + def focus_changed(self): + if self.master.view.focus.flow: self._w = common.format_flow( - flow, + self.master.view.focus.flow, False, extended=True, hostheader=self.master.options.showhost ) + else: + self._w = urwid.Pile([]) TAB_REQ = 0 TAB_RESP = 1 -class FlowView(tabs.Tabs): +class FlowDetails(tabs.Tabs): highlight_color = "focusfield" - def __init__(self, master, view, flow, tab_offset): - self.master, self.view, self.flow = master, view, flow - super().__init__( - [ + def __init__(self, master, tab_offset): + self.master = master + super().__init__([], tab_offset) + self.show() + self.last_displayed_body = None + + def focus_changed(self): + if self.master.view.focus.flow: + self.tabs = [ (self.tab_request, self.view_request), (self.tab_response, self.view_response), (self.tab_details, self.view_details), - ], - tab_offset - ) - + ] self.show() - self.last_displayed_body = None - signals.flow_change.connect(self.sig_flow_change) + + @property + def view(self): + return self.master.view + + @property + def flow(self): + return self.master.view.focus.flow def tab_request(self): if self.flow.intercepted and not self.flow.response: @@ -174,10 +171,6 @@ class FlowView(tabs.Tabs): def view_details(self): return flowdetailview.flowdetails(self.view, self.flow) - def sig_flow_change(self, sender, flow): - if flow == self.flow: - self.show() - def content_view(self, viewmode, message): if message.raw_content is None: msg, body = "", [urwid.Text([("error", "[content missing]")])] @@ -288,208 +281,11 @@ class FlowView(tabs.Tabs): ] ) ] - return searchable.Searchable(self.view, txt) - - def set_method_raw(self, m): - if m: - self.flow.request.method = m - signals.flow_change.send(self, flow = self.flow) - - def edit_method(self, m): - if m == "e": - signals.status_prompt.send( - prompt = "Method", - text = self.flow.request.method, - callback = self.set_method_raw - ) - else: - for i in common.METHOD_OPTIONS: - if i[1] == m: - self.flow.request.method = i[0].upper() - signals.flow_change.send(self, flow = self.flow) - - def set_url(self, url): - request = self.flow.request - try: - request.url = str(url) - except ValueError: - return "Invalid URL." - signals.flow_change.send(self, flow = self.flow) - - def set_resp_status_code(self, status_code): - try: - status_code = int(status_code) - except ValueError: - return None - self.flow.response.status_code = status_code - if status_code in status_codes.RESPONSES: - self.flow.response.reason = status_codes.RESPONSES[status_code] - signals.flow_change.send(self, flow = self.flow) - - def set_resp_reason(self, reason): - self.flow.response.reason = reason - signals.flow_change.send(self, flow = self.flow) - - def set_headers(self, fields, conn): - conn.headers = Headers(fields) - signals.flow_change.send(self, flow = self.flow) - - def set_query(self, lst, conn): - conn.query = lst - signals.flow_change.send(self, flow = self.flow) - - def set_path_components(self, lst, conn): - conn.path_components = lst - signals.flow_change.send(self, flow = self.flow) - - def set_form(self, lst, conn): - conn.urlencoded_form = lst - signals.flow_change.send(self, flow = self.flow) - - def edit_form(self, conn): - self.master.view_grideditor( - grideditor.URLEncodedFormEditor( - self.master, - conn.urlencoded_form.items(multi=True), - self.set_form, - conn - ) - ) - - def edit_form_confirm(self, key, conn): - if key == "y": - self.edit_form(conn) - - def set_cookies(self, lst, conn): - conn.cookies = lst - signals.flow_change.send(self, flow = self.flow) - - def set_setcookies(self, data, conn): - conn.cookies = data - signals.flow_change.send(self, flow = self.flow) - - def edit(self, part): - if self.tab_offset == TAB_REQ: - message = self.flow.request - else: - if not self.flow.response: - self.flow.response = http.HTTPResponse.make(200, b"") - message = self.flow.response - - self.flow.backup() - if message == self.flow.request and part == "c": - self.master.view_grideditor( - grideditor.CookieEditor( - self.master, - message.cookies.items(multi=True), - self.set_cookies, - message - ) - ) - if message == self.flow.response and part == "c": - self.master.view_grideditor( - grideditor.SetCookieEditor( - self.master, - message.cookies.items(multi=True), - self.set_setcookies, - message - ) - ) - if part == "r": - # Fix an issue caused by some editors when editing a - # request/response body. Many editors make it hard to save a - # file without a terminating newline on the last line. When - # editing message bodies, this can cause problems. For now, I - # just strip the newlines off the end of the body when we return - # from an editor. - c = self.master.spawn_editor(message.get_content(strict=False) or b"") - message.content = c.rstrip(b"\n") - elif part == "f": - if not message.urlencoded_form and message.raw_content: - signals.status_prompt_onekey.send( - prompt = "Existing body is not a URL-encoded form. Clear and edit?", - keys = [ - ("yes", "y"), - ("no", "n"), - ], - callback = self.edit_form_confirm, - args = (message,) - ) - else: - self.edit_form(message) - elif part == "h": - self.master.view_grideditor( - grideditor.HeaderEditor( - self.master, - message.headers.fields, - self.set_headers, - message - ) - ) - elif part == "p": - p = message.path_components - self.master.view_grideditor( - grideditor.PathEditor( - self.master, - p, - self.set_path_components, - message - ) - ) - elif part == "q": - self.master.view_grideditor( - grideditor.QueryEditor( - self.master, - message.query.items(multi=True), - self.set_query, message - ) - ) - elif part == "u": - signals.status_prompt.send( - prompt = "URL", - text = message.url, - callback = self.set_url - ) - elif part == "m" and message == self.flow.request: - signals.status_prompt_onekey.send( - prompt = "Method", - keys = common.METHOD_OPTIONS, - callback = self.edit_method - ) - elif part == "o": - signals.status_prompt.send( - prompt = "Code", - text = str(message.status_code), - callback = self.set_resp_status_code - ) - elif part == "m" and message == self.flow.response: - signals.status_prompt.send( - prompt = "Message", - text = message.reason, - callback = self.set_resp_reason - ) - signals.flow_change.send(self, flow = self.flow) - - def view_flow(self, flow): - signals.pop_view_state.send(self) - self.master.view_flow(flow, self.tab_offset) - - def _view_nextprev_flow(self, idx, flow): - if not self.view.inbounds(idx): - signals.status_message.send(message="No more flows") - return - self.view_flow(self.view[idx]) - - def view_next_flow(self, flow): - return self._view_nextprev_flow(self.view.index(flow) + 1, flow) - - def view_prev_flow(self, flow): - return self._view_nextprev_flow(self.view.index(flow) - 1, flow) + return searchable.Searchable(txt) def change_this_display_mode(self, t): view = contentviews.get(t) self.view.settings[self.flow][(self.tab_offset, "prettyview")] = view.name.lower() - signals.flow_change.send(self, flow=self.flow) def keypress(self, size, key): conn = None # type: Optional[Union[http.HTTPRequest, http.HTTPResponse]] @@ -500,112 +296,12 @@ class FlowView(tabs.Tabs): key = super().keypress(size, key) - # Special case: Space moves over to the next flow. - # We need to catch that before applying common.shortcuts() - if key == " ": - self.view_next_flow(self.flow) - return - key = common.shortcuts(key) if key in ("up", "down", "page up", "page down"): # Pass scroll events to the wrapped widget self._w.keypress(size, key) - elif key == "a": - self.flow.resume() - self.master.view.update(self.flow) - elif key == "A": - for f in self.view: - if f.intercepted: - f.resume() - self.master.view.update(self.flow) - elif key == "d": - if self.flow.killable: - self.flow.kill() - self.view.remove(self.flow) - if not self.view.focus.flow: - self.master.view_flowlist() - else: - self.view_flow(self.view.focus.flow) - elif key == "D": - cp = self.flow.copy() - self.master.view.add(cp) - self.master.view.focus.flow = cp - self.view_flow(cp) - signals.status_message.send(message="Duplicated.") - elif key == "p": - self.view_prev_flow(self.flow) - elif key == "r": - try: - self.master.replay_request(self.flow) - except exceptions.ReplayException as e: - signals.add_log("Replay error: %s" % e, "warn") - signals.flow_change.send(self, flow = self.flow) - elif key == "V": - if self.flow.modified(): - self.flow.revert() - signals.flow_change.send(self, flow = self.flow) - signals.status_message.send(message="Reverted.") - else: - signals.status_message.send(message="Flow not modified.") - elif key == "W": - signals.status_prompt_path.send( - prompt = "Save this flow", - callback = self.master.save_one_flow, - args = (self.flow,) - ) - elif key == "|": - signals.status_prompt_path.send( - prompt = "Send flow to script", - callback = self.master.run_script_once, - args = (self.flow,) - ) - elif key == "e": - if self.tab_offset == TAB_REQ: - signals.status_prompt_onekey.send( - prompt="Edit request", - keys=( - ("cookies", "c"), - ("query", "q"), - ("path", "p"), - ("url", "u"), - ("header", "h"), - ("form", "f"), - ("raw body", "r"), - ("method", "m"), - ), - callback=self.edit - ) - elif self.tab_offset == TAB_RESP: - signals.status_prompt_onekey.send( - prompt="Edit response", - keys=( - ("cookies", "c"), - ("code", "o"), - ("message", "m"), - ("header", "h"), - ("raw body", "r"), - ), - callback=self.edit - ) - else: - signals.status_message.send( - message="Tab to the request or response", - expire=1 - ) - elif key in set("bfgmxvzEC") and not conn: - signals.status_message.send( - message = "Tab to the request or response", - expire = 1 - ) - return - elif key == "b": - if self.tab_offset == TAB_REQ: - common.ask_save_body("q", self.flow) - else: - common.ask_save_body("s", self.flow) elif key == "f": self.view.settings[self.flow][(self.tab_offset, "fullcontents")] = True - signals.flow_change.send(self, flow = self.flow) signals.status_message.send(message="Loading all body data...") elif key == "m": opts = [i.name.lower() for i in contentviews.views] @@ -617,44 +313,6 @@ class FlowView(tabs.Tabs): self.change_this_display_mode ) ) - elif key == "E": - pass - # if self.tab_offset == TAB_REQ: - # scope = "q" - # else: - # scope = "s" - # signals.status_prompt_onekey.send( - # self, - # prompt = "Export to file", - # keys = [(e[0], e[1]) for e in export.EXPORTERS], - # callback = common.export_to_clip_or_file, - # args = (scope, self.flow, common.ask_save_path) - # ) - elif key == "C": - pass - # if self.tab_offset == TAB_REQ: - # scope = "q" - # else: - # scope = "s" - # signals.status_prompt_onekey.send( - # self, - # prompt = "Export to clipboard", - # keys = [(e[0], e[1]) for e in export.EXPORTERS], - # callback = common.export_to_clip_or_file, - # args = (scope, self.flow, common.copy_to_clipboard_or_prompt) - # ) - elif key == "x": - conn.content = None - signals.flow_change.send(self, flow=self.flow) - elif key == "v": - if conn.raw_content: - t = conn.headers.get("content-type") - if "EDITOR" in os.environ or "PAGER" in os.environ: - self.master.spawn_external_viewer(conn.get_content(strict=False), t) - else: - signals.status_message.send( - message = "Error! Set $EDITOR or $PAGER." - ) elif key == "z": self.flow.backup() enc = conn.headers.get("content-encoding", "identity") @@ -676,7 +334,6 @@ class FlowView(tabs.Tabs): callback = self.encode_callback, args = (conn,) ) - signals.flow_change.send(self, flow = self.flow) else: # Key is not handled here. return key @@ -688,4 +345,18 @@ class FlowView(tabs.Tabs): "b": "br", } conn.encode(encoding_map[key]) - signals.flow_change.send(self, flow = self.flow) + + +class FlowView(urwid.Frame): + keyctx = "flowview" + + def __init__(self, master): + super().__init__( + FlowDetails(master, 0), + header = FlowViewHeader(master), + ) + self.master = master + + def focus_changed(self, *args, **kwargs): + self.body.focus_changed() + self.header.focus_changed() |