aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
authorMaximilian Hils <git@maximilianhils.com>2017-03-12 23:33:49 +0100
committerMaximilian Hils <git@maximilianhils.com>2017-03-12 23:33:49 +0100
commit05e11547f5875a4b5b3c109db8a1db477d1986ad (patch)
tree5c6c5acb7fedeee2e62fc5266d0d9211421fdfa5 /test
parent0f4b523868856fccaf373c9108c9220a3aafb30b (diff)
parent7d5ab70ce3a6deb70aa3e30e755fdee0f3b2fdeb (diff)
downloadmitmproxy-05e11547f5875a4b5b3c109db8a1db477d1986ad.tar.gz
mitmproxy-05e11547f5875a4b5b3c109db8a1db477d1986ad.tar.bz2
mitmproxy-05e11547f5875a4b5b3c109db8a1db477d1986ad.zip
Merge remote-tracking branch 'origin/master' into pr-2120
Conflicts: test/mitmproxy/addons/test_replace.py
Diffstat (limited to 'test')
-rw-r--r--test/mitmproxy/addons/test_clientplayback.py19
-rw-r--r--test/mitmproxy/addons/test_disable_h2c.py39
-rw-r--r--test/mitmproxy/addons/test_disable_h2c_upgrade.py17
-rw-r--r--test/mitmproxy/addons/test_proxyauth.py1
-rw-r--r--test/mitmproxy/addons/test_script.py33
-rw-r--r--test/mitmproxy/addons/test_serverplayback.py19
-rw-r--r--test/mitmproxy/addons/test_streamfile.py68
-rw-r--r--test/mitmproxy/net/test_tcp.py31
-rw-r--r--test/mitmproxy/test_certs.py204
-rw-r--r--test/mitmproxy/test_examples.py49
-rw-r--r--test/mitmproxy/test_optmanager.py29
-rw-r--r--test/mitmproxy/test_stateobject.py22
-rw-r--r--test/mitmproxy/test_websocket.py15
-rw-r--r--test/mitmproxy/tools/test_dump.py25
-rw-r--r--test/pathod/language/test_base.py35
-rw-r--r--test/pathod/language/test_generators.py34
16 files changed, 321 insertions, 319 deletions
diff --git a/test/mitmproxy/addons/test_clientplayback.py b/test/mitmproxy/addons/test_clientplayback.py
index 6b8b7c90..c22b3589 100644
--- a/test/mitmproxy/addons/test_clientplayback.py
+++ b/test/mitmproxy/addons/test_clientplayback.py
@@ -1,9 +1,7 @@
-import os
import pytest
from unittest import mock
from mitmproxy.test import tflow
-from mitmproxy.test import tutils
from mitmproxy import io
from mitmproxy import exceptions
@@ -49,14 +47,13 @@ class TestClientPlayback:
cp.tick()
assert cp.current_thread is None
- def test_configure(self):
+ def test_configure(self, tmpdir):
cp = clientplayback.ClientPlayback()
with taddons.context() as tctx:
- with tutils.tmpdir() as td:
- path = os.path.join(td, "flows")
- tdump(path, [tflow.tflow()])
- tctx.configure(cp, client_replay=[path])
- tctx.configure(cp, client_replay=[])
- tctx.configure(cp)
- with pytest.raises(exceptions.OptionsError):
- tctx.configure(cp, client_replay=["nonexistent"])
+ path = str(tmpdir.join("flows"))
+ tdump(path, [tflow.tflow()])
+ tctx.configure(cp, client_replay=[path])
+ tctx.configure(cp, client_replay=[])
+ tctx.configure(cp)
+ with pytest.raises(exceptions.OptionsError):
+ tctx.configure(cp, client_replay=["nonexistent"])
diff --git a/test/mitmproxy/addons/test_disable_h2c.py b/test/mitmproxy/addons/test_disable_h2c.py
new file mode 100644
index 00000000..d4df8390
--- /dev/null
+++ b/test/mitmproxy/addons/test_disable_h2c.py
@@ -0,0 +1,39 @@
+import io
+from mitmproxy import http
+from mitmproxy.addons import disable_h2c
+from mitmproxy.net.http import http1
+from mitmproxy.exceptions import Kill
+from mitmproxy.test import tflow
+from mitmproxy.test import taddons
+
+
+class TestDisableH2CleartextUpgrade:
+ def test_upgrade(self):
+ with taddons.context() as tctx:
+ a = disable_h2c.DisableH2C()
+ tctx.configure(a)
+
+ f = tflow.tflow()
+ f.request.headers['upgrade'] = 'h2c'
+ f.request.headers['connection'] = 'foo'
+ f.request.headers['http2-settings'] = 'bar'
+
+ a.request(f)
+ assert 'upgrade' not in f.request.headers
+ assert 'connection' not in f.request.headers
+ assert 'http2-settings' not in f.request.headers
+
+ def test_prior_knowledge(self):
+ with taddons.context() as tctx:
+ a = disable_h2c.DisableH2C()
+ tctx.configure(a)
+
+ b = io.BytesIO(b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n")
+ f = tflow.tflow()
+ f.request = http.HTTPRequest.wrap(http1.read_request(b))
+ f.reply.handle()
+ f.intercept()
+
+ a.request(f)
+ assert not f.killable
+ assert f.reply.value == Kill
diff --git a/test/mitmproxy/addons/test_disable_h2c_upgrade.py b/test/mitmproxy/addons/test_disable_h2c_upgrade.py
deleted file mode 100644
index 6cab713d..00000000
--- a/test/mitmproxy/addons/test_disable_h2c_upgrade.py
+++ /dev/null
@@ -1,17 +0,0 @@
-from mitmproxy.addons import disable_h2c_upgrade
-from mitmproxy.test import tflow
-
-
-class TestTermLog:
- def test_simple(self):
- a = disable_h2c_upgrade.DisableH2CleartextUpgrade()
-
- f = tflow.tflow()
- f.request.headers['upgrade'] = 'h2c'
- f.request.headers['connection'] = 'foo'
- f.request.headers['http2-settings'] = 'bar'
-
- a.request(f)
- assert 'upgrade' not in f.request.headers
- assert 'connection' not in f.request.headers
- assert 'http2-settings' not in f.request.headers
diff --git a/test/mitmproxy/addons/test_proxyauth.py b/test/mitmproxy/addons/test_proxyauth.py
index 14782755..513f3f08 100644
--- a/test/mitmproxy/addons/test_proxyauth.py
+++ b/test/mitmproxy/addons/test_proxyauth.py
@@ -173,3 +173,4 @@ def test_handlers():
f2 = tflow.tflow(client_conn=f.client_conn)
up.requestheaders(f2)
assert not f2.response
+ assert f2.metadata["proxyauth"] == ('test', 'test')
diff --git a/test/mitmproxy/addons/test_script.py b/test/mitmproxy/addons/test_script.py
index 5f196ebf..d79ed4ef 100644
--- a/test/mitmproxy/addons/test_script.py
+++ b/test/mitmproxy/addons/test_script.py
@@ -68,13 +68,12 @@ class TestParseCommand:
with pytest.raises(ValueError):
script.parse_command(" ")
- def test_no_script_file(self):
+ def test_no_script_file(self, tmpdir):
with pytest.raises(Exception, match="not found"):
script.parse_command("notfound")
- with tutils.tmpdir() as dir:
- with pytest.raises(Exception, match="Not a file"):
- script.parse_command(dir)
+ with pytest.raises(Exception, match="Not a file"):
+ script.parse_command(str(tmpdir))
def test_parse_args(self):
with utils.chdir(tutils.test_data.dirname):
@@ -128,21 +127,19 @@ class TestScript:
recf = sc.ns.call_log[0]
assert recf[1] == "request"
- def test_reload(self):
+ def test_reload(self, tmpdir):
with taddons.context() as tctx:
- with tutils.tmpdir():
- with open("foo.py", "w"):
- pass
- sc = script.Script("foo.py")
- tctx.configure(sc)
- for _ in range(100):
- with open("foo.py", "a") as f:
- f.write(".")
- sc.tick()
- time.sleep(0.1)
- if tctx.master.event_log:
- return
- raise AssertionError("Change event not detected.")
+ f = tmpdir.join("foo.py")
+ f.ensure(file=True)
+ sc = script.Script(str(f))
+ tctx.configure(sc)
+ for _ in range(100):
+ f.write(".")
+ sc.tick()
+ time.sleep(0.1)
+ if tctx.master.event_log:
+ return
+ raise AssertionError("Change event not detected.")
def test_exception(self):
with taddons.context() as tctx:
diff --git a/test/mitmproxy/addons/test_serverplayback.py b/test/mitmproxy/addons/test_serverplayback.py
index e2afa516..54e4d281 100644
--- a/test/mitmproxy/addons/test_serverplayback.py
+++ b/test/mitmproxy/addons/test_serverplayback.py
@@ -1,10 +1,8 @@
-import os
import urllib
import pytest
-from mitmproxy.test import tutils
-from mitmproxy.test import tflow
from mitmproxy.test import taddons
+from mitmproxy.test import tflow
import mitmproxy.test.tutils
from mitmproxy.addons import serverplayback
@@ -19,15 +17,14 @@ def tdump(path, flows):
w.add(i)
-def test_config():
+def test_config(tmpdir):
s = serverplayback.ServerPlayback()
- with tutils.tmpdir() as p:
- with taddons.context() as tctx:
- fpath = os.path.join(p, "flows")
- tdump(fpath, [tflow.tflow(resp=True)])
- tctx.configure(s, server_replay=[fpath])
- with pytest.raises(exceptions.OptionsError):
- tctx.configure(s, server_replay=[p])
+ with taddons.context() as tctx:
+ fpath = str(tmpdir.join("flows"))
+ tdump(fpath, [tflow.tflow(resp=True)])
+ tctx.configure(s, server_replay=[fpath])
+ with pytest.raises(exceptions.OptionsError):
+ tctx.configure(s, server_replay=[str(tmpdir)])
def test_tick():
diff --git a/test/mitmproxy/addons/test_streamfile.py b/test/mitmproxy/addons/test_streamfile.py
index 4105c1fc..3f78521c 100644
--- a/test/mitmproxy/addons/test_streamfile.py
+++ b/test/mitmproxy/addons/test_streamfile.py
@@ -1,9 +1,7 @@
-import os.path
import pytest
-from mitmproxy.test import tflow
-from mitmproxy.test import tutils
from mitmproxy.test import taddons
+from mitmproxy.test import tflow
from mitmproxy import io
from mitmproxy import exceptions
@@ -11,19 +9,17 @@ from mitmproxy import options
from mitmproxy.addons import streamfile
-def test_configure():
+def test_configure(tmpdir):
sa = streamfile.StreamFile()
with taddons.context(options=options.Options()) as tctx:
- with tutils.tmpdir() as tdir:
- p = os.path.join(tdir, "foo")
- with pytest.raises(exceptions.OptionsError):
- tctx.configure(sa, streamfile=tdir)
- with pytest.raises(Exception, match="Invalid filter"):
- tctx.configure(sa, streamfile=p, filtstr="~~")
- tctx.configure(sa, filtstr="foo")
- assert sa.filt
- tctx.configure(sa, filtstr=None)
- assert not sa.filt
+ with pytest.raises(exceptions.OptionsError):
+ tctx.configure(sa, streamfile=str(tmpdir))
+ with pytest.raises(Exception, match="Invalid filter"):
+ tctx.configure(sa, streamfile=str(tmpdir.join("foo")), filtstr="~~")
+ tctx.configure(sa, filtstr="foo")
+ assert sa.filt
+ tctx.configure(sa, filtstr=None)
+ assert not sa.filt
def rd(p):
@@ -31,36 +27,34 @@ def rd(p):
return list(x.stream())
-def test_tcp():
+def test_tcp(tmpdir):
sa = streamfile.StreamFile()
with taddons.context() as tctx:
- with tutils.tmpdir() as tdir:
- p = os.path.join(tdir, "foo")
- tctx.configure(sa, streamfile=p)
+ p = str(tmpdir.join("foo"))
+ tctx.configure(sa, streamfile=p)
- tt = tflow.ttcpflow()
- sa.tcp_start(tt)
- sa.tcp_end(tt)
- tctx.configure(sa, streamfile=None)
- assert rd(p)
+ tt = tflow.ttcpflow()
+ sa.tcp_start(tt)
+ sa.tcp_end(tt)
+ tctx.configure(sa, streamfile=None)
+ assert rd(p)
-def test_simple():
+def test_simple(tmpdir):
sa = streamfile.StreamFile()
with taddons.context() as tctx:
- with tutils.tmpdir() as tdir:
- p = os.path.join(tdir, "foo")
+ p = str(tmpdir.join("foo"))
- tctx.configure(sa, streamfile=p)
+ tctx.configure(sa, streamfile=p)
- f = tflow.tflow(resp=True)
- sa.request(f)
- sa.response(f)
- tctx.configure(sa, streamfile=None)
- assert rd(p)[0].response
+ f = tflow.tflow(resp=True)
+ sa.request(f)
+ sa.response(f)
+ tctx.configure(sa, streamfile=None)
+ assert rd(p)[0].response
- tctx.configure(sa, streamfile="+" + p)
- f = tflow.tflow()
- sa.request(f)
- tctx.configure(sa, streamfile=None)
- assert not rd(p)[1].response
+ tctx.configure(sa, streamfile="+" + p)
+ f = tflow.tflow()
+ sa.request(f)
+ tctx.configure(sa, streamfile=None)
+ assert not rd(p)[1].response
diff --git a/test/mitmproxy/net/test_tcp.py b/test/mitmproxy/net/test_tcp.py
index cf010f6e..8b26784a 100644
--- a/test/mitmproxy/net/test_tcp.py
+++ b/test/mitmproxy/net/test_tcp.py
@@ -11,8 +11,8 @@ from OpenSSL import SSL
from mitmproxy import certs
from mitmproxy.net import tcp
-from mitmproxy.test import tutils
from mitmproxy import exceptions
+from mitmproxy.test import tutils
from . import tservers
from ...conftest import requires_alpn
@@ -783,25 +783,24 @@ class TestSSLKeyLogger(tservers.ServerTestBase):
cipher_list="AES256-SHA"
)
- def test_log(self):
+ def test_log(self, tmpdir):
testval = b"echo!\n"
_logfun = tcp.log_ssl_key
- with tutils.tmpdir() as d:
- logfile = os.path.join(d, "foo", "bar", "logfile")
- tcp.log_ssl_key = tcp.SSLKeyLogger(logfile)
+ logfile = str(tmpdir.join("foo", "bar", "logfile"))
+ tcp.log_ssl_key = tcp.SSLKeyLogger(logfile)
- c = tcp.TCPClient(("127.0.0.1", self.port))
- with c.connect():
- c.convert_to_ssl()
- c.wfile.write(testval)
- c.wfile.flush()
- assert c.rfile.readline() == testval
- c.finish()
-
- tcp.log_ssl_key.close()
- with open(logfile, "rb") as f:
- assert f.read().count(b"CLIENT_RANDOM") == 2
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ with c.connect():
+ c.convert_to_ssl()
+ c.wfile.write(testval)
+ c.wfile.flush()
+ assert c.rfile.readline() == testval
+ c.finish()
+
+ tcp.log_ssl_key.close()
+ with open(logfile, "rb") as f:
+ assert f.read().count(b"CLIENT_RANDOM") == 2
tcp.log_ssl_key = _logfun
diff --git a/test/mitmproxy/test_certs.py b/test/mitmproxy/test_certs.py
index 9bd3ad25..2d12c370 100644
--- a/test/mitmproxy/test_certs.py
+++ b/test/mitmproxy/test_certs.py
@@ -34,118 +34,106 @@ from mitmproxy.test import tutils
class TestCertStore:
- def test_create_explicit(self):
- with tutils.tmpdir() as d:
- ca = certs.CertStore.from_store(d, "test")
- assert ca.get_cert(b"foo", [])
-
- ca2 = certs.CertStore.from_store(d, "test")
- assert ca2.get_cert(b"foo", [])
-
- assert ca.default_ca.get_serial_number() == ca2.default_ca.get_serial_number()
-
- def test_create_no_common_name(self):
- with tutils.tmpdir() as d:
- ca = certs.CertStore.from_store(d, "test")
- assert ca.get_cert(None, [])[0].cn is None
-
- def test_create_tmp(self):
- with tutils.tmpdir() as d:
- ca = certs.CertStore.from_store(d, "test")
- assert ca.get_cert(b"foo.com", [])
- assert ca.get_cert(b"foo.com", [])
- assert ca.get_cert(b"*.foo.com", [])
-
- r = ca.get_cert(b"*.foo.com", [])
- assert r[1] == ca.default_privatekey
-
- def test_sans(self):
- with tutils.tmpdir() as d:
- ca = certs.CertStore.from_store(d, "test")
- c1 = ca.get_cert(b"foo.com", [b"*.bar.com"])
- ca.get_cert(b"foo.bar.com", [])
- # assert c1 == c2
- c3 = ca.get_cert(b"bar.com", [])
- assert not c1 == c3
-
- def test_sans_change(self):
- with tutils.tmpdir() as d:
- ca = certs.CertStore.from_store(d, "test")
- ca.get_cert(b"foo.com", [b"*.bar.com"])
- cert, key, chain_file = ca.get_cert(b"foo.bar.com", [b"*.baz.com"])
- assert b"*.baz.com" in cert.altnames
-
- def test_expire(self):
- with tutils.tmpdir() as d:
- ca = certs.CertStore.from_store(d, "test")
- ca.STORE_CAP = 3
- ca.get_cert(b"one.com", [])
- ca.get_cert(b"two.com", [])
- ca.get_cert(b"three.com", [])
-
- assert (b"one.com", ()) in ca.certs
- assert (b"two.com", ()) in ca.certs
- assert (b"three.com", ()) in ca.certs
-
- ca.get_cert(b"one.com", [])
-
- assert (b"one.com", ()) in ca.certs
- assert (b"two.com", ()) in ca.certs
- assert (b"three.com", ()) in ca.certs
-
- ca.get_cert(b"four.com", [])
-
- assert (b"one.com", ()) not in ca.certs
- assert (b"two.com", ()) in ca.certs
- assert (b"three.com", ()) in ca.certs
- assert (b"four.com", ()) in ca.certs
-
- def test_overrides(self):
- with tutils.tmpdir() as d:
- ca1 = certs.CertStore.from_store(os.path.join(d, "ca1"), "test")
- ca2 = certs.CertStore.from_store(os.path.join(d, "ca2"), "test")
- assert not ca1.default_ca.get_serial_number(
- ) == ca2.default_ca.get_serial_number()
-
- dc = ca2.get_cert(b"foo.com", [b"sans.example.com"])
- dcp = os.path.join(d, "dc")
- f = open(dcp, "wb")
- f.write(dc[0].to_pem())
- f.close()
- ca1.add_cert_file(b"foo.com", dcp)
-
- ret = ca1.get_cert(b"foo.com", [])
- assert ret[0].serial == dc[0].serial
-
- def test_create_dhparams(self):
- with tutils.tmpdir() as d:
- filename = os.path.join(d, "dhparam.pem")
- certs.CertStore.load_dhparam(filename)
- assert os.path.exists(filename)
+ def test_create_explicit(self, tmpdir):
+ ca = certs.CertStore.from_store(str(tmpdir), "test")
+ assert ca.get_cert(b"foo", [])
+
+ ca2 = certs.CertStore.from_store(str(tmpdir), "test")
+ assert ca2.get_cert(b"foo", [])
+
+ assert ca.default_ca.get_serial_number() == ca2.default_ca.get_serial_number()
+
+ def test_create_no_common_name(self, tmpdir):
+ ca = certs.CertStore.from_store(str(tmpdir), "test")
+ assert ca.get_cert(None, [])[0].cn is None
+
+ def test_create_tmp(self, tmpdir):
+ ca = certs.CertStore.from_store(str(tmpdir), "test")
+ assert ca.get_cert(b"foo.com", [])
+ assert ca.get_cert(b"foo.com", [])
+ assert ca.get_cert(b"*.foo.com", [])
+
+ r = ca.get_cert(b"*.foo.com", [])
+ assert r[1] == ca.default_privatekey
+
+ def test_sans(self, tmpdir):
+ ca = certs.CertStore.from_store(str(tmpdir), "test")
+ c1 = ca.get_cert(b"foo.com", [b"*.bar.com"])
+ ca.get_cert(b"foo.bar.com", [])
+ # assert c1 == c2
+ c3 = ca.get_cert(b"bar.com", [])
+ assert not c1 == c3
+
+ def test_sans_change(self, tmpdir):
+ ca = certs.CertStore.from_store(str(tmpdir), "test")
+ ca.get_cert(b"foo.com", [b"*.bar.com"])
+ cert, key, chain_file = ca.get_cert(b"foo.bar.com", [b"*.baz.com"])
+ assert b"*.baz.com" in cert.altnames
+
+ def test_expire(self, tmpdir):
+ ca = certs.CertStore.from_store(str(tmpdir), "test")
+ ca.STORE_CAP = 3
+ ca.get_cert(b"one.com", [])
+ ca.get_cert(b"two.com", [])
+ ca.get_cert(b"three.com", [])
+
+ assert (b"one.com", ()) in ca.certs
+ assert (b"two.com", ()) in ca.certs
+ assert (b"three.com", ()) in ca.certs
+
+ ca.get_cert(b"one.com", [])
+
+ assert (b"one.com", ()) in ca.certs
+ assert (b"two.com", ()) in ca.certs
+ assert (b"three.com", ()) in ca.certs
+
+ ca.get_cert(b"four.com", [])
+
+ assert (b"one.com", ()) not in ca.certs
+ assert (b"two.com", ()) in ca.certs
+ assert (b"three.com", ()) in ca.certs
+ assert (b"four.com", ()) in ca.certs
+
+ def test_overrides(self, tmpdir):
+ ca1 = certs.CertStore.from_store(str(tmpdir.join("ca1")), "test")
+ ca2 = certs.CertStore.from_store(str(tmpdir.join("ca2")), "test")
+ assert not ca1.default_ca.get_serial_number() == ca2.default_ca.get_serial_number()
+
+ dc = ca2.get_cert(b"foo.com", [b"sans.example.com"])
+ dcp = tmpdir.join("dc")
+ dcp.write(dc[0].to_pem())
+ ca1.add_cert_file(b"foo.com", str(dcp))
+
+ ret = ca1.get_cert(b"foo.com", [])
+ assert ret[0].serial == dc[0].serial
+
+ def test_create_dhparams(self, tmpdir):
+ filename = str(tmpdir.join("dhparam.pem"))
+ certs.CertStore.load_dhparam(filename)
+ assert os.path.exists(filename)
class TestDummyCert:
- def test_with_ca(self):
- with tutils.tmpdir() as d:
- ca = certs.CertStore.from_store(d, "test")
- r = certs.dummy_cert(
- ca.default_privatekey,
- ca.default_ca,
- b"foo.com",
- [b"one.com", b"two.com", b"*.three.com", b"127.0.0.1"]
- )
- assert r.cn == b"foo.com"
- assert r.altnames == [b'one.com', b'two.com', b'*.three.com']
-
- r = certs.dummy_cert(
- ca.default_privatekey,
- ca.default_ca,
- None,
- []
- )
- assert r.cn is None
- assert r.altnames == []
+ def test_with_ca(self, tmpdir):
+ ca = certs.CertStore.from_store(str(tmpdir), "test")
+ r = certs.dummy_cert(
+ ca.default_privatekey,
+ ca.default_ca,
+ b"foo.com",
+ [b"one.com", b"two.com", b"*.three.com", b"127.0.0.1"]
+ )
+ assert r.cn == b"foo.com"
+ assert r.altnames == [b'one.com', b'two.com', b'*.three.com']
+
+ r = certs.dummy_cert(
+ ca.default_privatekey,
+ ca.default_ca,
+ None,
+ []
+ )
+ assert r.cn is None
+ assert r.altnames == []
class TestSSLCert:
diff --git a/test/mitmproxy/test_examples.py b/test/mitmproxy/test_examples.py
index 668d0d4a..f20e0c8c 100644
--- a/test/mitmproxy/test_examples.py
+++ b/test/mitmproxy/test_examples.py
@@ -1,5 +1,4 @@
import json
-import os
import shlex
import pytest
@@ -142,30 +141,26 @@ class TestHARDump:
with pytest.raises(ScriptError):
tscript("complex/har_dump.py")
- def test_simple(self):
- with tutils.tmpdir() as tdir:
- path = os.path.join(tdir, "somefile")
+ def test_simple(self, tmpdir):
+ path = str(tmpdir.join("somefile"))
- m, sc = tscript("complex/har_dump.py", shlex.quote(path))
- m.addons.invoke(m, "response", self.flow())
- m.addons.remove(sc)
-
- with open(path, "r") as inp:
- har = json.load(inp)
+ m, sc = tscript("complex/har_dump.py", shlex.quote(path))
+ m.addons.invoke(m, "response", self.flow())
+ m.addons.remove(sc)
+ with open(path, "r") as inp:
+ har = json.load(inp)
assert len(har["log"]["entries"]) == 1
- def test_base64(self):
- with tutils.tmpdir() as tdir:
- path = os.path.join(tdir, "somefile")
-
- m, sc = tscript("complex/har_dump.py", shlex.quote(path))
- m.addons.invoke(m, "response", self.flow(resp_content=b"foo" + b"\xFF" * 10))
- m.addons.remove(sc)
+ def test_base64(self, tmpdir):
+ path = str(tmpdir.join("somefile"))
- with open(path, "r") as inp:
- har = json.load(inp)
+ m, sc = tscript("complex/har_dump.py", shlex.quote(path))
+ m.addons.invoke(m, "response", self.flow(resp_content=b"foo" + b"\xFF" * 10))
+ m.addons.remove(sc)
+ with open(path, "r") as inp:
+ har = json.load(inp)
assert har["log"]["entries"][0]["response"]["content"]["encoding"] == "base64"
def test_format_cookies(self):
@@ -187,7 +182,7 @@ class TestHARDump:
f = format_cookies([("n", "v", CA([("expires", "Mon, 24-Aug-2037 00:00:00 GMT")]))])[0]
assert f['expires']
- def test_binary(self):
+ def test_binary(self, tmpdir):
f = self.flow()
f.request.method = "POST"
@@ -196,14 +191,12 @@ class TestHARDump:
f.response.headers["random-junk"] = bytes(range(256))
f.response.content = bytes(range(256))
- with tutils.tmpdir() as tdir:
- path = os.path.join(tdir, "somefile")
-
- m, sc = tscript("complex/har_dump.py", shlex.quote(path))
- m.addons.invoke(m, "response", f)
- m.addons.remove(sc)
+ path = str(tmpdir.join("somefile"))
- with open(path, "r") as inp:
- har = json.load(inp)
+ m, sc = tscript("complex/har_dump.py", shlex.quote(path))
+ m.addons.invoke(m, "response", f)
+ m.addons.remove(sc)
+ with open(path, "r") as inp:
+ har = json.load(inp)
assert len(har["log"]["entries"]) == 1
diff --git a/test/mitmproxy/test_optmanager.py b/test/mitmproxy/test_optmanager.py
index db33cddd..0ecfc4e5 100644
--- a/test/mitmproxy/test_optmanager.py
+++ b/test/mitmproxy/test_optmanager.py
@@ -1,5 +1,4 @@
import copy
-import os
import pytest
import typing
import argparse
@@ -7,7 +6,6 @@ import argparse
from mitmproxy import options
from mitmproxy import optmanager
from mitmproxy import exceptions
-from mitmproxy.test import tutils
class TO(optmanager.OptManager):
@@ -238,25 +236,24 @@ def test_serialize_defaults():
assert o.serialize(None, defaults=True)
-def test_saving():
+def test_saving(tmpdir):
o = TD2()
o.three = "set"
- with tutils.tmpdir() as tdir:
- dst = os.path.join(tdir, "conf")
- o.save(dst, defaults=True)
+ dst = str(tmpdir.join("conf"))
+ o.save(dst, defaults=True)
- o2 = TD2()
- o2.load_paths(dst)
- o2.three = "foo"
- o2.save(dst, defaults=True)
+ o2 = TD2()
+ o2.load_paths(dst)
+ o2.three = "foo"
+ o2.save(dst, defaults=True)
- o.load_paths(dst)
- assert o.three == "foo"
+ o.load_paths(dst)
+ assert o.three == "foo"
- with open(dst, 'a') as f:
- f.write("foobar: '123'")
- with pytest.raises(exceptions.OptionsError, matches=''):
- o.load_paths(dst)
+ with open(dst, 'a') as f:
+ f.write("foobar: '123'")
+ with pytest.raises(exceptions.OptionsError, matches=''):
+ o.load_paths(dst)
def test_merge():
diff --git a/test/mitmproxy/test_stateobject.py b/test/mitmproxy/test_stateobject.py
index 7b8e30d0..d8c7a8e9 100644
--- a/test/mitmproxy/test_stateobject.py
+++ b/test/mitmproxy/test_stateobject.py
@@ -26,10 +26,12 @@ class Container(StateObject):
def __init__(self):
self.child = None
self.children = None
+ self.dictionary = None
_stateobject_attributes = dict(
child=Child,
children=List[Child],
+ dictionary=dict,
)
@classmethod
@@ -62,12 +64,30 @@ def test_container_list():
a.children = [Child(42), Child(44)]
assert a.get_state() == {
"child": None,
- "children": [{"x": 42}, {"x": 44}]
+ "children": [{"x": 42}, {"x": 44}],
+ "dictionary": None,
}
copy = a.copy()
assert len(copy.children) == 2
assert copy.children is not a.children
assert copy.children[0] is not a.children[0]
+ assert Container.from_state(a.get_state())
+
+
+def test_container_dict():
+ a = Container()
+ a.dictionary = dict()
+ a.dictionary['foo'] = 'bar'
+ a.dictionary['bar'] = Child(44)
+ assert a.get_state() == {
+ "child": None,
+ "children": None,
+ "dictionary": {'bar': {'x': 44}, 'foo': 'bar'},
+ }
+ copy = a.copy()
+ assert len(copy.dictionary) == 2
+ assert copy.dictionary is not a.dictionary
+ assert copy.dictionary['bar'] is not a.dictionary['bar']
def test_too_much_state():
diff --git a/test/mitmproxy/test_websocket.py b/test/mitmproxy/test_websocket.py
index f2963390..62f69e2d 100644
--- a/test/mitmproxy/test_websocket.py
+++ b/test/mitmproxy/test_websocket.py
@@ -1,5 +1,7 @@
+import io
import pytest
+from mitmproxy.contrib import tnetstring
from mitmproxy import flowfilter
from mitmproxy.test import tflow
@@ -14,8 +16,6 @@ class TestWebSocketFlow:
b = f2.get_state()
del a["id"]
del b["id"]
- del a["handshake_flow"]["id"]
- del b["handshake_flow"]["id"]
assert a == b
assert not f == f2
assert f is not f2
@@ -60,3 +60,14 @@ class TestWebSocketFlow:
assert 'WebSocketFlow' in repr(f)
assert 'binary message: ' in repr(f.messages[0])
assert 'text message: ' in repr(f.messages[1])
+
+ def test_serialize(self):
+ b = io.BytesIO()
+ d = tflow.twebsocketflow().get_state()
+ tnetstring.dump(d, b)
+ assert b.getvalue()
+
+ b = io.BytesIO()
+ d = tflow.twebsocketflow().handshake_flow.get_state()
+ tnetstring.dump(d, b)
+ assert b.getvalue()
diff --git a/test/mitmproxy/tools/test_dump.py b/test/mitmproxy/tools/test_dump.py
index 2542ec4b..a15bf583 100644
--- a/test/mitmproxy/tools/test_dump.py
+++ b/test/mitmproxy/tools/test_dump.py
@@ -1,4 +1,3 @@
-import os
import pytest
from unittest import mock
@@ -9,7 +8,6 @@ from mitmproxy import controller
from mitmproxy import options
from mitmproxy.tools import dump
-from mitmproxy.test import tutils
from .. import tservers
@@ -19,18 +17,17 @@ class TestDumpMaster(tservers.MasterTest):
m = dump.DumpMaster(o, proxy.DummyServer(), with_termlog=False, with_dumper=False)
return m
- def test_read(self):
- with tutils.tmpdir() as t:
- p = os.path.join(t, "read")
- self.flowfile(p)
- self.dummy_cycle(
- self.mkmaster(None, rfile=p),
- 1, b"",
- )
- with pytest.raises(exceptions.OptionsError):
- self.mkmaster(None, rfile="/nonexistent")
- with pytest.raises(exceptions.OptionsError):
- self.mkmaster(None, rfile="test_dump.py")
+ def test_read(self, tmpdir):
+ p = str(tmpdir.join("read"))
+ self.flowfile(p)
+ self.dummy_cycle(
+ self.mkmaster(None, rfile=p),
+ 1, b"",
+ )
+ with pytest.raises(exceptions.OptionsError):
+ self.mkmaster(None, rfile="/nonexistent")
+ with pytest.raises(exceptions.OptionsError):
+ self.mkmaster(None, rfile="test_dump.py")
def test_has_error(self):
m = self.mkmaster(None)
diff --git a/test/pathod/language/test_base.py b/test/pathod/language/test_base.py
index 85e9e53b..ec460b07 100644
--- a/test/pathod/language/test_base.py
+++ b/test/pathod/language/test_base.py
@@ -1,11 +1,8 @@
-import os
import pytest
from pathod import language
from pathod.language import base, exceptions
-from mitmproxy.test import tutils
-
def parse_request(s):
return language.parse_pathoc(s).next()
@@ -137,24 +134,22 @@ class TestTokValueFile:
v = base.TokValue.parseString("<path")[0]
assert v.path == "path"
- def test_access_control(self):
+ def test_access_control(self, tmpdir):
v = base.TokValue.parseString("<path")[0]
- with tutils.tmpdir() as t:
- p = os.path.join(t, "path")
- with open(p, "wb") as f:
- f.write(b"x" * 10000)
-
- assert v.get_generator(language.Settings(staticdir=t))
-
- v = base.TokValue.parseString("<path2")[0]
- with pytest.raises(exceptions.FileAccessDenied):
- v.get_generator(language.Settings(staticdir=t))
- with pytest.raises(Exception, match="access disabled"):
- v.get_generator(language.Settings())
-
- v = base.TokValue.parseString("</outside")[0]
- with pytest.raises(Exception, match="outside"):
- v.get_generator(language.Settings(staticdir=t))
+ f = tmpdir.join("path")
+ f.write(b"x" * 10000)
+
+ assert v.get_generator(language.Settings(staticdir=str(tmpdir)))
+
+ v = base.TokValue.parseString("<path2")[0]
+ with pytest.raises(exceptions.FileAccessDenied):
+ v.get_generator(language.Settings(staticdir=str(tmpdir)))
+ with pytest.raises(Exception, match="access disabled"):
+ v.get_generator(language.Settings())
+
+ v = base.TokValue.parseString("</outside")[0]
+ with pytest.raises(Exception, match="outside"):
+ v.get_generator(language.Settings(staticdir=str(tmpdir)))
def test_spec(self):
v = base.TokValue.parseString("<'one two'")[0]
diff --git a/test/pathod/language/test_generators.py b/test/pathod/language/test_generators.py
index b3ce0335..dc15aaa1 100644
--- a/test/pathod/language/test_generators.py
+++ b/test/pathod/language/test_generators.py
@@ -1,7 +1,4 @@
-import os
-
from pathod.language import generators
-from mitmproxy.test import tutils
def test_randomgenerator():
@@ -15,23 +12,20 @@ def test_randomgenerator():
assert len(g[1000:1001]) == 0
-def test_filegenerator():
- with tutils.tmpdir() as t:
- path = os.path.join(t, "foo")
- f = open(path, "wb")
- f.write(b"x" * 10000)
- f.close()
- g = generators.FileGenerator(path)
- assert len(g) == 10000
- assert g[0] == b"x"
- assert g[-1] == b"x"
- assert g[0:5] == b"xxxxx"
- assert len(g[1:10]) == 9
- assert len(g[10000:10001]) == 0
- assert repr(g)
- # remove all references to FileGenerator instance to close the file
- # handle.
- del g
+def test_filegenerator(tmpdir):
+ f = tmpdir.join("foo")
+ f.write(b"x" * 10000)
+ g = generators.FileGenerator(str(f))
+ assert len(g) == 10000
+ assert g[0] == b"x"
+ assert g[-1] == b"x"
+ assert g[0:5] == b"xxxxx"
+ assert len(g[1:10]) == 9
+ assert len(g[10000:10001]) == 0
+ assert repr(g)
+ # remove all references to FileGenerator instance to close the file
+ # handle.
+ del g
def test_transform_generator():