aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/http/__init__.py (renamed from test/http2/__init__.py)0
-rw-r--r--test/http/http1/__init__.py0
-rw-r--r--test/http/http1/test_protocol.py (renamed from test/test_http.py)274
-rw-r--r--test/http/http2/__init__.py0
-rw-r--r--test/http/http2/test_frames.py (renamed from test/http2/test_frames.py)2
-rw-r--r--test/http/http2/test_protocol.py (renamed from test/http2/test_protocol.py)38
-rw-r--r--test/http/test_authentication.py (renamed from test/test_http_auth.py)48
-rw-r--r--test/http/test_cookies.py (renamed from test/test_http_cookies.py)32
-rw-r--r--test/http/test_semantics.py54
-rw-r--r--test/http/test_user_agents.py6
-rw-r--r--test/test_http_uastrings.py6
-rw-r--r--test/websockets/__init__.py0
-rw-r--r--test/websockets/test_websockets.py (renamed from test/test_websockets.py)57
13 files changed, 276 insertions, 241 deletions
diff --git a/test/http2/__init__.py b/test/http/__init__.py
index e69de29b..e69de29b 100644
--- a/test/http2/__init__.py
+++ b/test/http/__init__.py
diff --git a/test/http/http1/__init__.py b/test/http/http1/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/test/http/http1/__init__.py
diff --git a/test/test_http.py b/test/http/http1/test_protocol.py
index 2ad81d24..dcebbd5e 100644
--- a/test/test_http.py
+++ b/test/http/http1/test_protocol.py
@@ -1,75 +1,78 @@
import cStringIO
import textwrap
import binascii
+
from netlib import http, odict, tcp
-from . import tutils, tservers
+from netlib.http.http1 import HTTP1Protocol
+from ... import tutils, tservers
+
+def mock_protocol(data='', chunked=False):
+ rfile = cStringIO.StringIO(data)
+ wfile = cStringIO.StringIO()
+ return HTTP1Protocol(rfile=rfile, wfile=wfile)
-def test_httperror():
- e = http.HttpError(404, "Not found")
- assert str(e)
def test_has_chunked_encoding():
h = odict.ODictCaseless()
- assert not http.has_chunked_encoding(h)
+ assert not HTTP1Protocol.has_chunked_encoding(h)
h["transfer-encoding"] = ["chunked"]
- assert http.has_chunked_encoding(h)
+ assert HTTP1Protocol.has_chunked_encoding(h)
def test_read_chunked():
-
h = odict.ODictCaseless()
h["transfer-encoding"] = ["chunked"]
- s = cStringIO.StringIO("1\r\na\r\n0\r\n")
+ data = "1\r\na\r\n0\r\n"
tutils.raises(
"malformed chunked body",
- http.read_http_body,
- s, h, None, "GET", None, True
+ mock_protocol(data).read_http_body,
+ h, None, "GET", None, True
)
- s = cStringIO.StringIO("1\r\na\r\n0\r\n\r\n")
- assert http.read_http_body(s, h, None, "GET", None, True) == "a"
+ data = "1\r\na\r\n0\r\n\r\n"
+ assert mock_protocol(data).read_http_body(h, None, "GET", None, True) == "a"
- s = cStringIO.StringIO("\r\n\r\n1\r\na\r\n0\r\n\r\n")
- assert http.read_http_body(s, h, None, "GET", None, True) == "a"
+ data = "\r\n\r\n1\r\na\r\n0\r\n\r\n"
+ assert mock_protocol(data).read_http_body(h, None, "GET", None, True) == "a"
- s = cStringIO.StringIO("\r\n")
+ data = "\r\n"
tutils.raises(
"closed prematurely",
- http.read_http_body,
- s, h, None, "GET", None, True
+ mock_protocol(data).read_http_body,
+ h, None, "GET", None, True
)
- s = cStringIO.StringIO("1\r\nfoo")
+ data = "1\r\nfoo"
tutils.raises(
"malformed chunked body",
- http.read_http_body,
- s, h, None, "GET", None, True
+ mock_protocol(data).read_http_body,
+ h, None, "GET", None, True
)
- s = cStringIO.StringIO("foo\r\nfoo")
+ data = "foo\r\nfoo"
tutils.raises(
http.HttpError,
- http.read_http_body,
- s, h, None, "GET", None, True
+ mock_protocol(data).read_http_body,
+ h, None, "GET", None, True
)
- s = cStringIO.StringIO("5\r\naaaaa\r\n0\r\n\r\n")
- tutils.raises("too large", http.read_http_body, s, h, 2, "GET", None, True)
+ data = "5\r\naaaaa\r\n0\r\n\r\n"
+ tutils.raises("too large", mock_protocol(data).read_http_body, h, 2, "GET", None, True)
def test_connection_close():
h = odict.ODictCaseless()
- assert http.connection_close((1, 0), h)
- assert not http.connection_close((1, 1), h)
+ assert HTTP1Protocol.connection_close((1, 0), h)
+ assert not HTTP1Protocol.connection_close((1, 1), h)
h["connection"] = ["keep-alive"]
- assert not http.connection_close((1, 1), h)
+ assert not HTTP1Protocol.connection_close((1, 1), h)
h["connection"] = ["close"]
- assert http.connection_close((1, 1), h)
+ assert HTTP1Protocol.connection_close((1, 1), h)
def test_get_header_tokens():
@@ -85,119 +88,119 @@ def test_get_header_tokens():
def test_read_http_body_request():
h = odict.ODictCaseless()
- r = cStringIO.StringIO("testing")
- assert http.read_http_body(r, h, None, "GET", None, True) == ""
+ data = "testing"
+ assert mock_protocol(data).read_http_body(h, None, "GET", None, True) == ""
def test_read_http_body_response():
h = odict.ODictCaseless()
- s = tcp.Reader(cStringIO.StringIO("testing"))
- assert http.read_http_body(s, h, None, "GET", 200, False) == "testing"
+ data = "testing"
+ assert mock_protocol(data, chunked=True).read_http_body(h, None, "GET", 200, False) == "testing"
def test_read_http_body():
# test default case
h = odict.ODictCaseless()
h["content-length"] = [7]
- s = cStringIO.StringIO("testing")
- assert http.read_http_body(s, h, None, "GET", 200, False) == "testing"
+ data = "testing"
+ assert mock_protocol(data).read_http_body(h, None, "GET", 200, False) == "testing"
# test content length: invalid header
h["content-length"] = ["foo"]
- s = cStringIO.StringIO("testing")
+ data = "testing"
tutils.raises(
http.HttpError,
- http.read_http_body,
- s, h, None, "GET", 200, False
+ mock_protocol(data).read_http_body,
+ h, None, "GET", 200, False
)
# test content length: invalid header #2
h["content-length"] = [-1]
- s = cStringIO.StringIO("testing")
+ data = "testing"
tutils.raises(
http.HttpError,
- http.read_http_body,
- s, h, None, "GET", 200, False
+ mock_protocol(data).read_http_body,
+ h, None, "GET", 200, False
)
# test content length: content length > actual content
h["content-length"] = [5]
- s = cStringIO.StringIO("testing")
+ data = "testing"
tutils.raises(
http.HttpError,
- http.read_http_body,
- s, h, 4, "GET", 200, False
+ mock_protocol(data).read_http_body,
+ h, 4, "GET", 200, False
)
# test content length: content length < actual content
- s = cStringIO.StringIO("testing")
- assert len(http.read_http_body(s, h, None, "GET", 200, False)) == 5
+ data = "testing"
+ assert len(mock_protocol(data).read_http_body(h, None, "GET", 200, False)) == 5
# test no content length: limit > actual content
h = odict.ODictCaseless()
- s = tcp.Reader(cStringIO.StringIO("testing"))
- assert len(http.read_http_body(s, h, 100, "GET", 200, False)) == 7
+ data = "testing"
+ assert len(mock_protocol(data, chunked=True).read_http_body(h, 100, "GET", 200, False)) == 7
# test no content length: limit < actual content
- s = tcp.Reader(cStringIO.StringIO("testing"))
+ data = "testing"
tutils.raises(
http.HttpError,
- http.read_http_body,
- s, h, 4, "GET", 200, False
+ mock_protocol(data, chunked=True).read_http_body,
+ h, 4, "GET", 200, False
)
# test chunked
h = odict.ODictCaseless()
h["transfer-encoding"] = ["chunked"]
- s = tcp.Reader(cStringIO.StringIO("5\r\naaaaa\r\n0\r\n\r\n"))
- assert http.read_http_body(s, h, 100, "GET", 200, False) == "aaaaa"
+ data = "5\r\naaaaa\r\n0\r\n\r\n"
+ assert mock_protocol(data, chunked=True).read_http_body(h, 100, "GET", 200, False) == "aaaaa"
def test_expected_http_body_size():
# gibber in the content-length field
h = odict.ODictCaseless()
h["content-length"] = ["foo"]
- assert http.expected_http_body_size(h, False, "GET", 200) is None
+ assert HTTP1Protocol.expected_http_body_size(h, False, "GET", 200) is None
# negative number in the content-length field
h = odict.ODictCaseless()
h["content-length"] = ["-7"]
- assert http.expected_http_body_size(h, False, "GET", 200) is None
+ assert HTTP1Protocol.expected_http_body_size(h, False, "GET", 200) is None
# explicit length
h = odict.ODictCaseless()
h["content-length"] = ["5"]
- assert http.expected_http_body_size(h, False, "GET", 200) == 5
+ assert HTTP1Protocol.expected_http_body_size(h, False, "GET", 200) == 5
# no length
h = odict.ODictCaseless()
- assert http.expected_http_body_size(h, False, "GET", 200) == -1
+ assert HTTP1Protocol.expected_http_body_size(h, False, "GET", 200) == -1
# no length request
h = odict.ODictCaseless()
- assert http.expected_http_body_size(h, True, "GET", None) == 0
+ assert HTTP1Protocol.expected_http_body_size(h, True, "GET", None) == 0
def test_parse_http_protocol():
- assert http.parse_http_protocol("HTTP/1.1") == (1, 1)
- assert http.parse_http_protocol("HTTP/0.0") == (0, 0)
- assert not http.parse_http_protocol("HTTP/a.1")
- assert not http.parse_http_protocol("HTTP/1.a")
- assert not http.parse_http_protocol("foo/0.0")
- assert not http.parse_http_protocol("HTTP/x")
+ assert HTTP1Protocol._parse_http_protocol("HTTP/1.1") == (1, 1)
+ assert HTTP1Protocol._parse_http_protocol("HTTP/0.0") == (0, 0)
+ assert not HTTP1Protocol._parse_http_protocol("HTTP/a.1")
+ assert not HTTP1Protocol._parse_http_protocol("HTTP/1.a")
+ assert not HTTP1Protocol._parse_http_protocol("foo/0.0")
+ assert not HTTP1Protocol._parse_http_protocol("HTTP/x")
def test_parse_init_connect():
- assert http.parse_init_connect("CONNECT host.com:443 HTTP/1.0")
- assert not http.parse_init_connect("C\xfeONNECT host.com:443 HTTP/1.0")
- assert not http.parse_init_connect("CONNECT \0host.com:443 HTTP/1.0")
- assert not http.parse_init_connect("CONNECT host.com:444444 HTTP/1.0")
- assert not http.parse_init_connect("bogus")
- assert not http.parse_init_connect("GET host.com:443 HTTP/1.0")
- assert not http.parse_init_connect("CONNECT host.com443 HTTP/1.0")
- assert not http.parse_init_connect("CONNECT host.com:443 foo/1.0")
- assert not http.parse_init_connect("CONNECT host.com:foo HTTP/1.0")
+ assert HTTP1Protocol._parse_init_connect("CONNECT host.com:443 HTTP/1.0")
+ assert not HTTP1Protocol._parse_init_connect("C\xfeONNECT host.com:443 HTTP/1.0")
+ assert not HTTP1Protocol._parse_init_connect("CONNECT \0host.com:443 HTTP/1.0")
+ assert not HTTP1Protocol._parse_init_connect("CONNECT host.com:444444 HTTP/1.0")
+ assert not HTTP1Protocol._parse_init_connect("bogus")
+ assert not HTTP1Protocol._parse_init_connect("GET host.com:443 HTTP/1.0")
+ assert not HTTP1Protocol._parse_init_connect("CONNECT host.com443 HTTP/1.0")
+ assert not HTTP1Protocol._parse_init_connect("CONNECT host.com:443 foo/1.0")
+ assert not HTTP1Protocol._parse_init_connect("CONNECT host.com:foo HTTP/1.0")
def test_parse_init_proxy():
u = "GET http://foo.com:8888/test HTTP/1.1"
- m, s, h, po, pa, httpversion = http.parse_init_proxy(u)
+ m, s, h, po, pa, httpversion = HTTP1Protocol._parse_init_proxy(u)
assert m == "GET"
assert s == "http"
assert h == "foo.com"
@@ -206,27 +209,27 @@ def test_parse_init_proxy():
assert httpversion == (1, 1)
u = "G\xfeET http://foo.com:8888/test HTTP/1.1"
- assert not http.parse_init_proxy(u)
+ assert not HTTP1Protocol._parse_init_proxy(u)
- assert not http.parse_init_proxy("invalid")
- assert not http.parse_init_proxy("GET invalid HTTP/1.1")
- assert not http.parse_init_proxy("GET http://foo.com:8888/test foo/1.1")
+ assert not HTTP1Protocol._parse_init_proxy("invalid")
+ assert not HTTP1Protocol._parse_init_proxy("GET invalid HTTP/1.1")
+ assert not HTTP1Protocol._parse_init_proxy("GET http://foo.com:8888/test foo/1.1")
def test_parse_init_http():
u = "GET /test HTTP/1.1"
- m, u, httpversion = http.parse_init_http(u)
+ m, u, httpversion = HTTP1Protocol._parse_init_http(u)
assert m == "GET"
assert u == "/test"
assert httpversion == (1, 1)
u = "G\xfeET /test HTTP/1.1"
- assert not http.parse_init_http(u)
+ assert not HTTP1Protocol._parse_init_http(u)
- assert not http.parse_init_http("invalid")
- assert not http.parse_init_http("GET invalid HTTP/1.1")
- assert not http.parse_init_http("GET /test foo/1.1")
- assert not http.parse_init_http("GET /test\xc0 HTTP/1.1")
+ assert not HTTP1Protocol._parse_init_http("invalid")
+ assert not HTTP1Protocol._parse_init_http("GET invalid HTTP/1.1")
+ assert not HTTP1Protocol._parse_init_http("GET /test foo/1.1")
+ assert not HTTP1Protocol._parse_init_http("GET /test\xc0 HTTP/1.1")
class TestReadHeaders:
@@ -235,8 +238,7 @@ class TestReadHeaders:
if not verbatim:
data = textwrap.dedent(data)
data = data.strip()
- s = cStringIO.StringIO(data)
- return http.read_headers(s)
+ return mock_protocol(data).read_headers()
def test_read_simple(self):
data = """
@@ -290,16 +292,15 @@ class TestReadResponseNoContentLength(tservers.ServerTestBase):
def test_no_content_length(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
- resp = http.read_response(c.rfile, "GET", None)
- assert resp.content == "bar\r\n\r\n"
+ resp = HTTP1Protocol(c).read_response("GET", None)
+ assert resp.body == "bar\r\n\r\n"
def test_read_response():
def tst(data, method, limit, include_body=True):
data = textwrap.dedent(data)
- r = cStringIO.StringIO(data)
- return http.read_response(
- r, method, limit, include_body=include_body
+ return mock_protocol(data).read_response(
+ method, limit, include_body=include_body
)
tutils.raises("server disconnect", tst, "", "GET", None)
@@ -307,13 +308,13 @@ def test_read_response():
data = """
HTTP/1.1 200 OK
"""
- assert tst(data, "GET", None) == (
+ assert tst(data, "GET", None) == http.Response(
(1, 1), 200, 'OK', odict.ODictCaseless(), ''
)
data = """
HTTP/1.1 200
"""
- assert tst(data, "GET", None) == (
+ assert tst(data, "GET", None) == http.Response(
(1, 1), 200, '', odict.ODictCaseless(), ''
)
data = """
@@ -330,7 +331,7 @@ def test_read_response():
HTTP/1.1 200 OK
"""
- assert tst(data, "GET", None) == (
+ assert tst(data, "GET", None) == http.Response(
(1, 1), 100, 'CONTINUE', odict.ODictCaseless(), ''
)
@@ -340,8 +341,8 @@ def test_read_response():
foo
"""
- assert tst(data, "GET", None)[4] == 'foo'
- assert tst(data, "HEAD", None)[4] == ''
+ assert tst(data, "GET", None).body == 'foo'
+ assert tst(data, "HEAD", None).body == ''
data = """
HTTP/1.1 200 OK
@@ -357,74 +358,20 @@ def test_read_response():
foo
"""
- assert tst(data, "GET", None, include_body=False)[4] is None
-
-
-def test_parse_url():
- assert not http.parse_url("")
-
- u = "http://foo.com:8888/test"
- s, h, po, pa = http.parse_url(u)
- assert s == "http"
- assert h == "foo.com"
- assert po == 8888
- assert pa == "/test"
-
- s, h, po, pa = http.parse_url("http://foo/bar")
- assert s == "http"
- assert h == "foo"
- assert po == 80
- assert pa == "/bar"
-
- s, h, po, pa = http.parse_url("http://user:pass@foo/bar")
- assert s == "http"
- assert h == "foo"
- assert po == 80
- assert pa == "/bar"
-
- s, h, po, pa = http.parse_url("http://foo")
- assert pa == "/"
-
- s, h, po, pa = http.parse_url("https://foo")
- assert po == 443
-
- assert not http.parse_url("https://foo:bar")
- assert not http.parse_url("https://foo:")
-
- # Invalid IDNA
- assert not http.parse_url("http://\xfafoo")
- # Invalid PATH
- assert not http.parse_url("http:/\xc6/localhost:56121")
- # Null byte in host
- assert not http.parse_url("http://foo\0")
- # Port out of range
- assert not http.parse_url("http://foo:999999")
- # Invalid IPv6 URL - see http://www.ietf.org/rfc/rfc2732.txt
- assert not http.parse_url('http://lo[calhost')
-
-
-def test_parse_http_basic_auth():
- vals = ("basic", "foo", "bar")
- assert http.parse_http_basic_auth(
- http.assemble_http_basic_auth(*vals)
- ) == vals
- assert not http.parse_http_basic_auth("")
- assert not http.parse_http_basic_auth("foo bar")
- v = "basic " + binascii.b2a_base64("foo")
- assert not http.parse_http_basic_auth(v)
+ assert tst(data, "GET", None, include_body=False).body is None
def test_get_request_line():
- r = cStringIO.StringIO("\nfoo")
- assert http.get_request_line(r) == "foo"
- assert not http.get_request_line(r)
+ data = "\nfoo"
+ p = mock_protocol(data)
+ assert p._get_request_line() == "foo"
+ assert not p._get_request_line()
class TestReadRequest():
def tst(self, data, **kwargs):
- r = cStringIO.StringIO(data)
- return http.read_request(r, **kwargs)
+ return mock_protocol(data).read_request(**kwargs)
def test_invalid(self):
tutils.raises(
@@ -478,14 +425,15 @@ class TestReadRequest():
assert v.host == "foo.com"
def test_expect(self):
- w = cStringIO.StringIO()
- r = cStringIO.StringIO(
+ data = "".join(
"GET / HTTP/1.1\r\n"
"Content-Length: 3\r\n"
"Expect: 100-continue\r\n\r\n"
- "foobar",
+ "foobar"
)
- v = http.read_request(r, wfile=w)
- assert w.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n"
- assert v.content == "foo"
- assert r.read(3) == "bar"
+
+ p = mock_protocol(data)
+ v = p.read_request()
+ assert p.tcp_handler.wfile.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n"
+ assert v.body == "foo"
+ assert p.tcp_handler.rfile.read(3) == "bar"
diff --git a/test/http/http2/__init__.py b/test/http/http2/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/test/http/http2/__init__.py
diff --git a/test/http2/test_frames.py b/test/http/http2/test_frames.py
index 76a4b712..ee2edc39 100644
--- a/test/http2/test_frames.py
+++ b/test/http/http2/test_frames.py
@@ -2,7 +2,7 @@ import cStringIO
from test import tutils
from nose.tools import assert_equal
from netlib import tcp
-from netlib.http2.frame import *
+from netlib.http.http2.frame import *
def hex_to_file(data):
diff --git a/test/http2/test_protocol.py b/test/http/http2/test_protocol.py
index 5e2af34e..d3040266 100644
--- a/test/http2/test_protocol.py
+++ b/test/http/http2/test_protocol.py
@@ -1,10 +1,9 @@
import OpenSSL
-from netlib import http2
-from netlib import tcp
-from netlib.http2.frame import *
-from test import tutils
-from .. import tservers
+from netlib import tcp, odict
+from netlib.http import http2
+from netlib.http.http2.frame import *
+from ... import tutils, tservers
class EchoHandler(tcp.BaseHandler):
@@ -252,11 +251,13 @@ class TestReadResponse(tservers.ServerTestBase):
c.convert_to_ssl()
protocol = http2.HTTP2Protocol(c)
- status, headers, body = protocol.read_response()
+ resp = protocol.read_response()
- assert headers == {':status': '200', 'etag': 'foobar'}
- assert status == "200"
- assert body == b'foobar'
+ assert resp.httpversion == "HTTP/2"
+ assert resp.status_code == "200"
+ assert resp.msg == ""
+ assert resp.headers.lst == [[':status', '200'], ['etag', 'foobar']]
+ assert resp.body == b'foobar'
class TestReadEmptyResponse(tservers.ServerTestBase):
@@ -275,11 +276,14 @@ class TestReadEmptyResponse(tservers.ServerTestBase):
c.convert_to_ssl()
protocol = http2.HTTP2Protocol(c)
- status, headers, body = protocol.read_response()
+ resp = protocol.read_response()
- assert headers == {':status': '200', 'etag': 'foobar'}
- assert status == "200"
- assert body == b''
+ assert resp.stream_id
+ assert resp.httpversion == "HTTP/2"
+ assert resp.status_code == "200"
+ assert resp.msg == ""
+ assert resp.headers.lst == [[':status', '200'], ['etag', 'foobar']]
+ assert resp.body == b''
class TestReadRequest(tservers.ServerTestBase):
@@ -300,11 +304,11 @@ class TestReadRequest(tservers.ServerTestBase):
c.convert_to_ssl()
protocol = http2.HTTP2Protocol(c, is_server=True)
- stream_id, headers, body = protocol.read_request()
+ resp = protocol.read_request()
- assert stream_id
- assert headers == {':method': 'GET', ':path': '/', ':scheme': 'https'}
- assert body == b'foobar'
+ assert resp.stream_id
+ assert resp.headers.lst == [[u':method', u'GET'], [u':path', u'/'], [u':scheme', u'https']]
+ assert resp.body == b'foobar'
class TestCreateResponse():
diff --git a/test/test_http_auth.py b/test/http/test_authentication.py
index c842925b..8f231643 100644
--- a/test/test_http_auth.py
+++ b/test/http/test_authentication.py
@@ -1,11 +1,25 @@
-from netlib import odict, http_auth, http
-import tutils
+import binascii
+
+from netlib import odict, http
+from netlib.http import authentication
+from .. import tutils
+
+
+def test_parse_http_basic_auth():
+ vals = ("basic", "foo", "bar")
+ assert http.authentication.parse_http_basic_auth(
+ http.authentication.assemble_http_basic_auth(*vals)
+ ) == vals
+ assert not http.authentication.parse_http_basic_auth("")
+ assert not http.authentication.parse_http_basic_auth("foo bar")
+ v = "basic " + binascii.b2a_base64("foo")
+ assert not http.authentication.parse_http_basic_auth(v)
class TestPassManNonAnon:
def test_simple(self):
- p = http_auth.PassManNonAnon()
+ p = authentication.PassManNonAnon()
assert not p.test("", "")
assert p.test("user", "")
@@ -15,14 +29,14 @@ class TestPassManHtpasswd:
def test_file_errors(self):
tutils.raises(
"malformed htpasswd file",
- http_auth.PassManHtpasswd,
+ authentication.PassManHtpasswd,
tutils.test_data.path("data/server.crt"))
def test_simple(self):
- pm = http_auth.PassManHtpasswd(tutils.test_data.path("data/htpasswd"))
+ pm = authentication.PassManHtpasswd(tutils.test_data.path("data/htpasswd"))
vals = ("basic", "test", "test")
- http.assemble_http_basic_auth(*vals)
+ authentication.assemble_http_basic_auth(*vals)
assert pm.test("test", "test")
assert not pm.test("test", "foo")
assert not pm.test("foo", "test")
@@ -33,7 +47,7 @@ class TestPassManHtpasswd:
class TestPassManSingleUser:
def test_simple(self):
- pm = http_auth.PassManSingleUser("test", "test")
+ pm = authentication.PassManSingleUser("test", "test")
assert pm.test("test", "test")
assert not pm.test("test", "foo")
assert not pm.test("foo", "test")
@@ -42,7 +56,7 @@ class TestPassManSingleUser:
class TestNullProxyAuth:
def test_simple(self):
- na = http_auth.NullProxyAuth(http_auth.PassManNonAnon())
+ na = authentication.NullProxyAuth(authentication.PassManNonAnon())
assert not na.auth_challenge_headers()
assert na.authenticate("foo")
na.clean({})
@@ -51,17 +65,17 @@ class TestNullProxyAuth:
class TestBasicProxyAuth:
def test_simple(self):
- ba = http_auth.BasicProxyAuth(http_auth.PassManNonAnon(), "test")
+ ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test")
h = odict.ODictCaseless()
assert ba.auth_challenge_headers()
assert not ba.authenticate(h)
def test_authenticate_clean(self):
- ba = http_auth.BasicProxyAuth(http_auth.PassManNonAnon(), "test")
+ ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test")
hdrs = odict.ODictCaseless()
vals = ("basic", "foo", "bar")
- hdrs[ba.AUTH_HEADER] = [http.assemble_http_basic_auth(*vals)]
+ hdrs[ba.AUTH_HEADER] = [authentication.assemble_http_basic_auth(*vals)]
assert ba.authenticate(hdrs)
ba.clean(hdrs)
@@ -74,12 +88,12 @@ class TestBasicProxyAuth:
assert not ba.authenticate(hdrs)
vals = ("foo", "foo", "bar")
- hdrs[ba.AUTH_HEADER] = [http.assemble_http_basic_auth(*vals)]
+ hdrs[ba.AUTH_HEADER] = [authentication.assemble_http_basic_auth(*vals)]
assert not ba.authenticate(hdrs)
- ba = http_auth.BasicProxyAuth(http_auth.PassMan(), "test")
+ ba = authentication.BasicProxyAuth(authentication.PassMan(), "test")
vals = ("basic", "foo", "bar")
- hdrs[ba.AUTH_HEADER] = [http.assemble_http_basic_auth(*vals)]
+ hdrs[ba.AUTH_HEADER] = [authentication.assemble_http_basic_auth(*vals)]
assert not ba.authenticate(hdrs)
@@ -91,19 +105,19 @@ class TestAuthAction:
def test_nonanonymous(self):
m = Bunch()
- aa = http_auth.NonanonymousAuthAction(None, "authenticator")
+ aa = authentication.NonanonymousAuthAction(None, "authenticator")
aa(None, m, None, None)
assert m.authenticator
def test_singleuser(self):
m = Bunch()
- aa = http_auth.SingleuserAuthAction(None, "authenticator")
+ aa = authentication.SingleuserAuthAction(None, "authenticator")
aa(None, m, "foo:bar", None)
assert m.authenticator
tutils.raises("invalid", aa, None, m, "foo", None)
def test_httppasswd(self):
m = Bunch()
- aa = http_auth.HtpasswdAuthAction(None, "authenticator")
+ aa = authentication.HtpasswdAuthAction(None, "authenticator")
aa(None, m, tutils.test_data.path("data/htpasswd"), None)
assert m.authenticator
diff --git a/test/test_http_cookies.py b/test/http/test_cookies.py
index 070849cf..4f99593a 100644
--- a/test/test_http_cookies.py
+++ b/test/http/test_cookies.py
@@ -1,6 +1,6 @@
import nose.tools
-from netlib import http_cookies
+from netlib.http import cookies
def test_read_token():
@@ -13,7 +13,7 @@ def test_read_token():
[(" foo=bar", 1), ("foo", 4)],
]
for q, a in tokens:
- nose.tools.eq_(http_cookies._read_token(*q), a)
+ nose.tools.eq_(cookies._read_token(*q), a)
def test_read_quoted_string():
@@ -25,7 +25,7 @@ def test_read_quoted_string():
[('"fo\\\"" x', 0), ("fo\"", 6)],
]
for q, a in tokens:
- nose.tools.eq_(http_cookies._read_quoted_string(*q), a)
+ nose.tools.eq_(cookies._read_quoted_string(*q), a)
def test_read_pairs():
@@ -60,7 +60,7 @@ def test_read_pairs():
],
]
for s, lst in vals:
- ret, off = http_cookies._read_pairs(s)
+ ret, off = cookies._read_pairs(s)
nose.tools.eq_(ret, lst)
@@ -108,10 +108,10 @@ def test_pairs_roundtrips():
]
]
for s, lst in pairs:
- ret, off = http_cookies._read_pairs(s)
+ ret, off = cookies._read_pairs(s)
nose.tools.eq_(ret, lst)
- s2 = http_cookies._format_pairs(lst)
- ret, off = http_cookies._read_pairs(s2)
+ s2 = cookies._format_pairs(lst)
+ ret, off = cookies._read_pairs(s2)
nose.tools.eq_(ret, lst)
@@ -127,10 +127,10 @@ def test_cookie_roundtrips():
],
]
for s, lst in pairs:
- ret = http_cookies.parse_cookie_header(s)
+ ret = cookies.parse_cookie_header(s)
nose.tools.eq_(ret.lst, lst)
- s2 = http_cookies.format_cookie_header(ret)
- ret = http_cookies.parse_cookie_header(s2)
+ s2 = cookies.format_cookie_header(ret)
+ ret = cookies.parse_cookie_header(s2)
nose.tools.eq_(ret.lst, lst)
@@ -180,10 +180,10 @@ def test_parse_set_cookie_pairs():
],
]
for s, lst in pairs:
- ret = http_cookies._parse_set_cookie_pairs(s)
+ ret = cookies._parse_set_cookie_pairs(s)
nose.tools.eq_(ret, lst)
- s2 = http_cookies._format_set_cookie_pairs(ret)
- ret2 = http_cookies._parse_set_cookie_pairs(s2)
+ s2 = cookies._format_set_cookie_pairs(ret)
+ ret2 = cookies._parse_set_cookie_pairs(s2)
nose.tools.eq_(ret2, lst)
@@ -205,13 +205,13 @@ def test_parse_set_cookie_header():
]
]
for s, expected in vals:
- ret = http_cookies.parse_set_cookie_header(s)
+ ret = cookies.parse_set_cookie_header(s)
if expected:
assert ret[0] == expected[0]
assert ret[1] == expected[1]
nose.tools.eq_(ret[2].lst, expected[2])
- s2 = http_cookies.format_set_cookie_header(*ret)
- ret2 = http_cookies.parse_set_cookie_header(s2)
+ s2 = cookies.format_set_cookie_header(*ret)
+ ret2 = cookies.parse_set_cookie_header(s2)
assert ret2[0] == expected[0]
assert ret2[1] == expected[1]
nose.tools.eq_(ret2[2].lst, expected[2])
diff --git a/test/http/test_semantics.py b/test/http/test_semantics.py
new file mode 100644
index 00000000..c4605302
--- /dev/null
+++ b/test/http/test_semantics.py
@@ -0,0 +1,54 @@
+import cStringIO
+import textwrap
+import binascii
+
+from netlib import http, odict, tcp
+from netlib.http import http1
+from .. import tutils, tservers
+
+def test_httperror():
+ e = http.exceptions.HttpError(404, "Not found")
+ assert str(e)
+
+
+def test_parse_url():
+ assert not http.parse_url("")
+
+ u = "http://foo.com:8888/test"
+ s, h, po, pa = http.parse_url(u)
+ assert s == "http"
+ assert h == "foo.com"
+ assert po == 8888
+ assert pa == "/test"
+
+ s, h, po, pa = http.parse_url("http://foo/bar")
+ assert s == "http"
+ assert h == "foo"
+ assert po == 80
+ assert pa == "/bar"
+
+ s, h, po, pa = http.parse_url("http://user:pass@foo/bar")
+ assert s == "http"
+ assert h == "foo"
+ assert po == 80
+ assert pa == "/bar"
+
+ s, h, po, pa = http.parse_url("http://foo")
+ assert pa == "/"
+
+ s, h, po, pa = http.parse_url("https://foo")
+ assert po == 443
+
+ assert not http.parse_url("https://foo:bar")
+ assert not http.parse_url("https://foo:")
+
+ # Invalid IDNA
+ assert not http.parse_url("http://\xfafoo")
+ # Invalid PATH
+ assert not http.parse_url("http:/\xc6/localhost:56121")
+ # Null byte in host
+ assert not http.parse_url("http://foo\0")
+ # Port out of range
+ assert not http.parse_url("http://foo:999999")
+ # Invalid IPv6 URL - see http://www.ietf.org/rfc/rfc2732.txt
+ assert not http.parse_url('http://lo[calhost')
diff --git a/test/http/test_user_agents.py b/test/http/test_user_agents.py
new file mode 100644
index 00000000..0bf1bba7
--- /dev/null
+++ b/test/http/test_user_agents.py
@@ -0,0 +1,6 @@
+from netlib.http import user_agents
+
+
+def test_get_shortcut():
+ assert user_agents.get_by_shortcut("c")[0] == "chrome"
+ assert not user_agents.get_by_shortcut("_")
diff --git a/test/test_http_uastrings.py b/test/test_http_uastrings.py
deleted file mode 100644
index 3fa4f359..00000000
--- a/test/test_http_uastrings.py
+++ /dev/null
@@ -1,6 +0,0 @@
-from netlib import http_uastrings
-
-
-def test_get_shortcut():
- assert http_uastrings.get_by_shortcut("c")[0] == "chrome"
- assert not http_uastrings.get_by_shortcut("_")
diff --git a/test/websockets/__init__.py b/test/websockets/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/test/websockets/__init__.py
diff --git a/test/test_websockets.py b/test/websockets/test_websockets.py
index 9956543b..fb7ba39a 100644
--- a/test/test_websockets.py
+++ b/test/websockets/test_websockets.py
@@ -2,8 +2,10 @@ import os
from nose.tools import raises
-from netlib import tcp, websockets, http
-from . import tutils, tservers
+from netlib import tcp, http, websockets
+from netlib.http.exceptions import *
+from netlib.http.http1 import HTTP1Protocol
+from .. import tutils, tservers
class WebSocketsEchoHandler(tcp.BaseHandler):
@@ -12,6 +14,7 @@ class WebSocketsEchoHandler(tcp.BaseHandler):
super(WebSocketsEchoHandler, self).__init__(
connection, address, server
)
+ self.protocol = websockets.WebsocketsProtocol()
self.handshake_done = False
def handle(self):
@@ -30,11 +33,14 @@ class WebSocketsEchoHandler(tcp.BaseHandler):
frame.to_file(self.wfile)
def handshake(self):
- req = http.read_request(self.rfile)
- key = websockets.check_client_handshake(req.headers)
+ http1_protocol = HTTP1Protocol(self)
- self.wfile.write(http.response_preamble(101) + "\r\n")
- headers = websockets.server_handshake_headers(key)
+ req = http1_protocol.read_request()
+ key = self.protocol.check_client_handshake(req.headers)
+
+ preamble = http1_protocol.response_preamble(101)
+ self.wfile.write(preamble + "\r\n")
+ headers = self.protocol.server_handshake_headers(key)
self.wfile.write(headers.format() + "\r\n")
self.wfile.flush()
self.handshake_done = True
@@ -48,22 +54,25 @@ class WebSocketsClient(tcp.TCPClient):
def __init__(self, address, source_address=None):
super(WebSocketsClient, self).__init__(address, source_address)
+ self.protocol = websockets.WebsocketsProtocol()
self.client_nonce = None
def connect(self):
super(WebSocketsClient, self).connect()
- preamble = http.request_preamble("GET", "/")
+ http1_protocol = HTTP1Protocol(self)
+
+ preamble = http1_protocol.request_preamble("GET", "/")
self.wfile.write(preamble + "\r\n")
- headers = websockets.client_handshake_headers()
+ headers = self.protocol.client_handshake_headers()
self.client_nonce = headers.get_first("sec-websocket-key")
self.wfile.write(headers.format() + "\r\n")
self.wfile.flush()
- resp = http.read_response(self.rfile, "get", None)
- server_nonce = websockets.check_server_handshake(resp.headers)
+ resp = http1_protocol.read_response("get", None)
+ server_nonce = self.protocol.check_server_handshake(resp.headers)
- if not server_nonce == websockets.create_server_nonce(
+ if not server_nonce == self.protocol.create_server_nonce(
self.client_nonce):
self.close()
@@ -78,6 +87,9 @@ class WebSocketsClient(tcp.TCPClient):
class TestWebSockets(tservers.ServerTestBase):
handler = WebSocketsEchoHandler
+ def __init__(self):
+ self.protocol = websockets.WebsocketsProtocol()
+
def random_bytes(self, n=100):
return os.urandom(n)
@@ -130,26 +142,29 @@ class TestWebSockets(tservers.ServerTestBase):
assert websockets.Frame.from_bytes(bytes).to_bytes() == bytes
def test_check_server_handshake(self):
- headers = websockets.server_handshake_headers("key")
- assert websockets.check_server_handshake(headers)
+ headers = self.protocol.server_handshake_headers("key")
+ assert self.protocol.check_server_handshake(headers)
headers["Upgrade"] = ["not_websocket"]
- assert not websockets.check_server_handshake(headers)
+ assert not self.protocol.check_server_handshake(headers)
def test_check_client_handshake(self):
- headers = websockets.client_handshake_headers("key")
- assert websockets.check_client_handshake(headers) == "key"
+ headers = self.protocol.client_handshake_headers("key")
+ assert self.protocol.check_client_handshake(headers) == "key"
headers["Upgrade"] = ["not_websocket"]
- assert not websockets.check_client_handshake(headers)
+ assert not self.protocol.check_client_handshake(headers)
class BadHandshakeHandler(WebSocketsEchoHandler):
def handshake(self):
- client_hs = http.read_request(self.rfile)
- websockets.check_client_handshake(client_hs.headers)
+ http1_protocol = HTTP1Protocol(self)
+
+ client_hs = http1_protocol.read_request()
+ self.protocol.check_client_handshake(client_hs.headers)
- self.wfile.write(http.response_preamble(101) + "\r\n")
- headers = websockets.server_handshake_headers("malformed key")
+ preamble = http1_protocol.response_preamble(101)
+ self.wfile.write(preamble + "\r\n")
+ headers = self.protocol.server_handshake_headers("malformed key")
self.wfile.write(headers.format() + "\r\n")
self.wfile.flush()
self.handshake_done = True