aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/data/not-server.crt15
-rw-r--r--test/http2/test_protocol.py (renamed from test/http2/test_http2_protocol.py)133
-rw-r--r--test/test_tcp.py97
3 files changed, 226 insertions, 19 deletions
diff --git a/test/data/not-server.crt b/test/data/not-server.crt
new file mode 100644
index 00000000..08c015c2
--- /dev/null
+++ b/test/data/not-server.crt
@@ -0,0 +1,15 @@
+-----BEGIN CERTIFICATE-----
+MIICRTCCAa4CCQD/j4qq1h3iCjANBgkqhkiG9w0BAQUFADBnMQswCQYDVQQGEwJV
+UzELMAkGA1UECBMCQ0ExETAPBgNVBAcTCFNvbWVDaXR5MRcwFQYDVQQKEw5Ob3RU
+aGVSaWdodE9yZzELMAkGA1UECxMCTkExEjAQBgNVBAMTCU5vdFNlcnZlcjAeFw0x
+NTA2MTMwMTE2MDZaFw0yNTA2MTAwMTE2MDZaMGcxCzAJBgNVBAYTAlVTMQswCQYD
+VQQIEwJDQTERMA8GA1UEBxMIU29tZUNpdHkxFzAVBgNVBAoTDk5vdFRoZVJpZ2h0
+T3JnMQswCQYDVQQLEwJOQTESMBAGA1UEAxMJTm90U2VydmVyMIGfMA0GCSqGSIb3
+DQEBAQUAA4GNADCBiQKBgQDPkJlXAOCMKF0R7aDn5QJ7HtrJgOUDk/LpbhKhRZZR
+dRGnJ4/HQxYYHh9k/4yZamYcvQPUxvFJt7UJUocf+84LUcIusUk7GvJMgsMVtFMq
+7UKNXBN5tl3oOtoFDWGMZ8ksaIxS6oW3V/9v2WgU23PfvwE0EZqy+QhMLZZP5GOH
+RwIDAQABMA0GCSqGSIb3DQEBBQUAA4GBAJI6UtMKdCS2ghjqhAek2W1rt9u+Wuvx
+776WYm5VyrJEtBDc/axLh0OteXzy/A31JrYe15fnVWIeFbDF0Ief9/Ezv6Jn+Pk8
+DErw5IHk2B399O4K3L3Eig06piu7uf3vE4l8ZanY02ZEnw7DyL6kmG9lX98VGenF
+uXPfu3yxKbR4
+-----END CERTIFICATE-----
diff --git a/test/http2/test_http2_protocol.py b/test/http2/test_protocol.py
index cb46bc68..9b49acd3 100644
--- a/test/http2/test_http2_protocol.py
+++ b/test/http2/test_protocol.py
@@ -1,4 +1,3 @@
-
import OpenSSL
from netlib import http2
@@ -50,7 +49,39 @@ class TestCheckALPNMismatch(test.ServerTestBase):
tutils.raises(NotImplementedError, protocol.check_alpn)
-class TestPerformConnectionPreface(test.ServerTestBase):
+class TestPerformServerConnectionPreface(test.ServerTestBase):
+ class handler(tcp.BaseHandler):
+
+ def handle(self):
+ # send magic
+ self.wfile.write(
+ '505249202a20485454502f322e300d0a0d0a534d0d0a0d0a'.decode('hex'))
+ self.wfile.flush()
+
+ # send empty settings frame
+ self.wfile.write('000000040000000000'.decode('hex'))
+ self.wfile.flush()
+
+ # check empty settings frame
+ assert self.rfile.read(9) ==\
+ '000000040000000000'.decode('hex')
+
+ # check settings acknowledgement
+ assert self.rfile.read(9) == \
+ '000000040100000000'.decode('hex')
+
+ # send settings acknowledgement
+ self.wfile.write('000000040100000000'.decode('hex'))
+ self.wfile.flush()
+
+ def test_perform_server_connection_preface(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ protocol = http2.HTTP2Protocol(c)
+ protocol.perform_server_connection_preface()
+
+
+class TestPerformClientConnectionPreface(test.ServerTestBase):
class handler(tcp.BaseHandler):
def handle(self):
@@ -74,21 +105,18 @@ class TestPerformConnectionPreface(test.ServerTestBase):
self.wfile.write('000000040100000000'.decode('hex'))
self.wfile.flush()
- ssl = True
-
- def test_perform_connection_preface(self):
+ def test_perform_client_connection_preface(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
- c.convert_to_ssl()
protocol = http2.HTTP2Protocol(c)
- protocol.perform_connection_preface()
+ protocol.perform_client_connection_preface()
-class TestStreamIds():
+class TestClientStreamIds():
c = tcp.TCPClient(("127.0.0.1", 0))
protocol = http2.HTTP2Protocol(c)
- def test_stream_ids(self):
+ def test_client_stream_ids(self):
assert self.protocol.current_stream_id is None
assert self.protocol.next_stream_id() == 1
assert self.protocol.current_stream_id == 1
@@ -98,6 +126,20 @@ class TestStreamIds():
assert self.protocol.current_stream_id == 5
+class TestServerStreamIds():
+ c = tcp.TCPClient(("127.0.0.1", 0))
+ protocol = http2.HTTP2Protocol(c, is_server=True)
+
+ def test_server_stream_ids(self):
+ assert self.protocol.current_stream_id is None
+ assert self.protocol.next_stream_id() == 2
+ assert self.protocol.current_stream_id == 2
+ assert self.protocol.next_stream_id() == 4
+ assert self.protocol.current_stream_id == 4
+ assert self.protocol.next_stream_id() == 6
+ assert self.protocol.current_stream_id == 6
+
+
class TestApplySettings(test.ServerTestBase):
class handler(tcp.BaseHandler):
@@ -180,14 +222,14 @@ class TestCreateRequest():
def test_create_request_simple(self):
bytes = http2.HTTP2Protocol(self.c).create_request('GET', '/')
assert len(bytes) == 1
- assert bytes[0] == '000003010500000001828487'.decode('hex')
+ assert bytes[0] == '00000d0105000000018284874188089d5c0b8170dc07'.decode('hex')
def test_create_request_with_body(self):
bytes = http2.HTTP2Protocol(self.c).create_request(
'GET', '/', [(b'foo', b'bar')], 'foobar')
assert len(bytes) == 2
assert bytes[0] ==\
- '00000b010400000001828487408294e7838c767f'.decode('hex')
+ '0000150104000000018284874188089d5c0b8170dc07408294e7838c767f'.decode('hex')
assert bytes[1] ==\
'000006000100000001666f6f626172'.decode('hex')
@@ -213,5 +255,72 @@ class TestReadResponse(test.ServerTestBase):
status, headers, body = protocol.read_response()
assert headers == {':status': '200', 'etag': 'foobar'}
- assert status == '200'
+ assert status == "200"
assert body == b'foobar'
+
+
+class TestReadEmptyResponse(test.ServerTestBase):
+ class handler(tcp.BaseHandler):
+
+ def handle(self):
+ self.wfile.write(
+ b'00000801050000000188628594e78c767f'.decode('hex'))
+ self.wfile.flush()
+
+ ssl = True
+
+ def test_read_empty_response(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ c.convert_to_ssl()
+ protocol = http2.HTTP2Protocol(c)
+
+ status, headers, body = protocol.read_response()
+
+ assert headers == {':status': '200', 'etag': 'foobar'}
+ assert status == "200"
+ assert body == b''
+
+
+class TestReadRequest(test.ServerTestBase):
+ class handler(tcp.BaseHandler):
+
+ def handle(self):
+ self.wfile.write(
+ b'000003010400000001828487'.decode('hex'))
+ self.wfile.write(
+ b'000006000100000001666f6f626172'.decode('hex'))
+ self.wfile.flush()
+
+ ssl = True
+
+ def test_read_request(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ c.convert_to_ssl()
+ protocol = http2.HTTP2Protocol(c, is_server=True)
+
+ stream_id, headers, body = protocol.read_request()
+
+ assert stream_id
+ assert headers == {':method': 'GET', ':path': '/', ':scheme': 'https'}
+ assert body == b'foobar'
+
+
+class TestCreateResponse():
+ c = tcp.TCPClient(("127.0.0.1", 0))
+
+ def test_create_response_simple(self):
+ bytes = http2.HTTP2Protocol(self.c, is_server=True).create_response(200)
+ assert len(bytes) == 1
+ assert bytes[0] ==\
+ '00000101050000000288'.decode('hex')
+
+ def test_create_response_with_body(self):
+ bytes = http2.HTTP2Protocol(self.c, is_server=True).create_response(
+ 200, 1, [(b'foo', b'bar')], 'foobar')
+ assert len(bytes) == 2
+ assert bytes[0] ==\
+ '00000901040000000188408294e7838c767f'.decode('hex')
+ assert bytes[1] ==\
+ '000006000100000001666f6f626172'.decode('hex')
diff --git a/test/test_tcp.py b/test/test_tcp.py
index d5506556..122c1f0f 100644
--- a/test/test_tcp.py
+++ b/test/test_tcp.py
@@ -41,6 +41,18 @@ class HangHandler(tcp.BaseHandler):
time.sleep(1)
+class ALPNHandler(tcp.BaseHandler):
+ sni = None
+
+ def handle(self):
+ alp = self.get_alpn_proto_negotiated()
+ if alp:
+ self.wfile.write("%s" % alp)
+ else:
+ self.wfile.write("NONE")
+ self.wfile.flush()
+
+
class TestServer(test.ServerTestBase):
handler = EchoHandler
@@ -171,6 +183,59 @@ class TestSSLv3Only(test.ServerTestBase):
tutils.raises(tcp.NetLibError, c.convert_to_ssl, sni="foo.com")
+class TestSSLUpstreamCertVerification(test.ServerTestBase):
+ handler = EchoHandler
+
+ ssl = dict(
+ cert=tutils.test_data.path("data/server.crt")
+ )
+
+ def test_mode_default(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+
+ c.convert_to_ssl()
+
+ testval = "echo!\n"
+ c.wfile.write(testval)
+ c.wfile.flush()
+ assert c.rfile.readline() == testval
+
+ def test_mode_none(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+
+ c.convert_to_ssl(verify_options=SSL.VERIFY_NONE)
+
+ testval = "echo!\n"
+ c.wfile.write(testval)
+ c.wfile.flush()
+ assert c.rfile.readline() == testval
+
+ def test_mode_strict_w_bad_cert(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+
+ tutils.raises(
+ tcp.NetLibError,
+ c.convert_to_ssl,
+ verify_options=SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
+ ca_pemfile=tutils.test_data.path("data/not-server.crt"))
+
+ def test_mode_strict_w_cert(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+
+ c.convert_to_ssl(
+ verify_options=SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
+ ca_pemfile=tutils.test_data.path("data/server.crt"))
+
+ testval = "echo!\n"
+ c.wfile.write(testval)
+ c.wfile.flush()
+ assert c.rfile.readline() == testval
+
+
class TestSSLClientCert(test.ServerTestBase):
class handler(tcp.BaseHandler):
@@ -363,25 +428,43 @@ class TestTimeOut(test.ServerTestBase):
tutils.raises(tcp.NetLibTimeout, c.rfile.read, 10)
-class TestALPN(test.ServerTestBase):
- handler = EchoHandler
+class TestALPNClient(test.ServerTestBase):
+ handler = ALPNHandler
ssl = dict(
- alpn_select="foobar"
+ alpn_select="bar"
)
if OpenSSL._util.lib.Cryptography_HAS_ALPN:
def test_alpn(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
- c.convert_to_ssl(alpn_protos=["foobar"])
- assert c.get_alpn_proto_negotiated() == "foobar"
+ c.convert_to_ssl(alpn_protos=["foo", "bar", "fasel"])
+ assert c.get_alpn_proto_negotiated() == "bar"
+ assert c.rfile.readline().strip() == "bar"
+
+ def test_no_alpn(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ c.convert_to_ssl()
+ assert c.get_alpn_proto_negotiated() == ""
+ assert c.rfile.readline().strip() == "NONE"
else:
def test_none_alpn(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
- c.convert_to_ssl(alpn_protos=["foobar"])
- assert c.get_alpn_proto_negotiated() == None
+ c.convert_to_ssl(alpn_protos=["foo", "bar", "fasel"])
+ assert c.get_alpn_proto_negotiated() == ""
+ assert c.rfile.readline() == "NONE"
+
+class TestNoSSLNoALPNClient(test.ServerTestBase):
+ handler = ALPNHandler
+
+ def test_no_ssl_no_alpn(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ assert c.get_alpn_proto_negotiated() == ""
+ assert c.rfile.readline().strip() == "NONE"
class TestSSLTimeOut(test.ServerTestBase):