aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
authorAldo Cortesi <aldo@nullcube.com>2016-07-14 13:51:00 +1200
committerAldo Cortesi <aldo@nullcube.com>2016-07-14 19:54:15 +1200
commita3a22fba337fc4ac750b8c18663233920a0d646b (patch)
treef274ec77e28b9dc0e260cfb26b63efa15f54f209 /test
parent126625584251d6a246ba46943cfa71d5a57fbdda (diff)
downloadmitmproxy-a3a22fba337fc4ac750b8c18663233920a0d646b.tar.gz
mitmproxy-a3a22fba337fc4ac750b8c18663233920a0d646b.tar.bz2
mitmproxy-a3a22fba337fc4ac750b8c18663233920a0d646b.zip
First-order integration of scripts addon
Diffstat (limited to 'test')
-rw-r--r--test/mitmproxy/builtins/test_script.py136
-rw-r--r--test/mitmproxy/data/addonscripts/duplicate_flow.py6
-rw-r--r--test/mitmproxy/data/addonscripts/error.py7
-rw-r--r--test/mitmproxy/data/addonscripts/recorder.py18
-rw-r--r--test/mitmproxy/data/addonscripts/stream_modify.py8
-rw-r--r--test/mitmproxy/data/addonscripts/tcp_stream_modify.py5
-rw-r--r--test/mitmproxy/test_dump.py4
-rw-r--r--test/mitmproxy/test_flow.py64
-rw-r--r--test/mitmproxy/test_script.py13
-rw-r--r--test/mitmproxy/test_server.py22
10 files changed, 197 insertions, 86 deletions
diff --git a/test/mitmproxy/builtins/test_script.py b/test/mitmproxy/builtins/test_script.py
new file mode 100644
index 00000000..d3366189
--- /dev/null
+++ b/test/mitmproxy/builtins/test_script.py
@@ -0,0 +1,136 @@
+import time
+
+from mitmproxy.builtins import script
+from mitmproxy import exceptions
+from mitmproxy.flow import master
+from mitmproxy.flow import state
+from mitmproxy import options
+
+from .. import tutils, mastertest
+
+
+class TestParseCommand:
+ def test_empty_command(self):
+ with tutils.raises(exceptions.AddonError):
+ script.parse_command("")
+
+ with tutils.raises(exceptions.AddonError):
+ script.parse_command(" ")
+
+ def test_no_script_file(self):
+ with tutils.raises("not found"):
+ script.parse_command("notfound")
+
+ with tutils.tmpdir() as dir:
+ with tutils.raises("not a file"):
+ script.parse_command(dir)
+
+ def test_parse_args(self):
+ with tutils.chdir(tutils.test_data.dirname):
+ assert script.parse_command("data/scripts/a.py") == ("data/scripts/a.py", [])
+ assert script.parse_command("data/scripts/a.py foo bar") == ("data/scripts/a.py", ["foo", "bar"])
+ assert script.parse_command("data/scripts/a.py 'foo bar'") == ("data/scripts/a.py", ["foo bar"])
+
+ @tutils.skip_not_windows
+ def test_parse_windows(self):
+ with tutils.chdir(tutils.test_data.dirname):
+ assert script.parse_command("data\\scripts\\a.py") == ("data\\scripts\\a.py", [])
+ assert script.parse_command("data\\scripts\\a.py 'foo \\ bar'") == ("data\\scripts\\a.py", 'foo \\ bar', [])
+
+
+def test_load_script():
+ ns = script.load_script(
+ tutils.test_data.path(
+ "data/addonscripts/recorder.py"
+ ), []
+ )
+ assert ns["configure"]
+
+
+class RecordingMaster(master.FlowMaster):
+ def __init__(self, *args, **kwargs):
+ master.FlowMaster.__init__(self, *args, **kwargs)
+ self.event_log = []
+
+ def add_event(self, e, level):
+ self.event_log.append((level, e))
+
+
+class TestScript(mastertest.MasterTest):
+ def test_simple(self):
+ s = state.State()
+ m = master.FlowMaster(options.Options(), None, s)
+ sc = script.Script(
+ tutils.test_data.path(
+ "data/addonscripts/recorder.py"
+ )
+ )
+ m.addons.add(sc)
+ assert sc.ns["call_log"] == [("configure", (options.Options(),), {})]
+
+ sc.ns["call_log"] = []
+ f = tutils.tflow(resp=True)
+ self.invoke(m, "request", f)
+
+ recf = sc.ns["call_log"][0]
+ assert recf[0] == "request"
+
+ def test_reload(self):
+ s = state.State()
+ m = RecordingMaster(options.Options(), None, s)
+ with tutils.tmpdir():
+ with open("foo.py", "w"):
+ pass
+ sc = script.Script("foo.py")
+ m.addons.add(sc)
+
+ for _ in range(100):
+ with open("foo.py", "a") as f:
+ f.write(".")
+ time.sleep(0.1)
+ if m.event_log:
+ return
+ raise AssertionError("Change event not detected.")
+
+ def test_exception(self):
+ s = state.State()
+ m = RecordingMaster(options.Options(), None, s)
+ sc = script.Script(
+ tutils.test_data.path("data/addonscripts/error.py")
+ )
+ m.addons.add(sc)
+ f = tutils.tflow(resp=True)
+ self.invoke(m, "request", f)
+ assert m.event_log[0][0] == "warn"
+
+ def test_duplicate_flow(self):
+ s = state.State()
+ fm = master.FlowMaster(None, None, s)
+ fm.addons.add(
+ script.Script(
+ tutils.test_data.path("data/addonscripts/duplicate_flow.py")
+ )
+ )
+ f = tutils.tflow()
+ fm.request(f)
+ assert fm.state.flow_count() == 2
+ assert not fm.state.view[0].request.is_replay
+ assert fm.state.view[1].request.is_replay
+
+
+class TestScriptLoader(mastertest.MasterTest):
+ def test_simple(self):
+ s = state.State()
+ o = options.Options(scripts=[])
+ m = master.FlowMaster(o, None, s)
+ sc = script.ScriptLoader()
+ m.addons.add(sc)
+ assert len(m.addons) == 1
+ o.update(
+ scripts = [
+ tutils.test_data.path("data/addonscripts/recorder.py")
+ ]
+ )
+ assert len(m.addons) == 2
+ o.update(scripts = [])
+ assert len(m.addons) == 1
diff --git a/test/mitmproxy/data/addonscripts/duplicate_flow.py b/test/mitmproxy/data/addonscripts/duplicate_flow.py
new file mode 100644
index 00000000..b466423c
--- /dev/null
+++ b/test/mitmproxy/data/addonscripts/duplicate_flow.py
@@ -0,0 +1,6 @@
+from mitmproxy import ctx
+
+
+def request(flow):
+ f = ctx.master.duplicate_flow(flow)
+ ctx.master.replay_request(f, block=True)
diff --git a/test/mitmproxy/data/addonscripts/error.py b/test/mitmproxy/data/addonscripts/error.py
new file mode 100644
index 00000000..8ece9fce
--- /dev/null
+++ b/test/mitmproxy/data/addonscripts/error.py
@@ -0,0 +1,7 @@
+
+def mkerr():
+ raise ValueError("Error!")
+
+
+def request(flow):
+ mkerr()
diff --git a/test/mitmproxy/data/addonscripts/recorder.py b/test/mitmproxy/data/addonscripts/recorder.py
new file mode 100644
index 00000000..728203e3
--- /dev/null
+++ b/test/mitmproxy/data/addonscripts/recorder.py
@@ -0,0 +1,18 @@
+from mitmproxy import controller
+from mitmproxy import ctx
+
+call_log = []
+
+# Keep a log of all possible event calls
+evts = list(controller.Events) + ["configure"]
+for i in evts:
+ def mkprox():
+ evt = i
+
+ def prox(*args, **kwargs):
+ lg = (evt, args, kwargs)
+ if evt != "log":
+ ctx.log.info(str(lg))
+ call_log.append(lg)
+ return prox
+ globals()[i] = mkprox()
diff --git a/test/mitmproxy/data/addonscripts/stream_modify.py b/test/mitmproxy/data/addonscripts/stream_modify.py
new file mode 100644
index 00000000..bc616342
--- /dev/null
+++ b/test/mitmproxy/data/addonscripts/stream_modify.py
@@ -0,0 +1,8 @@
+
+def modify(chunks):
+ for chunk in chunks:
+ yield chunk.replace(b"foo", b"bar")
+
+
+def responseheaders(flow):
+ flow.response.stream = modify
diff --git a/test/mitmproxy/data/addonscripts/tcp_stream_modify.py b/test/mitmproxy/data/addonscripts/tcp_stream_modify.py
new file mode 100644
index 00000000..af4ccf7e
--- /dev/null
+++ b/test/mitmproxy/data/addonscripts/tcp_stream_modify.py
@@ -0,0 +1,5 @@
+
+def tcp_message(flow):
+ message = flow.messages[-1]
+ if not message.from_client:
+ message.content = message.content.replace(b"foo", b"bar")
diff --git a/test/mitmproxy/test_dump.py b/test/mitmproxy/test_dump.py
index 9686be84..201386e3 100644
--- a/test/mitmproxy/test_dump.py
+++ b/test/mitmproxy/test_dump.py
@@ -245,12 +245,12 @@ class TestDumpMaster(mastertest.MasterTest):
assert "XRESPONSE" in ret
assert "XCLIENTDISCONNECT" in ret
tutils.raises(
- dump.DumpError,
+ exceptions.AddonError,
self.mkmaster,
None, scripts=["nonexistent"]
)
tutils.raises(
- dump.DumpError,
+ exceptions.AddonError,
self.mkmaster,
None, scripts=["starterr.py"]
)
diff --git a/test/mitmproxy/test_flow.py b/test/mitmproxy/test_flow.py
index 1a07f74d..c58a9703 100644
--- a/test/mitmproxy/test_flow.py
+++ b/test/mitmproxy/test_flow.py
@@ -5,7 +5,7 @@ import netlib.utils
from netlib.http import Headers
from mitmproxy import filt, controller, flow
from mitmproxy.contrib import tnetstring
-from mitmproxy.exceptions import FlowReadException, ScriptException
+from mitmproxy.exceptions import FlowReadException
from mitmproxy.models import Error
from mitmproxy.models import Flow
from mitmproxy.models import HTTPFlow
@@ -674,21 +674,6 @@ class TestSerialize:
class TestFlowMaster:
- def test_load_script(self):
- s = flow.State()
- fm = flow.FlowMaster(None, None, s)
-
- fm.load_script(tutils.test_data.path("data/scripts/a.py"))
- fm.load_script(tutils.test_data.path("data/scripts/a.py"))
- fm.unload_scripts()
- with tutils.raises(ScriptException):
- fm.load_script("nonexistent")
- try:
- fm.load_script(tutils.test_data.path("data/scripts/starterr.py"))
- except ScriptException as e:
- assert "ValueError" in str(e)
- assert len(fm.scripts) == 0
-
def test_getset_ignore(self):
p = mock.Mock()
p.config.check_ignore = HostMatcher()
@@ -708,51 +693,7 @@ class TestFlowMaster:
assert "intercepting" in fm.replay_request(f)
f.live = True
- assert "live" in fm.replay_request(f, run_scripthooks=True)
-
- def test_script_reqerr(self):
- s = flow.State()
- fm = flow.FlowMaster(None, None, s)
- fm.load_script(tutils.test_data.path("data/scripts/reqerr.py"))
- f = tutils.tflow()
- fm.clientconnect(f.client_conn)
- assert fm.request(f)
-
- def test_script(self):
- s = flow.State()
- fm = flow.FlowMaster(None, None, s)
- fm.load_script(tutils.test_data.path("data/scripts/all.py"))
- f = tutils.tflow(resp=True)
-
- f.client_conn.acked = False
- fm.clientconnect(f.client_conn)
- assert fm.scripts[0].ns["log"][-1] == "clientconnect"
- f.server_conn.acked = False
- fm.serverconnect(f.server_conn)
- assert fm.scripts[0].ns["log"][-1] == "serverconnect"
- f.reply.acked = False
- fm.request(f)
- assert fm.scripts[0].ns["log"][-1] == "request"
- f.reply.acked = False
- fm.response(f)
- assert fm.scripts[0].ns["log"][-1] == "response"
- # load second script
- fm.load_script(tutils.test_data.path("data/scripts/all.py"))
- assert len(fm.scripts) == 2
- f.server_conn.reply.acked = False
- fm.clientdisconnect(f.server_conn)
- assert fm.scripts[0].ns["log"][-1] == "clientdisconnect"
- assert fm.scripts[1].ns["log"][-1] == "clientdisconnect"
-
- # unload first script
- fm.unload_scripts()
- assert len(fm.scripts) == 0
- fm.load_script(tutils.test_data.path("data/scripts/all.py"))
-
- f.error = tutils.terr()
- f.reply.acked = False
- fm.error(f)
- assert fm.scripts[0].ns["log"][-1] == "error"
+ assert "live" in fm.replay_request(f)
def test_duplicate_flow(self):
s = flow.State()
@@ -789,7 +730,6 @@ class TestFlowMaster:
f.error.reply = controller.DummyReply()
fm.error(f)
- fm.load_script(tutils.test_data.path("data/scripts/a.py"))
fm.shutdown()
def test_client_playback(self):
diff --git a/test/mitmproxy/test_script.py b/test/mitmproxy/test_script.py
deleted file mode 100644
index 1e8220f1..00000000
--- a/test/mitmproxy/test_script.py
+++ /dev/null
@@ -1,13 +0,0 @@
-from mitmproxy import flow
-from . import tutils
-
-
-def test_duplicate_flow():
- s = flow.State()
- fm = flow.FlowMaster(None, None, s)
- fm.load_script(tutils.test_data.path("data/scripts/duplicate_flow.py"))
- f = tutils.tflow()
- fm.request(f)
- assert fm.state.flow_count() == 2
- assert not fm.state.view[0].request.is_replay
- assert fm.state.view[1].request.is_replay
diff --git a/test/mitmproxy/test_server.py b/test/mitmproxy/test_server.py
index 9dd8b79c..b1ca6910 100644
--- a/test/mitmproxy/test_server.py
+++ b/test/mitmproxy/test_server.py
@@ -13,6 +13,7 @@ from netlib.http import authentication, http1
from netlib.tutils import raises
from pathod import pathoc, pathod
+from mitmproxy.builtins import script
from mitmproxy import controller
from mitmproxy.proxy.config import HostMatcher
from mitmproxy.models import Error, HTTPResponse, HTTPFlow
@@ -287,10 +288,13 @@ class TestHTTP(tservers.HTTPProxyTest, CommonMixin, AppMixin):
self.master.set_stream_large_bodies(None)
def test_stream_modify(self):
- self.master.load_script(tutils.test_data.path("data/scripts/stream_modify.py"))
+ s = script.Script(
+ tutils.test_data.path("data/addonscripts/stream_modify.py")
+ )
+ self.master.addons.add(s)
d = self.pathod('200:b"foo"')
- assert d.content == b"bar"
- self.master.unload_scripts()
+ assert d.content == "bar"
+ self.master.addons.remove(s)
class TestHTTPAuth(tservers.HTTPProxyTest):
@@ -512,15 +516,15 @@ class TestTransparent(tservers.TransparentProxyTest, CommonMixin, TcpMixin):
ssl = False
def test_tcp_stream_modify(self):
- self.master.load_script(tutils.test_data.path("data/scripts/tcp_stream_modify.py"))
-
+ s = script.Script(
+ tutils.test_data.path("data/addonscripts/tcp_stream_modify.py")
+ )
+ self.master.addons.add(s)
self._tcpproxy_on()
d = self.pathod('200:b"foo"')
self._tcpproxy_off()
-
- assert d.content == b"bar"
-
- self.master.unload_scripts()
+ assert d.content == "bar"
+ self.master.addons.remove(s)
class TestTransparentSSL(tservers.TransparentProxyTest, CommonMixin, TcpMixin):