diff options
author | Aldo Cortesi <aldo@corte.si> | 2016-11-03 20:16:26 +1300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-11-03 20:16:26 +1300 |
commit | e300f24bdc15561cbdedbc7e6468b5fefeac9c4f (patch) | |
tree | 47b67ac80c5630fd33f3ce04135b889c026684df | |
parent | 50d393960c44fa76075e3b374f578381fd6588db (diff) | |
parent | d7d6edb3d132a67ff59a3024f6685ef9728dca96 (diff) | |
download | mitmproxy-e300f24bdc15561cbdedbc7e6468b5fefeac9c4f.tar.gz mitmproxy-e300f24bdc15561cbdedbc7e6468b5fefeac9c4f.tar.bz2 mitmproxy-e300f24bdc15561cbdedbc7e6468b5fefeac9c4f.zip |
Merge pull request #1707 from cortesi/taddons2
Addon test suite improvements
-rw-r--r-- | mitmproxy/test/taddons.py | 3 | ||||
-rw-r--r-- | test/mitmproxy/addons/test_script.py | 247 | ||||
-rw-r--r-- | test/mitmproxy/addons/test_serverplayback.py | 492 | ||||
-rw-r--r-- | test/mitmproxy/addons/test_stickyauth.py | 28 |
4 files changed, 434 insertions, 336 deletions
diff --git a/mitmproxy/test/taddons.py b/mitmproxy/test/taddons.py index 7804b90d..a25b6891 100644 --- a/mitmproxy/test/taddons.py +++ b/mitmproxy/test/taddons.py @@ -15,6 +15,9 @@ class RecordingMaster(mitmproxy.master.Master): def add_log(self, e, level): self.event_log.append((level, e)) + def clear(self): + self.event_log = [] + class context: """ diff --git a/test/mitmproxy/addons/test_script.py b/test/mitmproxy/addons/test_script.py index c72dac40..c31f4e9b 100644 --- a/test/mitmproxy/addons/test_script.py +++ b/test/mitmproxy/addons/test_script.py @@ -1,21 +1,67 @@ import traceback - import sys import time +import re from mitmproxy.test import tflow from mitmproxy.test import tutils -import re +from mitmproxy.test import taddons from mitmproxy import exceptions from mitmproxy import options from mitmproxy import proxy -from mitmproxy.addons import script from mitmproxy import master -from .. import mastertest +from mitmproxy.addons import script + +import watchdog.events + from .. import tutils as ttutils +def test_ns(): + n = script.NS({}) + n.one = "one" + assert n.one == "one" + assert n.__dict__["ns"]["one"] == "one" + + +def test_scriptenv(): + with taddons.context() as tctx: + with script.scriptenv("path", []): + raise SystemExit + assert tctx.master.event_log[0][0] == "error" + assert "exited" in tctx.master.event_log[0][1] + + tctx.master.clear() + with script.scriptenv("path", []): + raise ValueError("fooo") + assert tctx.master.event_log[0][0] == "error" + assert "foo" in tctx.master.event_log[0][1] + + +class Called: + def __init__(self): + self.called = False + + def __call__(self, *args, **kwargs): + self.called = True + + +def test_reloadhandler(): + rh = script.ReloadHandler(Called()) + assert not rh.filter(watchdog.events.DirCreatedEvent("path")) + assert not rh.filter(watchdog.events.FileModifiedEvent("/foo/.bar")) + assert rh.filter(watchdog.events.FileModifiedEvent("/foo/bar")) + + assert not rh.callback.called + rh.on_modified(watchdog.events.FileModifiedEvent("/foo/bar")) + assert rh.callback.called + rh.callback.called = False + + rh.on_created(watchdog.events.FileCreatedEvent("foo")) + assert rh.callback.called + + class TestParseCommand: def test_empty_command(self): with tutils.raises(exceptions.AddonError): @@ -64,73 +110,68 @@ def test_load_script(): assert ns.start -class TestScript(mastertest.MasterTest): +class TestScript: def test_simple(self): - o = options.Options() - m = master.Master(o, proxy.DummyServer()) - sc = script.Script( - tutils.test_data.path( - "mitmproxy/data/addonscripts/recorder.py" + with taddons.context(): + sc = script.Script( + tutils.test_data.path( + "mitmproxy/data/addonscripts/recorder.py" + ) ) - ) - m.addons.add(sc) - assert sc.ns.call_log == [ - ("solo", "start", (), {}), - ("solo", "configure", (o, o.keys()), {}) - ] + sc.load_script() + assert sc.ns.call_log == [ + ("solo", "start", (), {}), + ] - sc.ns.call_log = [] - f = tflow.tflow(resp=True) - m.request(f) + sc.ns.call_log = [] + f = tflow.tflow(resp=True) + sc.request(f) - recf = sc.ns.call_log[0] - assert recf[1] == "request" + recf = sc.ns.call_log[0] + assert recf[1] == "request" def test_reload(self): - o = options.Options() - m = mastertest.RecordingMaster(o, proxy.DummyServer()) - with tutils.tmpdir(): - with open("foo.py", "w"): - pass - sc = script.Script("foo.py") - m.addons.add(sc) - - for _ in range(100): - with open("foo.py", "a") as f: - f.write(".") - m.addons.invoke_with_context(sc, "tick") - time.sleep(0.1) - if m.event_log: - return - raise AssertionError("Change event not detected.") + with taddons.context() as tctx: + with tutils.tmpdir(): + with open("foo.py", "w"): + pass + sc = script.Script("foo.py") + tctx.configure(sc) + for _ in range(100): + with open("foo.py", "a") as f: + f.write(".") + sc.tick() + time.sleep(0.1) + if tctx.master.event_log: + return + raise AssertionError("Change event not detected.") def test_exception(self): - o = options.Options() - m = mastertest.RecordingMaster(o, proxy.DummyServer()) - sc = script.Script( - tutils.test_data.path("mitmproxy/data/addonscripts/error.py") - ) - m.addons.add(sc) - f = tflow.tflow(resp=True) - m.request(f) - assert m.event_log[0][0] == "error" - assert len(m.event_log[0][1].splitlines()) == 6 - assert re.search('addonscripts/error.py", line \d+, in request', m.event_log[0][1]) - assert re.search('addonscripts/error.py", line \d+, in mkerr', m.event_log[0][1]) - assert m.event_log[0][1].endswith("ValueError: Error!\n") + with taddons.context() as tctx: + sc = script.Script( + tutils.test_data.path("mitmproxy/data/addonscripts/error.py") + ) + sc.start() + f = tflow.tflow(resp=True) + sc.request(f) + assert tctx.master.event_log[0][0] == "error" + assert len(tctx.master.event_log[0][1].splitlines()) == 6 + assert re.search('addonscripts/error.py", line \d+, in request', tctx.master.event_log[0][1]) + assert re.search('addonscripts/error.py", line \d+, in mkerr', tctx.master.event_log[0][1]) + assert tctx.master.event_log[0][1].endswith("ValueError: Error!\n") def test_addon(self): - o = options.Options() - m = master.Master(o, proxy.DummyServer()) - sc = script.Script( - tutils.test_data.path( - "mitmproxy/data/addonscripts/addon.py" + with taddons.context() as tctx: + sc = script.Script( + tutils.test_data.path( + "mitmproxy/data/addonscripts/addon.py" + ) ) - ) - m.addons.add(sc) - assert sc.ns.event_log == [ - 'scriptstart', 'addonstart', 'addonconfigure' - ] + sc.start() + tctx.configure(sc) + assert sc.ns.event_log == [ + 'scriptstart', 'addonstart', 'addonconfigure' + ] class TestCutTraceback: @@ -151,7 +192,7 @@ class TestCutTraceback: assert len(traceback.extract_tb(tb_cut2)) == len(traceback.extract_tb(tb)) -class TestScriptLoader(mastertest.MasterTest): +class TestScriptLoader: def test_run_once(self): o = options.Options(scripts=[]) m = master.Master(o, proxy.DummyServer()) @@ -199,44 +240,48 @@ class TestScriptLoader(mastertest.MasterTest): def test_order(self): rec = tutils.test_data.path("mitmproxy/data/addonscripts/recorder.py") - - o = options.Options( - scripts = [ - "%s %s" % (rec, "a"), - "%s %s" % (rec, "b"), - "%s %s" % (rec, "c"), - ] - ) - m = mastertest.RecordingMaster(o, proxy.DummyServer()) sc = script.ScriptLoader() - m.addons.add(sc) - - debug = [(i[0], i[1]) for i in m.event_log if i[0] == "debug"] - assert debug == [ - ('debug', 'a start'), ('debug', 'a configure'), - ('debug', 'b start'), ('debug', 'b configure'), - ('debug', 'c start'), ('debug', 'c configure') - ] - m.event_log[:] = [] - - o.scripts = [ - "%s %s" % (rec, "c"), - "%s %s" % (rec, "a"), - "%s %s" % (rec, "b"), - ] - debug = [(i[0], i[1]) for i in m.event_log if i[0] == "debug"] - # No events, only order has changed - assert debug == [] - - o.scripts = [ - "%s %s" % (rec, "x"), - "%s %s" % (rec, "a"), - ] - debug = [(i[0], i[1]) for i in m.event_log if i[0] == "debug"] - assert debug == [ - ('debug', 'c done'), - ('debug', 'b done'), - ('debug', 'x start'), - ('debug', 'x configure'), - ('debug', 'a configure'), - ] + with taddons.context() as tctx: + tctx.master.addons.add(sc) + tctx.configure( + sc, + scripts = [ + "%s %s" % (rec, "a"), + "%s %s" % (rec, "b"), + "%s %s" % (rec, "c"), + ] + ) + debug = [(i[0], i[1]) for i in tctx.master.event_log if i[0] == "debug"] + assert debug == [ + ('debug', 'a start'), ('debug', 'a configure'), + ('debug', 'b start'), ('debug', 'b configure'), + ('debug', 'c start'), ('debug', 'c configure') + ] + tctx.master.event_log = [] + tctx.configure( + sc, + scripts = [ + "%s %s" % (rec, "c"), + "%s %s" % (rec, "a"), + "%s %s" % (rec, "b"), + ] + ) + debug = [(i[0], i[1]) for i in tctx.master.event_log if i[0] == "debug"] + # No events, only order has changed + assert debug == [] + + tctx.master.event_log = [] + tctx.configure( + sc, + scripts = [ + "%s %s" % (rec, "x"), + "%s %s" % (rec, "a"), + ] + ) + debug = [(i[0], i[1]) for i in tctx.master.event_log if i[0] == "debug"] + assert debug == [ + ('debug', 'c done'), + ('debug', 'b done'), + ('debug', 'x start'), + ('debug', 'x configure'), + ] diff --git a/test/mitmproxy/addons/test_serverplayback.py b/test/mitmproxy/addons/test_serverplayback.py index f7124a91..613a290c 100644 --- a/test/mitmproxy/addons/test_serverplayback.py +++ b/test/mitmproxy/addons/test_serverplayback.py @@ -1,103 +1,137 @@ +import os +from mitmproxy.test import tutils from mitmproxy.test import tflow - -from .. import mastertest +from mitmproxy.test import taddons import mitmproxy.test.tutils from mitmproxy.addons import serverplayback from mitmproxy import options -from mitmproxy import proxy from mitmproxy import exceptions +from mitmproxy import io -class TestServerPlayback: - def test_server_playback(self): - sp = serverplayback.ServerPlayback() - sp.configure(options.Options(), []) - f = tflow.tflow(resp=True) - - assert not sp.flowmap - - sp.load([f]) - assert sp.flowmap - assert sp.next_flow(f) - assert not sp.flowmap - - def test_ignore_host(self): - sp = serverplayback.ServerPlayback() - sp.configure(options.Options(server_replay_ignore_host=True), []) - - r = tflow.tflow(resp=True) - r2 = tflow.tflow(resp=True) - - r.request.host = "address" - r2.request.host = "address" - assert sp._hash(r) == sp._hash(r2) - r2.request.host = "wrong_address" - assert sp._hash(r) == sp._hash(r2) +def tdump(path, flows): + w = io.FlowWriter(open(path, "wb")) + for i in flows: + w.add(i) - def test_ignore_content(self): - s = serverplayback.ServerPlayback() - s.configure(options.Options(server_replay_ignore_content=False), []) - - r = tflow.tflow(resp=True) - r2 = tflow.tflow(resp=True) - r.request.content = b"foo" - r2.request.content = b"foo" - assert s._hash(r) == s._hash(r2) - r2.request.content = b"bar" - assert not s._hash(r) == s._hash(r2) +def test_config(): + s = serverplayback.ServerPlayback() + with tutils.tmpdir() as p: + with taddons.context() as tctx: + fpath = os.path.join(p, "flows") + tdump(fpath, [tflow.tflow(resp=True)]) + tctx.configure(s, server_replay = [fpath]) + tutils.raises(exceptions.OptionsError, tctx.configure, s, server_replay = [p]) - s.configure(options.Options(server_replay_ignore_content=True), []) - r = tflow.tflow(resp=True) - r2 = tflow.tflow(resp=True) - r.request.content = b"foo" - r2.request.content = b"foo" - assert s._hash(r) == s._hash(r2) - r2.request.content = b"bar" - assert s._hash(r) == s._hash(r2) - r2.request.content = b"" - assert s._hash(r) == s._hash(r2) - r2.request.content = None - assert s._hash(r) == s._hash(r2) - def test_ignore_content_wins_over_params(self): - s = serverplayback.ServerPlayback() - s.configure( - options.Options( - server_replay_ignore_content=True, - server_replay_ignore_payload_params=[ - "param1", "param2" - ] - ), - [] +def test_tick(): + s = serverplayback.ServerPlayback() + with taddons.context() as tctx: + s.stop = True + s.final_flow = tflow.tflow() + s.final_flow.live = False + s.tick() + assert tctx.master.should_exit.is_set() + + +def test_server_playback(): + sp = serverplayback.ServerPlayback() + sp.configure(options.Options(), []) + f = tflow.tflow(resp=True) + + assert not sp.flowmap + + sp.load([f]) + assert sp.flowmap + assert sp.next_flow(f) + assert not sp.flowmap + + sp.load([f]) + assert sp.flowmap + sp.clear() + assert not sp.flowmap + + +def test_ignore_host(): + sp = serverplayback.ServerPlayback() + sp.configure(options.Options(server_replay_ignore_host=True), []) + + r = tflow.tflow(resp=True) + r2 = tflow.tflow(resp=True) + + r.request.host = "address" + r2.request.host = "address" + assert sp._hash(r) == sp._hash(r2) + r2.request.host = "wrong_address" + assert sp._hash(r) == sp._hash(r2) + + +def test_ignore_content(): + s = serverplayback.ServerPlayback() + s.configure(options.Options(server_replay_ignore_content=False), []) + + r = tflow.tflow(resp=True) + r2 = tflow.tflow(resp=True) + + r.request.content = b"foo" + r2.request.content = b"foo" + assert s._hash(r) == s._hash(r2) + r2.request.content = b"bar" + assert not s._hash(r) == s._hash(r2) + + s.configure(options.Options(server_replay_ignore_content=True), []) + r = tflow.tflow(resp=True) + r2 = tflow.tflow(resp=True) + r.request.content = b"foo" + r2.request.content = b"foo" + assert s._hash(r) == s._hash(r2) + r2.request.content = b"bar" + assert s._hash(r) == s._hash(r2) + r2.request.content = b"" + assert s._hash(r) == s._hash(r2) + r2.request.content = None + assert s._hash(r) == s._hash(r2) + + +def test_ignore_content_wins_over_params(): + s = serverplayback.ServerPlayback() + s.configure( + options.Options( + server_replay_ignore_content=True, + server_replay_ignore_payload_params=[ + "param1", "param2" + ] + ), + [] + ) + # NOTE: parameters are mutually exclusive in options + + r = tflow.tflow(resp=True) + r.request.headers["Content-Type"] = "application/x-www-form-urlencoded" + r.request.content = b"paramx=y" + + r2 = tflow.tflow(resp=True) + r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded" + r2.request.content = b"paramx=x" + + # same parameters + assert s._hash(r) == s._hash(r2) + + +def test_ignore_payload_params_other_content_type(): + s = serverplayback.ServerPlayback() + with taddons.context() as tctx: + tctx.configure( + s, + server_replay_ignore_content=False, + server_replay_ignore_payload_params=[ + "param1", "param2" + ] ) - # NOTE: parameters are mutually exclusive in options r = tflow.tflow(resp=True) - r.request.headers["Content-Type"] = "application/x-www-form-urlencoded" - r.request.content = b"paramx=y" - - r2 = tflow.tflow(resp=True) - r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded" - r2.request.content = b"paramx=x" - - # same parameters - assert s._hash(r) == s._hash(r2) - - def test_ignore_payload_params_other_content_type(self): - s = serverplayback.ServerPlayback() - s.configure( - options.Options( - server_replay_ignore_content=False, - server_replay_ignore_payload_params=[ - "param1", "param2" - ] - ), - [] - - ) - r = tflow.tflow(resp=True) r.request.headers["Content-Type"] = "application/json" r.request.content = b'{"param1":"1"}' r2 = tflow.tflow(resp=True) @@ -109,155 +143,165 @@ class TestServerPlayback: r2.request.content = b'{"param1":"2"}' assert not s._hash(r) == s._hash(r2) - def test_hash(self): - s = serverplayback.ServerPlayback() - s.configure(options.Options(), []) - r = tflow.tflow() - r2 = tflow.tflow() +def test_hash(): + s = serverplayback.ServerPlayback() + s.configure(options.Options(), []) - assert s._hash(r) - assert s._hash(r) == s._hash(r2) - r.request.headers["foo"] = "bar" - assert s._hash(r) == s._hash(r2) - r.request.path = "voing" - assert s._hash(r) != s._hash(r2) + r = tflow.tflow() + r2 = tflow.tflow() - r.request.path = "path?blank_value" - r2.request.path = "path?" - assert s._hash(r) != s._hash(r2) + assert s._hash(r) + assert s._hash(r) == s._hash(r2) + r.request.headers["foo"] = "bar" + assert s._hash(r) == s._hash(r2) + r.request.path = "voing" + assert s._hash(r) != s._hash(r2) - def test_headers(self): - s = serverplayback.ServerPlayback() - s.configure(options.Options(server_replay_use_headers=["foo"]), []) + r.request.path = "path?blank_value" + r2.request.path = "path?" + assert s._hash(r) != s._hash(r2) - r = tflow.tflow(resp=True) - r.request.headers["foo"] = "bar" - r2 = tflow.tflow(resp=True) - assert not s._hash(r) == s._hash(r2) - r2.request.headers["foo"] = "bar" - assert s._hash(r) == s._hash(r2) - r2.request.headers["oink"] = "bar" - assert s._hash(r) == s._hash(r2) - - r = tflow.tflow(resp=True) - r2 = tflow.tflow(resp=True) - assert s._hash(r) == s._hash(r2) - def test_load(self): - s = serverplayback.ServerPlayback() - s.configure(options.Options(), []) - - r = tflow.tflow(resp=True) - r.request.headers["key"] = "one" - - r2 = tflow.tflow(resp=True) - r2.request.headers["key"] = "two" - - s.load([r, r2]) - - assert s.count() == 2 - - n = s.next_flow(r) - assert n.request.headers["key"] == "one" - assert s.count() == 1 - - n = s.next_flow(r) - assert n.request.headers["key"] == "two" - assert not s.flowmap - assert s.count() == 0 - - assert not s.next_flow(r) - - def test_load_with_server_replay_nopop(self): - s = serverplayback.ServerPlayback() - s.configure(options.Options(server_replay_nopop=True), []) - - r = tflow.tflow(resp=True) - r.request.headers["key"] = "one" - - r2 = tflow.tflow(resp=True) - r2.request.headers["key"] = "two" - - s.load([r, r2]) - - assert s.count() == 2 - s.next_flow(r) - assert s.count() == 2 - - def test_ignore_params(self): - s = serverplayback.ServerPlayback() - s.configure( - options.Options( - server_replay_ignore_params=["param1", "param2"] - ), - [] - ) - - r = tflow.tflow(resp=True) - r.request.path = "/test?param1=1" - r2 = tflow.tflow(resp=True) - r2.request.path = "/test" - assert s._hash(r) == s._hash(r2) - r2.request.path = "/test?param1=2" - assert s._hash(r) == s._hash(r2) - r2.request.path = "/test?param2=1" - assert s._hash(r) == s._hash(r2) - r2.request.path = "/test?param3=2" - assert not s._hash(r) == s._hash(r2) - - def test_ignore_payload_params(self): - s = serverplayback.ServerPlayback() - s.configure( - options.Options( - server_replay_ignore_payload_params=["param1", "param2"] - ), - [] +def test_headers(): + s = serverplayback.ServerPlayback() + s.configure(options.Options(server_replay_use_headers=["foo"]), []) + + r = tflow.tflow(resp=True) + r.request.headers["foo"] = "bar" + r2 = tflow.tflow(resp=True) + assert not s._hash(r) == s._hash(r2) + r2.request.headers["foo"] = "bar" + assert s._hash(r) == s._hash(r2) + r2.request.headers["oink"] = "bar" + assert s._hash(r) == s._hash(r2) + + r = tflow.tflow(resp=True) + r2 = tflow.tflow(resp=True) + assert s._hash(r) == s._hash(r2) + + +def test_load(): + s = serverplayback.ServerPlayback() + s.configure(options.Options(), []) + + r = tflow.tflow(resp=True) + r.request.headers["key"] = "one" + + r2 = tflow.tflow(resp=True) + r2.request.headers["key"] = "two" + + s.load([r, r2]) + + assert s.count() == 2 + + n = s.next_flow(r) + assert n.request.headers["key"] == "one" + assert s.count() == 1 + + n = s.next_flow(r) + assert n.request.headers["key"] == "two" + assert not s.flowmap + assert s.count() == 0 + + assert not s.next_flow(r) + + +def test_load_with_server_replay_nopop(): + s = serverplayback.ServerPlayback() + s.configure(options.Options(server_replay_nopop=True), []) + + r = tflow.tflow(resp=True) + r.request.headers["key"] = "one" + + r2 = tflow.tflow(resp=True) + r2.request.headers["key"] = "two" + + s.load([r, r2]) + + assert s.count() == 2 + s.next_flow(r) + assert s.count() == 2 + + +def test_ignore_params(): + s = serverplayback.ServerPlayback() + s.configure( + options.Options( + server_replay_ignore_params=["param1", "param2"] + ), + [] + ) + + r = tflow.tflow(resp=True) + r.request.path = "/test?param1=1" + r2 = tflow.tflow(resp=True) + r2.request.path = "/test" + assert s._hash(r) == s._hash(r2) + r2.request.path = "/test?param1=2" + assert s._hash(r) == s._hash(r2) + r2.request.path = "/test?param2=1" + assert s._hash(r) == s._hash(r2) + r2.request.path = "/test?param3=2" + assert not s._hash(r) == s._hash(r2) + + +def test_ignore_payload_params(): + s = serverplayback.ServerPlayback() + s.configure( + options.Options( + server_replay_ignore_payload_params=["param1", "param2"] + ), + [] + ) + + r = tflow.tflow(resp=True) + r.request.headers["Content-Type"] = "application/x-www-form-urlencoded" + r.request.content = b"paramx=x¶m1=1" + r2 = tflow.tflow(resp=True) + r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded" + r2.request.content = b"paramx=x¶m1=1" + # same parameters + assert s._hash(r) == s._hash(r2) + # ignored parameters != + r2.request.content = b"paramx=x¶m1=2" + assert s._hash(r) == s._hash(r2) + # missing parameter + r2.request.content = b"paramx=x" + assert s._hash(r) == s._hash(r2) + # ignorable parameter added + r2.request.content = b"paramx=x¶m1=2" + assert s._hash(r) == s._hash(r2) + # not ignorable parameter changed + r2.request.content = b"paramx=y¶m1=1" + assert not s._hash(r) == s._hash(r2) + # not ignorable parameter missing + r2.request.content = b"param1=1" + assert not s._hash(r) == s._hash(r2) + + +def test_server_playback_full(): + s = serverplayback.ServerPlayback() + with taddons.context() as tctx: + tctx.configure( + s, + refresh_server_playback = True, + keepserving=False ) - r = tflow.tflow(resp=True) - r.request.headers["Content-Type"] = "application/x-www-form-urlencoded" - r.request.content = b"paramx=x¶m1=1" - r2 = tflow.tflow(resp=True) - r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded" - r2.request.content = b"paramx=x¶m1=1" - # same parameters - assert s._hash(r) == s._hash(r2) - # ignored parameters != - r2.request.content = b"paramx=x¶m1=2" - assert s._hash(r) == s._hash(r2) - # missing parameter - r2.request.content = b"paramx=x" - assert s._hash(r) == s._hash(r2) - # ignorable parameter added - r2.request.content = b"paramx=x¶m1=2" - assert s._hash(r) == s._hash(r2) - # not ignorable parameter changed - r2.request.content = b"paramx=y¶m1=1" - assert not s._hash(r) == s._hash(r2) - # not ignorable parameter missing - r2.request.content = b"param1=1" - assert not s._hash(r) == s._hash(r2) - - def test_server_playback_full(self): - s = serverplayback.ServerPlayback() - o = options.Options(refresh_server_playback = True, keepserving=False) - m = mastertest.RecordingMaster(o, proxy.DummyServer()) - m.addons.add(s) - f = tflow.tflow() f.response = mitmproxy.test.tutils.tresp(content=f.request.content) s.load([f, f]) tf = tflow.tflow() assert not tf.response - m.request(tf) + s.request(tf) assert tf.response == f.response tf = tflow.tflow() tf.request.content = b"gibble" assert not tf.response - m.request(tf) + s.request(tf) assert not tf.response assert not s.stop @@ -265,14 +309,18 @@ class TestServerPlayback: assert not s.stop tf = tflow.tflow() - m.request(tflow.tflow()) + s.request(tflow.tflow()) assert s.stop - def test_server_playback_kill(self): - s = serverplayback.ServerPlayback() - o = options.Options(refresh_server_playback = True, replay_kill_extra=True) - m = mastertest.RecordingMaster(o, proxy.DummyServer()) - m.addons.add(s) + +def test_server_playback_kill(): + s = serverplayback.ServerPlayback() + with taddons.context() as tctx: + tctx.configure( + s, + refresh_server_playback = True, + replay_kill_extra=True + ) f = tflow.tflow() f.response = mitmproxy.test.tutils.tresp(content=f.request.content) @@ -280,5 +328,5 @@ class TestServerPlayback: f = tflow.tflow() f.request.host = "nonexistent" - m.request(f) + tctx.cycle(s, f) assert f.reply.value == exceptions.Kill diff --git a/test/mitmproxy/addons/test_stickyauth.py b/test/mitmproxy/addons/test_stickyauth.py index b02bf1f5..490e9aac 100644 --- a/test/mitmproxy/addons/test_stickyauth.py +++ b/test/mitmproxy/addons/test_stickyauth.py @@ -1,25 +1,27 @@ from mitmproxy.test import tflow +from mitmproxy.test import taddons +from mitmproxy.test import tutils -from .. import mastertest from mitmproxy.addons import stickyauth -from mitmproxy import master -from mitmproxy import options -from mitmproxy import proxy +from mitmproxy import exceptions -class TestStickyAuth(mastertest.MasterTest): - def test_simple(self): - o = options.Options(stickyauth = ".*") - m = master.Master(o, proxy.DummyServer()) - sa = stickyauth.StickyAuth() - m.addons.add(sa) +def test_configure(): + r = stickyauth.StickyAuth() + with taddons.context() as tctx: + tctx.configure(r, stickyauth="~s") + tutils.raises(exceptions.OptionsError, tctx.configure, r, stickyauth="~~") + +def test_simple(): + r = stickyauth.StickyAuth() + with taddons.context(): f = tflow.tflow(resp=True) f.request.headers["authorization"] = "foo" - m.request(f) + r.request(f) - assert "address" in sa.hosts + assert "address" in r.hosts f = tflow.tflow(resp=True) - m.request(f) + r.request(f) assert f.request.headers["authorization"] == "foo" |