diff options
-rw-r--r-- | netlib/http.py | 19 | ||||
-rw-r--r-- | netlib/utils.py | 4 | ||||
-rw-r--r-- | test/test_http.py | 70 |
3 files changed, 74 insertions, 19 deletions
diff --git a/netlib/http.py b/netlib/http.py index 5501ce73..b925fe87 100644 --- a/netlib/http.py +++ b/netlib/http.py @@ -331,12 +331,15 @@ def read_response(rfile, request_method, body_size_limit, include_body=True): Return an (httpversion, code, msg, headers, content) tuple. By default, both response header and body are read. - If include_body=False is specified, content may be one of the following: + If include_body=False is specified, content may be one of the + following: - None, if the response is technically allowed to have a response body - - "", if the response must not have a response body (e.g. it's a response to a HEAD request) + - "", if the response must not have a response body (e.g. it's a + response to a HEAD request) """ line = rfile.readline() - if line == "\r\n" or line == "\n": # Possible leftover from previous message + # Possible leftover from previous message + if line == "\r\n" or line == "\n": line = rfile.readline() if not line: raise HttpErrorConnClosed(502, "Server disconnect.") @@ -373,7 +376,15 @@ def read_http_body(*args, **kwargs): ) -def read_http_body_chunked(rfile, headers, limit, request_method, response_code, is_request, max_chunk_size=None): +def read_http_body_chunked( + rfile, + headers, + limit, + request_method, + response_code, + is_request, + max_chunk_size=None +): """ Read an HTTP message body: diff --git a/netlib/utils.py b/netlib/utils.py index 57532453..66bbdb5e 100644 --- a/netlib/utils.py +++ b/netlib/utils.py @@ -8,9 +8,11 @@ def isascii(s): return False return True + # best way to do it in python 2.x def bytes_to_int(i): - return int(i.encode('hex'), 16) + return int(i.encode('hex'), 16) + def cleanBin(s, fixspacing=False): """ diff --git a/test/test_http.py b/test/test_http.py index 4f8ef2c5..8b99c769 100644 --- a/test/test_http.py +++ b/test/test_http.py @@ -1,4 +1,6 @@ -import cStringIO, textwrap, binascii +import cStringIO +import textwrap +import binascii from netlib import http, odict, tcp, test import tutils @@ -21,7 +23,11 @@ def test_read_chunked(): h["transfer-encoding"] = ["chunked"] s = cStringIO.StringIO("1\r\na\r\n0\r\n") - tutils.raises("malformed chunked body", http.read_http_body, s, h, None, "GET", None, True) + tutils.raises( + "malformed chunked body", + http.read_http_body, + s, h, None, "GET", None, True + ) s = cStringIO.StringIO("1\r\na\r\n0\r\n\r\n") assert http.read_http_body(s, h, None, "GET", None, True) == "a" @@ -30,13 +36,25 @@ def test_read_chunked(): assert http.read_http_body(s, h, None, "GET", None, True) == "a" s = cStringIO.StringIO("\r\n") - tutils.raises("closed prematurely", http.read_http_body, s, h, None, "GET", None, True) + tutils.raises( + "closed prematurely", + http.read_http_body, + s, h, None, "GET", None, True + ) s = cStringIO.StringIO("1\r\nfoo") - tutils.raises("malformed chunked body", http.read_http_body, s, h, None, "GET", None, True) + tutils.raises( + "malformed chunked body", + http.read_http_body, + s, h, None, "GET", None, True + ) s = cStringIO.StringIO("foo\r\nfoo") - tutils.raises(http.HttpError, http.read_http_body, s, h, None, "GET", None, True) + tutils.raises( + http.HttpError, + http.read_http_body, + s, h, None, "GET", None, True + ) s = cStringIO.StringIO("5\r\naaaaa\r\n0\r\n\r\n") tutils.raises("too large", http.read_http_body, s, h, 2, "GET", None, True) @@ -87,17 +105,29 @@ def test_read_http_body(): # test content length: invalid header h["content-length"] = ["foo"] s = cStringIO.StringIO("testing") - tutils.raises(http.HttpError, http.read_http_body, s, h, None, "GET", 200, False) + tutils.raises( + http.HttpError, + http.read_http_body, + s, h, None, "GET", 200, False + ) # test content length: invalid header #2 h["content-length"] = [-1] s = cStringIO.StringIO("testing") - tutils.raises(http.HttpError, http.read_http_body, s, h, None, "GET", 200, False) + tutils.raises( + http.HttpError, + http.read_http_body, + s, h, None, "GET", 200, False + ) # test content length: content length > actual content h["content-length"] = [5] s = cStringIO.StringIO("testing") - tutils.raises(http.HttpError, http.read_http_body, s, h, 4, "GET", 200, False) + tutils.raises( + http.HttpError, + http.read_http_body, + s, h, 4, "GET", 200, False + ) # test content length: content length < actual content s = cStringIO.StringIO("testing") @@ -110,7 +140,11 @@ def test_read_http_body(): # test no content length: limit < actual content s = cStringIO.StringIO("testing") - tutils.raises(http.HttpError, http.read_http_body, s, h, 4, "GET", 200, False) + tutils.raises( + http.HttpError, + http.read_http_body, + s, h, 4, "GET", 200, False + ) # test chunked h = odict.ODictCaseless() @@ -271,11 +305,15 @@ def test_read_response(): data = """ HTTP/1.1 200 OK """ - assert tst(data, "GET", None) == ((1, 1), 200, 'OK', odict.ODictCaseless(), '') + assert tst(data, "GET", None) == ( + (1, 1), 200, 'OK', odict.ODictCaseless(), '' + ) data = """ HTTP/1.1 200 """ - assert tst(data, "GET", None) == ((1, 1), 200, '', odict.ODictCaseless(), '') + assert tst(data, "GET", None) == ( + (1, 1), 200, '', odict.ODictCaseless(), '' + ) data = """ HTTP/x 200 OK """ @@ -290,7 +328,9 @@ def test_read_response(): HTTP/1.1 200 OK """ - assert tst(data, "GET", None) == ((1, 1), 100, 'CONTINUE', odict.ODictCaseless(), '') + assert tst(data, "GET", None) == ( + (1, 1), 100, 'CONTINUE', odict.ODictCaseless(), '' + ) data = """ HTTP/1.1 200 OK @@ -315,7 +355,7 @@ def test_read_response(): foo """ - assert tst(data, "GET", None, include_body=False)[4] == None + assert tst(data, "GET", None, include_body=False)[4] is None def test_parse_url(): @@ -363,7 +403,9 @@ def test_parse_url(): def test_parse_http_basic_auth(): vals = ("basic", "foo", "bar") - assert http.parse_http_basic_auth(http.assemble_http_basic_auth(*vals)) == vals + assert http.parse_http_basic_auth( + http.assemble_http_basic_auth(*vals) + ) == vals assert not http.parse_http_basic_auth("") assert not http.parse_http_basic_auth("foo bar") v = "basic " + binascii.b2a_base64("foo") |