aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
authorAldo Cortesi <aldo@corte.si>2016-06-14 17:11:41 +1200
committerGitHub <noreply@github.com>2016-06-14 17:11:41 +1200
commit04394ebd503fc26d9a4bcb201607b9dd3af157fa (patch)
tree9553d0f45e705da04bfa4be8dd5e39d09fcd112f /test
parentd8ae2f156203a81a8e6d325f5c460c351cfbfc5c (diff)
parent93276d45be68790b5f4aeb4577c380ca1608efb3 (diff)
downloadmitmproxy-04394ebd503fc26d9a4bcb201607b9dd3af157fa.tar.gz
mitmproxy-04394ebd503fc26d9a4bcb201607b9dd3af157fa.tar.bz2
mitmproxy-04394ebd503fc26d9a4bcb201607b9dd3af157fa.zip
Merge pull request #1251 from cortesi/netlibrace
Roll connect handlers and thread leak detection out in more of the netlib test suite
Diffstat (limited to 'test')
-rw-r--r--test/netlib/http/http2/test_connections.py200
-rw-r--r--test/netlib/test_tcp.py429
-rw-r--r--test/netlib/tservers.py3
3 files changed, 321 insertions, 311 deletions
diff --git a/test/netlib/http/http2/test_connections.py b/test/netlib/http/http2/test_connections.py
index 27cc30ba..2a43627a 100644
--- a/test/netlib/http/http2/test_connections.py
+++ b/test/netlib/http/http2/test_connections.py
@@ -75,10 +75,10 @@ class TestCheckALPNMatch(tservers.ServerTestBase):
def test_check_alpn(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl(alpn_protos=[b'h2'])
- protocol = HTTP2Protocol(c)
- assert protocol.check_alpn()
+ with c.connect():
+ c.convert_to_ssl(alpn_protos=[b'h2'])
+ protocol = HTTP2Protocol(c)
+ assert protocol.check_alpn()
class TestCheckALPNMismatch(tservers.ServerTestBase):
@@ -91,11 +91,11 @@ class TestCheckALPNMismatch(tservers.ServerTestBase):
def test_check_alpn(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl(alpn_protos=[b'h2'])
- protocol = HTTP2Protocol(c)
- with raises(NotImplementedError):
- protocol.check_alpn()
+ with c.connect():
+ c.convert_to_ssl(alpn_protos=[b'h2'])
+ protocol = HTTP2Protocol(c)
+ with raises(NotImplementedError):
+ protocol.check_alpn()
class TestPerformServerConnectionPreface(tservers.ServerTestBase):
@@ -124,15 +124,15 @@ class TestPerformServerConnectionPreface(tservers.ServerTestBase):
def test_perform_server_connection_preface(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- protocol = HTTP2Protocol(c)
+ with c.connect():
+ protocol = HTTP2Protocol(c)
- assert not protocol.connection_preface_performed
- protocol.perform_server_connection_preface()
- assert protocol.connection_preface_performed
+ assert not protocol.connection_preface_performed
+ protocol.perform_server_connection_preface()
+ assert protocol.connection_preface_performed
- with raises(TcpDisconnect):
- protocol.perform_server_connection_preface(force=True)
+ with raises(TcpDisconnect):
+ protocol.perform_server_connection_preface(force=True)
class TestPerformClientConnectionPreface(tservers.ServerTestBase):
@@ -160,12 +160,12 @@ class TestPerformClientConnectionPreface(tservers.ServerTestBase):
def test_perform_client_connection_preface(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- protocol = HTTP2Protocol(c)
+ with c.connect():
+ protocol = HTTP2Protocol(c)
- assert not protocol.connection_preface_performed
- protocol.perform_client_connection_preface()
- assert protocol.connection_preface_performed
+ assert not protocol.connection_preface_performed
+ protocol.perform_client_connection_preface()
+ assert protocol.connection_preface_performed
class TestClientStreamIds(object):
@@ -209,24 +209,24 @@ class TestApplySettings(tservers.ServerTestBase):
def test_apply_settings(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl()
- protocol = HTTP2Protocol(c)
+ with c.connect():
+ c.convert_to_ssl()
+ protocol = HTTP2Protocol(c)
- protocol._apply_settings({
- hyperframe.frame.SettingsFrame.ENABLE_PUSH: 'foo',
- hyperframe.frame.SettingsFrame.MAX_CONCURRENT_STREAMS: 'bar',
- hyperframe.frame.SettingsFrame.INITIAL_WINDOW_SIZE: 'deadbeef',
- })
+ protocol._apply_settings({
+ hyperframe.frame.SettingsFrame.ENABLE_PUSH: 'foo',
+ hyperframe.frame.SettingsFrame.MAX_CONCURRENT_STREAMS: 'bar',
+ hyperframe.frame.SettingsFrame.INITIAL_WINDOW_SIZE: 'deadbeef',
+ })
- assert c.rfile.safe_read(2) == b"OK"
+ assert c.rfile.safe_read(2) == b"OK"
- assert protocol.http2_settings[
- hyperframe.frame.SettingsFrame.ENABLE_PUSH] == 'foo'
- assert protocol.http2_settings[
- hyperframe.frame.SettingsFrame.MAX_CONCURRENT_STREAMS] == 'bar'
- assert protocol.http2_settings[
- hyperframe.frame.SettingsFrame.INITIAL_WINDOW_SIZE] == 'deadbeef'
+ assert protocol.http2_settings[
+ hyperframe.frame.SettingsFrame.ENABLE_PUSH] == 'foo'
+ assert protocol.http2_settings[
+ hyperframe.frame.SettingsFrame.MAX_CONCURRENT_STREAMS] == 'bar'
+ assert protocol.http2_settings[
+ hyperframe.frame.SettingsFrame.INITIAL_WINDOW_SIZE] == 'deadbeef'
class TestCreateHeaders(object):
@@ -304,19 +304,19 @@ class TestReadRequest(tservers.ServerTestBase):
def test_read_request(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl()
- protocol = HTTP2Protocol(c, is_server=True)
- protocol.connection_preface_performed = True
+ with c.connect():
+ c.convert_to_ssl()
+ protocol = HTTP2Protocol(c, is_server=True)
+ protocol.connection_preface_performed = True
- req = protocol.read_request(NotImplemented)
+ req = protocol.read_request(NotImplemented)
- assert req.stream_id
- assert req.headers.fields == ()
- assert req.method == "GET"
- assert req.path == "/"
- assert req.scheme == "https"
- assert req.content == b'foobar'
+ assert req.stream_id
+ assert req.headers.fields == ()
+ assert req.method == "GET"
+ assert req.path == "/"
+ assert req.scheme == "https"
+ assert req.content == b'foobar'
class TestReadRequestRelative(tservers.ServerTestBase):
@@ -330,16 +330,16 @@ class TestReadRequestRelative(tservers.ServerTestBase):
def test_asterisk_form(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl()
- protocol = HTTP2Protocol(c, is_server=True)
- protocol.connection_preface_performed = True
+ with c.connect():
+ c.convert_to_ssl()
+ protocol = HTTP2Protocol(c, is_server=True)
+ protocol.connection_preface_performed = True
- req = protocol.read_request(NotImplemented)
+ req = protocol.read_request(NotImplemented)
- assert req.first_line_format == "relative"
- assert req.method == "OPTIONS"
- assert req.path == "*"
+ assert req.first_line_format == "relative"
+ assert req.method == "OPTIONS"
+ assert req.path == "*"
class TestReadRequestAbsolute(tservers.ServerTestBase):
@@ -353,17 +353,17 @@ class TestReadRequestAbsolute(tservers.ServerTestBase):
def test_absolute_form(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl()
- protocol = HTTP2Protocol(c, is_server=True)
- protocol.connection_preface_performed = True
+ with c.connect():
+ c.convert_to_ssl()
+ protocol = HTTP2Protocol(c, is_server=True)
+ protocol.connection_preface_performed = True
- req = protocol.read_request(NotImplemented)
+ req = protocol.read_request(NotImplemented)
- assert req.first_line_format == "absolute"
- assert req.scheme == "http"
- assert req.host == "address"
- assert req.port == 22
+ assert req.first_line_format == "absolute"
+ assert req.scheme == "http"
+ assert req.host == "address"
+ assert req.port == 22
class TestReadRequestConnect(tservers.ServerTestBase):
@@ -379,22 +379,22 @@ class TestReadRequestConnect(tservers.ServerTestBase):
def test_connect(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl()
- protocol = HTTP2Protocol(c, is_server=True)
- protocol.connection_preface_performed = True
+ with c.connect():
+ c.convert_to_ssl()
+ protocol = HTTP2Protocol(c, is_server=True)
+ protocol.connection_preface_performed = True
- req = protocol.read_request(NotImplemented)
- assert req.first_line_format == "authority"
- assert req.method == "CONNECT"
- assert req.host == "address"
- assert req.port == 22
+ req = protocol.read_request(NotImplemented)
+ assert req.first_line_format == "authority"
+ assert req.method == "CONNECT"
+ assert req.host == "address"
+ assert req.port == 22
- req = protocol.read_request(NotImplemented)
- assert req.first_line_format == "authority"
- assert req.method == "CONNECT"
- assert req.host == "example.com"
- assert req.port == 443
+ req = protocol.read_request(NotImplemented)
+ assert req.first_line_format == "authority"
+ assert req.method == "CONNECT"
+ assert req.host == "example.com"
+ assert req.port == 443
class TestReadResponse(tservers.ServerTestBase):
@@ -411,19 +411,19 @@ class TestReadResponse(tservers.ServerTestBase):
def test_read_response(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl()
- protocol = HTTP2Protocol(c)
- protocol.connection_preface_performed = True
+ with c.connect():
+ c.convert_to_ssl()
+ protocol = HTTP2Protocol(c)
+ protocol.connection_preface_performed = True
- resp = protocol.read_response(NotImplemented, stream_id=42)
+ resp = protocol.read_response(NotImplemented, stream_id=42)
- assert resp.http_version == "HTTP/2.0"
- assert resp.status_code == 200
- assert resp.reason == ''
- assert resp.headers.fields == ((b':status', b'200'), (b'etag', b'foobar'))
- assert resp.content == b'foobar'
- assert resp.timestamp_end
+ assert resp.http_version == "HTTP/2.0"
+ assert resp.status_code == 200
+ assert resp.reason == ''
+ assert resp.headers.fields == ((b':status', b'200'), (b'etag', b'foobar'))
+ assert resp.content == b'foobar'
+ assert resp.timestamp_end
class TestReadEmptyResponse(tservers.ServerTestBase):
@@ -437,19 +437,19 @@ class TestReadEmptyResponse(tservers.ServerTestBase):
def test_read_empty_response(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl()
- protocol = HTTP2Protocol(c)
- protocol.connection_preface_performed = True
+ with c.connect():
+ c.convert_to_ssl()
+ protocol = HTTP2Protocol(c)
+ protocol.connection_preface_performed = True
- resp = protocol.read_response(NotImplemented, stream_id=42)
+ resp = protocol.read_response(NotImplemented, stream_id=42)
- assert resp.stream_id == 42
- assert resp.http_version == "HTTP/2.0"
- assert resp.status_code == 200
- assert resp.reason == ''
- assert resp.headers.fields == ((b':status', b'200'), (b'etag', b'foobar'))
- assert resp.content == b''
+ assert resp.stream_id == 42
+ assert resp.http_version == "HTTP/2.0"
+ assert resp.status_code == 200
+ assert resp.reason == ''
+ assert resp.headers.fields == ((b':status', b'200'), (b'etag', b'foobar'))
+ assert resp.content == b''
class TestAssembleRequest(object):
diff --git a/test/netlib/test_tcp.py b/test/netlib/test_tcp.py
index 083360b4..590bcc01 100644
--- a/test/netlib/test_tcp.py
+++ b/test/netlib/test_tcp.py
@@ -39,8 +39,21 @@ class ClientCipherListHandler(tcp.BaseHandler):
class HangHandler(tcp.BaseHandler):
def handle(self):
+ # Hang as long as the client connection is alive
while True:
- time.sleep(1)
+ try:
+ self.connection.setblocking(0)
+ ret = self.connection.recv(1)
+ # Client connection is dead...
+ if ret == "" or ret == b"":
+ return
+ except socket.error:
+ pass
+ except SSL.WantReadError:
+ pass
+ except Exception:
+ return
+ time.sleep(0.1)
class ALPNHandler(tcp.BaseHandler):
@@ -61,18 +74,18 @@ class TestServer(tservers.ServerTestBase):
def test_echo(self):
testval = b"echo!\n"
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.wfile.write(testval)
- c.wfile.flush()
- assert c.rfile.readline() == testval
+ with c.connect():
+ c.wfile.write(testval)
+ c.wfile.flush()
+ assert c.rfile.readline() == testval
def test_thread_start_error(self):
with mock.patch.object(threading.Thread, "start", side_effect=threading.ThreadError("nonewthread")) as m:
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- assert not c.rfile.read(1)
- assert m.called
- assert "nonewthread" in self.q.get_nowait()
+ with c.connect():
+ assert not c.rfile.read(1)
+ assert m.called
+ assert "nonewthread" in self.q.get_nowait()
self.test_echo()
@@ -92,9 +105,9 @@ class TestServerBind(tservers.ServerTestBase):
c = tcp.TCPClient(
("127.0.0.1", self.port), source_address=(
"127.0.0.1", random_port))
- c.connect()
- assert c.rfile.readline() == str(("127.0.0.1", random_port)).encode()
- return
+ with c.connect():
+ assert c.rfile.readline() == str(("127.0.0.1", random_port)).encode()
+ return
except TcpException: # port probably already in use
pass
@@ -106,10 +119,10 @@ class TestServerIPv6(tservers.ServerTestBase):
def test_echo(self):
testval = b"echo!\n"
c = tcp.TCPClient(tcp.Address(("::1", self.port), use_ipv6=True))
- c.connect()
- c.wfile.write(testval)
- c.wfile.flush()
- assert c.rfile.readline() == testval
+ with c.connect():
+ c.wfile.write(testval)
+ c.wfile.flush()
+ assert c.rfile.readline() == testval
class TestEcho(tservers.ServerTestBase):
@@ -118,10 +131,10 @@ class TestEcho(tservers.ServerTestBase):
def test_echo(self):
testval = b"echo!\n"
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.wfile.write(testval)
- c.wfile.flush()
- assert c.rfile.readline() == testval
+ with c.connect():
+ c.wfile.write(testval)
+ c.wfile.flush()
+ assert c.rfile.readline() == testval
class HardDisconnectHandler(tcp.BaseHandler):
@@ -140,10 +153,10 @@ class TestFinishFail(tservers.ServerTestBase):
def test_disconnect_in_finish(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.wfile.write(b"foo\n")
- c.wfile.flush = mock.Mock(side_effect=TcpDisconnect)
- c.finish()
+ with c.connect():
+ c.wfile.write(b"foo\n")
+ c.wfile.flush = mock.Mock(side_effect=TcpDisconnect)
+ c.finish()
class TestServerSSL(tservers.ServerTestBase):
@@ -155,21 +168,21 @@ class TestServerSSL(tservers.ServerTestBase):
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl(sni=b"foo.com", options=SSL.OP_ALL)
- testval = b"echo!\n"
- c.wfile.write(testval)
- c.wfile.flush()
- assert c.rfile.readline() == testval
+ with c.connect():
+ c.convert_to_ssl(sni=b"foo.com", options=SSL.OP_ALL)
+ testval = b"echo!\n"
+ c.wfile.write(testval)
+ c.wfile.flush()
+ assert c.rfile.readline() == testval
def test_get_current_cipher(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- assert not c.get_current_cipher()
- c.convert_to_ssl(sni=b"foo.com")
- ret = c.get_current_cipher()
- assert ret
- assert "AES" in ret[0]
+ with c.connect():
+ assert not c.get_current_cipher()
+ c.convert_to_ssl(sni=b"foo.com")
+ ret = c.get_current_cipher()
+ assert ret
+ assert "AES" in ret[0]
class TestSSLv3Only(tservers.ServerTestBase):
@@ -181,8 +194,8 @@ class TestSSLv3Only(tservers.ServerTestBase):
def test_failure(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- tutils.raises(TlsException, c.convert_to_ssl, sni=b"foo.com")
+ with c.connect():
+ tutils.raises(TlsException, c.convert_to_ssl, sni=b"foo.com")
class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase):
@@ -195,49 +208,46 @@ class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase):
def test_mode_default_should_pass(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
-
- c.convert_to_ssl()
+ with c.connect():
+ c.convert_to_ssl()
- # Verification errors should be saved even if connection isn't aborted
- # aborted
- assert c.ssl_verification_error is not None
+ # Verification errors should be saved even if connection isn't aborted
+ # aborted
+ assert c.ssl_verification_error is not None
- testval = b"echo!\n"
- c.wfile.write(testval)
- c.wfile.flush()
- assert c.rfile.readline() == testval
+ testval = b"echo!\n"
+ c.wfile.write(testval)
+ c.wfile.flush()
+ assert c.rfile.readline() == testval
def test_mode_none_should_pass(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
-
- c.convert_to_ssl(verify_options=SSL.VERIFY_NONE)
+ with c.connect():
+ c.convert_to_ssl(verify_options=SSL.VERIFY_NONE)
- # Verification errors should be saved even if connection isn't aborted
- assert c.ssl_verification_error is not None
+ # Verification errors should be saved even if connection isn't aborted
+ assert c.ssl_verification_error is not None
- testval = b"echo!\n"
- c.wfile.write(testval)
- c.wfile.flush()
- assert c.rfile.readline() == testval
+ testval = b"echo!\n"
+ c.wfile.write(testval)
+ c.wfile.flush()
+ assert c.rfile.readline() == testval
def test_mode_strict_should_fail(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
-
- with tutils.raises(InvalidCertificateException):
- c.convert_to_ssl(
- sni=b"example.mitmproxy.org",
- verify_options=SSL.VERIFY_PEER,
- ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt")
- )
+ with c.connect():
+ with tutils.raises(InvalidCertificateException):
+ c.convert_to_ssl(
+ sni=b"example.mitmproxy.org",
+ verify_options=SSL.VERIFY_PEER,
+ ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt")
+ )
- assert c.ssl_verification_error is not None
+ assert c.ssl_verification_error is not None
- # Unknown issuing certificate authority for first certificate
- assert c.ssl_verification_error['errno'] == 18
- assert c.ssl_verification_error['depth'] == 0
+ # Unknown issuing certificate authority for first certificate
+ assert c.ssl_verification_error['errno'] == 18
+ assert c.ssl_verification_error['depth'] == 0
class TestSSLUpstreamCertVerificationWBadHostname(tservers.ServerTestBase):
@@ -250,26 +260,23 @@ class TestSSLUpstreamCertVerificationWBadHostname(tservers.ServerTestBase):
def test_should_fail_without_sni(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
-
- with tutils.raises(TlsException):
- c.convert_to_ssl(
- verify_options=SSL.VERIFY_PEER,
- ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt")
- )
+ with c.connect():
+ with tutils.raises(TlsException):
+ c.convert_to_ssl(
+ verify_options=SSL.VERIFY_PEER,
+ ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt")
+ )
def test_should_fail(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
-
- with tutils.raises(InvalidCertificateException):
- c.convert_to_ssl(
- sni=b"mitmproxy.org",
- verify_options=SSL.VERIFY_PEER,
- ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt")
- )
-
- assert c.ssl_verification_error is not None
+ with c.connect():
+ with tutils.raises(InvalidCertificateException):
+ c.convert_to_ssl(
+ sni=b"mitmproxy.org",
+ verify_options=SSL.VERIFY_PEER,
+ ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt")
+ )
+ assert c.ssl_verification_error is not None
class TestSSLUpstreamCertVerificationWValidCertChain(tservers.ServerTestBase):
@@ -282,37 +289,35 @@ class TestSSLUpstreamCertVerificationWValidCertChain(tservers.ServerTestBase):
def test_mode_strict_w_pemfile_should_pass(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
-
- c.convert_to_ssl(
- sni=b"example.mitmproxy.org",
- verify_options=SSL.VERIFY_PEER,
- ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt")
- )
+ with c.connect():
+ c.convert_to_ssl(
+ sni=b"example.mitmproxy.org",
+ verify_options=SSL.VERIFY_PEER,
+ ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt")
+ )
- assert c.ssl_verification_error is None
+ assert c.ssl_verification_error is None
- testval = b"echo!\n"
- c.wfile.write(testval)
- c.wfile.flush()
- assert c.rfile.readline() == testval
+ testval = b"echo!\n"
+ c.wfile.write(testval)
+ c.wfile.flush()
+ assert c.rfile.readline() == testval
def test_mode_strict_w_cadir_should_pass(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
-
- c.convert_to_ssl(
- sni=b"example.mitmproxy.org",
- verify_options=SSL.VERIFY_PEER,
- ca_path=tutils.test_data.path("data/verificationcerts/")
- )
+ with c.connect():
+ c.convert_to_ssl(
+ sni=b"example.mitmproxy.org",
+ verify_options=SSL.VERIFY_PEER,
+ ca_path=tutils.test_data.path("data/verificationcerts/")
+ )
- assert c.ssl_verification_error is None
+ assert c.ssl_verification_error is None
- testval = b"echo!\n"
- c.wfile.write(testval)
- c.wfile.flush()
- assert c.rfile.readline() == testval
+ testval = b"echo!\n"
+ c.wfile.write(testval)
+ c.wfile.flush()
+ assert c.rfile.readline() == testval
class TestSSLClientCert(tservers.ServerTestBase):
@@ -334,19 +339,19 @@ class TestSSLClientCert(tservers.ServerTestBase):
def test_clientcert(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl(
- cert=tutils.test_data.path("data/clientcert/client.pem"))
- assert c.rfile.readline().strip() == b"1"
+ with c.connect():
+ c.convert_to_ssl(
+ cert=tutils.test_data.path("data/clientcert/client.pem"))
+ assert c.rfile.readline().strip() == b"1"
def test_clientcert_err(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- tutils.raises(
- TlsException,
- c.convert_to_ssl,
- cert=tutils.test_data.path("data/clientcert/make")
- )
+ with c.connect():
+ tutils.raises(
+ TlsException,
+ c.convert_to_ssl,
+ cert=tutils.test_data.path("data/clientcert/make")
+ )
class TestSNI(tservers.ServerTestBase):
@@ -365,10 +370,10 @@ class TestSNI(tservers.ServerTestBase):
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl(sni=b"foo.com")
- assert c.sni == b"foo.com"
- assert c.rfile.readline() == b"foo.com"
+ with c.connect():
+ c.convert_to_ssl(sni=b"foo.com")
+ assert c.sni == b"foo.com"
+ assert c.rfile.readline() == b"foo.com"
class TestServerCipherList(tservers.ServerTestBase):
@@ -379,9 +384,9 @@ class TestServerCipherList(tservers.ServerTestBase):
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl(sni=b"foo.com")
- assert c.rfile.readline() == b"['RC4-SHA']"
+ with c.connect():
+ c.convert_to_ssl(sni=b"foo.com")
+ assert c.rfile.readline() == b"['RC4-SHA']"
class TestServerCurrentCipher(tservers.ServerTestBase):
@@ -399,9 +404,9 @@ class TestServerCurrentCipher(tservers.ServerTestBase):
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl(sni=b"foo.com")
- assert b"RC4-SHA" in c.rfile.readline()
+ with c.connect():
+ c.convert_to_ssl(sni=b"foo.com")
+ assert b"RC4-SHA" in c.rfile.readline()
class TestServerCipherListError(tservers.ServerTestBase):
@@ -412,8 +417,8 @@ class TestServerCipherListError(tservers.ServerTestBase):
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- tutils.raises("handshake error", c.convert_to_ssl, sni=b"foo.com")
+ with c.connect():
+ tutils.raises("handshake error", c.convert_to_ssl, sni=b"foo.com")
class TestClientCipherListError(tservers.ServerTestBase):
@@ -424,12 +429,13 @@ class TestClientCipherListError(tservers.ServerTestBase):
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- tutils.raises(
- "cipher specification",
- c.convert_to_ssl,
- sni=b"foo.com",
- cipher_list="bogus")
+ with c.connect():
+ tutils.raises(
+ "cipher specification",
+ c.convert_to_ssl,
+ sni=b"foo.com",
+ cipher_list="bogus"
+ )
class TestSSLDisconnect(tservers.ServerTestBase):
@@ -443,13 +449,13 @@ class TestSSLDisconnect(tservers.ServerTestBase):
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl()
- # Excercise SSL.ZeroReturnError
- c.rfile.read(10)
- c.close()
- tutils.raises(TcpDisconnect, c.wfile.write, b"foo")
- tutils.raises(queue.Empty, self.q.get_nowait)
+ with c.connect():
+ c.convert_to_ssl()
+ # Excercise SSL.ZeroReturnError
+ c.rfile.read(10)
+ c.close()
+ tutils.raises(TcpDisconnect, c.wfile.write, b"foo")
+ tutils.raises(queue.Empty, self.q.get_nowait)
class TestSSLHardDisconnect(tservers.ServerTestBase):
@@ -458,23 +464,23 @@ class TestSSLHardDisconnect(tservers.ServerTestBase):
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl()
- # Exercise SSL.SysCallError
- c.rfile.read(10)
- c.close()
- tutils.raises(TcpDisconnect, c.wfile.write, b"foo")
+ with c.connect():
+ c.convert_to_ssl()
+ # Exercise SSL.SysCallError
+ c.rfile.read(10)
+ c.close()
+ tutils.raises(TcpDisconnect, c.wfile.write, b"foo")
class TestDisconnect(tservers.ServerTestBase):
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.rfile.read(10)
- c.wfile.write(b"foo")
- c.close()
- c.close()
+ with c.connect():
+ c.rfile.read(10)
+ c.wfile.write(b"foo")
+ c.close()
+ c.close()
class TestServerTimeOut(tservers.ServerTestBase):
@@ -491,9 +497,9 @@ class TestServerTimeOut(tservers.ServerTestBase):
def test_timeout(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- time.sleep(0.3)
- assert self.last_handler.timeout
+ with c.connect():
+ time.sleep(0.3)
+ assert self.last_handler.timeout
class TestTimeOut(tservers.ServerTestBase):
@@ -501,10 +507,10 @@ class TestTimeOut(tservers.ServerTestBase):
def test_timeout(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.settimeout(0.1)
- assert c.gettimeout() == 0.1
- tutils.raises(TcpTimeout, c.rfile.read, 10)
+ with c.connect():
+ c.settimeout(0.1)
+ assert c.gettimeout() == 0.1
+ tutils.raises(TcpTimeout, c.rfile.read, 10)
class TestALPNClient(tservers.ServerTestBase):
@@ -516,25 +522,25 @@ class TestALPNClient(tservers.ServerTestBase):
if tcp.HAS_ALPN:
def test_alpn(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl(alpn_protos=[b"foo", b"bar", b"fasel"])
- assert c.get_alpn_proto_negotiated() == b"bar"
- assert c.rfile.readline().strip() == b"bar"
+ with c.connect():
+ c.convert_to_ssl(alpn_protos=[b"foo", b"bar", b"fasel"])
+ assert c.get_alpn_proto_negotiated() == b"bar"
+ assert c.rfile.readline().strip() == b"bar"
def test_no_alpn(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl()
- assert c.get_alpn_proto_negotiated() == b""
- assert c.rfile.readline().strip() == b"NONE"
+ with c.connect():
+ c.convert_to_ssl()
+ assert c.get_alpn_proto_negotiated() == b""
+ assert c.rfile.readline().strip() == b"NONE"
else:
def test_none_alpn(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl(alpn_protos=[b"foo", b"bar", b"fasel"])
- assert c.get_alpn_proto_negotiated() == b""
- assert c.rfile.readline() == b"NONE"
+ with c.connect():
+ c.convert_to_ssl(alpn_protos=[b"foo", b"bar", b"fasel"])
+ assert c.get_alpn_proto_negotiated() == b""
+ assert c.rfile.readline() == b"NONE"
class TestNoSSLNoALPNClient(tservers.ServerTestBase):
@@ -542,9 +548,9 @@ class TestNoSSLNoALPNClient(tservers.ServerTestBase):
def test_no_ssl_no_alpn(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- assert c.get_alpn_proto_negotiated() == b""
- assert c.rfile.readline().strip() == b"NONE"
+ with c.connect():
+ assert c.get_alpn_proto_negotiated() == b""
+ assert c.rfile.readline().strip() == b"NONE"
class TestSSLTimeOut(tservers.ServerTestBase):
@@ -553,10 +559,10 @@ class TestSSLTimeOut(tservers.ServerTestBase):
def test_timeout_client(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl()
- c.settimeout(0.1)
- tutils.raises(TcpTimeout, c.rfile.read, 10)
+ with c.connect():
+ c.convert_to_ssl()
+ c.settimeout(0.1)
+ tutils.raises(TcpTimeout, c.rfile.read, 10)
class TestDHParams(tservers.ServerTestBase):
@@ -570,10 +576,10 @@ class TestDHParams(tservers.ServerTestBase):
def test_dhparams(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl()
- ret = c.get_current_cipher()
- assert ret[0] == "DHE-RSA-AES256-SHA"
+ with c.connect():
+ c.convert_to_ssl()
+ ret = c.get_current_cipher()
+ assert ret[0] == "DHE-RSA-AES256-SHA"
def test_create_dhparams(self):
with tutils.tmpdir() as d:
@@ -718,33 +724,34 @@ class TestPeek(tservers.ServerTestBase):
handler = EchoHandler
def _connect(self, c):
- c.connect()
+ return c.connect()
def test_peek(self):
testval = b"peek!\n"
c = tcp.TCPClient(("127.0.0.1", self.port))
- self._connect(c)
- c.wfile.write(testval)
- c.wfile.flush()
+ with self._connect(c):
+ c.wfile.write(testval)
+ c.wfile.flush()
- assert c.rfile.peek(4) == b"peek"
- assert c.rfile.peek(6) == b"peek!\n"
- assert c.rfile.readline() == testval
+ assert c.rfile.peek(4) == b"peek"
+ assert c.rfile.peek(6) == b"peek!\n"
+ assert c.rfile.readline() == testval
- c.close()
- with tutils.raises(NetlibException):
- if c.rfile.peek(1) == b"":
- # Workaround for Python 2 on Unix:
- # Peeking a closed connection does not raise an exception here.
- raise NetlibException()
+ c.close()
+ with tutils.raises(NetlibException):
+ if c.rfile.peek(1) == b"":
+ # Workaround for Python 2 on Unix:
+ # Peeking a closed connection does not raise an exception here.
+ raise NetlibException()
class TestPeekSSL(TestPeek):
ssl = True
def _connect(self, c):
- c.connect()
- c.convert_to_ssl()
+ with c.connect() as conn:
+ c.convert_to_ssl()
+ return conn.pop()
class TestAddress:
@@ -774,16 +781,16 @@ class TestSSLKeyLogger(tservers.ServerTestBase):
tcp.log_ssl_key = tcp.SSLKeyLogger(logfile)
c = tcp.TCPClient(("127.0.0.1", self.port))
- c.connect()
- c.convert_to_ssl()
- c.wfile.write(testval)
- c.wfile.flush()
- assert c.rfile.readline() == testval
- c.finish()
-
- tcp.log_ssl_key.close()
- with open(logfile, "rb") as f:
- assert f.read().count(b"CLIENT_RANDOM") == 2
+ with c.connect():
+ c.convert_to_ssl()
+ c.wfile.write(testval)
+ c.wfile.flush()
+ assert c.rfile.readline() == testval
+ c.finish()
+
+ tcp.log_ssl_key.close()
+ with open(logfile, "rb") as f:
+ assert f.read().count(b"CLIENT_RANDOM") == 2
tcp.log_ssl_key = _logfun
diff --git a/test/netlib/tservers.py b/test/netlib/tservers.py
index 569745e6..803aaa72 100644
--- a/test/netlib/tservers.py
+++ b/test/netlib/tservers.py
@@ -104,6 +104,9 @@ class ServerTestBase(object):
def teardown_class(cls):
cls.server.shutdown()
+ def teardown(self):
+ self.server.server.wait_for_silence()
+
@property
def last_handler(self):
return self.server.server.last_handler