aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAldo Cortesi <aldo@corte.si>2014-01-04 14:34:20 -0800
committerAldo Cortesi <aldo@corte.si>2014-01-04 14:34:20 -0800
commit7d37e0ce10d6684c237833b85280c922ba2926de (patch)
treee15cd7b52c1511aafa7229191e9f7faddbb3e3e2
parent8a599be0602382e0ea5ffbc4018db07b0f379ae7 (diff)
parent799c87767684880469c12d75053fb860f4a0d3c9 (diff)
downloadmitmproxy-7d37e0ce10d6684c237833b85280c922ba2926de.tar.gz
mitmproxy-7d37e0ce10d6684c237833b85280c922ba2926de.tar.bz2
mitmproxy-7d37e0ce10d6684c237833b85280c922ba2926de.zip
Merge pull request #193 from droope/search-functionality
Search functionality
-rw-r--r--libmproxy/console/flowlist.py3
-rw-r--r--libmproxy/console/flowview.py266
-rw-r--r--libmproxy/flow.py2
-rw-r--r--test/test_console_contentview.py98
-rw-r--r--test/tutils.py27
5 files changed, 353 insertions, 43 deletions
diff --git a/libmproxy/console/flowlist.py b/libmproxy/console/flowlist.py
index 6ba97733..7f11a7e2 100644
--- a/libmproxy/console/flowlist.py
+++ b/libmproxy/console/flowlist.py
@@ -12,6 +12,7 @@ def _mkhelp():
("e", "toggle eventlog"),
("F", "toggle follow flow list"),
("l", "set limit filter pattern"),
+ ("/", "same as above"),
("L", "load saved flows"),
("r", "replay request"),
("V", "revert changes to request"),
@@ -244,7 +245,7 @@ class FlowListBox(urwid.ListBox):
self.master.clear_flows()
elif key == "e":
self.master.toggle_eventlog()
- elif key == "l":
+ elif key == "l" or key == "/":
self.master.prompt("Limit: ", self.master.state.limit_txt, self.master.set_limit)
elif key == "L":
self.master.path_prompt(
diff --git a/libmproxy/console/flowview.py b/libmproxy/console/flowview.py
index c19ba5e6..35fc1e43 100644
--- a/libmproxy/console/flowview.py
+++ b/libmproxy/console/flowview.py
@@ -63,6 +63,8 @@ def _mkhelp():
("tab", "toggle request/response view"),
("space", "next flow"),
("|", "run script on this flow"),
+ ("/", "search in response body (case sensitive)"),
+ ("n", "repeat previous search"),
]
text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
return text
@@ -85,7 +87,9 @@ class FlowViewHeader(common.WWrap):
class CallbackCache:
- @utils.LRUCache(200)
+ #commented decorator because it was breaking search functionality (caching after
+ # searches.) If it can be made to only cache the first time, it'd be great.
+ #@utils.LRUCache(200)
def _callback(self, method, *args, **kwargs):
return getattr(self.obj, method)(*args, **kwargs)
@@ -109,8 +113,12 @@ class FlowView(common.WWrap):
("options", "o"),
("edit raw", "e"),
]
+
+ highlight_color = "focusfield"
+
def __init__(self, master, state, flow):
self.master, self.state, self.flow = master, state, flow
+ self.last_displayed_body = None
if self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE:
self.view_response()
else:
@@ -129,7 +137,8 @@ class FlowView(common.WWrap):
limit = sys.maxint
else:
limit = contentview.VIEW_CUTOFF
- return cache.callback(
+
+ description, text_objects = cache.callback(
self, "_cached_content_view",
viewmode,
tuple(tuple(i) for i in conn.headers.lst),
@@ -137,49 +146,84 @@ class FlowView(common.WWrap):
limit
)
- def conn_text(self, conn):
- txt = common.format_keyvals(
+ return (description, text_objects)
+
+ def cont_view_handle_missing(self, conn, viewmode):
+ if conn.content == flow.CONTENT_MISSING:
+ msg, body = "", [urwid.Text([("error", "[content missing]")])], 0
+ else:
+ msg, body = self.content_view(viewmode, conn)
+
+ return (msg, body)
+
+ def viewmode_get(self, override):
+ return self.state.default_body_view if override is None else override
+
+ def override_get(self):
+ return self.state.get_flow_setting(self.flow,
+ (self.state.view_flow_mode, "prettyview"))
+
+ def conn_text_raw(self, conn):
+ """
+ Based on a request/response, conn, returns the elements for
+ display.
+ """
+ headers = common.format_keyvals(
[(h+":", v) for (h, v) in conn.headers.lst],
key = "header",
val = "text"
)
+
if conn.content is not None:
- override = self.state.get_flow_setting(
- self.flow,
- (self.state.view_flow_mode, "prettyview"),
- )
- viewmode = self.state.default_body_view if override is None else override
+ override = self.override_get()
+ viewmode = self.viewmode_get(override)
+ msg, body = self.cont_view_handle_missing(conn, viewmode)
+ elif conn.content == flow.CONTENT_MISSING:
+ pass
- if conn.content == flow.CONTENT_MISSING:
- msg, body = "", [urwid.Text([("error", "[content missing]")])]
- else:
- msg, body = self.content_view(viewmode, conn)
+ return headers, msg, body
- cols = [
- urwid.Text(
- [
- ("heading", msg),
- ]
- )
- ]
- if override is not None:
- cols.append(
- urwid.Text(
- [
- " ",
- ('heading', "["),
- ('heading_key', "m"),
- ('heading', (":%s]"%viewmode.name)),
- ],
- align="right"
- )
+ def conn_text_merge(self, headers, msg, body):
+ """
+ Grabs what is returned by conn_text_raw and merges them all
+ toghether, mainly used by conn_text and search
+ """
+
+ override = self.override_get()
+ viewmode = self.viewmode_get(override)
+
+ cols = [urwid.Text(
+ [
+ ("heading", msg),
+ ]
+ )
+ ]
+
+ if override is not None:
+ cols.append(urwid.Text([
+ " ",
+ ('heading', "["),
+ ('heading_key', "m"),
+ ('heading', (":%s]"%viewmode.name)),
+ ],
+ align="right"
)
- title = urwid.AttrWrap(urwid.Columns(cols), "heading")
- txt.append(title)
- txt.extend(body)
- elif conn.content == flow.CONTENT_MISSING:
- pass
- return urwid.ListBox(txt)
+ )
+
+ title = urwid.AttrWrap(urwid.Columns(cols), "heading")
+ headers.append(title)
+ headers.extend(body)
+
+ return headers
+
+ def conn_text(self, conn):
+ """
+ Same as conn_text_raw, but returns result wrapped in a listbox ready for usage.
+ """
+ headers, msg, body = self.conn_text_raw(conn)
+ merged = self.conn_text_merge(headers, msg, body)
+
+ return urwid.ListBox(merged)
def _tab(self, content, attr):
p = urwid.Text(content)
@@ -215,6 +259,140 @@ class FlowView(common.WWrap):
)
return f
+ def search_wrapped_around(self, last_find_line, last_search_index):
+ """
+ returns true if search wrapped around the bottom.
+ """
+
+ current_find_line = self.state.get_flow_setting(self.flow,
+ "last_find_line")
+ current_search_index = self.state.get_flow_setting(self.flow,
+ "last_search_index")
+
+ if current_find_line <= last_find_line:
+ return True
+ elif current_find_line == last_find_line:
+ if current_search_index <= last_search_index:
+ return True
+
+ return False
+
+ def search(self, search_string):
+ """
+ similar to view_response or view_request, but instead of just
+ displaying the conn, it highlights a word that the user is
+ searching for and handles all the logic surrounding that.
+ """
+
+ if search_string == "":
+ search_string = self.state.get_flow_setting(self.flow,
+ "last_search_string")
+
+ if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST:
+ text = self.flow.request
+ const = common.VIEW_FLOW_REQUEST
+ else:
+ text = self.flow.response
+ const = common.VIEW_FLOW_RESPONSE
+ if not self.flow.response:
+ return "no response to search in"
+
+ last_find_line = self.state.get_flow_setting(self.flow,
+ "last_find_line")
+ last_search_index = self.state.get_flow_setting(self.flow,
+ "last_search_index")
+
+ # generate the body, highlight the words and get focus
+ headers, msg, body = self.conn_text_raw(text)
+ body, focus_position = self.search_highlight_text(body, search_string)
+
+ if focus_position == None:
+ # no results found.
+ return "no matches for '%s'" % search_string
+
+ # UI stuff.
+ merged = self.conn_text_merge(headers, msg, body)
+ list_box = urwid.ListBox(merged)
+ list_box.set_focus(focus_position + 2)
+ self.w = self.wrap_body(const, list_box)
+ self.master.statusbar.redraw()
+
+ self.last_displayed_body = list_box
+
+ if self.search_wrapped_around(last_find_line, last_search_index):
+ return "search hit BOTTOM, continuing at TOP"
+
+ def search_get_start(self, search_string):
+ start_line = 0
+ start_index = 0
+ last_search_string = self.state.get_flow_setting(self.flow, "last_search_string")
+ if search_string == last_search_string:
+ start_line = self.state.get_flow_setting(self.flow, "last_find_line")
+ start_index = self.state.get_flow_setting(self.flow,
+ "last_search_index")
+
+ if start_index == None:
+ start_index = 0
+ else:
+ start_index += len(search_string)
+
+ if start_line == None:
+ start_line = 0
+
+ else:
+ self.state.add_flow_setting(self.flow, "last_search_string",
+ search_string)
+
+ return (start_line, start_index)
+
+ def search_highlight_text(self, text_objects, search_string, looping = False):
+ start_line, start_index = self.search_get_start(search_string)
+ i = start_line
+
+ found = False
+ for text_object in text_objects[start_line:]:
+ if i != start_line:
+ start_index = 0
+
+ text, style = text_object.get_text()
+
+ find_index = text.find(search_string, start_index)
+ if find_index != -1:
+ before = text[:find_index]
+ after = text[find_index+len(search_string):]
+ new_text = urwid.Text(
+ [
+ before,
+ (self.highlight_color, search_string),
+ after,
+ ]
+ )
+
+ self.state.add_flow_setting(self.flow, "last_search_index",
+ find_index)
+ self.state.add_flow_setting(self.flow, "last_find_line", i)
+
+ text_objects[i] = new_text
+
+ found = True
+
+ break
+
+ i += 1
+
+ if found:
+ focus_pos = i
+ else :
+ # loop from the beginning, but not forever.
+ if (start_line == 0 and start_index == 0) or looping:
+ focus_pos = None
+ else:
+ self.state.add_flow_setting(self.flow, "last_search_index", 0)
+ self.state.add_flow_setting(self.flow, "last_find_line", 0)
+ text_objects, focus_pos = self.search_highlight_text(text_objects, search_string, True)
+
+ return text_objects, focus_pos
+
def view_request(self):
self.state.view_flow_mode = common.VIEW_FLOW_REQUEST
body = self.conn_text(self.flow.request)
@@ -574,6 +752,20 @@ class FlowView(common.WWrap):
conn
)
self.master.refresh_flow(self.flow)
+ elif key == "/":
+ last_search_string = self.state.get_flow_setting(self.flow, "last_search_string")
+ search_prompt = "Search body ["+last_search_string+"]: " if last_search_string else "Search body: "
+ self.master.prompt(search_prompt,
+ None,
+ self.search)
+ elif key == "n":
+ last_search_string = self.state.get_flow_setting(self.flow, "last_search_string")
+ if last_search_string:
+ message = self.search(last_search_string)
+ if message:
+ self.master.statusbar.message(message)
+ else:
+ self.master.statusbar.message("no previous searches have been made")
else:
return key
diff --git a/libmproxy/flow.py b/libmproxy/flow.py
index f5985197..32306513 100644
--- a/libmproxy/flow.py
+++ b/libmproxy/flow.py
@@ -1463,7 +1463,7 @@ class FlowMaster(controller.Master):
def run_script_hook(self, name, *args, **kwargs):
for script in self.scripts:
self.run_single_script_hook(script, name, *args, **kwargs)
-
+
def set_stickycookie(self, txt):
if txt:
flt = filt.parse(txt)
diff --git a/test/test_console_contentview.py b/test/test_console_contentview.py
index b982aff3..07ecf1d0 100644
--- a/test/test_console_contentview.py
+++ b/test/test_console_contentview.py
@@ -275,3 +275,101 @@ if cv.ViewProtobuf.is_available():
def test_get_by_shortcut():
assert cv.get_by_shortcut("h")
+
+def test_search_highlights():
+ # Default text in requests is content. We will search for nt once, and
+ # expect the first bit to be highlighted. We will do it again and expect the
+ # second to be.
+ f = tutils.tflowview()
+
+ f.search("nt")
+ text_object = tutils.get_body_line(f.last_displayed_body, 0)
+ assert text_object.get_text() == ('content', [(None, 2), (f.highlight_color, 2)])
+
+ f.search("nt")
+ text_object = tutils.get_body_line(f.last_displayed_body, 1)
+ assert text_object.get_text() == ('content', [(None, 5), (f.highlight_color, 2)])
+
+def test_search_returns_useful_messages():
+ f = tutils.tflowview()
+
+ # original string is content. this string should not be in there.
+ response = f.search("oranges and other fruit.")
+ assert response == "no matches for 'oranges and other fruit.'"
+
+def test_search_highlights_clears_prev():
+ f = tutils.tflowview(request_contents="this is string\nstring is string")
+
+ f.search("string")
+ text_object = tutils.get_body_line(f.last_displayed_body, 0)
+ assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
+
+ # search again, it should not be highlighted again.
+ f.search("string")
+ text_object = tutils.get_body_line(f.last_displayed_body, 0)
+ assert text_object.get_text() != ('this is string', [(None, 8), (f.highlight_color, 6)])
+
+def test_search_highlights_multi_line():
+ f = tutils.tflowview(request_contents="this is string\nstring is string")
+
+ # should highlight the first line.
+ f.search("string")
+ text_object = tutils.get_body_line(f.last_displayed_body, 0)
+ assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
+
+ # should highlight second line, first appearance of string.
+ f.search("string")
+ text_object = tutils.get_body_line(f.last_displayed_body, 1)
+ assert text_object.get_text() == ('string is string', [(None, 0), (f.highlight_color, 6)])
+
+ # should highlight third line, second appearance of string.
+ f.search("string")
+ text_object = tutils.get_body_line(f.last_displayed_body, 1)
+ assert text_object.get_text() == ('string is string', [(None, 10), (f.highlight_color, 6)])
+
+def test_search_loops():
+ f = tutils.tflowview(request_contents="this is string\nstring is string")
+
+ # get to the end.
+ f.search("string")
+ f.search("string")
+ f.search("string")
+
+ # should highlight the first line.
+ message = f.search("string")
+ text_object = tutils.get_body_line(f.last_displayed_body, 0)
+ assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
+ assert message == "search hit BOTTOM, continuing at TOP"
+
+def test_search_focuses():
+ f = tutils.tflowview(request_contents="this is string\nstring is string")
+
+ # should highlight the first line.
+ f.search("string")
+
+ # should be focusing on the 2nd text line.
+ f.search("string")
+ text_object = tutils.get_body_line(f.last_displayed_body, 1)
+ assert f.last_displayed_body.focus == text_object
+
+def test_search_does_not_crash_on_bad():
+ """
+ this used to crash, kept for reference.
+ """
+
+ f = tutils.tflowview(request_contents="this is string\nstring is string\n"+("A" * cv.VIEW_CUTOFF)+"AFTERCUTOFF")
+ f.search("AFTERCUTOFF")
+
+ # pretend F
+ f.state.add_flow_setting(
+ f.flow,
+ (f.state.view_flow_mode, "fullcontents"),
+ True
+ )
+ f.master.refresh_flow(f.flow)
+
+ # text changed, now this string will exist. can happen when user presses F
+ # for full text view
+ f.search("AFTERCUTOFF")
+
+
diff --git a/test/tutils.py b/test/tutils.py
index 4cd7b7f8..d6332107 100644
--- a/test/tutils.py
+++ b/test/tutils.py
@@ -1,8 +1,11 @@
import os, shutil, tempfile
from contextlib import contextmanager
from libmproxy import flow, utils, controller
+from libmproxy.console.flowview import FlowView
+from libmproxy.console import ConsoleState
from netlib import certutils
from nose.plugins.skip import SkipTest
+from mock import Mock
def _SkipWindows():
raise SkipTest("Skipped on Windows.")
@@ -12,13 +15,14 @@ def SkipWindows(fn):
else:
return fn
-def treq(conn=None):
+def treq(conn=None, content="content"):
if not conn:
conn = flow.ClientConnect(("address", 22))
conn.reply = controller.DummyReply()
headers = flow.ODictCaseless()
headers["header"] = ["qvalue"]
- r = flow.Request(conn, (1, 1), "host", 80, "http", "GET", "/path", headers, "content")
+ r = flow.Request(conn, (1, 1), "host", 80, "http", "GET", "/path", headers,
+ content)
r.reply = controller.DummyReply()
return r
@@ -41,8 +45,9 @@ def terr(req=None):
return err
-def tflow():
- r = treq()
+def tflow(r=None):
+ if r == None:
+ r = treq()
return flow.Flow(r)
@@ -57,6 +62,20 @@ def tflow_err():
f.error = terr(f.request)
return f
+def tflowview(request_contents=None):
+ m = Mock()
+ cs = ConsoleState()
+ if request_contents == None:
+ flow = tflow()
+ else:
+ req = treq(None, request_contents)
+ flow = tflow(req)
+
+ fv = FlowView(m, cs, flow)
+ return fv
+
+def get_body_line(last_displayed_body, line_nb):
+ return last_displayed_body.contents()[line_nb + 2]
@contextmanager
def tmpdir(*args, **kwargs):