aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy/tools/console/flowview.py
diff options
context:
space:
mode:
Diffstat (limited to 'mitmproxy/tools/console/flowview.py')
-rw-r--r--mitmproxy/tools/console/flowview.py411
1 files changed, 41 insertions, 370 deletions
diff --git a/mitmproxy/tools/console/flowview.py b/mitmproxy/tools/console/flowview.py
index b7b7053f..50f0d176 100644
--- a/mitmproxy/tools/console/flowview.py
+++ b/mitmproxy/tools/console/flowview.py
@@ -1,5 +1,4 @@
import math
-import os
import sys
from functools import lru_cache
from typing import Optional, Union # noqa
@@ -7,13 +6,9 @@ from typing import Optional, Union # noqa
import urwid
from mitmproxy import contentviews
-from mitmproxy import exceptions
from mitmproxy import http
-from mitmproxy.net.http import Headers
-from mitmproxy.net.http import status_codes
from mitmproxy.tools.console import common
from mitmproxy.tools.console import flowdetailview
-from mitmproxy.tools.console import grideditor
from mitmproxy.tools.console import overlay
from mitmproxy.tools.console import searchable
from mitmproxy.tools.console import signals
@@ -106,49 +101,51 @@ class FlowViewHeader(urwid.WidgetWrap):
def __init__(
self,
master: "mitmproxy.tools.console.master.ConsoleMaster",
- f: http.HTTPFlow
) -> None:
self.master = master
- self.flow = f
- self._w = common.format_flow(
- f,
- False,
- extended=True,
- hostheader=self.master.options.showhost
- )
- signals.flow_change.connect(self.sig_flow_change)
+ self.focus_changed()
- def sig_flow_change(self, sender, flow):
- if flow == self.flow:
+ def focus_changed(self):
+ if self.master.view.focus.flow:
self._w = common.format_flow(
- flow,
+ self.master.view.focus.flow,
False,
extended=True,
hostheader=self.master.options.showhost
)
+ else:
+ self._w = urwid.Pile([])
TAB_REQ = 0
TAB_RESP = 1
-class FlowView(tabs.Tabs):
+class FlowDetails(tabs.Tabs):
highlight_color = "focusfield"
- def __init__(self, master, view, flow, tab_offset):
- self.master, self.view, self.flow = master, view, flow
- super().__init__(
- [
+ def __init__(self, master, tab_offset):
+ self.master = master
+ super().__init__([], tab_offset)
+ self.show()
+ self.last_displayed_body = None
+
+ def focus_changed(self):
+ if self.master.view.focus.flow:
+ self.tabs = [
(self.tab_request, self.view_request),
(self.tab_response, self.view_response),
(self.tab_details, self.view_details),
- ],
- tab_offset
- )
-
+ ]
self.show()
- self.last_displayed_body = None
- signals.flow_change.connect(self.sig_flow_change)
+
+ @property
+ def view(self):
+ return self.master.view
+
+ @property
+ def flow(self):
+ return self.master.view.focus.flow
def tab_request(self):
if self.flow.intercepted and not self.flow.response:
@@ -174,10 +171,6 @@ class FlowView(tabs.Tabs):
def view_details(self):
return flowdetailview.flowdetails(self.view, self.flow)
- def sig_flow_change(self, sender, flow):
- if flow == self.flow:
- self.show()
-
def content_view(self, viewmode, message):
if message.raw_content is None:
msg, body = "", [urwid.Text([("error", "[content missing]")])]
@@ -288,208 +281,11 @@ class FlowView(tabs.Tabs):
]
)
]
- return searchable.Searchable(self.view, txt)
-
- def set_method_raw(self, m):
- if m:
- self.flow.request.method = m
- signals.flow_change.send(self, flow = self.flow)
-
- def edit_method(self, m):
- if m == "e":
- signals.status_prompt.send(
- prompt = "Method",
- text = self.flow.request.method,
- callback = self.set_method_raw
- )
- else:
- for i in common.METHOD_OPTIONS:
- if i[1] == m:
- self.flow.request.method = i[0].upper()
- signals.flow_change.send(self, flow = self.flow)
-
- def set_url(self, url):
- request = self.flow.request
- try:
- request.url = str(url)
- except ValueError:
- return "Invalid URL."
- signals.flow_change.send(self, flow = self.flow)
-
- def set_resp_status_code(self, status_code):
- try:
- status_code = int(status_code)
- except ValueError:
- return None
- self.flow.response.status_code = status_code
- if status_code in status_codes.RESPONSES:
- self.flow.response.reason = status_codes.RESPONSES[status_code]
- signals.flow_change.send(self, flow = self.flow)
-
- def set_resp_reason(self, reason):
- self.flow.response.reason = reason
- signals.flow_change.send(self, flow = self.flow)
-
- def set_headers(self, fields, conn):
- conn.headers = Headers(fields)
- signals.flow_change.send(self, flow = self.flow)
-
- def set_query(self, lst, conn):
- conn.query = lst
- signals.flow_change.send(self, flow = self.flow)
-
- def set_path_components(self, lst, conn):
- conn.path_components = lst
- signals.flow_change.send(self, flow = self.flow)
-
- def set_form(self, lst, conn):
- conn.urlencoded_form = lst
- signals.flow_change.send(self, flow = self.flow)
-
- def edit_form(self, conn):
- self.master.view_grideditor(
- grideditor.URLEncodedFormEditor(
- self.master,
- conn.urlencoded_form.items(multi=True),
- self.set_form,
- conn
- )
- )
-
- def edit_form_confirm(self, key, conn):
- if key == "y":
- self.edit_form(conn)
-
- def set_cookies(self, lst, conn):
- conn.cookies = lst
- signals.flow_change.send(self, flow = self.flow)
-
- def set_setcookies(self, data, conn):
- conn.cookies = data
- signals.flow_change.send(self, flow = self.flow)
-
- def edit(self, part):
- if self.tab_offset == TAB_REQ:
- message = self.flow.request
- else:
- if not self.flow.response:
- self.flow.response = http.HTTPResponse.make(200, b"")
- message = self.flow.response
-
- self.flow.backup()
- if message == self.flow.request and part == "c":
- self.master.view_grideditor(
- grideditor.CookieEditor(
- self.master,
- message.cookies.items(multi=True),
- self.set_cookies,
- message
- )
- )
- if message == self.flow.response and part == "c":
- self.master.view_grideditor(
- grideditor.SetCookieEditor(
- self.master,
- message.cookies.items(multi=True),
- self.set_setcookies,
- message
- )
- )
- if part == "r":
- # Fix an issue caused by some editors when editing a
- # request/response body. Many editors make it hard to save a
- # file without a terminating newline on the last line. When
- # editing message bodies, this can cause problems. For now, I
- # just strip the newlines off the end of the body when we return
- # from an editor.
- c = self.master.spawn_editor(message.get_content(strict=False) or b"")
- message.content = c.rstrip(b"\n")
- elif part == "f":
- if not message.urlencoded_form and message.raw_content:
- signals.status_prompt_onekey.send(
- prompt = "Existing body is not a URL-encoded form. Clear and edit?",
- keys = [
- ("yes", "y"),
- ("no", "n"),
- ],
- callback = self.edit_form_confirm,
- args = (message,)
- )
- else:
- self.edit_form(message)
- elif part == "h":
- self.master.view_grideditor(
- grideditor.HeaderEditor(
- self.master,
- message.headers.fields,
- self.set_headers,
- message
- )
- )
- elif part == "p":
- p = message.path_components
- self.master.view_grideditor(
- grideditor.PathEditor(
- self.master,
- p,
- self.set_path_components,
- message
- )
- )
- elif part == "q":
- self.master.view_grideditor(
- grideditor.QueryEditor(
- self.master,
- message.query.items(multi=True),
- self.set_query, message
- )
- )
- elif part == "u":
- signals.status_prompt.send(
- prompt = "URL",
- text = message.url,
- callback = self.set_url
- )
- elif part == "m" and message == self.flow.request:
- signals.status_prompt_onekey.send(
- prompt = "Method",
- keys = common.METHOD_OPTIONS,
- callback = self.edit_method
- )
- elif part == "o":
- signals.status_prompt.send(
- prompt = "Code",
- text = str(message.status_code),
- callback = self.set_resp_status_code
- )
- elif part == "m" and message == self.flow.response:
- signals.status_prompt.send(
- prompt = "Message",
- text = message.reason,
- callback = self.set_resp_reason
- )
- signals.flow_change.send(self, flow = self.flow)
-
- def view_flow(self, flow):
- signals.pop_view_state.send(self)
- self.master.view_flow(flow, self.tab_offset)
-
- def _view_nextprev_flow(self, idx, flow):
- if not self.view.inbounds(idx):
- signals.status_message.send(message="No more flows")
- return
- self.view_flow(self.view[idx])
-
- def view_next_flow(self, flow):
- return self._view_nextprev_flow(self.view.index(flow) + 1, flow)
-
- def view_prev_flow(self, flow):
- return self._view_nextprev_flow(self.view.index(flow) - 1, flow)
+ return searchable.Searchable(txt)
def change_this_display_mode(self, t):
view = contentviews.get(t)
self.view.settings[self.flow][(self.tab_offset, "prettyview")] = view.name.lower()
- signals.flow_change.send(self, flow=self.flow)
def keypress(self, size, key):
conn = None # type: Optional[Union[http.HTTPRequest, http.HTTPResponse]]
@@ -500,112 +296,12 @@ class FlowView(tabs.Tabs):
key = super().keypress(size, key)
- # Special case: Space moves over to the next flow.
- # We need to catch that before applying common.shortcuts()
- if key == " ":
- self.view_next_flow(self.flow)
- return
-
key = common.shortcuts(key)
if key in ("up", "down", "page up", "page down"):
# Pass scroll events to the wrapped widget
self._w.keypress(size, key)
- elif key == "a":
- self.flow.resume()
- self.master.view.update(self.flow)
- elif key == "A":
- for f in self.view:
- if f.intercepted:
- f.resume()
- self.master.view.update(self.flow)
- elif key == "d":
- if self.flow.killable:
- self.flow.kill()
- self.view.remove(self.flow)
- if not self.view.focus.flow:
- self.master.view_flowlist()
- else:
- self.view_flow(self.view.focus.flow)
- elif key == "D":
- cp = self.flow.copy()
- self.master.view.add(cp)
- self.master.view.focus.flow = cp
- self.view_flow(cp)
- signals.status_message.send(message="Duplicated.")
- elif key == "p":
- self.view_prev_flow(self.flow)
- elif key == "r":
- try:
- self.master.replay_request(self.flow)
- except exceptions.ReplayException as e:
- signals.add_log("Replay error: %s" % e, "warn")
- signals.flow_change.send(self, flow = self.flow)
- elif key == "V":
- if self.flow.modified():
- self.flow.revert()
- signals.flow_change.send(self, flow = self.flow)
- signals.status_message.send(message="Reverted.")
- else:
- signals.status_message.send(message="Flow not modified.")
- elif key == "W":
- signals.status_prompt_path.send(
- prompt = "Save this flow",
- callback = self.master.save_one_flow,
- args = (self.flow,)
- )
- elif key == "|":
- signals.status_prompt_path.send(
- prompt = "Send flow to script",
- callback = self.master.run_script_once,
- args = (self.flow,)
- )
- elif key == "e":
- if self.tab_offset == TAB_REQ:
- signals.status_prompt_onekey.send(
- prompt="Edit request",
- keys=(
- ("cookies", "c"),
- ("query", "q"),
- ("path", "p"),
- ("url", "u"),
- ("header", "h"),
- ("form", "f"),
- ("raw body", "r"),
- ("method", "m"),
- ),
- callback=self.edit
- )
- elif self.tab_offset == TAB_RESP:
- signals.status_prompt_onekey.send(
- prompt="Edit response",
- keys=(
- ("cookies", "c"),
- ("code", "o"),
- ("message", "m"),
- ("header", "h"),
- ("raw body", "r"),
- ),
- callback=self.edit
- )
- else:
- signals.status_message.send(
- message="Tab to the request or response",
- expire=1
- )
- elif key in set("bfgmxvzEC") and not conn:
- signals.status_message.send(
- message = "Tab to the request or response",
- expire = 1
- )
- return
- elif key == "b":
- if self.tab_offset == TAB_REQ:
- common.ask_save_body("q", self.flow)
- else:
- common.ask_save_body("s", self.flow)
elif key == "f":
self.view.settings[self.flow][(self.tab_offset, "fullcontents")] = True
- signals.flow_change.send(self, flow = self.flow)
signals.status_message.send(message="Loading all body data...")
elif key == "m":
opts = [i.name.lower() for i in contentviews.views]
@@ -617,44 +313,6 @@ class FlowView(tabs.Tabs):
self.change_this_display_mode
)
)
- elif key == "E":
- pass
- # if self.tab_offset == TAB_REQ:
- # scope = "q"
- # else:
- # scope = "s"
- # signals.status_prompt_onekey.send(
- # self,
- # prompt = "Export to file",
- # keys = [(e[0], e[1]) for e in export.EXPORTERS],
- # callback = common.export_to_clip_or_file,
- # args = (scope, self.flow, common.ask_save_path)
- # )
- elif key == "C":
- pass
- # if self.tab_offset == TAB_REQ:
- # scope = "q"
- # else:
- # scope = "s"
- # signals.status_prompt_onekey.send(
- # self,
- # prompt = "Export to clipboard",
- # keys = [(e[0], e[1]) for e in export.EXPORTERS],
- # callback = common.export_to_clip_or_file,
- # args = (scope, self.flow, common.copy_to_clipboard_or_prompt)
- # )
- elif key == "x":
- conn.content = None
- signals.flow_change.send(self, flow=self.flow)
- elif key == "v":
- if conn.raw_content:
- t = conn.headers.get("content-type")
- if "EDITOR" in os.environ or "PAGER" in os.environ:
- self.master.spawn_external_viewer(conn.get_content(strict=False), t)
- else:
- signals.status_message.send(
- message = "Error! Set $EDITOR or $PAGER."
- )
elif key == "z":
self.flow.backup()
enc = conn.headers.get("content-encoding", "identity")
@@ -676,7 +334,6 @@ class FlowView(tabs.Tabs):
callback = self.encode_callback,
args = (conn,)
)
- signals.flow_change.send(self, flow = self.flow)
else:
# Key is not handled here.
return key
@@ -688,4 +345,18 @@ class FlowView(tabs.Tabs):
"b": "br",
}
conn.encode(encoding_map[key])
- signals.flow_change.send(self, flow = self.flow)
+
+
+class FlowView(urwid.Frame):
+ keyctx = "flowview"
+
+ def __init__(self, master):
+ super().__init__(
+ FlowDetails(master, 0),
+ header = FlowViewHeader(master),
+ )
+ self.master = master
+
+ def focus_changed(self, *args, **kwargs):
+ self.body.focus_changed()
+ self.header.focus_changed()