aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAldo Cortesi <aldo@nullcube.com>2012-06-09 13:42:43 +1200
committerAldo Cortesi <aldo@nullcube.com>2012-06-09 13:42:43 +1200
commitb7b357528c3d3867f1a29400de3a11f2d976e60d (patch)
tree2312713e8c01d8532e2f5f12b31e97a3688eba15
parenta63240a8483c0bbc52218380c0c70da46a29f75b (diff)
downloadmitmproxy-b7b357528c3d3867f1a29400de3a11f2d976e60d.tar.gz
mitmproxy-b7b357528c3d3867f1a29400de3a11f2d976e60d.tar.bz2
mitmproxy-b7b357528c3d3867f1a29400de3a11f2d976e60d.zip
Port mitmproxy test suite entirely to nose.
-rw-r--r--.coveragerc2
-rw-r--r--.gitignore1
-rw-r--r--libmproxy/flow.py8
-rw-r--r--libmproxy/wsgi.py6
-rw-r--r--test/test_certutils.py74
-rw-r--r--test/test_cmdline.py178
-rw-r--r--test/test_console.py55
-rw-r--r--test/test_console_contentview.py24
-rw-r--r--test/test_console_help.py8
-rw-r--r--test/test_dump.py87
-rw-r--r--test/test_encoding.py35
-rw-r--r--test/test_filt.py13
-rw-r--r--test/test_flow.py196
-rw-r--r--test/test_proxy.py162
-rw-r--r--test/test_script.py27
-rw-r--r--test/test_wsgi.py11
-rw-r--r--test/tutils.py21
17 files changed, 408 insertions, 500 deletions
diff --git a/.coveragerc b/.coveragerc
new file mode 100644
index 00000000..7ba22a38
--- /dev/null
+++ b/.coveragerc
@@ -0,0 +1,2 @@
+[report]
+omit = *contrib*
diff --git a/.gitignore b/.gitignore
index dceac379..78b1cdb5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,3 +10,4 @@ mitmproxyc
mitmdumpc
mitmplaybackc
mitmrecordc
+.coverage
diff --git a/libmproxy/flow.py b/libmproxy/flow.py
index 6b3d868e..bb9d34f8 100644
--- a/libmproxy/flow.py
+++ b/libmproxy/flow.py
@@ -903,9 +903,7 @@ class ClientPlaybackState:
n.request.client_conn = None
self.current = master.handle_request(n.request)
if not testing and not self.current.response:
- #begin nocover
- master.replay_request(self.current)
- #end nocover
+ master.replay_request(self.current) # pragma: no cover
elif self.current.response:
master.handle_response(self.current.response)
@@ -1498,7 +1496,6 @@ class FlowMaster(controller.Master):
return "Can't replay while intercepting..."
if f.request.content == CONTENT_MISSING:
return "Can't replay request with missing content..."
- #begin nocover
if f.request:
f.request._set_replay()
if f.request.content:
@@ -1511,8 +1508,7 @@ class FlowMaster(controller.Master):
f,
self.masterq,
)
- rt.start()
- #end nocover
+ rt.start() # pragma: no cover
def run_script_hook(self, name, *args, **kwargs):
if self.script and not self.pause_scripts:
diff --git a/libmproxy/wsgi.py b/libmproxy/wsgi.py
index ce0367fb..b4555312 100644
--- a/libmproxy/wsgi.py
+++ b/libmproxy/wsgi.py
@@ -118,10 +118,8 @@ class WSGIAdaptor:
try:
s = traceback.format_exc()
self.error_page(soc, state["headers_sent"], s)
- # begin nocover
- except Exception, v:
- pass
- # end nocover
+ except Exception, v: # pragma: no cover
+ pass # pragma: no cover
return errs.getvalue()
diff --git a/test/test_certutils.py b/test/test_certutils.py
index 9b89599a..1fd6cbb2 100644
--- a/test/test_certutils.py
+++ b/test/test_certutils.py
@@ -1,11 +1,10 @@
import os
-import libpry
from libmproxy import certutils
+import tutils
-class udummy_ca(libpry.AutoTree):
- def test_all(self):
- d = self.tmpdir()
+def test_dummy_ca():
+ with tutils.tmpdir() as d:
path = os.path.join(d, "foo/cert.cnf")
assert certutils.dummy_ca(path)
assert os.path.exists(path)
@@ -17,45 +16,45 @@ class udummy_ca(libpry.AutoTree):
assert os.path.exists(os.path.join(d, "foo/cert2-cert.p12"))
-class udummy_cert(libpry.AutoTree):
+class TestDummyCert:
def test_with_ca(self):
- d = self.tmpdir()
- cacert = os.path.join(d, "foo/cert.cnf")
- assert certutils.dummy_ca(cacert)
- p = certutils.dummy_cert(
- os.path.join(d, "foo"),
- cacert,
- "foo.com",
- ["one.com", "two.com", "*.three.com"]
- )
- assert os.path.exists(p)
+ with tutils.tmpdir() as d:
+ cacert = os.path.join(d, "foo/cert.cnf")
+ assert certutils.dummy_ca(cacert)
+ p = certutils.dummy_cert(
+ os.path.join(d, "foo"),
+ cacert,
+ "foo.com",
+ ["one.com", "two.com", "*.three.com"]
+ )
+ assert os.path.exists(p)
- # Short-circuit
- assert certutils.dummy_cert(
- os.path.join(d, "foo"),
- cacert,
- "foo.com",
- []
- )
+ # Short-circuit
+ assert certutils.dummy_cert(
+ os.path.join(d, "foo"),
+ cacert,
+ "foo.com",
+ []
+ )
def test_no_ca(self):
- d = self.tmpdir()
- p = certutils.dummy_cert(
- d,
- None,
- "foo.com",
- []
- )
- assert os.path.exists(p)
+ with tutils.tmpdir() as d:
+ p = certutils.dummy_cert(
+ d,
+ None,
+ "foo.com",
+ []
+ )
+ assert os.path.exists(p)
-class uSSLCert(libpry.AutoTree):
+class TestSSLCert:
def test_simple(self):
- c = certutils.SSLCert(file("data/text_cert", "r").read())
+ c = certutils.SSLCert(file(tutils.test_data.path("data/text_cert"), "r").read())
assert c.cn == "google.com"
assert len(c.altnames) == 436
- c = certutils.SSLCert(file("data/text_cert_2", "r").read())
+ c = certutils.SSLCert(file(tutils.test_data.path("data/text_cert_2"), "r").read())
assert c.cn == "www.inode.co.nz"
assert len(c.altnames) == 2
assert c.digest("sha1")
@@ -68,13 +67,6 @@ class uSSLCert(libpry.AutoTree):
c.has_expired
def test_der(self):
- d = file("data/dercert").read()
+ d = file(tutils.test_data.path("data/dercert")).read()
s = certutils.SSLCert.from_der(d)
assert s.cn
-
-
-tests = [
- udummy_ca(),
- udummy_cert(),
- uSSLCert(),
-]
diff --git a/test/test_cmdline.py b/test/test_cmdline.py
index 03f230a6..45c0b3e6 100644
--- a/test/test_cmdline.py
+++ b/test/test_cmdline.py
@@ -1,95 +1,89 @@
import optparse
-import libpry
from libmproxy import cmdline
-
-
-class uAll(libpry.AutoTree):
- def test_parse_replace_hook(self):
- x = cmdline.parse_replace_hook("/foo/bar/voing")
- assert x == ("foo", "bar", "voing")
-
- x = cmdline.parse_replace_hook("/foo/bar/vo/ing/")
- assert x == ("foo", "bar", "vo/ing/")
-
- x = cmdline.parse_replace_hook("/bar/voing")
- assert x == (".*", "bar", "voing")
-
- libpry.raises(
- cmdline.ParseReplaceException,
- cmdline.parse_replace_hook,
- "/foo"
- )
- libpry.raises(
- "replacement regex",
- cmdline.parse_replace_hook,
- "patt/[/rep"
- )
- libpry.raises(
- "filter pattern",
- cmdline.parse_replace_hook,
- "/~/foo/rep"
- )
- libpry.raises(
- "empty replacement regex",
- cmdline.parse_replace_hook,
- "//"
- )
-
- def test_common(self):
- parser = optparse.OptionParser()
- cmdline.common_options(parser)
- opts, args = parser.parse_args(args=[])
-
- assert cmdline.get_common_options(opts)
-
- opts.stickycookie_all = True
- opts.stickyauth_all = True
- v = cmdline.get_common_options(opts)
- assert v["stickycookie"] == ".*"
- assert v["stickyauth"] == ".*"
-
- opts.stickycookie_all = False
- opts.stickyauth_all = False
- opts.stickycookie_filt = "foo"
- opts.stickyauth_filt = "foo"
- v = cmdline.get_common_options(opts)
- assert v["stickycookie"] == "foo"
- assert v["stickyauth"] == "foo"
-
- opts.replace = ["/foo/bar/voing"]
- v = cmdline.get_common_options(opts)
- assert v["replacements"] == [("foo", "bar", "voing")]
-
- opts.replace = ["//"]
- libpry.raises(
- "empty replacement regex",
- cmdline.get_common_options,
- opts
- )
-
- opts.replace = []
- opts.replace_file = [("/foo/bar/nonexistent")]
- libpry.raises(
- "could not read replace file",
- cmdline.get_common_options,
- opts
- )
-
- opts.replace_file = [("/~/bar/nonexistent")]
- libpry.raises(
- "filter pattern",
- cmdline.get_common_options,
- opts
- )
-
- opts.replace_file = [("/foo/bar/./data/replace")]
- v = cmdline.get_common_options(opts)["replacements"]
- assert len(v) == 1
- assert v[0][2].strip() == "replacecontents"
-
-
-
-tests = [
- uAll()
-]
+import tutils
+
+
+def test_parse_replace_hook():
+ x = cmdline.parse_replace_hook("/foo/bar/voing")
+ assert x == ("foo", "bar", "voing")
+
+ x = cmdline.parse_replace_hook("/foo/bar/vo/ing/")
+ assert x == ("foo", "bar", "vo/ing/")
+
+ x = cmdline.parse_replace_hook("/bar/voing")
+ assert x == (".*", "bar", "voing")
+
+ tutils.raises(
+ cmdline.ParseReplaceException,
+ cmdline.parse_replace_hook,
+ "/foo"
+ )
+ tutils.raises(
+ "replacement regex",
+ cmdline.parse_replace_hook,
+ "patt/[/rep"
+ )
+ tutils.raises(
+ "filter pattern",
+ cmdline.parse_replace_hook,
+ "/~/foo/rep"
+ )
+ tutils.raises(
+ "empty replacement regex",
+ cmdline.parse_replace_hook,
+ "//"
+ )
+
+def test_common():
+ parser = optparse.OptionParser()
+ cmdline.common_options(parser)
+ opts, args = parser.parse_args(args=[])
+
+ assert cmdline.get_common_options(opts)
+
+ opts.stickycookie_all = True
+ opts.stickyauth_all = True
+ v = cmdline.get_common_options(opts)
+ assert v["stickycookie"] == ".*"
+ assert v["stickyauth"] == ".*"
+
+ opts.stickycookie_all = False
+ opts.stickyauth_all = False
+ opts.stickycookie_filt = "foo"
+ opts.stickyauth_filt = "foo"
+ v = cmdline.get_common_options(opts)
+ assert v["stickycookie"] == "foo"
+ assert v["stickyauth"] == "foo"
+
+ opts.replace = ["/foo/bar/voing"]
+ v = cmdline.get_common_options(opts)
+ assert v["replacements"] == [("foo", "bar", "voing")]
+
+ opts.replace = ["//"]
+ tutils.raises(
+ "empty replacement regex",
+ cmdline.get_common_options,
+ opts
+ )
+
+ opts.replace = []
+ opts.replace_file = [("/foo/bar/nonexistent")]
+ tutils.raises(
+ "could not read replace file",
+ cmdline.get_common_options,
+ opts
+ )
+
+ opts.replace_file = [("/~/bar/nonexistent")]
+ tutils.raises(
+ "filter pattern",
+ cmdline.get_common_options,
+ opts
+ )
+
+ p = tutils.test_data.path("data/replace")
+ opts.replace_file = [("/foo/bar/%s"%p)]
+ v = cmdline.get_common_options(opts)["replacements"]
+ assert len(v) == 1
+ assert v[0][2].strip() == "replacecontents"
diff --git a/test/test_console.py b/test/test_console.py
index a570f3e5..498a93bd 100644
--- a/test/test_console.py
+++ b/test/test_console.py
@@ -1,10 +1,9 @@
+import os
from libmproxy import console
from libmproxy.console import common
import tutils
-import libpry
-
-class uConsoleState(libpry.AutoTree):
+class TestConsoleState:
def test_flow(self):
"""
normal flow:
@@ -89,32 +88,34 @@ class uConsoleState(libpry.AutoTree):
assert len(c.flowsettings) == 0
-class uformat_keyvals(libpry.AutoTree):
- def test_simple(self):
- assert common.format_keyvals(
- [
- ("aa", "bb"),
- None,
- ("cc", "dd"),
- (None, "dd"),
- (None, "dd"),
- ]
- )
+def test_format_keyvals():
+ assert common.format_keyvals(
+ [
+ ("aa", "bb"),
+ None,
+ ("cc", "dd"),
+ (None, "dd"),
+ (None, "dd"),
+ ]
+ )
-class uPathCompleter(libpry.AutoTree):
+class TestPathCompleter:
def test_lookup_construction(self):
c = console._PathCompleter()
assert c.complete("/tm") == "/tmp/"
c.reset()
- assert c.complete("./completion/a") == "./completion/aaa"
- assert c.complete("./completion/a") == "./completion/aab"
+ cd = tutils.test_data.path("completion")
+ ca = os.path.join(cd, "a")
+ assert c.complete(ca).endswith("/completion/aaa")
+ assert c.complete(ca).endswith("/completion/aab")
c.reset()
- assert c.complete("./completion/aaa") == "./completion/aaa"
- assert c.complete("./completion/aaa") == "./completion/aaa"
+ ca = os.path.join(cd, "aaa")
+ assert c.complete(ca).endswith("/completion/aaa")
+ assert c.complete(ca).endswith("/completion/aaa")
c.reset()
- assert c.complete("./completion") == "./completion/aaa"
+ assert c.complete(cd).endswith("/completion/aaa")
def test_completion(self):
c = console._PathCompleter(True)
@@ -144,15 +145,5 @@ class uPathCompleter(libpry.AutoTree):
assert c.final == s
-class uOptions(libpry.AutoTree):
- def test_all(self):
- assert console.Options(kill=True)
-
-
-
-tests = [
- uformat_keyvals(),
- uConsoleState(),
- uPathCompleter(),
- uOptions()
-]
+def test_options():
+ assert console.Options(kill=True)
diff --git a/test/test_console_contentview.py b/test/test_console_contentview.py
index bd91d2eb..fbb7e6d2 100644
--- a/test/test_console_contentview.py
+++ b/test/test_console_contentview.py
@@ -1,9 +1,9 @@
import sys
-import libpry
import libmproxy.console.contentview as cv
from libmproxy import utils, flow, encoding
+import tutils
-class uContentView(libpry.AutoTree):
+class TestContentView:
def test_trailer(self):
txt = []
cv.trailer(5, txt, 1000)
@@ -97,10 +97,18 @@ class uContentView(libpry.AutoTree):
assert cv.view_hex([], "foo", 1000)
def test_view_image(self):
- assert cv.view_image([], file("data/image.png").read(), sys.maxint)
- assert cv.view_image([], file("data/image.gif").read(), sys.maxint)
- assert cv.view_image([], file("data/image-err1.jpg").read(), sys.maxint)
- assert cv.view_image([], file("data/image.ico").read(), sys.maxint)
+ p = tutils.test_data.path("data/image.png")
+ assert cv.view_image([], file(p).read(), sys.maxint)
+
+ p = tutils.test_data.path("data/image.gif")
+ assert cv.view_image([], file(p).read(), sys.maxint)
+
+ p = tutils.test_data.path("data/image-err1.jpg")
+ assert cv.view_image([], file(p).read(), sys.maxint)
+
+ p = tutils.test_data.path("data/image.ico")
+ assert cv.view_image([], file(p).read(), sys.maxint)
+
assert not cv.view_image([], "flibble", sys.maxint)
def test_view_multipart(self):
@@ -178,7 +186,3 @@ Larry
assert "decoded gzip" in r[0]
assert "Raw" in r[0]
-
-tests = [
- uContentView()
-]
diff --git a/test/test_console_help.py b/test/test_console_help.py
index e747635f..c9ef7228 100644
--- a/test/test_console_help.py
+++ b/test/test_console_help.py
@@ -1,5 +1,4 @@
import sys
-import libpry
import libmproxy.console.help as help
from libmproxy import utils, flow, encoding
@@ -8,7 +7,7 @@ class DummyMaster:
pass
-class uHelp(libpry.AutoTree):
+class TestHelp:
def test_helptext(self):
h = help.HelpView(None, "foo", None)
assert h.helptext()
@@ -18,8 +17,3 @@ class uHelp(libpry.AutoTree):
assert not h.keypress((0, 0), "q")
assert not h.keypress((0, 0), "?")
assert h.keypress((0, 0), "o") == "o"
-
-
-tests = [
- uHelp()
-]
diff --git a/test/test_dump.py b/test/test_dump.py
index b2b5d8cd..621e76e7 100644
--- a/test/test_dump.py
+++ b/test/test_dump.py
@@ -4,20 +4,19 @@ import libpry
from libmproxy import dump, flow
import tutils
-class uStrFuncs(libpry.AutoTree):
- def test_all(self):
- t = tutils.tresp()
- t._set_replay()
- dump.str_response(t)
+def test_strfuncs():
+ t = tutils.tresp()
+ t._set_replay()
+ dump.str_response(t)
- t = tutils.treq()
- t.client_conn = None
- t.stickycookie = True
- assert "stickycookie" in dump.str_request(t)
- assert "replay" in dump.str_request(t)
+ t = tutils.treq()
+ t.client_conn = None
+ t.stickycookie = True
+ assert "stickycookie" in dump.str_request(t)
+ assert "replay" in dump.str_request(t)
-class uDumpMaster(libpry.AutoTree):
+class TestDumpMaster:
def _cycle(self, m, content):
req = tutils.treq()
req.content = content
@@ -53,38 +52,38 @@ class uDumpMaster(libpry.AutoTree):
o = dump.Options(server_replay="nonexistent", kill=True)
libpry.raises(dump.DumpError, dump.DumpMaster, None, o, None, outfile=cs)
- t = self.tmpdir()
- p = os.path.join(t, "rep")
- self._flowfile(p)
+ with tutils.tmpdir() as t:
+ p = os.path.join(t, "rep")
+ self._flowfile(p)
- o = dump.Options(server_replay=p, kill=True)
- m = dump.DumpMaster(None, o, None, outfile=cs)
+ o = dump.Options(server_replay=p, kill=True)
+ m = dump.DumpMaster(None, o, None, outfile=cs)
- self._cycle(m, "content")
- self._cycle(m, "content")
+ self._cycle(m, "content")
+ self._cycle(m, "content")
- o = dump.Options(server_replay=p, kill=False)
- m = dump.DumpMaster(None, o, None, outfile=cs)
- self._cycle(m, "nonexistent")
+ o = dump.Options(server_replay=p, kill=False)
+ m = dump.DumpMaster(None, o, None, outfile=cs)
+ self._cycle(m, "nonexistent")
- o = dump.Options(client_replay=p, kill=False)
- m = dump.DumpMaster(None, o, None, outfile=cs)
+ o = dump.Options(client_replay=p, kill=False)
+ m = dump.DumpMaster(None, o, None, outfile=cs)
def test_read(self):
- t = self.tmpdir()
- p = os.path.join(t, "read")
- self._flowfile(p)
- assert "GET" in self._dummy_cycle(0, None, "", verbosity=1, rfile=p)
+ with tutils.tmpdir() as t:
+ p = os.path.join(t, "read")
+ self._flowfile(p)
+ assert "GET" in self._dummy_cycle(0, None, "", verbosity=1, rfile=p)
- libpry.raises(
- dump.DumpError, self._dummy_cycle,
- 0, None, "", verbosity=1, rfile="/nonexistent"
- )
+ libpry.raises(
+ dump.DumpError, self._dummy_cycle,
+ 0, None, "", verbosity=1, rfile="/nonexistent"
+ )
- libpry.raises(
- dump.DumpError, self._dummy_cycle,
- 0, None, "", verbosity=1, rfile="test_dump.py"
- )
+ libpry.raises(
+ dump.DumpError, self._dummy_cycle,
+ 0, None, "", verbosity=1, rfile="test_dump.py"
+ )
def test_options(self):
o = dump.Options(verbosity = 2)
@@ -107,10 +106,10 @@ class uDumpMaster(libpry.AutoTree):
assert "GET" in self._dummy_cycle(1, "~s", "ascii", verbosity=i)
def test_write(self):
- d = self.tmpdir()
- p = os.path.join(d, "a")
- self._dummy_cycle(1, None, "", wfile=p, verbosity=0)
- assert len(list(flow.FlowReader(open(p)).stream())) == 1
+ with tutils.tmpdir() as d:
+ p = os.path.join(d, "a")
+ self._dummy_cycle(1, None, "", wfile=p, verbosity=0)
+ assert len(list(flow.FlowReader(open(p)).stream())) == 1
def test_write_err(self):
libpry.raises(
@@ -125,7 +124,7 @@ class uDumpMaster(libpry.AutoTree):
def test_script(self):
ret = self._dummy_cycle(
1, None, "",
- script="scripts/all.py", verbosity=0, eventlog=True
+ script=tutils.test_data.path("scripts/all.py"), verbosity=0, eventlog=True
)
assert "XCLIENTCONNECT" in ret
assert "XREQUEST" in ret
@@ -146,11 +145,3 @@ class uDumpMaster(libpry.AutoTree):
def test_stickyauth(self):
self._dummy_cycle(1, None, "", stickyauth = ".*")
-
-
-
-
-tests = [
- uStrFuncs(),
- uDumpMaster()
-]
diff --git a/test/test_encoding.py b/test/test_encoding.py
index 61caf507..732447e2 100644
--- a/test/test_encoding.py
+++ b/test/test_encoding.py
@@ -1,28 +1,19 @@
from libmproxy import encoding
-import libpry
-class uidentity(libpry.AutoTree):
- def test_simple(self):
- assert "string" == encoding.decode("identity", "string")
- assert "string" == encoding.encode("identity", "string")
- assert not encoding.encode("nonexistent", "string")
+def test_identity():
+ assert "string" == encoding.decode("identity", "string")
+ assert "string" == encoding.encode("identity", "string")
+ assert not encoding.encode("nonexistent", "string")
+ assert None == encoding.decode("nonexistent encoding", "string")
- def test_fallthrough(self):
- assert None == encoding.decode("nonexistent encoding", "string")
-class ugzip(libpry.AutoTree):
- def test_simple(self):
- assert "string" == encoding.decode("gzip", encoding.encode("gzip", "string"))
- assert None == encoding.decode("gzip", "bogus")
+def test_gzip():
+ assert "string" == encoding.decode("gzip", encoding.encode("gzip", "string"))
+ assert None == encoding.decode("gzip", "bogus")
-class udeflate(libpry.AutoTree):
- def test_simple(self):
- assert "string" == encoding.decode("deflate", encoding.encode("deflate", "string"))
- assert "string" == encoding.decode("deflate", encoding.encode("deflate", "string")[2:-4])
- assert None == encoding.decode("deflate", "bogus")
-tests = [
- uidentity(),
- ugzip(),
- udeflate()
-]
+def test_deflate():
+ assert "string" == encoding.decode("deflate", encoding.encode("deflate", "string"))
+ assert "string" == encoding.decode("deflate", encoding.encode("deflate", "string")[2:-4])
+ assert None == encoding.decode("deflate", "bogus")
+
diff --git a/test/test_filt.py b/test/test_filt.py
index 8e349d7c..287ef376 100644
--- a/test/test_filt.py
+++ b/test/test_filt.py
@@ -1,9 +1,8 @@
import cStringIO
from libmproxy import filt, flow
-import libpry
-class uParsing(libpry.AutoTree):
+class TestParsing:
def _dump(self, x):
c = cStringIO.StringIO()
x.dump(fp=c)
@@ -71,7 +70,7 @@ class uParsing(libpry.AutoTree):
self._dump(a)
-class uMatching(libpry.AutoTree):
+class TestMatching:
def req(self):
conn = flow.ClientConnect(("one", 2222))
headers = flow.ODictCaseless()
@@ -235,11 +234,3 @@ class uMatching(libpry.AutoTree):
assert self.q("!~c 201 !~c 202", s)
assert not self.q("!~c 201 !~c 200", s)
-
-
-
-
-tests = [
- uMatching(),
- uParsing()
-]
diff --git a/test/test_flow.py b/test/test_flow.py
index 0b7fee4f..994264e4 100644
--- a/test/test_flow.py
+++ b/test/test_flow.py
@@ -6,7 +6,7 @@ import tutils
import libpry
-class uStickyCookieState(libpry.AutoTree):
+class TestStickyCookieState:
def _response(self, cookie, host):
s = flow.StickyCookieState(filt.parse(".*"))
f = tutils.tflow_full()
@@ -40,7 +40,7 @@ class uStickyCookieState(libpry.AutoTree):
assert "cookie" in f.request.headers
-class uStickyAuthState(libpry.AutoTree):
+class TestStickyAuthState:
def test_handle_response(self):
s = flow.StickyAuthState(filt.parse(".*"))
f = tutils.tflow_full()
@@ -53,7 +53,7 @@ class uStickyAuthState(libpry.AutoTree):
assert f.request.headers["authorization"] == ["foo"]
-class uClientPlaybackState(libpry.AutoTree):
+class TestClientPlaybackState:
def test_tick(self):
first = tutils.tflow()
s = flow.State()
@@ -85,7 +85,7 @@ class uClientPlaybackState(libpry.AutoTree):
assert not fm.client_playback
-class uServerPlaybackState(libpry.AutoTree):
+class TestServerPlaybackState:
def test_hash(self):
s = flow.ServerPlaybackState(None, [], False, False)
r = tutils.tflow()
@@ -147,7 +147,8 @@ class uServerPlaybackState(libpry.AutoTree):
n = s.next_flow(r)
assert s.count() == 2
-class uFlow(libpry.AutoTree):
+
+class TestFlow:
def test_copy(self):
f = tutils.tflow_full()
f2 = f.copy()
@@ -302,7 +303,7 @@ class uFlow(libpry.AutoTree):
-class uState(libpry.AutoTree):
+class TestState:
def test_backup(self):
c = flow.State()
req = tutils.treq()
@@ -451,7 +452,7 @@ class uState(libpry.AutoTree):
c.accept_all()
-class uSerialize(libpry.AutoTree):
+class TestSerialize:
def _treader(self):
sio = StringIO()
w = flow.FlowWriter(sio)
@@ -494,7 +495,7 @@ class uSerialize(libpry.AutoTree):
sio.write("bogus")
sio.seek(0)
r = flow.FlowReader(sio)
- libpry.raises(flow.FlowReadError, list, r.stream())
+ tutils.raises(flow.FlowReadError, list, r.stream())
f = flow.FlowReadError("foo")
assert f.strerror == "foo"
@@ -508,18 +509,18 @@ class uSerialize(libpry.AutoTree):
sio.seek(0)
r = flow.FlowReader(sio)
- libpry.raises("version", list, r.stream())
+ tutils.raises("version", list, r.stream())
-class uFlowMaster(libpry.AutoTree):
+class TestFlowMaster:
def test_load_script(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
- assert not fm.load_script("scripts/a.py")
- assert not fm.load_script("scripts/a.py")
+ assert not fm.load_script(tutils.test_data.path("scripts/a.py"))
+ assert not fm.load_script(tutils.test_data.path("scripts/a.py"))
assert not fm.load_script(None)
assert fm.load_script("nonexistent")
- assert "ValueError" in fm.load_script("scripts/starterr.py")
+ assert "ValueError" in fm.load_script(tutils.test_data.path("scripts/starterr.py"))
def test_replay(self):
s = flow.State()
@@ -534,7 +535,7 @@ class uFlowMaster(libpry.AutoTree):
def test_script_reqerr(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
- assert not fm.load_script("scripts/reqerr.py")
+ assert not fm.load_script(tutils.test_data.path("scripts/reqerr.py"))
req = tutils.treq()
fm.handle_clientconnect(req.client_conn)
assert fm.handle_request(req)
@@ -542,7 +543,7 @@ class uFlowMaster(libpry.AutoTree):
def test_script(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
- assert not fm.load_script("scripts/all.py")
+ assert not fm.load_script(tutils.test_data.path("scripts/all.py"))
req = tutils.treq()
fm.handle_clientconnect(req.client_conn)
assert fm.script.ns["log"][-1] == "clientconnect"
@@ -680,7 +681,7 @@ class uFlowMaster(libpry.AutoTree):
fm.handle_request(f.request)
assert f.request.headers["authorization"] == ["foo"]
-class uRequest(libpry.AutoTree):
+class TestRequest:
def test_simple(self):
h = flow.ODictCaseless()
h["test"] = ["test"]
@@ -819,7 +820,7 @@ class uRequest(libpry.AutoTree):
assert r.content == "falafel"
-class uResponse(libpry.AutoTree):
+class TestResponse:
def test_simple(self):
h = flow.ODictCaseless()
h["test"] = ["test"]
@@ -869,7 +870,10 @@ class uResponse(libpry.AutoTree):
def test_get_cert(self):
req = tutils.treq()
- resp = flow.Response(req, 200, "msg", flow.ODictCaseless(), "content", file("data/dercert").read())
+ resp = flow.Response(
+ req, 200, "msg", flow.ODictCaseless(), "content",
+ file(tutils.test_data.path("data/dercert")).read()
+ )
assert resp.get_cert()
resp = tutils.tresp()
@@ -924,7 +928,7 @@ class uResponse(libpry.AutoTree):
assert r.content == "falafel"
-class uError(libpry.AutoTree):
+class TestError:
def test_getset_state(self):
e = flow.Error(None, "Error")
state = e._get_state()
@@ -947,7 +951,7 @@ class uError(libpry.AutoTree):
assert e.msg == "abarp"
-class uClientConnect(libpry.AutoTree):
+class TestClientConnect:
def test_state(self):
c = flow.ClientConnect(("a", 22))
assert flow.ClientConnect._from_state(c._get_state()) == c
@@ -963,13 +967,13 @@ class uClientConnect(libpry.AutoTree):
assert c3 == c
-class uODict(libpry.AutoTree):
+class TestODict:
def setUp(self):
self.od = flow.ODict()
def test_str_err(self):
h = flow.ODict()
- libpry.raises(ValueError, h.__setitem__, "key", "foo")
+ tutils.raises(ValueError, h.__setitem__, "key", "foo")
def test_dictToHeader1(self):
self.od.add("one", "uno")
@@ -1047,7 +1051,7 @@ class uODict(libpry.AutoTree):
assert self.od.get("two") == None
-class uODictCaseless(libpry.AutoTree):
+class TestODictCaseless:
def setUp(self):
self.od = flow.ODictCaseless()
@@ -1068,89 +1072,67 @@ class uODictCaseless(libpry.AutoTree):
assert len(self.od) == 1
-class udecoded(libpry.AutoTree):
- def test_del(self):
- r = tutils.treq()
- assert r.content == "content"
+def test_decoded():
+ r = tutils.treq()
+ assert r.content == "content"
+ assert not r.headers["content-encoding"]
+ r.encode("gzip")
+ assert r.headers["content-encoding"]
+ assert r.content != "content"
+ with flow.decoded(r):
assert not r.headers["content-encoding"]
- r.encode("gzip")
- assert r.headers["content-encoding"]
- assert r.content != "content"
- with flow.decoded(r):
- assert not r.headers["content-encoding"]
- assert r.content == "content"
- assert r.headers["content-encoding"]
- assert r.content != "content"
-
- with flow.decoded(r):
- r.content = "foo"
-
- assert r.content != "foo"
- r.decode()
- assert r.content == "foo"
-
-
-class uReplaceHooks(libpry.AutoTree):
- def test_add_remove(self):
- h = flow.ReplaceHooks()
- h.add("~q", "foo", "bar")
- assert h.lst
- h.remove("~q", "foo", "bar")
- assert not h.lst
-
- h.add("~q", "foo", "bar")
- h.add("~s", "foo", "bar")
-
- v = h.get_specs()
- assert v == [('~q', 'foo', 'bar'), ('~s', 'foo', 'bar')]
- assert h.count() == 2
- h.remove("~q", "foo", "bar")
- assert h.count() == 1
- h.remove("~q", "foo", "bar")
- assert h.count() == 1
- h.clear()
- assert h.count() == 0
-
- f = tutils.tflow()
- f.request.content = "foo"
- h.add("~s", "foo", "bar")
- h.run(f)
- assert f.request.content == "foo"
-
- f = tutils.tflow_full()
- f.request.content = "foo"
- f.response.content = "foo"
- h.run(f)
- assert f.response.content == "bar"
- assert f.request.content == "foo"
+ assert r.content == "content"
+ assert r.headers["content-encoding"]
+ assert r.content != "content"
+
+ with flow.decoded(r):
+ r.content = "foo"
+
+ assert r.content != "foo"
+ r.decode()
+ assert r.content == "foo"
+
+
+def test_replacehooks():
+ h = flow.ReplaceHooks()
+ h.add("~q", "foo", "bar")
+ assert h.lst
+ h.remove("~q", "foo", "bar")
+ assert not h.lst
+
+ h.add("~q", "foo", "bar")
+ h.add("~s", "foo", "bar")
+
+ v = h.get_specs()
+ assert v == [('~q', 'foo', 'bar'), ('~s', 'foo', 'bar')]
+ assert h.count() == 2
+ h.remove("~q", "foo", "bar")
+ assert h.count() == 1
+ h.remove("~q", "foo", "bar")
+ assert h.count() == 1
+ h.clear()
+ assert h.count() == 0
+
+ f = tutils.tflow()
+ f.request.content = "foo"
+ h.add("~s", "foo", "bar")
+ h.run(f)
+ assert f.request.content == "foo"
+
+ f = tutils.tflow_full()
+ f.request.content = "foo"
+ f.response.content = "foo"
+ h.run(f)
+ assert f.response.content == "bar"
+ assert f.request.content == "foo"
+
+ f = tutils.tflow()
+ h.clear()
+ h.add("~q", "foo", "bar")
+ f.request.content = "foo"
+ h.run(f)
+ assert f.request.content == "bar"
+
+ assert not h.add("~", "foo", "bar")
+ assert not h.add("foo", "*", "bar")
- f = tutils.tflow()
- h.clear()
- h.add("~q", "foo", "bar")
- f.request.content = "foo"
- h.run(f)
- assert f.request.content == "bar"
-
- assert not h.add("~", "foo", "bar")
- assert not h.add("foo", "*", "bar")
-
-
-
-tests = [
- uReplaceHooks(),
- uStickyCookieState(),
- uStickyAuthState(),
- uServerPlaybackState(),
- uClientPlaybackState(),
- uFlow(),
- uState(),
- uSerialize(),
- uFlowMaster(),
- uRequest(),
- uResponse(),
- uError(),
- uClientConnect(),
- uODict(),
- uODictCaseless(),
- udecoded()
-]
diff --git a/test/test_proxy.py b/test/test_proxy.py
index c67e93a0..a5dc666a 100644
--- a/test/test_proxy.py
+++ b/test/test_proxy.py
@@ -2,60 +2,57 @@ import cStringIO, textwrap
from cStringIO import StringIO
import libpry
from libmproxy import proxy, flow
+import tutils
+class Dummy: pass
-class u_read_chunked(libpry.AutoTree):
- def test_all(self):
- s = cStringIO.StringIO("1\r\na\r\n0\r\n")
- libpry.raises(IOError, proxy.read_chunked, s, None)
-
- s = cStringIO.StringIO("1\r\na\r\n0\r\n\r\n")
- assert proxy.read_chunked(s, None) == "a"
- s = cStringIO.StringIO("\r\n")
- libpry.raises(IOError, proxy.read_chunked, s, None)
+def test_read_chunked():
+ s = cStringIO.StringIO("1\r\na\r\n0\r\n")
+ tutils.raises(IOError, proxy.read_chunked, s, None)
- s = cStringIO.StringIO("1\r\nfoo")
- libpry.raises(IOError, proxy.read_chunked, s, None)
+ s = cStringIO.StringIO("1\r\na\r\n0\r\n\r\n")
+ assert proxy.read_chunked(s, None) == "a"
- s = cStringIO.StringIO("foo\r\nfoo")
- libpry.raises(proxy.ProxyError, proxy.read_chunked, s, None)
+ s = cStringIO.StringIO("\r\n")
+ tutils.raises(IOError, proxy.read_chunked, s, None)
+ s = cStringIO.StringIO("1\r\nfoo")
+ tutils.raises(IOError, proxy.read_chunked, s, None)
-class Dummy: pass
+ s = cStringIO.StringIO("foo\r\nfoo")
+ tutils.raises(proxy.ProxyError, proxy.read_chunked, s, None)
-class u_read_http_body(libpry.AutoTree):
- def test_all(self):
+def test_read_http_body():
+ d = Dummy()
+ h = flow.ODict()
+ s = cStringIO.StringIO("testing")
+ assert proxy.read_http_body(s, d, h, False, None) == ""
- d = Dummy()
- h = flow.ODict()
- s = cStringIO.StringIO("testing")
- assert proxy.read_http_body(s, d, h, False, None) == ""
+ h["content-length"] = ["foo"]
+ s = cStringIO.StringIO("testing")
+ tutils.raises(proxy.ProxyError, proxy.read_http_body, s, d, h, False, None)
- h["content-length"] = ["foo"]
- s = cStringIO.StringIO("testing")
- libpry.raises(proxy.ProxyError, proxy.read_http_body, s, d, h, False, None)
+ h["content-length"] = [5]
+ s = cStringIO.StringIO("testing")
+ assert len(proxy.read_http_body(s, d, h, False, None)) == 5
+ s = cStringIO.StringIO("testing")
+ tutils.raises(proxy.ProxyError, proxy.read_http_body, s, d, h, False, 4)
- h["content-length"] = [5]
- s = cStringIO.StringIO("testing")
- assert len(proxy.read_http_body(s, d, h, False, None)) == 5
- s = cStringIO.StringIO("testing")
- libpry.raises(proxy.ProxyError, proxy.read_http_body, s, d, h, False, 4)
+ h = flow.ODict()
+ s = cStringIO.StringIO("testing")
+ assert len(proxy.read_http_body(s, d, h, True, 4)) == 4
+ s = cStringIO.StringIO("testing")
+ assert len(proxy.read_http_body(s, d, h, True, 100)) == 7
- h = flow.ODict()
- s = cStringIO.StringIO("testing")
- assert len(proxy.read_http_body(s, d, h, True, 4)) == 4
- s = cStringIO.StringIO("testing")
- assert len(proxy.read_http_body(s, d, h, True, 100)) == 7
-
-class u_parse_request_line(libpry.AutoTree):
+class TestParseRequestLine:
def test_simple(self):
- libpry.raises(proxy.ProxyError, proxy.parse_request_line, "")
+ tutils.raises(proxy.ProxyError, proxy.parse_request_line, "")
u = "GET ... HTTP/1.1"
- libpry.raises("invalid url", proxy.parse_request_line, u)
+ tutils.raises("invalid url", proxy.parse_request_line, u)
u = "GET http://foo.com:8888/test HTTP/1.1"
m, s, h, po, pa, minor = proxy.parse_request_line(u)
@@ -77,7 +74,7 @@ class u_parse_request_line(libpry.AutoTree):
assert proxy.parse_request_line(u) == ('GET', None, None, None, '/', 1)
-class uFileLike(libpry.AutoTree):
+class TestFileLike:
def test_wrap(self):
s = cStringIO.StringIO("foobar\nfoobar")
s = proxy.FileLike(s)
@@ -88,14 +85,13 @@ class uFileLike(libpry.AutoTree):
assert s.isatty
-
-class uProxyError(libpry.AutoTree):
+class TestProxyError:
def test_simple(self):
p = proxy.ProxyError(111, "msg")
assert repr(p)
-class u_read_headers(libpry.AutoTree):
+class TestReadHeaders:
def test_read_simple(self):
data = """
Header: one
@@ -135,63 +131,45 @@ class u_read_headers(libpry.AutoTree):
assert headers["header"] == ['one\r\n two']
-class u_parse_http_protocol(libpry.AutoTree):
- def test_simple(self):
- assert proxy.parse_http_protocol("HTTP/1.1") == (1, 1)
- assert proxy.parse_http_protocol("HTTP/0.0") == (0, 0)
- assert not proxy.parse_http_protocol("foo/0.0")
-
+def test_parse_http_protocol():
+ assert proxy.parse_http_protocol("HTTP/1.1") == (1, 1)
+ assert proxy.parse_http_protocol("HTTP/0.0") == (0, 0)
+ assert not proxy.parse_http_protocol("foo/0.0")
-class u_parse_init_connect(libpry.AutoTree):
- def test_simple(self):
- assert proxy.parse_init_connect("CONNECT host.com:443 HTTP/1.0")
- assert not proxy.parse_init_connect("bogus")
- assert not proxy.parse_init_connect("GET host.com:443 HTTP/1.0")
- assert not proxy.parse_init_connect("CONNECT host.com443 HTTP/1.0")
- assert not proxy.parse_init_connect("CONNECT host.com:443 foo/1.0")
+def test_parse_init_connect():
+ assert proxy.parse_init_connect("CONNECT host.com:443 HTTP/1.0")
+ assert not proxy.parse_init_connect("bogus")
+ assert not proxy.parse_init_connect("GET host.com:443 HTTP/1.0")
+ assert not proxy.parse_init_connect("CONNECT host.com443 HTTP/1.0")
+ assert not proxy.parse_init_connect("CONNECT host.com:443 foo/1.0")
-class u_parse_init_proxy(libpry.AutoTree):
- def test_simple(self):
- u = "GET http://foo.com:8888/test HTTP/1.1"
- m, s, h, po, pa, major, minor = proxy.parse_init_proxy(u)
- assert m == "GET"
- assert s == "http"
- assert h == "foo.com"
- assert po == 8888
- assert pa == "/test"
- assert major == 1
- assert minor == 1
-
- assert not proxy.parse_init_proxy("invalid")
- assert not proxy.parse_init_proxy("GET invalid HTTP/1.1")
- assert not proxy.parse_init_proxy("GET http://foo.com:8888/test foo/1.1")
+def test_prase_init_proxy():
+ u = "GET http://foo.com:8888/test HTTP/1.1"
+ m, s, h, po, pa, major, minor = proxy.parse_init_proxy(u)
+ assert m == "GET"
+ assert s == "http"
+ assert h == "foo.com"
+ assert po == 8888
+ assert pa == "/test"
+ assert major == 1
+ assert minor == 1
-class u_parse_init_http(libpry.AutoTree):
- def test_simple(self):
- u = "GET /test HTTP/1.1"
- m, u, major, minor = proxy.parse_init_http(u)
- assert m == "GET"
- assert u == "/test"
- assert major == 1
- assert minor == 1
+ assert not proxy.parse_init_proxy("invalid")
+ assert not proxy.parse_init_proxy("GET invalid HTTP/1.1")
+ assert not proxy.parse_init_proxy("GET http://foo.com:8888/test foo/1.1")
- assert not proxy.parse_init_http("invalid")
- assert not proxy.parse_init_http("GET invalid HTTP/1.1")
- assert not proxy.parse_init_http("GET /test foo/1.1")
+def test_parse_init_http():
+ u = "GET /test HTTP/1.1"
+ m, u, major, minor = proxy.parse_init_http(u)
+ assert m == "GET"
+ assert u == "/test"
+ assert major == 1
+ assert minor == 1
+ assert not proxy.parse_init_http("invalid")
+ assert not proxy.parse_init_http("GET invalid HTTP/1.1")
+ assert not proxy.parse_init_http("GET /test foo/1.1")
-tests = [
- u_parse_http_protocol(),
- u_parse_init_connect(),
- u_parse_init_proxy(),
- u_parse_init_http(),
- uProxyError(),
- uFileLike(),
- u_parse_request_line(),
- u_read_chunked(),
- u_read_http_body(),
- u_read_headers()
-]
diff --git a/test/test_script.py b/test/test_script.py
index 94f036d9..88f32ddf 100644
--- a/test/test_script.py
+++ b/test/test_script.py
@@ -1,15 +1,14 @@
import os
from libmproxy import script, flow
-import libpry
import tutils
-class uScript(libpry.AutoTree):
+class TestScript:
def test_simple(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
ctx = flow.ScriptContext(fm)
- p = script.Script(os.path.join("scripts", "a.py"), ctx)
+ p = script.Script(tutils.test_data.path("scripts/a.py"), ctx)
p.load()
assert "here" in p.ns
assert p.run("here") == (True, 1)
@@ -26,7 +25,7 @@ class uScript(libpry.AutoTree):
def test_duplicate_flow(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
- fm.load_script(os.path.join("scripts", "duplicate_flow.py"))
+ fm.load_script(tutils.test_data.path("scripts/duplicate_flow.py"))
r = tutils.treq()
fm.handle_request(r)
assert fm.state.flow_count() == 2
@@ -40,32 +39,26 @@ class uScript(libpry.AutoTree):
s = script.Script("nonexistent", ctx)
- libpry.raises(
+ tutils.raises(
"no such file",
s.load
)
- s = script.Script("scripts", ctx)
- libpry.raises(
+ s = script.Script(tutils.test_data.path("scripts"), ctx)
+ tutils.raises(
"not a file",
s.load
)
- s = script.Script("scripts/syntaxerr.py", ctx)
- libpry.raises(
+ s = script.Script(tutils.test_data.path("scripts/syntaxerr.py"), ctx)
+ tutils.raises(
script.ScriptError,
s.load
)
- s = script.Script("scripts/loaderr.py", ctx)
- libpry.raises(
+ s = script.Script(tutils.test_data.path("scripts/loaderr.py"), ctx)
+ tutils.raises(
script.ScriptError,
s.load
)
-
-
-tests = [
- uScript(),
-]
-
diff --git a/test/test_wsgi.py b/test/test_wsgi.py
index ab2e2668..1ae81d11 100644
--- a/test/test_wsgi.py
+++ b/test/test_wsgi.py
@@ -1,5 +1,4 @@
import cStringIO, sys
-import libpry
from libmproxy import wsgi
import tutils
@@ -16,7 +15,7 @@ class TestApp:
return ['Hello', ' world!\n']
-class uWSGIAdaptor(libpry.AutoTree):
+class TestWSGIAdaptor:
def test_make_environ(self):
w = wsgi.WSGIAdaptor(None, "foo", 80)
tr = tutils.treq()
@@ -97,7 +96,7 @@ class uWSGIAdaptor(libpry.AutoTree):
assert "Internal Server Error" in self._serve(app)
-class uAppRegistry(libpry.AutoTree):
+class TestAppRegistry:
def test_add_get(self):
ar = wsgi.AppRegistry()
ar.add("foo", "domain", 80)
@@ -109,9 +108,3 @@ class uAppRegistry(libpry.AutoTree):
r.port = 81
assert not ar.get(r)
-
-
-tests = [
- uWSGIAdaptor(),
- uAppRegistry()
-]
diff --git a/test/tutils.py b/test/tutils.py
index 34ef928d..19dbb139 100644
--- a/test/tutils.py
+++ b/test/tutils.py
@@ -1,6 +1,8 @@
-import threading, Queue, unittest
+import threading, Queue
+import os, shutil,tempfile
+from contextlib import contextmanager
import libpry
-from libmproxy import proxy, flow, controller
+from libmproxy import proxy, flow, controller, utils
import requests
import libpathod.test
import random
@@ -128,6 +130,19 @@ class ProxTest:
return pthread.tmaster.log
+
+@contextmanager
+def tmpdir(*args, **kwargs):
+ orig_workdir = os.getcwd()
+ temp_workdir = tempfile.mkdtemp(*args, **kwargs)
+ os.chdir(temp_workdir)
+
+ yield temp_workdir
+
+ os.chdir(orig_workdir)
+ shutil.rmtree(temp_workdir)
+
+
def raises(exc, obj, *args, **kwargs):
"""
Assert that a callable raises a specified exception.
@@ -165,3 +180,5 @@ def raises(exc, obj, *args, **kwargs):
)
)
raise AssertionError("No exception raised.")
+
+test_data = utils.Data(__name__)