diff options
author | Aldo Cortesi <aldo@nullcube.com> | 2016-07-14 16:20:27 +1200 |
---|---|---|
committer | Aldo Cortesi <aldo@nullcube.com> | 2016-07-14 19:54:15 +1200 |
commit | b94f5fd361af6255ad4d3c7a88b9a21868736dea (patch) | |
tree | 5237bbd32c84d284a5c4b2478a9034f11d39a850 /test | |
parent | a6821aad8e9296640c3efd4275e8922dd7c6e43b (diff) | |
download | mitmproxy-b94f5fd361af6255ad4d3c7a88b9a21868736dea.tar.gz mitmproxy-b94f5fd361af6255ad4d3c7a88b9a21868736dea.tar.bz2 mitmproxy-b94f5fd361af6255ad4d3c7a88b9a21868736dea.zip |
Convert examples and example tests for new-style scripts
Remove the test that just loads all the example scripts for now - it's a very
low-value test, and we need to think of something better.
Diffstat (limited to 'test')
-rw-r--r-- | test/mitmproxy/script/test_reloader.py | 34 | ||||
-rw-r--r-- | test/mitmproxy/script/test_script.py | 83 | ||||
-rw-r--r-- | test/mitmproxy/test_examples.py | 234 |
3 files changed, 101 insertions, 250 deletions
diff --git a/test/mitmproxy/script/test_reloader.py b/test/mitmproxy/script/test_reloader.py deleted file mode 100644 index e33903b9..00000000 --- a/test/mitmproxy/script/test_reloader.py +++ /dev/null @@ -1,34 +0,0 @@ -import mock -from mitmproxy.script.reloader import watch, unwatch -from test.mitmproxy import tutils -from threading import Event - - -def test_simple(): - with tutils.tmpdir(): - with open("foo.py", "w"): - pass - - script = mock.Mock() - script.path = "foo.py" - - e = Event() - - def _onchange(): - e.set() - - watch(script, _onchange) - with tutils.raises("already observed"): - watch(script, _onchange) - - # Some reloaders don't register a change directly after watching, because they first need to initialize. - # To test if watching works at all, we do repeated writes every 100ms. - for _ in range(100): - with open("foo.py", "a") as f: - f.write(".") - if e.wait(0.1): - break - else: - raise AssertionError("No change detected.") - - unwatch(script) diff --git a/test/mitmproxy/script/test_script.py b/test/mitmproxy/script/test_script.py deleted file mode 100644 index 48fe65c9..00000000 --- a/test/mitmproxy/script/test_script.py +++ /dev/null @@ -1,83 +0,0 @@ -from mitmproxy.script import Script -from mitmproxy.exceptions import ScriptException -from test.mitmproxy import tutils - - -class TestParseCommand: - def test_empty_command(self): - with tutils.raises(ScriptException): - Script.parse_command("") - - with tutils.raises(ScriptException): - Script.parse_command(" ") - - def test_no_script_file(self): - with tutils.raises("not found"): - Script.parse_command("notfound") - - with tutils.tmpdir() as dir: - with tutils.raises("not a file"): - Script.parse_command(dir) - - def test_parse_args(self): - with tutils.chdir(tutils.test_data.dirname): - assert Script.parse_command("data/scripts/a.py") == ("data/scripts/a.py", []) - assert Script.parse_command("data/scripts/a.py foo bar") == ("data/scripts/a.py", ["foo", "bar"]) - assert Script.parse_command("data/scripts/a.py 'foo bar'") == ("data/scripts/a.py", ["foo bar"]) - - @tutils.skip_not_windows - def test_parse_windows(self): - with tutils.chdir(tutils.test_data.dirname): - assert Script.parse_command("data\\scripts\\a.py") == ("data\\scripts\\a.py", []) - assert Script.parse_command("data\\scripts\\a.py 'foo \\ bar'") == ("data\\scripts\\a.py", ['foo \\ bar']) - - -def test_simple(): - with tutils.chdir(tutils.test_data.path("data/scripts")): - s = Script("a.py --var 42") - assert s.path == "a.py" - assert s.ns is None - - s.load() - assert s.ns["var"] == 42 - - s.run("here") - assert s.ns["var"] == 43 - - s.unload() - assert s.ns is None - - with tutils.raises(ScriptException): - s.run("here") - - with Script("a.py --var 42") as s: - s.run("here") - - -def test_script_exception(): - with tutils.chdir(tutils.test_data.path("data/scripts")): - s = Script("syntaxerr.py") - with tutils.raises(ScriptException): - s.load() - - s = Script("starterr.py") - with tutils.raises(ScriptException): - s.load() - - s = Script("a.py") - s.load() - with tutils.raises(ScriptException): - s.load() - - s = Script("a.py") - with tutils.raises(ScriptException): - s.run("here") - - with tutils.raises(ScriptException): - with Script("reqerr.py") as s: - s.run("request", None) - - s = Script("unloaderr.py") - s.load() - with tutils.raises(ScriptException): - s.unload() diff --git a/test/mitmproxy/test_examples.py b/test/mitmproxy/test_examples.py index bdadcd11..9c8edb29 100644 --- a/test/mitmproxy/test_examples.py +++ b/test/mitmproxy/test_examples.py @@ -1,151 +1,119 @@ -import glob import json -import mock -import os -import sys -from contextlib import contextmanager -from mitmproxy import script +import os.path +from mitmproxy.flow import master +from mitmproxy.flow import state +from mitmproxy import options +from mitmproxy import contentviews +from mitmproxy.builtins import script import netlib.utils from netlib import tutils as netutils from netlib.http import Headers -from . import tutils - -example_dir = netlib.utils.Data(__name__).path("../../examples") - - -@contextmanager -def example(command): - command = os.path.join(example_dir, command) - with script.Script(command) as s: - yield s - - -@mock.patch("mitmproxy.ctx.master") -@mock.patch("mitmproxy.ctx.log") -def test_load_scripts(log, master): - scripts = glob.glob("%s/*.py" % example_dir) - - for f in scripts: - if "har_extractor" in f: - continue - if "flowwriter" in f: - f += " -" - if "iframe_injector" in f: - f += " foo" # one argument required - if "filt" in f: - f += " ~a" - if "modify_response_body" in f: - f += " foo bar" # two arguments required - - s = script.Script(f) - try: - s.load() - except Exception as v: - if "ImportError" not in str(v): - raise - else: - s.unload() - - -def test_add_header(): - flow = tutils.tflow(resp=netutils.tresp()) - with example("add_header.py") as ex: - ex.run("response", flow) - assert flow.response.headers["newheader"] == "foo" - - -@mock.patch("mitmproxy.contentviews.remove") -@mock.patch("mitmproxy.contentviews.add") -def test_custom_contentviews(add, remove): - with example("custom_contentviews.py"): - assert add.called - pig = add.call_args[0][0] - _, fmt = pig(b"<html>test!</html>") - assert any(b'esttay!' in val[0][1] for val in fmt) - assert not pig(b"gobbledygook") - assert remove.called - - -def test_iframe_injector(): - with tutils.raises(script.ScriptException): - with example("iframe_injector.py"): - pass - - flow = tutils.tflow(resp=netutils.tresp(content=b"<html>mitmproxy</html>")) - with example("iframe_injector.py http://example.org/evil_iframe") as ex: - ex.run("response", flow) - content = flow.response.content - assert b'iframe' in content and b'evil_iframe' in content - - -def test_modify_form(): - form_header = Headers(content_type="application/x-www-form-urlencoded") - flow = tutils.tflow(req=netutils.treq(headers=form_header)) - with example("modify_form.py") as ex: - ex.run("request", flow) - assert flow.request.urlencoded_form[b"mitmproxy"] == b"rocks" - - flow.request.headers["content-type"] = "" - ex.run("request", flow) - assert list(flow.request.urlencoded_form.items()) == [(b"foo", b"bar")] +from . import tutils, mastertest +example_dir = netlib.utils.Data(__name__).push("../../examples") -def test_modify_querystring(): - flow = tutils.tflow(req=netutils.treq(path=b"/search?q=term")) - with example("modify_querystring.py") as ex: - ex.run("request", flow) - assert flow.request.query["mitmproxy"] == "rocks" - flow.request.path = "/" - ex.run("request", flow) - assert flow.request.query["mitmproxy"] == "rocks" +class ScriptError(Exception): + pass -def test_modify_response_body(): - with tutils.raises(script.ScriptException): - with example("modify_response_body.py"): - assert True +class RaiseMaster(master.FlowMaster): + def add_event(self, e, level): + if level in ("warn", "error"): + raise ScriptError(e) - flow = tutils.tflow(resp=netutils.tresp(content=b"I <3 mitmproxy")) - with example("modify_response_body.py mitmproxy rocks") as ex: - assert ex.ns["state"]["old"] == b"mitmproxy" and ex.ns["state"]["new"] == b"rocks" - ex.run("response", flow) - assert flow.response.content == b"I <3 rocks" +def tscript(cmd, args=""): + cmd = example_dir.path(cmd) + " " + args + m = RaiseMaster(options.Options(), None, state.State()) + sc = script.Script(cmd) + m.addons.add(sc) + return m, sc -def test_redirect_requests(): - flow = tutils.tflow(req=netutils.treq(host=b"example.org")) - with example("redirect_requests.py") as ex: - ex.run("request", flow) - assert flow.request.host == "mitmproxy.org" +class TestScripts(mastertest.MasterTest): + def test_add_header(self): + m, _ = tscript("add_header.py") + f = tutils.tflow(resp=netutils.tresp()) + self.invoke(m, "response", f) + assert f.response.headers["newheader"] == "foo" -@mock.patch("mitmproxy.ctx.log") -def test_har_extractor(log): - if sys.version_info >= (3, 0): - with tutils.raises("does not work on Python 3"): - with example("har_extractor.py -"): - pass - return + def test_custom_contentviews(self): + m, sc = tscript("custom_contentviews.py") + pig = contentviews.get("pig_latin_HTML") + _, fmt = pig("<html>test!</html>") + assert any('esttay!' in val[0][1] for val in fmt) + assert not pig("gobbledygook") - with tutils.raises(script.ScriptException): - with example("har_extractor.py"): - pass + def test_iframe_injector(self): + with tutils.raises(ScriptError): + tscript("iframe_injector.py") - times = dict( - timestamp_start=746203272, - timestamp_end=746203272, - ) - - flow = tutils.tflow( - req=netutils.treq(**times), - resp=netutils.tresp(**times) - ) - - with example("har_extractor.py -") as ex: - ex.run("response", flow) - - with open(tutils.test_data.path("data/har_extractor.har")) as fp: + m, sc = tscript("iframe_injector.py", "http://example.org/evil_iframe") + flow = tutils.tflow(resp=netutils.tresp(content="<html>mitmproxy</html>")) + self.invoke(m, "response", flow) + content = flow.response.content + assert 'iframe' in content and 'evil_iframe' in content + + def test_modify_form(self): + m, sc = tscript("modify_form.py") + + form_header = Headers(content_type="application/x-www-form-urlencoded") + f = tutils.tflow(req=netutils.treq(headers=form_header)) + self.invoke(m, "request", f) + + assert f.request.urlencoded_form["mitmproxy"] == "rocks" + + f.request.headers["content-type"] = "" + self.invoke(m, "request", f) + assert list(f.request.urlencoded_form.items()) == [("foo", "bar")] + + def test_modify_querystring(self): + m, sc = tscript("modify_querystring.py") + f = tutils.tflow(req=netutils.treq(path="/search?q=term")) + + self.invoke(m, "request", f) + assert f.request.query["mitmproxy"] == "rocks" + + f.request.path = "/" + self.invoke(m, "request", f) + assert f.request.query["mitmproxy"] == "rocks" + + def test_modify_response_body(self): + with tutils.raises(ScriptError): + tscript("modify_response_body.py") + + m, sc = tscript("modify_response_body.py", "mitmproxy rocks") + f = tutils.tflow(resp=netutils.tresp(content="I <3 mitmproxy")) + self.invoke(m, "response", f) + assert f.response.content == "I <3 rocks" + + def test_redirect_requests(self): + m, sc = tscript("redirect_requests.py") + f = tutils.tflow(req=netutils.treq(host="example.org")) + self.invoke(m, "request", f) + assert f.request.host == "mitmproxy.org" + + def test_har_extractor(self): + with tutils.raises(ScriptError): + tscript("har_extractor.py") + + with tutils.tmpdir() as tdir: + times = dict( + timestamp_start=746203272, + timestamp_end=746203272, + ) + + path = os.path.join(tdir, "file") + m, sc = tscript("har_extractor.py", path) + f = tutils.tflow( + req=netutils.treq(**times), + resp=netutils.tresp(**times) + ) + self.invoke(m, "response", f) + m.addons.remove(sc) + + fp = open(path, "rb") test_data = json.load(fp) - assert json.loads(ex.ns["context"].HARLog.json()) == test_data["test_response"] + assert len(test_data["log"]["pages"]) == 1 |