aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/mitmproxy/test_examples.py119
-rw-r--r--test/mitmproxy/test_flow_export.py1
-rw-r--r--test/mitmproxy/test_har_extractor.py37
-rw-r--r--test/mitmproxy/tutils.py1
-rw-r--r--test/netlib/test_socks.py39
5 files changed, 157 insertions, 40 deletions
diff --git a/test/mitmproxy/test_examples.py b/test/mitmproxy/test_examples.py
index 163ace17..803776ac 100644
--- a/test/mitmproxy/test_examples.py
+++ b/test/mitmproxy/test_examples.py
@@ -1,11 +1,42 @@
import glob
+import json
+import os
+from contextlib import contextmanager
+
from mitmproxy import utils, script
from mitmproxy.proxy import config
-from . import tservers
+from netlib import tutils as netutils
+from netlib.http import Headers
+from . import tservers, tutils
+
+example_dir = utils.Data(__name__).path("../../examples")
+
+
+class DummyContext(object):
+ """Emulate script.ScriptContext() functionality."""
+
+ contentview = None
+
+ def log(self, *args, **kwargs):
+ pass
+
+ def add_contentview(self, view_obj):
+ self.contentview = view_obj
+
+ def remove_contentview(self, view_obj):
+ self.contentview = None
+
+
+@contextmanager
+def example(command):
+ command = os.path.join(example_dir, command)
+ ctx = DummyContext()
+ s = script.Script(command, ctx)
+ yield s
+ s.unload()
def test_load_scripts():
- example_dir = utils.Data(__name__).path("../../examples")
scripts = glob.glob("%s/*.py" % example_dir)
tmaster = tservers.TestMaster(config.ProxyConfig())
@@ -28,3 +59,87 @@ def test_load_scripts():
raise
else:
s.unload()
+
+
+def test_add_header():
+ flow = tutils.tflow(resp=netutils.tresp())
+ with example("add_header.py") as ex:
+ ex.run("response", flow)
+ assert flow.response.headers["newheader"] == "foo"
+
+
+def test_custom_contentviews():
+ with example("custom_contentviews.py") as ex:
+ pig = ex.ctx.contentview
+ _, fmt = pig("<html>test!</html>")
+ assert any('esttay!' in val[0][1] for val in fmt)
+ assert not pig("gobbledygook")
+
+
+def test_iframe_injector():
+ with tutils.raises(script.ScriptException):
+ with example("iframe_injector.py") as ex:
+ pass
+
+ flow = tutils.tflow(resp=netutils.tresp(content="<html>mitmproxy</html>"))
+ with example("iframe_injector.py http://example.org/evil_iframe") as ex:
+ ex.run("response", flow)
+ content = flow.response.content
+ assert 'iframe' in content and 'evil_iframe' in content
+
+
+def test_modify_form():
+ form_header = Headers(content_type="application/x-www-form-urlencoded")
+ flow = tutils.tflow(req=netutils.treq(headers=form_header))
+ with example("modify_form.py") as ex:
+ ex.run("request", flow)
+ assert flow.request.urlencoded_form["mitmproxy"] == ["rocks"]
+
+
+def test_modify_querystring():
+ flow = tutils.tflow(req=netutils.treq(path="/search?q=term"))
+ with example("modify_querystring.py") as ex:
+ ex.run("request", flow)
+ assert flow.request.query["mitmproxy"] == ["rocks"]
+
+
+def test_modify_response_body():
+ with tutils.raises(script.ScriptException):
+ with example("modify_response_body.py") as ex:
+ pass
+
+ flow = tutils.tflow(resp=netutils.tresp(content="I <3 mitmproxy"))
+ with example("modify_response_body.py mitmproxy rocks") as ex:
+ assert ex.ctx.old == "mitmproxy" and ex.ctx.new == "rocks"
+ ex.run("response", flow)
+ assert flow.response.content == "I <3 rocks"
+
+
+def test_redirect_requests():
+ flow = tutils.tflow(req=netutils.treq(host="example.org"))
+ with example("redirect_requests.py") as ex:
+ ex.run("request", flow)
+ assert flow.request.host == "mitmproxy.org"
+
+
+def test_har_extractor():
+ with tutils.raises(script.ScriptException):
+ with example("har_extractor.py") as ex:
+ pass
+
+ times = dict(
+ timestamp_start=746203272,
+ timestamp_end=746203272,
+ )
+
+ flow = tutils.tflow(
+ req=netutils.treq(**times),
+ resp=netutils.tresp(**times)
+ )
+
+ with example("har_extractor.py -") as ex:
+ ex.run("response", flow)
+
+ with open(tutils.test_data.path("data/har_extractor.har")) as fp:
+ test_data = json.load(fp)
+ assert json.loads(ex.ctx.HARLog.json()) == test_data["test_response"]
diff --git a/test/mitmproxy/test_flow_export.py b/test/mitmproxy/test_flow_export.py
index 3dc07427..62161d5d 100644
--- a/test/mitmproxy/test_flow_export.py
+++ b/test/mitmproxy/test_flow_export.py
@@ -1,4 +1,3 @@
-import json
from textwrap import dedent
import netlib.tutils
diff --git a/test/mitmproxy/test_har_extractor.py b/test/mitmproxy/test_har_extractor.py
deleted file mode 100644
index 7838f713..00000000
--- a/test/mitmproxy/test_har_extractor.py
+++ /dev/null
@@ -1,37 +0,0 @@
-import json
-import netlib.tutils
-from . import tutils
-
-from examples import har_extractor
-
-
-class Context(object):
- pass
-
-
-trequest = netlib.tutils.treq(
- timestamp_start=746203272,
- timestamp_end=746203272,
-)
-
-tresponse = netlib.tutils.tresp(
- timestamp_start=746203272,
- timestamp_end=746203272,
-)
-
-
-def test_start():
- tutils.raises(ValueError, har_extractor.start, Context(), [])
-
-
-def test_response():
- ctx = Context()
- ctx.HARLog = har_extractor._HARLog([])
- ctx.seen_server = set()
-
- fl = tutils.tflow(req=trequest, resp=tresponse)
- har_extractor.response(ctx, fl)
-
- with open(tutils.test_data.path("data/har_extractor.har")) as fp:
- test_data = json.load(fp)
- assert json.loads(ctx.HARLog.json()) == test_data["test_response"]
diff --git a/test/mitmproxy/tutils.py b/test/mitmproxy/tutils.py
index edcdf3e2..0d65df71 100644
--- a/test/mitmproxy/tutils.py
+++ b/test/mitmproxy/tutils.py
@@ -93,6 +93,7 @@ def tserver_conn():
c = ServerConnection.from_state(dict(
address=dict(address=("address", 22), use_ipv6=True),
source_address=dict(address=("address", 22), use_ipv6=True),
+ peer_address=None,
cert=None,
timestamp_start=1,
timestamp_tcp_setup=2,
diff --git a/test/netlib/test_socks.py b/test/netlib/test_socks.py
index d95dee41..486b975b 100644
--- a/test/netlib/test_socks.py
+++ b/test/netlib/test_socks.py
@@ -85,6 +85,45 @@ def test_server_greeting_assert_socks5():
assert False
+def test_username_password_auth():
+ raw = tutils.treader(b"\x01\x03usr\x03psd\xBE\xEF")
+ out = BytesIO()
+ auth = socks.UsernamePasswordAuth.from_file(raw)
+ auth.assert_authver1()
+ assert raw.read(2) == b"\xBE\xEF"
+ auth.to_file(out)
+
+ assert out.getvalue() == raw.getvalue()[:-2]
+ assert auth.ver == socks.USERNAME_PASSWORD_VERSION.DEFAULT
+ assert auth.username == "usr"
+ assert auth.password == "psd"
+
+
+def test_username_password_auth_assert_ver1():
+ raw = tutils.treader(b"\x02\x03usr\x03psd\xBE\xEF")
+ auth = socks.UsernamePasswordAuth.from_file(raw)
+ tutils.raises(socks.SocksError, auth.assert_authver1)
+
+
+def test_username_password_auth_response():
+ raw = tutils.treader(b"\x01\x00\xBE\xEF")
+ out = BytesIO()
+ auth = socks.UsernamePasswordAuthResponse.from_file(raw)
+ auth.assert_authver1()
+ assert raw.read(2) == b"\xBE\xEF"
+ auth.to_file(out)
+
+ assert out.getvalue() == raw.getvalue()[:-2]
+ assert auth.ver == socks.USERNAME_PASSWORD_VERSION.DEFAULT
+ assert auth.status == 0
+
+
+def test_username_password_auth_response_auth_assert_ver1():
+ raw = tutils.treader(b"\x02\x00\xBE\xEF")
+ auth = socks.UsernamePasswordAuthResponse.from_file(raw)
+ tutils.raises(socks.SocksError, auth.assert_authver1)
+
+
def test_message():
raw = tutils.treader(b"\x05\x01\x00\x03\x0bexample.com\xDE\xAD\xBE\xEF")
out = BytesIO()