diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/http/__init__.py (renamed from test/http2/__init__.py) | 0 | ||||
-rw-r--r-- | test/http/http1/__init__.py | 0 | ||||
-rw-r--r-- | test/http/http1/test_protocol.py (renamed from test/test_http.py) | 274 | ||||
-rw-r--r-- | test/http/http2/__init__.py | 0 | ||||
-rw-r--r-- | test/http/http2/test_frames.py (renamed from test/http2/test_frames.py) | 2 | ||||
-rw-r--r-- | test/http/http2/test_protocol.py (renamed from test/http2/test_protocol.py) | 38 | ||||
-rw-r--r-- | test/http/test_authentication.py (renamed from test/test_http_auth.py) | 48 | ||||
-rw-r--r-- | test/http/test_cookies.py (renamed from test/test_http_cookies.py) | 32 | ||||
-rw-r--r-- | test/http/test_semantics.py | 54 | ||||
-rw-r--r-- | test/http/test_user_agents.py | 6 | ||||
-rw-r--r-- | test/test_http_uastrings.py | 6 | ||||
-rw-r--r-- | test/websockets/__init__.py | 0 | ||||
-rw-r--r-- | test/websockets/test_websockets.py (renamed from test/test_websockets.py) | 57 |
13 files changed, 276 insertions, 241 deletions
diff --git a/test/http2/__init__.py b/test/http/__init__.py index e69de29b..e69de29b 100644 --- a/test/http2/__init__.py +++ b/test/http/__init__.py diff --git a/test/http/http1/__init__.py b/test/http/http1/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/test/http/http1/__init__.py diff --git a/test/test_http.py b/test/http/http1/test_protocol.py index 2ad81d24..dcebbd5e 100644 --- a/test/test_http.py +++ b/test/http/http1/test_protocol.py @@ -1,75 +1,78 @@ import cStringIO import textwrap import binascii + from netlib import http, odict, tcp -from . import tutils, tservers +from netlib.http.http1 import HTTP1Protocol +from ... import tutils, tservers + +def mock_protocol(data='', chunked=False): + rfile = cStringIO.StringIO(data) + wfile = cStringIO.StringIO() + return HTTP1Protocol(rfile=rfile, wfile=wfile) -def test_httperror(): - e = http.HttpError(404, "Not found") - assert str(e) def test_has_chunked_encoding(): h = odict.ODictCaseless() - assert not http.has_chunked_encoding(h) + assert not HTTP1Protocol.has_chunked_encoding(h) h["transfer-encoding"] = ["chunked"] - assert http.has_chunked_encoding(h) + assert HTTP1Protocol.has_chunked_encoding(h) def test_read_chunked(): - h = odict.ODictCaseless() h["transfer-encoding"] = ["chunked"] - s = cStringIO.StringIO("1\r\na\r\n0\r\n") + data = "1\r\na\r\n0\r\n" tutils.raises( "malformed chunked body", - http.read_http_body, - s, h, None, "GET", None, True + mock_protocol(data).read_http_body, + h, None, "GET", None, True ) - s = cStringIO.StringIO("1\r\na\r\n0\r\n\r\n") - assert http.read_http_body(s, h, None, "GET", None, True) == "a" + data = "1\r\na\r\n0\r\n\r\n" + assert mock_protocol(data).read_http_body(h, None, "GET", None, True) == "a" - s = cStringIO.StringIO("\r\n\r\n1\r\na\r\n0\r\n\r\n") - assert http.read_http_body(s, h, None, "GET", None, True) == "a" + data = "\r\n\r\n1\r\na\r\n0\r\n\r\n" + assert mock_protocol(data).read_http_body(h, None, "GET", None, True) == "a" - s = cStringIO.StringIO("\r\n") + data = "\r\n" tutils.raises( "closed prematurely", - http.read_http_body, - s, h, None, "GET", None, True + mock_protocol(data).read_http_body, + h, None, "GET", None, True ) - s = cStringIO.StringIO("1\r\nfoo") + data = "1\r\nfoo" tutils.raises( "malformed chunked body", - http.read_http_body, - s, h, None, "GET", None, True + mock_protocol(data).read_http_body, + h, None, "GET", None, True ) - s = cStringIO.StringIO("foo\r\nfoo") + data = "foo\r\nfoo" tutils.raises( http.HttpError, - http.read_http_body, - s, h, None, "GET", None, True + mock_protocol(data).read_http_body, + h, None, "GET", None, True ) - s = cStringIO.StringIO("5\r\naaaaa\r\n0\r\n\r\n") - tutils.raises("too large", http.read_http_body, s, h, 2, "GET", None, True) + data = "5\r\naaaaa\r\n0\r\n\r\n" + tutils.raises("too large", mock_protocol(data).read_http_body, h, 2, "GET", None, True) def test_connection_close(): h = odict.ODictCaseless() - assert http.connection_close((1, 0), h) - assert not http.connection_close((1, 1), h) + assert HTTP1Protocol.connection_close((1, 0), h) + assert not HTTP1Protocol.connection_close((1, 1), h) h["connection"] = ["keep-alive"] - assert not http.connection_close((1, 1), h) + assert not HTTP1Protocol.connection_close((1, 1), h) h["connection"] = ["close"] - assert http.connection_close((1, 1), h) + assert HTTP1Protocol.connection_close((1, 1), h) def test_get_header_tokens(): @@ -85,119 +88,119 @@ def test_get_header_tokens(): def test_read_http_body_request(): h = odict.ODictCaseless() - r = cStringIO.StringIO("testing") - assert http.read_http_body(r, h, None, "GET", None, True) == "" + data = "testing" + assert mock_protocol(data).read_http_body(h, None, "GET", None, True) == "" def test_read_http_body_response(): h = odict.ODictCaseless() - s = tcp.Reader(cStringIO.StringIO("testing")) - assert http.read_http_body(s, h, None, "GET", 200, False) == "testing" + data = "testing" + assert mock_protocol(data, chunked=True).read_http_body(h, None, "GET", 200, False) == "testing" def test_read_http_body(): # test default case h = odict.ODictCaseless() h["content-length"] = [7] - s = cStringIO.StringIO("testing") - assert http.read_http_body(s, h, None, "GET", 200, False) == "testing" + data = "testing" + assert mock_protocol(data).read_http_body(h, None, "GET", 200, False) == "testing" # test content length: invalid header h["content-length"] = ["foo"] - s = cStringIO.StringIO("testing") + data = "testing" tutils.raises( http.HttpError, - http.read_http_body, - s, h, None, "GET", 200, False + mock_protocol(data).read_http_body, + h, None, "GET", 200, False ) # test content length: invalid header #2 h["content-length"] = [-1] - s = cStringIO.StringIO("testing") + data = "testing" tutils.raises( http.HttpError, - http.read_http_body, - s, h, None, "GET", 200, False + mock_protocol(data).read_http_body, + h, None, "GET", 200, False ) # test content length: content length > actual content h["content-length"] = [5] - s = cStringIO.StringIO("testing") + data = "testing" tutils.raises( http.HttpError, - http.read_http_body, - s, h, 4, "GET", 200, False + mock_protocol(data).read_http_body, + h, 4, "GET", 200, False ) # test content length: content length < actual content - s = cStringIO.StringIO("testing") - assert len(http.read_http_body(s, h, None, "GET", 200, False)) == 5 + data = "testing" + assert len(mock_protocol(data).read_http_body(h, None, "GET", 200, False)) == 5 # test no content length: limit > actual content h = odict.ODictCaseless() - s = tcp.Reader(cStringIO.StringIO("testing")) - assert len(http.read_http_body(s, h, 100, "GET", 200, False)) == 7 + data = "testing" + assert len(mock_protocol(data, chunked=True).read_http_body(h, 100, "GET", 200, False)) == 7 # test no content length: limit < actual content - s = tcp.Reader(cStringIO.StringIO("testing")) + data = "testing" tutils.raises( http.HttpError, - http.read_http_body, - s, h, 4, "GET", 200, False + mock_protocol(data, chunked=True).read_http_body, + h, 4, "GET", 200, False ) # test chunked h = odict.ODictCaseless() h["transfer-encoding"] = ["chunked"] - s = tcp.Reader(cStringIO.StringIO("5\r\naaaaa\r\n0\r\n\r\n")) - assert http.read_http_body(s, h, 100, "GET", 200, False) == "aaaaa" + data = "5\r\naaaaa\r\n0\r\n\r\n" + assert mock_protocol(data, chunked=True).read_http_body(h, 100, "GET", 200, False) == "aaaaa" def test_expected_http_body_size(): # gibber in the content-length field h = odict.ODictCaseless() h["content-length"] = ["foo"] - assert http.expected_http_body_size(h, False, "GET", 200) is None + assert HTTP1Protocol.expected_http_body_size(h, False, "GET", 200) is None # negative number in the content-length field h = odict.ODictCaseless() h["content-length"] = ["-7"] - assert http.expected_http_body_size(h, False, "GET", 200) is None + assert HTTP1Protocol.expected_http_body_size(h, False, "GET", 200) is None # explicit length h = odict.ODictCaseless() h["content-length"] = ["5"] - assert http.expected_http_body_size(h, False, "GET", 200) == 5 + assert HTTP1Protocol.expected_http_body_size(h, False, "GET", 200) == 5 # no length h = odict.ODictCaseless() - assert http.expected_http_body_size(h, False, "GET", 200) == -1 + assert HTTP1Protocol.expected_http_body_size(h, False, "GET", 200) == -1 # no length request h = odict.ODictCaseless() - assert http.expected_http_body_size(h, True, "GET", None) == 0 + assert HTTP1Protocol.expected_http_body_size(h, True, "GET", None) == 0 def test_parse_http_protocol(): - assert http.parse_http_protocol("HTTP/1.1") == (1, 1) - assert http.parse_http_protocol("HTTP/0.0") == (0, 0) - assert not http.parse_http_protocol("HTTP/a.1") - assert not http.parse_http_protocol("HTTP/1.a") - assert not http.parse_http_protocol("foo/0.0") - assert not http.parse_http_protocol("HTTP/x") + assert HTTP1Protocol._parse_http_protocol("HTTP/1.1") == (1, 1) + assert HTTP1Protocol._parse_http_protocol("HTTP/0.0") == (0, 0) + assert not HTTP1Protocol._parse_http_protocol("HTTP/a.1") + assert not HTTP1Protocol._parse_http_protocol("HTTP/1.a") + assert not HTTP1Protocol._parse_http_protocol("foo/0.0") + assert not HTTP1Protocol._parse_http_protocol("HTTP/x") def test_parse_init_connect(): - assert http.parse_init_connect("CONNECT host.com:443 HTTP/1.0") - assert not http.parse_init_connect("C\xfeONNECT host.com:443 HTTP/1.0") - assert not http.parse_init_connect("CONNECT \0host.com:443 HTTP/1.0") - assert not http.parse_init_connect("CONNECT host.com:444444 HTTP/1.0") - assert not http.parse_init_connect("bogus") - assert not http.parse_init_connect("GET host.com:443 HTTP/1.0") - assert not http.parse_init_connect("CONNECT host.com443 HTTP/1.0") - assert not http.parse_init_connect("CONNECT host.com:443 foo/1.0") - assert not http.parse_init_connect("CONNECT host.com:foo HTTP/1.0") + assert HTTP1Protocol._parse_init_connect("CONNECT host.com:443 HTTP/1.0") + assert not HTTP1Protocol._parse_init_connect("C\xfeONNECT host.com:443 HTTP/1.0") + assert not HTTP1Protocol._parse_init_connect("CONNECT \0host.com:443 HTTP/1.0") + assert not HTTP1Protocol._parse_init_connect("CONNECT host.com:444444 HTTP/1.0") + assert not HTTP1Protocol._parse_init_connect("bogus") + assert not HTTP1Protocol._parse_init_connect("GET host.com:443 HTTP/1.0") + assert not HTTP1Protocol._parse_init_connect("CONNECT host.com443 HTTP/1.0") + assert not HTTP1Protocol._parse_init_connect("CONNECT host.com:443 foo/1.0") + assert not HTTP1Protocol._parse_init_connect("CONNECT host.com:foo HTTP/1.0") def test_parse_init_proxy(): u = "GET http://foo.com:8888/test HTTP/1.1" - m, s, h, po, pa, httpversion = http.parse_init_proxy(u) + m, s, h, po, pa, httpversion = HTTP1Protocol._parse_init_proxy(u) assert m == "GET" assert s == "http" assert h == "foo.com" @@ -206,27 +209,27 @@ def test_parse_init_proxy(): assert httpversion == (1, 1) u = "G\xfeET http://foo.com:8888/test HTTP/1.1" - assert not http.parse_init_proxy(u) + assert not HTTP1Protocol._parse_init_proxy(u) - assert not http.parse_init_proxy("invalid") - assert not http.parse_init_proxy("GET invalid HTTP/1.1") - assert not http.parse_init_proxy("GET http://foo.com:8888/test foo/1.1") + assert not HTTP1Protocol._parse_init_proxy("invalid") + assert not HTTP1Protocol._parse_init_proxy("GET invalid HTTP/1.1") + assert not HTTP1Protocol._parse_init_proxy("GET http://foo.com:8888/test foo/1.1") def test_parse_init_http(): u = "GET /test HTTP/1.1" - m, u, httpversion = http.parse_init_http(u) + m, u, httpversion = HTTP1Protocol._parse_init_http(u) assert m == "GET" assert u == "/test" assert httpversion == (1, 1) u = "G\xfeET /test HTTP/1.1" - assert not http.parse_init_http(u) + assert not HTTP1Protocol._parse_init_http(u) - assert not http.parse_init_http("invalid") - assert not http.parse_init_http("GET invalid HTTP/1.1") - assert not http.parse_init_http("GET /test foo/1.1") - assert not http.parse_init_http("GET /test\xc0 HTTP/1.1") + assert not HTTP1Protocol._parse_init_http("invalid") + assert not HTTP1Protocol._parse_init_http("GET invalid HTTP/1.1") + assert not HTTP1Protocol._parse_init_http("GET /test foo/1.1") + assert not HTTP1Protocol._parse_init_http("GET /test\xc0 HTTP/1.1") class TestReadHeaders: @@ -235,8 +238,7 @@ class TestReadHeaders: if not verbatim: data = textwrap.dedent(data) data = data.strip() - s = cStringIO.StringIO(data) - return http.read_headers(s) + return mock_protocol(data).read_headers() def test_read_simple(self): data = """ @@ -290,16 +292,15 @@ class TestReadResponseNoContentLength(tservers.ServerTestBase): def test_no_content_length(self): c = tcp.TCPClient(("127.0.0.1", self.port)) c.connect() - resp = http.read_response(c.rfile, "GET", None) - assert resp.content == "bar\r\n\r\n" + resp = HTTP1Protocol(c).read_response("GET", None) + assert resp.body == "bar\r\n\r\n" def test_read_response(): def tst(data, method, limit, include_body=True): data = textwrap.dedent(data) - r = cStringIO.StringIO(data) - return http.read_response( - r, method, limit, include_body=include_body + return mock_protocol(data).read_response( + method, limit, include_body=include_body ) tutils.raises("server disconnect", tst, "", "GET", None) @@ -307,13 +308,13 @@ def test_read_response(): data = """ HTTP/1.1 200 OK """ - assert tst(data, "GET", None) == ( + assert tst(data, "GET", None) == http.Response( (1, 1), 200, 'OK', odict.ODictCaseless(), '' ) data = """ HTTP/1.1 200 """ - assert tst(data, "GET", None) == ( + assert tst(data, "GET", None) == http.Response( (1, 1), 200, '', odict.ODictCaseless(), '' ) data = """ @@ -330,7 +331,7 @@ def test_read_response(): HTTP/1.1 200 OK """ - assert tst(data, "GET", None) == ( + assert tst(data, "GET", None) == http.Response( (1, 1), 100, 'CONTINUE', odict.ODictCaseless(), '' ) @@ -340,8 +341,8 @@ def test_read_response(): foo """ - assert tst(data, "GET", None)[4] == 'foo' - assert tst(data, "HEAD", None)[4] == '' + assert tst(data, "GET", None).body == 'foo' + assert tst(data, "HEAD", None).body == '' data = """ HTTP/1.1 200 OK @@ -357,74 +358,20 @@ def test_read_response(): foo """ - assert tst(data, "GET", None, include_body=False)[4] is None - - -def test_parse_url(): - assert not http.parse_url("") - - u = "http://foo.com:8888/test" - s, h, po, pa = http.parse_url(u) - assert s == "http" - assert h == "foo.com" - assert po == 8888 - assert pa == "/test" - - s, h, po, pa = http.parse_url("http://foo/bar") - assert s == "http" - assert h == "foo" - assert po == 80 - assert pa == "/bar" - - s, h, po, pa = http.parse_url("http://user:pass@foo/bar") - assert s == "http" - assert h == "foo" - assert po == 80 - assert pa == "/bar" - - s, h, po, pa = http.parse_url("http://foo") - assert pa == "/" - - s, h, po, pa = http.parse_url("https://foo") - assert po == 443 - - assert not http.parse_url("https://foo:bar") - assert not http.parse_url("https://foo:") - - # Invalid IDNA - assert not http.parse_url("http://\xfafoo") - # Invalid PATH - assert not http.parse_url("http:/\xc6/localhost:56121") - # Null byte in host - assert not http.parse_url("http://foo\0") - # Port out of range - assert not http.parse_url("http://foo:999999") - # Invalid IPv6 URL - see http://www.ietf.org/rfc/rfc2732.txt - assert not http.parse_url('http://lo[calhost') - - -def test_parse_http_basic_auth(): - vals = ("basic", "foo", "bar") - assert http.parse_http_basic_auth( - http.assemble_http_basic_auth(*vals) - ) == vals - assert not http.parse_http_basic_auth("") - assert not http.parse_http_basic_auth("foo bar") - v = "basic " + binascii.b2a_base64("foo") - assert not http.parse_http_basic_auth(v) + assert tst(data, "GET", None, include_body=False).body is None def test_get_request_line(): - r = cStringIO.StringIO("\nfoo") - assert http.get_request_line(r) == "foo" - assert not http.get_request_line(r) + data = "\nfoo" + p = mock_protocol(data) + assert p._get_request_line() == "foo" + assert not p._get_request_line() class TestReadRequest(): def tst(self, data, **kwargs): - r = cStringIO.StringIO(data) - return http.read_request(r, **kwargs) + return mock_protocol(data).read_request(**kwargs) def test_invalid(self): tutils.raises( @@ -478,14 +425,15 @@ class TestReadRequest(): assert v.host == "foo.com" def test_expect(self): - w = cStringIO.StringIO() - r = cStringIO.StringIO( + data = "".join( "GET / HTTP/1.1\r\n" "Content-Length: 3\r\n" "Expect: 100-continue\r\n\r\n" - "foobar", + "foobar" ) - v = http.read_request(r, wfile=w) - assert w.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n" - assert v.content == "foo" - assert r.read(3) == "bar" + + p = mock_protocol(data) + v = p.read_request() + assert p.tcp_handler.wfile.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n" + assert v.body == "foo" + assert p.tcp_handler.rfile.read(3) == "bar" diff --git a/test/http/http2/__init__.py b/test/http/http2/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/test/http/http2/__init__.py diff --git a/test/http2/test_frames.py b/test/http/http2/test_frames.py index 76a4b712..ee2edc39 100644 --- a/test/http2/test_frames.py +++ b/test/http/http2/test_frames.py @@ -2,7 +2,7 @@ import cStringIO from test import tutils from nose.tools import assert_equal from netlib import tcp -from netlib.http2.frame import * +from netlib.http.http2.frame import * def hex_to_file(data): diff --git a/test/http2/test_protocol.py b/test/http/http2/test_protocol.py index 5e2af34e..d3040266 100644 --- a/test/http2/test_protocol.py +++ b/test/http/http2/test_protocol.py @@ -1,10 +1,9 @@ import OpenSSL -from netlib import http2 -from netlib import tcp -from netlib.http2.frame import * -from test import tutils -from .. import tservers +from netlib import tcp, odict +from netlib.http import http2 +from netlib.http.http2.frame import * +from ... import tutils, tservers class EchoHandler(tcp.BaseHandler): @@ -252,11 +251,13 @@ class TestReadResponse(tservers.ServerTestBase): c.convert_to_ssl() protocol = http2.HTTP2Protocol(c) - status, headers, body = protocol.read_response() + resp = protocol.read_response() - assert headers == {':status': '200', 'etag': 'foobar'} - assert status == "200" - assert body == b'foobar' + assert resp.httpversion == "HTTP/2" + assert resp.status_code == "200" + assert resp.msg == "" + assert resp.headers.lst == [[':status', '200'], ['etag', 'foobar']] + assert resp.body == b'foobar' class TestReadEmptyResponse(tservers.ServerTestBase): @@ -275,11 +276,14 @@ class TestReadEmptyResponse(tservers.ServerTestBase): c.convert_to_ssl() protocol = http2.HTTP2Protocol(c) - status, headers, body = protocol.read_response() + resp = protocol.read_response() - assert headers == {':status': '200', 'etag': 'foobar'} - assert status == "200" - assert body == b'' + assert resp.stream_id + assert resp.httpversion == "HTTP/2" + assert resp.status_code == "200" + assert resp.msg == "" + assert resp.headers.lst == [[':status', '200'], ['etag', 'foobar']] + assert resp.body == b'' class TestReadRequest(tservers.ServerTestBase): @@ -300,11 +304,11 @@ class TestReadRequest(tservers.ServerTestBase): c.convert_to_ssl() protocol = http2.HTTP2Protocol(c, is_server=True) - stream_id, headers, body = protocol.read_request() + resp = protocol.read_request() - assert stream_id - assert headers == {':method': 'GET', ':path': '/', ':scheme': 'https'} - assert body == b'foobar' + assert resp.stream_id + assert resp.headers.lst == [[u':method', u'GET'], [u':path', u'/'], [u':scheme', u'https']] + assert resp.body == b'foobar' class TestCreateResponse(): diff --git a/test/test_http_auth.py b/test/http/test_authentication.py index c842925b..8f231643 100644 --- a/test/test_http_auth.py +++ b/test/http/test_authentication.py @@ -1,11 +1,25 @@ -from netlib import odict, http_auth, http -import tutils +import binascii + +from netlib import odict, http +from netlib.http import authentication +from .. import tutils + + +def test_parse_http_basic_auth(): + vals = ("basic", "foo", "bar") + assert http.authentication.parse_http_basic_auth( + http.authentication.assemble_http_basic_auth(*vals) + ) == vals + assert not http.authentication.parse_http_basic_auth("") + assert not http.authentication.parse_http_basic_auth("foo bar") + v = "basic " + binascii.b2a_base64("foo") + assert not http.authentication.parse_http_basic_auth(v) class TestPassManNonAnon: def test_simple(self): - p = http_auth.PassManNonAnon() + p = authentication.PassManNonAnon() assert not p.test("", "") assert p.test("user", "") @@ -15,14 +29,14 @@ class TestPassManHtpasswd: def test_file_errors(self): tutils.raises( "malformed htpasswd file", - http_auth.PassManHtpasswd, + authentication.PassManHtpasswd, tutils.test_data.path("data/server.crt")) def test_simple(self): - pm = http_auth.PassManHtpasswd(tutils.test_data.path("data/htpasswd")) + pm = authentication.PassManHtpasswd(tutils.test_data.path("data/htpasswd")) vals = ("basic", "test", "test") - http.assemble_http_basic_auth(*vals) + authentication.assemble_http_basic_auth(*vals) assert pm.test("test", "test") assert not pm.test("test", "foo") assert not pm.test("foo", "test") @@ -33,7 +47,7 @@ class TestPassManHtpasswd: class TestPassManSingleUser: def test_simple(self): - pm = http_auth.PassManSingleUser("test", "test") + pm = authentication.PassManSingleUser("test", "test") assert pm.test("test", "test") assert not pm.test("test", "foo") assert not pm.test("foo", "test") @@ -42,7 +56,7 @@ class TestPassManSingleUser: class TestNullProxyAuth: def test_simple(self): - na = http_auth.NullProxyAuth(http_auth.PassManNonAnon()) + na = authentication.NullProxyAuth(authentication.PassManNonAnon()) assert not na.auth_challenge_headers() assert na.authenticate("foo") na.clean({}) @@ -51,17 +65,17 @@ class TestNullProxyAuth: class TestBasicProxyAuth: def test_simple(self): - ba = http_auth.BasicProxyAuth(http_auth.PassManNonAnon(), "test") + ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test") h = odict.ODictCaseless() assert ba.auth_challenge_headers() assert not ba.authenticate(h) def test_authenticate_clean(self): - ba = http_auth.BasicProxyAuth(http_auth.PassManNonAnon(), "test") + ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test") hdrs = odict.ODictCaseless() vals = ("basic", "foo", "bar") - hdrs[ba.AUTH_HEADER] = [http.assemble_http_basic_auth(*vals)] + hdrs[ba.AUTH_HEADER] = [authentication.assemble_http_basic_auth(*vals)] assert ba.authenticate(hdrs) ba.clean(hdrs) @@ -74,12 +88,12 @@ class TestBasicProxyAuth: assert not ba.authenticate(hdrs) vals = ("foo", "foo", "bar") - hdrs[ba.AUTH_HEADER] = [http.assemble_http_basic_auth(*vals)] + hdrs[ba.AUTH_HEADER] = [authentication.assemble_http_basic_auth(*vals)] assert not ba.authenticate(hdrs) - ba = http_auth.BasicProxyAuth(http_auth.PassMan(), "test") + ba = authentication.BasicProxyAuth(authentication.PassMan(), "test") vals = ("basic", "foo", "bar") - hdrs[ba.AUTH_HEADER] = [http.assemble_http_basic_auth(*vals)] + hdrs[ba.AUTH_HEADER] = [authentication.assemble_http_basic_auth(*vals)] assert not ba.authenticate(hdrs) @@ -91,19 +105,19 @@ class TestAuthAction: def test_nonanonymous(self): m = Bunch() - aa = http_auth.NonanonymousAuthAction(None, "authenticator") + aa = authentication.NonanonymousAuthAction(None, "authenticator") aa(None, m, None, None) assert m.authenticator def test_singleuser(self): m = Bunch() - aa = http_auth.SingleuserAuthAction(None, "authenticator") + aa = authentication.SingleuserAuthAction(None, "authenticator") aa(None, m, "foo:bar", None) assert m.authenticator tutils.raises("invalid", aa, None, m, "foo", None) def test_httppasswd(self): m = Bunch() - aa = http_auth.HtpasswdAuthAction(None, "authenticator") + aa = authentication.HtpasswdAuthAction(None, "authenticator") aa(None, m, tutils.test_data.path("data/htpasswd"), None) assert m.authenticator diff --git a/test/test_http_cookies.py b/test/http/test_cookies.py index 070849cf..4f99593a 100644 --- a/test/test_http_cookies.py +++ b/test/http/test_cookies.py @@ -1,6 +1,6 @@ import nose.tools -from netlib import http_cookies +from netlib.http import cookies def test_read_token(): @@ -13,7 +13,7 @@ def test_read_token(): [(" foo=bar", 1), ("foo", 4)], ] for q, a in tokens: - nose.tools.eq_(http_cookies._read_token(*q), a) + nose.tools.eq_(cookies._read_token(*q), a) def test_read_quoted_string(): @@ -25,7 +25,7 @@ def test_read_quoted_string(): [('"fo\\\"" x', 0), ("fo\"", 6)], ] for q, a in tokens: - nose.tools.eq_(http_cookies._read_quoted_string(*q), a) + nose.tools.eq_(cookies._read_quoted_string(*q), a) def test_read_pairs(): @@ -60,7 +60,7 @@ def test_read_pairs(): ], ] for s, lst in vals: - ret, off = http_cookies._read_pairs(s) + ret, off = cookies._read_pairs(s) nose.tools.eq_(ret, lst) @@ -108,10 +108,10 @@ def test_pairs_roundtrips(): ] ] for s, lst in pairs: - ret, off = http_cookies._read_pairs(s) + ret, off = cookies._read_pairs(s) nose.tools.eq_(ret, lst) - s2 = http_cookies._format_pairs(lst) - ret, off = http_cookies._read_pairs(s2) + s2 = cookies._format_pairs(lst) + ret, off = cookies._read_pairs(s2) nose.tools.eq_(ret, lst) @@ -127,10 +127,10 @@ def test_cookie_roundtrips(): ], ] for s, lst in pairs: - ret = http_cookies.parse_cookie_header(s) + ret = cookies.parse_cookie_header(s) nose.tools.eq_(ret.lst, lst) - s2 = http_cookies.format_cookie_header(ret) - ret = http_cookies.parse_cookie_header(s2) + s2 = cookies.format_cookie_header(ret) + ret = cookies.parse_cookie_header(s2) nose.tools.eq_(ret.lst, lst) @@ -180,10 +180,10 @@ def test_parse_set_cookie_pairs(): ], ] for s, lst in pairs: - ret = http_cookies._parse_set_cookie_pairs(s) + ret = cookies._parse_set_cookie_pairs(s) nose.tools.eq_(ret, lst) - s2 = http_cookies._format_set_cookie_pairs(ret) - ret2 = http_cookies._parse_set_cookie_pairs(s2) + s2 = cookies._format_set_cookie_pairs(ret) + ret2 = cookies._parse_set_cookie_pairs(s2) nose.tools.eq_(ret2, lst) @@ -205,13 +205,13 @@ def test_parse_set_cookie_header(): ] ] for s, expected in vals: - ret = http_cookies.parse_set_cookie_header(s) + ret = cookies.parse_set_cookie_header(s) if expected: assert ret[0] == expected[0] assert ret[1] == expected[1] nose.tools.eq_(ret[2].lst, expected[2]) - s2 = http_cookies.format_set_cookie_header(*ret) - ret2 = http_cookies.parse_set_cookie_header(s2) + s2 = cookies.format_set_cookie_header(*ret) + ret2 = cookies.parse_set_cookie_header(s2) assert ret2[0] == expected[0] assert ret2[1] == expected[1] nose.tools.eq_(ret2[2].lst, expected[2]) diff --git a/test/http/test_semantics.py b/test/http/test_semantics.py new file mode 100644 index 00000000..c4605302 --- /dev/null +++ b/test/http/test_semantics.py @@ -0,0 +1,54 @@ +import cStringIO +import textwrap +import binascii + +from netlib import http, odict, tcp +from netlib.http import http1 +from .. import tutils, tservers + +def test_httperror(): + e = http.exceptions.HttpError(404, "Not found") + assert str(e) + + +def test_parse_url(): + assert not http.parse_url("") + + u = "http://foo.com:8888/test" + s, h, po, pa = http.parse_url(u) + assert s == "http" + assert h == "foo.com" + assert po == 8888 + assert pa == "/test" + + s, h, po, pa = http.parse_url("http://foo/bar") + assert s == "http" + assert h == "foo" + assert po == 80 + assert pa == "/bar" + + s, h, po, pa = http.parse_url("http://user:pass@foo/bar") + assert s == "http" + assert h == "foo" + assert po == 80 + assert pa == "/bar" + + s, h, po, pa = http.parse_url("http://foo") + assert pa == "/" + + s, h, po, pa = http.parse_url("https://foo") + assert po == 443 + + assert not http.parse_url("https://foo:bar") + assert not http.parse_url("https://foo:") + + # Invalid IDNA + assert not http.parse_url("http://\xfafoo") + # Invalid PATH + assert not http.parse_url("http:/\xc6/localhost:56121") + # Null byte in host + assert not http.parse_url("http://foo\0") + # Port out of range + assert not http.parse_url("http://foo:999999") + # Invalid IPv6 URL - see http://www.ietf.org/rfc/rfc2732.txt + assert not http.parse_url('http://lo[calhost') diff --git a/test/http/test_user_agents.py b/test/http/test_user_agents.py new file mode 100644 index 00000000..0bf1bba7 --- /dev/null +++ b/test/http/test_user_agents.py @@ -0,0 +1,6 @@ +from netlib.http import user_agents + + +def test_get_shortcut(): + assert user_agents.get_by_shortcut("c")[0] == "chrome" + assert not user_agents.get_by_shortcut("_") diff --git a/test/test_http_uastrings.py b/test/test_http_uastrings.py deleted file mode 100644 index 3fa4f359..00000000 --- a/test/test_http_uastrings.py +++ /dev/null @@ -1,6 +0,0 @@ -from netlib import http_uastrings - - -def test_get_shortcut(): - assert http_uastrings.get_by_shortcut("c")[0] == "chrome" - assert not http_uastrings.get_by_shortcut("_") diff --git a/test/websockets/__init__.py b/test/websockets/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/test/websockets/__init__.py diff --git a/test/test_websockets.py b/test/websockets/test_websockets.py index 9956543b..fb7ba39a 100644 --- a/test/test_websockets.py +++ b/test/websockets/test_websockets.py @@ -2,8 +2,10 @@ import os from nose.tools import raises -from netlib import tcp, websockets, http -from . import tutils, tservers +from netlib import tcp, http, websockets +from netlib.http.exceptions import * +from netlib.http.http1 import HTTP1Protocol +from .. import tutils, tservers class WebSocketsEchoHandler(tcp.BaseHandler): @@ -12,6 +14,7 @@ class WebSocketsEchoHandler(tcp.BaseHandler): super(WebSocketsEchoHandler, self).__init__( connection, address, server ) + self.protocol = websockets.WebsocketsProtocol() self.handshake_done = False def handle(self): @@ -30,11 +33,14 @@ class WebSocketsEchoHandler(tcp.BaseHandler): frame.to_file(self.wfile) def handshake(self): - req = http.read_request(self.rfile) - key = websockets.check_client_handshake(req.headers) + http1_protocol = HTTP1Protocol(self) - self.wfile.write(http.response_preamble(101) + "\r\n") - headers = websockets.server_handshake_headers(key) + req = http1_protocol.read_request() + key = self.protocol.check_client_handshake(req.headers) + + preamble = http1_protocol.response_preamble(101) + self.wfile.write(preamble + "\r\n") + headers = self.protocol.server_handshake_headers(key) self.wfile.write(headers.format() + "\r\n") self.wfile.flush() self.handshake_done = True @@ -48,22 +54,25 @@ class WebSocketsClient(tcp.TCPClient): def __init__(self, address, source_address=None): super(WebSocketsClient, self).__init__(address, source_address) + self.protocol = websockets.WebsocketsProtocol() self.client_nonce = None def connect(self): super(WebSocketsClient, self).connect() - preamble = http.request_preamble("GET", "/") + http1_protocol = HTTP1Protocol(self) + + preamble = http1_protocol.request_preamble("GET", "/") self.wfile.write(preamble + "\r\n") - headers = websockets.client_handshake_headers() + headers = self.protocol.client_handshake_headers() self.client_nonce = headers.get_first("sec-websocket-key") self.wfile.write(headers.format() + "\r\n") self.wfile.flush() - resp = http.read_response(self.rfile, "get", None) - server_nonce = websockets.check_server_handshake(resp.headers) + resp = http1_protocol.read_response("get", None) + server_nonce = self.protocol.check_server_handshake(resp.headers) - if not server_nonce == websockets.create_server_nonce( + if not server_nonce == self.protocol.create_server_nonce( self.client_nonce): self.close() @@ -78,6 +87,9 @@ class WebSocketsClient(tcp.TCPClient): class TestWebSockets(tservers.ServerTestBase): handler = WebSocketsEchoHandler + def __init__(self): + self.protocol = websockets.WebsocketsProtocol() + def random_bytes(self, n=100): return os.urandom(n) @@ -130,26 +142,29 @@ class TestWebSockets(tservers.ServerTestBase): assert websockets.Frame.from_bytes(bytes).to_bytes() == bytes def test_check_server_handshake(self): - headers = websockets.server_handshake_headers("key") - assert websockets.check_server_handshake(headers) + headers = self.protocol.server_handshake_headers("key") + assert self.protocol.check_server_handshake(headers) headers["Upgrade"] = ["not_websocket"] - assert not websockets.check_server_handshake(headers) + assert not self.protocol.check_server_handshake(headers) def test_check_client_handshake(self): - headers = websockets.client_handshake_headers("key") - assert websockets.check_client_handshake(headers) == "key" + headers = self.protocol.client_handshake_headers("key") + assert self.protocol.check_client_handshake(headers) == "key" headers["Upgrade"] = ["not_websocket"] - assert not websockets.check_client_handshake(headers) + assert not self.protocol.check_client_handshake(headers) class BadHandshakeHandler(WebSocketsEchoHandler): def handshake(self): - client_hs = http.read_request(self.rfile) - websockets.check_client_handshake(client_hs.headers) + http1_protocol = HTTP1Protocol(self) + + client_hs = http1_protocol.read_request() + self.protocol.check_client_handshake(client_hs.headers) - self.wfile.write(http.response_preamble(101) + "\r\n") - headers = websockets.server_handshake_headers("malformed key") + preamble = http1_protocol.response_preamble(101) + self.wfile.write(preamble + "\r\n") + headers = self.protocol.server_handshake_headers("malformed key") self.wfile.write(headers.format() + "\r\n") self.wfile.flush() self.handshake_done = True |