diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/scripts/a.py | 5 | ||||
-rw-r--r-- | test/scripts/a_helper.py | 4 | ||||
-rw-r--r-- | test/scripts/unloaderr.py | 2 | ||||
-rw-r--r-- | test/test_examples.py | 6 | ||||
-rw-r--r-- | test/test_filt.py | 17 | ||||
-rw-r--r-- | test/test_protocol_http.py | 4 | ||||
-rw-r--r-- | test/test_proxy.py | 4 | ||||
-rw-r--r-- | test/test_script.py | 224 | ||||
-rw-r--r-- | test/test_server.py | 45 |
9 files changed, 167 insertions, 144 deletions
diff --git a/test/scripts/a.py b/test/scripts/a.py index 210fea78..d4272ac8 100644 --- a/test/scripts/a.py +++ b/test/scripts/a.py @@ -1,7 +1,4 @@ -import argparse - -parser = argparse.ArgumentParser() -parser.add_argument('--var', type=int) +from a_helper import parser var = 0 diff --git a/test/scripts/a_helper.py b/test/scripts/a_helper.py new file mode 100644 index 00000000..2eeed0d4 --- /dev/null +++ b/test/scripts/a_helper.py @@ -0,0 +1,4 @@ +import argparse + +parser = argparse.ArgumentParser() +parser.add_argument('--var', type=int)
\ No newline at end of file diff --git a/test/scripts/unloaderr.py b/test/scripts/unloaderr.py new file mode 100644 index 00000000..f3743107 --- /dev/null +++ b/test/scripts/unloaderr.py @@ -0,0 +1,2 @@ +def done(ctx): + raise RuntimeError()
\ No newline at end of file diff --git a/test/test_examples.py b/test/test_examples.py index e9bccd2e..dce257cf 100644 --- a/test/test_examples.py +++ b/test/test_examples.py @@ -11,7 +11,9 @@ def test_load_scripts(): tmaster = tservers.TestMaster(config.ProxyConfig()) for f in scripts: - if "har_extractor" in f or "flowwriter" in f: + if "har_extractor" in f: + continue + if "flowwriter" in f: f += " -" if "iframe_injector" in f: f += " foo" # one argument required @@ -22,7 +24,7 @@ def test_load_scripts(): try: s = script.Script(f, tmaster) # Loads the script file. except Exception as v: - if not "ImportError" in str(v): + if "ImportError" not in str(v): raise else: s.unload() diff --git a/test/test_filt.py b/test/test_filt.py index 3ad17dfe..bcdf6e4c 100644 --- a/test/test_filt.py +++ b/test/test_filt.py @@ -241,6 +241,23 @@ class TestMatching: assert self.q("~c 200", s) assert not self.q("~c 201", s) + def test_src(self): + q = self.req() + assert self.q("~src address", q) + assert not self.q("~src foobar", q) + assert self.q("~src :22", q) + assert not self.q("~src :99", q) + assert self.q("~src address:22", q) + + def test_dst(self): + q = self.req() + q.server_conn = tutils.tserver_conn() + assert self.q("~dst address", q) + assert not self.q("~dst foobar", q) + assert self.q("~dst :22", q) + assert not self.q("~dst :99", q) + assert self.q("~dst address:22", q) + def test_and(self): s = self.resp() assert self.q("~c 200 & ~h head", s) diff --git a/test/test_protocol_http.py b/test/test_protocol_http.py index d8489d4d..747fdc1e 100644 --- a/test/test_protocol_http.py +++ b/test/test_protocol_http.py @@ -327,11 +327,11 @@ class TestInvalidRequests(tservers.HTTPProxTest): p = self.pathoc() r = p.request("connect:'%s:%s'" % ("127.0.0.1", self.server2.port)) assert r.status_code == 400 - assert "Must not CONNECT on already encrypted connection" in r.content + assert "Must not CONNECT on already encrypted connection" in r.body def test_relative_request(self): p = self.pathoc_raw() p.connect() r = p.request("get:/p/200") assert r.status_code == 400 - assert "Invalid HTTP request form" in r.content + assert "Invalid HTTP request form" in r.body diff --git a/test/test_proxy.py b/test/test_proxy.py index 77051edd..01fbe953 100644 --- a/test/test_proxy.py +++ b/test/test_proxy.py @@ -31,7 +31,9 @@ class TestServerConnection: f.server_conn = sc f.request.path = "/p/200:da" sc.send(f.request.assemble()) - assert http.read_response(sc.rfile, f.request.method, 1000) + + protocol = http.http1.HTTP1Protocol(rfile=sc.rfile) + assert protocol.read_response(f.request.method, 1000) assert self.d.last_log() sc.finish() diff --git a/test/test_script.py b/test/test_script.py index 0a063740..1b0e5a5b 100644 --- a/test/test_script.py +++ b/test/test_script.py @@ -1,120 +1,124 @@ -from libmproxy import script, flow -import tutils -import shlex import os import time import mock +from libmproxy import script, flow +import tutils + + +def test_simple(): + s = flow.State() + fm = flow.FlowMaster(None, s) + sp = tutils.test_data.path("scripts/a.py") + p = script.Script("%s --var 40" % sp, fm) + + assert "here" in p.ns + assert p.run("here") == 41 + assert p.run("here") == 42 + + tutils.raises(script.ScriptError, p.run, "errargs") + + # Check reload + p.load() + assert p.run("here") == 41 + + +def test_duplicate_flow(): + s = flow.State() + fm = flow.FlowMaster(None, s) + fm.load_script(tutils.test_data.path("scripts/duplicate_flow.py")) + f = tutils.tflow() + fm.handle_request(f) + assert fm.state.flow_count() == 2 + assert not fm.state.view[0].request.is_replay + assert fm.state.view[1].request.is_replay + + +def test_err(): + s = flow.State() + fm = flow.FlowMaster(None, s) + + tutils.raises( + "not found", + script.Script, "nonexistent", fm + ) + + tutils.raises( + "not a file", + script.Script, tutils.test_data.path("scripts"), fm + ) + + tutils.raises( + script.ScriptError, + script.Script, tutils.test_data.path("scripts/syntaxerr.py"), fm + ) + + tutils.raises( + script.ScriptError, + script.Script, tutils.test_data.path("scripts/loaderr.py"), fm + ) + + scr = script.Script(tutils.test_data.path("scripts/unloaderr.py"), fm) + tutils.raises(script.ScriptError, scr.unload) -class TestScript: - def test_simple(self): - s = flow.State() - fm = flow.FlowMaster(None, s) - sp = tutils.test_data.path("scripts/a.py") - p = script.Script("%s --var 40" % sp, fm) - - assert "here" in p.ns - assert p.run("here") == (True, 41) - assert p.run("here") == (True, 42) - - ret = p.run("errargs") - assert not ret[0] - assert len(ret[1]) == 2 - - # Check reload - p.load() - assert p.run("here") == (True, 41) - - def test_duplicate_flow(self): - s = flow.State() - fm = flow.FlowMaster(None, s) - fm.load_script(tutils.test_data.path("scripts/duplicate_flow.py")) - f = tutils.tflow() - fm.handle_request(f) - assert fm.state.flow_count() == 2 - assert not fm.state.view[0].request.is_replay - assert fm.state.view[1].request.is_replay - - def test_err(self): - s = flow.State() - fm = flow.FlowMaster(None, s) - - tutils.raises( - "not found", - script.Script, "nonexistent", fm - ) - - tutils.raises( - "not a file", - script.Script, tutils.test_data.path("scripts"), fm - ) - - tutils.raises( - script.ScriptError, - script.Script, tutils.test_data.path("scripts/syntaxerr.py"), fm - ) - - tutils.raises( - script.ScriptError, - script.Script, tutils.test_data.path("scripts/loaderr.py"), fm - ) - - def test_concurrent(self): - s = flow.State() - fm = flow.FlowMaster(None, s) - fm.load_script(tutils.test_data.path("scripts/concurrent_decorator.py")) - - with mock.patch("libmproxy.controller.DummyReply.__call__") as m: - f1, f2 = tutils.tflow(), tutils.tflow() - t_start = time.time() - fm.handle_request(f1) - f1.reply() - fm.handle_request(f2) - f2.reply() - - # Two instantiations - assert m.call_count == 0 # No calls yet. - assert (time.time() - t_start) < 0.09 - - def test_concurrent2(self): - s = flow.State() - fm = flow.FlowMaster(None, s) - s = script.Script( - tutils.test_data.path("scripts/concurrent_decorator.py"), - fm) - s.load() - m = mock.Mock() - - class Dummy: - def __init__(self): - self.response = self - self.error = self - self.reply = m +def test_concurrent(): + s = flow.State() + fm = flow.FlowMaster(None, s) + fm.load_script(tutils.test_data.path("scripts/concurrent_decorator.py")) + with mock.patch("libmproxy.controller.DummyReply.__call__") as m: + f1, f2 = tutils.tflow(), tutils.tflow() t_start = time.time() + fm.handle_request(f1) + f1.reply() + fm.handle_request(f2) + f2.reply() + + # Two instantiations + assert m.call_count == 0 # No calls yet. + assert (time.time() - t_start) < 0.09 + - for hook in ("clientconnect", - "serverconnect", - "response", - "error", - "clientconnect"): - d = Dummy() - assert s.run(hook, d)[0] - d.reply() - while (time.time() - t_start) < 20 and m.call_count <= 5: - if m.call_count == 5: - return - time.sleep(0.001) - assert False - - def test_concurrent_err(self): - s = flow.State() - fm = flow.FlowMaster(None, s) - tutils.raises( - "decorator not supported for this method", - script.Script, - tutils.test_data.path("scripts/concurrent_decorator_err.py"), - fm) +def test_concurrent2(): + s = flow.State() + fm = flow.FlowMaster(None, s) + s = script.Script( + tutils.test_data.path("scripts/concurrent_decorator.py"), + fm) + s.load() + m = mock.Mock() + + class Dummy: + def __init__(self): + self.response = self + self.error = self + self.reply = m + + t_start = time.time() + + for hook in ("clientconnect", + "serverconnect", + "response", + "error", + "clientconnect"): + d = Dummy() + s.run(hook, d) + d.reply() + while (time.time() - t_start) < 20 and m.call_count <= 5: + if m.call_count == 5: + return + time.sleep(0.001) + assert False + + +def test_concurrent_err(): + s = flow.State() + fm = flow.FlowMaster(None, s) + tutils.raises( + "Concurrent decorator not supported for 'start' method", + script.Script, + tutils.test_data.path("scripts/concurrent_decorator_err.py"), + fm) def test_command_parsing(): @@ -122,4 +126,4 @@ def test_command_parsing(): fm = flow.FlowMaster(None, s) absfilepath = os.path.normcase(tutils.test_data.path("scripts/a.py")) s = script.Script(absfilepath, fm) - assert os.path.isfile(s.argv[0]) + assert os.path.isfile(s.args[0]) diff --git a/test/test_server.py b/test/test_server.py index 9df4ef82..066e628a 100644 --- a/test/test_server.py +++ b/test/test_server.py @@ -1,15 +1,17 @@ import socket import time -from libmproxy.proxy.config import HostMatcher -import libpathod -from netlib import tcp, http_auth, http, socks -from libpathod import pathoc, pathod +from OpenSSL import SSL + +from netlib import tcp, http, socks from netlib.certutils import SSLCert -import tutils -import tservers +from netlib.http import authentication +from libpathod import pathoc, pathod + +from libmproxy.proxy.config import HostMatcher from libmproxy.protocol import KILL, Error from libmproxy.protocol.http import CONTENT_MISSING -from OpenSSL import SSL +import tutils +import tservers """ Note that the choice of response code in these tests matters more than you @@ -295,8 +297,8 @@ class TestHTTP(tservers.HTTPProxTest, CommonMixin, AppMixin): class TestHTTPAuth(tservers.HTTPProxTest): - authenticator = http_auth.BasicProxyAuth( - http_auth.PassManSingleUser( + authenticator = http.authentication.BasicProxyAuth( + http.authentication.PassManSingleUser( "test", "test"), "realm") @@ -310,8 +312,8 @@ class TestHTTPAuth(tservers.HTTPProxTest): h'%s'='%s' """ % ( self.server.port, - http_auth.BasicProxyAuth.AUTH_HEADER, - http.assemble_http_basic_auth("basic", "test", "test") + http.authentication.BasicProxyAuth.AUTH_HEADER, + authentication.assemble_http_basic_auth("basic", "test", "test") )) assert ret.status_code == 202 @@ -526,7 +528,7 @@ class TestHttps2Http(tservers.ReverseProxTest): """ Returns a connected Pathoc instance. """ - p = libpathod.pathoc.Pathoc( + p = pathoc.Pathoc( ("localhost", self.proxy.port), ssl=ssl, sni=sni, fp=None ) p.connect() @@ -765,22 +767,15 @@ class TestStreamRequest(tservers.HTTPProxTest): (self.server.urlbase, spec)) connection.send("\r\n") - httpversion, code, msg, headers, content = http.read_response( - fconn, "GET", None, include_body=False) + protocol = http.http1.HTTP1Protocol(rfile=fconn) + resp = protocol.read_response("GET", None, include_body=False) - assert headers["Transfer-Encoding"][0] == 'chunked' - assert code == 200 + assert resp.headers["Transfer-Encoding"][0] == 'chunked' + assert resp.status_code == 200 chunks = list( - content for _, - content, - _ in http.read_http_body_chunked( - fconn, - headers, - None, - "GET", - 200, - False)) + content for _, content, _ in protocol.read_http_body_chunked( + resp.headers, None, "GET", 200, False)) assert chunks == ["this", "isatest", ""] connection.close() |