aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/http/__init__.py (renamed from test/http2/__init__.py)0
-rw-r--r--test/http/http1/__init__.py0
-rw-r--r--test/http/http1/test_protocol.py (renamed from test/test_http.py)208
-rw-r--r--test/http/http2/__init__.py0
-rw-r--r--test/http/http2/test_frames.py (renamed from test/http2/test_frames.py)2
-rw-r--r--test/http/http2/test_protocol.py (renamed from test/http2/test_protocol.py)7
-rw-r--r--test/http/test_authentication.py (renamed from test/test_http_auth.py)35
-rw-r--r--test/http/test_cookies.py (renamed from test/test_http_cookies.py)32
-rw-r--r--test/http/test_semantics.py54
-rw-r--r--test/http/test_user_agents.py6
-rw-r--r--test/test_http_uastrings.py6
-rw-r--r--test/websockets/__init__.py0
-rw-r--r--test/websockets/test_websockets.py (renamed from test/test_websockets.py)17
13 files changed, 188 insertions, 179 deletions
diff --git a/test/http2/__init__.py b/test/http/__init__.py
index e69de29b..e69de29b 100644
--- a/test/http2/__init__.py
+++ b/test/http/__init__.py
diff --git a/test/http/http1/__init__.py b/test/http/http1/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/test/http/http1/__init__.py
diff --git a/test/test_http.py b/test/http/http1/test_protocol.py
index bbc78847..05e82831 100644
--- a/test/test_http.py
+++ b/test/http/http1/test_protocol.py
@@ -1,20 +1,17 @@
import cStringIO
import textwrap
import binascii
-from netlib import http, http_semantics, odict, tcp
-from . import tutils, tservers
-
-def test_httperror():
- e = http.HttpError(404, "Not found")
- assert str(e)
+from netlib import http, odict, tcp
+from netlib.http.http1 import protocol
+from ... import tutils, tservers
def test_has_chunked_encoding():
h = odict.ODictCaseless()
- assert not http.has_chunked_encoding(h)
+ assert not protocol.has_chunked_encoding(h)
h["transfer-encoding"] = ["chunked"]
- assert http.has_chunked_encoding(h)
+ assert protocol.has_chunked_encoding(h)
def test_read_chunked():
@@ -25,74 +22,74 @@ def test_read_chunked():
tutils.raises(
"malformed chunked body",
- http.read_http_body,
+ protocol.read_http_body,
s, h, None, "GET", None, True
)
s = cStringIO.StringIO("1\r\na\r\n0\r\n\r\n")
- assert http.read_http_body(s, h, None, "GET", None, True) == "a"
+ assert protocol.read_http_body(s, h, None, "GET", None, True) == "a"
s = cStringIO.StringIO("\r\n\r\n1\r\na\r\n0\r\n\r\n")
- assert http.read_http_body(s, h, None, "GET", None, True) == "a"
+ assert protocol.read_http_body(s, h, None, "GET", None, True) == "a"
s = cStringIO.StringIO("\r\n")
tutils.raises(
"closed prematurely",
- http.read_http_body,
+ protocol.read_http_body,
s, h, None, "GET", None, True
)
s = cStringIO.StringIO("1\r\nfoo")
tutils.raises(
"malformed chunked body",
- http.read_http_body,
+ protocol.read_http_body,
s, h, None, "GET", None, True
)
s = cStringIO.StringIO("foo\r\nfoo")
tutils.raises(
- http.HttpError,
- http.read_http_body,
+ protocol.HttpError,
+ protocol.read_http_body,
s, h, None, "GET", None, True
)
s = cStringIO.StringIO("5\r\naaaaa\r\n0\r\n\r\n")
- tutils.raises("too large", http.read_http_body, s, h, 2, "GET", None, True)
+ tutils.raises("too large", protocol.read_http_body, s, h, 2, "GET", None, True)
def test_connection_close():
h = odict.ODictCaseless()
- assert http.connection_close((1, 0), h)
- assert not http.connection_close((1, 1), h)
+ assert protocol.connection_close((1, 0), h)
+ assert not protocol.connection_close((1, 1), h)
h["connection"] = ["keep-alive"]
- assert not http.connection_close((1, 1), h)
+ assert not protocol.connection_close((1, 1), h)
h["connection"] = ["close"]
- assert http.connection_close((1, 1), h)
+ assert protocol.connection_close((1, 1), h)
def test_get_header_tokens():
h = odict.ODictCaseless()
- assert http.get_header_tokens(h, "foo") == []
+ assert protocol.get_header_tokens(h, "foo") == []
h["foo"] = ["bar"]
- assert http.get_header_tokens(h, "foo") == ["bar"]
+ assert protocol.get_header_tokens(h, "foo") == ["bar"]
h["foo"] = ["bar, voing"]
- assert http.get_header_tokens(h, "foo") == ["bar", "voing"]
+ assert protocol.get_header_tokens(h, "foo") == ["bar", "voing"]
h["foo"] = ["bar, voing", "oink"]
- assert http.get_header_tokens(h, "foo") == ["bar", "voing", "oink"]
+ assert protocol.get_header_tokens(h, "foo") == ["bar", "voing", "oink"]
def test_read_http_body_request():
h = odict.ODictCaseless()
r = cStringIO.StringIO("testing")
- assert http.read_http_body(r, h, None, "GET", None, True) == ""
+ assert protocol.read_http_body(r, h, None, "GET", None, True) == ""
def test_read_http_body_response():
h = odict.ODictCaseless()
s = tcp.Reader(cStringIO.StringIO("testing"))
- assert http.read_http_body(s, h, None, "GET", 200, False) == "testing"
+ assert protocol.read_http_body(s, h, None, "GET", 200, False) == "testing"
def test_read_http_body():
@@ -100,14 +97,14 @@ def test_read_http_body():
h = odict.ODictCaseless()
h["content-length"] = [7]
s = cStringIO.StringIO("testing")
- assert http.read_http_body(s, h, None, "GET", 200, False) == "testing"
+ assert protocol.read_http_body(s, h, None, "GET", 200, False) == "testing"
# test content length: invalid header
h["content-length"] = ["foo"]
s = cStringIO.StringIO("testing")
tutils.raises(
- http.HttpError,
- http.read_http_body,
+ protocol.HttpError,
+ protocol.read_http_body,
s, h, None, "GET", 200, False
)
@@ -115,8 +112,8 @@ def test_read_http_body():
h["content-length"] = [-1]
s = cStringIO.StringIO("testing")
tutils.raises(
- http.HttpError,
- http.read_http_body,
+ protocol.HttpError,
+ protocol.read_http_body,
s, h, None, "GET", 200, False
)
@@ -124,25 +121,25 @@ def test_read_http_body():
h["content-length"] = [5]
s = cStringIO.StringIO("testing")
tutils.raises(
- http.HttpError,
- http.read_http_body,
+ protocol.HttpError,
+ protocol.read_http_body,
s, h, 4, "GET", 200, False
)
# test content length: content length < actual content
s = cStringIO.StringIO("testing")
- assert len(http.read_http_body(s, h, None, "GET", 200, False)) == 5
+ assert len(protocol.read_http_body(s, h, None, "GET", 200, False)) == 5
# test no content length: limit > actual content
h = odict.ODictCaseless()
s = tcp.Reader(cStringIO.StringIO("testing"))
- assert len(http.read_http_body(s, h, 100, "GET", 200, False)) == 7
+ assert len(protocol.read_http_body(s, h, 100, "GET", 200, False)) == 7
# test no content length: limit < actual content
s = tcp.Reader(cStringIO.StringIO("testing"))
tutils.raises(
- http.HttpError,
- http.read_http_body,
+ protocol.HttpError,
+ protocol.read_http_body,
s, h, 4, "GET", 200, False
)
@@ -150,54 +147,54 @@ def test_read_http_body():
h = odict.ODictCaseless()
h["transfer-encoding"] = ["chunked"]
s = tcp.Reader(cStringIO.StringIO("5\r\naaaaa\r\n0\r\n\r\n"))
- assert http.read_http_body(s, h, 100, "GET", 200, False) == "aaaaa"
+ assert protocol.read_http_body(s, h, 100, "GET", 200, False) == "aaaaa"
def test_expected_http_body_size():
# gibber in the content-length field
h = odict.ODictCaseless()
h["content-length"] = ["foo"]
- assert http.expected_http_body_size(h, False, "GET", 200) is None
+ assert protocol.expected_http_body_size(h, False, "GET", 200) is None
# negative number in the content-length field
h = odict.ODictCaseless()
h["content-length"] = ["-7"]
- assert http.expected_http_body_size(h, False, "GET", 200) is None
+ assert protocol.expected_http_body_size(h, False, "GET", 200) is None
# explicit length
h = odict.ODictCaseless()
h["content-length"] = ["5"]
- assert http.expected_http_body_size(h, False, "GET", 200) == 5
+ assert protocol.expected_http_body_size(h, False, "GET", 200) == 5
# no length
h = odict.ODictCaseless()
- assert http.expected_http_body_size(h, False, "GET", 200) == -1
+ assert protocol.expected_http_body_size(h, False, "GET", 200) == -1
# no length request
h = odict.ODictCaseless()
- assert http.expected_http_body_size(h, True, "GET", None) == 0
+ assert protocol.expected_http_body_size(h, True, "GET", None) == 0
def test_parse_http_protocol():
- assert http.parse_http_protocol("HTTP/1.1") == (1, 1)
- assert http.parse_http_protocol("HTTP/0.0") == (0, 0)
- assert not http.parse_http_protocol("HTTP/a.1")
- assert not http.parse_http_protocol("HTTP/1.a")
- assert not http.parse_http_protocol("foo/0.0")
- assert not http.parse_http_protocol("HTTP/x")
+ assert protocol.parse_http_protocol("HTTP/1.1") == (1, 1)
+ assert protocol.parse_http_protocol("HTTP/0.0") == (0, 0)
+ assert not protocol.parse_http_protocol("HTTP/a.1")
+ assert not protocol.parse_http_protocol("HTTP/1.a")
+ assert not protocol.parse_http_protocol("foo/0.0")
+ assert not protocol.parse_http_protocol("HTTP/x")
def test_parse_init_connect():
- assert http.parse_init_connect("CONNECT host.com:443 HTTP/1.0")
- assert not http.parse_init_connect("C\xfeONNECT host.com:443 HTTP/1.0")
- assert not http.parse_init_connect("CONNECT \0host.com:443 HTTP/1.0")
- assert not http.parse_init_connect("CONNECT host.com:444444 HTTP/1.0")
- assert not http.parse_init_connect("bogus")
- assert not http.parse_init_connect("GET host.com:443 HTTP/1.0")
- assert not http.parse_init_connect("CONNECT host.com443 HTTP/1.0")
- assert not http.parse_init_connect("CONNECT host.com:443 foo/1.0")
- assert not http.parse_init_connect("CONNECT host.com:foo HTTP/1.0")
+ assert protocol.parse_init_connect("CONNECT host.com:443 HTTP/1.0")
+ assert not protocol.parse_init_connect("C\xfeONNECT host.com:443 HTTP/1.0")
+ assert not protocol.parse_init_connect("CONNECT \0host.com:443 HTTP/1.0")
+ assert not protocol.parse_init_connect("CONNECT host.com:444444 HTTP/1.0")
+ assert not protocol.parse_init_connect("bogus")
+ assert not protocol.parse_init_connect("GET host.com:443 HTTP/1.0")
+ assert not protocol.parse_init_connect("CONNECT host.com443 HTTP/1.0")
+ assert not protocol.parse_init_connect("CONNECT host.com:443 foo/1.0")
+ assert not protocol.parse_init_connect("CONNECT host.com:foo HTTP/1.0")
def test_parse_init_proxy():
u = "GET http://foo.com:8888/test HTTP/1.1"
- m, s, h, po, pa, httpversion = http.parse_init_proxy(u)
+ m, s, h, po, pa, httpversion = protocol.parse_init_proxy(u)
assert m == "GET"
assert s == "http"
assert h == "foo.com"
@@ -206,27 +203,27 @@ def test_parse_init_proxy():
assert httpversion == (1, 1)
u = "G\xfeET http://foo.com:8888/test HTTP/1.1"
- assert not http.parse_init_proxy(u)
+ assert not protocol.parse_init_proxy(u)
- assert not http.parse_init_proxy("invalid")
- assert not http.parse_init_proxy("GET invalid HTTP/1.1")
- assert not http.parse_init_proxy("GET http://foo.com:8888/test foo/1.1")
+ assert not protocol.parse_init_proxy("invalid")
+ assert not protocol.parse_init_proxy("GET invalid HTTP/1.1")
+ assert not protocol.parse_init_proxy("GET http://foo.com:8888/test foo/1.1")
def test_parse_init_http():
u = "GET /test HTTP/1.1"
- m, u, httpversion = http.parse_init_http(u)
+ m, u, httpversion = protocol.parse_init_http(u)
assert m == "GET"
assert u == "/test"
assert httpversion == (1, 1)
u = "G\xfeET /test HTTP/1.1"
- assert not http.parse_init_http(u)
+ assert not protocol.parse_init_http(u)
- assert not http.parse_init_http("invalid")
- assert not http.parse_init_http("GET invalid HTTP/1.1")
- assert not http.parse_init_http("GET /test foo/1.1")
- assert not http.parse_init_http("GET /test\xc0 HTTP/1.1")
+ assert not protocol.parse_init_http("invalid")
+ assert not protocol.parse_init_http("GET invalid HTTP/1.1")
+ assert not protocol.parse_init_http("GET /test foo/1.1")
+ assert not protocol.parse_init_http("GET /test\xc0 HTTP/1.1")
class TestReadHeaders:
@@ -236,7 +233,7 @@ class TestReadHeaders:
data = textwrap.dedent(data)
data = data.strip()
s = cStringIO.StringIO(data)
- return http.read_headers(s)
+ return protocol.read_headers(s)
def test_read_simple(self):
data = """
@@ -290,7 +287,7 @@ class TestReadResponseNoContentLength(tservers.ServerTestBase):
def test_no_content_length(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
c.connect()
- resp = http.read_response(c.rfile, "GET", None)
+ resp = protocol.read_response(c.rfile, "GET", None)
assert resp.content == "bar\r\n\r\n"
@@ -298,7 +295,7 @@ def test_read_response():
def tst(data, method, limit, include_body=True):
data = textwrap.dedent(data)
r = cStringIO.StringIO(data)
- return http.read_response(
+ return protocol.read_response(
r, method, limit, include_body=include_body
)
@@ -307,13 +304,13 @@ def test_read_response():
data = """
HTTP/1.1 200 OK
"""
- assert tst(data, "GET", None) == http_semantics.Response(
+ assert tst(data, "GET", None) == http.Response(
(1, 1), 200, 'OK', odict.ODictCaseless(), ''
)
data = """
HTTP/1.1 200
"""
- assert tst(data, "GET", None) == http_semantics.Response(
+ assert tst(data, "GET", None) == http.Response(
(1, 1), 200, '', odict.ODictCaseless(), ''
)
data = """
@@ -330,7 +327,7 @@ def test_read_response():
HTTP/1.1 200 OK
"""
- assert tst(data, "GET", None) == http_semantics.Response(
+ assert tst(data, "GET", None) == http.Response(
(1, 1), 100, 'CONTINUE', odict.ODictCaseless(), ''
)
@@ -360,71 +357,28 @@ def test_read_response():
assert tst(data, "GET", None, include_body=False).content is None
-def test_parse_url():
- assert not http.parse_url("")
-
- u = "http://foo.com:8888/test"
- s, h, po, pa = http.parse_url(u)
- assert s == "http"
- assert h == "foo.com"
- assert po == 8888
- assert pa == "/test"
-
- s, h, po, pa = http.parse_url("http://foo/bar")
- assert s == "http"
- assert h == "foo"
- assert po == 80
- assert pa == "/bar"
-
- s, h, po, pa = http.parse_url("http://user:pass@foo/bar")
- assert s == "http"
- assert h == "foo"
- assert po == 80
- assert pa == "/bar"
-
- s, h, po, pa = http.parse_url("http://foo")
- assert pa == "/"
-
- s, h, po, pa = http.parse_url("https://foo")
- assert po == 443
-
- assert not http.parse_url("https://foo:bar")
- assert not http.parse_url("https://foo:")
-
- # Invalid IDNA
- assert not http.parse_url("http://\xfafoo")
- # Invalid PATH
- assert not http.parse_url("http:/\xc6/localhost:56121")
- # Null byte in host
- assert not http.parse_url("http://foo\0")
- # Port out of range
- assert not http.parse_url("http://foo:999999")
- # Invalid IPv6 URL - see http://www.ietf.org/rfc/rfc2732.txt
- assert not http.parse_url('http://lo[calhost')
-
-
def test_parse_http_basic_auth():
vals = ("basic", "foo", "bar")
- assert http.parse_http_basic_auth(
- http.assemble_http_basic_auth(*vals)
+ assert protocol.parse_http_basic_auth(
+ protocol.assemble_http_basic_auth(*vals)
) == vals
- assert not http.parse_http_basic_auth("")
- assert not http.parse_http_basic_auth("foo bar")
+ assert not protocol.parse_http_basic_auth("")
+ assert not protocol.parse_http_basic_auth("foo bar")
v = "basic " + binascii.b2a_base64("foo")
- assert not http.parse_http_basic_auth(v)
+ assert not protocol.parse_http_basic_auth(v)
def test_get_request_line():
r = cStringIO.StringIO("\nfoo")
- assert http.get_request_line(r) == "foo"
- assert not http.get_request_line(r)
+ assert protocol.get_request_line(r) == "foo"
+ assert not protocol.get_request_line(r)
class TestReadRequest():
def tst(self, data, **kwargs):
r = cStringIO.StringIO(data)
- return http.read_request(r, **kwargs)
+ return protocol.read_request(r, **kwargs)
def test_invalid(self):
tutils.raises(
@@ -485,7 +439,7 @@ class TestReadRequest():
"Expect: 100-continue\r\n\r\n"
"foobar",
)
- v = http.read_request(r, wfile=w)
+ v = protocol.read_request(r, wfile=w)
assert w.getvalue() == "HTTP/1.1 100 Continue\r\n\r\n"
assert v.content == "foo"
assert r.read(3) == "bar"
diff --git a/test/http/http2/__init__.py b/test/http/http2/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/test/http/http2/__init__.py
diff --git a/test/http2/test_frames.py b/test/http/http2/test_frames.py
index 76a4b712..ee2edc39 100644
--- a/test/http2/test_frames.py
+++ b/test/http/http2/test_frames.py
@@ -2,7 +2,7 @@ import cStringIO
from test import tutils
from nose.tools import assert_equal
from netlib import tcp
-from netlib.http2.frame import *
+from netlib.http.http2.frame import *
def hex_to_file(data):
diff --git a/test/http2/test_protocol.py b/test/http/http2/test_protocol.py
index 5e2af34e..f607860e 100644
--- a/test/http2/test_protocol.py
+++ b/test/http/http2/test_protocol.py
@@ -1,10 +1,9 @@
import OpenSSL
-from netlib import http2
from netlib import tcp
-from netlib.http2.frame import *
-from test import tutils
-from .. import tservers
+from netlib.http import http2
+from netlib.http.http2.frame import *
+from ... import tutils, tservers
class EchoHandler(tcp.BaseHandler):
diff --git a/test/test_http_auth.py b/test/http/test_authentication.py
index c842925b..c0dae1a2 100644
--- a/test/test_http_auth.py
+++ b/test/http/test_authentication.py
@@ -1,11 +1,12 @@
-from netlib import odict, http_auth, http
-import tutils
+from netlib import odict, http
+from netlib.http import authentication
+from .. import tutils
class TestPassManNonAnon:
def test_simple(self):
- p = http_auth.PassManNonAnon()
+ p = authentication.PassManNonAnon()
assert not p.test("", "")
assert p.test("user", "")
@@ -15,14 +16,14 @@ class TestPassManHtpasswd:
def test_file_errors(self):
tutils.raises(
"malformed htpasswd file",
- http_auth.PassManHtpasswd,
+ authentication.PassManHtpasswd,
tutils.test_data.path("data/server.crt"))
def test_simple(self):
- pm = http_auth.PassManHtpasswd(tutils.test_data.path("data/htpasswd"))
+ pm = authentication.PassManHtpasswd(tutils.test_data.path("data/htpasswd"))
vals = ("basic", "test", "test")
- http.assemble_http_basic_auth(*vals)
+ http.http1.assemble_http_basic_auth(*vals)
assert pm.test("test", "test")
assert not pm.test("test", "foo")
assert not pm.test("foo", "test")
@@ -33,7 +34,7 @@ class TestPassManHtpasswd:
class TestPassManSingleUser:
def test_simple(self):
- pm = http_auth.PassManSingleUser("test", "test")
+ pm = authentication.PassManSingleUser("test", "test")
assert pm.test("test", "test")
assert not pm.test("test", "foo")
assert not pm.test("foo", "test")
@@ -42,7 +43,7 @@ class TestPassManSingleUser:
class TestNullProxyAuth:
def test_simple(self):
- na = http_auth.NullProxyAuth(http_auth.PassManNonAnon())
+ na = authentication.NullProxyAuth(authentication.PassManNonAnon())
assert not na.auth_challenge_headers()
assert na.authenticate("foo")
na.clean({})
@@ -51,17 +52,17 @@ class TestNullProxyAuth:
class TestBasicProxyAuth:
def test_simple(self):
- ba = http_auth.BasicProxyAuth(http_auth.PassManNonAnon(), "test")
+ ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test")
h = odict.ODictCaseless()
assert ba.auth_challenge_headers()
assert not ba.authenticate(h)
def test_authenticate_clean(self):
- ba = http_auth.BasicProxyAuth(http_auth.PassManNonAnon(), "test")
+ ba = authentication.BasicProxyAuth(authentication.PassManNonAnon(), "test")
hdrs = odict.ODictCaseless()
vals = ("basic", "foo", "bar")
- hdrs[ba.AUTH_HEADER] = [http.assemble_http_basic_auth(*vals)]
+ hdrs[ba.AUTH_HEADER] = [http.http1.assemble_http_basic_auth(*vals)]
assert ba.authenticate(hdrs)
ba.clean(hdrs)
@@ -74,12 +75,12 @@ class TestBasicProxyAuth:
assert not ba.authenticate(hdrs)
vals = ("foo", "foo", "bar")
- hdrs[ba.AUTH_HEADER] = [http.assemble_http_basic_auth(*vals)]
+ hdrs[ba.AUTH_HEADER] = [http.http1.assemble_http_basic_auth(*vals)]
assert not ba.authenticate(hdrs)
- ba = http_auth.BasicProxyAuth(http_auth.PassMan(), "test")
+ ba = authentication.BasicProxyAuth(authentication.PassMan(), "test")
vals = ("basic", "foo", "bar")
- hdrs[ba.AUTH_HEADER] = [http.assemble_http_basic_auth(*vals)]
+ hdrs[ba.AUTH_HEADER] = [http.http1.assemble_http_basic_auth(*vals)]
assert not ba.authenticate(hdrs)
@@ -91,19 +92,19 @@ class TestAuthAction:
def test_nonanonymous(self):
m = Bunch()
- aa = http_auth.NonanonymousAuthAction(None, "authenticator")
+ aa = authentication.NonanonymousAuthAction(None, "authenticator")
aa(None, m, None, None)
assert m.authenticator
def test_singleuser(self):
m = Bunch()
- aa = http_auth.SingleuserAuthAction(None, "authenticator")
+ aa = authentication.SingleuserAuthAction(None, "authenticator")
aa(None, m, "foo:bar", None)
assert m.authenticator
tutils.raises("invalid", aa, None, m, "foo", None)
def test_httppasswd(self):
m = Bunch()
- aa = http_auth.HtpasswdAuthAction(None, "authenticator")
+ aa = authentication.HtpasswdAuthAction(None, "authenticator")
aa(None, m, tutils.test_data.path("data/htpasswd"), None)
assert m.authenticator
diff --git a/test/test_http_cookies.py b/test/http/test_cookies.py
index 070849cf..4f99593a 100644
--- a/test/test_http_cookies.py
+++ b/test/http/test_cookies.py
@@ -1,6 +1,6 @@
import nose.tools
-from netlib import http_cookies
+from netlib.http import cookies
def test_read_token():
@@ -13,7 +13,7 @@ def test_read_token():
[(" foo=bar", 1), ("foo", 4)],
]
for q, a in tokens:
- nose.tools.eq_(http_cookies._read_token(*q), a)
+ nose.tools.eq_(cookies._read_token(*q), a)
def test_read_quoted_string():
@@ -25,7 +25,7 @@ def test_read_quoted_string():
[('"fo\\\"" x', 0), ("fo\"", 6)],
]
for q, a in tokens:
- nose.tools.eq_(http_cookies._read_quoted_string(*q), a)
+ nose.tools.eq_(cookies._read_quoted_string(*q), a)
def test_read_pairs():
@@ -60,7 +60,7 @@ def test_read_pairs():
],
]
for s, lst in vals:
- ret, off = http_cookies._read_pairs(s)
+ ret, off = cookies._read_pairs(s)
nose.tools.eq_(ret, lst)
@@ -108,10 +108,10 @@ def test_pairs_roundtrips():
]
]
for s, lst in pairs:
- ret, off = http_cookies._read_pairs(s)
+ ret, off = cookies._read_pairs(s)
nose.tools.eq_(ret, lst)
- s2 = http_cookies._format_pairs(lst)
- ret, off = http_cookies._read_pairs(s2)
+ s2 = cookies._format_pairs(lst)
+ ret, off = cookies._read_pairs(s2)
nose.tools.eq_(ret, lst)
@@ -127,10 +127,10 @@ def test_cookie_roundtrips():
],
]
for s, lst in pairs:
- ret = http_cookies.parse_cookie_header(s)
+ ret = cookies.parse_cookie_header(s)
nose.tools.eq_(ret.lst, lst)
- s2 = http_cookies.format_cookie_header(ret)
- ret = http_cookies.parse_cookie_header(s2)
+ s2 = cookies.format_cookie_header(ret)
+ ret = cookies.parse_cookie_header(s2)
nose.tools.eq_(ret.lst, lst)
@@ -180,10 +180,10 @@ def test_parse_set_cookie_pairs():
],
]
for s, lst in pairs:
- ret = http_cookies._parse_set_cookie_pairs(s)
+ ret = cookies._parse_set_cookie_pairs(s)
nose.tools.eq_(ret, lst)
- s2 = http_cookies._format_set_cookie_pairs(ret)
- ret2 = http_cookies._parse_set_cookie_pairs(s2)
+ s2 = cookies._format_set_cookie_pairs(ret)
+ ret2 = cookies._parse_set_cookie_pairs(s2)
nose.tools.eq_(ret2, lst)
@@ -205,13 +205,13 @@ def test_parse_set_cookie_header():
]
]
for s, expected in vals:
- ret = http_cookies.parse_set_cookie_header(s)
+ ret = cookies.parse_set_cookie_header(s)
if expected:
assert ret[0] == expected[0]
assert ret[1] == expected[1]
nose.tools.eq_(ret[2].lst, expected[2])
- s2 = http_cookies.format_set_cookie_header(*ret)
- ret2 = http_cookies.parse_set_cookie_header(s2)
+ s2 = cookies.format_set_cookie_header(*ret)
+ ret2 = cookies.parse_set_cookie_header(s2)
assert ret2[0] == expected[0]
assert ret2[1] == expected[1]
nose.tools.eq_(ret2[2].lst, expected[2])
diff --git a/test/http/test_semantics.py b/test/http/test_semantics.py
new file mode 100644
index 00000000..c4605302
--- /dev/null
+++ b/test/http/test_semantics.py
@@ -0,0 +1,54 @@
+import cStringIO
+import textwrap
+import binascii
+
+from netlib import http, odict, tcp
+from netlib.http import http1
+from .. import tutils, tservers
+
+def test_httperror():
+ e = http.exceptions.HttpError(404, "Not found")
+ assert str(e)
+
+
+def test_parse_url():
+ assert not http.parse_url("")
+
+ u = "http://foo.com:8888/test"
+ s, h, po, pa = http.parse_url(u)
+ assert s == "http"
+ assert h == "foo.com"
+ assert po == 8888
+ assert pa == "/test"
+
+ s, h, po, pa = http.parse_url("http://foo/bar")
+ assert s == "http"
+ assert h == "foo"
+ assert po == 80
+ assert pa == "/bar"
+
+ s, h, po, pa = http.parse_url("http://user:pass@foo/bar")
+ assert s == "http"
+ assert h == "foo"
+ assert po == 80
+ assert pa == "/bar"
+
+ s, h, po, pa = http.parse_url("http://foo")
+ assert pa == "/"
+
+ s, h, po, pa = http.parse_url("https://foo")
+ assert po == 443
+
+ assert not http.parse_url("https://foo:bar")
+ assert not http.parse_url("https://foo:")
+
+ # Invalid IDNA
+ assert not http.parse_url("http://\xfafoo")
+ # Invalid PATH
+ assert not http.parse_url("http:/\xc6/localhost:56121")
+ # Null byte in host
+ assert not http.parse_url("http://foo\0")
+ # Port out of range
+ assert not http.parse_url("http://foo:999999")
+ # Invalid IPv6 URL - see http://www.ietf.org/rfc/rfc2732.txt
+ assert not http.parse_url('http://lo[calhost')
diff --git a/test/http/test_user_agents.py b/test/http/test_user_agents.py
new file mode 100644
index 00000000..0bf1bba7
--- /dev/null
+++ b/test/http/test_user_agents.py
@@ -0,0 +1,6 @@
+from netlib.http import user_agents
+
+
+def test_get_shortcut():
+ assert user_agents.get_by_shortcut("c")[0] == "chrome"
+ assert not user_agents.get_by_shortcut("_")
diff --git a/test/test_http_uastrings.py b/test/test_http_uastrings.py
deleted file mode 100644
index 3fa4f359..00000000
--- a/test/test_http_uastrings.py
+++ /dev/null
@@ -1,6 +0,0 @@
-from netlib import http_uastrings
-
-
-def test_get_shortcut():
- assert http_uastrings.get_by_shortcut("c")[0] == "chrome"
- assert not http_uastrings.get_by_shortcut("_")
diff --git a/test/websockets/__init__.py b/test/websockets/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/test/websockets/__init__.py
diff --git a/test/test_websockets.py b/test/websockets/test_websockets.py
index ae0a5e33..07ad0452 100644
--- a/test/test_websockets.py
+++ b/test/websockets/test_websockets.py
@@ -2,8 +2,9 @@ import os
from nose.tools import raises
-from netlib import tcp, websockets, http
-from . import tutils, tservers
+from netlib import tcp, http, websockets
+from netlib.http.exceptions import *
+from .. import tutils, tservers
class WebSocketsEchoHandler(tcp.BaseHandler):
@@ -31,10 +32,10 @@ class WebSocketsEchoHandler(tcp.BaseHandler):
frame.to_file(self.wfile)
def handshake(self):
- req = http.read_request(self.rfile)
+ req = http.http1.read_request(self.rfile)
key = self.protocol.check_client_handshake(req.headers)
- self.wfile.write(http.response_preamble(101) + "\r\n")
+ self.wfile.write(http.http1.response_preamble(101) + "\r\n")
headers = self.protocol.server_handshake_headers(key)
self.wfile.write(headers.format() + "\r\n")
self.wfile.flush()
@@ -55,14 +56,14 @@ class WebSocketsClient(tcp.TCPClient):
def connect(self):
super(WebSocketsClient, self).connect()
- preamble = http.request_preamble("GET", "/")
+ preamble = http.http1.protocol.request_preamble("GET", "/")
self.wfile.write(preamble + "\r\n")
headers = self.protocol.client_handshake_headers()
self.client_nonce = headers.get_first("sec-websocket-key")
self.wfile.write(headers.format() + "\r\n")
self.wfile.flush()
- resp = http.read_response(self.rfile, "get", None)
+ resp = http.http1.protocol.read_response(self.rfile, "get", None)
server_nonce = self.protocol.check_server_handshake(resp.headers)
if not server_nonce == self.protocol.create_server_nonce(
@@ -150,10 +151,10 @@ class TestWebSockets(tservers.ServerTestBase):
class BadHandshakeHandler(WebSocketsEchoHandler):
def handshake(self):
- client_hs = http.read_request(self.rfile)
+ client_hs = http.http1.protocol.read_request(self.rfile)
self.protocol.check_client_handshake(client_hs.headers)
- self.wfile.write(http.response_preamble(101) + "\r\n")
+ self.wfile.write(http.http1.protocol.response_preamble(101) + "\r\n")
headers = self.protocol.server_handshake_headers("malformed key")
self.wfile.write(headers.format() + "\r\n")
self.wfile.flush()