diff options
-rw-r--r-- | mitmproxy/addons/setheaders.py | 16 | ||||
-rw-r--r-- | mitmproxy/addons/view.py | 26 | ||||
-rw-r--r-- | test/mitmproxy/addons/test_anticache.py | 3 | ||||
-rw-r--r-- | test/mitmproxy/addons/test_anticomp.py | 3 | ||||
-rw-r--r-- | test/mitmproxy/addons/test_replace.py | 4 | ||||
-rw-r--r-- | test/mitmproxy/addons/test_setheaders.py | 94 | ||||
-rw-r--r-- | test/mitmproxy/addons/test_stickycookie.py | 173 | ||||
-rw-r--r-- | test/mitmproxy/addons/test_termlog.py | 3 | ||||
-rw-r--r-- | test/mitmproxy/addons/test_view.py | 70 |
9 files changed, 214 insertions, 178 deletions
diff --git a/mitmproxy/addons/setheaders.py b/mitmproxy/addons/setheaders.py index 5695e1e8..601e7521 100644 --- a/mitmproxy/addons/setheaders.py +++ b/mitmproxy/addons/setheaders.py @@ -14,13 +14,15 @@ class SetHeaders: header: Header name. value: Header value string """ - for fpatt, header, value in options.setheaders: - flt = flowfilter.parse(fpatt) - if not flt: - raise exceptions.OptionsError( - "Invalid setheader filter pattern %s" % fpatt - ) - self.lst.append((fpatt, header, value, flt)) + if "setheaders" in updated: + self.lst = [] + for fpatt, header, value in options.setheaders: + flt = flowfilter.parse(fpatt) + if not flt: + raise exceptions.OptionsError( + "Invalid setheader filter pattern %s" % fpatt + ) + self.lst.append((fpatt, header, value, flt)) def run(self, f, hdrs): for _, header, value, flt in self.lst: diff --git a/mitmproxy/addons/view.py b/mitmproxy/addons/view.py index f4c56883..15d842e4 100644 --- a/mitmproxy/addons/view.py +++ b/mitmproxy/addons/view.py @@ -51,13 +51,16 @@ class _OrderKey: return "_order_%s" % id(self) def __call__(self, f): - k = self._key() - s = self.view.settings[f] - if k in s: - return s[k] - val = self.generate(f) - s[k] = val - return val + if f.id in self.view._store: + k = self._key() + s = self.view.settings[f] + if k in s: + return s[k] + val = self.generate(f) + s[k] = val + return val + else: + return self.generate(f) class OrderRequestStart(_OrderKey): @@ -159,11 +162,8 @@ class View(collections.Sequence): # Reflect some methods to the efficient underlying implementation - def bisect(self, f: mitmproxy.flow.Flow) -> int: - v = self._view.bisect(f) - # Bisect returns an item to the RIGHT of the existing entries. - if v == 0: - return v + def _bisect(self, f: mitmproxy.flow.Flow) -> int: + v = self._view.bisect_right(f) return self._rev(v - 1) + 1 def index(self, f: mitmproxy.flow.Flow, start: int = 0, stop: typing.Optional[int] = None) -> int: @@ -349,7 +349,7 @@ class Focus: self.flow = self.view[idx] def _nearest(self, f, v): - return min(v.bisect(f), len(v) - 1) + return min(v._bisect(f), len(v) - 1) def _sig_remove(self, view, flow): if len(view) == 0: diff --git a/test/mitmproxy/addons/test_anticache.py b/test/mitmproxy/addons/test_anticache.py index 851be945..928f2180 100644 --- a/test/mitmproxy/addons/test_anticache.py +++ b/test/mitmproxy/addons/test_anticache.py @@ -1,11 +1,10 @@ from mitmproxy.test import tflow -from .. import mastertest from mitmproxy.addons import anticache from mitmproxy.test import taddons -class TestAntiCache(mastertest.MasterTest): +class TestAntiCache: def test_simple(self): sa = anticache.AntiCache() with taddons.context() as tctx: diff --git a/test/mitmproxy/addons/test_anticomp.py b/test/mitmproxy/addons/test_anticomp.py index eaf8fe53..2a6cf3ce 100644 --- a/test/mitmproxy/addons/test_anticomp.py +++ b/test/mitmproxy/addons/test_anticomp.py @@ -1,11 +1,10 @@ from mitmproxy.test import tflow -from .. import mastertest from mitmproxy.addons import anticomp from mitmproxy.test import taddons -class TestAntiComp(mastertest.MasterTest): +class TestAntiComp: def test_simple(self): sa = anticomp.AntiComp() with taddons.context() as tctx: diff --git a/test/mitmproxy/addons/test_replace.py b/test/mitmproxy/addons/test_replace.py index 94efe9c3..34fa5017 100644 --- a/test/mitmproxy/addons/test_replace.py +++ b/test/mitmproxy/addons/test_replace.py @@ -1,14 +1,14 @@ from mitmproxy.test import tflow from mitmproxy.test import tutils -from .. import mastertest, tservers +from .. import tservers from mitmproxy.addons import replace from mitmproxy import master from mitmproxy import options from mitmproxy import proxy -class TestReplace(mastertest.MasterTest): +class TestReplace: def test_configure(self): r = replace.Replace() updated = set(["replacements"]) diff --git a/test/mitmproxy/addons/test_setheaders.py b/test/mitmproxy/addons/test_setheaders.py index d7bdef61..34395ddf 100644 --- a/test/mitmproxy/addons/test_setheaders.py +++ b/test/mitmproxy/addons/test_setheaders.py @@ -1,21 +1,12 @@ from mitmproxy.test import tflow from mitmproxy.test import tutils - -from .. import mastertest +from mitmproxy.test import taddons from mitmproxy.addons import setheaders from mitmproxy import options -from mitmproxy import proxy - -class TestSetHeaders(mastertest.MasterTest): - def mkmaster(self, **opts): - o = options.Options(**opts) - m = mastertest.RecordingMaster(o, proxy.DummyServer()) - sh = setheaders.SetHeaders() - m.addons.add(sh) - return m, sh +class TestSetHeaders: def test_configure(self): sh = setheaders.SetHeaders() o = options.Options( @@ -27,41 +18,46 @@ class TestSetHeaders(mastertest.MasterTest): ) def test_setheaders(self): - m, sh = self.mkmaster( - setheaders = [ - ("~q", "one", "two"), - ("~s", "one", "three") - ] - ) - f = tflow.tflow() - f.request.headers["one"] = "xxx" - m.request(f) - assert f.request.headers["one"] == "two" - - f = tflow.tflow(resp=True) - f.response.headers["one"] = "xxx" - m.response(f) - assert f.response.headers["one"] == "three" - - m, sh = self.mkmaster( - setheaders = [ - ("~s", "one", "two"), - ("~s", "one", "three") - ] - ) - f = tflow.tflow(resp=True) - f.request.headers["one"] = "xxx" - f.response.headers["one"] = "xxx" - m.response(f) - assert f.response.headers.get_all("one") == ["two", "three"] - - m, sh = self.mkmaster( - setheaders = [ - ("~q", "one", "two"), - ("~q", "one", "three") - ] - ) - f = tflow.tflow() - f.request.headers["one"] = "xxx" - m.request(f) - assert f.request.headers.get_all("one") == ["two", "three"] + sh = setheaders.SetHeaders() + with taddons.context() as tctx: + tctx.configure( + sh, + setheaders = [ + ("~q", "one", "two"), + ("~s", "one", "three") + ] + ) + f = tflow.tflow() + f.request.headers["one"] = "xxx" + sh.request(f) + assert f.request.headers["one"] == "two" + + f = tflow.tflow(resp=True) + f.response.headers["one"] = "xxx" + sh.response(f) + assert f.response.headers["one"] == "three" + + tctx.configure( + sh, + setheaders = [ + ("~s", "one", "two"), + ("~s", "one", "three") + ] + ) + f = tflow.tflow(resp=True) + f.request.headers["one"] = "xxx" + f.response.headers["one"] = "xxx" + sh.response(f) + assert f.response.headers.get_all("one") == ["two", "three"] + + tctx.configure( + sh, + setheaders = [ + ("~q", "one", "two"), + ("~q", "one", "three") + ] + ) + f = tflow.tflow() + f.request.headers["one"] = "xxx" + sh.request(f) + assert f.request.headers.get_all("one") == ["two", "three"] diff --git a/test/mitmproxy/addons/test_stickycookie.py b/test/mitmproxy/addons/test_stickycookie.py index 7f1e83eb..df5d0221 100644 --- a/test/mitmproxy/addons/test_stickycookie.py +++ b/test/mitmproxy/addons/test_stickycookie.py @@ -1,11 +1,9 @@ from mitmproxy.test import tflow from mitmproxy.test import tutils +from mitmproxy.test import taddons -from .. import mastertest from mitmproxy.addons import stickycookie -from mitmproxy import master from mitmproxy import options -from mitmproxy import proxy from mitmproxy.test import tutils as ntutils @@ -14,14 +12,7 @@ def test_domain_match(): assert stickycookie.domain_match("google.com", ".google.com") -class TestStickyCookie(mastertest.MasterTest): - def mk(self): - o = options.Options(stickycookie = ".*") - m = master.Master(o, proxy.DummyServer()) - sc = stickycookie.StickyCookie() - m.addons.add(sc) - return m, sc - +class TestStickyCookie: def test_config(self): sc = stickycookie.StickyCookie() o = options.Options(stickycookie = "~b") @@ -31,103 +22,113 @@ class TestStickyCookie(mastertest.MasterTest): ) def test_simple(self): - m, sc = self.mk() - m.addons.add(sc) - - f = tflow.tflow(resp=True) - f.response.headers["set-cookie"] = "foo=bar" - m.request(f) + sc = stickycookie.StickyCookie() + with taddons.context() as tctx: + tctx.configure(sc, stickycookie=".*") + f = tflow.tflow(resp=True) + f.response.headers["set-cookie"] = "foo=bar" + sc.request(f) - f.reply.acked = False - m.response(f) + f.reply.acked = False + sc.response(f) - assert sc.jar - assert "cookie" not in f.request.headers + assert sc.jar + assert "cookie" not in f.request.headers - f = f.copy() - f.reply.acked = False - m.request(f) - assert f.request.headers["cookie"] == "foo=bar" + f = f.copy() + f.reply.acked = False + sc.request(f) + assert f.request.headers["cookie"] == "foo=bar" - def _response(self, m, sc, cookie, host): + def _response(self, sc, cookie, host): f = tflow.tflow(req=ntutils.treq(host=host, port=80), resp=True) f.response.headers["Set-Cookie"] = cookie - m.response(f) + sc.response(f) return f def test_response(self): - m, sc = self.mk() + sc = stickycookie.StickyCookie() + with taddons.context() as tctx: + tctx.configure(sc, stickycookie=".*") - c = "SSID=mooo; domain=.google.com, FOO=bar; Domain=.google.com; Path=/; " \ - "Expires=Wed, 13-Jan-2021 22:23:01 GMT; Secure; " + c = "SSID=mooo; domain=.google.com, FOO=bar; Domain=.google.com; Path=/; " \ + "Expires=Wed, 13-Jan-2021 22:23:01 GMT; Secure; " - self._response(m, sc, c, "host") - assert not sc.jar.keys() + self._response(sc, c, "host") + assert not sc.jar.keys() - self._response(m, sc, c, "www.google.com") - assert sc.jar.keys() + self._response(sc, c, "www.google.com") + assert sc.jar.keys() - sc.jar.clear() - self._response( - m, sc, "SSID=mooo", "www.google.com" - ) - assert list(sc.jar.keys())[0] == ('www.google.com', 80, '/') + sc.jar.clear() + self._response(sc, "SSID=mooo", "www.google.com") + assert list(sc.jar.keys())[0] == ('www.google.com', 80, '/') def test_response_multiple(self): - m, sc = self.mk() - - # Test setting of multiple cookies - c1 = "somecookie=test; Path=/" - c2 = "othercookie=helloworld; Path=/" - f = self._response(m, sc, c1, "www.google.com") - f.response.headers["Set-Cookie"] = c2 - m.response(f) - googlekey = list(sc.jar.keys())[0] - assert len(sc.jar[googlekey].keys()) == 2 + sc = stickycookie.StickyCookie() + with taddons.context() as tctx: + tctx.configure(sc, stickycookie=".*") + + # Test setting of multiple cookies + c1 = "somecookie=test; Path=/" + c2 = "othercookie=helloworld; Path=/" + f = self._response(sc, c1, "www.google.com") + f.response.headers["Set-Cookie"] = c2 + sc.response(f) + googlekey = list(sc.jar.keys())[0] + assert len(sc.jar[googlekey].keys()) == 2 def test_response_weird(self): - m, sc = self.mk() - - # Test setting of weird cookie keys - f = tflow.tflow(req=ntutils.treq(host="www.google.com", port=80), resp=True) - cs = [ - "foo/bar=hello", - "foo:bar=world", - "foo@bar=fizz", - ] - for c in cs: - f.response.headers["Set-Cookie"] = c - m.response(f) - googlekey = list(sc.jar.keys())[0] - assert len(sc.jar[googlekey].keys()) == len(cs) + sc = stickycookie.StickyCookie() + with taddons.context() as tctx: + tctx.configure(sc, stickycookie=".*") + + # Test setting of weird cookie keys + f = tflow.tflow(req=ntutils.treq(host="www.google.com", port=80), resp=True) + cs = [ + "foo/bar=hello", + "foo:bar=world", + "foo@bar=fizz", + ] + for c in cs: + f.response.headers["Set-Cookie"] = c + sc.response(f) + googlekey = list(sc.jar.keys())[0] + assert len(sc.jar[googlekey].keys()) == len(cs) def test_response_overwrite(self): - m, sc = self.mk() - - # Test overwriting of a cookie value - c1 = "somecookie=helloworld; Path=/" - c2 = "somecookie=newvalue; Path=/" - f = self._response(m, sc, c1, "www.google.com") - f.response.headers["Set-Cookie"] = c2 - m.response(f) - googlekey = list(sc.jar.keys())[0] - assert len(sc.jar[googlekey].keys()) == 1 - assert list(sc.jar[googlekey]["somecookie"].items())[0][1] == "newvalue" + sc = stickycookie.StickyCookie() + with taddons.context() as tctx: + tctx.configure(sc, stickycookie=".*") + + # Test overwriting of a cookie value + c1 = "somecookie=helloworld; Path=/" + c2 = "somecookie=newvalue; Path=/" + f = self._response(sc, c1, "www.google.com") + f.response.headers["Set-Cookie"] = c2 + sc.response(f) + googlekey = list(sc.jar.keys())[0] + assert len(sc.jar[googlekey].keys()) == 1 + assert list(sc.jar[googlekey]["somecookie"].items())[0][1] == "newvalue" def test_response_delete(self): - m, sc = self.mk() + sc = stickycookie.StickyCookie() + with taddons.context() as tctx: + tctx.configure(sc, stickycookie=".*") - # Test that a cookie is be deleted - # by setting the expire time in the past - f = self._response(m, sc, "duffer=zafar; Path=/", "www.google.com") - f.response.headers["Set-Cookie"] = "duffer=; Expires=Thu, 01-Jan-1970 00:00:00 GMT" - m.response(f) - assert not sc.jar.keys() + # Test that a cookie is be deleted + # by setting the expire time in the past + f = self._response(sc, "duffer=zafar; Path=/", "www.google.com") + f.response.headers["Set-Cookie"] = "duffer=; Expires=Thu, 01-Jan-1970 00:00:00 GMT" + sc.response(f) + assert not sc.jar.keys() def test_request(self): - m, sc = self.mk() + sc = stickycookie.StickyCookie() + with taddons.context() as tctx: + tctx.configure(sc, stickycookie=".*") - f = self._response(m, sc, "SSID=mooo", "www.google.com") - assert "cookie" not in f.request.headers - m.request(f) - assert "cookie" in f.request.headers + f = self._response(sc, "SSID=mooo", "www.google.com") + assert "cookie" not in f.request.headers + sc.request(f) + assert "cookie" in f.request.headers diff --git a/test/mitmproxy/addons/test_termlog.py b/test/mitmproxy/addons/test_termlog.py index 1045ec29..880fcb51 100644 --- a/test/mitmproxy/addons/test_termlog.py +++ b/test/mitmproxy/addons/test_termlog.py @@ -1,4 +1,3 @@ -from .. import mastertest import io from mitmproxy.addons import termlog @@ -6,7 +5,7 @@ from mitmproxy import log from mitmproxy.tools import dump -class TestTermLog(mastertest.MasterTest): +class TestTermLog: def test_simple(self): t = termlog.TermLog() sio = io.StringIO() diff --git a/test/mitmproxy/addons/test_view.py b/test/mitmproxy/addons/test_view.py index 5243e9d4..37a8d866 100644 --- a/test/mitmproxy/addons/test_view.py +++ b/test/mitmproxy/addons/test_view.py @@ -7,6 +7,13 @@ from mitmproxy import options from mitmproxy.test import taddons +def tft(*, method="get", start=0): + f = tflow.tflow() + f.request.method = method + f.request.timestamp_start = start + return f + + class Options(options.Options): def __init__( self, @@ -62,18 +69,24 @@ def test_order_generators(): def test_simple(): v = view.View() - f = tflow.tflow() + f = tft(start=1) assert v.store_count() == 0 - f.request.timestamp_start = 1 v.request(f) assert list(v) == [f] + + # These all just call udpate + v.error(f) + v.response(f) + v.intercept(f) + v.resume(f) + assert list(v) == [f] + v.request(f) assert list(v) == [f] assert len(v._store) == 1 assert v.store_count() == 1 - f2 = tflow.tflow() - f2.request.timestamp_start = 3 + f2 = tft(start=3) v.request(f2) assert list(v) == [f, f2] v.request(f2) @@ -84,8 +97,7 @@ def test_simple(): assert not v.inbounds(-1) assert not v.inbounds(100) - f3 = tflow.tflow() - f3.request.timestamp_start = 2 + f3 = tft(start=2) v.request(f3) assert list(v) == [f, f3, f2] v.request(f3) @@ -97,13 +109,6 @@ def test_simple(): assert len(v._store) == 0 -def tft(*, method="get", start=0): - f = tflow.tflow() - f.request.method = method - f.request.timestamp_start = start - return f - - def test_filter(): v = view.View() f = flowfilter.parse("~m get") @@ -160,8 +165,8 @@ def test_reversed(): tutils.raises(IndexError, v.__getitem__, 5) tutils.raises(IndexError, v.__getitem__, -5) - assert v.bisect(v[0]) == 1 - assert v.bisect(v[2]) == 3 + assert v._bisect(v[0]) == 1 + assert v._bisect(v[2]) == 3 def test_update(): @@ -255,6 +260,33 @@ def test_signals(): assert not any([rec_add, rec_update, rec_remove, rec_refresh]) +def test_focus_follow(): + v = view.View() + with taddons.context(options=Options()) as tctx: + tctx.configure(v, focus_follow=True, filter="~m get") + + v.add(tft(start=5)) + assert v.focus.index == 0 + + v.add(tft(start=4)) + assert v.focus.index == 0 + assert v.focus.flow.request.timestamp_start == 4 + + v.add(tft(start=7)) + assert v.focus.index == 2 + assert v.focus.flow.request.timestamp_start == 7 + + mod = tft(method="put", start=6) + v.add(mod) + assert v.focus.index == 2 + assert v.focus.flow.request.timestamp_start == 7 + + mod.request.method = "GET" + v.update(mod) + assert v.focus.index == 2 + assert v.focus.flow.request.timestamp_start == 6 + + def test_focus(): # Special case - initialising with a view that already contains data v = view.View() @@ -273,6 +305,10 @@ def test_focus(): assert f.index == 0 assert f.flow is v[0] + # Try to set to something not in view + tutils.raises(ValueError, f.__setattr__, "flow", tft()) + tutils.raises(ValueError, f.__setattr__, "index", 99) + v.add(tft(start=0)) assert f.index == 1 assert f.flow is v[1] @@ -281,6 +317,10 @@ def test_focus(): assert f.index == 1 assert f.flow is v[1] + f.index = 0 + assert f.index == 0 + f.index = 1 + v.remove(v[1]) assert f.index == 1 assert f.flow is v[1] |