diff options
| -rw-r--r-- | mitmproxy/test/tutils.py | 18 | ||||
| -rw-r--r-- | test/mitmproxy/addons/test_clientplayback.py | 19 | ||||
| -rw-r--r-- | test/mitmproxy/addons/test_replace.py | 56 | ||||
| -rw-r--r-- | test/mitmproxy/addons/test_script.py | 33 | ||||
| -rw-r--r-- | test/mitmproxy/addons/test_serverplayback.py | 19 | ||||
| -rw-r--r-- | test/mitmproxy/addons/test_streamfile.py | 68 | ||||
| -rw-r--r-- | test/mitmproxy/net/test_tcp.py | 31 | ||||
| -rw-r--r-- | test/mitmproxy/test_certs.py | 204 | ||||
| -rw-r--r-- | test/mitmproxy/test_examples.py | 49 | ||||
| -rw-r--r-- | test/mitmproxy/test_optmanager.py | 29 | ||||
| -rw-r--r-- | test/mitmproxy/tools/test_dump.py | 25 | ||||
| -rw-r--r-- | test/pathod/language/test_base.py | 35 | ||||
| -rw-r--r-- | test/pathod/language/test_generators.py | 34 | 
13 files changed, 274 insertions, 346 deletions
diff --git a/mitmproxy/test/tutils.py b/mitmproxy/test/tutils.py index 7b311492..80e5b6fd 100644 --- a/mitmproxy/test/tutils.py +++ b/mitmproxy/test/tutils.py @@ -1,9 +1,5 @@ -from io import BytesIO -import tempfile -import os  import time -import shutil -from contextlib import contextmanager +from io import BytesIO  from mitmproxy.utils import data  from mitmproxy.net import tcp @@ -13,18 +9,6 @@ from mitmproxy.net import http  test_data = data.Data(__name__).push("../../test/") -@contextmanager -def tmpdir(*args, **kwargs): -    orig_workdir = os.getcwd() -    temp_workdir = tempfile.mkdtemp(*args, **kwargs) -    os.chdir(temp_workdir) - -    yield temp_workdir - -    os.chdir(orig_workdir) -    shutil.rmtree(temp_workdir) - -  def treader(bytes):      """          Construct a tcp.Read object from bytes. diff --git a/test/mitmproxy/addons/test_clientplayback.py b/test/mitmproxy/addons/test_clientplayback.py index 6b8b7c90..c22b3589 100644 --- a/test/mitmproxy/addons/test_clientplayback.py +++ b/test/mitmproxy/addons/test_clientplayback.py @@ -1,9 +1,7 @@ -import os  import pytest  from unittest import mock  from mitmproxy.test import tflow -from mitmproxy.test import tutils  from mitmproxy import io  from mitmproxy import exceptions @@ -49,14 +47,13 @@ class TestClientPlayback:                  cp.tick()                  assert cp.current_thread is None -    def test_configure(self): +    def test_configure(self, tmpdir):          cp = clientplayback.ClientPlayback()          with taddons.context() as tctx: -            with tutils.tmpdir() as td: -                path = os.path.join(td, "flows") -                tdump(path, [tflow.tflow()]) -                tctx.configure(cp, client_replay=[path]) -                tctx.configure(cp, client_replay=[]) -                tctx.configure(cp) -                with pytest.raises(exceptions.OptionsError): -                    tctx.configure(cp, client_replay=["nonexistent"]) +            path = str(tmpdir.join("flows")) +            tdump(path, [tflow.tflow()]) +            tctx.configure(cp, client_replay=[path]) +            tctx.configure(cp, client_replay=[]) +            tctx.configure(cp) +            with pytest.raises(exceptions.OptionsError): +                tctx.configure(cp, client_replay=["nonexistent"]) diff --git a/test/mitmproxy/addons/test_replace.py b/test/mitmproxy/addons/test_replace.py index 8c280c51..2311641a 100644 --- a/test/mitmproxy/addons/test_replace.py +++ b/test/mitmproxy/addons/test_replace.py @@ -1,11 +1,9 @@ -import os.path  import pytest -from mitmproxy.test import tflow -from mitmproxy.test import tutils  from .. import tservers  from mitmproxy.addons import replace  from mitmproxy.test import taddons +from mitmproxy.test import tflow  class TestReplace: @@ -71,33 +69,31 @@ class TestUpstreamProxy(tservers.HTTPUpstreamProxyTest):  class TestReplaceFile: -    def test_simple(self): +    def test_simple(self, tmpdir):          r = replace.ReplaceFile() -        with tutils.tmpdir() as td: -            rp = os.path.join(td, "replacement") -            with open(rp, "w") as f: -                f.write("bar") -            with taddons.context() as tctx: -                tctx.configure( -                    r, -                    replacement_files = [ -                        "/~q/foo/" + rp, -                        "/~s/foo/" + rp, -                        "/~b nonexistent/nonexistent/nonexistent", -                    ] -                ) -                f = tflow.tflow() -                f.request.content = b"foo" -                r.request(f) -                assert f.request.content == b"bar" +        rp = tmpdir.join("replacement") +        rp.write("bar") +        with taddons.context() as tctx: +            tctx.configure( +                r, +                replacement_files = [ +                    "/~q/foo/" + str(rp), +                    "/~s/foo/" + str(rp), +                    "/~b nonexistent/nonexistent/nonexistent", +                ] +            ) +            f = tflow.tflow() +            f.request.content = b"foo" +            r.request(f) +            assert f.request.content == b"bar" -                f = tflow.tflow(resp=True) -                f.response.content = b"foo" -                r.response(f) -                assert f.response.content == b"bar" +            f = tflow.tflow(resp=True) +            f.response.content = b"foo" +            r.response(f) +            assert f.response.content == b"bar" -                f = tflow.tflow() -                f.request.content = b"nonexistent" -                assert not tctx.master.event_log -                r.request(f) -                assert tctx.master.event_log +            f = tflow.tflow() +            f.request.content = b"nonexistent" +            assert not tctx.master.event_log +            r.request(f) +            assert tctx.master.event_log diff --git a/test/mitmproxy/addons/test_script.py b/test/mitmproxy/addons/test_script.py index 5f196ebf..d79ed4ef 100644 --- a/test/mitmproxy/addons/test_script.py +++ b/test/mitmproxy/addons/test_script.py @@ -68,13 +68,12 @@ class TestParseCommand:          with pytest.raises(ValueError):              script.parse_command("  ") -    def test_no_script_file(self): +    def test_no_script_file(self, tmpdir):          with pytest.raises(Exception, match="not found"):              script.parse_command("notfound") -        with tutils.tmpdir() as dir: -            with pytest.raises(Exception, match="Not a file"): -                script.parse_command(dir) +        with pytest.raises(Exception, match="Not a file"): +            script.parse_command(str(tmpdir))      def test_parse_args(self):          with utils.chdir(tutils.test_data.dirname): @@ -128,21 +127,19 @@ class TestScript:              recf = sc.ns.call_log[0]              assert recf[1] == "request" -    def test_reload(self): +    def test_reload(self, tmpdir):          with taddons.context() as tctx: -            with tutils.tmpdir(): -                with open("foo.py", "w"): -                    pass -                sc = script.Script("foo.py") -                tctx.configure(sc) -                for _ in range(100): -                    with open("foo.py", "a") as f: -                        f.write(".") -                    sc.tick() -                    time.sleep(0.1) -                    if tctx.master.event_log: -                        return -                raise AssertionError("Change event not detected.") +            f = tmpdir.join("foo.py") +            f.ensure(file=True) +            sc = script.Script(str(f)) +            tctx.configure(sc) +            for _ in range(100): +                f.write(".") +                sc.tick() +                time.sleep(0.1) +                if tctx.master.event_log: +                    return +            raise AssertionError("Change event not detected.")      def test_exception(self):          with taddons.context() as tctx: diff --git a/test/mitmproxy/addons/test_serverplayback.py b/test/mitmproxy/addons/test_serverplayback.py index e2afa516..54e4d281 100644 --- a/test/mitmproxy/addons/test_serverplayback.py +++ b/test/mitmproxy/addons/test_serverplayback.py @@ -1,10 +1,8 @@ -import os  import urllib  import pytest -from mitmproxy.test import tutils -from mitmproxy.test import tflow  from mitmproxy.test import taddons +from mitmproxy.test import tflow  import mitmproxy.test.tutils  from mitmproxy.addons import serverplayback @@ -19,15 +17,14 @@ def tdump(path, flows):          w.add(i) -def test_config(): +def test_config(tmpdir):      s = serverplayback.ServerPlayback() -    with tutils.tmpdir() as p: -        with taddons.context() as tctx: -            fpath = os.path.join(p, "flows") -            tdump(fpath, [tflow.tflow(resp=True)]) -            tctx.configure(s, server_replay=[fpath]) -            with pytest.raises(exceptions.OptionsError): -                tctx.configure(s, server_replay=[p]) +    with taddons.context() as tctx: +        fpath = str(tmpdir.join("flows")) +        tdump(fpath, [tflow.tflow(resp=True)]) +        tctx.configure(s, server_replay=[fpath]) +        with pytest.raises(exceptions.OptionsError): +            tctx.configure(s, server_replay=[str(tmpdir)])  def test_tick(): diff --git a/test/mitmproxy/addons/test_streamfile.py b/test/mitmproxy/addons/test_streamfile.py index 4105c1fc..3f78521c 100644 --- a/test/mitmproxy/addons/test_streamfile.py +++ b/test/mitmproxy/addons/test_streamfile.py @@ -1,9 +1,7 @@ -import os.path  import pytest -from mitmproxy.test import tflow -from mitmproxy.test import tutils  from mitmproxy.test import taddons +from mitmproxy.test import tflow  from mitmproxy import io  from mitmproxy import exceptions @@ -11,19 +9,17 @@ from mitmproxy import options  from mitmproxy.addons import streamfile -def test_configure(): +def test_configure(tmpdir):      sa = streamfile.StreamFile()      with taddons.context(options=options.Options()) as tctx: -        with tutils.tmpdir() as tdir: -            p = os.path.join(tdir, "foo") -            with pytest.raises(exceptions.OptionsError): -                tctx.configure(sa, streamfile=tdir) -            with pytest.raises(Exception, match="Invalid filter"): -                tctx.configure(sa, streamfile=p, filtstr="~~") -            tctx.configure(sa, filtstr="foo") -            assert sa.filt -            tctx.configure(sa, filtstr=None) -            assert not sa.filt +        with pytest.raises(exceptions.OptionsError): +            tctx.configure(sa, streamfile=str(tmpdir)) +        with pytest.raises(Exception, match="Invalid filter"): +            tctx.configure(sa, streamfile=str(tmpdir.join("foo")), filtstr="~~") +        tctx.configure(sa, filtstr="foo") +        assert sa.filt +        tctx.configure(sa, filtstr=None) +        assert not sa.filt  def rd(p): @@ -31,36 +27,34 @@ def rd(p):      return list(x.stream()) -def test_tcp(): +def test_tcp(tmpdir):      sa = streamfile.StreamFile()      with taddons.context() as tctx: -        with tutils.tmpdir() as tdir: -            p = os.path.join(tdir, "foo") -            tctx.configure(sa, streamfile=p) +        p = str(tmpdir.join("foo")) +        tctx.configure(sa, streamfile=p) -            tt = tflow.ttcpflow() -            sa.tcp_start(tt) -            sa.tcp_end(tt) -            tctx.configure(sa, streamfile=None) -            assert rd(p) +        tt = tflow.ttcpflow() +        sa.tcp_start(tt) +        sa.tcp_end(tt) +        tctx.configure(sa, streamfile=None) +        assert rd(p) -def test_simple(): +def test_simple(tmpdir):      sa = streamfile.StreamFile()      with taddons.context() as tctx: -        with tutils.tmpdir() as tdir: -            p = os.path.join(tdir, "foo") +        p = str(tmpdir.join("foo")) -            tctx.configure(sa, streamfile=p) +        tctx.configure(sa, streamfile=p) -            f = tflow.tflow(resp=True) -            sa.request(f) -            sa.response(f) -            tctx.configure(sa, streamfile=None) -            assert rd(p)[0].response +        f = tflow.tflow(resp=True) +        sa.request(f) +        sa.response(f) +        tctx.configure(sa, streamfile=None) +        assert rd(p)[0].response -            tctx.configure(sa, streamfile="+" + p) -            f = tflow.tflow() -            sa.request(f) -            tctx.configure(sa, streamfile=None) -            assert not rd(p)[1].response +        tctx.configure(sa, streamfile="+" + p) +        f = tflow.tflow() +        sa.request(f) +        tctx.configure(sa, streamfile=None) +        assert not rd(p)[1].response diff --git a/test/mitmproxy/net/test_tcp.py b/test/mitmproxy/net/test_tcp.py index cf010f6e..8b26784a 100644 --- a/test/mitmproxy/net/test_tcp.py +++ b/test/mitmproxy/net/test_tcp.py @@ -11,8 +11,8 @@ from OpenSSL import SSL  from mitmproxy import certs  from mitmproxy.net import tcp -from mitmproxy.test import tutils  from mitmproxy import exceptions +from mitmproxy.test import tutils  from . import tservers  from ...conftest import requires_alpn @@ -783,25 +783,24 @@ class TestSSLKeyLogger(tservers.ServerTestBase):          cipher_list="AES256-SHA"      ) -    def test_log(self): +    def test_log(self, tmpdir):          testval = b"echo!\n"          _logfun = tcp.log_ssl_key -        with tutils.tmpdir() as d: -            logfile = os.path.join(d, "foo", "bar", "logfile") -            tcp.log_ssl_key = tcp.SSLKeyLogger(logfile) +        logfile = str(tmpdir.join("foo", "bar", "logfile")) +        tcp.log_ssl_key = tcp.SSLKeyLogger(logfile) -            c = tcp.TCPClient(("127.0.0.1", self.port)) -            with c.connect(): -                c.convert_to_ssl() -                c.wfile.write(testval) -                c.wfile.flush() -                assert c.rfile.readline() == testval -                c.finish() - -                tcp.log_ssl_key.close() -                with open(logfile, "rb") as f: -                    assert f.read().count(b"CLIENT_RANDOM") == 2 +        c = tcp.TCPClient(("127.0.0.1", self.port)) +        with c.connect(): +            c.convert_to_ssl() +            c.wfile.write(testval) +            c.wfile.flush() +            assert c.rfile.readline() == testval +            c.finish() + +            tcp.log_ssl_key.close() +            with open(logfile, "rb") as f: +                assert f.read().count(b"CLIENT_RANDOM") == 2          tcp.log_ssl_key = _logfun diff --git a/test/mitmproxy/test_certs.py b/test/mitmproxy/test_certs.py index 9bd3ad25..2d12c370 100644 --- a/test/mitmproxy/test_certs.py +++ b/test/mitmproxy/test_certs.py @@ -34,118 +34,106 @@ from mitmproxy.test import tutils  class TestCertStore: -    def test_create_explicit(self): -        with tutils.tmpdir() as d: -            ca = certs.CertStore.from_store(d, "test") -            assert ca.get_cert(b"foo", []) - -            ca2 = certs.CertStore.from_store(d, "test") -            assert ca2.get_cert(b"foo", []) - -            assert ca.default_ca.get_serial_number() == ca2.default_ca.get_serial_number() - -    def test_create_no_common_name(self): -        with tutils.tmpdir() as d: -            ca = certs.CertStore.from_store(d, "test") -            assert ca.get_cert(None, [])[0].cn is None - -    def test_create_tmp(self): -        with tutils.tmpdir() as d: -            ca = certs.CertStore.from_store(d, "test") -            assert ca.get_cert(b"foo.com", []) -            assert ca.get_cert(b"foo.com", []) -            assert ca.get_cert(b"*.foo.com", []) - -            r = ca.get_cert(b"*.foo.com", []) -            assert r[1] == ca.default_privatekey - -    def test_sans(self): -        with tutils.tmpdir() as d: -            ca = certs.CertStore.from_store(d, "test") -            c1 = ca.get_cert(b"foo.com", [b"*.bar.com"]) -            ca.get_cert(b"foo.bar.com", []) -            # assert c1 == c2 -            c3 = ca.get_cert(b"bar.com", []) -            assert not c1 == c3 - -    def test_sans_change(self): -        with tutils.tmpdir() as d: -            ca = certs.CertStore.from_store(d, "test") -            ca.get_cert(b"foo.com", [b"*.bar.com"]) -            cert, key, chain_file = ca.get_cert(b"foo.bar.com", [b"*.baz.com"]) -            assert b"*.baz.com" in cert.altnames - -    def test_expire(self): -        with tutils.tmpdir() as d: -            ca = certs.CertStore.from_store(d, "test") -            ca.STORE_CAP = 3 -            ca.get_cert(b"one.com", []) -            ca.get_cert(b"two.com", []) -            ca.get_cert(b"three.com", []) - -            assert (b"one.com", ()) in ca.certs -            assert (b"two.com", ()) in ca.certs -            assert (b"three.com", ()) in ca.certs - -            ca.get_cert(b"one.com", []) - -            assert (b"one.com", ()) in ca.certs -            assert (b"two.com", ()) in ca.certs -            assert (b"three.com", ()) in ca.certs - -            ca.get_cert(b"four.com", []) - -            assert (b"one.com", ()) not in ca.certs -            assert (b"two.com", ()) in ca.certs -            assert (b"three.com", ()) in ca.certs -            assert (b"four.com", ()) in ca.certs - -    def test_overrides(self): -        with tutils.tmpdir() as d: -            ca1 = certs.CertStore.from_store(os.path.join(d, "ca1"), "test") -            ca2 = certs.CertStore.from_store(os.path.join(d, "ca2"), "test") -            assert not ca1.default_ca.get_serial_number( -            ) == ca2.default_ca.get_serial_number() - -            dc = ca2.get_cert(b"foo.com", [b"sans.example.com"]) -            dcp = os.path.join(d, "dc") -            f = open(dcp, "wb") -            f.write(dc[0].to_pem()) -            f.close() -            ca1.add_cert_file(b"foo.com", dcp) - -            ret = ca1.get_cert(b"foo.com", []) -            assert ret[0].serial == dc[0].serial - -    def test_create_dhparams(self): -        with tutils.tmpdir() as d: -            filename = os.path.join(d, "dhparam.pem") -            certs.CertStore.load_dhparam(filename) -            assert os.path.exists(filename) +    def test_create_explicit(self, tmpdir): +        ca = certs.CertStore.from_store(str(tmpdir), "test") +        assert ca.get_cert(b"foo", []) + +        ca2 = certs.CertStore.from_store(str(tmpdir), "test") +        assert ca2.get_cert(b"foo", []) + +        assert ca.default_ca.get_serial_number() == ca2.default_ca.get_serial_number() + +    def test_create_no_common_name(self, tmpdir): +        ca = certs.CertStore.from_store(str(tmpdir), "test") +        assert ca.get_cert(None, [])[0].cn is None + +    def test_create_tmp(self, tmpdir): +        ca = certs.CertStore.from_store(str(tmpdir), "test") +        assert ca.get_cert(b"foo.com", []) +        assert ca.get_cert(b"foo.com", []) +        assert ca.get_cert(b"*.foo.com", []) + +        r = ca.get_cert(b"*.foo.com", []) +        assert r[1] == ca.default_privatekey + +    def test_sans(self, tmpdir): +        ca = certs.CertStore.from_store(str(tmpdir), "test") +        c1 = ca.get_cert(b"foo.com", [b"*.bar.com"]) +        ca.get_cert(b"foo.bar.com", []) +        # assert c1 == c2 +        c3 = ca.get_cert(b"bar.com", []) +        assert not c1 == c3 + +    def test_sans_change(self, tmpdir): +        ca = certs.CertStore.from_store(str(tmpdir), "test") +        ca.get_cert(b"foo.com", [b"*.bar.com"]) +        cert, key, chain_file = ca.get_cert(b"foo.bar.com", [b"*.baz.com"]) +        assert b"*.baz.com" in cert.altnames + +    def test_expire(self, tmpdir): +        ca = certs.CertStore.from_store(str(tmpdir), "test") +        ca.STORE_CAP = 3 +        ca.get_cert(b"one.com", []) +        ca.get_cert(b"two.com", []) +        ca.get_cert(b"three.com", []) + +        assert (b"one.com", ()) in ca.certs +        assert (b"two.com", ()) in ca.certs +        assert (b"three.com", ()) in ca.certs + +        ca.get_cert(b"one.com", []) + +        assert (b"one.com", ()) in ca.certs +        assert (b"two.com", ()) in ca.certs +        assert (b"three.com", ()) in ca.certs + +        ca.get_cert(b"four.com", []) + +        assert (b"one.com", ()) not in ca.certs +        assert (b"two.com", ()) in ca.certs +        assert (b"three.com", ()) in ca.certs +        assert (b"four.com", ()) in ca.certs + +    def test_overrides(self, tmpdir): +        ca1 = certs.CertStore.from_store(str(tmpdir.join("ca1")), "test") +        ca2 = certs.CertStore.from_store(str(tmpdir.join("ca2")), "test") +        assert not ca1.default_ca.get_serial_number() == ca2.default_ca.get_serial_number() + +        dc = ca2.get_cert(b"foo.com", [b"sans.example.com"]) +        dcp = tmpdir.join("dc") +        dcp.write(dc[0].to_pem()) +        ca1.add_cert_file(b"foo.com", str(dcp)) + +        ret = ca1.get_cert(b"foo.com", []) +        assert ret[0].serial == dc[0].serial + +    def test_create_dhparams(self, tmpdir): +        filename = str(tmpdir.join("dhparam.pem")) +        certs.CertStore.load_dhparam(filename) +        assert os.path.exists(filename)  class TestDummyCert: -    def test_with_ca(self): -        with tutils.tmpdir() as d: -            ca = certs.CertStore.from_store(d, "test") -            r = certs.dummy_cert( -                ca.default_privatekey, -                ca.default_ca, -                b"foo.com", -                [b"one.com", b"two.com", b"*.three.com", b"127.0.0.1"] -            ) -            assert r.cn == b"foo.com" -            assert r.altnames == [b'one.com', b'two.com', b'*.three.com'] - -            r = certs.dummy_cert( -                ca.default_privatekey, -                ca.default_ca, -                None, -                [] -            ) -            assert r.cn is None -            assert r.altnames == [] +    def test_with_ca(self, tmpdir): +        ca = certs.CertStore.from_store(str(tmpdir), "test") +        r = certs.dummy_cert( +            ca.default_privatekey, +            ca.default_ca, +            b"foo.com", +            [b"one.com", b"two.com", b"*.three.com", b"127.0.0.1"] +        ) +        assert r.cn == b"foo.com" +        assert r.altnames == [b'one.com', b'two.com', b'*.three.com'] + +        r = certs.dummy_cert( +            ca.default_privatekey, +            ca.default_ca, +            None, +            [] +        ) +        assert r.cn is None +        assert r.altnames == []  class TestSSLCert: diff --git a/test/mitmproxy/test_examples.py b/test/mitmproxy/test_examples.py index 668d0d4a..f20e0c8c 100644 --- a/test/mitmproxy/test_examples.py +++ b/test/mitmproxy/test_examples.py @@ -1,5 +1,4 @@  import json -import os  import shlex  import pytest @@ -142,30 +141,26 @@ class TestHARDump:          with pytest.raises(ScriptError):              tscript("complex/har_dump.py") -    def test_simple(self): -        with tutils.tmpdir() as tdir: -            path = os.path.join(tdir, "somefile") +    def test_simple(self, tmpdir): +        path = str(tmpdir.join("somefile")) -            m, sc = tscript("complex/har_dump.py", shlex.quote(path)) -            m.addons.invoke(m, "response", self.flow()) -            m.addons.remove(sc) - -            with open(path, "r") as inp: -                har = json.load(inp) +        m, sc = tscript("complex/har_dump.py", shlex.quote(path)) +        m.addons.invoke(m, "response", self.flow()) +        m.addons.remove(sc) +        with open(path, "r") as inp: +            har = json.load(inp)          assert len(har["log"]["entries"]) == 1 -    def test_base64(self): -        with tutils.tmpdir() as tdir: -            path = os.path.join(tdir, "somefile") - -            m, sc = tscript("complex/har_dump.py", shlex.quote(path)) -            m.addons.invoke(m, "response", self.flow(resp_content=b"foo" + b"\xFF" * 10)) -            m.addons.remove(sc) +    def test_base64(self, tmpdir): +        path = str(tmpdir.join("somefile")) -            with open(path, "r") as inp: -                har = json.load(inp) +        m, sc = tscript("complex/har_dump.py", shlex.quote(path)) +        m.addons.invoke(m, "response", self.flow(resp_content=b"foo" + b"\xFF" * 10)) +        m.addons.remove(sc) +        with open(path, "r") as inp: +            har = json.load(inp)          assert har["log"]["entries"][0]["response"]["content"]["encoding"] == "base64"      def test_format_cookies(self): @@ -187,7 +182,7 @@ class TestHARDump:          f = format_cookies([("n", "v", CA([("expires", "Mon, 24-Aug-2037 00:00:00 GMT")]))])[0]          assert f['expires'] -    def test_binary(self): +    def test_binary(self, tmpdir):          f = self.flow()          f.request.method = "POST" @@ -196,14 +191,12 @@ class TestHARDump:          f.response.headers["random-junk"] = bytes(range(256))          f.response.content = bytes(range(256)) -        with tutils.tmpdir() as tdir: -            path = os.path.join(tdir, "somefile") - -            m, sc = tscript("complex/har_dump.py", shlex.quote(path)) -            m.addons.invoke(m, "response", f) -            m.addons.remove(sc) +        path = str(tmpdir.join("somefile")) -            with open(path, "r") as inp: -                har = json.load(inp) +        m, sc = tscript("complex/har_dump.py", shlex.quote(path)) +        m.addons.invoke(m, "response", f) +        m.addons.remove(sc) +        with open(path, "r") as inp: +            har = json.load(inp)          assert len(har["log"]["entries"]) == 1 diff --git a/test/mitmproxy/test_optmanager.py b/test/mitmproxy/test_optmanager.py index db33cddd..0ecfc4e5 100644 --- a/test/mitmproxy/test_optmanager.py +++ b/test/mitmproxy/test_optmanager.py @@ -1,5 +1,4 @@  import copy -import os  import pytest  import typing  import argparse @@ -7,7 +6,6 @@ import argparse  from mitmproxy import options  from mitmproxy import optmanager  from mitmproxy import exceptions -from mitmproxy.test import tutils  class TO(optmanager.OptManager): @@ -238,25 +236,24 @@ def test_serialize_defaults():      assert o.serialize(None, defaults=True) -def test_saving(): +def test_saving(tmpdir):      o = TD2()      o.three = "set" -    with tutils.tmpdir() as tdir: -        dst = os.path.join(tdir, "conf") -        o.save(dst, defaults=True) +    dst = str(tmpdir.join("conf")) +    o.save(dst, defaults=True) -        o2 = TD2() -        o2.load_paths(dst) -        o2.three = "foo" -        o2.save(dst, defaults=True) +    o2 = TD2() +    o2.load_paths(dst) +    o2.three = "foo" +    o2.save(dst, defaults=True) -        o.load_paths(dst) -        assert o.three == "foo" +    o.load_paths(dst) +    assert o.three == "foo" -        with open(dst, 'a') as f: -            f.write("foobar: '123'") -        with pytest.raises(exceptions.OptionsError, matches=''): -            o.load_paths(dst) +    with open(dst, 'a') as f: +        f.write("foobar: '123'") +    with pytest.raises(exceptions.OptionsError, matches=''): +        o.load_paths(dst)  def test_merge(): diff --git a/test/mitmproxy/tools/test_dump.py b/test/mitmproxy/tools/test_dump.py index 2542ec4b..a15bf583 100644 --- a/test/mitmproxy/tools/test_dump.py +++ b/test/mitmproxy/tools/test_dump.py @@ -1,4 +1,3 @@ -import os  import pytest  from unittest import mock @@ -9,7 +8,6 @@ from mitmproxy import controller  from mitmproxy import options  from mitmproxy.tools import dump -from mitmproxy.test import tutils  from .. import tservers @@ -19,18 +17,17 @@ class TestDumpMaster(tservers.MasterTest):          m = dump.DumpMaster(o, proxy.DummyServer(), with_termlog=False, with_dumper=False)          return m -    def test_read(self): -        with tutils.tmpdir() as t: -            p = os.path.join(t, "read") -            self.flowfile(p) -            self.dummy_cycle( -                self.mkmaster(None, rfile=p), -                1, b"", -            ) -            with pytest.raises(exceptions.OptionsError): -                self.mkmaster(None, rfile="/nonexistent") -            with pytest.raises(exceptions.OptionsError): -                self.mkmaster(None, rfile="test_dump.py") +    def test_read(self, tmpdir): +        p = str(tmpdir.join("read")) +        self.flowfile(p) +        self.dummy_cycle( +            self.mkmaster(None, rfile=p), +            1, b"", +        ) +        with pytest.raises(exceptions.OptionsError): +            self.mkmaster(None, rfile="/nonexistent") +        with pytest.raises(exceptions.OptionsError): +            self.mkmaster(None, rfile="test_dump.py")      def test_has_error(self):          m = self.mkmaster(None) diff --git a/test/pathod/language/test_base.py b/test/pathod/language/test_base.py index 85e9e53b..ec460b07 100644 --- a/test/pathod/language/test_base.py +++ b/test/pathod/language/test_base.py @@ -1,11 +1,8 @@ -import os  import pytest  from pathod import language  from pathod.language import base, exceptions -from mitmproxy.test import tutils -  def parse_request(s):      return language.parse_pathoc(s).next() @@ -137,24 +134,22 @@ class TestTokValueFile:          v = base.TokValue.parseString("<path")[0]          assert v.path == "path" -    def test_access_control(self): +    def test_access_control(self, tmpdir):          v = base.TokValue.parseString("<path")[0] -        with tutils.tmpdir() as t: -            p = os.path.join(t, "path") -            with open(p, "wb") as f: -                f.write(b"x" * 10000) - -            assert v.get_generator(language.Settings(staticdir=t)) - -            v = base.TokValue.parseString("<path2")[0] -            with pytest.raises(exceptions.FileAccessDenied): -                v.get_generator(language.Settings(staticdir=t)) -            with pytest.raises(Exception, match="access disabled"): -                v.get_generator(language.Settings()) - -            v = base.TokValue.parseString("</outside")[0] -            with pytest.raises(Exception, match="outside"): -                v.get_generator(language.Settings(staticdir=t)) +        f = tmpdir.join("path") +        f.write(b"x" * 10000) + +        assert v.get_generator(language.Settings(staticdir=str(tmpdir))) + +        v = base.TokValue.parseString("<path2")[0] +        with pytest.raises(exceptions.FileAccessDenied): +            v.get_generator(language.Settings(staticdir=str(tmpdir))) +        with pytest.raises(Exception, match="access disabled"): +            v.get_generator(language.Settings()) + +        v = base.TokValue.parseString("</outside")[0] +        with pytest.raises(Exception, match="outside"): +            v.get_generator(language.Settings(staticdir=str(tmpdir)))      def test_spec(self):          v = base.TokValue.parseString("<'one two'")[0] diff --git a/test/pathod/language/test_generators.py b/test/pathod/language/test_generators.py index b3ce0335..dc15aaa1 100644 --- a/test/pathod/language/test_generators.py +++ b/test/pathod/language/test_generators.py @@ -1,7 +1,4 @@ -import os -  from pathod.language import generators -from mitmproxy.test import tutils  def test_randomgenerator(): @@ -15,23 +12,20 @@ def test_randomgenerator():      assert len(g[1000:1001]) == 0 -def test_filegenerator(): -    with tutils.tmpdir() as t: -        path = os.path.join(t, "foo") -        f = open(path, "wb") -        f.write(b"x" * 10000) -        f.close() -        g = generators.FileGenerator(path) -        assert len(g) == 10000 -        assert g[0] == b"x" -        assert g[-1] == b"x" -        assert g[0:5] == b"xxxxx" -        assert len(g[1:10]) == 9 -        assert len(g[10000:10001]) == 0 -        assert repr(g) -        # remove all references to FileGenerator instance to close the file -        # handle. -        del g +def test_filegenerator(tmpdir): +    f = tmpdir.join("foo") +    f.write(b"x" * 10000) +    g = generators.FileGenerator(str(f)) +    assert len(g) == 10000 +    assert g[0] == b"x" +    assert g[-1] == b"x" +    assert g[0:5] == b"xxxxx" +    assert len(g[1:10]) == 9 +    assert len(g[10000:10001]) == 0 +    assert repr(g) +    # remove all references to FileGenerator instance to close the file +    # handle. +    del g  def test_transform_generator():  | 
