diff options
Diffstat (limited to 'test')
26 files changed, 429 insertions, 408 deletions
| diff --git a/test/mitmproxy/addons/test_clientplayback.py b/test/mitmproxy/addons/test_clientplayback.py index 6b8b7c90..c22b3589 100644 --- a/test/mitmproxy/addons/test_clientplayback.py +++ b/test/mitmproxy/addons/test_clientplayback.py @@ -1,9 +1,7 @@ -import os  import pytest  from unittest import mock  from mitmproxy.test import tflow -from mitmproxy.test import tutils  from mitmproxy import io  from mitmproxy import exceptions @@ -49,14 +47,13 @@ class TestClientPlayback:                  cp.tick()                  assert cp.current_thread is None -    def test_configure(self): +    def test_configure(self, tmpdir):          cp = clientplayback.ClientPlayback()          with taddons.context() as tctx: -            with tutils.tmpdir() as td: -                path = os.path.join(td, "flows") -                tdump(path, [tflow.tflow()]) -                tctx.configure(cp, client_replay=[path]) -                tctx.configure(cp, client_replay=[]) -                tctx.configure(cp) -                with pytest.raises(exceptions.OptionsError): -                    tctx.configure(cp, client_replay=["nonexistent"]) +            path = str(tmpdir.join("flows")) +            tdump(path, [tflow.tflow()]) +            tctx.configure(cp, client_replay=[path]) +            tctx.configure(cp, client_replay=[]) +            tctx.configure(cp) +            with pytest.raises(exceptions.OptionsError): +                tctx.configure(cp, client_replay=["nonexistent"]) diff --git a/test/mitmproxy/addons/test_replace.py b/test/mitmproxy/addons/test_replace.py index 8c280c51..7d590b35 100644 --- a/test/mitmproxy/addons/test_replace.py +++ b/test/mitmproxy/addons/test_replace.py @@ -1,11 +1,8 @@ -import os.path  import pytest -from mitmproxy.test import tflow -from mitmproxy.test import tutils -from .. import tservers  from mitmproxy.addons import replace  from mitmproxy.test import taddons +from mitmproxy.test import tflow  class TestReplace: @@ -34,7 +31,7 @@ class TestReplace:          with taddons.context() as tctx:              tctx.configure(                  r, -                replacements = [ +                replacements=[                      "/~q/foo/bar",                      "/~s/foo/bar",                  ] @@ -49,55 +46,57 @@ class TestReplace:              r.response(f)              assert f.response.content == b"bar" - -class TestUpstreamProxy(tservers.HTTPUpstreamProxyTest): -    ssl = False -      def test_order(self): -        sa = replace.Replace() -        self.proxy.tmaster.addons.add(sa) - -        self.proxy.tmaster.options.replacements = [ -            "/~q/foo/bar", -            "/~q/bar/baz", -            "/~q/foo/oh noes!", -            "/~s/baz/ORLY" -        ] -        p = self.pathoc() -        with p.connect(): -            req = p.request("get:'%s/p/418:b\"foo\"'" % self.server.urlbase) -        assert req.content == b"ORLY" -        assert req.status_code == 418 +        r = replace.Replace() +        with taddons.context() as tctx: +            tctx.configure( +                r, +                replacements=[ +                    "/foo/bar", +                    "/bar/baz", +                    "/foo/oh noes!", +                    "/bar/oh noes!", +                ] +            ) +            f = tflow.tflow() +            f.request.content = b"foo" +            r.request(f) +            assert f.request.content == b"baz"  class TestReplaceFile: -    def test_simple(self): -        r = replace.ReplaceFile() -        with tutils.tmpdir() as td: -            rp = os.path.join(td, "replacement") -            with open(rp, "w") as f: -                f.write("bar") -            with taddons.context() as tctx: +    def test_simple(self, tmpdir): +        r = replace.Replace() +        with taddons.context() as tctx: +            tmpfile = tmpdir.join("replacement") +            tmpfile.write("bar") +            tctx.configure( +                r, +                replacements=["/~q/foo/@" + str(tmpfile)] +            ) +            f = tflow.tflow() +            f.request.content = b"foo" +            r.request(f) +            assert f.request.content == b"bar" + +    def test_nonexistent(self, tmpdir): +        r = replace.Replace() +        with taddons.context() as tctx: +            with pytest.raises(Exception, match="Invalid file path"):                  tctx.configure(                      r, -                    replacement_files = [ -                        "/~q/foo/" + rp, -                        "/~s/foo/" + rp, -                        "/~b nonexistent/nonexistent/nonexistent", -                    ] +                    replacements=["/~q/foo/@nonexistent"]                  ) -                f = tflow.tflow() -                f.request.content = b"foo" -                r.request(f) -                assert f.request.content == b"bar" - -                f = tflow.tflow(resp=True) -                f.response.content = b"foo" -                r.response(f) -                assert f.response.content == b"bar" -                f = tflow.tflow() -                f.request.content = b"nonexistent" -                assert not tctx.master.event_log -                r.request(f) -                assert tctx.master.event_log +            tmpfile = tmpdir.join("replacement") +            tmpfile.write("bar") +            tctx.configure( +                r, +                replacements=["/~q/foo/@" + str(tmpfile)] +            ) +            tmpfile.remove() +            f = tflow.tflow() +            f.request.content = b"foo" +            assert not tctx.master.event_log +            r.request(f) +            assert tctx.master.event_log diff --git a/test/mitmproxy/addons/test_script.py b/test/mitmproxy/addons/test_script.py index 5f196ebf..4c1b2e43 100644 --- a/test/mitmproxy/addons/test_script.py +++ b/test/mitmproxy/addons/test_script.py @@ -68,13 +68,12 @@ class TestParseCommand:          with pytest.raises(ValueError):              script.parse_command("  ") -    def test_no_script_file(self): +    def test_no_script_file(self, tmpdir):          with pytest.raises(Exception, match="not found"):              script.parse_command("notfound") -        with tutils.tmpdir() as dir: -            with pytest.raises(Exception, match="Not a file"): -                script.parse_command(dir) +        with pytest.raises(Exception, match="Not a file"): +            script.parse_command(str(tmpdir))      def test_parse_args(self):          with utils.chdir(tutils.test_data.dirname): @@ -117,9 +116,7 @@ class TestScript:                  )              )              sc.load_script() -            assert sc.ns.call_log == [ -                ("solo", "start", (), {}), -            ] +            assert sc.ns.call_log[0][0:2] == ("solo", "start")              sc.ns.call_log = []              f = tflow.tflow(resp=True) @@ -128,28 +125,26 @@ class TestScript:              recf = sc.ns.call_log[0]              assert recf[1] == "request" -    def test_reload(self): +    def test_reload(self, tmpdir):          with taddons.context() as tctx: -            with tutils.tmpdir(): -                with open("foo.py", "w"): -                    pass -                sc = script.Script("foo.py") -                tctx.configure(sc) -                for _ in range(100): -                    with open("foo.py", "a") as f: -                        f.write(".") -                    sc.tick() -                    time.sleep(0.1) -                    if tctx.master.event_log: -                        return -                raise AssertionError("Change event not detected.") +            f = tmpdir.join("foo.py") +            f.ensure(file=True) +            sc = script.Script(str(f)) +            tctx.configure(sc) +            for _ in range(100): +                f.write(".") +                sc.tick() +                time.sleep(0.1) +                if tctx.master.event_log: +                    return +            raise AssertionError("Change event not detected.")      def test_exception(self):          with taddons.context() as tctx:              sc = script.Script(                  tutils.test_data.path("mitmproxy/data/addonscripts/error.py")              ) -            sc.start() +            sc.start(tctx.options)              f = tflow.tflow(resp=True)              sc.request(f)              assert tctx.master.event_log[0][0] == "error" @@ -165,7 +160,7 @@ class TestScript:                      "mitmproxy/data/addonscripts/addon.py"                  )              ) -            sc.start() +            sc.start(tctx.options)              tctx.configure(sc)              assert sc.ns.event_log == [                  'scriptstart', 'addonstart', 'addonconfigure' @@ -228,24 +223,31 @@ class TestScriptLoader:          assert len(m.addons) == 1      def test_dupes(self): -        o = options.Options(scripts=["one", "one"]) -        m = master.Master(o, proxy.DummyServer())          sc = script.ScriptLoader() -        with pytest.raises(exceptions.OptionsError): -            m.addons.add(o, sc) +        with taddons.context() as tctx: +            tctx.master.addons.add(sc) +            with pytest.raises(exceptions.OptionsError): +                tctx.configure( +                    sc, +                    scripts = ["one", "one"] +                )      def test_nonexistent(self): -        o = options.Options(scripts=["nonexistent"]) -        m = master.Master(o, proxy.DummyServer())          sc = script.ScriptLoader() -        with pytest.raises(exceptions.OptionsError): -            m.addons.add(o, sc) +        with taddons.context() as tctx: +            tctx.master.addons.add(sc) +            with pytest.raises(exceptions.OptionsError): +                tctx.configure( +                    sc, +                    scripts = ["nonexistent"] +                )      def test_order(self):          rec = tutils.test_data.path("mitmproxy/data/addonscripts/recorder.py")          sc = script.ScriptLoader()          with taddons.context() as tctx:              tctx.master.addons.add(sc) +            sc.running()              tctx.configure(                  sc,                  scripts = [ @@ -256,9 +258,17 @@ class TestScriptLoader:              )              debug = [(i[0], i[1]) for i in tctx.master.event_log if i[0] == "debug"]              assert debug == [ -                ('debug', 'a start'), ('debug', 'a configure'), -                ('debug', 'b start'), ('debug', 'b configure'), -                ('debug', 'c start'), ('debug', 'c configure') +                ('debug', 'a start'), +                ('debug', 'a configure'), +                ('debug', 'a running'), + +                ('debug', 'b start'), +                ('debug', 'b configure'), +                ('debug', 'b running'), + +                ('debug', 'c start'), +                ('debug', 'c configure'), +                ('debug', 'c running'),              ]              tctx.master.event_log = []              tctx.configure( @@ -287,4 +297,5 @@ class TestScriptLoader:                  ('debug', 'b done'),                  ('debug', 'x start'),                  ('debug', 'x configure'), +                ('debug', 'x running'),              ] diff --git a/test/mitmproxy/addons/test_serverplayback.py b/test/mitmproxy/addons/test_serverplayback.py index e2afa516..54e4d281 100644 --- a/test/mitmproxy/addons/test_serverplayback.py +++ b/test/mitmproxy/addons/test_serverplayback.py @@ -1,10 +1,8 @@ -import os  import urllib  import pytest -from mitmproxy.test import tutils -from mitmproxy.test import tflow  from mitmproxy.test import taddons +from mitmproxy.test import tflow  import mitmproxy.test.tutils  from mitmproxy.addons import serverplayback @@ -19,15 +17,14 @@ def tdump(path, flows):          w.add(i) -def test_config(): +def test_config(tmpdir):      s = serverplayback.ServerPlayback() -    with tutils.tmpdir() as p: -        with taddons.context() as tctx: -            fpath = os.path.join(p, "flows") -            tdump(fpath, [tflow.tflow(resp=True)]) -            tctx.configure(s, server_replay=[fpath]) -            with pytest.raises(exceptions.OptionsError): -                tctx.configure(s, server_replay=[p]) +    with taddons.context() as tctx: +        fpath = str(tmpdir.join("flows")) +        tdump(fpath, [tflow.tflow(resp=True)]) +        tctx.configure(s, server_replay=[fpath]) +        with pytest.raises(exceptions.OptionsError): +            tctx.configure(s, server_replay=[str(tmpdir)])  def test_tick(): diff --git a/test/mitmproxy/addons/test_streamfile.py b/test/mitmproxy/addons/test_streamfile.py index 4105c1fc..3f78521c 100644 --- a/test/mitmproxy/addons/test_streamfile.py +++ b/test/mitmproxy/addons/test_streamfile.py @@ -1,9 +1,7 @@ -import os.path  import pytest -from mitmproxy.test import tflow -from mitmproxy.test import tutils  from mitmproxy.test import taddons +from mitmproxy.test import tflow  from mitmproxy import io  from mitmproxy import exceptions @@ -11,19 +9,17 @@ from mitmproxy import options  from mitmproxy.addons import streamfile -def test_configure(): +def test_configure(tmpdir):      sa = streamfile.StreamFile()      with taddons.context(options=options.Options()) as tctx: -        with tutils.tmpdir() as tdir: -            p = os.path.join(tdir, "foo") -            with pytest.raises(exceptions.OptionsError): -                tctx.configure(sa, streamfile=tdir) -            with pytest.raises(Exception, match="Invalid filter"): -                tctx.configure(sa, streamfile=p, filtstr="~~") -            tctx.configure(sa, filtstr="foo") -            assert sa.filt -            tctx.configure(sa, filtstr=None) -            assert not sa.filt +        with pytest.raises(exceptions.OptionsError): +            tctx.configure(sa, streamfile=str(tmpdir)) +        with pytest.raises(Exception, match="Invalid filter"): +            tctx.configure(sa, streamfile=str(tmpdir.join("foo")), filtstr="~~") +        tctx.configure(sa, filtstr="foo") +        assert sa.filt +        tctx.configure(sa, filtstr=None) +        assert not sa.filt  def rd(p): @@ -31,36 +27,34 @@ def rd(p):      return list(x.stream()) -def test_tcp(): +def test_tcp(tmpdir):      sa = streamfile.StreamFile()      with taddons.context() as tctx: -        with tutils.tmpdir() as tdir: -            p = os.path.join(tdir, "foo") -            tctx.configure(sa, streamfile=p) +        p = str(tmpdir.join("foo")) +        tctx.configure(sa, streamfile=p) -            tt = tflow.ttcpflow() -            sa.tcp_start(tt) -            sa.tcp_end(tt) -            tctx.configure(sa, streamfile=None) -            assert rd(p) +        tt = tflow.ttcpflow() +        sa.tcp_start(tt) +        sa.tcp_end(tt) +        tctx.configure(sa, streamfile=None) +        assert rd(p) -def test_simple(): +def test_simple(tmpdir):      sa = streamfile.StreamFile()      with taddons.context() as tctx: -        with tutils.tmpdir() as tdir: -            p = os.path.join(tdir, "foo") +        p = str(tmpdir.join("foo")) -            tctx.configure(sa, streamfile=p) +        tctx.configure(sa, streamfile=p) -            f = tflow.tflow(resp=True) -            sa.request(f) -            sa.response(f) -            tctx.configure(sa, streamfile=None) -            assert rd(p)[0].response +        f = tflow.tflow(resp=True) +        sa.request(f) +        sa.response(f) +        tctx.configure(sa, streamfile=None) +        assert rd(p)[0].response -            tctx.configure(sa, streamfile="+" + p) -            f = tflow.tflow() -            sa.request(f) -            tctx.configure(sa, streamfile=None) -            assert not rd(p)[1].response +        tctx.configure(sa, streamfile="+" + p) +        f = tflow.tflow() +        sa.request(f) +        tctx.configure(sa, streamfile=None) +        assert not rd(p)[1].response diff --git a/test/mitmproxy/addons/test_termstatus.py b/test/mitmproxy/addons/test_termstatus.py new file mode 100644 index 00000000..01c14814 --- /dev/null +++ b/test/mitmproxy/addons/test_termstatus.py @@ -0,0 +1,12 @@ +from mitmproxy.addons import termstatus +from mitmproxy.test import taddons + + +def test_configure(): +    ts = termstatus.TermStatus() +    with taddons.context() as ctx: +        ts.running() +        assert not ctx.master.event_log +        ctx.configure(ts, server=True) +        ts.running() +        assert ctx.master.event_log diff --git a/test/mitmproxy/data/addonscripts/addon.py b/test/mitmproxy/data/addonscripts/addon.py index 84173cb6..f34f41cb 100644 --- a/test/mitmproxy/data/addonscripts/addon.py +++ b/test/mitmproxy/data/addonscripts/addon.py @@ -6,7 +6,7 @@ class Addon:      def event_log(self):          return event_log -    def start(self): +    def start(self, opts):          event_log.append("addonstart")      def configure(self, options, updated): @@ -17,6 +17,6 @@ def configure(options, updated):      event_log.append("addonconfigure") -def start(): +def start(opts):      event_log.append("scriptstart")      return Addon() diff --git a/test/mitmproxy/data/addonscripts/concurrent_decorator_class.py b/test/mitmproxy/data/addonscripts/concurrent_decorator_class.py index bd047c99..10ba24cd 100644 --- a/test/mitmproxy/data/addonscripts/concurrent_decorator_class.py +++ b/test/mitmproxy/data/addonscripts/concurrent_decorator_class.py @@ -9,5 +9,5 @@ class ConcurrentClass:          time.sleep(0.1) -def start(): +def start(opts):      return ConcurrentClass() diff --git a/test/mitmproxy/data/addonscripts/concurrent_decorator_err.py b/test/mitmproxy/data/addonscripts/concurrent_decorator_err.py index 756869c8..7bc28182 100644 --- a/test/mitmproxy/data/addonscripts/concurrent_decorator_err.py +++ b/test/mitmproxy/data/addonscripts/concurrent_decorator_err.py @@ -2,5 +2,5 @@ from mitmproxy.script import concurrent  @concurrent -def start(): +def start(opts):      pass diff --git a/test/mitmproxy/data/addonscripts/recorder.py b/test/mitmproxy/data/addonscripts/recorder.py index 6b9b6ea8..aff524a8 100644 --- a/test/mitmproxy/data/addonscripts/recorder.py +++ b/test/mitmproxy/data/addonscripts/recorder.py @@ -22,5 +22,5 @@ class CallLogger:          raise AttributeError -def start(): +def start(opts):      return CallLogger(*sys.argv[1:]) diff --git a/test/mitmproxy/net/http/test_message.py b/test/mitmproxy/net/http/test_message.py index 034bd600..b75bc7c2 100644 --- a/test/mitmproxy/net/http/test_message.py +++ b/test/mitmproxy/net/http/test_message.py @@ -38,14 +38,12 @@ def _test_decoded_attr(message, attr):  class TestMessageData: -    def test_eq_ne(self): +    def test_eq(self):          data = tutils.tresp(timestamp_start=42, timestamp_end=42).data          same = tutils.tresp(timestamp_start=42, timestamp_end=42).data          assert data == same -        assert not data != same          other = tutils.tresp(content=b"foo").data -        assert not data == other          assert data != other          assert data != 0 @@ -61,10 +59,8 @@ class TestMessage:          resp = tutils.tresp(timestamp_start=42, timestamp_end=42)          same = tutils.tresp(timestamp_start=42, timestamp_end=42)          assert resp == same -        assert not resp != same          other = tutils.tresp(timestamp_start=0, timestamp_end=0) -        assert not resp == other          assert resp != other          assert resp != 0 diff --git a/test/mitmproxy/net/test_tcp.py b/test/mitmproxy/net/test_tcp.py index cf010f6e..8b26784a 100644 --- a/test/mitmproxy/net/test_tcp.py +++ b/test/mitmproxy/net/test_tcp.py @@ -11,8 +11,8 @@ from OpenSSL import SSL  from mitmproxy import certs  from mitmproxy.net import tcp -from mitmproxy.test import tutils  from mitmproxy import exceptions +from mitmproxy.test import tutils  from . import tservers  from ...conftest import requires_alpn @@ -783,25 +783,24 @@ class TestSSLKeyLogger(tservers.ServerTestBase):          cipher_list="AES256-SHA"      ) -    def test_log(self): +    def test_log(self, tmpdir):          testval = b"echo!\n"          _logfun = tcp.log_ssl_key -        with tutils.tmpdir() as d: -            logfile = os.path.join(d, "foo", "bar", "logfile") -            tcp.log_ssl_key = tcp.SSLKeyLogger(logfile) +        logfile = str(tmpdir.join("foo", "bar", "logfile")) +        tcp.log_ssl_key = tcp.SSLKeyLogger(logfile) -            c = tcp.TCPClient(("127.0.0.1", self.port)) -            with c.connect(): -                c.convert_to_ssl() -                c.wfile.write(testval) -                c.wfile.flush() -                assert c.rfile.readline() == testval -                c.finish() - -                tcp.log_ssl_key.close() -                with open(logfile, "rb") as f: -                    assert f.read().count(b"CLIENT_RANDOM") == 2 +        c = tcp.TCPClient(("127.0.0.1", self.port)) +        with c.connect(): +            c.convert_to_ssl() +            c.wfile.write(testval) +            c.wfile.flush() +            assert c.rfile.readline() == testval +            c.finish() + +            tcp.log_ssl_key.close() +            with open(logfile, "rb") as f: +                assert f.read().count(b"CLIENT_RANDOM") == 2          tcp.log_ssl_key = _logfun diff --git a/test/mitmproxy/proxy/test_server.py b/test/mitmproxy/proxy/test_server.py index aa45761a..16efe415 100644 --- a/test/mitmproxy/proxy/test_server.py +++ b/test/mitmproxy/proxy/test_server.py @@ -302,6 +302,9 @@ class TestHTTP(tservers.HTTPProxyTest, CommonMixin):  class TestHTTPAuth(tservers.HTTPProxyTest):      def test_auth(self):          self.master.addons.add(proxyauth.ProxyAuth()) +        self.master.addons.configure_all( +            self.master.options, self.master.options.keys() +        )          self.master.options.proxyauth = "test:test"          assert self.pathod("202").status_code == 407          p = self.pathoc() diff --git a/test/mitmproxy/script/test_concurrent.py b/test/mitmproxy/script/test_concurrent.py index e81c023d..a9b6f0c4 100644 --- a/test/mitmproxy/script/test_concurrent.py +++ b/test/mitmproxy/script/test_concurrent.py @@ -24,7 +24,7 @@ class TestConcurrent(tservers.MasterTest):                      "mitmproxy/data/addonscripts/concurrent_decorator.py"                  )              ) -            sc.start() +            sc.start(tctx.options)              f1, f2 = tflow.tflow(), tflow.tflow()              tctx.cycle(sc, f1) @@ -42,7 +42,7 @@ class TestConcurrent(tservers.MasterTest):                      "mitmproxy/data/addonscripts/concurrent_decorator_err.py"                  )              ) -            sc.start() +            sc.start(tctx.options)              assert "decorator not supported" in tctx.master.event_log[0][1]      def test_concurrent_class(self): @@ -52,7 +52,7 @@ class TestConcurrent(tservers.MasterTest):                          "mitmproxy/data/addonscripts/concurrent_decorator_class.py"                      )                  ) -                sc.start() +                sc.start(tctx.options)                  f1, f2 = tflow.tflow(), tflow.tflow()                  tctx.cycle(sc, f1) diff --git a/test/mitmproxy/test_addonmanager.py b/test/mitmproxy/test_addonmanager.py index 17402e26..3e5f71c6 100644 --- a/test/mitmproxy/test_addonmanager.py +++ b/test/mitmproxy/test_addonmanager.py @@ -10,12 +10,12 @@ from mitmproxy import proxy  class TAddon:      def __init__(self, name):          self.name = name -        self.noop_member = True +        self.tick = True      def __repr__(self):          return "Addon(%s)" % self.name -    def noop(self): +    def done(self):          pass @@ -30,6 +30,6 @@ def test_simple():      assert not a.chain      a.add(TAddon("one")) -    a("noop") +    a("done")      with pytest.raises(exceptions.AddonError): -        a("noop_member") +        a("tick") diff --git a/test/mitmproxy/test_certs.py b/test/mitmproxy/test_certs.py index 9bd3ad25..88c49561 100644 --- a/test/mitmproxy/test_certs.py +++ b/test/mitmproxy/test_certs.py @@ -34,118 +34,106 @@ from mitmproxy.test import tutils  class TestCertStore: -    def test_create_explicit(self): -        with tutils.tmpdir() as d: -            ca = certs.CertStore.from_store(d, "test") -            assert ca.get_cert(b"foo", []) - -            ca2 = certs.CertStore.from_store(d, "test") -            assert ca2.get_cert(b"foo", []) - -            assert ca.default_ca.get_serial_number() == ca2.default_ca.get_serial_number() - -    def test_create_no_common_name(self): -        with tutils.tmpdir() as d: -            ca = certs.CertStore.from_store(d, "test") -            assert ca.get_cert(None, [])[0].cn is None - -    def test_create_tmp(self): -        with tutils.tmpdir() as d: -            ca = certs.CertStore.from_store(d, "test") -            assert ca.get_cert(b"foo.com", []) -            assert ca.get_cert(b"foo.com", []) -            assert ca.get_cert(b"*.foo.com", []) - -            r = ca.get_cert(b"*.foo.com", []) -            assert r[1] == ca.default_privatekey - -    def test_sans(self): -        with tutils.tmpdir() as d: -            ca = certs.CertStore.from_store(d, "test") -            c1 = ca.get_cert(b"foo.com", [b"*.bar.com"]) -            ca.get_cert(b"foo.bar.com", []) -            # assert c1 == c2 -            c3 = ca.get_cert(b"bar.com", []) -            assert not c1 == c3 - -    def test_sans_change(self): -        with tutils.tmpdir() as d: -            ca = certs.CertStore.from_store(d, "test") -            ca.get_cert(b"foo.com", [b"*.bar.com"]) -            cert, key, chain_file = ca.get_cert(b"foo.bar.com", [b"*.baz.com"]) -            assert b"*.baz.com" in cert.altnames - -    def test_expire(self): -        with tutils.tmpdir() as d: -            ca = certs.CertStore.from_store(d, "test") -            ca.STORE_CAP = 3 -            ca.get_cert(b"one.com", []) -            ca.get_cert(b"two.com", []) -            ca.get_cert(b"three.com", []) - -            assert (b"one.com", ()) in ca.certs -            assert (b"two.com", ()) in ca.certs -            assert (b"three.com", ()) in ca.certs - -            ca.get_cert(b"one.com", []) - -            assert (b"one.com", ()) in ca.certs -            assert (b"two.com", ()) in ca.certs -            assert (b"three.com", ()) in ca.certs - -            ca.get_cert(b"four.com", []) - -            assert (b"one.com", ()) not in ca.certs -            assert (b"two.com", ()) in ca.certs -            assert (b"three.com", ()) in ca.certs -            assert (b"four.com", ()) in ca.certs - -    def test_overrides(self): -        with tutils.tmpdir() as d: -            ca1 = certs.CertStore.from_store(os.path.join(d, "ca1"), "test") -            ca2 = certs.CertStore.from_store(os.path.join(d, "ca2"), "test") -            assert not ca1.default_ca.get_serial_number( -            ) == ca2.default_ca.get_serial_number() - -            dc = ca2.get_cert(b"foo.com", [b"sans.example.com"]) -            dcp = os.path.join(d, "dc") -            f = open(dcp, "wb") -            f.write(dc[0].to_pem()) -            f.close() -            ca1.add_cert_file(b"foo.com", dcp) - -            ret = ca1.get_cert(b"foo.com", []) -            assert ret[0].serial == dc[0].serial - -    def test_create_dhparams(self): -        with tutils.tmpdir() as d: -            filename = os.path.join(d, "dhparam.pem") -            certs.CertStore.load_dhparam(filename) -            assert os.path.exists(filename) +    def test_create_explicit(self, tmpdir): +        ca = certs.CertStore.from_store(str(tmpdir), "test") +        assert ca.get_cert(b"foo", []) + +        ca2 = certs.CertStore.from_store(str(tmpdir), "test") +        assert ca2.get_cert(b"foo", []) + +        assert ca.default_ca.get_serial_number() == ca2.default_ca.get_serial_number() + +    def test_create_no_common_name(self, tmpdir): +        ca = certs.CertStore.from_store(str(tmpdir), "test") +        assert ca.get_cert(None, [])[0].cn is None + +    def test_create_tmp(self, tmpdir): +        ca = certs.CertStore.from_store(str(tmpdir), "test") +        assert ca.get_cert(b"foo.com", []) +        assert ca.get_cert(b"foo.com", []) +        assert ca.get_cert(b"*.foo.com", []) + +        r = ca.get_cert(b"*.foo.com", []) +        assert r[1] == ca.default_privatekey + +    def test_sans(self, tmpdir): +        ca = certs.CertStore.from_store(str(tmpdir), "test") +        c1 = ca.get_cert(b"foo.com", [b"*.bar.com"]) +        ca.get_cert(b"foo.bar.com", []) +        # assert c1 == c2 +        c3 = ca.get_cert(b"bar.com", []) +        assert not c1 == c3 + +    def test_sans_change(self, tmpdir): +        ca = certs.CertStore.from_store(str(tmpdir), "test") +        ca.get_cert(b"foo.com", [b"*.bar.com"]) +        cert, key, chain_file = ca.get_cert(b"foo.bar.com", [b"*.baz.com"]) +        assert b"*.baz.com" in cert.altnames + +    def test_expire(self, tmpdir): +        ca = certs.CertStore.from_store(str(tmpdir), "test") +        ca.STORE_CAP = 3 +        ca.get_cert(b"one.com", []) +        ca.get_cert(b"two.com", []) +        ca.get_cert(b"three.com", []) + +        assert (b"one.com", ()) in ca.certs +        assert (b"two.com", ()) in ca.certs +        assert (b"three.com", ()) in ca.certs + +        ca.get_cert(b"one.com", []) + +        assert (b"one.com", ()) in ca.certs +        assert (b"two.com", ()) in ca.certs +        assert (b"three.com", ()) in ca.certs + +        ca.get_cert(b"four.com", []) + +        assert (b"one.com", ()) not in ca.certs +        assert (b"two.com", ()) in ca.certs +        assert (b"three.com", ()) in ca.certs +        assert (b"four.com", ()) in ca.certs + +    def test_overrides(self, tmpdir): +        ca1 = certs.CertStore.from_store(str(tmpdir.join("ca1")), "test") +        ca2 = certs.CertStore.from_store(str(tmpdir.join("ca2")), "test") +        assert not ca1.default_ca.get_serial_number() == ca2.default_ca.get_serial_number() + +        dc = ca2.get_cert(b"foo.com", [b"sans.example.com"]) +        dcp = tmpdir.join("dc") +        dcp.write(dc[0].to_pem()) +        ca1.add_cert_file(b"foo.com", str(dcp)) + +        ret = ca1.get_cert(b"foo.com", []) +        assert ret[0].serial == dc[0].serial + +    def test_create_dhparams(self, tmpdir): +        filename = str(tmpdir.join("dhparam.pem")) +        certs.CertStore.load_dhparam(filename) +        assert os.path.exists(filename)  class TestDummyCert: -    def test_with_ca(self): -        with tutils.tmpdir() as d: -            ca = certs.CertStore.from_store(d, "test") -            r = certs.dummy_cert( -                ca.default_privatekey, -                ca.default_ca, -                b"foo.com", -                [b"one.com", b"two.com", b"*.three.com", b"127.0.0.1"] -            ) -            assert r.cn == b"foo.com" -            assert r.altnames == [b'one.com', b'two.com', b'*.three.com'] - -            r = certs.dummy_cert( -                ca.default_privatekey, -                ca.default_ca, -                None, -                [] -            ) -            assert r.cn is None -            assert r.altnames == [] +    def test_with_ca(self, tmpdir): +        ca = certs.CertStore.from_store(str(tmpdir), "test") +        r = certs.dummy_cert( +            ca.default_privatekey, +            ca.default_ca, +            b"foo.com", +            [b"one.com", b"two.com", b"*.three.com", b"127.0.0.1"] +        ) +        assert r.cn == b"foo.com" +        assert r.altnames == [b'one.com', b'two.com', b'*.three.com'] + +        r = certs.dummy_cert( +            ca.default_privatekey, +            ca.default_ca, +            None, +            [] +        ) +        assert r.cn is None +        assert r.altnames == []  class TestSSLCert: @@ -172,7 +160,6 @@ class TestSSLCert:          assert c2.to_pem()          assert c2.has_expired is not None -        assert not c1 == c2          assert c1 != c2      def test_err_broken_sans(self): diff --git a/test/mitmproxy/test_connections.py b/test/mitmproxy/test_connections.py index 0083f57c..67a6552f 100644 --- a/test/mitmproxy/test_connections.py +++ b/test/mitmproxy/test_connections.py @@ -66,8 +66,18 @@ class TestClientConnection:          assert c.timestamp_start == 42          c3 = c.copy() +        assert c3.get_state() != c.get_state() +        c.id = c3.id = "foo"          assert c3.get_state() == c.get_state() +    def test_eq(self): +        c = tflow.tclient_conn() +        c2 = c.copy() +        assert c == c +        assert c != c2 +        assert c != 42 +        assert hash(c) != hash(c2) +  class TestServerConnection: @@ -147,6 +157,21 @@ class TestServerConnection:          with pytest.raises(ValueError, matches='sni must be str, not '):              c.establish_ssl(None, b'foobar') +    def test_state(self): +        c = tflow.tserver_conn() +        c2 = c.copy() +        assert c2.get_state() != c.get_state() +        c.id = c2.id = "foo" +        assert c2.get_state() == c.get_state() + +    def test_eq(self): +        c = tflow.tserver_conn() +        c2 = c.copy() +        assert c == c +        assert c != c2 +        assert c != 42 +        assert hash(c) != hash(c2) +  class TestClientConnectionTLS: diff --git a/test/mitmproxy/test_examples.py b/test/mitmproxy/test_examples.py index 668d0d4a..f20e0c8c 100644 --- a/test/mitmproxy/test_examples.py +++ b/test/mitmproxy/test_examples.py @@ -1,5 +1,4 @@  import json -import os  import shlex  import pytest @@ -142,30 +141,26 @@ class TestHARDump:          with pytest.raises(ScriptError):              tscript("complex/har_dump.py") -    def test_simple(self): -        with tutils.tmpdir() as tdir: -            path = os.path.join(tdir, "somefile") +    def test_simple(self, tmpdir): +        path = str(tmpdir.join("somefile")) -            m, sc = tscript("complex/har_dump.py", shlex.quote(path)) -            m.addons.invoke(m, "response", self.flow()) -            m.addons.remove(sc) - -            with open(path, "r") as inp: -                har = json.load(inp) +        m, sc = tscript("complex/har_dump.py", shlex.quote(path)) +        m.addons.invoke(m, "response", self.flow()) +        m.addons.remove(sc) +        with open(path, "r") as inp: +            har = json.load(inp)          assert len(har["log"]["entries"]) == 1 -    def test_base64(self): -        with tutils.tmpdir() as tdir: -            path = os.path.join(tdir, "somefile") - -            m, sc = tscript("complex/har_dump.py", shlex.quote(path)) -            m.addons.invoke(m, "response", self.flow(resp_content=b"foo" + b"\xFF" * 10)) -            m.addons.remove(sc) +    def test_base64(self, tmpdir): +        path = str(tmpdir.join("somefile")) -            with open(path, "r") as inp: -                har = json.load(inp) +        m, sc = tscript("complex/har_dump.py", shlex.quote(path)) +        m.addons.invoke(m, "response", self.flow(resp_content=b"foo" + b"\xFF" * 10)) +        m.addons.remove(sc) +        with open(path, "r") as inp: +            har = json.load(inp)          assert har["log"]["entries"][0]["response"]["content"]["encoding"] == "base64"      def test_format_cookies(self): @@ -187,7 +182,7 @@ class TestHARDump:          f = format_cookies([("n", "v", CA([("expires", "Mon, 24-Aug-2037 00:00:00 GMT")]))])[0]          assert f['expires'] -    def test_binary(self): +    def test_binary(self, tmpdir):          f = self.flow()          f.request.method = "POST" @@ -196,14 +191,12 @@ class TestHARDump:          f.response.headers["random-junk"] = bytes(range(256))          f.response.content = bytes(range(256)) -        with tutils.tmpdir() as tdir: -            path = os.path.join(tdir, "somefile") - -            m, sc = tscript("complex/har_dump.py", shlex.quote(path)) -            m.addons.invoke(m, "response", f) -            m.addons.remove(sc) +        path = str(tmpdir.join("somefile")) -            with open(path, "r") as inp: -                har = json.load(inp) +        m, sc = tscript("complex/har_dump.py", shlex.quote(path)) +        m.addons.invoke(m, "response", f) +        m.addons.remove(sc) +        with open(path, "r") as inp: +            har = json.load(inp)          assert len(har["log"]["entries"]) == 1 diff --git a/test/mitmproxy/test_optmanager.py b/test/mitmproxy/test_optmanager.py index db33cddd..df392829 100644 --- a/test/mitmproxy/test_optmanager.py +++ b/test/mitmproxy/test_optmanager.py @@ -1,5 +1,4 @@  import copy -import os  import pytest  import typing  import argparse @@ -7,7 +6,6 @@ import argparse  from mitmproxy import options  from mitmproxy import optmanager  from mitmproxy import exceptions -from mitmproxy.test import tutils  class TO(optmanager.OptManager): @@ -85,10 +83,11 @@ def test_options():      with pytest.raises(TypeError):          TO(nonexistent = "value") -    with pytest.raises(Exception, match="No such option"): +    with pytest.raises(Exception, match="Unknown options"):          o.nonexistent = "value" -    with pytest.raises(Exception, match="No such option"): +    with pytest.raises(Exception, match="Unknown options"):          o.update(nonexistent = "value") +    assert o.update_known(nonexistent = "value") == {"nonexistent": "value"}      rec = [] @@ -201,62 +200,63 @@ def test_simple():  def test_serialize():      o = TD2()      o.three = "set" -    assert "dfour" in o.serialize(None, defaults=True) +    assert "dfour" in optmanager.serialize(o, None, defaults=True) -    data = o.serialize(None) +    data = optmanager.serialize(o, None)      assert "dfour" not in data      o2 = TD2() -    o2.load(data) +    optmanager.load(o2, data)      assert o2 == o      t = """          unknown: foo      """ -    data = o.serialize(t) +    data = optmanager.serialize(o, t)      o2 = TD2() -    o2.load(data) +    optmanager.load(o2, data)      assert o2 == o      t = "invalid: foo\ninvalid"      with pytest.raises(Exception, match="Config error"): -        o2.load(t) +        optmanager.load(o2, t)      t = "invalid"      with pytest.raises(Exception, match="Config error"): -        o2.load(t) +        optmanager.load(o2, t)      t = "" -    o2.load(t) - -    with pytest.raises(exceptions.OptionsError, matches='No such option: foobar'): -        o2.load("foobar: '123'") +    optmanager.load(o2, t) +    assert optmanager.load(o2, "foobar: '123'") == {"foobar": "123"}  def test_serialize_defaults():      o = options.Options() -    assert o.serialize(None, defaults=True) +    assert optmanager.serialize(o, None, defaults=True) -def test_saving(): +def test_saving(tmpdir):      o = TD2()      o.three = "set" -    with tutils.tmpdir() as tdir: -        dst = os.path.join(tdir, "conf") -        o.save(dst, defaults=True) +    dst = str(tmpdir.join("conf")) +    optmanager.save(o, dst, defaults=True) + +    o2 = TD2() +    optmanager.load_paths(o2, dst) +    o2.three = "foo" +    optmanager.save(o2, dst, defaults=True) -        o2 = TD2() -        o2.load_paths(dst) -        o2.three = "foo" -        o2.save(dst, defaults=True) +    optmanager.load_paths(o, dst) +    assert o.three == "foo" -        o.load_paths(dst) -        assert o.three == "foo" +    with open(dst, 'a') as f: +        f.write("foobar: '123'") +    assert optmanager.load_paths(o, dst) == {"foobar": "123"} -        with open(dst, 'a') as f: -            f.write("foobar: '123'") -        with pytest.raises(exceptions.OptionsError, matches=''): -            o.load_paths(dst) +    with open(dst, 'a') as f: +        f.write("'''") +    with pytest.raises(exceptions.OptionsError): +        optmanager.load_paths(o, dst)  def test_merge(): @@ -283,9 +283,9 @@ def test_option():      assert o2 != o -def test_dump(): +def test_dump_defaults():      o = options.Options() -    assert optmanager.dump(o) +    assert optmanager.dump_defaults(o)  class TTypes(optmanager.OptManager): @@ -349,3 +349,6 @@ def test_set():      assert opts.seqstr == ["foo", "bar"]      opts.set("seqstr")      assert opts.seqstr == [] + +    with pytest.raises(exceptions.OptionsError): +        opts.set("nonexistent=wobble") diff --git a/test/mitmproxy/tools/console/test_master.py b/test/mitmproxy/tools/console/test_master.py index 0bf3734b..6c716ad1 100644 --- a/test/mitmproxy/tools/console/test_master.py +++ b/test/mitmproxy/tools/console/test_master.py @@ -28,7 +28,9 @@ class TestMaster(tservers.MasterTest):          if "verbosity" not in opts:              opts["verbosity"] = 1          o = options.Options(**opts) -        return console.master.ConsoleMaster(o, proxy.DummyServer()) +        m = console.master.ConsoleMaster(o, proxy.DummyServer()) +        m.addons.configure_all(o, o.keys()) +        return m      def test_basic(self):          m = self.mkmaster() diff --git a/test/mitmproxy/tools/test_dump.py b/test/mitmproxy/tools/test_dump.py index 2542ec4b..a15bf583 100644 --- a/test/mitmproxy/tools/test_dump.py +++ b/test/mitmproxy/tools/test_dump.py @@ -1,4 +1,3 @@ -import os  import pytest  from unittest import mock @@ -9,7 +8,6 @@ from mitmproxy import controller  from mitmproxy import options  from mitmproxy.tools import dump -from mitmproxy.test import tutils  from .. import tservers @@ -19,18 +17,17 @@ class TestDumpMaster(tservers.MasterTest):          m = dump.DumpMaster(o, proxy.DummyServer(), with_termlog=False, with_dumper=False)          return m -    def test_read(self): -        with tutils.tmpdir() as t: -            p = os.path.join(t, "read") -            self.flowfile(p) -            self.dummy_cycle( -                self.mkmaster(None, rfile=p), -                1, b"", -            ) -            with pytest.raises(exceptions.OptionsError): -                self.mkmaster(None, rfile="/nonexistent") -            with pytest.raises(exceptions.OptionsError): -                self.mkmaster(None, rfile="test_dump.py") +    def test_read(self, tmpdir): +        p = str(tmpdir.join("read")) +        self.flowfile(p) +        self.dummy_cycle( +            self.mkmaster(None, rfile=p), +            1, b"", +        ) +        with pytest.raises(exceptions.OptionsError): +            self.mkmaster(None, rfile="/nonexistent") +        with pytest.raises(exceptions.OptionsError): +            self.mkmaster(None, rfile="test_dump.py")      def test_has_error(self):          m = self.mkmaster(None) diff --git a/test/mitmproxy/tservers.py b/test/mitmproxy/tservers.py index a8aaa358..c47411ee 100644 --- a/test/mitmproxy/tservers.py +++ b/test/mitmproxy/tservers.py @@ -79,6 +79,8 @@ class TestMaster(master.Master):          self.state = TestState()          self.addons.add(self.state)          self.addons.add(*addons) +        self.addons.configure_all(self.options, self.options.keys()) +        self.addons.invoke_all_with_context("running")      def clear_log(self):          self.tlog = [] diff --git a/test/mitmproxy/types/test_multidict.py b/test/mitmproxy/types/test_multidict.py index 9b13c5cd..c76cd753 100644 --- a/test/mitmproxy/types/test_multidict.py +++ b/test/mitmproxy/types/test_multidict.py @@ -93,11 +93,6 @@ class TestMultiDict:          md1.fields = md1.fields[1:] + md1.fields[:1]          assert not (md1 == md2) -    def test_ne(self): -        assert not TMultiDict() != TMultiDict() -        assert TMultiDict() != self._multi() -        assert TMultiDict() != 42 -      def test_hash(self):          """          If a class defines mutable objects and implements an __eq__() method, @@ -205,3 +200,12 @@ class TestMultiDictView:          tv["c"] = "b"          assert p.vals == (("a", "b"), ("c", "b"))          assert tv["a"] == "b" + +    def test_copy(self): +        p = TParent() +        tv = multidict.MultiDictView(p.getter, p.setter) +        c = tv.copy() +        assert isinstance(c, multidict.MultiDict) +        assert tv.items() == c.items() +        c["foo"] = "bar" +        assert tv.items() != c.items() diff --git a/test/mitmproxy/types/test_serializable.py b/test/mitmproxy/types/test_serializable.py index dd4a3778..390d17e1 100644 --- a/test/mitmproxy/types/test_serializable.py +++ b/test/mitmproxy/types/test_serializable.py @@ -1,3 +1,5 @@ +import copy +  from mitmproxy.types import serializable @@ -6,17 +8,17 @@ class SerializableDummy(serializable.Serializable):          self.i = i      def get_state(self): -        return self.i +        return copy.copy(self.i)      def set_state(self, i):          self.i = i -    def from_state(self, state): -        return type(self)(state) +    @classmethod +    def from_state(cls, state): +        return cls(state)  class TestSerializable: -      def test_copy(self):          a = SerializableDummy(42)          assert a.i == 42 @@ -26,3 +28,12 @@ class TestSerializable:          a.set_state(1)          assert a.i == 1          assert b.i == 42 + +    def test_copy_id(self): +        a = SerializableDummy({ +            "id": "foo", +            "foo": 42 +        }) +        b = a.copy() +        assert a.get_state()["id"] != b.get_state()["id"] +        assert a.get_state()["foo"] == b.get_state()["foo"] diff --git a/test/pathod/language/test_base.py b/test/pathod/language/test_base.py index 85e9e53b..ec460b07 100644 --- a/test/pathod/language/test_base.py +++ b/test/pathod/language/test_base.py @@ -1,11 +1,8 @@ -import os  import pytest  from pathod import language  from pathod.language import base, exceptions -from mitmproxy.test import tutils -  def parse_request(s):      return language.parse_pathoc(s).next() @@ -137,24 +134,22 @@ class TestTokValueFile:          v = base.TokValue.parseString("<path")[0]          assert v.path == "path" -    def test_access_control(self): +    def test_access_control(self, tmpdir):          v = base.TokValue.parseString("<path")[0] -        with tutils.tmpdir() as t: -            p = os.path.join(t, "path") -            with open(p, "wb") as f: -                f.write(b"x" * 10000) - -            assert v.get_generator(language.Settings(staticdir=t)) - -            v = base.TokValue.parseString("<path2")[0] -            with pytest.raises(exceptions.FileAccessDenied): -                v.get_generator(language.Settings(staticdir=t)) -            with pytest.raises(Exception, match="access disabled"): -                v.get_generator(language.Settings()) - -            v = base.TokValue.parseString("</outside")[0] -            with pytest.raises(Exception, match="outside"): -                v.get_generator(language.Settings(staticdir=t)) +        f = tmpdir.join("path") +        f.write(b"x" * 10000) + +        assert v.get_generator(language.Settings(staticdir=str(tmpdir))) + +        v = base.TokValue.parseString("<path2")[0] +        with pytest.raises(exceptions.FileAccessDenied): +            v.get_generator(language.Settings(staticdir=str(tmpdir))) +        with pytest.raises(Exception, match="access disabled"): +            v.get_generator(language.Settings()) + +        v = base.TokValue.parseString("</outside")[0] +        with pytest.raises(Exception, match="outside"): +            v.get_generator(language.Settings(staticdir=str(tmpdir)))      def test_spec(self):          v = base.TokValue.parseString("<'one two'")[0] diff --git a/test/pathod/language/test_generators.py b/test/pathod/language/test_generators.py index b3ce0335..dc15aaa1 100644 --- a/test/pathod/language/test_generators.py +++ b/test/pathod/language/test_generators.py @@ -1,7 +1,4 @@ -import os -  from pathod.language import generators -from mitmproxy.test import tutils  def test_randomgenerator(): @@ -15,23 +12,20 @@ def test_randomgenerator():      assert len(g[1000:1001]) == 0 -def test_filegenerator(): -    with tutils.tmpdir() as t: -        path = os.path.join(t, "foo") -        f = open(path, "wb") -        f.write(b"x" * 10000) -        f.close() -        g = generators.FileGenerator(path) -        assert len(g) == 10000 -        assert g[0] == b"x" -        assert g[-1] == b"x" -        assert g[0:5] == b"xxxxx" -        assert len(g[1:10]) == 9 -        assert len(g[10000:10001]) == 0 -        assert repr(g) -        # remove all references to FileGenerator instance to close the file -        # handle. -        del g +def test_filegenerator(tmpdir): +    f = tmpdir.join("foo") +    f.write(b"x" * 10000) +    g = generators.FileGenerator(str(f)) +    assert len(g) == 10000 +    assert g[0] == b"x" +    assert g[-1] == b"x" +    assert g[0:5] == b"xxxxx" +    assert len(g[1:10]) == 9 +    assert len(g[10000:10001]) == 0 +    assert repr(g) +    # remove all references to FileGenerator instance to close the file +    # handle. +    del g  def test_transform_generator(): | 
