aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
authorMaximilian Hils <git@maximilianhils.com>2014-01-07 02:29:10 +0100
committerMaximilian Hils <git@maximilianhils.com>2014-01-07 02:29:10 +0100
commitea2f17680b964e2e793ff804a6f4e7b9d2d7b1ac (patch)
tree84250064ffbebb2974849d083d0129e545848862 /test
parentb34ad82b528b55dabc318f999577fb6a020ccad9 (diff)
parentd5f9b02615bffe56639a7250f31752cebd2b8d62 (diff)
downloadmitmproxy-ea2f17680b964e2e793ff804a6f4e7b9d2d7b1ac.tar.gz
mitmproxy-ea2f17680b964e2e793ff804a6f4e7b9d2d7b1ac.tar.bz2
mitmproxy-ea2f17680b964e2e793ff804a6f4e7b9d2d7b1ac.zip
continue work on the proxyhandler
Diffstat (limited to 'test')
-rw-r--r--test/data/1.css1
-rw-r--r--test/test_console_contentview.py123
-rw-r--r--test/test_controller.py2
-rw-r--r--test/test_dump.py2
-rw-r--r--test/test_flow.py21
-rw-r--r--test/test_proxy.py20
-rw-r--r--test/test_server.py8
-rw-r--r--test/tservers.py4
-rw-r--r--test/tutils.py27
9 files changed, 176 insertions, 32 deletions
diff --git a/test/data/1.css b/test/data/1.css
new file mode 100644
index 00000000..33387ca7
--- /dev/null
+++ b/test/data/1.css
@@ -0,0 +1 @@
+body,html{height:100%}body{font-family:'Open Sans',sans-serif;font-size:1.5em;padding-top:80px}
diff --git a/test/test_console_contentview.py b/test/test_console_contentview.py
index ef44f834..07ecf1d0 100644
--- a/test/test_console_contentview.py
+++ b/test/test_console_contentview.py
@@ -13,6 +13,11 @@ try:
except ImportError:
pyamf = None
+try:
+ import cssutils
+except:
+ cssutils = None
+
class TestContentView:
def test_trailer(self):
@@ -112,6 +117,26 @@ class TestContentView:
assert v([], "[1, 2, 3", 100)
assert v([], "function(a){[1, 2, 3]}", 100)
+ def test_view_css(self):
+ v = cv.ViewCSS()
+
+ with open('./test/data/1.css', 'r') as fp:
+ fixture_1 = fp.read()
+
+ result = v([], 'a', 100)
+
+ if cssutils:
+ assert len(result[1]) == 0
+ else:
+ assert len(result[1]) == 1
+
+ result = v([], fixture_1, 100)
+
+ if cssutils:
+ assert len(result[1]) > 1
+ else:
+ assert len(result[1]) == 1
+
def test_view_hex(self):
v = cv.ViewHex()
assert v([], "foo", 1000)
@@ -250,3 +275,101 @@ if cv.ViewProtobuf.is_available():
def test_get_by_shortcut():
assert cv.get_by_shortcut("h")
+
+def test_search_highlights():
+ # Default text in requests is content. We will search for nt once, and
+ # expect the first bit to be highlighted. We will do it again and expect the
+ # second to be.
+ f = tutils.tflowview()
+
+ f.search("nt")
+ text_object = tutils.get_body_line(f.last_displayed_body, 0)
+ assert text_object.get_text() == ('content', [(None, 2), (f.highlight_color, 2)])
+
+ f.search("nt")
+ text_object = tutils.get_body_line(f.last_displayed_body, 1)
+ assert text_object.get_text() == ('content', [(None, 5), (f.highlight_color, 2)])
+
+def test_search_returns_useful_messages():
+ f = tutils.tflowview()
+
+ # original string is content. this string should not be in there.
+ response = f.search("oranges and other fruit.")
+ assert response == "no matches for 'oranges and other fruit.'"
+
+def test_search_highlights_clears_prev():
+ f = tutils.tflowview(request_contents="this is string\nstring is string")
+
+ f.search("string")
+ text_object = tutils.get_body_line(f.last_displayed_body, 0)
+ assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
+
+ # search again, it should not be highlighted again.
+ f.search("string")
+ text_object = tutils.get_body_line(f.last_displayed_body, 0)
+ assert text_object.get_text() != ('this is string', [(None, 8), (f.highlight_color, 6)])
+
+def test_search_highlights_multi_line():
+ f = tutils.tflowview(request_contents="this is string\nstring is string")
+
+ # should highlight the first line.
+ f.search("string")
+ text_object = tutils.get_body_line(f.last_displayed_body, 0)
+ assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
+
+ # should highlight second line, first appearance of string.
+ f.search("string")
+ text_object = tutils.get_body_line(f.last_displayed_body, 1)
+ assert text_object.get_text() == ('string is string', [(None, 0), (f.highlight_color, 6)])
+
+ # should highlight third line, second appearance of string.
+ f.search("string")
+ text_object = tutils.get_body_line(f.last_displayed_body, 1)
+ assert text_object.get_text() == ('string is string', [(None, 10), (f.highlight_color, 6)])
+
+def test_search_loops():
+ f = tutils.tflowview(request_contents="this is string\nstring is string")
+
+ # get to the end.
+ f.search("string")
+ f.search("string")
+ f.search("string")
+
+ # should highlight the first line.
+ message = f.search("string")
+ text_object = tutils.get_body_line(f.last_displayed_body, 0)
+ assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
+ assert message == "search hit BOTTOM, continuing at TOP"
+
+def test_search_focuses():
+ f = tutils.tflowview(request_contents="this is string\nstring is string")
+
+ # should highlight the first line.
+ f.search("string")
+
+ # should be focusing on the 2nd text line.
+ f.search("string")
+ text_object = tutils.get_body_line(f.last_displayed_body, 1)
+ assert f.last_displayed_body.focus == text_object
+
+def test_search_does_not_crash_on_bad():
+ """
+ this used to crash, kept for reference.
+ """
+
+ f = tutils.tflowview(request_contents="this is string\nstring is string\n"+("A" * cv.VIEW_CUTOFF)+"AFTERCUTOFF")
+ f.search("AFTERCUTOFF")
+
+ # pretend F
+ f.state.add_flow_setting(
+ f.flow,
+ (f.state.view_flow_mode, "fullcontents"),
+ True
+ )
+ f.master.refresh_flow(f.flow)
+
+ # text changed, now this string will exist. can happen when user presses F
+ # for full text view
+ f.search("AFTERCUTOFF")
+
+
diff --git a/test/test_controller.py b/test/test_controller.py
index f6d6b5eb..e71a148e 100644
--- a/test/test_controller.py
+++ b/test/test_controller.py
@@ -6,7 +6,7 @@ class TestMaster:
def test_default_handler(self):
m = controller.Master(None)
msg = mock.MagicMock()
- m.handle(msg)
+ m.handle("type", msg)
assert msg.reply.call_count == 1
diff --git a/test/test_dump.py b/test/test_dump.py
index 3d375f16..9874b650 100644
--- a/test/test_dump.py
+++ b/test/test_dump.py
@@ -114,7 +114,7 @@ class TestDumpMaster:
o = dump.Options(app=True)
s = mock.MagicMock()
m = dump.DumpMaster(s, o, None)
- assert s.apps.add.call_count == 1
+ assert len(m.apps.apps) == 1
def test_replacements(self):
o = dump.Options(replacements=[(".*", "content", "foo")])
diff --git a/test/test_flow.py b/test/test_flow.py
index c614960b..bf6a7a42 100644
--- a/test/test_flow.py
+++ b/test/test_flow.py
@@ -5,6 +5,27 @@ from libmproxy import filt, flow, controller, utils, tnetstring, proxy
import tutils
+def test_app_registry():
+ ar = flow.AppRegistry()
+ ar.add("foo", "domain", 80)
+
+ r = tutils.treq()
+ r.host = "domain"
+ r.port = 80
+ assert ar.get(r)
+
+ r.port = 81
+ assert not ar.get(r)
+
+ r = tutils.treq()
+ r.host = "domain2"
+ r.port = 80
+ assert not ar.get(r)
+ r.headers["host"] = ["domain"]
+ assert ar.get(r)
+
+
+
class TestStickyCookieState:
def _response(self, cookie, host):
s = flow.StickyCookieState(filt.parse(".*"))
diff --git a/test/test_proxy.py b/test/test_proxy.py
index dfccaaf7..371e5ef7 100644
--- a/test/test_proxy.py
+++ b/test/test_proxy.py
@@ -11,26 +11,6 @@ def test_proxy_error():
assert str(p)
-def test_app_registry():
- ar = proxy.AppRegistry()
- ar.add("foo", "domain", 80)
-
- r = tutils.treq()
- r.host = "domain"
- r.port = 80
- assert ar.get(r)
-
- r.port = 81
- assert not ar.get(r)
-
- r = tutils.treq()
- r.host = "domain2"
- r.port = 80
- assert not ar.get(r)
- r.headers["host"] = ["domain"]
- assert ar.get(r)
-
-
class TestServerConnection:
def setUp(self):
self.d = test.Daemon()
diff --git a/test/test_server.py b/test/test_server.py
index 2c94b9df..646460ab 100644
--- a/test/test_server.py
+++ b/test/test_server.py
@@ -176,10 +176,10 @@ class TestHTTPAuth(tservers.HTTPProxTest):
class TestHTTPConnectSSLError(tservers.HTTPProxTest):
certfile = True
def test_go(self):
- p = self.pathoc()
- req = "connect:'localhost:%s'"%self.proxy.port
- assert p.request(req).status_code == 200
- assert p.request(req).status_code == 400
+ p = self.pathoc_raw()
+ dst = ("localhost", self.proxy.port)
+ p.connect(connect_to=dst)
+ tutils.raises("400 - Bad Request", p.http_connect, dst)
class TestHTTPS(tservers.HTTPProxTest, CommonMixin):
diff --git a/test/tservers.py b/test/tservers.py
index f886d42d..ac95b168 100644
--- a/test/tservers.py
+++ b/test/tservers.py
@@ -23,10 +23,10 @@ def errapp(environ, start_response):
class TestMaster(flow.FlowMaster):
def __init__(self, testq, config):
s = proxy.ProxyServer(config, 0)
- s.apps.add(testapp, "testapp", 80)
- s.apps.add(errapp, "errapp", 80)
state = flow.State()
flow.FlowMaster.__init__(self, s, state)
+ self.apps.add(testapp, "testapp", 80)
+ self.apps.add(errapp, "errapp", 80)
self.testq = testq
self.clear_log()
self.start_app(APP_HOST, APP_PORT, False)
diff --git a/test/tutils.py b/test/tutils.py
index 4cd7b7f8..d6332107 100644
--- a/test/tutils.py
+++ b/test/tutils.py
@@ -1,8 +1,11 @@
import os, shutil, tempfile
from contextlib import contextmanager
from libmproxy import flow, utils, controller
+from libmproxy.console.flowview import FlowView
+from libmproxy.console import ConsoleState
from netlib import certutils
from nose.plugins.skip import SkipTest
+from mock import Mock
def _SkipWindows():
raise SkipTest("Skipped on Windows.")
@@ -12,13 +15,14 @@ def SkipWindows(fn):
else:
return fn
-def treq(conn=None):
+def treq(conn=None, content="content"):
if not conn:
conn = flow.ClientConnect(("address", 22))
conn.reply = controller.DummyReply()
headers = flow.ODictCaseless()
headers["header"] = ["qvalue"]
- r = flow.Request(conn, (1, 1), "host", 80, "http", "GET", "/path", headers, "content")
+ r = flow.Request(conn, (1, 1), "host", 80, "http", "GET", "/path", headers,
+ content)
r.reply = controller.DummyReply()
return r
@@ -41,8 +45,9 @@ def terr(req=None):
return err
-def tflow():
- r = treq()
+def tflow(r=None):
+ if r == None:
+ r = treq()
return flow.Flow(r)
@@ -57,6 +62,20 @@ def tflow_err():
f.error = terr(f.request)
return f
+def tflowview(request_contents=None):
+ m = Mock()
+ cs = ConsoleState()
+ if request_contents == None:
+ flow = tflow()
+ else:
+ req = treq(None, request_contents)
+ flow = tflow(req)
+
+ fv = FlowView(m, cs, flow)
+ return fv
+
+def get_body_line(last_displayed_body, line_nb):
+ return last_displayed_body.contents()[line_nb + 2]
@contextmanager
def tmpdir(*args, **kwargs):