aboutsummaryrefslogtreecommitdiffstats
path: root/libmproxy/console/flowview.py
diff options
context:
space:
mode:
Diffstat (limited to 'libmproxy/console/flowview.py')
-rw-r--r--libmproxy/console/flowview.py900
1 files changed, 366 insertions, 534 deletions
diff --git a/libmproxy/console/flowview.py b/libmproxy/console/flowview.py
index 89e75aad..632b725e 100644
--- a/libmproxy/console/flowview.py
+++ b/libmproxy/console/flowview.py
@@ -1,12 +1,16 @@
from __future__ import absolute_import
-import os, sys, copy
+import os
+import sys
import urwid
-from . import common, grideditor, contentview
-from .. import utils, flow, controller
+from netlib import odict
+from . import common, grideditor, contentview, signals, searchable, tabs
+from . import flowdetailview
+from .. import utils, controller
from ..protocol.http import HTTPRequest, HTTPResponse, CONTENT_MISSING, decoded
-class SearchError(Exception): pass
+class SearchError(Exception):
+ pass
def _mkhelp():
@@ -19,7 +23,6 @@ def _mkhelp():
("D", "duplicate flow"),
("e", "edit request/response"),
("f", "load full body data"),
- ("g", "copy response(content/headers) to clipboard"),
("m", "change body display mode for this entity"),
(None,
common.highlight_key("automatic", "a") +
@@ -59,18 +62,19 @@ def _mkhelp():
),
("M", "change default body display mode"),
("p", "previous flow"),
+ ("P", "copy response(content/headers) to clipboard"),
("r", "replay request"),
("V", "revert changes to request"),
("v", "view body in external viewer"),
("w", "save all flows matching current limit"),
("W", "save this flow"),
("x", "delete body"),
- ("X", "view flow details"),
("z", "encode/decode a request/response"),
- ("tab", "toggle request/response view"),
+ ("tab", "next tab"),
+ ("h, l", "previous tab, next tab"),
("space", "next flow"),
("|", "run script on this flow"),
- ("/", "search in response body (case sensitive)"),
+ ("/", "search (case sensitive)"),
("n", "repeat search forward"),
("N", "repeat search backwards"),
]
@@ -87,109 +91,129 @@ footer = [
class FlowViewHeader(urwid.WidgetWrap):
def __init__(self, master, f):
self.master, self.flow = master, f
- self._w = common.format_flow(f, False, extended=True, padding=0, hostheader=self.master.showhost)
-
- def refresh_flow(self, f):
- if f == self.flow:
- self._w = common.format_flow(f, False, extended=True, padding=0, hostheader=self.master.showhost)
-
+ self._w = common.format_flow(
+ f,
+ False,
+ extended=True,
+ padding=0,
+ hostheader=self.master.showhost
+ )
+ signals.flow_change.connect(self.sig_flow_change)
+
+ def sig_flow_change(self, sender, flow):
+ if flow == self.flow:
+ self._w = common.format_flow(
+ flow,
+ False,
+ extended=True,
+ padding=0,
+ hostheader=self.master.showhost
+ )
-class CallbackCache:
- @utils.LRUCache(200)
- def _callback(self, method, *args, **kwargs):
- return getattr(self.obj, method)(*args, **kwargs)
- def callback(self, obj, method, *args, **kwargs):
- # obj varies!
- self.obj = obj
- return self._callback(method, *args, **kwargs)
-cache = CallbackCache()
+cache = utils.LRUCache(200)
+TAB_REQ = 0
+TAB_RESP = 1
-class FlowView(urwid.WidgetWrap):
- REQ = 0
- RESP = 1
+class FlowView(tabs.Tabs):
highlight_color = "focusfield"
- def __init__(self, master, state, flow):
+ def __init__(self, master, state, flow, tab_offset):
self.master, self.state, self.flow = master, state, flow
+ tabs.Tabs.__init__(self,
+ [
+ (self.tab_request, self.view_request),
+ (self.tab_response, self.view_response),
+ (self.tab_details, self.view_details),
+ ],
+ tab_offset
+ )
+ self.show()
self.last_displayed_body = None
- if self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE:
- self.view_response()
- else:
- self.view_request()
+ signals.flow_change.connect(self.sig_flow_change)
- def _cached_content_view(self, viewmode, hdrItems, content, limit, is_request):
- return contentview.get_content_view(viewmode, hdrItems, content, limit, self.master.add_event, is_request)
+ def tab_request(self):
+ if self.flow.intercepted and not self.flow.reply.acked and not self.flow.response:
+ return "Request intercepted"
+ else:
+ return "Request"
- def content_view(self, viewmode, conn):
- full = self.state.get_flow_setting(
- self.flow,
- (self.state.view_flow_mode, "fullcontents"),
- False
- )
- if full:
- limit = sys.maxint
+ def tab_response(self):
+ if self.flow.intercepted and not self.flow.reply.acked and self.flow.response:
+ return "Response intercepted"
else:
- limit = contentview.VIEW_CUTOFF
- description, text_objects = cache.callback(
- self, "_cached_content_view",
- viewmode,
- tuple(tuple(i) for i in conn.headers.lst),
- conn.content,
- limit,
- isinstance(conn, HTTPRequest)
- )
- return (description, text_objects)
+ return "Response"
- def cont_view_handle_missing(self, conn, viewmode):
- if conn.content == CONTENT_MISSING:
- msg, body = "", [urwid.Text([("error", "[content missing]")])]
- else:
- msg, body = self.content_view(viewmode, conn)
+ def tab_details(self):
+ return "Detail"
+
+ def view_request(self):
+ return self.conn_text(self.flow.request)
+
+ def view_response(self):
+ return self.conn_text(self.flow.response)
+ def view_details(self):
+ return flowdetailview.flowdetails(self.state, self.flow)
+
+ def sig_flow_change(self, sender, flow):
+ if flow == self.flow:
+ self.show()
+
+ def content_view(self, viewmode, conn):
+ if conn.content == CONTENT_MISSING:
+ msg, body = "", [urwid.Text([("error", "[content missing]")])]
return (msg, body)
+ else:
+ full = self.state.get_flow_setting(
+ self.flow,
+ (self.tab_offset, "fullcontents"),
+ False
+ )
+ if full:
+ limit = sys.maxint
+ else:
+ limit = contentview.VIEW_CUTOFF
+ description, text_objects = cache.get(
+ contentview.get_content_view,
+ viewmode,
+ tuple(tuple(i) for i in conn.headers.lst),
+ conn.content,
+ limit,
+ self.master.add_event,
+ isinstance(conn, HTTPRequest)
+ )
+ return (description, text_objects)
- def viewmode_get(self, override):
+ def viewmode_get(self):
+ override = self.state.get_flow_setting(
+ self.flow,
+ (self.tab_offset, "prettyview")
+ )
return self.state.default_body_view if override is None else override
- def override_get(self):
- return self.state.get_flow_setting(self.flow,
- (self.state.view_flow_mode, "prettyview"))
-
- def conn_text_raw(self, conn):
- """
- Based on a request/response, conn, returns the elements for
- display.
- """
- headers = common.format_keyvals(
+ def conn_text(self, conn):
+ if conn:
+ txt = common.format_keyvals(
[(h+":", v) for (h, v) in conn.headers.lst],
key = "header",
val = "text"
)
- override = self.override_get()
- viewmode = self.viewmode_get(override)
- msg, body = self.cont_view_handle_missing(conn, viewmode)
- return headers, msg, body
-
- def conn_text_merge(self, headers, msg, body):
- """
- Grabs what is returned by conn_text_raw and merges them all
- toghether, mainly used by conn_text and search
- """
- override = self.override_get()
- viewmode = self.viewmode_get(override)
-
- cols = [urwid.Text(
- [
- ("heading", msg),
- ]
- )
- ]
+ viewmode = self.viewmode_get()
+ msg, body = self.content_view(viewmode, conn)
- if override is not None:
- cols.append(urwid.Text([
+ cols = [
+ urwid.Text(
+ [
+ ("heading", msg),
+ ]
+ )
+ ]
+ cols.append(
+ urwid.Text(
+ [
" ",
('heading', "["),
('heading_key', "m"),
@@ -198,306 +222,40 @@ class FlowView(urwid.WidgetWrap):
align="right"
)
)
+ title = urwid.AttrWrap(urwid.Columns(cols), "heading")
- title = urwid.AttrWrap(urwid.Columns(cols), "heading")
- headers.append(title)
- headers.extend(body)
-
- return headers
-
- def conn_text(self, conn):
- """
- Same as conn_text_raw, but returns result wrapped in a listbox ready for usage.
- """
- headers, msg, body = self.conn_text_raw(conn)
- merged = self.conn_text_merge(headers, msg, body)
- return urwid.ListBox(merged)
-
- def _tab(self, content, attr):
- p = urwid.Text(content)
- p = urwid.Padding(p, align="left", width=("relative", 100))
- p = urwid.AttrWrap(p, attr)
- return p
-
- def wrap_body(self, active, body):
- parts = []
-
- if self.flow.intercepted and not self.flow.reply.acked and not self.flow.response:
- qt = "Request intercepted"
- else:
- qt = "Request"
- if active == common.VIEW_FLOW_REQUEST:
- parts.append(self._tab(qt, "heading"))
- else:
- parts.append(self._tab(qt, "heading_inactive"))
-
- if self.flow.intercepted and not self.flow.reply.acked and self.flow.response:
- st = "Response intercepted"
- else:
- st = "Response"
- if active == common.VIEW_FLOW_RESPONSE:
- parts.append(self._tab(st, "heading"))
+ txt.append(title)
+ txt.extend(body)
else:
- parts.append(self._tab(st, "heading_inactive"))
-
- h = urwid.Columns(parts)
- f = urwid.Frame(
- body,
- header=h
+ txt = [
+ urwid.Text(""),
+ urwid.Text(
+ [
+ ("highlight", "No response. Press "),
+ ("key", "e"),
+ ("highlight", " and edit any aspect to add one."),
+ ]
)
- return f
-
- def search_wrapped_around(self, last_find_line, last_search_index, backwards):
- """
- returns true if search wrapped around the bottom.
- """
-
- current_find_line = self.state.get_flow_setting(self.flow,
- "last_find_line")
- current_search_index = self.state.get_flow_setting(self.flow,
- "last_search_index")
-
- if not backwards:
- message = "search hit BOTTOM, continuing at TOP"
- if current_find_line <= last_find_line:
- return True, message
- elif current_find_line == last_find_line:
- if current_search_index <= last_search_index:
- return True, message
- else:
- message = "search hit TOP, continuing at BOTTOM"
- if current_find_line >= last_find_line:
- return True, message
- elif current_find_line == last_find_line:
- if current_search_index >= last_search_index:
- return True, message
-
- return False, ""
-
- def search_again(self, backwards=False):
- """
- runs the previous search again, forwards or backwards.
- """
- last_search_string = self.state.get_flow_setting(self.flow, "last_search_string")
- if last_search_string:
- message = self.search(last_search_string, backwards)
- if message:
- self.master.statusbar.message(message)
- else:
- message = "no previous searches have been made"
- self.master.statusbar.message(message)
-
- return message
-
- def search(self, search_string, backwards=False):
- """
- similar to view_response or view_request, but instead of just
- displaying the conn, it highlights a word that the user is
- searching for and handles all the logic surrounding that.
- """
-
- if not search_string:
- search_string = self.state.get_flow_setting(self.flow,
- "last_search_string")
- if not search_string:
- return
-
- if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
- text = self.flow.request
- const = common.VIEW_FLOW_REQUEST
- else:
- text = self.flow.response
- const = common.VIEW_FLOW_RESPONSE
- if not self.flow.response:
- return "no response to search in"
-
- last_find_line = self.state.get_flow_setting(self.flow,
- "last_find_line")
- last_search_index = self.state.get_flow_setting(self.flow,
- "last_search_index")
-
- # generate the body, highlight the words and get focus
- headers, msg, body = self.conn_text_raw(text)
- try:
- body, focus_position = self.search_highlight_text(body, search_string, backwards=backwards)
- except SearchError:
- return "Search not supported in this view."
-
- if focus_position == None:
- # no results found.
- return "no matches for '%s'" % search_string
-
- # UI stuff.
- merged = self.conn_text_merge(headers, msg, body)
- list_box = urwid.ListBox(merged)
- list_box.set_focus(focus_position + 2)
- self._w = self.wrap_body(const, list_box)
- self.master.statusbar.redraw()
-
- self.last_displayed_body = list_box
-
- wrapped, wrapped_message = self.search_wrapped_around(last_find_line, last_search_index, backwards)
-
- if wrapped:
- return wrapped_message
-
- def search_get_start(self, search_string):
- start_line = 0
- start_index = 0
- last_search_string = self.state.get_flow_setting(self.flow, "last_search_string")
- if search_string == last_search_string:
- start_line = self.state.get_flow_setting(self.flow, "last_find_line")
- start_index = self.state.get_flow_setting(self.flow,
- "last_search_index")
-
- if start_index == None:
- start_index = 0
- else:
- start_index += len(search_string)
-
- if start_line == None:
- start_line = 0
-
- else:
- self.state.add_flow_setting(self.flow, "last_search_string",
- search_string)
-
- return (start_line, start_index)
-
- def search_get_range(self, len_text_objects, start_line, backwards):
- if not backwards:
- loop_range = xrange(start_line, len_text_objects)
- else:
- loop_range = xrange(start_line, -1, -1)
-
- return loop_range
-
- def search_find(self, text, search_string, start_index, backwards):
- if backwards == False:
- find_index = text.find(search_string, start_index)
- else:
- if start_index != 0:
- start_index -= len(search_string)
- else:
- start_index = None
-
- find_index = text.rfind(search_string, 0, start_index)
-
- return find_index
-
- def search_highlight_text(self, text_objects, search_string, looping = False, backwards = False):
- start_line, start_index = self.search_get_start(search_string)
- i = start_line
-
- found = False
- text_objects = copy.deepcopy(text_objects)
- loop_range = self.search_get_range(len(text_objects), start_line, backwards)
- for i in loop_range:
- text_object = text_objects[i]
-
- try:
- text, style = text_object.get_text()
- except AttributeError:
- raise SearchError()
-
- if i != start_line:
- start_index = 0
-
- find_index = self.search_find(text, search_string, start_index, backwards)
-
- if find_index != -1:
- new_text = self.search_highlight_object(text, find_index, search_string)
- text_objects[i] = new_text
-
- found = True
- self.state.add_flow_setting(self.flow, "last_search_index",
- find_index)
- self.state.add_flow_setting(self.flow, "last_find_line", i)
-
- break
-
- # handle search WRAP
- if found:
- focus_pos = i
- else :
- if looping:
- focus_pos = None
- else:
- if not backwards:
- self.state.add_flow_setting(self.flow, "last_search_index", 0)
- self.state.add_flow_setting(self.flow, "last_find_line", 0)
- else:
- self.state.add_flow_setting(self.flow, "last_search_index", None)
- self.state.add_flow_setting(self.flow, "last_find_line", len(text_objects) - 1)
-
- text_objects, focus_pos = self.search_highlight_text(text_objects,
- search_string, looping=True, backwards=backwards)
-
- return text_objects, focus_pos
-
- def search_highlight_object(self, text_object, find_index, search_string):
- """
- just a little abstraction
- """
- before = text_object[:find_index]
- after = text_object[find_index+len(search_string):]
-
- new_text = urwid.Text(
- [
- before,
- (self.highlight_color, search_string),
- after,
]
- )
-
- return new_text
-
- def view_request(self):
- self.state.view_flow_mode = common.VIEW_FLOW_REQUEST
- body = self.conn_text(self.flow.request)
- self._w = self.wrap_body(common.VIEW_FLOW_REQUEST, body)
- self.master.statusbar.redraw()
-
- def view_response(self):
- self.state.view_flow_mode = common.VIEW_FLOW_RESPONSE
- if self.flow.response:
- body = self.conn_text(self.flow.response)
- else:
- body = urwid.ListBox(
- [
- urwid.Text(""),
- urwid.Text(
- [
- ("highlight", "No response. Press "),
- ("key", "e"),
- ("highlight", " and edit any aspect to add one."),
- ]
- )
- ]
- )
- self._w = self.wrap_body(common.VIEW_FLOW_RESPONSE, body)
- self.master.statusbar.redraw()
-
- def refresh_flow(self, c=None):
- if c == self.flow:
- if self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE and self.flow.response:
- self.view_response()
- else:
- self.view_request()
+ return searchable.Searchable(self.state, txt)
def set_method_raw(self, m):
if m:
self.flow.request.method = m
- self.master.refresh_flow(self.flow)
+ signals.flow_change.send(self, flow = self.flow)
def edit_method(self, m):
if m == "e":
- self.master.prompt_edit("Method", self.flow.request.method, self.set_method_raw)
+ signals.status_prompt.send(
+ prompt = "Method",
+ text = self.flow.request.method,
+ callback = self.set_method_raw
+ )
else:
for i in common.METHOD_OPTIONS:
if i[1] == m:
self.flow.request.method = i[0].upper()
- self.master.refresh_flow(self.flow)
+ signals.flow_change.send(self, flow = self.flow)
def set_url(self, url):
request = self.flow.request
@@ -505,7 +263,7 @@ class FlowView(urwid.WidgetWrap):
request.url = str(url)
except ValueError:
return "Invalid URL."
- self.master.refresh_flow(self.flow)
+ signals.flow_change.send(self, flow = self.flow)
def set_resp_code(self, code):
response = self.flow.response
@@ -516,85 +274,158 @@ class FlowView(urwid.WidgetWrap):
import BaseHTTPServer
if BaseHTTPServer.BaseHTTPRequestHandler.responses.has_key(int(code)):
response.msg = BaseHTTPServer.BaseHTTPRequestHandler.responses[int(code)][0]
- self.master.refresh_flow(self.flow)
+ signals.flow_change.send(self, flow = self.flow)
def set_resp_msg(self, msg):
response = self.flow.response
response.msg = msg
- self.master.refresh_flow(self.flow)
+ signals.flow_change.send(self, flow = self.flow)
def set_headers(self, lst, conn):
- conn.headers = flow.ODictCaseless(lst)
+ conn.headers = odict.ODictCaseless(lst)
+ signals.flow_change.send(self, flow = self.flow)
def set_query(self, lst, conn):
- conn.set_query(flow.ODict(lst))
+ conn.set_query(odict.ODict(lst))
+ signals.flow_change.send(self, flow = self.flow)
def set_path_components(self, lst, conn):
- conn.set_path_components([i[0] for i in lst])
+ conn.set_path_components(lst)
+ signals.flow_change.send(self, flow = self.flow)
def set_form(self, lst, conn):
- conn.set_form_urlencoded(flow.ODict(lst))
+ conn.set_form_urlencoded(odict.ODict(lst))
+ signals.flow_change.send(self, flow = self.flow)
def edit_form(self, conn):
self.master.view_grideditor(
- grideditor.URLEncodedFormEditor(self.master, conn.get_form_urlencoded().lst, self.set_form, conn)
+ grideditor.URLEncodedFormEditor(
+ self.master,
+ conn.get_form_urlencoded().lst,
+ self.set_form,
+ conn
+ )
)
def edit_form_confirm(self, key, conn):
if key == "y":
self.edit_form(conn)
+ def set_cookies(self, lst, conn):
+ od = odict.ODict(lst)
+ conn.set_cookies(od)
+ signals.flow_change.send(self, flow = self.flow)
+
+ def set_setcookies(self, data, conn):
+ conn.set_cookies(data)
+ signals.flow_change.send(self, flow = self.flow)
+
def edit(self, part):
- if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
+ if self.tab_offset == TAB_REQ:
message = self.flow.request
else:
if not self.flow.response:
self.flow.response = HTTPResponse(
self.flow.request.httpversion,
- 200, "OK", flow.ODictCaseless(), ""
+ 200, "OK", odict.ODictCaseless(), ""
)
self.flow.response.reply = controller.DummyReply()
message = self.flow.response
self.flow.backup()
+ if message == self.flow.request and part == "c":
+ self.master.view_grideditor(
+ grideditor.CookieEditor(
+ self.master,
+ message.get_cookies().lst,
+ self.set_cookies,
+ message
+ )
+ )
+ if message == self.flow.response and part == "c":
+ self.master.view_grideditor(
+ grideditor.SetCookieEditor(
+ self.master,
+ message.get_cookies(),
+ self.set_setcookies,
+ message
+ )
+ )
if part == "r":
with decoded(message):
- # Fix an issue caused by some editors when editing a request/response body.
- # Many editors make it hard to save a file without a terminating newline on the last
- # line. When editing message bodies, this can cause problems. For now, I just
- # strip the newlines off the end of the body when we return from an editor.
+ # Fix an issue caused by some editors when editing a
+ # request/response body. Many editors make it hard to save a
+ # file without a terminating newline on the last line. When
+ # editing message bodies, this can cause problems. For now, I
+ # just strip the newlines off the end of the body when we return
+ # from an editor.
c = self.master.spawn_editor(message.content or "")
message.content = c.rstrip("\n")
elif part == "f":
if not message.get_form_urlencoded() and message.content:
- self.master.prompt_onekey(
- "Existing body is not a URL-encoded form. Clear and edit?",
- [
+ signals.status_prompt_onekey.send(
+ prompt = "Existing body is not a URL-encoded form. Clear and edit?",
+ keys = [
("yes", "y"),
("no", "n"),
],
- self.edit_form_confirm,
- message
+ callback = self.edit_form_confirm,
+ args = (message,)
)
else:
self.edit_form(message)
elif part == "h":
- self.master.view_grideditor(grideditor.HeaderEditor(self.master, message.headers.lst, self.set_headers, message))
+ self.master.view_grideditor(
+ grideditor.HeaderEditor(
+ self.master,
+ message.headers.lst,
+ self.set_headers,
+ message
+ )
+ )
elif part == "p":
p = message.get_path_components()
- p = [[i] for i in p]
- self.master.view_grideditor(grideditor.PathEditor(self.master, p, self.set_path_components, message))
+ self.master.view_grideditor(
+ grideditor.PathEditor(
+ self.master,
+ p,
+ self.set_path_components,
+ message
+ )
+ )
elif part == "q":
- self.master.view_grideditor(grideditor.QueryEditor(self.master, message.get_query().lst, self.set_query, message))
- elif part == "u" and self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
- self.master.prompt_edit("URL", message.url, self.set_url)
- elif part == "m" and self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
- self.master.prompt_onekey("Method", common.METHOD_OPTIONS, self.edit_method)
- elif part == "c" and self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE:
- self.master.prompt_edit("Code", str(message.code), self.set_resp_code)
- elif part == "m" and self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE:
- self.master.prompt_edit("Message", message.msg, self.set_resp_msg)
- self.master.refresh_flow(self.flow)
+ self.master.view_grideditor(
+ grideditor.QueryEditor(
+ self.master,
+ message.get_query().lst,
+ self.set_query, message
+ )
+ )
+ elif part == "u":
+ signals.status_prompt.send(
+ prompt = "URL",
+ text = message.url,
+ callback = self.set_url
+ )
+ elif part == "m":
+ signals.status_prompt_onekey.send(
+ prompt = "Method",
+ keys = common.METHOD_OPTIONS,
+ callback = self.edit_method
+ )
+ elif part == "o":
+ signals.status_prompt.send(
+ prompt = "Code",
+ text = str(message.code),
+ callback = self.set_resp_code
+ )
+ elif part == "m":
+ signals.status_prompt.send(
+ prompt = "Message",
+ text = message.msg,
+ callback = self.set_resp_msg
+ )
+ signals.flow_change.send(self, flow = self.flow)
def _view_nextprev_flow(self, np, flow):
try:
@@ -606,9 +437,10 @@ class FlowView(urwid.WidgetWrap):
else:
new_flow, new_idx = self.state.get_prev(idx)
if new_flow is None:
- self.master.statusbar.message("No more flows!")
- return
- self.master.view_flow(new_flow)
+ signals.status_message.send(message="No more flows!")
+ else:
+ signals.pop_view_state.send(self)
+ self.master.view_flow(new_flow, self.tab_offset)
def view_next_flow(self, flow):
return self._view_nextprev_flow("next", flow)
@@ -619,42 +451,38 @@ class FlowView(urwid.WidgetWrap):
def change_this_display_mode(self, t):
self.state.add_flow_setting(
self.flow,
- (self.state.view_flow_mode, "prettyview"),
+ (self.tab_offset, "prettyview"),
contentview.get_by_shortcut(t)
)
- self.master.refresh_flow(self.flow)
+ signals.flow_change.send(self, flow = self.flow)
def delete_body(self, t):
if t == "m":
val = CONTENT_MISSING
else:
val = None
- if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
+ if self.tab_offset == TAB_REQ:
self.flow.request.content = val
else:
self.flow.response.content = val
- self.master.refresh_flow(self.flow)
+ signals.flow_change.send(self, flow = self.flow)
def keypress(self, size, key):
+ key = super(self.__class__, self).keypress(size, key)
+
if key == " ":
self.view_next_flow(self.flow)
return
key = common.shortcuts(key)
- if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
+ if self.tab_offset == TAB_REQ:
conn = self.flow.request
- else:
+ elif self.tab_offset == TAB_RESP:
conn = self.flow.response
+ else:
+ conn = None
- if key == "q":
- self.master.view_flowlist()
- key = None
- elif key == "tab":
- if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
- self.view_response()
- else:
- self.view_request()
- elif key in ("up", "down", "page up", "page down"):
+ if key in ("up", "down", "page up", "page down"):
# Why doesn't this just work??
self._w.keypress(size, key)
elif key == "a":
@@ -663,11 +491,6 @@ class FlowView(urwid.WidgetWrap):
elif key == "A":
self.master.accept_all()
self.master.view_flow(self.flow)
- elif key == "b":
- if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
- common.ask_save_body("q", self.master, self.state, self.flow)
- else:
- common.ask_save_body("s", self.master, self.state, self.flow)
elif key == "d":
if self.state.flow_count() == 1:
self.master.view_flowlist()
@@ -681,134 +504,143 @@ class FlowView(urwid.WidgetWrap):
elif key == "D":
f = self.master.duplicate_flow(self.flow)
self.master.view_flow(f)
- self.master.statusbar.message("Duplicated.")
- elif key == "e":
- if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
- self.master.prompt_onekey(
- "Edit request",
- (
- ("query", "q"),
- ("path", "p"),
- ("url", "u"),
- ("header", "h"),
- ("form", "f"),
- ("raw body", "r"),
- ("method", "m"),
- ),
- self.edit
- )
- else:
- self.master.prompt_onekey(
- "Edit response",
- (
- ("code", "c"),
- ("message", "m"),
- ("header", "h"),
- ("raw body", "r"),
- ),
- self.edit
- )
- key = None
- elif key == "f":
- self.master.statusbar.message("Loading all body data...")
- self.state.add_flow_setting(
- self.flow,
- (self.state.view_flow_mode, "fullcontents"),
- True
- )
- self.master.refresh_flow(self.flow)
- self.master.statusbar.message("")
- elif key == "g":
- if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
- scope = "q"
- else:
- scope = "s"
- common.ask_copy_part(scope, self.flow, self.master, self.state)
- elif key == "m":
- p = list(contentview.view_prompts)
- p.insert(0, ("Clear", "C"))
- self.master.prompt_onekey(
- "Display mode",
- p,
- self.change_this_display_mode
- )
- key = None
+ signals.status_message.send(message="Duplicated.")
elif key == "p":
self.view_prev_flow(self.flow)
elif key == "r":
r = self.master.replay_request(self.flow)
if r:
- self.master.statusbar.message(r)
- self.master.refresh_flow(self.flow)
+ signals.status_message.send(message=r)
+ signals.flow_change.send(self, flow = self.flow)
elif key == "V":
if not self.flow.modified():
- self.master.statusbar.message("Flow not modified.")
+ signals.status_message.send(message="Flow not modified.")
return
self.state.revert(self.flow)
- self.master.refresh_flow(self.flow)
- self.master.statusbar.message("Reverted.")
+ signals.flow_change.send(self, flow = self.flow)
+ signals.status_message.send(message="Reverted.")
elif key == "W":
- self.master.path_prompt(
- "Save this flow: ",
- self.state.last_saveload,
- self.master.save_one_flow,
- self.flow
+ signals.status_prompt_path.send(
+ prompt = "Save this flow",
+ callback = self.master.save_one_flow,
+ args = (self.flow,)
)
- elif key == "v":
- if conn and conn.content:
- t = conn.headers["content-type"] or [None]
- t = t[0]
- if os.environ.has_key("EDITOR") or os.environ.has_key("PAGER"):
- self.master.spawn_external_viewer(conn.content, t)
- else:
- self.master.statusbar.message("Error! Set $EDITOR or $PAGER.")
elif key == "|":
- self.master.path_prompt(
- "Send flow to script: ", self.state.last_script,
- self.master.run_script_once, self.flow
+ signals.status_prompt_path.send(
+ prompt = "Send flow to script",
+ callback = self.master.run_script_once,
+ args = (self.flow,)
)
- elif key == "x":
- self.master.prompt_onekey(
- "Delete body",
- (
- ("completely", "c"),
- ("mark as missing", "m"),
- ),
- self.delete_body
+
+ if not conn and key in set(list("befgmxvz")):
+ signals.status_message.send(
+ message = "Tab to the request or response",
+ expire = 1
)
- key = None
- elif key == "X":
- self.master.view_flowdetails(self.flow)
- elif key == "z":
- if conn:
+ elif conn:
+ if key == "b":
+ if self.tab_offset == TAB_REQ:
+ common.ask_save_body(
+ "q", self.master, self.state, self.flow
+ )
+ else:
+ common.ask_save_body(
+ "s", self.master, self.state, self.flow
+ )
+ elif key == "e":
+ if self.tab_offset == TAB_REQ:
+ signals.status_prompt_onekey.send(
+ prompt = "Edit request",
+ keys = (
+ ("cookies", "c"),
+ ("query", "q"),
+ ("path", "p"),
+ ("url", "u"),
+ ("header", "h"),
+ ("form", "f"),
+ ("raw body", "r"),
+ ("method", "m"),
+ ),
+ callback = self.edit
+ )
+ else:
+ signals.status_prompt_onekey.send(
+ prompt = "Edit response",
+ keys = (
+ ("cookies", "c"),
+ ("code", "o"),
+ ("message", "m"),
+ ("header", "h"),
+ ("raw body", "r"),
+ ),
+ callback = self.edit
+ )
+ key = None
+ elif key == "f":
+ signals.status_message.send(message="Loading all body data...")
+ self.state.add_flow_setting(
+ self.flow,
+ (self.tab_offset, "fullcontents"),
+ True
+ )
+ signals.flow_change.send(self, flow = self.flow)
+ signals.status_message.send(message="")
+ elif key == "P":
+ if self.tab_offset == TAB_REQ:
+ scope = "q"
+ else:
+ scope = "s"
+ common.ask_copy_part(scope, self.flow, self.master, self.state)
+ elif key == "m":
+ p = list(contentview.view_prompts)
+ p.insert(0, ("Clear", "C"))
+ signals.status_prompt_onekey.send(
+ self,
+ prompt = "Display mode",
+ keys = p,
+ callback = self.change_this_display_mode
+ )
+ key = None
+ elif key == "x":
+ signals.status_prompt_onekey.send(
+ prompt = "Delete body",
+ keys = (
+ ("completely", "c"),
+ ("mark as missing", "m"),
+ ),
+ callback = self.delete_body
+ )
+ key = None
+ elif key == "v":
+ if conn.content:
+ t = conn.headers["content-type"] or [None]
+ t = t[0]
+ if os.environ.has_key("EDITOR") or os.environ.has_key("PAGER"):
+ self.master.spawn_external_viewer(conn.content, t)
+ else:
+ signals.status_message.send(
+ message = "Error! Set $EDITOR or $PAGER."
+ )
+ elif key == "z":
self.flow.backup()
e = conn.headers.get_first("content-encoding", "identity")
if e != "identity":
if not conn.decode():
- self.master.statusbar.message("Could not decode - invalid data?")
+ signals.status_message.send(
+ message = "Could not decode - invalid data?"
+ )
else:
- self.master.prompt_onekey(
- "Select encoding: ",
- (
+ signals.status_prompt_onekey.send(
+ prompt = "Select encoding: ",
+ keys = (
("gzip", "z"),
("deflate", "d"),
),
- self.encode_callback,
- conn
+ callback = self.encode_callback,
+ args = (conn,)
)
- self.master.refresh_flow(self.flow)
- elif key == "/":
- last_search_string = self.state.get_flow_setting(self.flow, "last_search_string")
- search_prompt = "Search body ["+last_search_string+"]: " if last_search_string else "Search body: "
- self.master.prompt(search_prompt,
- None,
- self.search)
- elif key == "n":
- self.search_again(backwards=False)
- elif key == "N":
- self.search_again(backwards=True)
- else:
- return key
+ signals.flow_change.send(self, flow = self.flow)
+ return key
def encode_callback(self, key, conn):
encoding_map = {
@@ -816,4 +648,4 @@ class FlowView(urwid.WidgetWrap):
"d": "deflate",
}
conn.encode(encoding_map[key])
- self.master.refresh_flow(self.flow)
+ signals.flow_change.send(self, flow = self.flow)