aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
authorAldo Cortesi <aldo@corte.si>2016-10-29 12:19:19 +1300
committerGitHub <noreply@github.com>2016-10-29 12:19:19 +1300
commita3131ac34330537b205ba7332f752c1b1ea93278 (patch)
treedea3958bc9c306bc5b6c6cb849c34ae8a0cb39a1 /test
parent9be34baa403802953e09d4962b755d50af91a503 (diff)
parent005c22445b013fdbce06569966cd86a48201e837 (diff)
downloadmitmproxy-a3131ac34330537b205ba7332f752c1b1ea93278.tar.gz
mitmproxy-a3131ac34330537b205ba7332f752c1b1ea93278.tar.bz2
mitmproxy-a3131ac34330537b205ba7332f752c1b1ea93278.zip
Merge pull request #1683 from cortesi/view
addons.View
Diffstat (limited to 'test')
-rw-r--r--test/mitmproxy/addons/test_view.py275
1 files changed, 275 insertions, 0 deletions
diff --git a/test/mitmproxy/addons/test_view.py b/test/mitmproxy/addons/test_view.py
new file mode 100644
index 00000000..15cf534e
--- /dev/null
+++ b/test/mitmproxy/addons/test_view.py
@@ -0,0 +1,275 @@
+from mitmproxy.addons import view
+from mitmproxy import flowfilter
+
+from .. import tutils
+
+
+def test_simple():
+ v = view.View()
+ f = tutils.tflow()
+ f.request.timestamp_start = 1
+ v.request(f)
+ assert list(v) == [f]
+ v.request(f)
+ assert list(v) == [f]
+ assert len(v._store) == 1
+
+ f2 = tutils.tflow()
+ f2.request.timestamp_start = 3
+ v.request(f2)
+ assert list(v) == [f, f2]
+ v.request(f2)
+ assert list(v) == [f, f2]
+ assert len(v._store) == 2
+
+ f3 = tutils.tflow()
+ f3.request.timestamp_start = 2
+ v.request(f3)
+ assert list(v) == [f, f3, f2]
+ v.request(f3)
+ assert list(v) == [f, f3, f2]
+ assert len(v._store) == 3
+
+
+def tft(*, method="get", start=0):
+ f = tutils.tflow()
+ f.request.method = method
+ f.request.timestamp_start = start
+ return f
+
+
+def test_filter():
+ v = view.View()
+ f = flowfilter.parse("~m get")
+ v.request(tft(method="get"))
+ v.request(tft(method="put"))
+ v.request(tft(method="get"))
+ v.request(tft(method="put"))
+ assert(len(v)) == 4
+ v.set_filter(f)
+ assert [i.request.method for i in v] == ["GET", "GET"]
+ assert len(v._store) == 4
+ v.set_filter(None)
+
+
+def test_order():
+ v = view.View()
+ v.request(tft(method="get", start=1))
+ v.request(tft(method="put", start=2))
+ v.request(tft(method="get", start=3))
+ v.request(tft(method="put", start=4))
+ assert [i.request.timestamp_start for i in v] == [1, 2, 3, 4]
+
+ v.set_order(view.key_request_method)
+ assert [i.request.method for i in v] == ["GET", "GET", "PUT", "PUT"]
+ v.toggle_reversed()
+ assert [i.request.method for i in v] == ["PUT", "PUT", "GET", "GET"]
+
+ v.set_order(view.key_request_start)
+ assert [i.request.timestamp_start for i in v] == [4, 3, 2, 1]
+
+ v.toggle_reversed()
+ assert [i.request.timestamp_start for i in v] == [1, 2, 3, 4]
+
+
+def test_reversed():
+ v = view.View()
+ v.request(tft(start=1))
+ v.request(tft(start=2))
+ v.request(tft(start=3))
+ v.toggle_reversed()
+
+ assert v[0].request.timestamp_start == 3
+ assert v[-1].request.timestamp_start == 1
+ assert v[2].request.timestamp_start == 1
+ tutils.raises(IndexError, v.__getitem__, 5)
+ tutils.raises(IndexError, v.__getitem__, -5)
+
+ assert v.bisect(v[0]) == 1
+ assert v.bisect(v[2]) == 3
+
+
+def test_update():
+ v = view.View()
+ flt = flowfilter.parse("~m get")
+ v.set_filter(flt)
+
+ f = tft(method="get")
+ v.request(f)
+ assert f in v
+
+ f.request.method = "put"
+ v.update(f)
+ assert f not in v
+
+ f.request.method = "get"
+ v.update(f)
+ assert f in v
+
+ v.update(f)
+ assert f in v
+
+
+class Record:
+ def __init__(self):
+ self.calls = []
+
+ def __bool__(self):
+ return bool(self.calls)
+
+ def __repr__(self):
+ return repr(self.calls)
+
+ def __call__(self, *args, **kwargs):
+ self.calls.append((args, kwargs))
+
+
+def test_signals():
+ v = view.View()
+ rec_add = Record()
+ rec_update = Record()
+ rec_remove = Record()
+ rec_refresh = Record()
+
+ def clearrec():
+ rec_add.calls = []
+ rec_update.calls = []
+ rec_remove.calls = []
+ rec_refresh.calls = []
+
+ v.sig_add.connect(rec_add)
+ v.sig_update.connect(rec_update)
+ v.sig_remove.connect(rec_remove)
+ v.sig_refresh.connect(rec_refresh)
+
+ assert not any([rec_add, rec_update, rec_remove, rec_refresh])
+
+ # Simple add
+ v.add(tft())
+ assert rec_add
+ assert not any([rec_update, rec_remove, rec_refresh])
+
+ # Filter change triggers refresh
+ clearrec()
+ v.set_filter(flowfilter.parse("~m put"))
+ assert rec_refresh
+ assert not any([rec_update, rec_add, rec_remove])
+
+ v.set_filter(flowfilter.parse("~m get"))
+
+ # An update that results in a flow being added to the view
+ clearrec()
+ v[0].request.method = "PUT"
+ v.update(v[0])
+ assert rec_remove
+ assert not any([rec_update, rec_refresh, rec_add])
+
+ # An update that does not affect the view just sends update
+ v.set_filter(flowfilter.parse("~m put"))
+ clearrec()
+ v.update(v[0])
+ assert rec_update
+ assert not any([rec_remove, rec_refresh, rec_add])
+
+ # An update for a flow in state but not view does not do anything
+ f = v[0]
+ v.set_filter(flowfilter.parse("~m get"))
+ assert not len(v)
+ clearrec()
+ v.update(f)
+ assert not any([rec_add, rec_update, rec_remove, rec_refresh])
+
+
+def test_focus():
+ # Special case - initialising with a view that already contains data
+ v = view.View()
+ v.add(tft())
+ f = view.Focus(v)
+ assert f.index is 0
+ assert f.flow is v[0]
+
+ # Start empty
+ v = view.View()
+ f = view.Focus(v)
+ assert f.index is None
+ assert f.flow is None
+
+ v.add(tft(start=1))
+ assert f.index == 0
+ assert f.flow is v[0]
+
+ v.add(tft(start=0))
+ assert f.index == 1
+ assert f.flow is v[1]
+
+ v.add(tft(start=2))
+ assert f.index == 1
+ assert f.flow is v[1]
+
+ v.remove(v[1])
+ assert f.index == 1
+ assert f.flow is v[1]
+
+ v.remove(v[1])
+ assert f.index == 0
+ assert f.flow is v[0]
+
+ v.remove(v[0])
+ assert f.index is None
+ assert f.flow is None
+
+ v.add(tft(method="get", start=0))
+ v.add(tft(method="get", start=1))
+ v.add(tft(method="put", start=2))
+ v.add(tft(method="get", start=3))
+
+ f.flow = v[2]
+ assert f.flow.request.method == "PUT"
+
+ filt = flowfilter.parse("~m get")
+ v.set_filter(filt)
+ assert f.index == 2
+
+ filt = flowfilter.parse("~m oink")
+ v.set_filter(filt)
+ assert f.index is None
+
+
+def test_focus_nextprev():
+ v = view.View()
+ # Nops on an empty view
+ v.focus.next()
+ v.focus.prev()
+
+ # Nops on a single-flow view
+ v.add(tft(start=0))
+ assert v.focus.flow == v[0]
+ v.focus.next()
+ assert v.focus.flow == v[0]
+ v.focus.prev()
+ assert v.focus.flow == v[0]
+
+ v.add(tft(start=1))
+ v.focus.next()
+ assert v.focus.flow == v[1]
+ v.focus.next()
+ assert v.focus.flow == v[1]
+ v.focus.prev()
+ assert v.focus.flow == v[0]
+ v.focus.prev()
+ assert v.focus.flow == v[0]
+
+
+def test_settings():
+ v = view.View()
+ f = tft()
+
+ tutils.raises(KeyError, v.settings.__getitem__, f)
+ v.add(f)
+ assert v.settings[f] == {}
+ v.settings[f]["foo"] = "bar"
+ assert v.settings[f]["foo"] == "bar"
+ assert len(list(v.settings)) == 1
+ v.remove(f)
+ tutils.raises(KeyError, v.settings.__getitem__, f)
+ assert not v.settings.keys()