aboutsummaryrefslogtreecommitdiffstats
path: root/libmproxy/console
diff options
context:
space:
mode:
authorAldo Cortesi <aldo@corte.si>2014-01-04 14:34:20 -0800
committerAldo Cortesi <aldo@corte.si>2014-01-04 14:34:20 -0800
commit7d37e0ce10d6684c237833b85280c922ba2926de (patch)
treee15cd7b52c1511aafa7229191e9f7faddbb3e3e2 /libmproxy/console
parent8a599be0602382e0ea5ffbc4018db07b0f379ae7 (diff)
parent799c87767684880469c12d75053fb860f4a0d3c9 (diff)
downloadmitmproxy-7d37e0ce10d6684c237833b85280c922ba2926de.tar.gz
mitmproxy-7d37e0ce10d6684c237833b85280c922ba2926de.tar.bz2
mitmproxy-7d37e0ce10d6684c237833b85280c922ba2926de.zip
Merge pull request #193 from droope/search-functionality
Search functionality
Diffstat (limited to 'libmproxy/console')
-rw-r--r--libmproxy/console/flowlist.py3
-rw-r--r--libmproxy/console/flowview.py266
2 files changed, 231 insertions, 38 deletions
diff --git a/libmproxy/console/flowlist.py b/libmproxy/console/flowlist.py
index 6ba97733..7f11a7e2 100644
--- a/libmproxy/console/flowlist.py
+++ b/libmproxy/console/flowlist.py
@@ -12,6 +12,7 @@ def _mkhelp():
("e", "toggle eventlog"),
("F", "toggle follow flow list"),
("l", "set limit filter pattern"),
+ ("/", "same as above"),
("L", "load saved flows"),
("r", "replay request"),
("V", "revert changes to request"),
@@ -244,7 +245,7 @@ class FlowListBox(urwid.ListBox):
self.master.clear_flows()
elif key == "e":
self.master.toggle_eventlog()
- elif key == "l":
+ elif key == "l" or key == "/":
self.master.prompt("Limit: ", self.master.state.limit_txt, self.master.set_limit)
elif key == "L":
self.master.path_prompt(
diff --git a/libmproxy/console/flowview.py b/libmproxy/console/flowview.py
index c19ba5e6..35fc1e43 100644
--- a/libmproxy/console/flowview.py
+++ b/libmproxy/console/flowview.py
@@ -63,6 +63,8 @@ def _mkhelp():
("tab", "toggle request/response view"),
("space", "next flow"),
("|", "run script on this flow"),
+ ("/", "search in response body (case sensitive)"),
+ ("n", "repeat previous search"),
]
text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
return text
@@ -85,7 +87,9 @@ class FlowViewHeader(common.WWrap):
class CallbackCache:
- @utils.LRUCache(200)
+ #commented decorator because it was breaking search functionality (caching after
+ # searches.) If it can be made to only cache the first time, it'd be great.
+ #@utils.LRUCache(200)
def _callback(self, method, *args, **kwargs):
return getattr(self.obj, method)(*args, **kwargs)
@@ -109,8 +113,12 @@ class FlowView(common.WWrap):
("options", "o"),
("edit raw", "e"),
]
+
+ highlight_color = "focusfield"
+
def __init__(self, master, state, flow):
self.master, self.state, self.flow = master, state, flow
+ self.last_displayed_body = None
if self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE:
self.view_response()
else:
@@ -129,7 +137,8 @@ class FlowView(common.WWrap):
limit = sys.maxint
else:
limit = contentview.VIEW_CUTOFF
- return cache.callback(
+
+ description, text_objects = cache.callback(
self, "_cached_content_view",
viewmode,
tuple(tuple(i) for i in conn.headers.lst),
@@ -137,49 +146,84 @@ class FlowView(common.WWrap):
limit
)
- def conn_text(self, conn):
- txt = common.format_keyvals(
+ return (description, text_objects)
+
+ def cont_view_handle_missing(self, conn, viewmode):
+ if conn.content == flow.CONTENT_MISSING:
+ msg, body = "", [urwid.Text([("error", "[content missing]")])], 0
+ else:
+ msg, body = self.content_view(viewmode, conn)
+
+ return (msg, body)
+
+ def viewmode_get(self, override):
+ return self.state.default_body_view if override is None else override
+
+ def override_get(self):
+ return self.state.get_flow_setting(self.flow,
+ (self.state.view_flow_mode, "prettyview"))
+
+ def conn_text_raw(self, conn):
+ """
+ Based on a request/response, conn, returns the elements for
+ display.
+ """
+ headers = common.format_keyvals(
[(h+":", v) for (h, v) in conn.headers.lst],
key = "header",
val = "text"
)
+
if conn.content is not None:
- override = self.state.get_flow_setting(
- self.flow,
- (self.state.view_flow_mode, "prettyview"),
- )
- viewmode = self.state.default_body_view if override is None else override
+ override = self.override_get()
+ viewmode = self.viewmode_get(override)
+ msg, body = self.cont_view_handle_missing(conn, viewmode)
+ elif conn.content == flow.CONTENT_MISSING:
+ pass
- if conn.content == flow.CONTENT_MISSING:
- msg, body = "", [urwid.Text([("error", "[content missing]")])]
- else:
- msg, body = self.content_view(viewmode, conn)
+ return headers, msg, body
- cols = [
- urwid.Text(
- [
- ("heading", msg),
- ]
- )
- ]
- if override is not None:
- cols.append(
- urwid.Text(
- [
- " ",
- ('heading', "["),
- ('heading_key', "m"),
- ('heading', (":%s]"%viewmode.name)),
- ],
- align="right"
- )
+ def conn_text_merge(self, headers, msg, body):
+ """
+ Grabs what is returned by conn_text_raw and merges them all
+ toghether, mainly used by conn_text and search
+ """
+
+ override = self.override_get()
+ viewmode = self.viewmode_get(override)
+
+ cols = [urwid.Text(
+ [
+ ("heading", msg),
+ ]
+ )
+ ]
+
+ if override is not None:
+ cols.append(urwid.Text([
+ " ",
+ ('heading', "["),
+ ('heading_key', "m"),
+ ('heading', (":%s]"%viewmode.name)),
+ ],
+ align="right"
)
- title = urwid.AttrWrap(urwid.Columns(cols), "heading")
- txt.append(title)
- txt.extend(body)
- elif conn.content == flow.CONTENT_MISSING:
- pass
- return urwid.ListBox(txt)
+ )
+
+ title = urwid.AttrWrap(urwid.Columns(cols), "heading")
+ headers.append(title)
+ headers.extend(body)
+
+ return headers
+
+ def conn_text(self, conn):
+ """
+ Same as conn_text_raw, but returns result wrapped in a listbox ready for usage.
+ """
+ headers, msg, body = self.conn_text_raw(conn)
+ merged = self.conn_text_merge(headers, msg, body)
+
+ return urwid.ListBox(merged)
def _tab(self, content, attr):
p = urwid.Text(content)
@@ -215,6 +259,140 @@ class FlowView(common.WWrap):
)
return f
+ def search_wrapped_around(self, last_find_line, last_search_index):
+ """
+ returns true if search wrapped around the bottom.
+ """
+
+ current_find_line = self.state.get_flow_setting(self.flow,
+ "last_find_line")
+ current_search_index = self.state.get_flow_setting(self.flow,
+ "last_search_index")
+
+ if current_find_line <= last_find_line:
+ return True
+ elif current_find_line == last_find_line:
+ if current_search_index <= last_search_index:
+ return True
+
+ return False
+
+ def search(self, search_string):
+ """
+ similar to view_response or view_request, but instead of just
+ displaying the conn, it highlights a word that the user is
+ searching for and handles all the logic surrounding that.
+ """
+
+ if search_string == "":
+ search_string = self.state.get_flow_setting(self.flow,
+ "last_search_string")
+
+ if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
+ text = self.flow.request
+ const = common.VIEW_FLOW_REQUEST
+ else:
+ text = self.flow.response
+ const = common.VIEW_FLOW_RESPONSE
+ if not self.flow.response:
+ return "no response to search in"
+
+ last_find_line = self.state.get_flow_setting(self.flow,
+ "last_find_line")
+ last_search_index = self.state.get_flow_setting(self.flow,
+ "last_search_index")
+
+ # generate the body, highlight the words and get focus
+ headers, msg, body = self.conn_text_raw(text)
+ body, focus_position = self.search_highlight_text(body, search_string)
+
+ if focus_position == None:
+ # no results found.
+ return "no matches for '%s'" % search_string
+
+ # UI stuff.
+ merged = self.conn_text_merge(headers, msg, body)
+ list_box = urwid.ListBox(merged)
+ list_box.set_focus(focus_position + 2)
+ self.w = self.wrap_body(const, list_box)
+ self.master.statusbar.redraw()
+
+ self.last_displayed_body = list_box
+
+ if self.search_wrapped_around(last_find_line, last_search_index):
+ return "search hit BOTTOM, continuing at TOP"
+
+ def search_get_start(self, search_string):
+ start_line = 0
+ start_index = 0
+ last_search_string = self.state.get_flow_setting(self.flow, "last_search_string")
+ if search_string == last_search_string:
+ start_line = self.state.get_flow_setting(self.flow, "last_find_line")
+ start_index = self.state.get_flow_setting(self.flow,
+ "last_search_index")
+
+ if start_index == None:
+ start_index = 0
+ else:
+ start_index += len(search_string)
+
+ if start_line == None:
+ start_line = 0
+
+ else:
+ self.state.add_flow_setting(self.flow, "last_search_string",
+ search_string)
+
+ return (start_line, start_index)
+
+ def search_highlight_text(self, text_objects, search_string, looping = False):
+ start_line, start_index = self.search_get_start(search_string)
+ i = start_line
+
+ found = False
+ for text_object in text_objects[start_line:]:
+ if i != start_line:
+ start_index = 0
+
+ text, style = text_object.get_text()
+
+ find_index = text.find(search_string, start_index)
+ if find_index != -1:
+ before = text[:find_index]
+ after = text[find_index+len(search_string):]
+ new_text = urwid.Text(
+ [
+ before,
+ (self.highlight_color, search_string),
+ after,
+ ]
+ )
+
+ self.state.add_flow_setting(self.flow, "last_search_index",
+ find_index)
+ self.state.add_flow_setting(self.flow, "last_find_line", i)
+
+ text_objects[i] = new_text
+
+ found = True
+
+ break
+
+ i += 1
+
+ if found:
+ focus_pos = i
+ else :
+ # loop from the beginning, but not forever.
+ if (start_line == 0 and start_index == 0) or looping:
+ focus_pos = None
+ else:
+ self.state.add_flow_setting(self.flow, "last_search_index", 0)
+ self.state.add_flow_setting(self.flow, "last_find_line", 0)
+ text_objects, focus_pos = self.search_highlight_text(text_objects, search_string, True)
+
+ return text_objects, focus_pos
+
def view_request(self):
self.state.view_flow_mode = common.VIEW_FLOW_REQUEST
body = self.conn_text(self.flow.request)
@@ -574,6 +752,20 @@ class FlowView(common.WWrap):
conn
)
self.master.refresh_flow(self.flow)
+ elif key == "/":
+ last_search_string = self.state.get_flow_setting(self.flow, "last_search_string")
+ search_prompt = "Search body ["+last_search_string+"]: " if last_search_string else "Search body: "
+ self.master.prompt(search_prompt,
+ None,
+ self.search)
+ elif key == "n":
+ last_search_string = self.state.get_flow_setting(self.flow, "last_search_string")
+ if last_search_string:
+ message = self.search(last_search_string)
+ if message:
+ self.master.statusbar.message(message)
+ else:
+ self.master.statusbar.message("no previous searches have been made")
else:
return key