diff options
Diffstat (limited to 'test/pathod/test_protocols_http2.py')
| -rw-r--r-- | test/pathod/test_protocols_http2.py | 545 | 
1 files changed, 545 insertions, 0 deletions
diff --git a/test/pathod/test_protocols_http2.py b/test/pathod/test_protocols_http2.py new file mode 100644 index 00000000..e42c2858 --- /dev/null +++ b/test/pathod/test_protocols_http2.py @@ -0,0 +1,545 @@ +import mock +import codecs + +import hyperframe +from netlib import tcp, http +from netlib.tutils import raises +from netlib.exceptions import TcpDisconnect +from netlib.http.http2 import framereader + +from ..netlib import tservers as netlib_tservers + +from pathod.protocols.http2 import HTTP2StateProtocol, TCPHandler + + +class TestTCPHandlerWrapper: +    def test_wrapped(self): +        h = TCPHandler(rfile='foo', wfile='bar') +        p = HTTP2StateProtocol(h) +        assert p.tcp_handler.rfile == 'foo' +        assert p.tcp_handler.wfile == 'bar' + +    def test_direct(self): +        p = HTTP2StateProtocol(rfile='foo', wfile='bar') +        assert isinstance(p.tcp_handler, TCPHandler) +        assert p.tcp_handler.rfile == 'foo' +        assert p.tcp_handler.wfile == 'bar' + + +class EchoHandler(tcp.BaseHandler): +    sni = None + +    def handle(self): +        while True: +            v = self.rfile.safe_read(1) +            self.wfile.write(v) +            self.wfile.flush() + + +class TestProtocol: +    @mock.patch("pathod.protocols.http2.HTTP2StateProtocol.perform_server_connection_preface") +    @mock.patch("pathod.protocols.http2.HTTP2StateProtocol.perform_client_connection_preface") +    def test_perform_connection_preface(self, mock_client_method, mock_server_method): +        protocol = HTTP2StateProtocol(is_server=False) +        protocol.connection_preface_performed = True + +        protocol.perform_connection_preface() +        assert not mock_client_method.called +        assert not mock_server_method.called + +        protocol.perform_connection_preface(force=True) +        assert mock_client_method.called +        assert not mock_server_method.called + +    @mock.patch("pathod.protocols.http2.HTTP2StateProtocol.perform_server_connection_preface") +    @mock.patch("pathod.protocols.http2.HTTP2StateProtocol.perform_client_connection_preface") +    def test_perform_connection_preface_server(self, mock_client_method, mock_server_method): +        protocol = HTTP2StateProtocol(is_server=True) +        protocol.connection_preface_performed = True + +        protocol.perform_connection_preface() +        assert not mock_client_method.called +        assert not mock_server_method.called + +        protocol.perform_connection_preface(force=True) +        assert not mock_client_method.called +        assert mock_server_method.called + + +class TestCheckALPNMatch(netlib_tservers.ServerTestBase): +    handler = EchoHandler +    ssl = dict( +        alpn_select=b'h2', +    ) + +    if tcp.HAS_ALPN: + +        def test_check_alpn(self): +            c = tcp.TCPClient(("127.0.0.1", self.port)) +            with c.connect(): +                c.convert_to_ssl(alpn_protos=[b'h2']) +                protocol = HTTP2StateProtocol(c) +                assert protocol.check_alpn() + + +class TestCheckALPNMismatch(netlib_tservers.ServerTestBase): +    handler = EchoHandler +    ssl = dict( +        alpn_select=None, +    ) + +    if tcp.HAS_ALPN: + +        def test_check_alpn(self): +            c = tcp.TCPClient(("127.0.0.1", self.port)) +            with c.connect(): +                c.convert_to_ssl(alpn_protos=[b'h2']) +                protocol = HTTP2StateProtocol(c) +                with raises(NotImplementedError): +                    protocol.check_alpn() + + +class TestPerformServerConnectionPreface(netlib_tservers.ServerTestBase): +    class handler(tcp.BaseHandler): + +        def handle(self): +            # send magic +            self.wfile.write(codecs.decode('505249202a20485454502f322e300d0a0d0a534d0d0a0d0a', 'hex_codec')) +            self.wfile.flush() + +            # send empty settings frame +            self.wfile.write(codecs.decode('000000040000000000', 'hex_codec')) +            self.wfile.flush() + +            # check empty settings frame +            raw = framereader.http2_read_raw_frame(self.rfile) +            assert raw == codecs.decode('00000c040000000000000200000000000300000001', 'hex_codec') + +            # check settings acknowledgement +            raw = framereader.http2_read_raw_frame(self.rfile) +            assert raw == codecs.decode('000000040100000000', 'hex_codec') + +            # send settings acknowledgement +            self.wfile.write(codecs.decode('000000040100000000', 'hex_codec')) +            self.wfile.flush() + +    def test_perform_server_connection_preface(self): +        c = tcp.TCPClient(("127.0.0.1", self.port)) +        with c.connect(): +            protocol = HTTP2StateProtocol(c) + +            assert not protocol.connection_preface_performed +            protocol.perform_server_connection_preface() +            assert protocol.connection_preface_performed + +            with raises(TcpDisconnect): +                protocol.perform_server_connection_preface(force=True) + + +class TestPerformClientConnectionPreface(netlib_tservers.ServerTestBase): +    class handler(tcp.BaseHandler): + +        def handle(self): +            # check magic +            assert self.rfile.read(24) == HTTP2StateProtocol.CLIENT_CONNECTION_PREFACE + +            # check empty settings frame +            assert self.rfile.read(9) ==\ +                codecs.decode('000000040000000000', 'hex_codec') + +            # send empty settings frame +            self.wfile.write(codecs.decode('000000040000000000', 'hex_codec')) +            self.wfile.flush() + +            # check settings acknowledgement +            assert self.rfile.read(9) == \ +                codecs.decode('000000040100000000', 'hex_codec') + +            # send settings acknowledgement +            self.wfile.write(codecs.decode('000000040100000000', 'hex_codec')) +            self.wfile.flush() + +    def test_perform_client_connection_preface(self): +        c = tcp.TCPClient(("127.0.0.1", self.port)) +        with c.connect(): +            protocol = HTTP2StateProtocol(c) + +            assert not protocol.connection_preface_performed +            protocol.perform_client_connection_preface() +            assert protocol.connection_preface_performed + + +class TestClientStreamIds(object): +    c = tcp.TCPClient(("127.0.0.1", 0)) +    protocol = HTTP2StateProtocol(c) + +    def test_client_stream_ids(self): +        assert self.protocol.current_stream_id is None +        assert self.protocol._next_stream_id() == 1 +        assert self.protocol.current_stream_id == 1 +        assert self.protocol._next_stream_id() == 3 +        assert self.protocol.current_stream_id == 3 +        assert self.protocol._next_stream_id() == 5 +        assert self.protocol.current_stream_id == 5 + + +class TestserverstreamIds(object): +    c = tcp.TCPClient(("127.0.0.1", 0)) +    protocol = HTTP2StateProtocol(c, is_server=True) + +    def test_server_stream_ids(self): +        assert self.protocol.current_stream_id is None +        assert self.protocol._next_stream_id() == 2 +        assert self.protocol.current_stream_id == 2 +        assert self.protocol._next_stream_id() == 4 +        assert self.protocol.current_stream_id == 4 +        assert self.protocol._next_stream_id() == 6 +        assert self.protocol.current_stream_id == 6 + + +class TestApplySettings(netlib_tservers.ServerTestBase): +    class handler(tcp.BaseHandler): +        def handle(self): +            # check settings acknowledgement +            assert self.rfile.read(9) == codecs.decode('000000040100000000', 'hex_codec') +            self.wfile.write("OK") +            self.wfile.flush() +            self.rfile.safe_read(9)  # just to keep the connection alive a bit longer + +    ssl = True + +    def test_apply_settings(self): +        c = tcp.TCPClient(("127.0.0.1", self.port)) +        with c.connect(): +            c.convert_to_ssl() +            protocol = HTTP2StateProtocol(c) + +            protocol._apply_settings({ +                hyperframe.frame.SettingsFrame.ENABLE_PUSH: 'foo', +                hyperframe.frame.SettingsFrame.MAX_CONCURRENT_STREAMS: 'bar', +                hyperframe.frame.SettingsFrame.INITIAL_WINDOW_SIZE: 'deadbeef', +            }) + +            assert c.rfile.safe_read(2) == b"OK" + +            assert protocol.http2_settings[ +                hyperframe.frame.SettingsFrame.ENABLE_PUSH] == 'foo' +            assert protocol.http2_settings[ +                hyperframe.frame.SettingsFrame.MAX_CONCURRENT_STREAMS] == 'bar' +            assert protocol.http2_settings[ +                hyperframe.frame.SettingsFrame.INITIAL_WINDOW_SIZE] == 'deadbeef' + + +class TestCreateHeaders(object): +    c = tcp.TCPClient(("127.0.0.1", 0)) + +    def test_create_headers(self): +        headers = http.Headers([ +            (b':method', b'GET'), +            (b':path', b'index.html'), +            (b':scheme', b'https'), +            (b'foo', b'bar')]) + +        bytes = HTTP2StateProtocol(self.c)._create_headers( +            headers, 1, end_stream=True) +        assert b''.join(bytes) ==\ +            codecs.decode('000014010500000001824488355217caf3a69a3f87408294e7838c767f', 'hex_codec') + +        bytes = HTTP2StateProtocol(self.c)._create_headers( +            headers, 1, end_stream=False) +        assert b''.join(bytes) ==\ +            codecs.decode('000014010400000001824488355217caf3a69a3f87408294e7838c767f', 'hex_codec') + +    def test_create_headers_multiple_frames(self): +        headers = http.Headers([ +            (b':method', b'GET'), +            (b':path', b'/'), +            (b':scheme', b'https'), +            (b'foo', b'bar'), +            (b'server', b'version')]) + +        protocol = HTTP2StateProtocol(self.c) +        protocol.http2_settings[hyperframe.frame.SettingsFrame.MAX_FRAME_SIZE] = 8 +        bytes = protocol._create_headers(headers, 1, end_stream=True) +        assert len(bytes) == 3 +        assert bytes[0] == codecs.decode('000008010100000001828487408294e783', 'hex_codec') +        assert bytes[1] == codecs.decode('0000080900000000018c767f7685ee5b10', 'hex_codec') +        assert bytes[2] == codecs.decode('00000209040000000163d5', 'hex_codec') + + +class TestCreateBody(object): +    c = tcp.TCPClient(("127.0.0.1", 0)) + +    def test_create_body_empty(self): +        protocol = HTTP2StateProtocol(self.c) +        bytes = protocol._create_body(b'', 1) +        assert b''.join(bytes) == b'' + +    def test_create_body_single_frame(self): +        protocol = HTTP2StateProtocol(self.c) +        bytes = protocol._create_body(b'foobar', 1) +        assert b''.join(bytes) == codecs.decode('000006000100000001666f6f626172', 'hex_codec') + +    def test_create_body_multiple_frames(self): +        protocol = HTTP2StateProtocol(self.c) +        protocol.http2_settings[hyperframe.frame.SettingsFrame.MAX_FRAME_SIZE] = 5 +        bytes = protocol._create_body(b'foobarmehm42', 1) +        assert len(bytes) == 3 +        assert bytes[0] == codecs.decode('000005000000000001666f6f6261', 'hex_codec') +        assert bytes[1] == codecs.decode('000005000000000001726d65686d', 'hex_codec') +        assert bytes[2] == codecs.decode('0000020001000000013432', 'hex_codec') + + +class TestReadRequest(netlib_tservers.ServerTestBase): +    class handler(tcp.BaseHandler): + +        def handle(self): +            self.wfile.write( +                codecs.decode('000003010400000001828487', 'hex_codec')) +            self.wfile.write( +                codecs.decode('000006000100000001666f6f626172', 'hex_codec')) +            self.wfile.flush() +            self.rfile.safe_read(9)  # just to keep the connection alive a bit longer + +    ssl = True + +    def test_read_request(self): +        c = tcp.TCPClient(("127.0.0.1", self.port)) +        with c.connect(): +            c.convert_to_ssl() +            protocol = HTTP2StateProtocol(c, is_server=True) +            protocol.connection_preface_performed = True + +            req = protocol.read_request(NotImplemented) + +            assert req.stream_id +            assert req.headers.fields == () +            assert req.method == "GET" +            assert req.path == "/" +            assert req.scheme == "https" +            assert req.content == b'foobar' + + +class TestReadRequestRelative(netlib_tservers.ServerTestBase): +    class handler(tcp.BaseHandler): +        def handle(self): +            self.wfile.write( +                codecs.decode('00000c0105000000014287d5af7e4d5a777f4481f9', 'hex_codec')) +            self.wfile.flush() + +    ssl = True + +    def test_asterisk_form(self): +        c = tcp.TCPClient(("127.0.0.1", self.port)) +        with c.connect(): +            c.convert_to_ssl() +            protocol = HTTP2StateProtocol(c, is_server=True) +            protocol.connection_preface_performed = True + +            req = protocol.read_request(NotImplemented) + +            assert req.first_line_format == "relative" +            assert req.method == "OPTIONS" +            assert req.path == "*" + + +class TestReadRequestAbsolute(netlib_tservers.ServerTestBase): +    class handler(tcp.BaseHandler): +        def handle(self): +            self.wfile.write( +                codecs.decode('00001901050000000182448d9d29aee30c0e492c2a1170426366871c92585422e085', 'hex_codec')) +            self.wfile.flush() + +    ssl = True + +    def test_absolute_form(self): +        c = tcp.TCPClient(("127.0.0.1", self.port)) +        with c.connect(): +            c.convert_to_ssl() +            protocol = HTTP2StateProtocol(c, is_server=True) +            protocol.connection_preface_performed = True + +            req = protocol.read_request(NotImplemented) + +            assert req.first_line_format == "absolute" +            assert req.scheme == "http" +            assert req.host == "address" +            assert req.port == 22 + + +class TestReadRequestConnect(netlib_tservers.ServerTestBase): +    class handler(tcp.BaseHandler): +        def handle(self): +            self.wfile.write( +                codecs.decode('00001b0105000000014287bdab4e9c17b7ff44871c92585422e08541871c92585422e085', 'hex_codec')) +            self.wfile.write( +                codecs.decode('00001d0105000000014287bdab4e9c17b7ff44882f91d35d055c87a741882f91d35d055c87a7', 'hex_codec')) +            self.wfile.flush() + +    ssl = True + +    def test_connect(self): +        c = tcp.TCPClient(("127.0.0.1", self.port)) +        with c.connect(): +            c.convert_to_ssl() +            protocol = HTTP2StateProtocol(c, is_server=True) +            protocol.connection_preface_performed = True + +            req = protocol.read_request(NotImplemented) +            assert req.first_line_format == "authority" +            assert req.method == "CONNECT" +            assert req.host == "address" +            assert req.port == 22 + +            req = protocol.read_request(NotImplemented) +            assert req.first_line_format == "authority" +            assert req.method == "CONNECT" +            assert req.host == "example.com" +            assert req.port == 443 + + +class TestReadResponse(netlib_tservers.ServerTestBase): +    class handler(tcp.BaseHandler): +        def handle(self): +            self.wfile.write( +                codecs.decode('00000801040000002a88628594e78c767f', 'hex_codec')) +            self.wfile.write( +                codecs.decode('00000600010000002a666f6f626172', 'hex_codec')) +            self.wfile.flush() +            self.rfile.safe_read(9)  # just to keep the connection alive a bit longer + +    ssl = True + +    def test_read_response(self): +        c = tcp.TCPClient(("127.0.0.1", self.port)) +        with c.connect(): +            c.convert_to_ssl() +            protocol = HTTP2StateProtocol(c) +            protocol.connection_preface_performed = True + +            resp = protocol.read_response(NotImplemented, stream_id=42) + +            assert resp.http_version == "HTTP/2.0" +            assert resp.status_code == 200 +            assert resp.reason == '' +            assert resp.headers.fields == ((b':status', b'200'), (b'etag', b'foobar')) +            assert resp.content == b'foobar' +            assert resp.timestamp_end + + +class TestReadEmptyResponse(netlib_tservers.ServerTestBase): +    class handler(tcp.BaseHandler): +        def handle(self): +            self.wfile.write( +                codecs.decode('00000801050000002a88628594e78c767f', 'hex_codec')) +            self.wfile.flush() + +    ssl = True + +    def test_read_empty_response(self): +        c = tcp.TCPClient(("127.0.0.1", self.port)) +        with c.connect(): +            c.convert_to_ssl() +            protocol = HTTP2StateProtocol(c) +            protocol.connection_preface_performed = True + +            resp = protocol.read_response(NotImplemented, stream_id=42) + +            assert resp.stream_id == 42 +            assert resp.http_version == "HTTP/2.0" +            assert resp.status_code == 200 +            assert resp.reason == '' +            assert resp.headers.fields == ((b':status', b'200'), (b'etag', b'foobar')) +            assert resp.content == b'' + + +class TestAssembleRequest(object): +    c = tcp.TCPClient(("127.0.0.1", 0)) + +    def test_request_simple(self): +        bytes = HTTP2StateProtocol(self.c).assemble_request(http.Request( +            b'', +            b'GET', +            b'https', +            b'', +            b'', +            b'/', +            b"HTTP/2.0", +            (), +            None, +        )) +        assert len(bytes) == 1 +        assert bytes[0] == codecs.decode('00000d0105000000018284874188089d5c0b8170dc07', 'hex_codec') + +    def test_request_with_stream_id(self): +        req = http.Request( +            b'', +            b'GET', +            b'https', +            b'', +            b'', +            b'/', +            b"HTTP/2.0", +            (), +            None, +        ) +        req.stream_id = 0x42 +        bytes = HTTP2StateProtocol(self.c).assemble_request(req) +        assert len(bytes) == 1 +        assert bytes[0] == codecs.decode('00000d0105000000428284874188089d5c0b8170dc07', 'hex_codec') + +    def test_request_with_body(self): +        bytes = HTTP2StateProtocol(self.c).assemble_request(http.Request( +            b'', +            b'GET', +            b'https', +            b'', +            b'', +            b'/', +            b"HTTP/2.0", +            http.Headers([(b'foo', b'bar')]), +            b'foobar', +        )) +        assert len(bytes) == 2 +        assert bytes[0] ==\ +            codecs.decode('0000150104000000018284874188089d5c0b8170dc07408294e7838c767f', 'hex_codec') +        assert bytes[1] ==\ +            codecs.decode('000006000100000001666f6f626172', 'hex_codec') + + +class TestAssembleResponse(object): +    c = tcp.TCPClient(("127.0.0.1", 0)) + +    def test_simple(self): +        bytes = HTTP2StateProtocol(self.c, is_server=True).assemble_response(http.Response( +            b"HTTP/2.0", +            200, +        )) +        assert len(bytes) == 1 +        assert bytes[0] ==\ +            codecs.decode('00000101050000000288', 'hex_codec') + +    def test_with_stream_id(self): +        resp = http.Response( +            b"HTTP/2.0", +            200, +        ) +        resp.stream_id = 0x42 +        bytes = HTTP2StateProtocol(self.c, is_server=True).assemble_response(resp) +        assert len(bytes) == 1 +        assert bytes[0] ==\ +            codecs.decode('00000101050000004288', 'hex_codec') + +    def test_with_body(self): +        bytes = HTTP2StateProtocol(self.c, is_server=True).assemble_response(http.Response( +            b"HTTP/2.0", +            200, +            b'', +            http.Headers(foo=b"bar"), +            b'foobar' +        )) +        assert len(bytes) == 2 +        assert bytes[0] ==\ +            codecs.decode('00000901040000000288408294e7838c767f', 'hex_codec') +        assert bytes[1] ==\ +            codecs.decode('000006000100000002666f6f626172', 'hex_codec')  | 
