diff options
author | Aldo Cortesi <aldo@nullcube.com> | 2016-10-20 11:02:52 +1300 |
---|---|---|
committer | Aldo Cortesi <aldo@nullcube.com> | 2016-10-20 11:02:52 +1300 |
commit | f964d49853a3f0d22e0f6d4cff7cfbc49008e40e (patch) | |
tree | 6aef6ca8942dccc8c879e851f99afa7c0a1e2cb7 /test/netlib/test_tcp.py | |
parent | 9870844b38c84e7446b15909758497cecb26301e (diff) | |
download | mitmproxy-f964d49853a3f0d22e0f6d4cff7cfbc49008e40e.tar.gz mitmproxy-f964d49853a3f0d22e0f6d4cff7cfbc49008e40e.tar.bz2 mitmproxy-f964d49853a3f0d22e0f6d4cff7cfbc49008e40e.zip |
netlib.certutils -> mitmproxy.certs
Diffstat (limited to 'test/netlib/test_tcp.py')
-rw-r--r-- | test/netlib/test_tcp.py | 53 |
1 files changed, 27 insertions, 26 deletions
diff --git a/test/netlib/test_tcp.py b/test/netlib/test_tcp.py index 797a5a04..2c1b92dc 100644 --- a/test/netlib/test_tcp.py +++ b/test/netlib/test_tcp.py @@ -9,9 +9,10 @@ import mock from OpenSSL import SSL -from netlib import tcp, certutils, tutils -from netlib.exceptions import InvalidCertificateException, TcpReadIncomplete, TlsException, \ - TcpTimeout, TcpDisconnect, TcpException, NetlibException +from mitmproxy import certs +from netlib import tcp +from netlib import tutils +from netlib import exceptions from . import tservers @@ -108,7 +109,7 @@ class TestServerBind(tservers.ServerTestBase): with c.connect(): assert c.rfile.readline() == str(("127.0.0.1", random_port)).encode() return - except TcpException: # port probably already in use + except exceptions.TcpException: # port probably already in use pass @@ -155,7 +156,7 @@ class TestFinishFail(tservers.ServerTestBase): c = tcp.TCPClient(("127.0.0.1", self.port)) with c.connect(): c.wfile.write(b"foo\n") - c.wfile.flush = mock.Mock(side_effect=TcpDisconnect) + c.wfile.flush = mock.Mock(side_effect=exceptions.TcpDisconnect) c.finish() @@ -195,7 +196,7 @@ class TestSSLv3Only(tservers.ServerTestBase): def test_failure(self): c = tcp.TCPClient(("127.0.0.1", self.port)) with c.connect(): - tutils.raises(TlsException, c.convert_to_ssl, sni="foo.com") + tutils.raises(exceptions.TlsException, c.convert_to_ssl, sni="foo.com") class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase): @@ -236,7 +237,7 @@ class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase): def test_mode_strict_should_fail(self): c = tcp.TCPClient(("127.0.0.1", self.port)) with c.connect(): - with tutils.raises(InvalidCertificateException): + with tutils.raises(exceptions.InvalidCertificateException): c.convert_to_ssl( sni="example.mitmproxy.org", verify_options=SSL.VERIFY_PEER, @@ -261,7 +262,7 @@ class TestSSLUpstreamCertVerificationWBadHostname(tservers.ServerTestBase): def test_should_fail_without_sni(self): c = tcp.TCPClient(("127.0.0.1", self.port)) with c.connect(): - with tutils.raises(TlsException): + with tutils.raises(exceptions.TlsException): c.convert_to_ssl( verify_options=SSL.VERIFY_PEER, ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt") @@ -270,7 +271,7 @@ class TestSSLUpstreamCertVerificationWBadHostname(tservers.ServerTestBase): def test_should_fail(self): c = tcp.TCPClient(("127.0.0.1", self.port)) with c.connect(): - with tutils.raises(InvalidCertificateException): + with tutils.raises(exceptions.InvalidCertificateException): c.convert_to_ssl( sni="mitmproxy.org", verify_options=SSL.VERIFY_PEER, @@ -348,7 +349,7 @@ class TestSSLClientCert(tservers.ServerTestBase): c = tcp.TCPClient(("127.0.0.1", self.port)) with c.connect(): tutils.raises( - TlsException, + exceptions.TlsException, c.convert_to_ssl, cert=tutils.test_data.path("data/clientcert/make") ) @@ -454,7 +455,7 @@ class TestSSLDisconnect(tservers.ServerTestBase): # Excercise SSL.ZeroReturnError c.rfile.read(10) c.close() - tutils.raises(TcpDisconnect, c.wfile.write, b"foo") + tutils.raises(exceptions.TcpDisconnect, c.wfile.write, b"foo") tutils.raises(queue.Empty, self.q.get_nowait) @@ -469,7 +470,7 @@ class TestSSLHardDisconnect(tservers.ServerTestBase): # Exercise SSL.SysCallError c.rfile.read(10) c.close() - tutils.raises(TcpDisconnect, c.wfile.write, b"foo") + tutils.raises(exceptions.TcpDisconnect, c.wfile.write, b"foo") class TestDisconnect(tservers.ServerTestBase): @@ -492,7 +493,7 @@ class TestServerTimeOut(tservers.ServerTestBase): self.settimeout(0.01) try: self.rfile.read(10) - except TcpTimeout: + except exceptions.TcpTimeout: self.timeout = True def test_timeout(self): @@ -510,7 +511,7 @@ class TestTimeOut(tservers.ServerTestBase): with c.connect(): c.settimeout(0.1) assert c.gettimeout() == 0.1 - tutils.raises(TcpTimeout, c.rfile.read, 10) + tutils.raises(exceptions.TcpTimeout, c.rfile.read, 10) class TestALPNClient(tservers.ServerTestBase): @@ -562,13 +563,13 @@ class TestSSLTimeOut(tservers.ServerTestBase): with c.connect(): c.convert_to_ssl() c.settimeout(0.1) - tutils.raises(TcpTimeout, c.rfile.read, 10) + tutils.raises(exceptions.TcpTimeout, c.rfile.read, 10) class TestDHParams(tservers.ServerTestBase): handler = HangHandler ssl = dict( - dhparams=certutils.CertStore.load_dhparam( + dhparams=certs.CertStore.load_dhparam( tutils.test_data.path("data/dhparam.pem"), ), cipher_list="DHE-RSA-AES256-SHA" @@ -584,7 +585,7 @@ class TestDHParams(tservers.ServerTestBase): def test_create_dhparams(self): with tutils.tmpdir() as d: filename = os.path.join(d, "dhparam.pem") - certutils.CertStore.load_dhparam(filename) + certs.CertStore.load_dhparam(filename) assert os.path.exists(filename) @@ -592,7 +593,7 @@ class TestTCPClient: def test_conerr(self): c = tcp.TCPClient(("127.0.0.1", 0)) - tutils.raises(TcpException, c.connect) + tutils.raises(exceptions.TcpException, c.connect) class TestFileLike: @@ -661,7 +662,7 @@ class TestFileLike: o = mock.MagicMock() o.flush = mock.MagicMock(side_effect=socket.error) s.o = o - tutils.raises(TcpDisconnect, s.flush) + tutils.raises(exceptions.TcpDisconnect, s.flush) def test_reader_read_error(self): s = BytesIO(b"foobar\nfoobar") @@ -669,7 +670,7 @@ class TestFileLike: o = mock.MagicMock() o.read = mock.MagicMock(side_effect=socket.error) s.o = o - tutils.raises(TcpDisconnect, s.read, 10) + tutils.raises(exceptions.TcpDisconnect, s.read, 10) def test_reset_timestamps(self): s = BytesIO(b"foobar\nfoobar") @@ -700,24 +701,24 @@ class TestFileLike: s = mock.MagicMock() s.read = mock.MagicMock(side_effect=SSL.Error()) s = tcp.Reader(s) - tutils.raises(TlsException, s.read, 1) + tutils.raises(exceptions.TlsException, s.read, 1) def test_read_syscall_ssl_error(self): s = mock.MagicMock() s.read = mock.MagicMock(side_effect=SSL.SysCallError()) s = tcp.Reader(s) - tutils.raises(TlsException, s.read, 1) + tutils.raises(exceptions.TlsException, s.read, 1) def test_reader_readline_disconnect(self): o = mock.MagicMock() o.read = mock.MagicMock(side_effect=socket.error) s = tcp.Reader(o) - tutils.raises(TcpDisconnect, s.readline, 10) + tutils.raises(exceptions.TcpDisconnect, s.readline, 10) def test_reader_incomplete_error(self): s = BytesIO(b"foobar") s = tcp.Reader(s) - tutils.raises(TcpReadIncomplete, s.safe_read, 10) + tutils.raises(exceptions.TcpReadIncomplete, s.safe_read, 10) class TestPeek(tservers.ServerTestBase): @@ -738,11 +739,11 @@ class TestPeek(tservers.ServerTestBase): assert c.rfile.readline() == testval c.close() - with tutils.raises(NetlibException): + with tutils.raises(exceptions.NetlibException): if c.rfile.peek(1) == b"": # Workaround for Python 2 on Unix: # Peeking a closed connection does not raise an exception here. - raise NetlibException() + raise exceptions.NetlibException() class TestPeekSSL(TestPeek): |