From d9538637c3f235f10e38ff58879867e2aaadd529 Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Thu, 3 Nov 2016 18:57:34 +1300 Subject: addons.script: convert to test.taddons --- test/mitmproxy/addons/test_script.py | 198 +++++++++++++++++------------------ 1 file changed, 98 insertions(+), 100 deletions(-) (limited to 'test') diff --git a/test/mitmproxy/addons/test_script.py b/test/mitmproxy/addons/test_script.py index c72dac40..23e9f1f1 100644 --- a/test/mitmproxy/addons/test_script.py +++ b/test/mitmproxy/addons/test_script.py @@ -1,18 +1,17 @@ import traceback - import sys import time +import re from mitmproxy.test import tflow from mitmproxy.test import tutils -import re +from mitmproxy.test import taddons from mitmproxy import exceptions from mitmproxy import options from mitmproxy import proxy from mitmproxy.addons import script from mitmproxy import master -from .. import mastertest from .. import tutils as ttutils @@ -64,73 +63,68 @@ def test_load_script(): assert ns.start -class TestScript(mastertest.MasterTest): +class TestScript: def test_simple(self): - o = options.Options() - m = master.Master(o, proxy.DummyServer()) - sc = script.Script( - tutils.test_data.path( - "mitmproxy/data/addonscripts/recorder.py" + with taddons.context() as tctx: + sc = script.Script( + tutils.test_data.path( + "mitmproxy/data/addonscripts/recorder.py" + ) ) - ) - m.addons.add(sc) - assert sc.ns.call_log == [ - ("solo", "start", (), {}), - ("solo", "configure", (o, o.keys()), {}) - ] + sc.load_script() + assert sc.ns.call_log == [ + ("solo", "start", (), {}), + ] - sc.ns.call_log = [] - f = tflow.tflow(resp=True) - m.request(f) + sc.ns.call_log = [] + f = tflow.tflow(resp=True) + sc.request(f) - recf = sc.ns.call_log[0] - assert recf[1] == "request" + recf = sc.ns.call_log[0] + assert recf[1] == "request" def test_reload(self): - o = options.Options() - m = mastertest.RecordingMaster(o, proxy.DummyServer()) - with tutils.tmpdir(): - with open("foo.py", "w"): - pass - sc = script.Script("foo.py") - m.addons.add(sc) - - for _ in range(100): - with open("foo.py", "a") as f: - f.write(".") - m.addons.invoke_with_context(sc, "tick") - time.sleep(0.1) - if m.event_log: - return - raise AssertionError("Change event not detected.") + with taddons.context() as tctx: + with tutils.tmpdir(): + with open("foo.py", "w"): + pass + sc = script.Script("foo.py") + tctx.configure(sc) + for _ in range(100): + with open("foo.py", "a") as f: + f.write(".") + sc.tick() + time.sleep(0.1) + if tctx.master.event_log: + return + raise AssertionError("Change event not detected.") def test_exception(self): - o = options.Options() - m = mastertest.RecordingMaster(o, proxy.DummyServer()) - sc = script.Script( - tutils.test_data.path("mitmproxy/data/addonscripts/error.py") - ) - m.addons.add(sc) - f = tflow.tflow(resp=True) - m.request(f) - assert m.event_log[0][0] == "error" - assert len(m.event_log[0][1].splitlines()) == 6 - assert re.search('addonscripts/error.py", line \d+, in request', m.event_log[0][1]) - assert re.search('addonscripts/error.py", line \d+, in mkerr', m.event_log[0][1]) - assert m.event_log[0][1].endswith("ValueError: Error!\n") + with taddons.context() as tctx: + sc = script.Script( + tutils.test_data.path("mitmproxy/data/addonscripts/error.py") + ) + sc.start() + f = tflow.tflow(resp=True) + sc.request(f) + assert tctx.master.event_log[0][0] == "error" + assert len(tctx.master.event_log[0][1].splitlines()) == 6 + assert re.search('addonscripts/error.py", line \d+, in request', tctx.master.event_log[0][1]) + assert re.search('addonscripts/error.py", line \d+, in mkerr', tctx.master.event_log[0][1]) + assert tctx.master.event_log[0][1].endswith("ValueError: Error!\n") def test_addon(self): - o = options.Options() - m = master.Master(o, proxy.DummyServer()) - sc = script.Script( - tutils.test_data.path( - "mitmproxy/data/addonscripts/addon.py" + with taddons.context() as tctx: + sc = script.Script( + tutils.test_data.path( + "mitmproxy/data/addonscripts/addon.py" + ) ) - ) - m.addons.add(sc) - assert sc.ns.event_log == [ - 'scriptstart', 'addonstart', 'addonconfigure' - ] + sc.start() + tctx.configure(sc) + assert sc.ns.event_log == [ + 'scriptstart', 'addonstart', 'addonconfigure' + ] class TestCutTraceback: @@ -151,7 +145,7 @@ class TestCutTraceback: assert len(traceback.extract_tb(tb_cut2)) == len(traceback.extract_tb(tb)) -class TestScriptLoader(mastertest.MasterTest): +class TestScriptLoader: def test_run_once(self): o = options.Options(scripts=[]) m = master.Master(o, proxy.DummyServer()) @@ -199,44 +193,48 @@ class TestScriptLoader(mastertest.MasterTest): def test_order(self): rec = tutils.test_data.path("mitmproxy/data/addonscripts/recorder.py") - - o = options.Options( - scripts = [ - "%s %s" % (rec, "a"), - "%s %s" % (rec, "b"), - "%s %s" % (rec, "c"), - ] - ) - m = mastertest.RecordingMaster(o, proxy.DummyServer()) sc = script.ScriptLoader() - m.addons.add(sc) - - debug = [(i[0], i[1]) for i in m.event_log if i[0] == "debug"] - assert debug == [ - ('debug', 'a start'), ('debug', 'a configure'), - ('debug', 'b start'), ('debug', 'b configure'), - ('debug', 'c start'), ('debug', 'c configure') - ] - m.event_log[:] = [] - - o.scripts = [ - "%s %s" % (rec, "c"), - "%s %s" % (rec, "a"), - "%s %s" % (rec, "b"), - ] - debug = [(i[0], i[1]) for i in m.event_log if i[0] == "debug"] - # No events, only order has changed - assert debug == [] - - o.scripts = [ - "%s %s" % (rec, "x"), - "%s %s" % (rec, "a"), - ] - debug = [(i[0], i[1]) for i in m.event_log if i[0] == "debug"] - assert debug == [ - ('debug', 'c done'), - ('debug', 'b done'), - ('debug', 'x start'), - ('debug', 'x configure'), - ('debug', 'a configure'), - ] + with taddons.context() as tctx: + tctx.master.addons.add(sc) + tctx.configure( + sc, + scripts = [ + "%s %s" % (rec, "a"), + "%s %s" % (rec, "b"), + "%s %s" % (rec, "c"), + ] + ) + debug = [(i[0], i[1]) for i in tctx.master.event_log if i[0] == "debug"] + assert debug == [ + ('debug', 'a start'), ('debug', 'a configure'), + ('debug', 'b start'), ('debug', 'b configure'), + ('debug', 'c start'), ('debug', 'c configure') + ] + tctx.master.event_log = [] + tctx.configure( + sc, + scripts = [ + "%s %s" % (rec, "c"), + "%s %s" % (rec, "a"), + "%s %s" % (rec, "b"), + ] + ) + debug = [(i[0], i[1]) for i in tctx.master.event_log if i[0] == "debug"] + # No events, only order has changed + assert debug == [] + + tctx.master.event_log = [] + tctx.configure( + sc, + scripts = [ + "%s %s" % (rec, "x"), + "%s %s" % (rec, "a"), + ] + ) + debug = [(i[0], i[1]) for i in tctx.master.event_log if i[0] == "debug"] + assert debug == [ + ('debug', 'c done'), + ('debug', 'b done'), + ('debug', 'x start'), + ('debug', 'x configure'), + ] -- cgit v1.2.3 From e9a96f4d7f087bb9f01a48224a0b0090635c91f1 Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Thu, 3 Nov 2016 19:36:34 +1300 Subject: addons.script: 100% test coverage --- test/mitmproxy/addons/test_script.py | 51 ++++++++++++++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 2 deletions(-) (limited to 'test') diff --git a/test/mitmproxy/addons/test_script.py b/test/mitmproxy/addons/test_script.py index 23e9f1f1..c31f4e9b 100644 --- a/test/mitmproxy/addons/test_script.py +++ b/test/mitmproxy/addons/test_script.py @@ -9,12 +9,59 @@ from mitmproxy.test import taddons from mitmproxy import exceptions from mitmproxy import options from mitmproxy import proxy -from mitmproxy.addons import script from mitmproxy import master +from mitmproxy.addons import script + +import watchdog.events + from .. import tutils as ttutils +def test_ns(): + n = script.NS({}) + n.one = "one" + assert n.one == "one" + assert n.__dict__["ns"]["one"] == "one" + + +def test_scriptenv(): + with taddons.context() as tctx: + with script.scriptenv("path", []): + raise SystemExit + assert tctx.master.event_log[0][0] == "error" + assert "exited" in tctx.master.event_log[0][1] + + tctx.master.clear() + with script.scriptenv("path", []): + raise ValueError("fooo") + assert tctx.master.event_log[0][0] == "error" + assert "foo" in tctx.master.event_log[0][1] + + +class Called: + def __init__(self): + self.called = False + + def __call__(self, *args, **kwargs): + self.called = True + + +def test_reloadhandler(): + rh = script.ReloadHandler(Called()) + assert not rh.filter(watchdog.events.DirCreatedEvent("path")) + assert not rh.filter(watchdog.events.FileModifiedEvent("/foo/.bar")) + assert rh.filter(watchdog.events.FileModifiedEvent("/foo/bar")) + + assert not rh.callback.called + rh.on_modified(watchdog.events.FileModifiedEvent("/foo/bar")) + assert rh.callback.called + rh.callback.called = False + + rh.on_created(watchdog.events.FileCreatedEvent("foo")) + assert rh.callback.called + + class TestParseCommand: def test_empty_command(self): with tutils.raises(exceptions.AddonError): @@ -65,7 +112,7 @@ def test_load_script(): class TestScript: def test_simple(self): - with taddons.context() as tctx: + with taddons.context(): sc = script.Script( tutils.test_data.path( "mitmproxy/data/addonscripts/recorder.py" -- cgit v1.2.3 From e1fc80937d9d2a616b0786e1be5d51dbf9f7a596 Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Thu, 3 Nov 2016 19:43:33 +1300 Subject: addons.serverplayback: test suite to taddons --- test/mitmproxy/addons/test_serverplayback.py | 119 ++++++++++++++------------- 1 file changed, 61 insertions(+), 58 deletions(-) (limited to 'test') diff --git a/test/mitmproxy/addons/test_serverplayback.py b/test/mitmproxy/addons/test_serverplayback.py index f7124a91..d394168f 100644 --- a/test/mitmproxy/addons/test_serverplayback.py +++ b/test/mitmproxy/addons/test_serverplayback.py @@ -1,11 +1,9 @@ from mitmproxy.test import tflow - -from .. import mastertest +from mitmproxy.test import taddons import mitmproxy.test.tutils from mitmproxy.addons import serverplayback from mitmproxy import options -from mitmproxy import proxy from mitmproxy import exceptions @@ -87,27 +85,26 @@ class TestServerPlayback: def test_ignore_payload_params_other_content_type(self): s = serverplayback.ServerPlayback() - s.configure( - options.Options( + with taddons.context() as tctx: + tctx.configure( + s, server_replay_ignore_content=False, server_replay_ignore_payload_params=[ "param1", "param2" ] - ), - [] - - ) - r = tflow.tflow(resp=True) - r.request.headers["Content-Type"] = "application/json" - r.request.content = b'{"param1":"1"}' - r2 = tflow.tflow(resp=True) - r2.request.headers["Content-Type"] = "application/json" - r2.request.content = b'{"param1":"1"}' - # same content - assert s._hash(r) == s._hash(r2) - # distint content (note only x-www-form-urlencoded payload is analysed) - r2.request.content = b'{"param1":"2"}' - assert not s._hash(r) == s._hash(r2) + ) + + r = tflow.tflow(resp=True) + r.request.headers["Content-Type"] = "application/json" + r.request.content = b'{"param1":"1"}' + r2 = tflow.tflow(resp=True) + r2.request.headers["Content-Type"] = "application/json" + r2.request.content = b'{"param1":"1"}' + # same content + assert s._hash(r) == s._hash(r2) + # distint content (note only x-www-form-urlencoded payload is analysed) + r2.request.content = b'{"param1":"2"}' + assert not s._hash(r) == s._hash(r2) def test_hash(self): s = serverplayback.ServerPlayback() @@ -241,44 +238,50 @@ class TestServerPlayback: def test_server_playback_full(self): s = serverplayback.ServerPlayback() - o = options.Options(refresh_server_playback = True, keepserving=False) - m = mastertest.RecordingMaster(o, proxy.DummyServer()) - m.addons.add(s) - - f = tflow.tflow() - f.response = mitmproxy.test.tutils.tresp(content=f.request.content) - s.load([f, f]) - - tf = tflow.tflow() - assert not tf.response - m.request(tf) - assert tf.response == f.response - - tf = tflow.tflow() - tf.request.content = b"gibble" - assert not tf.response - m.request(tf) - assert not tf.response - - assert not s.stop - s.tick() - assert not s.stop - - tf = tflow.tflow() - m.request(tflow.tflow()) - assert s.stop + with taddons.context() as tctx: + tctx.configure( + s, + refresh_server_playback = True, + keepserving=False + ) + + f = tflow.tflow() + f.response = mitmproxy.test.tutils.tresp(content=f.request.content) + s.load([f, f]) + + tf = tflow.tflow() + assert not tf.response + s.request(tf) + assert tf.response == f.response + + tf = tflow.tflow() + tf.request.content = b"gibble" + assert not tf.response + s.request(tf) + assert not tf.response + + assert not s.stop + s.tick() + assert not s.stop + + tf = tflow.tflow() + s.request(tflow.tflow()) + assert s.stop def test_server_playback_kill(self): s = serverplayback.ServerPlayback() - o = options.Options(refresh_server_playback = True, replay_kill_extra=True) - m = mastertest.RecordingMaster(o, proxy.DummyServer()) - m.addons.add(s) - - f = tflow.tflow() - f.response = mitmproxy.test.tutils.tresp(content=f.request.content) - s.load([f]) - - f = tflow.tflow() - f.request.host = "nonexistent" - m.request(f) - assert f.reply.value == exceptions.Kill + with taddons.context() as tctx: + tctx.configure( + s, + refresh_server_playback = True, + replay_kill_extra=True + ) + + f = tflow.tflow() + f.response = mitmproxy.test.tutils.tresp(content=f.request.content) + s.load([f]) + + f = tflow.tflow() + f.request.host = "nonexistent" + tctx.cycle(s, f) + assert f.reply.value == exceptions.Kill -- cgit v1.2.3 From d7d6edb3d132a67ff59a3024f6685ef9728dca96 Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Thu, 3 Nov 2016 19:58:18 +1300 Subject: addons.serverplayback: 100% test coverage --- test/mitmproxy/addons/test_serverplayback.py | 561 +++++++++++++++------------ test/mitmproxy/addons/test_stickyauth.py | 28 +- 2 files changed, 318 insertions(+), 271 deletions(-) (limited to 'test') diff --git a/test/mitmproxy/addons/test_serverplayback.py b/test/mitmproxy/addons/test_serverplayback.py index d394168f..613a290c 100644 --- a/test/mitmproxy/addons/test_serverplayback.py +++ b/test/mitmproxy/addons/test_serverplayback.py @@ -1,3 +1,5 @@ +import os +from mitmproxy.test import tutils from mitmproxy.test import tflow from mitmproxy.test import taddons @@ -5,283 +7,326 @@ import mitmproxy.test.tutils from mitmproxy.addons import serverplayback from mitmproxy import options from mitmproxy import exceptions +from mitmproxy import io -class TestServerPlayback: - def test_server_playback(self): - sp = serverplayback.ServerPlayback() - sp.configure(options.Options(), []) - f = tflow.tflow(resp=True) +def tdump(path, flows): + w = io.FlowWriter(open(path, "wb")) + for i in flows: + w.add(i) - assert not sp.flowmap - sp.load([f]) - assert sp.flowmap - assert sp.next_flow(f) - assert not sp.flowmap - - def test_ignore_host(self): - sp = serverplayback.ServerPlayback() - sp.configure(options.Options(server_replay_ignore_host=True), []) - - r = tflow.tflow(resp=True) - r2 = tflow.tflow(resp=True) - - r.request.host = "address" - r2.request.host = "address" - assert sp._hash(r) == sp._hash(r2) - r2.request.host = "wrong_address" - assert sp._hash(r) == sp._hash(r2) - - def test_ignore_content(self): - s = serverplayback.ServerPlayback() - s.configure(options.Options(server_replay_ignore_content=False), []) - - r = tflow.tflow(resp=True) - r2 = tflow.tflow(resp=True) - - r.request.content = b"foo" - r2.request.content = b"foo" - assert s._hash(r) == s._hash(r2) - r2.request.content = b"bar" - assert not s._hash(r) == s._hash(r2) - - s.configure(options.Options(server_replay_ignore_content=True), []) - r = tflow.tflow(resp=True) - r2 = tflow.tflow(resp=True) - r.request.content = b"foo" - r2.request.content = b"foo" - assert s._hash(r) == s._hash(r2) - r2.request.content = b"bar" - assert s._hash(r) == s._hash(r2) - r2.request.content = b"" - assert s._hash(r) == s._hash(r2) - r2.request.content = None - assert s._hash(r) == s._hash(r2) - - def test_ignore_content_wins_over_params(self): - s = serverplayback.ServerPlayback() - s.configure( - options.Options( - server_replay_ignore_content=True, - server_replay_ignore_payload_params=[ - "param1", "param2" - ] - ), - [] +def test_config(): + s = serverplayback.ServerPlayback() + with tutils.tmpdir() as p: + with taddons.context() as tctx: + fpath = os.path.join(p, "flows") + tdump(fpath, [tflow.tflow(resp=True)]) + tctx.configure(s, server_replay = [fpath]) + tutils.raises(exceptions.OptionsError, tctx.configure, s, server_replay = [p]) + + +def test_tick(): + s = serverplayback.ServerPlayback() + with taddons.context() as tctx: + s.stop = True + s.final_flow = tflow.tflow() + s.final_flow.live = False + s.tick() + assert tctx.master.should_exit.is_set() + + +def test_server_playback(): + sp = serverplayback.ServerPlayback() + sp.configure(options.Options(), []) + f = tflow.tflow(resp=True) + + assert not sp.flowmap + + sp.load([f]) + assert sp.flowmap + assert sp.next_flow(f) + assert not sp.flowmap + + sp.load([f]) + assert sp.flowmap + sp.clear() + assert not sp.flowmap + + +def test_ignore_host(): + sp = serverplayback.ServerPlayback() + sp.configure(options.Options(server_replay_ignore_host=True), []) + + r = tflow.tflow(resp=True) + r2 = tflow.tflow(resp=True) + + r.request.host = "address" + r2.request.host = "address" + assert sp._hash(r) == sp._hash(r2) + r2.request.host = "wrong_address" + assert sp._hash(r) == sp._hash(r2) + + +def test_ignore_content(): + s = serverplayback.ServerPlayback() + s.configure(options.Options(server_replay_ignore_content=False), []) + + r = tflow.tflow(resp=True) + r2 = tflow.tflow(resp=True) + + r.request.content = b"foo" + r2.request.content = b"foo" + assert s._hash(r) == s._hash(r2) + r2.request.content = b"bar" + assert not s._hash(r) == s._hash(r2) + + s.configure(options.Options(server_replay_ignore_content=True), []) + r = tflow.tflow(resp=True) + r2 = tflow.tflow(resp=True) + r.request.content = b"foo" + r2.request.content = b"foo" + assert s._hash(r) == s._hash(r2) + r2.request.content = b"bar" + assert s._hash(r) == s._hash(r2) + r2.request.content = b"" + assert s._hash(r) == s._hash(r2) + r2.request.content = None + assert s._hash(r) == s._hash(r2) + + +def test_ignore_content_wins_over_params(): + s = serverplayback.ServerPlayback() + s.configure( + options.Options( + server_replay_ignore_content=True, + server_replay_ignore_payload_params=[ + "param1", "param2" + ] + ), + [] + ) + # NOTE: parameters are mutually exclusive in options + + r = tflow.tflow(resp=True) + r.request.headers["Content-Type"] = "application/x-www-form-urlencoded" + r.request.content = b"paramx=y" + + r2 = tflow.tflow(resp=True) + r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded" + r2.request.content = b"paramx=x" + + # same parameters + assert s._hash(r) == s._hash(r2) + + +def test_ignore_payload_params_other_content_type(): + s = serverplayback.ServerPlayback() + with taddons.context() as tctx: + tctx.configure( + s, + server_replay_ignore_content=False, + server_replay_ignore_payload_params=[ + "param1", "param2" + ] ) - # NOTE: parameters are mutually exclusive in options r = tflow.tflow(resp=True) - r.request.headers["Content-Type"] = "application/x-www-form-urlencoded" - r.request.content = b"paramx=y" - + r.request.headers["Content-Type"] = "application/json" + r.request.content = b'{"param1":"1"}' r2 = tflow.tflow(resp=True) - r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded" - r2.request.content = b"paramx=x" - - # same parameters - assert s._hash(r) == s._hash(r2) - - def test_ignore_payload_params_other_content_type(self): - s = serverplayback.ServerPlayback() - with taddons.context() as tctx: - tctx.configure( - s, - server_replay_ignore_content=False, - server_replay_ignore_payload_params=[ - "param1", "param2" - ] - ) - - r = tflow.tflow(resp=True) - r.request.headers["Content-Type"] = "application/json" - r.request.content = b'{"param1":"1"}' - r2 = tflow.tflow(resp=True) - r2.request.headers["Content-Type"] = "application/json" - r2.request.content = b'{"param1":"1"}' - # same content - assert s._hash(r) == s._hash(r2) - # distint content (note only x-www-form-urlencoded payload is analysed) - r2.request.content = b'{"param1":"2"}' - assert not s._hash(r) == s._hash(r2) - - def test_hash(self): - s = serverplayback.ServerPlayback() - s.configure(options.Options(), []) - - r = tflow.tflow() - r2 = tflow.tflow() - - assert s._hash(r) - assert s._hash(r) == s._hash(r2) - r.request.headers["foo"] = "bar" + r2.request.headers["Content-Type"] = "application/json" + r2.request.content = b'{"param1":"1"}' + # same content assert s._hash(r) == s._hash(r2) - r.request.path = "voing" - assert s._hash(r) != s._hash(r2) - - r.request.path = "path?blank_value" - r2.request.path = "path?" - assert s._hash(r) != s._hash(r2) - - def test_headers(self): - s = serverplayback.ServerPlayback() - s.configure(options.Options(server_replay_use_headers=["foo"]), []) - - r = tflow.tflow(resp=True) - r.request.headers["foo"] = "bar" - r2 = tflow.tflow(resp=True) + # distint content (note only x-www-form-urlencoded payload is analysed) + r2.request.content = b'{"param1":"2"}' assert not s._hash(r) == s._hash(r2) - r2.request.headers["foo"] = "bar" - assert s._hash(r) == s._hash(r2) - r2.request.headers["oink"] = "bar" - assert s._hash(r) == s._hash(r2) - - r = tflow.tflow(resp=True) - r2 = tflow.tflow(resp=True) - assert s._hash(r) == s._hash(r2) - def test_load(self): - s = serverplayback.ServerPlayback() - s.configure(options.Options(), []) - - r = tflow.tflow(resp=True) - r.request.headers["key"] = "one" - - r2 = tflow.tflow(resp=True) - r2.request.headers["key"] = "two" - s.load([r, r2]) +def test_hash(): + s = serverplayback.ServerPlayback() + s.configure(options.Options(), []) - assert s.count() == 2 + r = tflow.tflow() + r2 = tflow.tflow() - n = s.next_flow(r) - assert n.request.headers["key"] == "one" - assert s.count() == 1 + assert s._hash(r) + assert s._hash(r) == s._hash(r2) + r.request.headers["foo"] = "bar" + assert s._hash(r) == s._hash(r2) + r.request.path = "voing" + assert s._hash(r) != s._hash(r2) - n = s.next_flow(r) - assert n.request.headers["key"] == "two" - assert not s.flowmap - assert s.count() == 0 + r.request.path = "path?blank_value" + r2.request.path = "path?" + assert s._hash(r) != s._hash(r2) - assert not s.next_flow(r) - def test_load_with_server_replay_nopop(self): - s = serverplayback.ServerPlayback() - s.configure(options.Options(server_replay_nopop=True), []) - - r = tflow.tflow(resp=True) - r.request.headers["key"] = "one" - - r2 = tflow.tflow(resp=True) - r2.request.headers["key"] = "two" - - s.load([r, r2]) - - assert s.count() == 2 - s.next_flow(r) - assert s.count() == 2 - - def test_ignore_params(self): - s = serverplayback.ServerPlayback() - s.configure( - options.Options( - server_replay_ignore_params=["param1", "param2"] - ), - [] +def test_headers(): + s = serverplayback.ServerPlayback() + s.configure(options.Options(server_replay_use_headers=["foo"]), []) + + r = tflow.tflow(resp=True) + r.request.headers["foo"] = "bar" + r2 = tflow.tflow(resp=True) + assert not s._hash(r) == s._hash(r2) + r2.request.headers["foo"] = "bar" + assert s._hash(r) == s._hash(r2) + r2.request.headers["oink"] = "bar" + assert s._hash(r) == s._hash(r2) + + r = tflow.tflow(resp=True) + r2 = tflow.tflow(resp=True) + assert s._hash(r) == s._hash(r2) + + +def test_load(): + s = serverplayback.ServerPlayback() + s.configure(options.Options(), []) + + r = tflow.tflow(resp=True) + r.request.headers["key"] = "one" + + r2 = tflow.tflow(resp=True) + r2.request.headers["key"] = "two" + + s.load([r, r2]) + + assert s.count() == 2 + + n = s.next_flow(r) + assert n.request.headers["key"] == "one" + assert s.count() == 1 + + n = s.next_flow(r) + assert n.request.headers["key"] == "two" + assert not s.flowmap + assert s.count() == 0 + + assert not s.next_flow(r) + + +def test_load_with_server_replay_nopop(): + s = serverplayback.ServerPlayback() + s.configure(options.Options(server_replay_nopop=True), []) + + r = tflow.tflow(resp=True) + r.request.headers["key"] = "one" + + r2 = tflow.tflow(resp=True) + r2.request.headers["key"] = "two" + + s.load([r, r2]) + + assert s.count() == 2 + s.next_flow(r) + assert s.count() == 2 + + +def test_ignore_params(): + s = serverplayback.ServerPlayback() + s.configure( + options.Options( + server_replay_ignore_params=["param1", "param2"] + ), + [] + ) + + r = tflow.tflow(resp=True) + r.request.path = "/test?param1=1" + r2 = tflow.tflow(resp=True) + r2.request.path = "/test" + assert s._hash(r) == s._hash(r2) + r2.request.path = "/test?param1=2" + assert s._hash(r) == s._hash(r2) + r2.request.path = "/test?param2=1" + assert s._hash(r) == s._hash(r2) + r2.request.path = "/test?param3=2" + assert not s._hash(r) == s._hash(r2) + + +def test_ignore_payload_params(): + s = serverplayback.ServerPlayback() + s.configure( + options.Options( + server_replay_ignore_payload_params=["param1", "param2"] + ), + [] + ) + + r = tflow.tflow(resp=True) + r.request.headers["Content-Type"] = "application/x-www-form-urlencoded" + r.request.content = b"paramx=x¶m1=1" + r2 = tflow.tflow(resp=True) + r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded" + r2.request.content = b"paramx=x¶m1=1" + # same parameters + assert s._hash(r) == s._hash(r2) + # ignored parameters != + r2.request.content = b"paramx=x¶m1=2" + assert s._hash(r) == s._hash(r2) + # missing parameter + r2.request.content = b"paramx=x" + assert s._hash(r) == s._hash(r2) + # ignorable parameter added + r2.request.content = b"paramx=x¶m1=2" + assert s._hash(r) == s._hash(r2) + # not ignorable parameter changed + r2.request.content = b"paramx=y¶m1=1" + assert not s._hash(r) == s._hash(r2) + # not ignorable parameter missing + r2.request.content = b"param1=1" + assert not s._hash(r) == s._hash(r2) + + +def test_server_playback_full(): + s = serverplayback.ServerPlayback() + with taddons.context() as tctx: + tctx.configure( + s, + refresh_server_playback = True, + keepserving=False ) - r = tflow.tflow(resp=True) - r.request.path = "/test?param1=1" - r2 = tflow.tflow(resp=True) - r2.request.path = "/test" - assert s._hash(r) == s._hash(r2) - r2.request.path = "/test?param1=2" - assert s._hash(r) == s._hash(r2) - r2.request.path = "/test?param2=1" - assert s._hash(r) == s._hash(r2) - r2.request.path = "/test?param3=2" - assert not s._hash(r) == s._hash(r2) - - def test_ignore_payload_params(self): - s = serverplayback.ServerPlayback() - s.configure( - options.Options( - server_replay_ignore_payload_params=["param1", "param2"] - ), - [] + f = tflow.tflow() + f.response = mitmproxy.test.tutils.tresp(content=f.request.content) + s.load([f, f]) + + tf = tflow.tflow() + assert not tf.response + s.request(tf) + assert tf.response == f.response + + tf = tflow.tflow() + tf.request.content = b"gibble" + assert not tf.response + s.request(tf) + assert not tf.response + + assert not s.stop + s.tick() + assert not s.stop + + tf = tflow.tflow() + s.request(tflow.tflow()) + assert s.stop + + +def test_server_playback_kill(): + s = serverplayback.ServerPlayback() + with taddons.context() as tctx: + tctx.configure( + s, + refresh_server_playback = True, + replay_kill_extra=True ) - r = tflow.tflow(resp=True) - r.request.headers["Content-Type"] = "application/x-www-form-urlencoded" - r.request.content = b"paramx=x¶m1=1" - r2 = tflow.tflow(resp=True) - r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded" - r2.request.content = b"paramx=x¶m1=1" - # same parameters - assert s._hash(r) == s._hash(r2) - # ignored parameters != - r2.request.content = b"paramx=x¶m1=2" - assert s._hash(r) == s._hash(r2) - # missing parameter - r2.request.content = b"paramx=x" - assert s._hash(r) == s._hash(r2) - # ignorable parameter added - r2.request.content = b"paramx=x¶m1=2" - assert s._hash(r) == s._hash(r2) - # not ignorable parameter changed - r2.request.content = b"paramx=y¶m1=1" - assert not s._hash(r) == s._hash(r2) - # not ignorable parameter missing - r2.request.content = b"param1=1" - assert not s._hash(r) == s._hash(r2) + f = tflow.tflow() + f.response = mitmproxy.test.tutils.tresp(content=f.request.content) + s.load([f]) - def test_server_playback_full(self): - s = serverplayback.ServerPlayback() - with taddons.context() as tctx: - tctx.configure( - s, - refresh_server_playback = True, - keepserving=False - ) - - f = tflow.tflow() - f.response = mitmproxy.test.tutils.tresp(content=f.request.content) - s.load([f, f]) - - tf = tflow.tflow() - assert not tf.response - s.request(tf) - assert tf.response == f.response - - tf = tflow.tflow() - tf.request.content = b"gibble" - assert not tf.response - s.request(tf) - assert not tf.response - - assert not s.stop - s.tick() - assert not s.stop - - tf = tflow.tflow() - s.request(tflow.tflow()) - assert s.stop - - def test_server_playback_kill(self): - s = serverplayback.ServerPlayback() - with taddons.context() as tctx: - tctx.configure( - s, - refresh_server_playback = True, - replay_kill_extra=True - ) - - f = tflow.tflow() - f.response = mitmproxy.test.tutils.tresp(content=f.request.content) - s.load([f]) - - f = tflow.tflow() - f.request.host = "nonexistent" - tctx.cycle(s, f) - assert f.reply.value == exceptions.Kill + f = tflow.tflow() + f.request.host = "nonexistent" + tctx.cycle(s, f) + assert f.reply.value == exceptions.Kill diff --git a/test/mitmproxy/addons/test_stickyauth.py b/test/mitmproxy/addons/test_stickyauth.py index b02bf1f5..490e9aac 100644 --- a/test/mitmproxy/addons/test_stickyauth.py +++ b/test/mitmproxy/addons/test_stickyauth.py @@ -1,25 +1,27 @@ from mitmproxy.test import tflow +from mitmproxy.test import taddons +from mitmproxy.test import tutils -from .. import mastertest from mitmproxy.addons import stickyauth -from mitmproxy import master -from mitmproxy import options -from mitmproxy import proxy +from mitmproxy import exceptions -class TestStickyAuth(mastertest.MasterTest): - def test_simple(self): - o = options.Options(stickyauth = ".*") - m = master.Master(o, proxy.DummyServer()) - sa = stickyauth.StickyAuth() - m.addons.add(sa) +def test_configure(): + r = stickyauth.StickyAuth() + with taddons.context() as tctx: + tctx.configure(r, stickyauth="~s") + tutils.raises(exceptions.OptionsError, tctx.configure, r, stickyauth="~~") + +def test_simple(): + r = stickyauth.StickyAuth() + with taddons.context(): f = tflow.tflow(resp=True) f.request.headers["authorization"] = "foo" - m.request(f) + r.request(f) - assert "address" in sa.hosts + assert "address" in r.hosts f = tflow.tflow(resp=True) - m.request(f) + r.request(f) assert f.request.headers["authorization"] == "foo" -- cgit v1.2.3