aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/scripts/a.py5
-rw-r--r--test/scripts/a_helper.py4
-rw-r--r--test/scripts/unloaderr.py2
-rw-r--r--test/test_examples.py6
-rw-r--r--test/test_filt.py17
-rw-r--r--test/test_protocol_http.py4
-rw-r--r--test/test_proxy.py4
-rw-r--r--test/test_script.py224
-rw-r--r--test/test_server.py45
9 files changed, 167 insertions, 144 deletions
diff --git a/test/scripts/a.py b/test/scripts/a.py
index 210fea78..d4272ac8 100644
--- a/test/scripts/a.py
+++ b/test/scripts/a.py
@@ -1,7 +1,4 @@
-import argparse
-
-parser = argparse.ArgumentParser()
-parser.add_argument('--var', type=int)
+from a_helper import parser
var = 0
diff --git a/test/scripts/a_helper.py b/test/scripts/a_helper.py
new file mode 100644
index 00000000..2eeed0d4
--- /dev/null
+++ b/test/scripts/a_helper.py
@@ -0,0 +1,4 @@
+import argparse
+
+parser = argparse.ArgumentParser()
+parser.add_argument('--var', type=int) \ No newline at end of file
diff --git a/test/scripts/unloaderr.py b/test/scripts/unloaderr.py
new file mode 100644
index 00000000..f3743107
--- /dev/null
+++ b/test/scripts/unloaderr.py
@@ -0,0 +1,2 @@
+def done(ctx):
+ raise RuntimeError() \ No newline at end of file
diff --git a/test/test_examples.py b/test/test_examples.py
index e9bccd2e..dce257cf 100644
--- a/test/test_examples.py
+++ b/test/test_examples.py
@@ -11,7 +11,9 @@ def test_load_scripts():
tmaster = tservers.TestMaster(config.ProxyConfig())
for f in scripts:
- if "har_extractor" in f or "flowwriter" in f:
+ if "har_extractor" in f:
+ continue
+ if "flowwriter" in f:
f += " -"
if "iframe_injector" in f:
f += " foo" # one argument required
@@ -22,7 +24,7 @@ def test_load_scripts():
try:
s = script.Script(f, tmaster) # Loads the script file.
except Exception as v:
- if not "ImportError" in str(v):
+ if "ImportError" not in str(v):
raise
else:
s.unload()
diff --git a/test/test_filt.py b/test/test_filt.py
index 3ad17dfe..bcdf6e4c 100644
--- a/test/test_filt.py
+++ b/test/test_filt.py
@@ -241,6 +241,23 @@ class TestMatching:
assert self.q("~c 200", s)
assert not self.q("~c 201", s)
+ def test_src(self):
+ q = self.req()
+ assert self.q("~src address", q)
+ assert not self.q("~src foobar", q)
+ assert self.q("~src :22", q)
+ assert not self.q("~src :99", q)
+ assert self.q("~src address:22", q)
+
+ def test_dst(self):
+ q = self.req()
+ q.server_conn = tutils.tserver_conn()
+ assert self.q("~dst address", q)
+ assert not self.q("~dst foobar", q)
+ assert self.q("~dst :22", q)
+ assert not self.q("~dst :99", q)
+ assert self.q("~dst address:22", q)
+
def test_and(self):
s = self.resp()
assert self.q("~c 200 & ~h head", s)
diff --git a/test/test_protocol_http.py b/test/test_protocol_http.py
index d8489d4d..747fdc1e 100644
--- a/test/test_protocol_http.py
+++ b/test/test_protocol_http.py
@@ -327,11 +327,11 @@ class TestInvalidRequests(tservers.HTTPProxTest):
p = self.pathoc()
r = p.request("connect:'%s:%s'" % ("127.0.0.1", self.server2.port))
assert r.status_code == 400
- assert "Must not CONNECT on already encrypted connection" in r.content
+ assert "Must not CONNECT on already encrypted connection" in r.body
def test_relative_request(self):
p = self.pathoc_raw()
p.connect()
r = p.request("get:/p/200")
assert r.status_code == 400
- assert "Invalid HTTP request form" in r.content
+ assert "Invalid HTTP request form" in r.body
diff --git a/test/test_proxy.py b/test/test_proxy.py
index 77051edd..01fbe953 100644
--- a/test/test_proxy.py
+++ b/test/test_proxy.py
@@ -31,7 +31,9 @@ class TestServerConnection:
f.server_conn = sc
f.request.path = "/p/200:da"
sc.send(f.request.assemble())
- assert http.read_response(sc.rfile, f.request.method, 1000)
+
+ protocol = http.http1.HTTP1Protocol(rfile=sc.rfile)
+ assert protocol.read_response(f.request.method, 1000)
assert self.d.last_log()
sc.finish()
diff --git a/test/test_script.py b/test/test_script.py
index 0a063740..1b0e5a5b 100644
--- a/test/test_script.py
+++ b/test/test_script.py
@@ -1,120 +1,124 @@
-from libmproxy import script, flow
-import tutils
-import shlex
import os
import time
import mock
+from libmproxy import script, flow
+import tutils
+
+
+def test_simple():
+ s = flow.State()
+ fm = flow.FlowMaster(None, s)
+ sp = tutils.test_data.path("scripts/a.py")
+ p = script.Script("%s --var 40" % sp, fm)
+
+ assert "here" in p.ns
+ assert p.run("here") == 41
+ assert p.run("here") == 42
+
+ tutils.raises(script.ScriptError, p.run, "errargs")
+
+ # Check reload
+ p.load()
+ assert p.run("here") == 41
+
+
+def test_duplicate_flow():
+ s = flow.State()
+ fm = flow.FlowMaster(None, s)
+ fm.load_script(tutils.test_data.path("scripts/duplicate_flow.py"))
+ f = tutils.tflow()
+ fm.handle_request(f)
+ assert fm.state.flow_count() == 2
+ assert not fm.state.view[0].request.is_replay
+ assert fm.state.view[1].request.is_replay
+
+
+def test_err():
+ s = flow.State()
+ fm = flow.FlowMaster(None, s)
+
+ tutils.raises(
+ "not found",
+ script.Script, "nonexistent", fm
+ )
+
+ tutils.raises(
+ "not a file",
+ script.Script, tutils.test_data.path("scripts"), fm
+ )
+
+ tutils.raises(
+ script.ScriptError,
+ script.Script, tutils.test_data.path("scripts/syntaxerr.py"), fm
+ )
+
+ tutils.raises(
+ script.ScriptError,
+ script.Script, tutils.test_data.path("scripts/loaderr.py"), fm
+ )
+
+ scr = script.Script(tutils.test_data.path("scripts/unloaderr.py"), fm)
+ tutils.raises(script.ScriptError, scr.unload)
-class TestScript:
- def test_simple(self):
- s = flow.State()
- fm = flow.FlowMaster(None, s)
- sp = tutils.test_data.path("scripts/a.py")
- p = script.Script("%s --var 40" % sp, fm)
-
- assert "here" in p.ns
- assert p.run("here") == (True, 41)
- assert p.run("here") == (True, 42)
-
- ret = p.run("errargs")
- assert not ret[0]
- assert len(ret[1]) == 2
-
- # Check reload
- p.load()
- assert p.run("here") == (True, 41)
-
- def test_duplicate_flow(self):
- s = flow.State()
- fm = flow.FlowMaster(None, s)
- fm.load_script(tutils.test_data.path("scripts/duplicate_flow.py"))
- f = tutils.tflow()
- fm.handle_request(f)
- assert fm.state.flow_count() == 2
- assert not fm.state.view[0].request.is_replay
- assert fm.state.view[1].request.is_replay
-
- def test_err(self):
- s = flow.State()
- fm = flow.FlowMaster(None, s)
-
- tutils.raises(
- "not found",
- script.Script, "nonexistent", fm
- )
-
- tutils.raises(
- "not a file",
- script.Script, tutils.test_data.path("scripts"), fm
- )
-
- tutils.raises(
- script.ScriptError,
- script.Script, tutils.test_data.path("scripts/syntaxerr.py"), fm
- )
-
- tutils.raises(
- script.ScriptError,
- script.Script, tutils.test_data.path("scripts/loaderr.py"), fm
- )
-
- def test_concurrent(self):
- s = flow.State()
- fm = flow.FlowMaster(None, s)
- fm.load_script(tutils.test_data.path("scripts/concurrent_decorator.py"))
-
- with mock.patch("libmproxy.controller.DummyReply.__call__") as m:
- f1, f2 = tutils.tflow(), tutils.tflow()
- t_start = time.time()
- fm.handle_request(f1)
- f1.reply()
- fm.handle_request(f2)
- f2.reply()
-
- # Two instantiations
- assert m.call_count == 0 # No calls yet.
- assert (time.time() - t_start) < 0.09
-
- def test_concurrent2(self):
- s = flow.State()
- fm = flow.FlowMaster(None, s)
- s = script.Script(
- tutils.test_data.path("scripts/concurrent_decorator.py"),
- fm)
- s.load()
- m = mock.Mock()
-
- class Dummy:
- def __init__(self):
- self.response = self
- self.error = self
- self.reply = m
+def test_concurrent():
+ s = flow.State()
+ fm = flow.FlowMaster(None, s)
+ fm.load_script(tutils.test_data.path("scripts/concurrent_decorator.py"))
+ with mock.patch("libmproxy.controller.DummyReply.__call__") as m:
+ f1, f2 = tutils.tflow(), tutils.tflow()
t_start = time.time()
+ fm.handle_request(f1)
+ f1.reply()
+ fm.handle_request(f2)
+ f2.reply()
+
+ # Two instantiations
+ assert m.call_count == 0 # No calls yet.
+ assert (time.time() - t_start) < 0.09
+
- for hook in ("clientconnect",
- "serverconnect",
- "response",
- "error",
- "clientconnect"):
- d = Dummy()
- assert s.run(hook, d)[0]
- d.reply()
- while (time.time() - t_start) < 20 and m.call_count <= 5:
- if m.call_count == 5:
- return
- time.sleep(0.001)
- assert False
-
- def test_concurrent_err(self):
- s = flow.State()
- fm = flow.FlowMaster(None, s)
- tutils.raises(
- "decorator not supported for this method",
- script.Script,
- tutils.test_data.path("scripts/concurrent_decorator_err.py"),
- fm)
+def test_concurrent2():
+ s = flow.State()
+ fm = flow.FlowMaster(None, s)
+ s = script.Script(
+ tutils.test_data.path("scripts/concurrent_decorator.py"),
+ fm)
+ s.load()
+ m = mock.Mock()
+
+ class Dummy:
+ def __init__(self):
+ self.response = self
+ self.error = self
+ self.reply = m
+
+ t_start = time.time()
+
+ for hook in ("clientconnect",
+ "serverconnect",
+ "response",
+ "error",
+ "clientconnect"):
+ d = Dummy()
+ s.run(hook, d)
+ d.reply()
+ while (time.time() - t_start) < 20 and m.call_count <= 5:
+ if m.call_count == 5:
+ return
+ time.sleep(0.001)
+ assert False
+
+
+def test_concurrent_err():
+ s = flow.State()
+ fm = flow.FlowMaster(None, s)
+ tutils.raises(
+ "Concurrent decorator not supported for 'start' method",
+ script.Script,
+ tutils.test_data.path("scripts/concurrent_decorator_err.py"),
+ fm)
def test_command_parsing():
@@ -122,4 +126,4 @@ def test_command_parsing():
fm = flow.FlowMaster(None, s)
absfilepath = os.path.normcase(tutils.test_data.path("scripts/a.py"))
s = script.Script(absfilepath, fm)
- assert os.path.isfile(s.argv[0])
+ assert os.path.isfile(s.args[0])
diff --git a/test/test_server.py b/test/test_server.py
index 9df4ef82..066e628a 100644
--- a/test/test_server.py
+++ b/test/test_server.py
@@ -1,15 +1,17 @@
import socket
import time
-from libmproxy.proxy.config import HostMatcher
-import libpathod
-from netlib import tcp, http_auth, http, socks
-from libpathod import pathoc, pathod
+from OpenSSL import SSL
+
+from netlib import tcp, http, socks
from netlib.certutils import SSLCert
-import tutils
-import tservers
+from netlib.http import authentication
+from libpathod import pathoc, pathod
+
+from libmproxy.proxy.config import HostMatcher
from libmproxy.protocol import KILL, Error
from libmproxy.protocol.http import CONTENT_MISSING
-from OpenSSL import SSL
+import tutils
+import tservers
"""
Note that the choice of response code in these tests matters more than you
@@ -295,8 +297,8 @@ class TestHTTP(tservers.HTTPProxTest, CommonMixin, AppMixin):
class TestHTTPAuth(tservers.HTTPProxTest):
- authenticator = http_auth.BasicProxyAuth(
- http_auth.PassManSingleUser(
+ authenticator = http.authentication.BasicProxyAuth(
+ http.authentication.PassManSingleUser(
"test",
"test"),
"realm")
@@ -310,8 +312,8 @@ class TestHTTPAuth(tservers.HTTPProxTest):
h'%s'='%s'
""" % (
self.server.port,
- http_auth.BasicProxyAuth.AUTH_HEADER,
- http.assemble_http_basic_auth("basic", "test", "test")
+ http.authentication.BasicProxyAuth.AUTH_HEADER,
+ authentication.assemble_http_basic_auth("basic", "test", "test")
))
assert ret.status_code == 202
@@ -526,7 +528,7 @@ class TestHttps2Http(tservers.ReverseProxTest):
"""
Returns a connected Pathoc instance.
"""
- p = libpathod.pathoc.Pathoc(
+ p = pathoc.Pathoc(
("localhost", self.proxy.port), ssl=ssl, sni=sni, fp=None
)
p.connect()
@@ -765,22 +767,15 @@ class TestStreamRequest(tservers.HTTPProxTest):
(self.server.urlbase, spec))
connection.send("\r\n")
- httpversion, code, msg, headers, content = http.read_response(
- fconn, "GET", None, include_body=False)
+ protocol = http.http1.HTTP1Protocol(rfile=fconn)
+ resp = protocol.read_response("GET", None, include_body=False)
- assert headers["Transfer-Encoding"][0] == 'chunked'
- assert code == 200
+ assert resp.headers["Transfer-Encoding"][0] == 'chunked'
+ assert resp.status_code == 200
chunks = list(
- content for _,
- content,
- _ in http.read_http_body_chunked(
- fconn,
- headers,
- None,
- "GET",
- 200,
- False))
+ content for _, content, _ in protocol.read_http_body_chunked(
+ resp.headers, None, "GET", 200, False))
assert chunks == ["this", "isatest", ""]
connection.close()