diff options
author | Aldo Cortesi <aldo@corte.si> | 2016-10-29 12:19:19 +1300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-10-29 12:19:19 +1300 |
commit | a3131ac34330537b205ba7332f752c1b1ea93278 (patch) | |
tree | dea3958bc9c306bc5b6c6cb849c34ae8a0cb39a1 /test | |
parent | 9be34baa403802953e09d4962b755d50af91a503 (diff) | |
parent | 005c22445b013fdbce06569966cd86a48201e837 (diff) | |
download | mitmproxy-a3131ac34330537b205ba7332f752c1b1ea93278.tar.gz mitmproxy-a3131ac34330537b205ba7332f752c1b1ea93278.tar.bz2 mitmproxy-a3131ac34330537b205ba7332f752c1b1ea93278.zip |
Merge pull request #1683 from cortesi/view
addons.View
Diffstat (limited to 'test')
-rw-r--r-- | test/mitmproxy/addons/test_view.py | 275 |
1 files changed, 275 insertions, 0 deletions
diff --git a/test/mitmproxy/addons/test_view.py b/test/mitmproxy/addons/test_view.py new file mode 100644 index 00000000..15cf534e --- /dev/null +++ b/test/mitmproxy/addons/test_view.py @@ -0,0 +1,275 @@ +from mitmproxy.addons import view +from mitmproxy import flowfilter + +from .. import tutils + + +def test_simple(): + v = view.View() + f = tutils.tflow() + f.request.timestamp_start = 1 + v.request(f) + assert list(v) == [f] + v.request(f) + assert list(v) == [f] + assert len(v._store) == 1 + + f2 = tutils.tflow() + f2.request.timestamp_start = 3 + v.request(f2) + assert list(v) == [f, f2] + v.request(f2) + assert list(v) == [f, f2] + assert len(v._store) == 2 + + f3 = tutils.tflow() + f3.request.timestamp_start = 2 + v.request(f3) + assert list(v) == [f, f3, f2] + v.request(f3) + assert list(v) == [f, f3, f2] + assert len(v._store) == 3 + + +def tft(*, method="get", start=0): + f = tutils.tflow() + f.request.method = method + f.request.timestamp_start = start + return f + + +def test_filter(): + v = view.View() + f = flowfilter.parse("~m get") + v.request(tft(method="get")) + v.request(tft(method="put")) + v.request(tft(method="get")) + v.request(tft(method="put")) + assert(len(v)) == 4 + v.set_filter(f) + assert [i.request.method for i in v] == ["GET", "GET"] + assert len(v._store) == 4 + v.set_filter(None) + + +def test_order(): + v = view.View() + v.request(tft(method="get", start=1)) + v.request(tft(method="put", start=2)) + v.request(tft(method="get", start=3)) + v.request(tft(method="put", start=4)) + assert [i.request.timestamp_start for i in v] == [1, 2, 3, 4] + + v.set_order(view.key_request_method) + assert [i.request.method for i in v] == ["GET", "GET", "PUT", "PUT"] + v.toggle_reversed() + assert [i.request.method for i in v] == ["PUT", "PUT", "GET", "GET"] + + v.set_order(view.key_request_start) + assert [i.request.timestamp_start for i in v] == [4, 3, 2, 1] + + v.toggle_reversed() + assert [i.request.timestamp_start for i in v] == [1, 2, 3, 4] + + +def test_reversed(): + v = view.View() + v.request(tft(start=1)) + v.request(tft(start=2)) + v.request(tft(start=3)) + v.toggle_reversed() + + assert v[0].request.timestamp_start == 3 + assert v[-1].request.timestamp_start == 1 + assert v[2].request.timestamp_start == 1 + tutils.raises(IndexError, v.__getitem__, 5) + tutils.raises(IndexError, v.__getitem__, -5) + + assert v.bisect(v[0]) == 1 + assert v.bisect(v[2]) == 3 + + +def test_update(): + v = view.View() + flt = flowfilter.parse("~m get") + v.set_filter(flt) + + f = tft(method="get") + v.request(f) + assert f in v + + f.request.method = "put" + v.update(f) + assert f not in v + + f.request.method = "get" + v.update(f) + assert f in v + + v.update(f) + assert f in v + + +class Record: + def __init__(self): + self.calls = [] + + def __bool__(self): + return bool(self.calls) + + def __repr__(self): + return repr(self.calls) + + def __call__(self, *args, **kwargs): + self.calls.append((args, kwargs)) + + +def test_signals(): + v = view.View() + rec_add = Record() + rec_update = Record() + rec_remove = Record() + rec_refresh = Record() + + def clearrec(): + rec_add.calls = [] + rec_update.calls = [] + rec_remove.calls = [] + rec_refresh.calls = [] + + v.sig_add.connect(rec_add) + v.sig_update.connect(rec_update) + v.sig_remove.connect(rec_remove) + v.sig_refresh.connect(rec_refresh) + + assert not any([rec_add, rec_update, rec_remove, rec_refresh]) + + # Simple add + v.add(tft()) + assert rec_add + assert not any([rec_update, rec_remove, rec_refresh]) + + # Filter change triggers refresh + clearrec() + v.set_filter(flowfilter.parse("~m put")) + assert rec_refresh + assert not any([rec_update, rec_add, rec_remove]) + + v.set_filter(flowfilter.parse("~m get")) + + # An update that results in a flow being added to the view + clearrec() + v[0].request.method = "PUT" + v.update(v[0]) + assert rec_remove + assert not any([rec_update, rec_refresh, rec_add]) + + # An update that does not affect the view just sends update + v.set_filter(flowfilter.parse("~m put")) + clearrec() + v.update(v[0]) + assert rec_update + assert not any([rec_remove, rec_refresh, rec_add]) + + # An update for a flow in state but not view does not do anything + f = v[0] + v.set_filter(flowfilter.parse("~m get")) + assert not len(v) + clearrec() + v.update(f) + assert not any([rec_add, rec_update, rec_remove, rec_refresh]) + + +def test_focus(): + # Special case - initialising with a view that already contains data + v = view.View() + v.add(tft()) + f = view.Focus(v) + assert f.index is 0 + assert f.flow is v[0] + + # Start empty + v = view.View() + f = view.Focus(v) + assert f.index is None + assert f.flow is None + + v.add(tft(start=1)) + assert f.index == 0 + assert f.flow is v[0] + + v.add(tft(start=0)) + assert f.index == 1 + assert f.flow is v[1] + + v.add(tft(start=2)) + assert f.index == 1 + assert f.flow is v[1] + + v.remove(v[1]) + assert f.index == 1 + assert f.flow is v[1] + + v.remove(v[1]) + assert f.index == 0 + assert f.flow is v[0] + + v.remove(v[0]) + assert f.index is None + assert f.flow is None + + v.add(tft(method="get", start=0)) + v.add(tft(method="get", start=1)) + v.add(tft(method="put", start=2)) + v.add(tft(method="get", start=3)) + + f.flow = v[2] + assert f.flow.request.method == "PUT" + + filt = flowfilter.parse("~m get") + v.set_filter(filt) + assert f.index == 2 + + filt = flowfilter.parse("~m oink") + v.set_filter(filt) + assert f.index is None + + +def test_focus_nextprev(): + v = view.View() + # Nops on an empty view + v.focus.next() + v.focus.prev() + + # Nops on a single-flow view + v.add(tft(start=0)) + assert v.focus.flow == v[0] + v.focus.next() + assert v.focus.flow == v[0] + v.focus.prev() + assert v.focus.flow == v[0] + + v.add(tft(start=1)) + v.focus.next() + assert v.focus.flow == v[1] + v.focus.next() + assert v.focus.flow == v[1] + v.focus.prev() + assert v.focus.flow == v[0] + v.focus.prev() + assert v.focus.flow == v[0] + + +def test_settings(): + v = view.View() + f = tft() + + tutils.raises(KeyError, v.settings.__getitem__, f) + v.add(f) + assert v.settings[f] == {} + v.settings[f]["foo"] = "bar" + assert v.settings[f]["foo"] == "bar" + assert len(list(v.settings)) == 1 + v.remove(f) + tutils.raises(KeyError, v.settings.__getitem__, f) + assert not v.settings.keys() |