aboutsummaryrefslogtreecommitdiffstats
path: root/test/http
diff options
context:
space:
mode:
Diffstat (limited to 'test/http')
-rw-r--r--test/http/__init__.py0
-rw-r--r--test/http/http1/__init__.py0
-rw-r--r--test/http/http1/test_protocol.py445
-rw-r--r--test/http/http2/__init__.py0
-rw-r--r--test/http/http2/test_frames.py704
-rw-r--r--test/http/http2/test_protocol.py325
-rw-r--r--test/http/test_authentication.py110
-rw-r--r--test/http/test_cookies.py219
-rw-r--r--test/http/test_semantics.py54
-rw-r--r--test/http/test_user_agents.py6
10 files changed, 1863 insertions, 0 deletions
diff --git a/test/http/__init__.py b/test/http/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/test/http/__init__.py
diff --git a/test/http/http1/__init__.py b/test/http/http1/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/test/http/http1/__init__.py
diff --git a/test/http/http1/test_protocol.py b/test/http/http1/test_protocol.py
new file mode 100644
index 00000000..05e82831
--- /dev/null
+++ b/test/http/http1/test_protocol.py
@@ -0,0 +1,445 @@
+import cStringIO
+import textwrap
+import binascii
+
+from netlib import http, odict, tcp
+from netlib.http.http1 import protocol
+from ... import tutils, tservers
+
+
+def test_has_chunked_encoding():
+ h = odict.ODictCaseless()
+ assert not protocol.has_chunked_encoding(h)
+ h["transfer-encoding"] = ["chunked"]
+ assert protocol.has_chunked_encoding(h)
+
+
+def test_read_chunked():
+
+ h = odict.ODictCaseless()
+ h["transfer-encoding"] = ["chunked"]
+ s = cStringIO.StringIO("1\r\na\r\n0\r\n")
+
+ tutils.raises(
+ "malformed chunked body",
+ protocol.read_http_body,
+ s, h, None, "GET", None, True
+ )
+
+ s = cStringIO.StringIO("1\r\na\r\n0\r\n\r\n")
+ assert protocol.read_http_body(s, h, None, "GET", None, True) == "a"
+
+ s = cStringIO.StringIO("\r\n\r\n1\r\na\r\n0\r\n\r\n")
+ assert protocol.read_http_body(s, h, None, "GET", None, True) == "a"
+
+ s = cStringIO.StringIO("\r\n")
+ tutils.raises(
+ "closed prematurely",
+ protocol.read_http_body,
+ s, h, None, "GET", None, True
+ )
+
+ s = cStringIO.StringIO("1\r\nfoo")
+ tutils.raises(
+ "malformed chunked body",
+ protocol.read_http_body,
+ s, h, None, "GET", None, True
+ )
+
+ s = cStringIO.StringIO("foo\r\nfoo")
+ tutils.raises(
+ protocol.HttpError,
+ protocol.read_http_body,
+ s, h, None, "GET", None, True
+ )
+
+ s = cStringIO.StringIO("5\r\naaaaa\r\n0\r\n\r\n")
+ tutils.raises("too large", protocol.read_http_body, s, h, 2, "GET", None, True)
+
+
+def test_connection_close():
+ h = odict.ODictCaseless()
+ assert protocol.connection_close((1, 0), h)
+ assert not protocol.connection_close((1, 1), h)
+
+ h["connection"] = ["keep-alive"]
+ assert not protocol.connection_close((1, 1), h)
+
+ h["connection"] = ["close"]
+ assert protocol.connection_close((1, 1), h)
+
+
+def test_get_header_tokens():
+ h = odict.ODictCaseless()
+ assert protocol.get_header_tokens(h, "foo") == []
+ h["foo"] = ["bar"]
+ assert protocol.get_header_tokens(h, "foo") == ["bar"]
+ h["foo"] = ["bar, voing"]
+ assert protocol.get_header_tokens(h, "foo") == ["bar", "voing"]
+ h["foo"] = ["bar, voing", "oink"]
+ assert protocol.get_header_tokens(h, "foo") == ["bar", "voing", "oink"]
+
+
+def test_read_http_body_request():
+ h = odict.ODictCaseless()
+ r = cStringIO.StringIO("testing")
+ assert protocol.read_http_body(r, h, None, "GET", None, True) == ""
+
+
+def test_read_http_body_response():
+ h = odict.ODictCaseless()
+ s = tcp.Reader(cStringIO.StringIO("testing"))
+ assert protocol.read_http_body(s, h, None, "GET", 200, False) == "testing"
+
+
+def test_read_http_body():
+ # test default case
+ h = odict.ODictCaseless()
+ h["content-length"] = [7]
+ s = cStringIO.StringIO("testing")
+ assert protocol.read_http_body(s, h, None, "GET", 200, False) == "testing"
+
+ # test content length: invalid header
+ h["content-length"] = ["foo"]
+ s = cStringIO.StringIO("testing")
+ tutils.raises(
+ protocol.HttpError,
+ protocol.read_http_body,
+ s, h, None, "GET", 200, False
+ )
+
+ # test content length: invalid header #2
+ h["content-length"] = [-1]
+ s = cStringIO.StringIO("testing")
+ tutils.raises(
+ protocol.HttpError,
+ protocol.read_http_body,
+ s, h, None, "GET", 200, False
+ )
+
+ # test content length: content length > actual content
+ h["content-length"] = [5]
+ s = cStringIO.StringIO("testing")
+ tutils.raises(
+ protocol.HttpError,
+ protocol.read_http_body,
+ s, h, 4, "GET", 200, False
+ )
+
+ # test content length: content length < actual content
+ s = cStringIO.StringIO("testing")
+ assert len(protocol.read_http_body(s, h, None, "GET", 200, False)) == 5
+
+ # test no content length: limit > actual content
+ h = odict.ODictCaseless()
+ s = tcp.Reader(cStringIO.StringIO("testing"))
+ assert len(protocol.read_http_body(s, h, 100, "GET", 200, False)) == 7
+
+ # test no content length: limit < actual content
+ s = tcp.Reader(cStringIO.StringIO("testing"))
+ tutils.raises(
+ protocol.HttpError,
+ protocol.read_http_body,
+ s, h, 4, "GET", 200, False
+ )
+
+ # test chunked
+ h = odict.ODictCaseless()
+ h["transfer-encoding"] = ["chunked"]
+ s = tcp.Reader(cStringIO.StringIO("5\r\naaaaa\r\n0\r\n\r\n"))
+ assert protocol.read_http_body(s, h, 100, "GET", 200, False) == "aaaaa"
+
+
+def test_expected_http_body_size():
+ # gibber in the content-length field
+ h = odict.ODictCaseless()
+ h["content-length"] = ["foo"]
+ assert protocol.expected_http_body_size(h, False, "GET", 200) is None
+ # negative number in the content-length field
+ h = odict.ODictCaseless()
+ h["content-length"] = ["-7"]
+ assert protocol.expected_http_body_size(h, False, "GET", 200) is None
+ # explicit length
+ h = odict.ODictCaseless()
+ h["content-length"] = ["5"]
+ assert protocol.expected_http_body_size(h, False, "GET", 200) == 5
+ # no length
+ h = odict.ODictCaseless()
+ assert protocol.expected_http_body_size(h, False, "GET", 200) == -1
+ # no length request
+ h = odict.ODictCaseless()
+ assert protocol.expected_http_body_size(h, True, "GET", None) == 0
+
+
+def test_parse_http_protocol():
+ assert protocol.parse_http_protocol("HTTP/1.1") == (1, 1)
+ assert protocol.parse_http_protocol("HTTP/0.0") == (0, 0)
+ assert not protocol.parse_http_protocol("HTTP/a.1")
+ assert not protocol.parse_http_protocol("HTTP/1.a")
+ assert not protocol.parse_http_protocol("foo/0.0")
+ assert not protocol.parse_http_protocol("HTTP/x")
+
+
+def test_parse_init_connect():
+ assert protocol.parse_init_connect("CONNECT host.com:443 HTTP/1.0")
+ assert not protocol.parse_init_connect("C\xfeONNECT host.com:443 HTTP/1.0")
+ assert not protocol.parse_init_connect("CONNECT \0host.com:443 HTTP/1.0")
+ assert not protocol.parse_init_connect("CONNECT host.com:444444 HTTP/1.0")
+ assert not protocol.parse_init_connect("bogus")
+ assert not protocol.parse_init_connect("GET host.com:443 HTTP/1.0")
+ assert not protocol.parse_init_connect("CONNECT host.com443 HTTP/1.0")
+ assert not protocol.parse_init_connect("CONNECT host.com:443 foo/1.0")
+ assert not protocol.parse_init_connect("CONNECT host.com:foo HTTP/1.0")
+
+
+def test_parse_init_proxy():
+ u = "GET http://foo.com:8888/test HTTP/1.1"
+ m, s, h, po, pa, httpversion = protocol.parse_init_proxy(u)
+ assert m == "GET"
+ assert s == "http"
+ assert h == "foo.com"
+ assert po == 8888
+ assert pa == "/test"
+ assert httpversion == (1, 1)
+
+ u = "G\xfeET http://foo.com:8888/test HTTP/1.1"
+ assert not protocol.parse_init_proxy(u)
+
+ assert not protocol.parse_init_proxy("invalid")
+ assert not protocol.parse_init_proxy("GET invalid HTTP/1.1")
+ assert not protocol.parse_init_proxy("GET http://foo.com:8888/test foo/1.1")
+
+
+def test_parse_init_http():
+ u = "GET /test HTTP/1.1"
+ m, u, httpversion = protocol.parse_init_http(u)
+ assert m == "GET"
+ assert u == "/test"
+ assert httpversion == (1, 1)
+
+ u = "G\xfeET /test HTTP/1.1"
+ assert not protocol.parse_init_http(u)
+
+ assert not protocol.parse_init_http("invalid")
+ assert not protocol.parse_init_http("GET invalid HTTP/1.1")
+ assert not protocol.parse_init_http("GET /test foo/1.1")
+ assert not protocol.parse_init_http("GET /test\xc0 HTTP/1.1")
+
+
+class TestReadHeaders:
+
+ def _read(self, data, verbatim=False):
+ if not verbatim:
+ data = textwrap.dedent(data)
+ data = data.strip()
+ s = cStringIO.StringIO(data)
+ return protocol.read_headers(s)
+
+ def test_read_simple(self):
+ data = """
+ Header: one
+ Header2: two
+ \r\n
+ """
+ h = self._read(data)
+ assert h.lst == [["Header", "one"], ["Header2", "two"]]
+
+ def test_read_multi(self):
+ data = """
+ Header: one
+ Header: two
+ \r\n
+ """
+ h = self._read(data)
+ assert h.lst == [["Header", "one"], ["Header", "two"]]
+
+ def test_read_continued(self):
+ data = """
+ Header: one
+ \ttwo
+ Header2: three
+ \r\n
+ """
+ h = self._read(data)
+ assert h.lst == [["Header", "one\r\n two"], ["Header2", "three"]]
+
+ def test_read_continued_err(self):
+ data = "\tfoo: bar\r\n"
+ assert self._read(data, True) is None
+
+ def test_read_err(self):
+ data = """
+ foo
+ """
+ assert self._read(data) is None
+
+
+class NoContentLengthHTTPHandler(tcp.BaseHandler):
+
+ def handle(self):
+ self.wfile.write("HTTP/1.1 200 OK\r\n\r\nbar\r\n\r\n")
+ self.wfile.flush()
+
+
+class TestReadResponseNoContentLength(tservers.ServerTestBase):
+ handler = NoContentLengthHTTPHandler
+
+ def test_no_content_length(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ resp = protocol.read_response(c.rfile, "GET", None)
+ assert resp.content == "bar\r\n\r\n"
+
+
+def test_read_response():
+ def tst(data, method, limit, include_body=True):
+ data = textwrap.dedent(data)
+ r = cStringIO.StringIO(data)
+ return protocol.read_response(
+ r, method, limit, include_body=include_body
+ )
+
+ tutils.raises("server disconnect", tst, "", "GET", None)
+ tutils.raises("invalid server response", tst, "foo", "GET", None)
+ data = """
+ HTTP/1.1 200 OK
+ """
+ assert tst(data, "GET", None) == http.Response(
+ (1, 1), 200, 'OK', odict.ODictCaseless(), ''
+ )
+ data = """
+ HTTP/1.1 200
+ """
+ assert tst(data, "GET", None) == http.Response(
+ (1, 1), 200, '', odict.ODictCaseless(), ''
+ )
+ data = """
+ HTTP/x 200 OK
+ """
+ tutils.raises("invalid http version", tst, data, "GET", None)
+ data = """
+ HTTP/1.1 xx OK
+ """
+ tutils.raises("invalid server response", tst, data, "GET", None)
+
+ data = """
+ HTTP/1.1 100 CONTINUE
+
+ HTTP/1.1 200 OK
+ """
+ assert tst(data, "GET", None) == http.Response(
+ (1, 1), 100, 'CONTINUE', odict.ODictCaseless(), ''
+ )
+
+ data = """
+ HTTP/1.1 200 OK
+ Content-Length: 3
+
+ foo
+ """
+ assert tst(data, "GET", None).content == 'foo'
+ assert tst(data, "HEAD", None).content == ''
+
+ data = """
+ HTTP/1.1 200 OK
+ \tContent-Length: 3
+
+ foo
+ """
+ tutils.raises("invalid headers", tst, data, "GET", None)
+
+ data = """
+ HTTP/1.1 200 OK
+ Content-Length: 3
+
+ foo
+ """
+ assert tst(data, "GET", None, include_body=False).content is None
+
+
+def test_parse_http_basic_auth():
+ vals = ("basic", "foo", "bar")
+ assert protocol.parse_http_basic_auth(
+ protocol.assemble_http_basic_auth(*vals)
+ ) == vals
+ assert not protocol.parse_http_basic_auth("")
+ assert not protocol.parse_http_basic_auth("foo bar")
+ v = "basic " + binascii.b2a_base64("foo")
+ assert not protocol.parse_http_basic_auth(v)
+
+
+def test_get_request_line():
+ r = cStringIO.StringIO("\nfoo")
+ assert protocol.get_request_line(r) == "foo"
+ assert not protocol.get_request_line(r)
+
+
+class TestReadRequest():
+
+ def tst(self, data, **kwargs):
+ r = cStringIO.StringIO(data)
+ return protocol.read_request(r, **kwargs)
+
+ def test_invalid(self):
+ tutils.raises(
+ "bad http request",
+ self.tst,
+ "xxx"
+ )
+ tutils.raises(
+ "bad http request line",
+ self.tst,
+ "get /\xff HTTP/1.1"
+ )
+ tutils.raises(
+ "invalid headers",
+ self.tst,
+ "get / HTTP/1.1\r\nfoo"
+ )
+ tutils.raises(
+ tcp.NetLibDisconnect,
+ self.tst,
+ "\r\n"
+ )
+
+ def test_asterisk_form_in(self):
+ v = self.tst("OPTIONS * HTTP/1.1")
+ assert v.form_in == "relative"
+ assert v.method == "OPTIONS"
+
+ def test_absolute_form_in(self):
+ tutils.raises(
+ "Bad HTTP request line",
+ self.tst,
+ "GET oops-no-protocol.com HTTP/1.1"
+ )
+ v = self.tst("GET http://address:22/ HTTP/1.1")
+ assert v.form_in == "absolute"
+ assert v.port == 22
+ assert v.host == "address"
+ assert v.scheme == "http"
+
+ def test_connect(self):
+ tutils.raises(
+ "Bad HTTP request line",
+ self.tst,
+ "CONNECT oops-no-port.com HTTP/1.1"
+ )
+ v = self.tst("CONNECT foo.com:443 HTTP/1.1")
+ assert v.form_in == "authority"
+ assert v.method == "CONNECT"
+ assert v.port == 443
+ assert v.host == "foo.com"
+
+ def test_expect(self):
+ w = cStringIO.StringIO()
+ r = cStringIO.StringIO(
+ "GET / HTTP/1.1\r\n"
+ "Content-Length: 3\r\n"
+ "Expect: 100-continue\r\n\r\n"
+ "foobar",
+ )
+ v = protocol.read_request(r, wfile=w)
+ assert w.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n"
+ assert v.content == "foo"
+ assert r.read(3) == "bar"
diff --git a/test/http/http2/__init__.py b/test/http/http2/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/test/http/http2/__init__.py
diff --git a/test/http/http2/test_frames.py b/test/http/http2/test_frames.py
new file mode 100644
index 00000000..ee2edc39
--- /dev/null
+++ b/test/http/http2/test_frames.py
@@ -0,0 +1,704 @@
+import cStringIO
+from test import tutils
+from nose.tools import assert_equal
+from netlib import tcp
+from netlib.http.http2.frame import *
+
+
+def hex_to_file(data):
+ data = data.decode('hex')
+ return tcp.Reader(cStringIO.StringIO(data))
+
+
+def test_invalid_flags():
+ tutils.raises(
+ ValueError,
+ DataFrame,
+ flags=ContinuationFrame.FLAG_END_HEADERS,
+ stream_id=0x1234567,
+ payload='foobar')
+
+
+def test_frame_equality():
+ a = DataFrame(
+ length=6,
+ flags=Frame.FLAG_END_STREAM,
+ stream_id=0x1234567,
+ payload='foobar')
+ b = DataFrame(
+ length=6,
+ flags=Frame.FLAG_END_STREAM,
+ stream_id=0x1234567,
+ payload='foobar')
+ assert_equal(a, b)
+
+
+def test_too_large_frames():
+ f = DataFrame(
+ length=9000,
+ flags=Frame.FLAG_END_STREAM,
+ stream_id=0x1234567,
+ payload='foobar' * 3000)
+ tutils.raises(FrameSizeError, f.to_bytes)
+
+
+def test_data_frame_to_bytes():
+ f = DataFrame(
+ length=6,
+ flags=Frame.FLAG_END_STREAM,
+ stream_id=0x1234567,
+ payload='foobar')
+ assert_equal(f.to_bytes().encode('hex'), '000006000101234567666f6f626172')
+
+ f = DataFrame(
+ length=11,
+ flags=(Frame.FLAG_END_STREAM | Frame.FLAG_PADDED),
+ stream_id=0x1234567,
+ payload='foobar',
+ pad_length=3)
+ assert_equal(
+ f.to_bytes().encode('hex'),
+ '00000a00090123456703666f6f626172000000')
+
+ f = DataFrame(
+ length=6,
+ flags=Frame.FLAG_NO_FLAGS,
+ stream_id=0x0,
+ payload='foobar')
+ tutils.raises(ValueError, f.to_bytes)
+
+
+def test_data_frame_from_bytes():
+ f = Frame.from_file(hex_to_file('000006000101234567666f6f626172'))
+ assert isinstance(f, DataFrame)
+ assert_equal(f.length, 6)
+ assert_equal(f.TYPE, DataFrame.TYPE)
+ assert_equal(f.flags, Frame.FLAG_END_STREAM)
+ assert_equal(f.stream_id, 0x1234567)
+ assert_equal(f.payload, 'foobar')
+
+ f = Frame.from_file(hex_to_file('00000a00090123456703666f6f626172000000'))
+ assert isinstance(f, DataFrame)
+ assert_equal(f.length, 10)
+ assert_equal(f.TYPE, DataFrame.TYPE)
+ assert_equal(f.flags, Frame.FLAG_END_STREAM | Frame.FLAG_PADDED)
+ assert_equal(f.stream_id, 0x1234567)
+ assert_equal(f.payload, 'foobar')
+
+
+def test_data_frame_human_readable():
+ f = DataFrame(
+ length=11,
+ flags=(Frame.FLAG_END_STREAM | Frame.FLAG_PADDED),
+ stream_id=0x1234567,
+ payload='foobar',
+ pad_length=3)
+ assert f.human_readable()
+
+
+def test_headers_frame_to_bytes():
+ f = HeadersFrame(
+ length=6,
+ flags=(Frame.FLAG_NO_FLAGS),
+ stream_id=0x1234567,
+ header_block_fragment='668594e75e31d9'.decode('hex'))
+ assert_equal(f.to_bytes().encode('hex'), '000007010001234567668594e75e31d9')
+
+ f = HeadersFrame(
+ length=10,
+ flags=(HeadersFrame.FLAG_PADDED),
+ stream_id=0x1234567,
+ header_block_fragment='668594e75e31d9'.decode('hex'),
+ pad_length=3)
+ assert_equal(
+ f.to_bytes().encode('hex'),
+ '00000b01080123456703668594e75e31d9000000')
+
+ f = HeadersFrame(
+ length=10,
+ flags=(HeadersFrame.FLAG_PRIORITY),
+ stream_id=0x1234567,
+ header_block_fragment='668594e75e31d9'.decode('hex'),
+ exclusive=True,
+ stream_dependency=0x7654321,
+ weight=42)
+ assert_equal(
+ f.to_bytes().encode('hex'),
+ '00000c012001234567876543212a668594e75e31d9')
+
+ f = HeadersFrame(
+ length=14,
+ flags=(HeadersFrame.FLAG_PADDED | HeadersFrame.FLAG_PRIORITY),
+ stream_id=0x1234567,
+ header_block_fragment='668594e75e31d9'.decode('hex'),
+ pad_length=3,
+ exclusive=True,
+ stream_dependency=0x7654321,
+ weight=42)
+ assert_equal(
+ f.to_bytes().encode('hex'),
+ '00001001280123456703876543212a668594e75e31d9000000')
+
+ f = HeadersFrame(
+ length=14,
+ flags=(HeadersFrame.FLAG_PADDED | HeadersFrame.FLAG_PRIORITY),
+ stream_id=0x1234567,
+ header_block_fragment='668594e75e31d9'.decode('hex'),
+ pad_length=3,
+ exclusive=False,
+ stream_dependency=0x7654321,
+ weight=42)
+ assert_equal(
+ f.to_bytes().encode('hex'),
+ '00001001280123456703076543212a668594e75e31d9000000')
+
+ f = HeadersFrame(
+ length=6,
+ flags=Frame.FLAG_NO_FLAGS,
+ stream_id=0x0,
+ header_block_fragment='668594e75e31d9'.decode('hex'))
+ tutils.raises(ValueError, f.to_bytes)
+
+
+def test_headers_frame_from_bytes():
+ f = Frame.from_file(hex_to_file(
+ '000007010001234567668594e75e31d9'))
+ assert isinstance(f, HeadersFrame)
+ assert_equal(f.length, 7)
+ assert_equal(f.TYPE, HeadersFrame.TYPE)
+ assert_equal(f.flags, Frame.FLAG_NO_FLAGS)
+ assert_equal(f.stream_id, 0x1234567)
+ assert_equal(f.header_block_fragment, '668594e75e31d9'.decode('hex'))
+
+ f = Frame.from_file(hex_to_file(
+ '00000b01080123456703668594e75e31d9000000'))
+ assert isinstance(f, HeadersFrame)
+ assert_equal(f.length, 11)
+ assert_equal(f.TYPE, HeadersFrame.TYPE)
+ assert_equal(f.flags, HeadersFrame.FLAG_PADDED)
+ assert_equal(f.stream_id, 0x1234567)
+ assert_equal(f.header_block_fragment, '668594e75e31d9'.decode('hex'))
+
+ f = Frame.from_file(hex_to_file(
+ '00000c012001234567876543212a668594e75e31d9'))
+ assert isinstance(f, HeadersFrame)
+ assert_equal(f.length, 12)
+ assert_equal(f.TYPE, HeadersFrame.TYPE)
+ assert_equal(f.flags, HeadersFrame.FLAG_PRIORITY)
+ assert_equal(f.stream_id, 0x1234567)
+ assert_equal(f.header_block_fragment, '668594e75e31d9'.decode('hex'))
+ assert_equal(f.exclusive, True)
+ assert_equal(f.stream_dependency, 0x7654321)
+ assert_equal(f.weight, 42)
+
+ f = Frame.from_file(hex_to_file(
+ '00001001280123456703876543212a668594e75e31d9000000'))
+ assert isinstance(f, HeadersFrame)
+ assert_equal(f.length, 16)
+ assert_equal(f.TYPE, HeadersFrame.TYPE)
+ assert_equal(f.flags, HeadersFrame.FLAG_PADDED | HeadersFrame.FLAG_PRIORITY)
+ assert_equal(f.stream_id, 0x1234567)
+ assert_equal(f.header_block_fragment, '668594e75e31d9'.decode('hex'))
+ assert_equal(f.exclusive, True)
+ assert_equal(f.stream_dependency, 0x7654321)
+ assert_equal(f.weight, 42)
+
+ f = Frame.from_file(hex_to_file(
+ '00001001280123456703076543212a668594e75e31d9000000'))
+ assert isinstance(f, HeadersFrame)
+ assert_equal(f.length, 16)
+ assert_equal(f.TYPE, HeadersFrame.TYPE)
+ assert_equal(f.flags, HeadersFrame.FLAG_PADDED | HeadersFrame.FLAG_PRIORITY)
+ assert_equal(f.stream_id, 0x1234567)
+ assert_equal(f.header_block_fragment, '668594e75e31d9'.decode('hex'))
+ assert_equal(f.exclusive, False)
+ assert_equal(f.stream_dependency, 0x7654321)
+ assert_equal(f.weight, 42)
+
+
+def test_headers_frame_human_readable():
+ f = HeadersFrame(
+ length=7,
+ flags=(HeadersFrame.FLAG_PADDED | HeadersFrame.FLAG_PRIORITY),
+ stream_id=0x1234567,
+ header_block_fragment=b'',
+ pad_length=3,
+ exclusive=False,
+ stream_dependency=0x7654321,
+ weight=42)
+ assert f.human_readable()
+
+ f = HeadersFrame(
+ length=14,
+ flags=(HeadersFrame.FLAG_PADDED | HeadersFrame.FLAG_PRIORITY),
+ stream_id=0x1234567,
+ header_block_fragment='668594e75e31d9'.decode('hex'),
+ pad_length=3,
+ exclusive=False,
+ stream_dependency=0x7654321,
+ weight=42)
+ assert f.human_readable()
+
+
+def test_priority_frame_to_bytes():
+ f = PriorityFrame(
+ length=5,
+ flags=(Frame.FLAG_NO_FLAGS),
+ stream_id=0x1234567,
+ exclusive=True,
+ stream_dependency=0x7654321,
+ weight=42)
+ assert_equal(f.to_bytes().encode('hex'), '000005020001234567876543212a')
+
+ f = PriorityFrame(
+ length=5,
+ flags=(Frame.FLAG_NO_FLAGS),
+ stream_id=0x1234567,
+ exclusive=False,
+ stream_dependency=0x7654321,
+ weight=21)
+ assert_equal(f.to_bytes().encode('hex'), '0000050200012345670765432115')
+
+ f = PriorityFrame(
+ length=5,
+ flags=Frame.FLAG_NO_FLAGS,
+ stream_id=0x0,
+ stream_dependency=0x1234567)
+ tutils.raises(ValueError, f.to_bytes)
+
+ f = PriorityFrame(
+ length=5,
+ flags=Frame.FLAG_NO_FLAGS,
+ stream_id=0x1234567,
+ stream_dependency=0x0)
+ tutils.raises(ValueError, f.to_bytes)
+
+
+def test_priority_frame_from_bytes():
+ f = Frame.from_file(hex_to_file('000005020001234567876543212a'))
+ assert isinstance(f, PriorityFrame)
+ assert_equal(f.length, 5)
+ assert_equal(f.TYPE, PriorityFrame.TYPE)
+ assert_equal(f.flags, Frame.FLAG_NO_FLAGS)
+ assert_equal(f.stream_id, 0x1234567)
+ assert_equal(f.exclusive, True)
+ assert_equal(f.stream_dependency, 0x7654321)
+ assert_equal(f.weight, 42)
+
+ f = Frame.from_file(hex_to_file('0000050200012345670765432115'))
+ assert isinstance(f, PriorityFrame)
+ assert_equal(f.length, 5)
+ assert_equal(f.TYPE, PriorityFrame.TYPE)
+ assert_equal(f.flags, Frame.FLAG_NO_FLAGS)
+ assert_equal(f.stream_id, 0x1234567)
+ assert_equal(f.exclusive, False)
+ assert_equal(f.stream_dependency, 0x7654321)
+ assert_equal(f.weight, 21)
+
+
+def test_priority_frame_human_readable():
+ f = PriorityFrame(
+ length=5,
+ flags=Frame.FLAG_NO_FLAGS,
+ stream_id=0x1234567,
+ exclusive=False,
+ stream_dependency=0x7654321,
+ weight=21)
+ assert f.human_readable()
+
+
+def test_rst_stream_frame_to_bytes():
+ f = RstStreamFrame(
+ length=4,
+ flags=Frame.FLAG_NO_FLAGS,
+ stream_id=0x1234567,
+ error_code=0x7654321)
+ assert_equal(f.to_bytes().encode('hex'), '00000403000123456707654321')
+
+ f = RstStreamFrame(
+ length=4,
+ flags=Frame.FLAG_NO_FLAGS,
+ stream_id=0x0)
+ tutils.raises(ValueError, f.to_bytes)
+
+
+def test_rst_stream_frame_from_bytes():
+ f = Frame.from_file(hex_to_file('00000403000123456707654321'))
+ assert isinstance(f, RstStreamFrame)
+ assert_equal(f.length, 4)
+ assert_equal(f.TYPE, RstStreamFrame.TYPE)
+ assert_equal(f.flags, Frame.FLAG_NO_FLAGS)
+ assert_equal(f.stream_id, 0x1234567)
+ assert_equal(f.error_code, 0x07654321)
+
+
+def test_rst_stream_frame_human_readable():
+ f = RstStreamFrame(
+ length=4,
+ flags=Frame.FLAG_NO_FLAGS,
+ stream_id=0x1234567,
+ error_code=0x7654321)
+ assert f.human_readable()
+
+
+def test_settings_frame_to_bytes():
+ f = SettingsFrame(
+ length=0,
+ flags=Frame.FLAG_NO_FLAGS,
+ stream_id=0x0)
+ assert_equal(f.to_bytes().encode('hex'), '000000040000000000')
+
+ f = SettingsFrame(
+ length=0,
+ flags=SettingsFrame.FLAG_ACK,
+ stream_id=0x0)
+ assert_equal(f.to_bytes().encode('hex'), '000000040100000000')
+
+ f = SettingsFrame(
+ length=6,
+ flags=SettingsFrame.FLAG_ACK,
+ stream_id=0x0,
+ settings={
+ SettingsFrame.SETTINGS.SETTINGS_ENABLE_PUSH: 1})
+ assert_equal(f.to_bytes().encode('hex'), '000006040100000000000200000001')
+
+ f = SettingsFrame(
+ length=12,
+ flags=Frame.FLAG_NO_FLAGS,
+ stream_id=0x0,
+ settings={
+ SettingsFrame.SETTINGS.SETTINGS_ENABLE_PUSH: 1,
+ SettingsFrame.SETTINGS.SETTINGS_MAX_CONCURRENT_STREAMS: 0x12345678})
+ assert_equal(
+ f.to_bytes().encode('hex'),
+ '00000c040000000000000200000001000312345678')
+
+ f = SettingsFrame(
+ length=0,
+ flags=Frame.FLAG_NO_FLAGS,
+ stream_id=0x1234567)
+ tutils.raises(ValueError, f.to_bytes)
+
+
+def test_settings_frame_from_bytes():
+ f = Frame.from_file(hex_to_file('000000040000000000'))
+ assert isinstance(f, SettingsFrame)
+ assert_equal(f.length, 0)
+ assert_equal(f.TYPE, SettingsFrame.TYPE)
+ assert_equal(f.flags, Frame.FLAG_NO_FLAGS)
+ assert_equal(f.stream_id, 0x0)
+
+ f = Frame.from_file(hex_to_file('000000040100000000'))
+ assert isinstance(f, SettingsFrame)
+ assert_equal(f.length, 0)
+ assert_equal(f.TYPE, SettingsFrame.TYPE)
+ assert_equal(f.flags, SettingsFrame.FLAG_ACK)
+ assert_equal(f.stream_id, 0x0)
+
+ f = Frame.from_file(hex_to_file('000006040100000000000200000001'))
+ assert isinstance(f, SettingsFrame)
+ assert_equal(f.length, 6)
+ assert_equal(f.TYPE, SettingsFrame.TYPE)
+ assert_equal(f.flags, SettingsFrame.FLAG_ACK, 0x0)
+ assert_equal(f.stream_id, 0x0)
+ assert_equal(len(f.settings), 1)
+ assert_equal(f.settings[SettingsFrame.SETTINGS.SETTINGS_ENABLE_PUSH], 1)
+
+ f = Frame.from_file(hex_to_file(
+ '00000c040000000000000200000001000312345678'))
+ assert isinstance(f, SettingsFrame)
+ assert_equal(f.length, 12)
+ assert_equal(f.TYPE, SettingsFrame.TYPE)
+ assert_equal(f.flags, Frame.FLAG_NO_FLAGS)
+ assert_equal(f.stream_id, 0x0)
+ assert_equal(len(f.settings), 2)
+ assert_equal(f.settings[SettingsFrame.SETTINGS.SETTINGS_ENABLE_PUSH], 1)
+ assert_equal(
+ f.settings[
+ SettingsFrame.SETTINGS.SETTINGS_MAX_CONCURRENT_STREAMS],
+ 0x12345678)
+
+
+def test_settings_frame_human_readable():
+ f = SettingsFrame(
+ length=12,
+ flags=Frame.FLAG_NO_FLAGS,
+ stream_id=0x0,
+ settings={})
+ assert f.human_readable()
+
+ f = SettingsFrame(
+ length=12,
+ flags=Frame.FLAG_NO_FLAGS,
+ stream_id=0x0,
+ settings={
+ SettingsFrame.SETTINGS.SETTINGS_ENABLE_PUSH: 1,
+ SettingsFrame.SETTINGS.SETTINGS_MAX_CONCURRENT_STREAMS: 0x12345678})
+ assert f.human_readable()
+
+
+def test_push_promise_frame_to_bytes():
+ f = PushPromiseFrame(
+ length=10,
+ flags=Frame.FLAG_NO_FLAGS,
+ stream_id=0x1234567,
+ promised_stream=0x7654321,
+ header_block_fragment='foobar')
+ assert_equal(
+ f.to_bytes().encode('hex'),
+ '00000a05000123456707654321666f6f626172')
+
+ f = PushPromiseFrame(
+ length=14,
+ flags=HeadersFrame.FLAG_PADDED,
+ stream_id=0x1234567,
+ promised_stream=0x7654321,
+ header_block_fragment='foobar',
+ pad_length=3)
+ assert_equal(
+ f.to_bytes().encode('hex'),
+ '00000e0508012345670307654321666f6f626172000000')
+
+ f = PushPromiseFrame(
+ length=4,
+ flags=Frame.FLAG_NO_FLAGS,
+ stream_id=0x0,
+ promised_stream=0x1234567)
+ tutils.raises(ValueError, f.to_bytes)
+
+ f = PushPromiseFrame(
+ length=4,
+ flags=Frame.FLAG_NO_FLAGS,
+ stream_id=0x1234567,
+ promised_stream=0x0)
+ tutils.raises(ValueError, f.to_bytes)
+
+
+def test_push_promise_frame_from_bytes():
+ f = Frame.from_file(hex_to_file('00000a05000123456707654321666f6f626172'))
+ assert isinstance(f, PushPromiseFrame)
+ assert_equal(f.length, 10)
+ assert_equal(f.TYPE, PushPromiseFrame.TYPE)
+ assert_equal(f.flags, Frame.FLAG_NO_FLAGS)
+ assert_equal(f.stream_id, 0x1234567)
+ assert_equal(f.header_block_fragment, 'foobar')
+
+ f = Frame.from_file(hex_to_file(
+ '00000e0508012345670307654321666f6f626172000000'))
+ assert isinstance(f, PushPromiseFrame)
+ assert_equal(f.length, 14)
+ assert_equal(f.TYPE, PushPromiseFrame.TYPE)
+ assert_equal(f.flags, PushPromiseFrame.FLAG_PADDED)
+ assert_equal(f.stream_id, 0x1234567)
+ assert_equal(f.header_block_fragment, 'foobar')
+
+
+def test_push_promise_frame_human_readable():
+ f = PushPromiseFrame(
+ length=14,
+ flags=HeadersFrame.FLAG_PADDED,
+ stream_id=0x1234567,
+ promised_stream=0x7654321,
+ header_block_fragment='foobar',
+ pad_length=3)
+ assert f.human_readable()
+
+
+def test_ping_frame_to_bytes():
+ f = PingFrame(
+ length=8,
+ flags=PingFrame.FLAG_ACK,
+ stream_id=0x0,
+ payload=b'foobar')
+ assert_equal(
+ f.to_bytes().encode('hex'),
+ '000008060100000000666f6f6261720000')
+
+ f = PingFrame(
+ length=8,
+ flags=Frame.FLAG_NO_FLAGS,
+ stream_id=0x0,
+ payload=b'foobardeadbeef')
+ assert_equal(
+ f.to_bytes().encode('hex'),
+ '000008060000000000666f6f6261726465')
+
+ f = PingFrame(
+ length=8,
+ flags=Frame.FLAG_NO_FLAGS,
+ stream_id=0x1234567)
+ tutils.raises(ValueError, f.to_bytes)
+
+
+def test_ping_frame_from_bytes():
+ f = Frame.from_file(hex_to_file('000008060100000000666f6f6261720000'))
+ assert isinstance(f, PingFrame)
+ assert_equal(f.length, 8)
+ assert_equal(f.TYPE, PingFrame.TYPE)
+ assert_equal(f.flags, PingFrame.FLAG_ACK)
+ assert_equal(f.stream_id, 0x0)
+ assert_equal(f.payload, b'foobar\0\0')
+
+ f = Frame.from_file(hex_to_file('000008060000000000666f6f6261726465'))
+ assert isinstance(f, PingFrame)
+ assert_equal(f.length, 8)
+ assert_equal(f.TYPE, PingFrame.TYPE)
+ assert_equal(f.flags, Frame.FLAG_NO_FLAGS)
+ assert_equal(f.stream_id, 0x0)
+ assert_equal(f.payload, b'foobarde')
+
+
+def test_ping_frame_human_readable():
+ f = PingFrame(
+ length=8,
+ flags=PingFrame.FLAG_ACK,
+ stream_id=0x0,
+ payload=b'foobar')
+ assert f.human_readable()
+
+
+def test_goaway_frame_to_bytes():
+ f = GoAwayFrame(
+ length=8,
+ flags=Frame.FLAG_NO_FLAGS,
+ stream_id=0x0,
+ last_stream=0x1234567,
+ error_code=0x87654321,
+ data=b'')
+ assert_equal(
+ f.to_bytes().encode('hex'),
+ '0000080700000000000123456787654321')
+
+ f = GoAwayFrame(
+ length=14,
+ flags=Frame.FLAG_NO_FLAGS,
+ stream_id=0x0,
+ last_stream=0x1234567,
+ error_code=0x87654321,
+ data=b'foobar')
+ assert_equal(
+ f.to_bytes().encode('hex'),
+ '00000e0700000000000123456787654321666f6f626172')
+
+ f = GoAwayFrame(
+ length=8,
+ flags=Frame.FLAG_NO_FLAGS,
+ stream_id=0x1234567,
+ last_stream=0x1234567,
+ error_code=0x87654321)
+ tutils.raises(ValueError, f.to_bytes)
+
+
+def test_goaway_frame_from_bytes():
+ f = Frame.from_file(hex_to_file(
+ '0000080700000000000123456787654321'))
+ assert isinstance(f, GoAwayFrame)
+ assert_equal(f.length, 8)
+ assert_equal(f.TYPE, GoAwayFrame.TYPE)
+ assert_equal(f.flags, Frame.FLAG_NO_FLAGS)
+ assert_equal(f.stream_id, 0x0)
+ assert_equal(f.last_stream, 0x1234567)
+ assert_equal(f.error_code, 0x87654321)
+ assert_equal(f.data, b'')
+
+ f = Frame.from_file(hex_to_file(
+ '00000e0700000000000123456787654321666f6f626172'))
+ assert isinstance(f, GoAwayFrame)
+ assert_equal(f.length, 14)
+ assert_equal(f.TYPE, GoAwayFrame.TYPE)
+ assert_equal(f.flags, Frame.FLAG_NO_FLAGS)
+ assert_equal(f.stream_id, 0x0)
+ assert_equal(f.last_stream, 0x1234567)
+ assert_equal(f.error_code, 0x87654321)
+ assert_equal(f.data, b'foobar')
+
+
+def test_go_away_frame_human_readable():
+ f = GoAwayFrame(
+ length=14,
+ flags=Frame.FLAG_NO_FLAGS,
+ stream_id=0x0,
+ last_stream=0x1234567,
+ error_code=0x87654321,
+ data=b'foobar')
+ assert f.human_readable()
+
+
+def test_window_update_frame_to_bytes():
+ f = WindowUpdateFrame(
+ length=4,
+ flags=Frame.FLAG_NO_FLAGS,
+ stream_id=0x0,
+ window_size_increment=0x1234567)
+ assert_equal(f.to_bytes().encode('hex'), '00000408000000000001234567')
+
+ f = WindowUpdateFrame(
+ length=4,
+ flags=Frame.FLAG_NO_FLAGS,
+ stream_id=0x1234567,
+ window_size_increment=0x7654321)
+ assert_equal(f.to_bytes().encode('hex'), '00000408000123456707654321')
+
+ f = WindowUpdateFrame(
+ length=4,
+ flags=Frame.FLAG_NO_FLAGS,
+ stream_id=0x0,
+ window_size_increment=0xdeadbeef)
+ tutils.raises(ValueError, f.to_bytes)
+
+ f = WindowUpdateFrame(4, Frame.FLAG_NO_FLAGS, 0x0, window_size_increment=0)
+ tutils.raises(ValueError, f.to_bytes)
+
+
+def test_window_update_frame_from_bytes():
+ f = Frame.from_file(hex_to_file('00000408000000000001234567'))
+ assert isinstance(f, WindowUpdateFrame)
+ assert_equal(f.length, 4)
+ assert_equal(f.TYPE, WindowUpdateFrame.TYPE)
+ assert_equal(f.flags, Frame.FLAG_NO_FLAGS)
+ assert_equal(f.stream_id, 0x0)
+ assert_equal(f.window_size_increment, 0x1234567)
+
+
+def test_window_update_frame_human_readable():
+ f = WindowUpdateFrame(
+ length=4,
+ flags=Frame.FLAG_NO_FLAGS,
+ stream_id=0x1234567,
+ window_size_increment=0x7654321)
+ assert f.human_readable()
+
+
+def test_continuation_frame_to_bytes():
+ f = ContinuationFrame(
+ length=6,
+ flags=ContinuationFrame.FLAG_END_HEADERS,
+ stream_id=0x1234567,
+ header_block_fragment='foobar')
+ assert_equal(f.to_bytes().encode('hex'), '000006090401234567666f6f626172')
+
+ f = ContinuationFrame(
+ length=6,
+ flags=ContinuationFrame.FLAG_END_HEADERS,
+ stream_id=0x0,
+ header_block_fragment='foobar')
+ tutils.raises(ValueError, f.to_bytes)
+
+
+def test_continuation_frame_from_bytes():
+ f = Frame.from_file(hex_to_file('000006090401234567666f6f626172'))
+ assert isinstance(f, ContinuationFrame)
+ assert_equal(f.length, 6)
+ assert_equal(f.TYPE, ContinuationFrame.TYPE)
+ assert_equal(f.flags, ContinuationFrame.FLAG_END_HEADERS)
+ assert_equal(f.stream_id, 0x1234567)
+ assert_equal(f.header_block_fragment, 'foobar')
+
+
+def test_continuation_frame_human_readable():
+ f = ContinuationFrame(
+ length=6,
+ flags=ContinuationFrame.FLAG_END_HEADERS,
+ stream_id=0x1234567,
+ header_block_fragment='foobar')
+ assert f.human_readable()
diff --git a/test/http/http2/test_protocol.py b/test/http/http2/test_protocol.py
new file mode 100644
index 00000000..f607860e
--- /dev/null
+++ b/test/http/http2/test_protocol.py
@@ -0,0 +1,325 @@
+import OpenSSL
+
+from netlib import tcp
+from netlib.http import http2
+from netlib.http.http2.frame import *
+from ... import tutils, tservers
+
+
+class EchoHandler(tcp.BaseHandler):
+ sni = None
+
+ def handle(self):
+ while True:
+ v = self.rfile.safe_read(1)
+ self.wfile.write(v)
+ self.wfile.flush()
+
+
+class TestCheckALPNMatch(tservers.ServerTestBase):
+ handler = EchoHandler
+ ssl = dict(
+ alpn_select=http2.HTTP2Protocol.ALPN_PROTO_H2,
+ )
+
+ if OpenSSL._util.lib.Cryptography_HAS_ALPN:
+
+ def test_check_alpn(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ c.convert_to_ssl(alpn_protos=[http2.HTTP2Protocol.ALPN_PROTO_H2])
+ protocol = http2.HTTP2Protocol(c)
+ assert protocol.check_alpn()
+
+
+class TestCheckALPNMismatch(tservers.ServerTestBase):
+ handler = EchoHandler
+ ssl = dict(
+ alpn_select=None,
+ )
+
+ if OpenSSL._util.lib.Cryptography_HAS_ALPN:
+
+ def test_check_alpn(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ c.convert_to_ssl(alpn_protos=[http2.HTTP2Protocol.ALPN_PROTO_H2])
+ protocol = http2.HTTP2Protocol(c)
+ tutils.raises(NotImplementedError, protocol.check_alpn)
+
+
+class TestPerformServerConnectionPreface(tservers.ServerTestBase):
+ class handler(tcp.BaseHandler):
+
+ def handle(self):
+ # send magic
+ self.wfile.write(
+ '505249202a20485454502f322e300d0a0d0a534d0d0a0d0a'.decode('hex'))
+ self.wfile.flush()
+
+ # send empty settings frame
+ self.wfile.write('000000040000000000'.decode('hex'))
+ self.wfile.flush()
+
+ # check empty settings frame
+ assert self.rfile.read(9) ==\
+ '000000040000000000'.decode('hex')
+
+ # check settings acknowledgement
+ assert self.rfile.read(9) == \
+ '000000040100000000'.decode('hex')
+
+ # send settings acknowledgement
+ self.wfile.write('000000040100000000'.decode('hex'))
+ self.wfile.flush()
+
+ def test_perform_server_connection_preface(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ protocol = http2.HTTP2Protocol(c)
+ protocol.perform_server_connection_preface()
+
+
+class TestPerformClientConnectionPreface(tservers.ServerTestBase):
+ class handler(tcp.BaseHandler):
+
+ def handle(self):
+ # check magic
+ assert self.rfile.read(24) ==\
+ '505249202a20485454502f322e300d0a0d0a534d0d0a0d0a'.decode('hex')
+
+ # check empty settings frame
+ assert self.rfile.read(9) ==\
+ '000000040000000000'.decode('hex')
+
+ # send empty settings frame
+ self.wfile.write('000000040000000000'.decode('hex'))
+ self.wfile.flush()
+
+ # check settings acknowledgement
+ assert self.rfile.read(9) == \
+ '000000040100000000'.decode('hex')
+
+ # send settings acknowledgement
+ self.wfile.write('000000040100000000'.decode('hex'))
+ self.wfile.flush()
+
+ def test_perform_client_connection_preface(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ protocol = http2.HTTP2Protocol(c)
+ protocol.perform_client_connection_preface()
+
+
+class TestClientStreamIds():
+ c = tcp.TCPClient(("127.0.0.1", 0))
+ protocol = http2.HTTP2Protocol(c)
+
+ def test_client_stream_ids(self):
+ assert self.protocol.current_stream_id is None
+ assert self.protocol.next_stream_id() == 1
+ assert self.protocol.current_stream_id == 1
+ assert self.protocol.next_stream_id() == 3
+ assert self.protocol.current_stream_id == 3
+ assert self.protocol.next_stream_id() == 5
+ assert self.protocol.current_stream_id == 5
+
+
+class TestServerStreamIds():
+ c = tcp.TCPClient(("127.0.0.1", 0))
+ protocol = http2.HTTP2Protocol(c, is_server=True)
+
+ def test_server_stream_ids(self):
+ assert self.protocol.current_stream_id is None
+ assert self.protocol.next_stream_id() == 2
+ assert self.protocol.current_stream_id == 2
+ assert self.protocol.next_stream_id() == 4
+ assert self.protocol.current_stream_id == 4
+ assert self.protocol.next_stream_id() == 6
+ assert self.protocol.current_stream_id == 6
+
+
+class TestApplySettings(tservers.ServerTestBase):
+ class handler(tcp.BaseHandler):
+
+ def handle(self):
+ # check settings acknowledgement
+ assert self.rfile.read(9) == '000000040100000000'.decode('hex')
+ self.wfile.write("OK")
+ self.wfile.flush()
+
+ ssl = True
+
+ def test_apply_settings(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ c.convert_to_ssl()
+ protocol = http2.HTTP2Protocol(c)
+
+ protocol._apply_settings({
+ SettingsFrame.SETTINGS.SETTINGS_ENABLE_PUSH: 'foo',
+ SettingsFrame.SETTINGS.SETTINGS_MAX_CONCURRENT_STREAMS: 'bar',
+ SettingsFrame.SETTINGS.SETTINGS_INITIAL_WINDOW_SIZE: 'deadbeef',
+ })
+
+ assert c.rfile.safe_read(2) == "OK"
+
+ assert protocol.http2_settings[
+ SettingsFrame.SETTINGS.SETTINGS_ENABLE_PUSH] == 'foo'
+ assert protocol.http2_settings[
+ SettingsFrame.SETTINGS.SETTINGS_MAX_CONCURRENT_STREAMS] == 'bar'
+ assert protocol.http2_settings[
+ SettingsFrame.SETTINGS.SETTINGS_INITIAL_WINDOW_SIZE] == 'deadbeef'
+
+
+class TestCreateHeaders():
+ c = tcp.TCPClient(("127.0.0.1", 0))
+
+ def test_create_headers(self):
+ headers = [
+ (b':method', b'GET'),
+ (b':path', b'index.html'),
+ (b':scheme', b'https'),
+ (b'foo', b'bar')]
+
+ bytes = http2.HTTP2Protocol(self.c)._create_headers(
+ headers, 1, end_stream=True)
+ assert b''.join(bytes) ==\
+ '000014010500000001824488355217caf3a69a3f87408294e7838c767f'\
+ .decode('hex')
+
+ bytes = http2.HTTP2Protocol(self.c)._create_headers(
+ headers, 1, end_stream=False)
+ assert b''.join(bytes) ==\
+ '000014010400000001824488355217caf3a69a3f87408294e7838c767f'\
+ .decode('hex')
+
+ # TODO: add test for too large header_block_fragments
+
+
+class TestCreateBody():
+ c = tcp.TCPClient(("127.0.0.1", 0))
+ protocol = http2.HTTP2Protocol(c)
+
+ def test_create_body_empty(self):
+ bytes = self.protocol._create_body(b'', 1)
+ assert b''.join(bytes) == ''.decode('hex')
+
+ def test_create_body_single_frame(self):
+ bytes = self.protocol._create_body('foobar', 1)
+ assert b''.join(bytes) == '000006000100000001666f6f626172'.decode('hex')
+
+ def test_create_body_multiple_frames(self):
+ pass
+ # bytes = self.protocol._create_body('foobar' * 3000, 1)
+ # TODO: add test for too large frames
+
+
+class TestCreateRequest():
+ c = tcp.TCPClient(("127.0.0.1", 0))
+
+ def test_create_request_simple(self):
+ bytes = http2.HTTP2Protocol(self.c).create_request('GET', '/')
+ assert len(bytes) == 1
+ assert bytes[0] == '00000d0105000000018284874188089d5c0b8170dc07'.decode('hex')
+
+ def test_create_request_with_body(self):
+ bytes = http2.HTTP2Protocol(self.c).create_request(
+ 'GET', '/', [(b'foo', b'bar')], 'foobar')
+ assert len(bytes) == 2
+ assert bytes[0] ==\
+ '0000150104000000018284874188089d5c0b8170dc07408294e7838c767f'.decode('hex')
+ assert bytes[1] ==\
+ '000006000100000001666f6f626172'.decode('hex')
+
+
+class TestReadResponse(tservers.ServerTestBase):
+ class handler(tcp.BaseHandler):
+
+ def handle(self):
+ self.wfile.write(
+ b'00000801040000000188628594e78c767f'.decode('hex'))
+ self.wfile.write(
+ b'000006000100000001666f6f626172'.decode('hex'))
+ self.wfile.flush()
+
+ ssl = True
+
+ def test_read_response(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ c.convert_to_ssl()
+ protocol = http2.HTTP2Protocol(c)
+
+ status, headers, body = protocol.read_response()
+
+ assert headers == {':status': '200', 'etag': 'foobar'}
+ assert status == "200"
+ assert body == b'foobar'
+
+
+class TestReadEmptyResponse(tservers.ServerTestBase):
+ class handler(tcp.BaseHandler):
+
+ def handle(self):
+ self.wfile.write(
+ b'00000801050000000188628594e78c767f'.decode('hex'))
+ self.wfile.flush()
+
+ ssl = True
+
+ def test_read_empty_response(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ c.convert_to_ssl()
+ protocol = http2.HTTP2Protocol(c)
+
+ status, headers, body = protocol.read_response()
+
+ assert headers == {':status': '200', 'etag': 'foobar'}
+ assert status == "200"
+ assert body == b''
+
+
+class TestReadRequest(tservers.ServerTestBase):
+ class handler(tcp.BaseHandler):
+
+ def handle(self):
+ self.wfile.write(
+ b'000003010400000001828487'.decode('hex'))
+ self.wfile.write(
+ b'000006000100000001666f6f626172'.decode('hex'))
+ self.wfile.flush()
+
+ ssl = True
+
+ def test_read_request(self):
+ c = tcp.TCPClient(("127.0.0.1", self.port))
+ c.connect()
+ c.convert_to_ssl()
+ protocol = http2.HTTP2Protocol(c, is_server=True)
+
+ stream_id, headers, body = protocol.read_request()
+
+ assert stream_id
+ assert headers == {':method': 'GET', ':path': '/', ':scheme': 'https'}
+ assert body == b'foobar'
+
+
+class TestCreateResponse():
+ c = tcp.TCPClient(("127.0.0.1", 0))
+
+ def test_create_response_simple(self):
+ bytes = http2.HTTP2Protocol(self.c, is_server=True).create_response(200)
+ assert len(bytes) == 1
+ assert bytes[0] ==\
+ '00000101050000000288'.decode('hex')
+
+ def test_create_response_with_body(self):
+ bytes = http2.HTTP2Protocol(self.c, is_server=True).create_response(
+ 200, 1, [(b'foo', b'bar')], 'foobar')
+ assert len(bytes) == 2
+ assert bytes[0] ==\
+ '00000901040000000188408294e7838c767f'.decode('hex')
+ assert bytes[1] ==\
+ '000006000100000001666f6f626172'.decode('hex')
diff --git a/test/http/test_authentication.py b/test/http/test_authentication.py
new file mode 100644
index 00000000..c0dae1a2
--- /dev/null
+++ b/test/http/test_authentication.py
@@ -0,0 +1,110 @@
+from netlib import odict, http
+from netlib.http import authentication
+from .. import tutils
+
+
+class TestPassManNonAnon:
+
+ def test_simple(self):
+ p = authentication.PassManNonAnon()
+ assert not p.test("", "")
+ assert p.test("user", "")
+
+
+class TestPassManHtpasswd:
+
+ def test_file_errors(self):
+ tutils.raises(
+ "malformed htpasswd file",
+ authentication.PassManHtpasswd,
+ tutils.test_data.path("data/server.crt"))
+
+ def test_simple(self):
+ pm = authentication.PassManHtpasswd(tutils.test_data.path("data/htpasswd"))
+
+ vals = ("basic", "test", "test")
+ http.http1.assemble_http_basic_auth(*vals)
+ assert pm.test("test", "test")
+ assert not pm.test("test", "foo")
+ assert not pm.test("foo", "test")
+ assert not pm.test("test", "")
+ assert not pm.test("", "")
+
+
+class TestPassManSingleUser:
+
+ def test_simple(self):
+ pm = authentication.PassManSingleUser("test", "test")
+ assert pm.test("test", "test")
+ assert not pm.test("test", "foo")
+ assert not pm.test("foo", "test")
+
+
+class TestNullProxyAuth:
+
+ def test_simple(self):
+ na = authentication.NullProxyAuth(authentication.PassManNonAnon())
+ assert not na.auth_challenge_headers()
+ assert na.authenticate("foo")
+ na.clean({})
+
+
+class TestBasicProxyAuth:
+
+ def test_simple(self):
+ ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test")
+ h = odict.ODictCaseless()
+ assert ba.auth_challenge_headers()
+ assert not ba.authenticate(h)
+
+ def test_authenticate_clean(self):
+ ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test")
+
+ hdrs = odict.ODictCaseless()
+ vals = ("basic", "foo", "bar")
+ hdrs[ba.AUTH_HEADER] = [http.http1.assemble_http_basic_auth(*vals)]
+ assert ba.authenticate(hdrs)
+
+ ba.clean(hdrs)
+ assert not ba.AUTH_HEADER in hdrs
+
+ hdrs[ba.AUTH_HEADER] = [""]
+ assert not ba.authenticate(hdrs)
+
+ hdrs[ba.AUTH_HEADER] = ["foo"]
+ assert not ba.authenticate(hdrs)
+
+ vals = ("foo", "foo", "bar")
+ hdrs[ba.AUTH_HEADER] = [http.http1.assemble_http_basic_auth(*vals)]
+ assert not ba.authenticate(hdrs)
+
+ ba = authentication.BasicProxyAuth(authentication.PassMan(), "test")
+ vals = ("basic", "foo", "bar")
+ hdrs[ba.AUTH_HEADER] = [http.http1.assemble_http_basic_auth(*vals)]
+ assert not ba.authenticate(hdrs)
+
+
+class Bunch:
+ pass
+
+
+class TestAuthAction:
+
+ def test_nonanonymous(self):
+ m = Bunch()
+ aa = authentication.NonanonymousAuthAction(None, "authenticator")
+ aa(None, m, None, None)
+ assert m.authenticator
+
+ def test_singleuser(self):
+ m = Bunch()
+ aa = authentication.SingleuserAuthAction(None, "authenticator")
+ aa(None, m, "foo:bar", None)
+ assert m.authenticator
+ tutils.raises("invalid", aa, None, m, "foo", None)
+
+ def test_httppasswd(self):
+ m = Bunch()
+ aa = authentication.HtpasswdAuthAction(None, "authenticator")
+ aa(None, m, tutils.test_data.path("data/htpasswd"), None)
+ assert m.authenticator
diff --git a/test/http/test_cookies.py b/test/http/test_cookies.py
new file mode 100644
index 00000000..4f99593a
--- /dev/null
+++ b/test/http/test_cookies.py
@@ -0,0 +1,219 @@
+import nose.tools
+
+from netlib.http import cookies
+
+
+def test_read_token():
+ tokens = [
+ [("foo", 0), ("foo", 3)],
+ [("foo", 1), ("oo", 3)],
+ [(" foo", 1), ("foo", 4)],
+ [(" foo;", 1), ("foo", 4)],
+ [(" foo=", 1), ("foo", 4)],
+ [(" foo=bar", 1), ("foo", 4)],
+ ]
+ for q, a in tokens:
+ nose.tools.eq_(cookies._read_token(*q), a)
+
+
+def test_read_quoted_string():
+ tokens = [
+ [('"foo" x', 0), ("foo", 5)],
+ [('"f\oo" x', 0), ("foo", 6)],
+ [(r'"f\\o" x', 0), (r"f\o", 6)],
+ [(r'"f\\" x', 0), (r"f" + '\\', 5)],
+ [('"fo\\\"" x', 0), ("fo\"", 6)],
+ ]
+ for q, a in tokens:
+ nose.tools.eq_(cookies._read_quoted_string(*q), a)
+
+
+def test_read_pairs():
+ vals = [
+ [
+ "one",
+ [["one", None]]
+ ],
+ [
+ "one=two",
+ [["one", "two"]]
+ ],
+ [
+ "one=",
+ [["one", ""]]
+ ],
+ [
+ 'one="two"',
+ [["one", "two"]]
+ ],
+ [
+ 'one="two"; three=four',
+ [["one", "two"], ["three", "four"]]
+ ],
+ [
+ 'one="two"; three=four; five',
+ [["one", "two"], ["three", "four"], ["five", None]]
+ ],
+ [
+ 'one="\\"two"; three=four',
+ [["one", '"two'], ["three", "four"]]
+ ],
+ ]
+ for s, lst in vals:
+ ret, off = cookies._read_pairs(s)
+ nose.tools.eq_(ret, lst)
+
+
+def test_pairs_roundtrips():
+ pairs = [
+ [
+ "",
+ []
+ ],
+ [
+ "one=uno",
+ [["one", "uno"]]
+ ],
+ [
+ "one",
+ [["one", None]]
+ ],
+ [
+ "one=uno; two=due",
+ [["one", "uno"], ["two", "due"]]
+ ],
+ [
+ 'one="uno"; two="\due"',
+ [["one", "uno"], ["two", "due"]]
+ ],
+ [
+ 'one="un\\"o"',
+ [["one", 'un"o']]
+ ],
+ [
+ 'one="uno,due"',
+ [["one", 'uno,due']]
+ ],
+ [
+ "one=uno; two; three=tre",
+ [["one", "uno"], ["two", None], ["three", "tre"]]
+ ],
+ [
+ "_lvs2=zHai1+Hq+Tc2vmc2r4GAbdOI5Jopg3EwsdUT9g=; "
+ "_rcc2=53VdltWl+Ov6ordflA==;",
+ [
+ ["_lvs2", "zHai1+Hq+Tc2vmc2r4GAbdOI5Jopg3EwsdUT9g="],
+ ["_rcc2", "53VdltWl+Ov6ordflA=="]
+ ]
+ ]
+ ]
+ for s, lst in pairs:
+ ret, off = cookies._read_pairs(s)
+ nose.tools.eq_(ret, lst)
+ s2 = cookies._format_pairs(lst)
+ ret, off = cookies._read_pairs(s2)
+ nose.tools.eq_(ret, lst)
+
+
+def test_cookie_roundtrips():
+ pairs = [
+ [
+ "one=uno",
+ [["one", "uno"]]
+ ],
+ [
+ "one=uno; two=due",
+ [["one", "uno"], ["two", "due"]]
+ ],
+ ]
+ for s, lst in pairs:
+ ret = cookies.parse_cookie_header(s)
+ nose.tools.eq_(ret.lst, lst)
+ s2 = cookies.format_cookie_header(ret)
+ ret = cookies.parse_cookie_header(s2)
+ nose.tools.eq_(ret.lst, lst)
+
+
+def test_parse_set_cookie_pairs():
+ pairs = [
+ [
+ "one=uno",
+ [
+ ["one", "uno"]
+ ]
+ ],
+ [
+ "one=un\x20",
+ [
+ ["one", "un\x20"]
+ ]
+ ],
+ [
+ "one=uno; foo",
+ [
+ ["one", "uno"],
+ ["foo", None]
+ ]
+ ],
+ [
+ "mun=1.390.f60; "
+ "expires=sun, 11-oct-2015 12:38:31 gmt; path=/; "
+ "domain=b.aol.com",
+ [
+ ["mun", "1.390.f60"],
+ ["expires", "sun, 11-oct-2015 12:38:31 gmt"],
+ ["path", "/"],
+ ["domain", "b.aol.com"]
+ ]
+ ],
+ [
+ r'rpb=190%3d1%2616726%3d1%2634832%3d1%2634874%3d1; '
+ 'domain=.rubiconproject.com; '
+ 'expires=mon, 11-may-2015 21:54:57 gmt; '
+ 'path=/',
+ [
+ ['rpb', r'190%3d1%2616726%3d1%2634832%3d1%2634874%3d1'],
+ ['domain', '.rubiconproject.com'],
+ ['expires', 'mon, 11-may-2015 21:54:57 gmt'],
+ ['path', '/']
+ ]
+ ],
+ ]
+ for s, lst in pairs:
+ ret = cookies._parse_set_cookie_pairs(s)
+ nose.tools.eq_(ret, lst)
+ s2 = cookies._format_set_cookie_pairs(ret)
+ ret2 = cookies._parse_set_cookie_pairs(s2)
+ nose.tools.eq_(ret2, lst)
+
+
+def test_parse_set_cookie_header():
+ vals = [
+ [
+ "", None
+ ],
+ [
+ ";", None
+ ],
+ [
+ "one=uno",
+ ("one", "uno", [])
+ ],
+ [
+ "one=uno; foo=bar",
+ ("one", "uno", [["foo", "bar"]])
+ ]
+ ]
+ for s, expected in vals:
+ ret = cookies.parse_set_cookie_header(s)
+ if expected:
+ assert ret[0] == expected[0]
+ assert ret[1] == expected[1]
+ nose.tools.eq_(ret[2].lst, expected[2])
+ s2 = cookies.format_set_cookie_header(*ret)
+ ret2 = cookies.parse_set_cookie_header(s2)
+ assert ret2[0] == expected[0]
+ assert ret2[1] == expected[1]
+ nose.tools.eq_(ret2[2].lst, expected[2])
+ else:
+ assert ret is None
diff --git a/test/http/test_semantics.py b/test/http/test_semantics.py
new file mode 100644
index 00000000..c4605302
--- /dev/null
+++ b/test/http/test_semantics.py
@@ -0,0 +1,54 @@
+import cStringIO
+import textwrap
+import binascii
+
+from netlib import http, odict, tcp
+from netlib.http import http1
+from .. import tutils, tservers
+
+def test_httperror():
+ e = http.exceptions.HttpError(404, "Not found")
+ assert str(e)
+
+
+def test_parse_url():
+ assert not http.parse_url("")
+
+ u = "http://foo.com:8888/test"
+ s, h, po, pa = http.parse_url(u)
+ assert s == "http"
+ assert h == "foo.com"
+ assert po == 8888
+ assert pa == "/test"
+
+ s, h, po, pa = http.parse_url("http://foo/bar")
+ assert s == "http"
+ assert h == "foo"
+ assert po == 80
+ assert pa == "/bar"
+
+ s, h, po, pa = http.parse_url("http://user:pass@foo/bar")
+ assert s == "http"
+ assert h == "foo"
+ assert po == 80
+ assert pa == "/bar"
+
+ s, h, po, pa = http.parse_url("http://foo")
+ assert pa == "/"
+
+ s, h, po, pa = http.parse_url("https://foo")
+ assert po == 443
+
+ assert not http.parse_url("https://foo:bar")
+ assert not http.parse_url("https://foo:")
+
+ # Invalid IDNA
+ assert not http.parse_url("http://\xfafoo")
+ # Invalid PATH
+ assert not http.parse_url("http:/\xc6/localhost:56121")
+ # Null byte in host
+ assert not http.parse_url("http://foo\0")
+ # Port out of range
+ assert not http.parse_url("http://foo:999999")
+ # Invalid IPv6 URL - see http://www.ietf.org/rfc/rfc2732.txt
+ assert not http.parse_url('http://lo[calhost')
diff --git a/test/http/test_user_agents.py b/test/http/test_user_agents.py
new file mode 100644
index 00000000..0bf1bba7
--- /dev/null
+++ b/test/http/test_user_agents.py
@@ -0,0 +1,6 @@
+from netlib.http import user_agents
+
+
+def test_get_shortcut():
+ assert user_agents.get_by_shortcut("c")[0] == "chrome"
+ assert not user_agents.get_by_shortcut("_")