diff options
author | Thomas Kriechbaumer <thomas@kriechbaumer.name> | 2017-03-12 22:55:22 +0100 |
---|---|---|
committer | Thomas Kriechbaumer <thomas@kriechbaumer.name> | 2017-03-12 22:55:22 +0100 |
commit | 1b045d24bccd68f6db1e15c655af192cb5217a6a (patch) | |
tree | e25684ad8daf7d0ab59dc15d7dda40f59f5f066c /test | |
parent | d069ba9da58beb65d8139c728cc20abcb01de3a4 (diff) | |
download | mitmproxy-1b045d24bccd68f6db1e15c655af192cb5217a6a.tar.gz mitmproxy-1b045d24bccd68f6db1e15c655af192cb5217a6a.tar.bz2 mitmproxy-1b045d24bccd68f6db1e15c655af192cb5217a6a.zip |
nuke tutils.tmpdir, use pytest tmpdir
Diffstat (limited to 'test')
-rw-r--r-- | test/mitmproxy/addons/test_clientplayback.py | 19 | ||||
-rw-r--r-- | test/mitmproxy/addons/test_replace.py | 56 | ||||
-rw-r--r-- | test/mitmproxy/addons/test_script.py | 33 | ||||
-rw-r--r-- | test/mitmproxy/addons/test_serverplayback.py | 19 | ||||
-rw-r--r-- | test/mitmproxy/addons/test_streamfile.py | 68 | ||||
-rw-r--r-- | test/mitmproxy/net/test_tcp.py | 31 | ||||
-rw-r--r-- | test/mitmproxy/test_certs.py | 204 | ||||
-rw-r--r-- | test/mitmproxy/test_examples.py | 49 | ||||
-rw-r--r-- | test/mitmproxy/test_optmanager.py | 29 | ||||
-rw-r--r-- | test/mitmproxy/tools/test_dump.py | 25 | ||||
-rw-r--r-- | test/pathod/language/test_base.py | 35 | ||||
-rw-r--r-- | test/pathod/language/test_generators.py | 34 |
12 files changed, 273 insertions, 329 deletions
diff --git a/test/mitmproxy/addons/test_clientplayback.py b/test/mitmproxy/addons/test_clientplayback.py index 6b8b7c90..c22b3589 100644 --- a/test/mitmproxy/addons/test_clientplayback.py +++ b/test/mitmproxy/addons/test_clientplayback.py @@ -1,9 +1,7 @@ -import os import pytest from unittest import mock from mitmproxy.test import tflow -from mitmproxy.test import tutils from mitmproxy import io from mitmproxy import exceptions @@ -49,14 +47,13 @@ class TestClientPlayback: cp.tick() assert cp.current_thread is None - def test_configure(self): + def test_configure(self, tmpdir): cp = clientplayback.ClientPlayback() with taddons.context() as tctx: - with tutils.tmpdir() as td: - path = os.path.join(td, "flows") - tdump(path, [tflow.tflow()]) - tctx.configure(cp, client_replay=[path]) - tctx.configure(cp, client_replay=[]) - tctx.configure(cp) - with pytest.raises(exceptions.OptionsError): - tctx.configure(cp, client_replay=["nonexistent"]) + path = str(tmpdir.join("flows")) + tdump(path, [tflow.tflow()]) + tctx.configure(cp, client_replay=[path]) + tctx.configure(cp, client_replay=[]) + tctx.configure(cp) + with pytest.raises(exceptions.OptionsError): + tctx.configure(cp, client_replay=["nonexistent"]) diff --git a/test/mitmproxy/addons/test_replace.py b/test/mitmproxy/addons/test_replace.py index 8c280c51..2311641a 100644 --- a/test/mitmproxy/addons/test_replace.py +++ b/test/mitmproxy/addons/test_replace.py @@ -1,11 +1,9 @@ -import os.path import pytest -from mitmproxy.test import tflow -from mitmproxy.test import tutils from .. import tservers from mitmproxy.addons import replace from mitmproxy.test import taddons +from mitmproxy.test import tflow class TestReplace: @@ -71,33 +69,31 @@ class TestUpstreamProxy(tservers.HTTPUpstreamProxyTest): class TestReplaceFile: - def test_simple(self): + def test_simple(self, tmpdir): r = replace.ReplaceFile() - with tutils.tmpdir() as td: - rp = os.path.join(td, "replacement") - with open(rp, "w") as f: - f.write("bar") - with taddons.context() as tctx: - tctx.configure( - r, - replacement_files = [ - "/~q/foo/" + rp, - "/~s/foo/" + rp, - "/~b nonexistent/nonexistent/nonexistent", - ] - ) - f = tflow.tflow() - f.request.content = b"foo" - r.request(f) - assert f.request.content == b"bar" + rp = tmpdir.join("replacement") + rp.write("bar") + with taddons.context() as tctx: + tctx.configure( + r, + replacement_files = [ + "/~q/foo/" + str(rp), + "/~s/foo/" + str(rp), + "/~b nonexistent/nonexistent/nonexistent", + ] + ) + f = tflow.tflow() + f.request.content = b"foo" + r.request(f) + assert f.request.content == b"bar" - f = tflow.tflow(resp=True) - f.response.content = b"foo" - r.response(f) - assert f.response.content == b"bar" + f = tflow.tflow(resp=True) + f.response.content = b"foo" + r.response(f) + assert f.response.content == b"bar" - f = tflow.tflow() - f.request.content = b"nonexistent" - assert not tctx.master.event_log - r.request(f) - assert tctx.master.event_log + f = tflow.tflow() + f.request.content = b"nonexistent" + assert not tctx.master.event_log + r.request(f) + assert tctx.master.event_log diff --git a/test/mitmproxy/addons/test_script.py b/test/mitmproxy/addons/test_script.py index 5f196ebf..d79ed4ef 100644 --- a/test/mitmproxy/addons/test_script.py +++ b/test/mitmproxy/addons/test_script.py @@ -68,13 +68,12 @@ class TestParseCommand: with pytest.raises(ValueError): script.parse_command(" ") - def test_no_script_file(self): + def test_no_script_file(self, tmpdir): with pytest.raises(Exception, match="not found"): script.parse_command("notfound") - with tutils.tmpdir() as dir: - with pytest.raises(Exception, match="Not a file"): - script.parse_command(dir) + with pytest.raises(Exception, match="Not a file"): + script.parse_command(str(tmpdir)) def test_parse_args(self): with utils.chdir(tutils.test_data.dirname): @@ -128,21 +127,19 @@ class TestScript: recf = sc.ns.call_log[0] assert recf[1] == "request" - def test_reload(self): + def test_reload(self, tmpdir): with taddons.context() as tctx: - with tutils.tmpdir(): - with open("foo.py", "w"): - pass - sc = script.Script("foo.py") - tctx.configure(sc) - for _ in range(100): - with open("foo.py", "a") as f: - f.write(".") - sc.tick() - time.sleep(0.1) - if tctx.master.event_log: - return - raise AssertionError("Change event not detected.") + f = tmpdir.join("foo.py") + f.ensure(file=True) + sc = script.Script(str(f)) + tctx.configure(sc) + for _ in range(100): + f.write(".") + sc.tick() + time.sleep(0.1) + if tctx.master.event_log: + return + raise AssertionError("Change event not detected.") def test_exception(self): with taddons.context() as tctx: diff --git a/test/mitmproxy/addons/test_serverplayback.py b/test/mitmproxy/addons/test_serverplayback.py index e2afa516..54e4d281 100644 --- a/test/mitmproxy/addons/test_serverplayback.py +++ b/test/mitmproxy/addons/test_serverplayback.py @@ -1,10 +1,8 @@ -import os import urllib import pytest -from mitmproxy.test import tutils -from mitmproxy.test import tflow from mitmproxy.test import taddons +from mitmproxy.test import tflow import mitmproxy.test.tutils from mitmproxy.addons import serverplayback @@ -19,15 +17,14 @@ def tdump(path, flows): w.add(i) -def test_config(): +def test_config(tmpdir): s = serverplayback.ServerPlayback() - with tutils.tmpdir() as p: - with taddons.context() as tctx: - fpath = os.path.join(p, "flows") - tdump(fpath, [tflow.tflow(resp=True)]) - tctx.configure(s, server_replay=[fpath]) - with pytest.raises(exceptions.OptionsError): - tctx.configure(s, server_replay=[p]) + with taddons.context() as tctx: + fpath = str(tmpdir.join("flows")) + tdump(fpath, [tflow.tflow(resp=True)]) + tctx.configure(s, server_replay=[fpath]) + with pytest.raises(exceptions.OptionsError): + tctx.configure(s, server_replay=[str(tmpdir)]) def test_tick(): diff --git a/test/mitmproxy/addons/test_streamfile.py b/test/mitmproxy/addons/test_streamfile.py index 4105c1fc..3f78521c 100644 --- a/test/mitmproxy/addons/test_streamfile.py +++ b/test/mitmproxy/addons/test_streamfile.py @@ -1,9 +1,7 @@ -import os.path import pytest -from mitmproxy.test import tflow -from mitmproxy.test import tutils from mitmproxy.test import taddons +from mitmproxy.test import tflow from mitmproxy import io from mitmproxy import exceptions @@ -11,19 +9,17 @@ from mitmproxy import options from mitmproxy.addons import streamfile -def test_configure(): +def test_configure(tmpdir): sa = streamfile.StreamFile() with taddons.context(options=options.Options()) as tctx: - with tutils.tmpdir() as tdir: - p = os.path.join(tdir, "foo") - with pytest.raises(exceptions.OptionsError): - tctx.configure(sa, streamfile=tdir) - with pytest.raises(Exception, match="Invalid filter"): - tctx.configure(sa, streamfile=p, filtstr="~~") - tctx.configure(sa, filtstr="foo") - assert sa.filt - tctx.configure(sa, filtstr=None) - assert not sa.filt + with pytest.raises(exceptions.OptionsError): + tctx.configure(sa, streamfile=str(tmpdir)) + with pytest.raises(Exception, match="Invalid filter"): + tctx.configure(sa, streamfile=str(tmpdir.join("foo")), filtstr="~~") + tctx.configure(sa, filtstr="foo") + assert sa.filt + tctx.configure(sa, filtstr=None) + assert not sa.filt def rd(p): @@ -31,36 +27,34 @@ def rd(p): return list(x.stream()) -def test_tcp(): +def test_tcp(tmpdir): sa = streamfile.StreamFile() with taddons.context() as tctx: - with tutils.tmpdir() as tdir: - p = os.path.join(tdir, "foo") - tctx.configure(sa, streamfile=p) + p = str(tmpdir.join("foo")) + tctx.configure(sa, streamfile=p) - tt = tflow.ttcpflow() - sa.tcp_start(tt) - sa.tcp_end(tt) - tctx.configure(sa, streamfile=None) - assert rd(p) + tt = tflow.ttcpflow() + sa.tcp_start(tt) + sa.tcp_end(tt) + tctx.configure(sa, streamfile=None) + assert rd(p) -def test_simple(): +def test_simple(tmpdir): sa = streamfile.StreamFile() with taddons.context() as tctx: - with tutils.tmpdir() as tdir: - p = os.path.join(tdir, "foo") + p = str(tmpdir.join("foo")) - tctx.configure(sa, streamfile=p) + tctx.configure(sa, streamfile=p) - f = tflow.tflow(resp=True) - sa.request(f) - sa.response(f) - tctx.configure(sa, streamfile=None) - assert rd(p)[0].response + f = tflow.tflow(resp=True) + sa.request(f) + sa.response(f) + tctx.configure(sa, streamfile=None) + assert rd(p)[0].response - tctx.configure(sa, streamfile="+" + p) - f = tflow.tflow() - sa.request(f) - tctx.configure(sa, streamfile=None) - assert not rd(p)[1].response + tctx.configure(sa, streamfile="+" + p) + f = tflow.tflow() + sa.request(f) + tctx.configure(sa, streamfile=None) + assert not rd(p)[1].response diff --git a/test/mitmproxy/net/test_tcp.py b/test/mitmproxy/net/test_tcp.py index cf010f6e..8b26784a 100644 --- a/test/mitmproxy/net/test_tcp.py +++ b/test/mitmproxy/net/test_tcp.py @@ -11,8 +11,8 @@ from OpenSSL import SSL from mitmproxy import certs from mitmproxy.net import tcp -from mitmproxy.test import tutils from mitmproxy import exceptions +from mitmproxy.test import tutils from . import tservers from ...conftest import requires_alpn @@ -783,25 +783,24 @@ class TestSSLKeyLogger(tservers.ServerTestBase): cipher_list="AES256-SHA" ) - def test_log(self): + def test_log(self, tmpdir): testval = b"echo!\n" _logfun = tcp.log_ssl_key - with tutils.tmpdir() as d: - logfile = os.path.join(d, "foo", "bar", "logfile") - tcp.log_ssl_key = tcp.SSLKeyLogger(logfile) + logfile = str(tmpdir.join("foo", "bar", "logfile")) + tcp.log_ssl_key = tcp.SSLKeyLogger(logfile) - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - c.convert_to_ssl() - c.wfile.write(testval) - c.wfile.flush() - assert c.rfile.readline() == testval - c.finish() - - tcp.log_ssl_key.close() - with open(logfile, "rb") as f: - assert f.read().count(b"CLIENT_RANDOM") == 2 + c = tcp.TCPClient(("127.0.0.1", self.port)) + with c.connect(): + c.convert_to_ssl() + c.wfile.write(testval) + c.wfile.flush() + assert c.rfile.readline() == testval + c.finish() + + tcp.log_ssl_key.close() + with open(logfile, "rb") as f: + assert f.read().count(b"CLIENT_RANDOM") == 2 tcp.log_ssl_key = _logfun diff --git a/test/mitmproxy/test_certs.py b/test/mitmproxy/test_certs.py index 9bd3ad25..2d12c370 100644 --- a/test/mitmproxy/test_certs.py +++ b/test/mitmproxy/test_certs.py @@ -34,118 +34,106 @@ from mitmproxy.test import tutils class TestCertStore: - def test_create_explicit(self): - with tutils.tmpdir() as d: - ca = certs.CertStore.from_store(d, "test") - assert ca.get_cert(b"foo", []) - - ca2 = certs.CertStore.from_store(d, "test") - assert ca2.get_cert(b"foo", []) - - assert ca.default_ca.get_serial_number() == ca2.default_ca.get_serial_number() - - def test_create_no_common_name(self): - with tutils.tmpdir() as d: - ca = certs.CertStore.from_store(d, "test") - assert ca.get_cert(None, [])[0].cn is None - - def test_create_tmp(self): - with tutils.tmpdir() as d: - ca = certs.CertStore.from_store(d, "test") - assert ca.get_cert(b"foo.com", []) - assert ca.get_cert(b"foo.com", []) - assert ca.get_cert(b"*.foo.com", []) - - r = ca.get_cert(b"*.foo.com", []) - assert r[1] == ca.default_privatekey - - def test_sans(self): - with tutils.tmpdir() as d: - ca = certs.CertStore.from_store(d, "test") - c1 = ca.get_cert(b"foo.com", [b"*.bar.com"]) - ca.get_cert(b"foo.bar.com", []) - # assert c1 == c2 - c3 = ca.get_cert(b"bar.com", []) - assert not c1 == c3 - - def test_sans_change(self): - with tutils.tmpdir() as d: - ca = certs.CertStore.from_store(d, "test") - ca.get_cert(b"foo.com", [b"*.bar.com"]) - cert, key, chain_file = ca.get_cert(b"foo.bar.com", [b"*.baz.com"]) - assert b"*.baz.com" in cert.altnames - - def test_expire(self): - with tutils.tmpdir() as d: - ca = certs.CertStore.from_store(d, "test") - ca.STORE_CAP = 3 - ca.get_cert(b"one.com", []) - ca.get_cert(b"two.com", []) - ca.get_cert(b"three.com", []) - - assert (b"one.com", ()) in ca.certs - assert (b"two.com", ()) in ca.certs - assert (b"three.com", ()) in ca.certs - - ca.get_cert(b"one.com", []) - - assert (b"one.com", ()) in ca.certs - assert (b"two.com", ()) in ca.certs - assert (b"three.com", ()) in ca.certs - - ca.get_cert(b"four.com", []) - - assert (b"one.com", ()) not in ca.certs - assert (b"two.com", ()) in ca.certs - assert (b"three.com", ()) in ca.certs - assert (b"four.com", ()) in ca.certs - - def test_overrides(self): - with tutils.tmpdir() as d: - ca1 = certs.CertStore.from_store(os.path.join(d, "ca1"), "test") - ca2 = certs.CertStore.from_store(os.path.join(d, "ca2"), "test") - assert not ca1.default_ca.get_serial_number( - ) == ca2.default_ca.get_serial_number() - - dc = ca2.get_cert(b"foo.com", [b"sans.example.com"]) - dcp = os.path.join(d, "dc") - f = open(dcp, "wb") - f.write(dc[0].to_pem()) - f.close() - ca1.add_cert_file(b"foo.com", dcp) - - ret = ca1.get_cert(b"foo.com", []) - assert ret[0].serial == dc[0].serial - - def test_create_dhparams(self): - with tutils.tmpdir() as d: - filename = os.path.join(d, "dhparam.pem") - certs.CertStore.load_dhparam(filename) - assert os.path.exists(filename) + def test_create_explicit(self, tmpdir): + ca = certs.CertStore.from_store(str(tmpdir), "test") + assert ca.get_cert(b"foo", []) + + ca2 = certs.CertStore.from_store(str(tmpdir), "test") + assert ca2.get_cert(b"foo", []) + + assert ca.default_ca.get_serial_number() == ca2.default_ca.get_serial_number() + + def test_create_no_common_name(self, tmpdir): + ca = certs.CertStore.from_store(str(tmpdir), "test") + assert ca.get_cert(None, [])[0].cn is None + + def test_create_tmp(self, tmpdir): + ca = certs.CertStore.from_store(str(tmpdir), "test") + assert ca.get_cert(b"foo.com", []) + assert ca.get_cert(b"foo.com", []) + assert ca.get_cert(b"*.foo.com", []) + + r = ca.get_cert(b"*.foo.com", []) + assert r[1] == ca.default_privatekey + + def test_sans(self, tmpdir): + ca = certs.CertStore.from_store(str(tmpdir), "test") + c1 = ca.get_cert(b"foo.com", [b"*.bar.com"]) + ca.get_cert(b"foo.bar.com", []) + # assert c1 == c2 + c3 = ca.get_cert(b"bar.com", []) + assert not c1 == c3 + + def test_sans_change(self, tmpdir): + ca = certs.CertStore.from_store(str(tmpdir), "test") + ca.get_cert(b"foo.com", [b"*.bar.com"]) + cert, key, chain_file = ca.get_cert(b"foo.bar.com", [b"*.baz.com"]) + assert b"*.baz.com" in cert.altnames + + def test_expire(self, tmpdir): + ca = certs.CertStore.from_store(str(tmpdir), "test") + ca.STORE_CAP = 3 + ca.get_cert(b"one.com", []) + ca.get_cert(b"two.com", []) + ca.get_cert(b"three.com", []) + + assert (b"one.com", ()) in ca.certs + assert (b"two.com", ()) in ca.certs + assert (b"three.com", ()) in ca.certs + + ca.get_cert(b"one.com", []) + + assert (b"one.com", ()) in ca.certs + assert (b"two.com", ()) in ca.certs + assert (b"three.com", ()) in ca.certs + + ca.get_cert(b"four.com", []) + + assert (b"one.com", ()) not in ca.certs + assert (b"two.com", ()) in ca.certs + assert (b"three.com", ()) in ca.certs + assert (b"four.com", ()) in ca.certs + + def test_overrides(self, tmpdir): + ca1 = certs.CertStore.from_store(str(tmpdir.join("ca1")), "test") + ca2 = certs.CertStore.from_store(str(tmpdir.join("ca2")), "test") + assert not ca1.default_ca.get_serial_number() == ca2.default_ca.get_serial_number() + + dc = ca2.get_cert(b"foo.com", [b"sans.example.com"]) + dcp = tmpdir.join("dc") + dcp.write(dc[0].to_pem()) + ca1.add_cert_file(b"foo.com", str(dcp)) + + ret = ca1.get_cert(b"foo.com", []) + assert ret[0].serial == dc[0].serial + + def test_create_dhparams(self, tmpdir): + filename = str(tmpdir.join("dhparam.pem")) + certs.CertStore.load_dhparam(filename) + assert os.path.exists(filename) class TestDummyCert: - def test_with_ca(self): - with tutils.tmpdir() as d: - ca = certs.CertStore.from_store(d, "test") - r = certs.dummy_cert( - ca.default_privatekey, - ca.default_ca, - b"foo.com", - [b"one.com", b"two.com", b"*.three.com", b"127.0.0.1"] - ) - assert r.cn == b"foo.com" - assert r.altnames == [b'one.com', b'two.com', b'*.three.com'] - - r = certs.dummy_cert( - ca.default_privatekey, - ca.default_ca, - None, - [] - ) - assert r.cn is None - assert r.altnames == [] + def test_with_ca(self, tmpdir): + ca = certs.CertStore.from_store(str(tmpdir), "test") + r = certs.dummy_cert( + ca.default_privatekey, + ca.default_ca, + b"foo.com", + [b"one.com", b"two.com", b"*.three.com", b"127.0.0.1"] + ) + assert r.cn == b"foo.com" + assert r.altnames == [b'one.com', b'two.com', b'*.three.com'] + + r = certs.dummy_cert( + ca.default_privatekey, + ca.default_ca, + None, + [] + ) + assert r.cn is None + assert r.altnames == [] class TestSSLCert: diff --git a/test/mitmproxy/test_examples.py b/test/mitmproxy/test_examples.py index 668d0d4a..f20e0c8c 100644 --- a/test/mitmproxy/test_examples.py +++ b/test/mitmproxy/test_examples.py @@ -1,5 +1,4 @@ import json -import os import shlex import pytest @@ -142,30 +141,26 @@ class TestHARDump: with pytest.raises(ScriptError): tscript("complex/har_dump.py") - def test_simple(self): - with tutils.tmpdir() as tdir: - path = os.path.join(tdir, "somefile") + def test_simple(self, tmpdir): + path = str(tmpdir.join("somefile")) - m, sc = tscript("complex/har_dump.py", shlex.quote(path)) - m.addons.invoke(m, "response", self.flow()) - m.addons.remove(sc) - - with open(path, "r") as inp: - har = json.load(inp) + m, sc = tscript("complex/har_dump.py", shlex.quote(path)) + m.addons.invoke(m, "response", self.flow()) + m.addons.remove(sc) + with open(path, "r") as inp: + har = json.load(inp) assert len(har["log"]["entries"]) == 1 - def test_base64(self): - with tutils.tmpdir() as tdir: - path = os.path.join(tdir, "somefile") - - m, sc = tscript("complex/har_dump.py", shlex.quote(path)) - m.addons.invoke(m, "response", self.flow(resp_content=b"foo" + b"\xFF" * 10)) - m.addons.remove(sc) + def test_base64(self, tmpdir): + path = str(tmpdir.join("somefile")) - with open(path, "r") as inp: - har = json.load(inp) + m, sc = tscript("complex/har_dump.py", shlex.quote(path)) + m.addons.invoke(m, "response", self.flow(resp_content=b"foo" + b"\xFF" * 10)) + m.addons.remove(sc) + with open(path, "r") as inp: + har = json.load(inp) assert har["log"]["entries"][0]["response"]["content"]["encoding"] == "base64" def test_format_cookies(self): @@ -187,7 +182,7 @@ class TestHARDump: f = format_cookies([("n", "v", CA([("expires", "Mon, 24-Aug-2037 00:00:00 GMT")]))])[0] assert f['expires'] - def test_binary(self): + def test_binary(self, tmpdir): f = self.flow() f.request.method = "POST" @@ -196,14 +191,12 @@ class TestHARDump: f.response.headers["random-junk"] = bytes(range(256)) f.response.content = bytes(range(256)) - with tutils.tmpdir() as tdir: - path = os.path.join(tdir, "somefile") - - m, sc = tscript("complex/har_dump.py", shlex.quote(path)) - m.addons.invoke(m, "response", f) - m.addons.remove(sc) + path = str(tmpdir.join("somefile")) - with open(path, "r") as inp: - har = json.load(inp) + m, sc = tscript("complex/har_dump.py", shlex.quote(path)) + m.addons.invoke(m, "response", f) + m.addons.remove(sc) + with open(path, "r") as inp: + har = json.load(inp) assert len(har["log"]["entries"]) == 1 diff --git a/test/mitmproxy/test_optmanager.py b/test/mitmproxy/test_optmanager.py index db33cddd..0ecfc4e5 100644 --- a/test/mitmproxy/test_optmanager.py +++ b/test/mitmproxy/test_optmanager.py @@ -1,5 +1,4 @@ import copy -import os import pytest import typing import argparse @@ -7,7 +6,6 @@ import argparse from mitmproxy import options from mitmproxy import optmanager from mitmproxy import exceptions -from mitmproxy.test import tutils class TO(optmanager.OptManager): @@ -238,25 +236,24 @@ def test_serialize_defaults(): assert o.serialize(None, defaults=True) -def test_saving(): +def test_saving(tmpdir): o = TD2() o.three = "set" - with tutils.tmpdir() as tdir: - dst = os.path.join(tdir, "conf") - o.save(dst, defaults=True) + dst = str(tmpdir.join("conf")) + o.save(dst, defaults=True) - o2 = TD2() - o2.load_paths(dst) - o2.three = "foo" - o2.save(dst, defaults=True) + o2 = TD2() + o2.load_paths(dst) + o2.three = "foo" + o2.save(dst, defaults=True) - o.load_paths(dst) - assert o.three == "foo" + o.load_paths(dst) + assert o.three == "foo" - with open(dst, 'a') as f: - f.write("foobar: '123'") - with pytest.raises(exceptions.OptionsError, matches=''): - o.load_paths(dst) + with open(dst, 'a') as f: + f.write("foobar: '123'") + with pytest.raises(exceptions.OptionsError, matches=''): + o.load_paths(dst) def test_merge(): diff --git a/test/mitmproxy/tools/test_dump.py b/test/mitmproxy/tools/test_dump.py index 2542ec4b..a15bf583 100644 --- a/test/mitmproxy/tools/test_dump.py +++ b/test/mitmproxy/tools/test_dump.py @@ -1,4 +1,3 @@ -import os import pytest from unittest import mock @@ -9,7 +8,6 @@ from mitmproxy import controller from mitmproxy import options from mitmproxy.tools import dump -from mitmproxy.test import tutils from .. import tservers @@ -19,18 +17,17 @@ class TestDumpMaster(tservers.MasterTest): m = dump.DumpMaster(o, proxy.DummyServer(), with_termlog=False, with_dumper=False) return m - def test_read(self): - with tutils.tmpdir() as t: - p = os.path.join(t, "read") - self.flowfile(p) - self.dummy_cycle( - self.mkmaster(None, rfile=p), - 1, b"", - ) - with pytest.raises(exceptions.OptionsError): - self.mkmaster(None, rfile="/nonexistent") - with pytest.raises(exceptions.OptionsError): - self.mkmaster(None, rfile="test_dump.py") + def test_read(self, tmpdir): + p = str(tmpdir.join("read")) + self.flowfile(p) + self.dummy_cycle( + self.mkmaster(None, rfile=p), + 1, b"", + ) + with pytest.raises(exceptions.OptionsError): + self.mkmaster(None, rfile="/nonexistent") + with pytest.raises(exceptions.OptionsError): + self.mkmaster(None, rfile="test_dump.py") def test_has_error(self): m = self.mkmaster(None) diff --git a/test/pathod/language/test_base.py b/test/pathod/language/test_base.py index 85e9e53b..ec460b07 100644 --- a/test/pathod/language/test_base.py +++ b/test/pathod/language/test_base.py @@ -1,11 +1,8 @@ -import os import pytest from pathod import language from pathod.language import base, exceptions -from mitmproxy.test import tutils - def parse_request(s): return language.parse_pathoc(s).next() @@ -137,24 +134,22 @@ class TestTokValueFile: v = base.TokValue.parseString("<path")[0] assert v.path == "path" - def test_access_control(self): + def test_access_control(self, tmpdir): v = base.TokValue.parseString("<path")[0] - with tutils.tmpdir() as t: - p = os.path.join(t, "path") - with open(p, "wb") as f: - f.write(b"x" * 10000) - - assert v.get_generator(language.Settings(staticdir=t)) - - v = base.TokValue.parseString("<path2")[0] - with pytest.raises(exceptions.FileAccessDenied): - v.get_generator(language.Settings(staticdir=t)) - with pytest.raises(Exception, match="access disabled"): - v.get_generator(language.Settings()) - - v = base.TokValue.parseString("</outside")[0] - with pytest.raises(Exception, match="outside"): - v.get_generator(language.Settings(staticdir=t)) + f = tmpdir.join("path") + f.write(b"x" * 10000) + + assert v.get_generator(language.Settings(staticdir=str(tmpdir))) + + v = base.TokValue.parseString("<path2")[0] + with pytest.raises(exceptions.FileAccessDenied): + v.get_generator(language.Settings(staticdir=str(tmpdir))) + with pytest.raises(Exception, match="access disabled"): + v.get_generator(language.Settings()) + + v = base.TokValue.parseString("</outside")[0] + with pytest.raises(Exception, match="outside"): + v.get_generator(language.Settings(staticdir=str(tmpdir))) def test_spec(self): v = base.TokValue.parseString("<'one two'")[0] diff --git a/test/pathod/language/test_generators.py b/test/pathod/language/test_generators.py index b3ce0335..dc15aaa1 100644 --- a/test/pathod/language/test_generators.py +++ b/test/pathod/language/test_generators.py @@ -1,7 +1,4 @@ -import os - from pathod.language import generators -from mitmproxy.test import tutils def test_randomgenerator(): @@ -15,23 +12,20 @@ def test_randomgenerator(): assert len(g[1000:1001]) == 0 -def test_filegenerator(): - with tutils.tmpdir() as t: - path = os.path.join(t, "foo") - f = open(path, "wb") - f.write(b"x" * 10000) - f.close() - g = generators.FileGenerator(path) - assert len(g) == 10000 - assert g[0] == b"x" - assert g[-1] == b"x" - assert g[0:5] == b"xxxxx" - assert len(g[1:10]) == 9 - assert len(g[10000:10001]) == 0 - assert repr(g) - # remove all references to FileGenerator instance to close the file - # handle. - del g +def test_filegenerator(tmpdir): + f = tmpdir.join("foo") + f.write(b"x" * 10000) + g = generators.FileGenerator(str(f)) + assert len(g) == 10000 + assert g[0] == b"x" + assert g[-1] == b"x" + assert g[0:5] == b"xxxxx" + assert len(g[1:10]) == 9 + assert len(g[10000:10001]) == 0 + assert repr(g) + # remove all references to FileGenerator instance to close the file + # handle. + del g def test_transform_generator(): |