diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/mitmproxy/builtins/test_script.py | 136 | ||||
-rw-r--r-- | test/mitmproxy/data/addonscripts/duplicate_flow.py | 6 | ||||
-rw-r--r-- | test/mitmproxy/data/addonscripts/error.py | 7 | ||||
-rw-r--r-- | test/mitmproxy/data/addonscripts/recorder.py | 18 | ||||
-rw-r--r-- | test/mitmproxy/data/addonscripts/stream_modify.py | 8 | ||||
-rw-r--r-- | test/mitmproxy/data/addonscripts/tcp_stream_modify.py | 5 | ||||
-rw-r--r-- | test/mitmproxy/test_dump.py | 4 | ||||
-rw-r--r-- | test/mitmproxy/test_flow.py | 64 | ||||
-rw-r--r-- | test/mitmproxy/test_script.py | 13 | ||||
-rw-r--r-- | test/mitmproxy/test_server.py | 22 |
10 files changed, 197 insertions, 86 deletions
diff --git a/test/mitmproxy/builtins/test_script.py b/test/mitmproxy/builtins/test_script.py new file mode 100644 index 00000000..d3366189 --- /dev/null +++ b/test/mitmproxy/builtins/test_script.py @@ -0,0 +1,136 @@ +import time + +from mitmproxy.builtins import script +from mitmproxy import exceptions +from mitmproxy.flow import master +from mitmproxy.flow import state +from mitmproxy import options + +from .. import tutils, mastertest + + +class TestParseCommand: + def test_empty_command(self): + with tutils.raises(exceptions.AddonError): + script.parse_command("") + + with tutils.raises(exceptions.AddonError): + script.parse_command(" ") + + def test_no_script_file(self): + with tutils.raises("not found"): + script.parse_command("notfound") + + with tutils.tmpdir() as dir: + with tutils.raises("not a file"): + script.parse_command(dir) + + def test_parse_args(self): + with tutils.chdir(tutils.test_data.dirname): + assert script.parse_command("data/scripts/a.py") == ("data/scripts/a.py", []) + assert script.parse_command("data/scripts/a.py foo bar") == ("data/scripts/a.py", ["foo", "bar"]) + assert script.parse_command("data/scripts/a.py 'foo bar'") == ("data/scripts/a.py", ["foo bar"]) + + @tutils.skip_not_windows + def test_parse_windows(self): + with tutils.chdir(tutils.test_data.dirname): + assert script.parse_command("data\\scripts\\a.py") == ("data\\scripts\\a.py", []) + assert script.parse_command("data\\scripts\\a.py 'foo \\ bar'") == ("data\\scripts\\a.py", 'foo \\ bar', []) + + +def test_load_script(): + ns = script.load_script( + tutils.test_data.path( + "data/addonscripts/recorder.py" + ), [] + ) + assert ns["configure"] + + +class RecordingMaster(master.FlowMaster): + def __init__(self, *args, **kwargs): + master.FlowMaster.__init__(self, *args, **kwargs) + self.event_log = [] + + def add_event(self, e, level): + self.event_log.append((level, e)) + + +class TestScript(mastertest.MasterTest): + def test_simple(self): + s = state.State() + m = master.FlowMaster(options.Options(), None, s) + sc = script.Script( + tutils.test_data.path( + "data/addonscripts/recorder.py" + ) + ) + m.addons.add(sc) + assert sc.ns["call_log"] == [("configure", (options.Options(),), {})] + + sc.ns["call_log"] = [] + f = tutils.tflow(resp=True) + self.invoke(m, "request", f) + + recf = sc.ns["call_log"][0] + assert recf[0] == "request" + + def test_reload(self): + s = state.State() + m = RecordingMaster(options.Options(), None, s) + with tutils.tmpdir(): + with open("foo.py", "w"): + pass + sc = script.Script("foo.py") + m.addons.add(sc) + + for _ in range(100): + with open("foo.py", "a") as f: + f.write(".") + time.sleep(0.1) + if m.event_log: + return + raise AssertionError("Change event not detected.") + + def test_exception(self): + s = state.State() + m = RecordingMaster(options.Options(), None, s) + sc = script.Script( + tutils.test_data.path("data/addonscripts/error.py") + ) + m.addons.add(sc) + f = tutils.tflow(resp=True) + self.invoke(m, "request", f) + assert m.event_log[0][0] == "warn" + + def test_duplicate_flow(self): + s = state.State() + fm = master.FlowMaster(None, None, s) + fm.addons.add( + script.Script( + tutils.test_data.path("data/addonscripts/duplicate_flow.py") + ) + ) + f = tutils.tflow() + fm.request(f) + assert fm.state.flow_count() == 2 + assert not fm.state.view[0].request.is_replay + assert fm.state.view[1].request.is_replay + + +class TestScriptLoader(mastertest.MasterTest): + def test_simple(self): + s = state.State() + o = options.Options(scripts=[]) + m = master.FlowMaster(o, None, s) + sc = script.ScriptLoader() + m.addons.add(sc) + assert len(m.addons) == 1 + o.update( + scripts = [ + tutils.test_data.path("data/addonscripts/recorder.py") + ] + ) + assert len(m.addons) == 2 + o.update(scripts = []) + assert len(m.addons) == 1 diff --git a/test/mitmproxy/data/addonscripts/duplicate_flow.py b/test/mitmproxy/data/addonscripts/duplicate_flow.py new file mode 100644 index 00000000..b466423c --- /dev/null +++ b/test/mitmproxy/data/addonscripts/duplicate_flow.py @@ -0,0 +1,6 @@ +from mitmproxy import ctx + + +def request(flow): + f = ctx.master.duplicate_flow(flow) + ctx.master.replay_request(f, block=True) diff --git a/test/mitmproxy/data/addonscripts/error.py b/test/mitmproxy/data/addonscripts/error.py new file mode 100644 index 00000000..8ece9fce --- /dev/null +++ b/test/mitmproxy/data/addonscripts/error.py @@ -0,0 +1,7 @@ + +def mkerr(): + raise ValueError("Error!") + + +def request(flow): + mkerr() diff --git a/test/mitmproxy/data/addonscripts/recorder.py b/test/mitmproxy/data/addonscripts/recorder.py new file mode 100644 index 00000000..728203e3 --- /dev/null +++ b/test/mitmproxy/data/addonscripts/recorder.py @@ -0,0 +1,18 @@ +from mitmproxy import controller +from mitmproxy import ctx + +call_log = [] + +# Keep a log of all possible event calls +evts = list(controller.Events) + ["configure"] +for i in evts: + def mkprox(): + evt = i + + def prox(*args, **kwargs): + lg = (evt, args, kwargs) + if evt != "log": + ctx.log.info(str(lg)) + call_log.append(lg) + return prox + globals()[i] = mkprox() diff --git a/test/mitmproxy/data/addonscripts/stream_modify.py b/test/mitmproxy/data/addonscripts/stream_modify.py new file mode 100644 index 00000000..bc616342 --- /dev/null +++ b/test/mitmproxy/data/addonscripts/stream_modify.py @@ -0,0 +1,8 @@ + +def modify(chunks): + for chunk in chunks: + yield chunk.replace(b"foo", b"bar") + + +def responseheaders(flow): + flow.response.stream = modify diff --git a/test/mitmproxy/data/addonscripts/tcp_stream_modify.py b/test/mitmproxy/data/addonscripts/tcp_stream_modify.py new file mode 100644 index 00000000..af4ccf7e --- /dev/null +++ b/test/mitmproxy/data/addonscripts/tcp_stream_modify.py @@ -0,0 +1,5 @@ + +def tcp_message(flow): + message = flow.messages[-1] + if not message.from_client: + message.content = message.content.replace(b"foo", b"bar") diff --git a/test/mitmproxy/test_dump.py b/test/mitmproxy/test_dump.py index 9686be84..201386e3 100644 --- a/test/mitmproxy/test_dump.py +++ b/test/mitmproxy/test_dump.py @@ -245,12 +245,12 @@ class TestDumpMaster(mastertest.MasterTest): assert "XRESPONSE" in ret assert "XCLIENTDISCONNECT" in ret tutils.raises( - dump.DumpError, + exceptions.AddonError, self.mkmaster, None, scripts=["nonexistent"] ) tutils.raises( - dump.DumpError, + exceptions.AddonError, self.mkmaster, None, scripts=["starterr.py"] ) diff --git a/test/mitmproxy/test_flow.py b/test/mitmproxy/test_flow.py index 1a07f74d..c58a9703 100644 --- a/test/mitmproxy/test_flow.py +++ b/test/mitmproxy/test_flow.py @@ -5,7 +5,7 @@ import netlib.utils from netlib.http import Headers from mitmproxy import filt, controller, flow from mitmproxy.contrib import tnetstring -from mitmproxy.exceptions import FlowReadException, ScriptException +from mitmproxy.exceptions import FlowReadException from mitmproxy.models import Error from mitmproxy.models import Flow from mitmproxy.models import HTTPFlow @@ -674,21 +674,6 @@ class TestSerialize: class TestFlowMaster: - def test_load_script(self): - s = flow.State() - fm = flow.FlowMaster(None, None, s) - - fm.load_script(tutils.test_data.path("data/scripts/a.py")) - fm.load_script(tutils.test_data.path("data/scripts/a.py")) - fm.unload_scripts() - with tutils.raises(ScriptException): - fm.load_script("nonexistent") - try: - fm.load_script(tutils.test_data.path("data/scripts/starterr.py")) - except ScriptException as e: - assert "ValueError" in str(e) - assert len(fm.scripts) == 0 - def test_getset_ignore(self): p = mock.Mock() p.config.check_ignore = HostMatcher() @@ -708,51 +693,7 @@ class TestFlowMaster: assert "intercepting" in fm.replay_request(f) f.live = True - assert "live" in fm.replay_request(f, run_scripthooks=True) - - def test_script_reqerr(self): - s = flow.State() - fm = flow.FlowMaster(None, None, s) - fm.load_script(tutils.test_data.path("data/scripts/reqerr.py")) - f = tutils.tflow() - fm.clientconnect(f.client_conn) - assert fm.request(f) - - def test_script(self): - s = flow.State() - fm = flow.FlowMaster(None, None, s) - fm.load_script(tutils.test_data.path("data/scripts/all.py")) - f = tutils.tflow(resp=True) - - f.client_conn.acked = False - fm.clientconnect(f.client_conn) - assert fm.scripts[0].ns["log"][-1] == "clientconnect" - f.server_conn.acked = False - fm.serverconnect(f.server_conn) - assert fm.scripts[0].ns["log"][-1] == "serverconnect" - f.reply.acked = False - fm.request(f) - assert fm.scripts[0].ns["log"][-1] == "request" - f.reply.acked = False - fm.response(f) - assert fm.scripts[0].ns["log"][-1] == "response" - # load second script - fm.load_script(tutils.test_data.path("data/scripts/all.py")) - assert len(fm.scripts) == 2 - f.server_conn.reply.acked = False - fm.clientdisconnect(f.server_conn) - assert fm.scripts[0].ns["log"][-1] == "clientdisconnect" - assert fm.scripts[1].ns["log"][-1] == "clientdisconnect" - - # unload first script - fm.unload_scripts() - assert len(fm.scripts) == 0 - fm.load_script(tutils.test_data.path("data/scripts/all.py")) - - f.error = tutils.terr() - f.reply.acked = False - fm.error(f) - assert fm.scripts[0].ns["log"][-1] == "error" + assert "live" in fm.replay_request(f) def test_duplicate_flow(self): s = flow.State() @@ -789,7 +730,6 @@ class TestFlowMaster: f.error.reply = controller.DummyReply() fm.error(f) - fm.load_script(tutils.test_data.path("data/scripts/a.py")) fm.shutdown() def test_client_playback(self): diff --git a/test/mitmproxy/test_script.py b/test/mitmproxy/test_script.py deleted file mode 100644 index 1e8220f1..00000000 --- a/test/mitmproxy/test_script.py +++ /dev/null @@ -1,13 +0,0 @@ -from mitmproxy import flow -from . import tutils - - -def test_duplicate_flow(): - s = flow.State() - fm = flow.FlowMaster(None, None, s) - fm.load_script(tutils.test_data.path("data/scripts/duplicate_flow.py")) - f = tutils.tflow() - fm.request(f) - assert fm.state.flow_count() == 2 - assert not fm.state.view[0].request.is_replay - assert fm.state.view[1].request.is_replay diff --git a/test/mitmproxy/test_server.py b/test/mitmproxy/test_server.py index 9dd8b79c..b1ca6910 100644 --- a/test/mitmproxy/test_server.py +++ b/test/mitmproxy/test_server.py @@ -13,6 +13,7 @@ from netlib.http import authentication, http1 from netlib.tutils import raises from pathod import pathoc, pathod +from mitmproxy.builtins import script from mitmproxy import controller from mitmproxy.proxy.config import HostMatcher from mitmproxy.models import Error, HTTPResponse, HTTPFlow @@ -287,10 +288,13 @@ class TestHTTP(tservers.HTTPProxyTest, CommonMixin, AppMixin): self.master.set_stream_large_bodies(None) def test_stream_modify(self): - self.master.load_script(tutils.test_data.path("data/scripts/stream_modify.py")) + s = script.Script( + tutils.test_data.path("data/addonscripts/stream_modify.py") + ) + self.master.addons.add(s) d = self.pathod('200:b"foo"') - assert d.content == b"bar" - self.master.unload_scripts() + assert d.content == "bar" + self.master.addons.remove(s) class TestHTTPAuth(tservers.HTTPProxyTest): @@ -512,15 +516,15 @@ class TestTransparent(tservers.TransparentProxyTest, CommonMixin, TcpMixin): ssl = False def test_tcp_stream_modify(self): - self.master.load_script(tutils.test_data.path("data/scripts/tcp_stream_modify.py")) - + s = script.Script( + tutils.test_data.path("data/addonscripts/tcp_stream_modify.py") + ) + self.master.addons.add(s) self._tcpproxy_on() d = self.pathod('200:b"foo"') self._tcpproxy_off() - - assert d.content == b"bar" - - self.master.unload_scripts() + assert d.content == "bar" + self.master.addons.remove(s) class TestTransparentSSL(tservers.TransparentProxyTest, CommonMixin, TcpMixin): |