aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--examples/custom_contentviews.py2
-rw-r--r--examples/filt.py2
-rw-r--r--examples/flowwriter.py2
-rw-r--r--examples/har_extractor.py2
-rw-r--r--examples/iframe_injector.py2
-rw-r--r--examples/modify_response_body.py2
-rw-r--r--examples/proxapp.py2
-rw-r--r--examples/stub.py6
-rw-r--r--examples/tls_passthrough.py2
-rw-r--r--mitmproxy/controller.py1
-rw-r--r--netlib/utils.py7
-rw-r--r--test/mitmproxy/script/test_reloader.py34
-rw-r--r--test/mitmproxy/script/test_script.py83
-rw-r--r--test/mitmproxy/test_examples.py234
14 files changed, 120 insertions, 261 deletions
diff --git a/examples/custom_contentviews.py b/examples/custom_contentviews.py
index 5a63e2a0..b10d936f 100644
--- a/examples/custom_contentviews.py
+++ b/examples/custom_contentviews.py
@@ -62,7 +62,7 @@ class ViewPigLatin(contentviews.View):
pig_view = ViewPigLatin()
-def start():
+def configure(options):
contentviews.add(pig_view)
diff --git a/examples/filt.py b/examples/filt.py
index 21744edd..102d1274 100644
--- a/examples/filt.py
+++ b/examples/filt.py
@@ -6,7 +6,7 @@ from mitmproxy import filt
state = {}
-def start():
+def configure(options):
if len(sys.argv) != 2:
raise ValueError("Usage: -s 'filt.py FILTER'")
state["filter"] = filt.parse(sys.argv[1])
diff --git a/examples/flowwriter.py b/examples/flowwriter.py
index 07c7ca20..d8fbc1f4 100644
--- a/examples/flowwriter.py
+++ b/examples/flowwriter.py
@@ -6,7 +6,7 @@ from mitmproxy.flow import FlowWriter
state = {}
-def start():
+def configure(options):
if len(sys.argv) != 2:
raise ValueError('Usage: -s "flowriter.py filename"')
diff --git a/examples/har_extractor.py b/examples/har_extractor.py
index 2a69b9af..23deb43a 100644
--- a/examples/har_extractor.py
+++ b/examples/har_extractor.py
@@ -61,7 +61,7 @@ class Context(object):
context = Context()
-def start():
+def configure(options):
"""
On start we create a HARLog instance. You will have to adapt this to
suit your actual needs of HAR generation. As it will probably be
diff --git a/examples/iframe_injector.py b/examples/iframe_injector.py
index 70247d31..40934dd3 100644
--- a/examples/iframe_injector.py
+++ b/examples/iframe_injector.py
@@ -7,7 +7,7 @@ from mitmproxy.models import decoded
iframe_url = None
-def start():
+def configure(options):
if len(sys.argv) != 2:
raise ValueError('Usage: -s "iframe_injector.py url"')
global iframe_url
diff --git a/examples/modify_response_body.py b/examples/modify_response_body.py
index 23ad0151..8b6908a4 100644
--- a/examples/modify_response_body.py
+++ b/examples/modify_response_body.py
@@ -8,7 +8,7 @@ from mitmproxy.models import decoded
state = {}
-def start():
+def configure(options):
if len(sys.argv) != 3:
raise ValueError('Usage: -s "modify_response_body.py old new"')
# You may want to use Python's argparse for more sophisticated argument
diff --git a/examples/proxapp.py b/examples/proxapp.py
index 2935b587..095f412a 100644
--- a/examples/proxapp.py
+++ b/examples/proxapp.py
@@ -16,7 +16,7 @@ def hello_world():
# Register the app using the magic domain "proxapp" on port 80. Requests to
# this domain and port combination will now be routed to the WSGI app instance.
-def start():
+def configure(options):
mitmproxy.ctx.master.apps.add(app, "proxapp", 80)
# SSL works too, but the magic domain needs to be resolvable from the mitmproxy machine due to mitmproxy's design.
diff --git a/examples/stub.py b/examples/stub.py
index 10b34283..614acee2 100644
--- a/examples/stub.py
+++ b/examples/stub.py
@@ -4,11 +4,11 @@ import mitmproxy
"""
-def start():
+def configure(options):
"""
- Called once on script startup, before any other events.
+ Called once on script startup before any other events, and whenever options changes.
"""
- mitmproxy.ctx.log("start")
+ mitmproxy.ctx.log("configure")
def clientconnect(root_layer):
diff --git a/examples/tls_passthrough.py b/examples/tls_passthrough.py
index 20e8f9be..306f55f6 100644
--- a/examples/tls_passthrough.py
+++ b/examples/tls_passthrough.py
@@ -113,7 +113,7 @@ class TlsFeedback(TlsLayer):
tls_strategy = None
-def start():
+def configure(options):
global tls_strategy
if len(sys.argv) == 2:
tls_strategy = ProbabilisticStrategy(float(sys.argv[1]))
diff --git a/mitmproxy/controller.py b/mitmproxy/controller.py
index 464842b6..bffef58a 100644
--- a/mitmproxy/controller.py
+++ b/mitmproxy/controller.py
@@ -32,6 +32,7 @@ Events = frozenset([
"error",
"log",
+ "done",
"script_change",
])
diff --git a/netlib/utils.py b/netlib/utils.py
index 23c16dc3..9eebf22c 100644
--- a/netlib/utils.py
+++ b/netlib/utils.py
@@ -56,6 +56,13 @@ class Data(object):
dirname = os.path.dirname(inspect.getsourcefile(m))
self.dirname = os.path.abspath(dirname)
+ def push(self, subpath):
+ """
+ Change the data object to a path relative to the module.
+ """
+ self.dirname = os.path.join(self.dirname, subpath)
+ return self
+
def path(self, path):
"""
Returns a path to the package data housed at 'path' under this
diff --git a/test/mitmproxy/script/test_reloader.py b/test/mitmproxy/script/test_reloader.py
deleted file mode 100644
index e33903b9..00000000
--- a/test/mitmproxy/script/test_reloader.py
+++ /dev/null
@@ -1,34 +0,0 @@
-import mock
-from mitmproxy.script.reloader import watch, unwatch
-from test.mitmproxy import tutils
-from threading import Event
-
-
-def test_simple():
- with tutils.tmpdir():
- with open("foo.py", "w"):
- pass
-
- script = mock.Mock()
- script.path = "foo.py"
-
- e = Event()
-
- def _onchange():
- e.set()
-
- watch(script, _onchange)
- with tutils.raises("already observed"):
- watch(script, _onchange)
-
- # Some reloaders don't register a change directly after watching, because they first need to initialize.
- # To test if watching works at all, we do repeated writes every 100ms.
- for _ in range(100):
- with open("foo.py", "a") as f:
- f.write(".")
- if e.wait(0.1):
- break
- else:
- raise AssertionError("No change detected.")
-
- unwatch(script)
diff --git a/test/mitmproxy/script/test_script.py b/test/mitmproxy/script/test_script.py
deleted file mode 100644
index 48fe65c9..00000000
--- a/test/mitmproxy/script/test_script.py
+++ /dev/null
@@ -1,83 +0,0 @@
-from mitmproxy.script import Script
-from mitmproxy.exceptions import ScriptException
-from test.mitmproxy import tutils
-
-
-class TestParseCommand:
- def test_empty_command(self):
- with tutils.raises(ScriptException):
- Script.parse_command("")
-
- with tutils.raises(ScriptException):
- Script.parse_command(" ")
-
- def test_no_script_file(self):
- with tutils.raises("not found"):
- Script.parse_command("notfound")
-
- with tutils.tmpdir() as dir:
- with tutils.raises("not a file"):
- Script.parse_command(dir)
-
- def test_parse_args(self):
- with tutils.chdir(tutils.test_data.dirname):
- assert Script.parse_command("data/scripts/a.py") == ("data/scripts/a.py", [])
- assert Script.parse_command("data/scripts/a.py foo bar") == ("data/scripts/a.py", ["foo", "bar"])
- assert Script.parse_command("data/scripts/a.py 'foo bar'") == ("data/scripts/a.py", ["foo bar"])
-
- @tutils.skip_not_windows
- def test_parse_windows(self):
- with tutils.chdir(tutils.test_data.dirname):
- assert Script.parse_command("data\\scripts\\a.py") == ("data\\scripts\\a.py", [])
- assert Script.parse_command("data\\scripts\\a.py 'foo \\ bar'") == ("data\\scripts\\a.py", ['foo \\ bar'])
-
-
-def test_simple():
- with tutils.chdir(tutils.test_data.path("data/scripts")):
- s = Script("a.py --var 42")
- assert s.path == "a.py"
- assert s.ns is None
-
- s.load()
- assert s.ns["var"] == 42
-
- s.run("here")
- assert s.ns["var"] == 43
-
- s.unload()
- assert s.ns is None
-
- with tutils.raises(ScriptException):
- s.run("here")
-
- with Script("a.py --var 42") as s:
- s.run("here")
-
-
-def test_script_exception():
- with tutils.chdir(tutils.test_data.path("data/scripts")):
- s = Script("syntaxerr.py")
- with tutils.raises(ScriptException):
- s.load()
-
- s = Script("starterr.py")
- with tutils.raises(ScriptException):
- s.load()
-
- s = Script("a.py")
- s.load()
- with tutils.raises(ScriptException):
- s.load()
-
- s = Script("a.py")
- with tutils.raises(ScriptException):
- s.run("here")
-
- with tutils.raises(ScriptException):
- with Script("reqerr.py") as s:
- s.run("request", None)
-
- s = Script("unloaderr.py")
- s.load()
- with tutils.raises(ScriptException):
- s.unload()
diff --git a/test/mitmproxy/test_examples.py b/test/mitmproxy/test_examples.py
index bdadcd11..9c8edb29 100644
--- a/test/mitmproxy/test_examples.py
+++ b/test/mitmproxy/test_examples.py
@@ -1,151 +1,119 @@
-import glob
import json
-import mock
-import os
-import sys
-from contextlib import contextmanager
-from mitmproxy import script
+import os.path
+from mitmproxy.flow import master
+from mitmproxy.flow import state
+from mitmproxy import options
+from mitmproxy import contentviews
+from mitmproxy.builtins import script
import netlib.utils
from netlib import tutils as netutils
from netlib.http import Headers
-from . import tutils
-
-example_dir = netlib.utils.Data(__name__).path("../../examples")
-
-
-@contextmanager
-def example(command):
- command = os.path.join(example_dir, command)
- with script.Script(command) as s:
- yield s
-
-
-@mock.patch("mitmproxy.ctx.master")
-@mock.patch("mitmproxy.ctx.log")
-def test_load_scripts(log, master):
- scripts = glob.glob("%s/*.py" % example_dir)
-
- for f in scripts:
- if "har_extractor" in f:
- continue
- if "flowwriter" in f:
- f += " -"
- if "iframe_injector" in f:
- f += " foo" # one argument required
- if "filt" in f:
- f += " ~a"
- if "modify_response_body" in f:
- f += " foo bar" # two arguments required
-
- s = script.Script(f)
- try:
- s.load()
- except Exception as v:
- if "ImportError" not in str(v):
- raise
- else:
- s.unload()
-
-
-def test_add_header():
- flow = tutils.tflow(resp=netutils.tresp())
- with example("add_header.py") as ex:
- ex.run("response", flow)
- assert flow.response.headers["newheader"] == "foo"
-
-
-@mock.patch("mitmproxy.contentviews.remove")
-@mock.patch("mitmproxy.contentviews.add")
-def test_custom_contentviews(add, remove):
- with example("custom_contentviews.py"):
- assert add.called
- pig = add.call_args[0][0]
- _, fmt = pig(b"<html>test!</html>")
- assert any(b'esttay!' in val[0][1] for val in fmt)
- assert not pig(b"gobbledygook")
- assert remove.called
-
-
-def test_iframe_injector():
- with tutils.raises(script.ScriptException):
- with example("iframe_injector.py"):
- pass
-
- flow = tutils.tflow(resp=netutils.tresp(content=b"<html>mitmproxy</html>"))
- with example("iframe_injector.py http://example.org/evil_iframe") as ex:
- ex.run("response", flow)
- content = flow.response.content
- assert b'iframe' in content and b'evil_iframe' in content
-
-
-def test_modify_form():
- form_header = Headers(content_type="application/x-www-form-urlencoded")
- flow = tutils.tflow(req=netutils.treq(headers=form_header))
- with example("modify_form.py") as ex:
- ex.run("request", flow)
- assert flow.request.urlencoded_form[b"mitmproxy"] == b"rocks"
-
- flow.request.headers["content-type"] = ""
- ex.run("request", flow)
- assert list(flow.request.urlencoded_form.items()) == [(b"foo", b"bar")]
+from . import tutils, mastertest
+example_dir = netlib.utils.Data(__name__).push("../../examples")
-def test_modify_querystring():
- flow = tutils.tflow(req=netutils.treq(path=b"/search?q=term"))
- with example("modify_querystring.py") as ex:
- ex.run("request", flow)
- assert flow.request.query["mitmproxy"] == "rocks"
- flow.request.path = "/"
- ex.run("request", flow)
- assert flow.request.query["mitmproxy"] == "rocks"
+class ScriptError(Exception):
+ pass
-def test_modify_response_body():
- with tutils.raises(script.ScriptException):
- with example("modify_response_body.py"):
- assert True
+class RaiseMaster(master.FlowMaster):
+ def add_event(self, e, level):
+ if level in ("warn", "error"):
+ raise ScriptError(e)
- flow = tutils.tflow(resp=netutils.tresp(content=b"I <3 mitmproxy"))
- with example("modify_response_body.py mitmproxy rocks") as ex:
- assert ex.ns["state"]["old"] == b"mitmproxy" and ex.ns["state"]["new"] == b"rocks"
- ex.run("response", flow)
- assert flow.response.content == b"I <3 rocks"
+def tscript(cmd, args=""):
+ cmd = example_dir.path(cmd) + " " + args
+ m = RaiseMaster(options.Options(), None, state.State())
+ sc = script.Script(cmd)
+ m.addons.add(sc)
+ return m, sc
-def test_redirect_requests():
- flow = tutils.tflow(req=netutils.treq(host=b"example.org"))
- with example("redirect_requests.py") as ex:
- ex.run("request", flow)
- assert flow.request.host == "mitmproxy.org"
+class TestScripts(mastertest.MasterTest):
+ def test_add_header(self):
+ m, _ = tscript("add_header.py")
+ f = tutils.tflow(resp=netutils.tresp())
+ self.invoke(m, "response", f)
+ assert f.response.headers["newheader"] == "foo"
-@mock.patch("mitmproxy.ctx.log")
-def test_har_extractor(log):
- if sys.version_info >= (3, 0):
- with tutils.raises("does not work on Python 3"):
- with example("har_extractor.py -"):
- pass
- return
+ def test_custom_contentviews(self):
+ m, sc = tscript("custom_contentviews.py")
+ pig = contentviews.get("pig_latin_HTML")
+ _, fmt = pig("<html>test!</html>")
+ assert any('esttay!' in val[0][1] for val in fmt)
+ assert not pig("gobbledygook")
- with tutils.raises(script.ScriptException):
- with example("har_extractor.py"):
- pass
+ def test_iframe_injector(self):
+ with tutils.raises(ScriptError):
+ tscript("iframe_injector.py")
- times = dict(
- timestamp_start=746203272,
- timestamp_end=746203272,
- )
-
- flow = tutils.tflow(
- req=netutils.treq(**times),
- resp=netutils.tresp(**times)
- )
-
- with example("har_extractor.py -") as ex:
- ex.run("response", flow)
-
- with open(tutils.test_data.path("data/har_extractor.har")) as fp:
+ m, sc = tscript("iframe_injector.py", "http://example.org/evil_iframe")
+ flow = tutils.tflow(resp=netutils.tresp(content="<html>mitmproxy</html>"))
+ self.invoke(m, "response", flow)
+ content = flow.response.content
+ assert 'iframe' in content and 'evil_iframe' in content
+
+ def test_modify_form(self):
+ m, sc = tscript("modify_form.py")
+
+ form_header = Headers(content_type="application/x-www-form-urlencoded")
+ f = tutils.tflow(req=netutils.treq(headers=form_header))
+ self.invoke(m, "request", f)
+
+ assert f.request.urlencoded_form["mitmproxy"] == "rocks"
+
+ f.request.headers["content-type"] = ""
+ self.invoke(m, "request", f)
+ assert list(f.request.urlencoded_form.items()) == [("foo", "bar")]
+
+ def test_modify_querystring(self):
+ m, sc = tscript("modify_querystring.py")
+ f = tutils.tflow(req=netutils.treq(path="/search?q=term"))
+
+ self.invoke(m, "request", f)
+ assert f.request.query["mitmproxy"] == "rocks"
+
+ f.request.path = "/"
+ self.invoke(m, "request", f)
+ assert f.request.query["mitmproxy"] == "rocks"
+
+ def test_modify_response_body(self):
+ with tutils.raises(ScriptError):
+ tscript("modify_response_body.py")
+
+ m, sc = tscript("modify_response_body.py", "mitmproxy rocks")
+ f = tutils.tflow(resp=netutils.tresp(content="I <3 mitmproxy"))
+ self.invoke(m, "response", f)
+ assert f.response.content == "I <3 rocks"
+
+ def test_redirect_requests(self):
+ m, sc = tscript("redirect_requests.py")
+ f = tutils.tflow(req=netutils.treq(host="example.org"))
+ self.invoke(m, "request", f)
+ assert f.request.host == "mitmproxy.org"
+
+ def test_har_extractor(self):
+ with tutils.raises(ScriptError):
+ tscript("har_extractor.py")
+
+ with tutils.tmpdir() as tdir:
+ times = dict(
+ timestamp_start=746203272,
+ timestamp_end=746203272,
+ )
+
+ path = os.path.join(tdir, "file")
+ m, sc = tscript("har_extractor.py", path)
+ f = tutils.tflow(
+ req=netutils.treq(**times),
+ resp=netutils.tresp(**times)
+ )
+ self.invoke(m, "response", f)
+ m.addons.remove(sc)
+
+ fp = open(path, "rb")
test_data = json.load(fp)
- assert json.loads(ex.ns["context"].HARLog.json()) == test_data["test_response"]
+ assert len(test_data["log"]["pages"]) == 1