diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/mitmproxy/test_examples.py | 119 | ||||
-rw-r--r-- | test/mitmproxy/test_flow_export.py | 1 | ||||
-rw-r--r-- | test/mitmproxy/test_har_extractor.py | 37 | ||||
-rw-r--r-- | test/mitmproxy/tutils.py | 1 | ||||
-rw-r--r-- | test/netlib/test_socks.py | 39 |
5 files changed, 157 insertions, 40 deletions
diff --git a/test/mitmproxy/test_examples.py b/test/mitmproxy/test_examples.py index 163ace17..803776ac 100644 --- a/test/mitmproxy/test_examples.py +++ b/test/mitmproxy/test_examples.py @@ -1,11 +1,42 @@ import glob +import json +import os +from contextlib import contextmanager + from mitmproxy import utils, script from mitmproxy.proxy import config -from . import tservers +from netlib import tutils as netutils +from netlib.http import Headers +from . import tservers, tutils + +example_dir = utils.Data(__name__).path("../../examples") + + +class DummyContext(object): + """Emulate script.ScriptContext() functionality.""" + + contentview = None + + def log(self, *args, **kwargs): + pass + + def add_contentview(self, view_obj): + self.contentview = view_obj + + def remove_contentview(self, view_obj): + self.contentview = None + + +@contextmanager +def example(command): + command = os.path.join(example_dir, command) + ctx = DummyContext() + s = script.Script(command, ctx) + yield s + s.unload() def test_load_scripts(): - example_dir = utils.Data(__name__).path("../../examples") scripts = glob.glob("%s/*.py" % example_dir) tmaster = tservers.TestMaster(config.ProxyConfig()) @@ -28,3 +59,87 @@ def test_load_scripts(): raise else: s.unload() + + +def test_add_header(): + flow = tutils.tflow(resp=netutils.tresp()) + with example("add_header.py") as ex: + ex.run("response", flow) + assert flow.response.headers["newheader"] == "foo" + + +def test_custom_contentviews(): + with example("custom_contentviews.py") as ex: + pig = ex.ctx.contentview + _, fmt = pig("<html>test!</html>") + assert any('esttay!' in val[0][1] for val in fmt) + assert not pig("gobbledygook") + + +def test_iframe_injector(): + with tutils.raises(script.ScriptException): + with example("iframe_injector.py") as ex: + pass + + flow = tutils.tflow(resp=netutils.tresp(content="<html>mitmproxy</html>")) + with example("iframe_injector.py http://example.org/evil_iframe") as ex: + ex.run("response", flow) + content = flow.response.content + assert 'iframe' in content and 'evil_iframe' in content + + +def test_modify_form(): + form_header = Headers(content_type="application/x-www-form-urlencoded") + flow = tutils.tflow(req=netutils.treq(headers=form_header)) + with example("modify_form.py") as ex: + ex.run("request", flow) + assert flow.request.urlencoded_form["mitmproxy"] == ["rocks"] + + +def test_modify_querystring(): + flow = tutils.tflow(req=netutils.treq(path="/search?q=term")) + with example("modify_querystring.py") as ex: + ex.run("request", flow) + assert flow.request.query["mitmproxy"] == ["rocks"] + + +def test_modify_response_body(): + with tutils.raises(script.ScriptException): + with example("modify_response_body.py") as ex: + pass + + flow = tutils.tflow(resp=netutils.tresp(content="I <3 mitmproxy")) + with example("modify_response_body.py mitmproxy rocks") as ex: + assert ex.ctx.old == "mitmproxy" and ex.ctx.new == "rocks" + ex.run("response", flow) + assert flow.response.content == "I <3 rocks" + + +def test_redirect_requests(): + flow = tutils.tflow(req=netutils.treq(host="example.org")) + with example("redirect_requests.py") as ex: + ex.run("request", flow) + assert flow.request.host == "mitmproxy.org" + + +def test_har_extractor(): + with tutils.raises(script.ScriptException): + with example("har_extractor.py") as ex: + pass + + times = dict( + timestamp_start=746203272, + timestamp_end=746203272, + ) + + flow = tutils.tflow( + req=netutils.treq(**times), + resp=netutils.tresp(**times) + ) + + with example("har_extractor.py -") as ex: + ex.run("response", flow) + + with open(tutils.test_data.path("data/har_extractor.har")) as fp: + test_data = json.load(fp) + assert json.loads(ex.ctx.HARLog.json()) == test_data["test_response"] diff --git a/test/mitmproxy/test_flow_export.py b/test/mitmproxy/test_flow_export.py index 3dc07427..62161d5d 100644 --- a/test/mitmproxy/test_flow_export.py +++ b/test/mitmproxy/test_flow_export.py @@ -1,4 +1,3 @@ -import json from textwrap import dedent import netlib.tutils diff --git a/test/mitmproxy/test_har_extractor.py b/test/mitmproxy/test_har_extractor.py deleted file mode 100644 index 7838f713..00000000 --- a/test/mitmproxy/test_har_extractor.py +++ /dev/null @@ -1,37 +0,0 @@ -import json -import netlib.tutils -from . import tutils - -from examples import har_extractor - - -class Context(object): - pass - - -trequest = netlib.tutils.treq( - timestamp_start=746203272, - timestamp_end=746203272, -) - -tresponse = netlib.tutils.tresp( - timestamp_start=746203272, - timestamp_end=746203272, -) - - -def test_start(): - tutils.raises(ValueError, har_extractor.start, Context(), []) - - -def test_response(): - ctx = Context() - ctx.HARLog = har_extractor._HARLog([]) - ctx.seen_server = set() - - fl = tutils.tflow(req=trequest, resp=tresponse) - har_extractor.response(ctx, fl) - - with open(tutils.test_data.path("data/har_extractor.har")) as fp: - test_data = json.load(fp) - assert json.loads(ctx.HARLog.json()) == test_data["test_response"] diff --git a/test/mitmproxy/tutils.py b/test/mitmproxy/tutils.py index edcdf3e2..0d65df71 100644 --- a/test/mitmproxy/tutils.py +++ b/test/mitmproxy/tutils.py @@ -93,6 +93,7 @@ def tserver_conn(): c = ServerConnection.from_state(dict( address=dict(address=("address", 22), use_ipv6=True), source_address=dict(address=("address", 22), use_ipv6=True), + peer_address=None, cert=None, timestamp_start=1, timestamp_tcp_setup=2, diff --git a/test/netlib/test_socks.py b/test/netlib/test_socks.py index d95dee41..486b975b 100644 --- a/test/netlib/test_socks.py +++ b/test/netlib/test_socks.py @@ -85,6 +85,45 @@ def test_server_greeting_assert_socks5(): assert False +def test_username_password_auth(): + raw = tutils.treader(b"\x01\x03usr\x03psd\xBE\xEF") + out = BytesIO() + auth = socks.UsernamePasswordAuth.from_file(raw) + auth.assert_authver1() + assert raw.read(2) == b"\xBE\xEF" + auth.to_file(out) + + assert out.getvalue() == raw.getvalue()[:-2] + assert auth.ver == socks.USERNAME_PASSWORD_VERSION.DEFAULT + assert auth.username == "usr" + assert auth.password == "psd" + + +def test_username_password_auth_assert_ver1(): + raw = tutils.treader(b"\x02\x03usr\x03psd\xBE\xEF") + auth = socks.UsernamePasswordAuth.from_file(raw) + tutils.raises(socks.SocksError, auth.assert_authver1) + + +def test_username_password_auth_response(): + raw = tutils.treader(b"\x01\x00\xBE\xEF") + out = BytesIO() + auth = socks.UsernamePasswordAuthResponse.from_file(raw) + auth.assert_authver1() + assert raw.read(2) == b"\xBE\xEF" + auth.to_file(out) + + assert out.getvalue() == raw.getvalue()[:-2] + assert auth.ver == socks.USERNAME_PASSWORD_VERSION.DEFAULT + assert auth.status == 0 + + +def test_username_password_auth_response_auth_assert_ver1(): + raw = tutils.treader(b"\x02\x00\xBE\xEF") + auth = socks.UsernamePasswordAuthResponse.from_file(raw) + tutils.raises(socks.SocksError, auth.assert_authver1) + + def test_message(): raw = tutils.treader(b"\x05\x01\x00\x03\x0bexample.com\xDE\xAD\xBE\xEF") out = BytesIO() |