aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
authorThomas Kriechbaumer <thomas@kriechbaumer.name>2017-03-12 22:55:22 +0100
committerThomas Kriechbaumer <thomas@kriechbaumer.name>2017-03-12 22:55:22 +0100
commit1b045d24bccd68f6db1e15c655af192cb5217a6a (patch)
treee25684ad8daf7d0ab59dc15d7dda40f59f5f066c /test
parentd069ba9da58beb65d8139c728cc20abcb01de3a4 (diff)
downloadmitmproxy-1b045d24bccd68f6db1e15c655af192cb5217a6a.tar.gz
mitmproxy-1b045d24bccd68f6db1e15c655af192cb5217a6a.tar.bz2
mitmproxy-1b045d24bccd68f6db1e15c655af192cb5217a6a.zip
nuke tutils.tmpdir, use pytest tmpdir
Diffstat (limited to 'test')
-rw-r--r--test/mitmproxy/addons/test_clientplayback.py19
-rw-r--r--test/mitmproxy/addons/test_replace.py56
-rw-r--r--test/mitmproxy/addons/test_script.py33
-rw-r--r--test/mitmproxy/addons/test_serverplayback.py19
-rw-r--r--test/mitmproxy/addons/test_streamfile.py68
-rw-r--r--test/mitmproxy/net/test_tcp.py31
-rw-r--r--test/mitmproxy/test_certs.py204
-rw-r--r--test/mitmproxy/test_examples.py49
-rw-r--r--test/mitmproxy/test_optmanager.py29
-rw-r--r--test/mitmproxy/tools/test_dump.py25
-rw-r--r--test/pathod/language/test_base.py35
-rw-r--r--test/pathod/language/test_generators.py34
12 files changed, 273 insertions, 329 deletions
diff --git a/test/mitmproxy/addons/test_clientplayback.py b/test/mitmproxy/addons/test_clientplayback.py
index 6b8b7c90..c22b3589 100644
--- a/test/mitmproxy/addons/test_clientplayback.py
+++ b/test/mitmproxy/addons/test_clientplayback.py
@@ -1,9 +1,7 @@
-import os
import pytest
from unittest import mock
from mitmproxy.test import tflow
-from mitmproxy.test import tutils
from mitmproxy import io
from mitmproxy import exceptions
@@ -49,14 +47,13 @@ class TestClientPlayback:
cp.tick()
assert cp.current_thread is None
- def test_configure(self):
+ def test_configure(self, tmpdir):
cp = clientplayback.ClientPlayback()
with taddons.context() as tctx:
- with tutils.tmpdir() as td:
- path = os.path.join(td, "flows")
- tdump(path, [tflow.tflow()])
- tctx.configure(cp, client_replay=[path])
- tctx.configure(cp, client_replay=[])
- tctx.configure(cp)
- with pytest.raises(exceptions.OptionsError):
- tctx.configure(cp, client_replay=["nonexistent"])
+ path = str(tmpdir.join("flows"))
+ tdump(path, [tflow.tflow()])
+ tctx.configure(cp, client_replay=[path])
+ tctx.configure(cp, client_replay=[])
+ tctx.configure(cp)
+ with pytest.raises(exceptions.OptionsError):
+ tctx.configure(cp, client_replay=["nonexistent"])
diff --git a/test/mitmproxy/addons/test_replace.py b/test/mitmproxy/addons/test_replace.py
index 8c280c51..2311641a 100644
--- a/test/mitmproxy/addons/test_replace.py
+++ b/test/mitmproxy/addons/test_replace.py
@@ -1,11 +1,9 @@
-import os.path
import pytest
-from mitmproxy.test import tflow
-from mitmproxy.test import tutils
from .. import tservers
from mitmproxy.addons import replace
from mitmproxy.test import taddons
+from mitmproxy.test import tflow
class TestReplace:
@@ -71,33 +69,31 @@ class TestUpstreamProxy(tservers.HTTPUpstreamProxyTest):
class TestReplaceFile:
- def test_simple(self):
+ def test_simple(self, tmpdir):
r = replace.ReplaceFile()
- with tutils.tmpdir() as td:
- rp = os.path.join(td, "replacement")
- with open(rp, "w") as f:
- f.write("bar")
- with taddons.context() as tctx:
- tctx.configure(
- r,
- replacement_files = [
- "/~q/foo/" + rp,
- "/~s/foo/" + rp,
- "/~b nonexistent/nonexistent/nonexistent",
- ]
- )
- f = tflow.tflow()
- f.request.content = b"foo"
- r.request(f)
- assert f.request.content == b"bar"
+ rp = tmpdir.join("replacement")
+ rp.write("bar")
+ with taddons.context() as tctx:
+ tctx.configure(
+ r,
+ replacement_files = [
+ "/~q/foo/" + str(rp),
+ "/~s/foo/" + str(rp),
+ "/~b nonexistent/nonexistent/nonexistent",
+ ]
+ )
+ f = tflow.tflow()
+ f.request.content = b"foo"
+ r.request(f)
+ assert f.request.content == b"bar"
- f = tflow.tflow(resp=True)
- f.response.content = b"foo"
- r.response(f)
- assert f.response.content == b"bar"
+ f = tflow.tflow(resp=True)
+ f.response.content = b"foo"
+ r.response(f)
+ assert f.response.content == b"bar"
- f = tflow.tflow()
- f.request.content = b"nonexistent"
- assert not tctx.master.event_log
- r.request(f)
- assert tctx.master.event_log
+ f = tflow.tflow()
+ f.request.content = b"nonexistent"
+ assert not tctx.master.event_log
+ r.request(f)
+ assert tctx.master.event_log
diff --git a/test/mitmproxy/addons/test_script.py b/test/mitmproxy/addons/test_script.py
index 5f196ebf..d79ed4ef 100644
--- a/test/mitmproxy/addons/test_script.py
+++ b/test/mitmproxy/addons/test_script.py
@@ -68,13 +68,12 @@ class TestParseCommand:
with pytest.raises(ValueError):
script.parse_command(" ")
- def test_no_script_file(self):
+ def test_no_script_file(self, tmpdir):
with pytest.raises(Exception, match="not found"):
script.parse_command("notfound")
- with tutils.tmpdir() as dir:
- with pytest.raises(Exception, match="Not a file"):
- script.parse_command(dir)
+ with pytest.raises(Exception, match="Not a file"):
+ script.parse_command(str(tmpdir))
def test_parse_args(self):
with utils.chdir(tutils.test_data.dirname):
@@ -128,21 +127,19 @@ class TestScript:
recf = sc.ns.call_log[0]
assert recf[1] == "request"
- def test_reload(self):
+ def test_reload(self, tmpdir):
with taddons.context() as tctx:
- with tutils.tmpdir():
- with open("foo.py", "w"):
- pass
- sc = script.Script("foo.py")
- tctx.configure(sc)
- for _ in range(100):
- with open("foo.py", "a") as f:
- f.write(".")
- sc.tick()
- time.sleep(0.1)
- if tctx.master.event_log:
- return
- raise AssertionError("Change event not detected.")
+ f = tmpdir.join("foo.py")
+ f.ensure(file=True)
+ sc = script.Script(str(f))
+ tctx.configure(sc)
+ for _ in range(100):
+ f.write(".")
+ sc.tick()
+ time.sleep(0.1)
+ if tctx.master.event_log:
+ return
+ raise AssertionError("Change event not detected.")
def test_exception(self):
with taddons.context() as tctx:
diff --git a/test/mitmproxy/addons/test_serverplayback.py b/test/mitmproxy/addons/test_serverplayback.py
index e2afa516..54e4d281 100644
--- a/test/mitmproxy/addons/test_serverplayback.py
+++ b/test/mitmproxy/addons/test_serverplayback.py
@@ -1,10 +1,8 @@
-import os
import urllib
import pytest
-from mitmproxy.test import tutils
-from mitmproxy.test import tflow
from mitmproxy.test import taddons
+from mitmproxy.test import tflow
import mitmproxy.test.tutils
from mitmproxy.addons import serverplayback
@@ -19,15 +17,14 @@ def tdump(path, flows):
w.add(i)
-def test_config():
+def test_config(tmpdir):
s = serverplayback.ServerPlayback()
- with tutils.tmpdir() as p:
- with taddons.context() as tctx:
- fpath = os.path.join(p, "flows")
- tdump(fpath, [tflow.tflow(resp=True)])
- tctx.configure(s, server_replay=[fpath])
- with pytest.raises(exceptions.OptionsError):
- tctx.configure(s, server_replay=[p])
+ with taddons.context() as tctx:
+ fpath = str(tmpdir.join("flows"))
+ tdump(fpath, [tflow.tflow(resp=True)])
+ tctx.configure(s, server_replay=[fpath])
+ with pytest.raises(exceptions.OptionsError):
+ tctx.configure(s, server_replay=[str(tmpdir)])
def test_tick():
diff --git a/test/mitmproxy/addons/test_streamfile.py b/test/mitmproxy/addons/test_streamfile.py
index 4105c1fc..3f78521c 100644
--- a/test/mitmproxy/addons/test_streamfile.py
+++ b/test/mitmproxy/addons/test_streamfile.py
@@ -1,9 +1,7 @@
-import os.path
import pytest
-from mitmproxy.test import tflow
-from mitmproxy.test import tutils
from mitmproxy.test import taddons
+from mitmproxy.test import tflow
from mitmproxy import io
from mitmproxy import exceptions
@@ -11,19 +9,17 @@ from mitmproxy import options
from mitmproxy.addons import streamfile
-def test_configure():
+def test_configure(tmpdir):
sa = streamfile.StreamFile()
with taddons.context(options=options.Options()) as tctx:
- with tutils.tmpdir() as tdir:
- p = os.path.join(tdir, "foo")
- with pytest.raises(exceptions.OptionsError):
- tctx.configure(sa, streamfile=tdir)
- with pytest.raises(Exception, match="Invalid filter"):
- tctx.configure(sa, streamfile=p, filtstr="~~")
- tctx.configure(sa, filtstr="foo")
- assert sa.filt
- tctx.configure(sa, filtstr=None)
- assert not sa.filt
+ with pytest.raises(exceptions.OptionsError):
+ tctx.configure(sa, streamfile=str(tmpdir))
+ with pytest.raises(Exception, match="Invalid filter"):
+ tctx.configure(sa, streamfile=str(tmpdir.join("foo")), filtstr="~~")
+ tctx.configure(sa, filtstr="foo")
+ assert sa.filt
+ tctx.configure(sa, filtstr=None)
+ assert not sa.filt
def rd(p):
@@ -31,36 +27,34 @@ def rd(p):
return list(x.stream())
-def test_tcp():
+def test_tcp(tmpdir):
sa = streamfile.StreamFile()
with taddons.context() as tctx:
- with tutils.tmpdir() as tdir:
- p = os.path.join(tdir, "foo")
- tctx.configure(sa, streamfile=p)
+ p = str(tmpdir.join("foo"))
+ tctx.configure(sa, streamfile=p)
- tt = tflow.ttcpflow()
- sa.tcp_start(tt)
- sa.tcp_end(tt)
- tctx.configure(sa, streamfile=None)
- assert rd(p)
+ tt = tflow.ttcpflow()
+ sa.tcp_start(tt)
+ sa.tcp_end(tt)
+ tctx.configure(sa, streamfile=None)
+ assert rd(p)
-def test_simple():
+def test_simple(tmpdir):
sa = streamfile.StreamFile()
with taddons.context() as tctx:
- with tutils.tmpdir() as tdir:
- p = os.path.join(tdir, "foo")
+ p = str(tmpdir.join("foo"))
- tctx.configure(sa, streamfile=p)
+ tctx.configure(sa, streamfile=p)
- f = tflow.tflow(resp=True)
- sa.request(f)
- sa.response(f)
- tctx.configure(sa, streamfile=None)
- assert rd(p)[0].response
+ f = tflow.tflow(resp=True)
+ sa.request(f)
+ sa.response(f)
+ tctx.configure(sa, streamfile=None)
+ assert rd(p)[0].response
- tctx.configure(sa, streamfile="+" + p)
- f = tflow.tflow()
- sa.request(f)
- tctx.configure(sa, streamfile=None)
- assert not rd(p)[1].response
+ tctx.configure(sa, streamfile="+" + p)
+ f = tflow.tflow()
+ sa.request(f)
+ tctx.configure(sa, streamfile=None)
+ assert not rd(p)[1].response
diff --git a/test/mitmproxy/net/test_tcp.py b/test/mitmproxy/net/test_tcp.py
index cf010f6e..8b26784a 100644
--- a/test/mitmproxy/net/test_tcp.py
+++ b/test/mitmproxy/net/test_tcp.py
@@ -11,8 +11,8 @@ from OpenSSL import SSL
from mitmproxy import certs
from mitmproxy.net import tcp
-from mitmproxy.test import tutils
from mitmproxy import exceptions
+from mitmproxy.test import tutils
from . import tservers
from ...conftest import requires_alpn
@@ -783,25 +783,24 @@ class TestSSLKeyLogger(tservers.ServerTestBase):
cipher_list="AES256-SHA"
)
- def test_log(self):
+ def test_log(self, tmpdir):
testval = b"echo!\n"
_logfun = tcp.log_ssl_key
- with tutils.tmpdir() as d:
- logfile = os.path.join(d, "foo", "bar", "logfile")
- tcp.log_ssl_key = tcp.SSLKeyLogger(logfile)
+ logfile = str(tmpdir.join("foo", "bar", "logfile"))
+ tcp.log_ssl_key = tcp.SSLKeyLogger(logfile)
- c = tcp.TCPClient(("127.0.0.1", self.port))
- with c.connect():
- c.convert_to_ssl()
- c.wfile.write(testval)
- c.wfile.flush()
- assert c.rfile.readline() == testval
- c.finish()
-
- tcp.log_ssl_key.close()
- with open(logfile, "rb") as f:
- assert f.read().count(b"CLIENT_RANDOM") == 2
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ with c.connect():
+ c.convert_to_ssl()
+ c.wfile.write(testval)
+ c.wfile.flush()
+ assert c.rfile.readline() == testval
+ c.finish()
+
+ tcp.log_ssl_key.close()
+ with open(logfile, "rb") as f:
+ assert f.read().count(b"CLIENT_RANDOM") == 2
tcp.log_ssl_key = _logfun
diff --git a/test/mitmproxy/test_certs.py b/test/mitmproxy/test_certs.py
index 9bd3ad25..2d12c370 100644
--- a/test/mitmproxy/test_certs.py
+++ b/test/mitmproxy/test_certs.py
@@ -34,118 +34,106 @@ from mitmproxy.test import tutils
class TestCertStore:
- def test_create_explicit(self):
- with tutils.tmpdir() as d:
- ca = certs.CertStore.from_store(d, "test")
- assert ca.get_cert(b"foo", [])
-
- ca2 = certs.CertStore.from_store(d, "test")
- assert ca2.get_cert(b"foo", [])
-
- assert ca.default_ca.get_serial_number() == ca2.default_ca.get_serial_number()
-
- def test_create_no_common_name(self):
- with tutils.tmpdir() as d:
- ca = certs.CertStore.from_store(d, "test")
- assert ca.get_cert(None, [])[0].cn is None
-
- def test_create_tmp(self):
- with tutils.tmpdir() as d:
- ca = certs.CertStore.from_store(d, "test")
- assert ca.get_cert(b"foo.com", [])
- assert ca.get_cert(b"foo.com", [])
- assert ca.get_cert(b"*.foo.com", [])
-
- r = ca.get_cert(b"*.foo.com", [])
- assert r[1] == ca.default_privatekey
-
- def test_sans(self):
- with tutils.tmpdir() as d:
- ca = certs.CertStore.from_store(d, "test")
- c1 = ca.get_cert(b"foo.com", [b"*.bar.com"])
- ca.get_cert(b"foo.bar.com", [])
- # assert c1 == c2
- c3 = ca.get_cert(b"bar.com", [])
- assert not c1 == c3
-
- def test_sans_change(self):
- with tutils.tmpdir() as d:
- ca = certs.CertStore.from_store(d, "test")
- ca.get_cert(b"foo.com", [b"*.bar.com"])
- cert, key, chain_file = ca.get_cert(b"foo.bar.com", [b"*.baz.com"])
- assert b"*.baz.com" in cert.altnames
-
- def test_expire(self):
- with tutils.tmpdir() as d:
- ca = certs.CertStore.from_store(d, "test")
- ca.STORE_CAP = 3
- ca.get_cert(b"one.com", [])
- ca.get_cert(b"two.com", [])
- ca.get_cert(b"three.com", [])
-
- assert (b"one.com", ()) in ca.certs
- assert (b"two.com", ()) in ca.certs
- assert (b"three.com", ()) in ca.certs
-
- ca.get_cert(b"one.com", [])
-
- assert (b"one.com", ()) in ca.certs
- assert (b"two.com", ()) in ca.certs
- assert (b"three.com", ()) in ca.certs
-
- ca.get_cert(b"four.com", [])
-
- assert (b"one.com", ()) not in ca.certs
- assert (b"two.com", ()) in ca.certs
- assert (b"three.com", ()) in ca.certs
- assert (b"four.com", ()) in ca.certs
-
- def test_overrides(self):
- with tutils.tmpdir() as d:
- ca1 = certs.CertStore.from_store(os.path.join(d, "ca1"), "test")
- ca2 = certs.CertStore.from_store(os.path.join(d, "ca2"), "test")
- assert not ca1.default_ca.get_serial_number(
- ) == ca2.default_ca.get_serial_number()
-
- dc = ca2.get_cert(b"foo.com", [b"sans.example.com"])
- dcp = os.path.join(d, "dc")
- f = open(dcp, "wb")
- f.write(dc[0].to_pem())
- f.close()
- ca1.add_cert_file(b"foo.com", dcp)
-
- ret = ca1.get_cert(b"foo.com", [])
- assert ret[0].serial == dc[0].serial
-
- def test_create_dhparams(self):
- with tutils.tmpdir() as d:
- filename = os.path.join(d, "dhparam.pem")
- certs.CertStore.load_dhparam(filename)
- assert os.path.exists(filename)
+ def test_create_explicit(self, tmpdir):
+ ca = certs.CertStore.from_store(str(tmpdir), "test")
+ assert ca.get_cert(b"foo", [])
+
+ ca2 = certs.CertStore.from_store(str(tmpdir), "test")
+ assert ca2.get_cert(b"foo", [])
+
+ assert ca.default_ca.get_serial_number() == ca2.default_ca.get_serial_number()
+
+ def test_create_no_common_name(self, tmpdir):
+ ca = certs.CertStore.from_store(str(tmpdir), "test")
+ assert ca.get_cert(None, [])[0].cn is None
+
+ def test_create_tmp(self, tmpdir):
+ ca = certs.CertStore.from_store(str(tmpdir), "test")
+ assert ca.get_cert(b"foo.com", [])
+ assert ca.get_cert(b"foo.com", [])
+ assert ca.get_cert(b"*.foo.com", [])
+
+ r = ca.get_cert(b"*.foo.com", [])
+ assert r[1] == ca.default_privatekey
+
+ def test_sans(self, tmpdir):
+ ca = certs.CertStore.from_store(str(tmpdir), "test")
+ c1 = ca.get_cert(b"foo.com", [b"*.bar.com"])
+ ca.get_cert(b"foo.bar.com", [])
+ # assert c1 == c2
+ c3 = ca.get_cert(b"bar.com", [])
+ assert not c1 == c3
+
+ def test_sans_change(self, tmpdir):
+ ca = certs.CertStore.from_store(str(tmpdir), "test")
+ ca.get_cert(b"foo.com", [b"*.bar.com"])
+ cert, key, chain_file = ca.get_cert(b"foo.bar.com", [b"*.baz.com"])
+ assert b"*.baz.com" in cert.altnames
+
+ def test_expire(self, tmpdir):
+ ca = certs.CertStore.from_store(str(tmpdir), "test")
+ ca.STORE_CAP = 3
+ ca.get_cert(b"one.com", [])
+ ca.get_cert(b"two.com", [])
+ ca.get_cert(b"three.com", [])
+
+ assert (b"one.com", ()) in ca.certs
+ assert (b"two.com", ()) in ca.certs
+ assert (b"three.com", ()) in ca.certs
+
+ ca.get_cert(b"one.com", [])
+
+ assert (b"one.com", ()) in ca.certs
+ assert (b"two.com", ()) in ca.certs
+ assert (b"three.com", ()) in ca.certs
+
+ ca.get_cert(b"four.com", [])
+
+ assert (b"one.com", ()) not in ca.certs
+ assert (b"two.com", ()) in ca.certs
+ assert (b"three.com", ()) in ca.certs
+ assert (b"four.com", ()) in ca.certs
+
+ def test_overrides(self, tmpdir):
+ ca1 = certs.CertStore.from_store(str(tmpdir.join("ca1")), "test")
+ ca2 = certs.CertStore.from_store(str(tmpdir.join("ca2")), "test")
+ assert not ca1.default_ca.get_serial_number() == ca2.default_ca.get_serial_number()
+
+ dc = ca2.get_cert(b"foo.com", [b"sans.example.com"])
+ dcp = tmpdir.join("dc")
+ dcp.write(dc[0].to_pem())
+ ca1.add_cert_file(b"foo.com", str(dcp))
+
+ ret = ca1.get_cert(b"foo.com", [])
+ assert ret[0].serial == dc[0].serial
+
+ def test_create_dhparams(self, tmpdir):
+ filename = str(tmpdir.join("dhparam.pem"))
+ certs.CertStore.load_dhparam(filename)
+ assert os.path.exists(filename)
class TestDummyCert:
- def test_with_ca(self):
- with tutils.tmpdir() as d:
- ca = certs.CertStore.from_store(d, "test")
- r = certs.dummy_cert(
- ca.default_privatekey,
- ca.default_ca,
- b"foo.com",
- [b"one.com", b"two.com", b"*.three.com", b"127.0.0.1"]
- )
- assert r.cn == b"foo.com"
- assert r.altnames == [b'one.com', b'two.com', b'*.three.com']
-
- r = certs.dummy_cert(
- ca.default_privatekey,
- ca.default_ca,
- None,
- []
- )
- assert r.cn is None
- assert r.altnames == []
+ def test_with_ca(self, tmpdir):
+ ca = certs.CertStore.from_store(str(tmpdir), "test")
+ r = certs.dummy_cert(
+ ca.default_privatekey,
+ ca.default_ca,
+ b"foo.com",
+ [b"one.com", b"two.com", b"*.three.com", b"127.0.0.1"]
+ )
+ assert r.cn == b"foo.com"
+ assert r.altnames == [b'one.com', b'two.com', b'*.three.com']
+
+ r = certs.dummy_cert(
+ ca.default_privatekey,
+ ca.default_ca,
+ None,
+ []
+ )
+ assert r.cn is None
+ assert r.altnames == []
class TestSSLCert:
diff --git a/test/mitmproxy/test_examples.py b/test/mitmproxy/test_examples.py
index 668d0d4a..f20e0c8c 100644
--- a/test/mitmproxy/test_examples.py
+++ b/test/mitmproxy/test_examples.py
@@ -1,5 +1,4 @@
import json
-import os
import shlex
import pytest
@@ -142,30 +141,26 @@ class TestHARDump:
with pytest.raises(ScriptError):
tscript("complex/har_dump.py")
- def test_simple(self):
- with tutils.tmpdir() as tdir:
- path = os.path.join(tdir, "somefile")
+ def test_simple(self, tmpdir):
+ path = str(tmpdir.join("somefile"))
- m, sc = tscript("complex/har_dump.py", shlex.quote(path))
- m.addons.invoke(m, "response", self.flow())
- m.addons.remove(sc)
-
- with open(path, "r") as inp:
- har = json.load(inp)
+ m, sc = tscript("complex/har_dump.py", shlex.quote(path))
+ m.addons.invoke(m, "response", self.flow())
+ m.addons.remove(sc)
+ with open(path, "r") as inp:
+ har = json.load(inp)
assert len(har["log"]["entries"]) == 1
- def test_base64(self):
- with tutils.tmpdir() as tdir:
- path = os.path.join(tdir, "somefile")
-
- m, sc = tscript("complex/har_dump.py", shlex.quote(path))
- m.addons.invoke(m, "response", self.flow(resp_content=b"foo" + b"\xFF" * 10))
- m.addons.remove(sc)
+ def test_base64(self, tmpdir):
+ path = str(tmpdir.join("somefile"))
- with open(path, "r") as inp:
- har = json.load(inp)
+ m, sc = tscript("complex/har_dump.py", shlex.quote(path))
+ m.addons.invoke(m, "response", self.flow(resp_content=b"foo" + b"\xFF" * 10))
+ m.addons.remove(sc)
+ with open(path, "r") as inp:
+ har = json.load(inp)
assert har["log"]["entries"][0]["response"]["content"]["encoding"] == "base64"
def test_format_cookies(self):
@@ -187,7 +182,7 @@ class TestHARDump:
f = format_cookies([("n", "v", CA([("expires", "Mon, 24-Aug-2037 00:00:00 GMT")]))])[0]
assert f['expires']
- def test_binary(self):
+ def test_binary(self, tmpdir):
f = self.flow()
f.request.method = "POST"
@@ -196,14 +191,12 @@ class TestHARDump:
f.response.headers["random-junk"] = bytes(range(256))
f.response.content = bytes(range(256))
- with tutils.tmpdir() as tdir:
- path = os.path.join(tdir, "somefile")
-
- m, sc = tscript("complex/har_dump.py", shlex.quote(path))
- m.addons.invoke(m, "response", f)
- m.addons.remove(sc)
+ path = str(tmpdir.join("somefile"))
- with open(path, "r") as inp:
- har = json.load(inp)
+ m, sc = tscript("complex/har_dump.py", shlex.quote(path))
+ m.addons.invoke(m, "response", f)
+ m.addons.remove(sc)
+ with open(path, "r") as inp:
+ har = json.load(inp)
assert len(har["log"]["entries"]) == 1
diff --git a/test/mitmproxy/test_optmanager.py b/test/mitmproxy/test_optmanager.py
index db33cddd..0ecfc4e5 100644
--- a/test/mitmproxy/test_optmanager.py
+++ b/test/mitmproxy/test_optmanager.py
@@ -1,5 +1,4 @@
import copy
-import os
import pytest
import typing
import argparse
@@ -7,7 +6,6 @@ import argparse
from mitmproxy import options
from mitmproxy import optmanager
from mitmproxy import exceptions
-from mitmproxy.test import tutils
class TO(optmanager.OptManager):
@@ -238,25 +236,24 @@ def test_serialize_defaults():
assert o.serialize(None, defaults=True)
-def test_saving():
+def test_saving(tmpdir):
o = TD2()
o.three = "set"
- with tutils.tmpdir() as tdir:
- dst = os.path.join(tdir, "conf")
- o.save(dst, defaults=True)
+ dst = str(tmpdir.join("conf"))
+ o.save(dst, defaults=True)
- o2 = TD2()
- o2.load_paths(dst)
- o2.three = "foo"
- o2.save(dst, defaults=True)
+ o2 = TD2()
+ o2.load_paths(dst)
+ o2.three = "foo"
+ o2.save(dst, defaults=True)
- o.load_paths(dst)
- assert o.three == "foo"
+ o.load_paths(dst)
+ assert o.three == "foo"
- with open(dst, 'a') as f:
- f.write("foobar: '123'")
- with pytest.raises(exceptions.OptionsError, matches=''):
- o.load_paths(dst)
+ with open(dst, 'a') as f:
+ f.write("foobar: '123'")
+ with pytest.raises(exceptions.OptionsError, matches=''):
+ o.load_paths(dst)
def test_merge():
diff --git a/test/mitmproxy/tools/test_dump.py b/test/mitmproxy/tools/test_dump.py
index 2542ec4b..a15bf583 100644
--- a/test/mitmproxy/tools/test_dump.py
+++ b/test/mitmproxy/tools/test_dump.py
@@ -1,4 +1,3 @@
-import os
import pytest
from unittest import mock
@@ -9,7 +8,6 @@ from mitmproxy import controller
from mitmproxy import options
from mitmproxy.tools import dump
-from mitmproxy.test import tutils
from .. import tservers
@@ -19,18 +17,17 @@ class TestDumpMaster(tservers.MasterTest):
m = dump.DumpMaster(o, proxy.DummyServer(), with_termlog=False, with_dumper=False)
return m
- def test_read(self):
- with tutils.tmpdir() as t:
- p = os.path.join(t, "read")
- self.flowfile(p)
- self.dummy_cycle(
- self.mkmaster(None, rfile=p),
- 1, b"",
- )
- with pytest.raises(exceptions.OptionsError):
- self.mkmaster(None, rfile="/nonexistent")
- with pytest.raises(exceptions.OptionsError):
- self.mkmaster(None, rfile="test_dump.py")
+ def test_read(self, tmpdir):
+ p = str(tmpdir.join("read"))
+ self.flowfile(p)
+ self.dummy_cycle(
+ self.mkmaster(None, rfile=p),
+ 1, b"",
+ )
+ with pytest.raises(exceptions.OptionsError):
+ self.mkmaster(None, rfile="/nonexistent")
+ with pytest.raises(exceptions.OptionsError):
+ self.mkmaster(None, rfile="test_dump.py")
def test_has_error(self):
m = self.mkmaster(None)
diff --git a/test/pathod/language/test_base.py b/test/pathod/language/test_base.py
index 85e9e53b..ec460b07 100644
--- a/test/pathod/language/test_base.py
+++ b/test/pathod/language/test_base.py
@@ -1,11 +1,8 @@
-import os
import pytest
from pathod import language
from pathod.language import base, exceptions
-from mitmproxy.test import tutils
-
def parse_request(s):
return language.parse_pathoc(s).next()
@@ -137,24 +134,22 @@ class TestTokValueFile:
v = base.TokValue.parseString("<path")[0]
assert v.path == "path"
- def test_access_control(self):
+ def test_access_control(self, tmpdir):
v = base.TokValue.parseString("<path")[0]
- with tutils.tmpdir() as t:
- p = os.path.join(t, "path")
- with open(p, "wb") as f:
- f.write(b"x" * 10000)
-
- assert v.get_generator(language.Settings(staticdir=t))
-
- v = base.TokValue.parseString("<path2")[0]
- with pytest.raises(exceptions.FileAccessDenied):
- v.get_generator(language.Settings(staticdir=t))
- with pytest.raises(Exception, match="access disabled"):
- v.get_generator(language.Settings())
-
- v = base.TokValue.parseString("</outside")[0]
- with pytest.raises(Exception, match="outside"):
- v.get_generator(language.Settings(staticdir=t))
+ f = tmpdir.join("path")
+ f.write(b"x" * 10000)
+
+ assert v.get_generator(language.Settings(staticdir=str(tmpdir)))
+
+ v = base.TokValue.parseString("<path2")[0]
+ with pytest.raises(exceptions.FileAccessDenied):
+ v.get_generator(language.Settings(staticdir=str(tmpdir)))
+ with pytest.raises(Exception, match="access disabled"):
+ v.get_generator(language.Settings())
+
+ v = base.TokValue.parseString("</outside")[0]
+ with pytest.raises(Exception, match="outside"):
+ v.get_generator(language.Settings(staticdir=str(tmpdir)))
def test_spec(self):
v = base.TokValue.parseString("<'one two'")[0]
diff --git a/test/pathod/language/test_generators.py b/test/pathod/language/test_generators.py
index b3ce0335..dc15aaa1 100644
--- a/test/pathod/language/test_generators.py
+++ b/test/pathod/language/test_generators.py
@@ -1,7 +1,4 @@
-import os
-
from pathod.language import generators
-from mitmproxy.test import tutils
def test_randomgenerator():
@@ -15,23 +12,20 @@ def test_randomgenerator():
assert len(g[1000:1001]) == 0
-def test_filegenerator():
- with tutils.tmpdir() as t:
- path = os.path.join(t, "foo")
- f = open(path, "wb")
- f.write(b"x" * 10000)
- f.close()
- g = generators.FileGenerator(path)
- assert len(g) == 10000
- assert g[0] == b"x"
- assert g[-1] == b"x"
- assert g[0:5] == b"xxxxx"
- assert len(g[1:10]) == 9
- assert len(g[10000:10001]) == 0
- assert repr(g)
- # remove all references to FileGenerator instance to close the file
- # handle.
- del g
+def test_filegenerator(tmpdir):
+ f = tmpdir.join("foo")
+ f.write(b"x" * 10000)
+ g = generators.FileGenerator(str(f))
+ assert len(g) == 10000
+ assert g[0] == b"x"
+ assert g[-1] == b"x"
+ assert g[0:5] == b"xxxxx"
+ assert len(g[1:10]) == 9
+ assert len(g[10000:10001]) == 0
+ assert repr(g)
+ # remove all references to FileGenerator instance to close the file
+ # handle.
+ del g
def test_transform_generator():