aboutsummaryrefslogtreecommitdiffstats
path: root/netlib
diff options
context:
space:
mode:
Diffstat (limited to 'netlib')
-rw-r--r--netlib/http/headers.py4
-rw-r--r--netlib/http/message.py6
-rw-r--r--netlib/http/request.py4
-rw-r--r--netlib/odict.py6
-rw-r--r--netlib/strutils.py103
-rw-r--r--netlib/utils.py102
-rw-r--r--netlib/websockets/frame.py3
-rw-r--r--netlib/wsgi.py22
8 files changed, 128 insertions, 122 deletions
diff --git a/netlib/http/headers.py b/netlib/http/headers.py
index 9bf4b69d..14888ea9 100644
--- a/netlib/http/headers.py
+++ b/netlib/http/headers.py
@@ -4,7 +4,7 @@ import re
import six
from netlib import multidict
-from netlib import utils
+from netlib import strutils
# See also: http://lucumr.pocoo.org/2013/7/2/the-updated-guide-to-unicode/
@@ -20,7 +20,7 @@ else:
return x.decode("utf-8", "surrogateescape")
def _always_bytes(x):
- return utils.always_bytes(x, "utf-8", "surrogateescape")
+ return strutils.always_bytes(x, "utf-8", "surrogateescape")
class Headers(multidict.MultiDict):
diff --git a/netlib/http/message.py b/netlib/http/message.py
index c51f16a2..b633b671 100644
--- a/netlib/http/message.py
+++ b/netlib/http/message.py
@@ -4,7 +4,7 @@ import warnings
import six
-from netlib import encoding, utils, basetypes
+from netlib import encoding, strutils, basetypes
from netlib.http import headers
if six.PY2: # pragma: no cover
@@ -19,7 +19,7 @@ else:
return x.decode("utf-8", "surrogateescape")
def _always_bytes(x):
- return utils.always_bytes(x, "utf-8", "surrogateescape")
+ return strutils.always_bytes(x, "utf-8", "surrogateescape")
class MessageData(basetypes.Serializable):
@@ -200,7 +200,7 @@ class Message(basetypes.Serializable):
replacements = 0
if self.content:
with decoded(self):
- self.content, replacements = utils.safe_subn(
+ self.content, replacements = strutils.safe_subn(
pattern, repl, self.content, flags=flags
)
replacements += self.headers.replace(pattern, repl, flags)
diff --git a/netlib/http/request.py b/netlib/http/request.py
index 890cf593..91d5f020 100644
--- a/netlib/http/request.py
+++ b/netlib/http/request.py
@@ -7,7 +7,7 @@ from six.moves import urllib
from netlib import encoding
from netlib import multidict
-from netlib import utils
+from netlib import strutils
from netlib.http import multipart
from netlib.http import cookies
from netlib.http import headers as nheaders
@@ -67,7 +67,7 @@ class Request(message.Message):
"""
# TODO: Proper distinction between text and bytes.
c = super(Request, self).replace(pattern, repl, flags)
- self.path, pc = utils.safe_subn(
+ self.path, pc = strutils.safe_subn(
pattern, repl, self.path, flags=flags
)
c += pc
diff --git a/netlib/odict.py b/netlib/odict.py
index 0cd58f65..f9f55991 100644
--- a/netlib/odict.py
+++ b/netlib/odict.py
@@ -3,7 +3,7 @@ import copy
import six
-from netlib import basetypes, utils
+from netlib import basetypes, strutils
class ODict(basetypes.Serializable):
@@ -139,9 +139,9 @@ class ODict(basetypes.Serializable):
"""
new, count = [], 0
for k, v in self.lst:
- k, c = utils.safe_subn(pattern, repl, k, *args, **kwargs)
+ k, c = strutils.safe_subn(pattern, repl, k, *args, **kwargs)
count += c
- v, c = utils.safe_subn(pattern, repl, v, *args, **kwargs)
+ v, c = strutils.safe_subn(pattern, repl, v, *args, **kwargs)
count += c
new.append([k, v])
self.lst = new
diff --git a/netlib/strutils.py b/netlib/strutils.py
new file mode 100644
index 00000000..7a62185b
--- /dev/null
+++ b/netlib/strutils.py
@@ -0,0 +1,103 @@
+import re
+import unicodedata
+import codecs
+
+import six
+
+
+def always_bytes(unicode_or_bytes, *encode_args):
+ if isinstance(unicode_or_bytes, six.text_type):
+ return unicode_or_bytes.encode(*encode_args)
+ return unicode_or_bytes
+
+
+def native(s, *encoding_opts):
+ """
+ Convert :py:class:`bytes` or :py:class:`unicode` to the native
+ :py:class:`str` type, using latin1 encoding if conversion is necessary.
+
+ https://www.python.org/dev/peps/pep-3333/#a-note-on-string-types
+ """
+ if not isinstance(s, (six.binary_type, six.text_type)):
+ raise TypeError("%r is neither bytes nor unicode" % s)
+ if six.PY3:
+ if isinstance(s, six.binary_type):
+ return s.decode(*encoding_opts)
+ else:
+ if isinstance(s, six.text_type):
+ return s.encode(*encoding_opts)
+ return s
+
+
+def clean_bin(s, keep_spacing=True):
+ """
+ Cleans binary data to make it safe to display.
+
+ Args:
+ keep_spacing: If False, tabs and newlines will also be replaced.
+ """
+ if isinstance(s, six.text_type):
+ if keep_spacing:
+ keep = u" \n\r\t"
+ else:
+ keep = u" "
+ return u"".join(
+ ch if (unicodedata.category(ch)[0] not in "CZ" or ch in keep) else u"."
+ for ch in s
+ )
+ else:
+ if keep_spacing:
+ keep = (9, 10, 13) # \t, \n, \r,
+ else:
+ keep = ()
+ return b"".join(
+ six.int2byte(ch) if (31 < ch < 127 or ch in keep) else b"."
+ for ch in six.iterbytes(s)
+ )
+
+
+def safe_subn(pattern, repl, target, *args, **kwargs):
+ """
+ There are Unicode conversion problems with re.subn. We try to smooth
+ that over by casting the pattern and replacement to strings. We really
+ need a better solution that is aware of the actual content ecoding.
+ """
+ return re.subn(str(pattern), str(repl), target, *args, **kwargs)
+
+
+def bytes_to_escaped_str(data):
+ """
+ Take bytes and return a safe string that can be displayed to the user.
+
+ Single quotes are always escaped, double quotes are never escaped:
+ "'" + bytes_to_escaped_str(...) + "'"
+ gives a valid Python string.
+ """
+ # TODO: We may want to support multi-byte characters without escaping them.
+ # One way to do would be calling .decode("utf8", "backslashreplace") first
+ # and then escaping UTF8 control chars (see clean_bin).
+
+ if not isinstance(data, bytes):
+ raise ValueError("data must be bytes, but is {}".format(data.__class__.__name__))
+ # We always insert a double-quote here so that we get a single-quoted string back
+ # https://stackoverflow.com/questions/29019340/why-does-python-use-different-quotes-for-representing-strings-depending-on-their
+ return repr(b'"' + data).lstrip("b")[2:-1]
+
+
+def escaped_str_to_bytes(data):
+ """
+ Take an escaped string and return the unescaped bytes equivalent.
+ """
+ if not isinstance(data, six.string_types):
+ if six.PY2:
+ raise ValueError("data must be str or unicode, but is {}".format(data.__class__.__name__))
+ raise ValueError("data must be str, but is {}".format(data.__class__.__name__))
+
+ if six.PY2:
+ if isinstance(data, unicode):
+ data = data.encode("utf8")
+ return data.decode("string-escape")
+
+ # This one is difficult - we use an undocumented Python API here
+ # as per http://stackoverflow.com/a/23151714/934719
+ return codecs.escape_decode(data)[0]
diff --git a/netlib/utils.py b/netlib/utils.py
index b8408d1d..00e7e5d9 100644
--- a/netlib/utils.py
+++ b/netlib/utils.py
@@ -1,63 +1,12 @@
from __future__ import absolute_import, print_function, division
import os.path
import re
-import codecs
-import unicodedata
import importlib
import inspect
import six
-
-def always_bytes(unicode_or_bytes, *encode_args):
- if isinstance(unicode_or_bytes, six.text_type):
- return unicode_or_bytes.encode(*encode_args)
- return unicode_or_bytes
-
-
-def native(s, *encoding_opts):
- """
- Convert :py:class:`bytes` or :py:class:`unicode` to the native
- :py:class:`str` type, using latin1 encoding if conversion is necessary.
-
- https://www.python.org/dev/peps/pep-3333/#a-note-on-string-types
- """
- if not isinstance(s, (six.binary_type, six.text_type)):
- raise TypeError("%r is neither bytes nor unicode" % s)
- if six.PY3:
- if isinstance(s, six.binary_type):
- return s.decode(*encoding_opts)
- else:
- if isinstance(s, six.text_type):
- return s.encode(*encoding_opts)
- return s
-
-
-def clean_bin(s, keep_spacing=True):
- """
- Cleans binary data to make it safe to display.
-
- Args:
- keep_spacing: If False, tabs and newlines will also be replaced.
- """
- if isinstance(s, six.text_type):
- if keep_spacing:
- keep = u" \n\r\t"
- else:
- keep = u" "
- return u"".join(
- ch if (unicodedata.category(ch)[0] not in "CZ" or ch in keep) else u"."
- for ch in s
- )
- else:
- if keep_spacing:
- keep = (9, 10, 13) # \t, \n, \r,
- else:
- keep = ()
- return b"".join(
- six.int2byte(ch) if (31 < ch < 127 or ch in keep) else b"."
- for ch in six.iterbytes(s)
- )
+from netlib import strutils
def hexdump(s):
@@ -70,7 +19,7 @@ def hexdump(s):
part = s[i:i + 16]
x = b" ".join("{:0=2x}".format(i).encode() for i in six.iterbytes(part))
x = x.ljust(47) # 16*2 + 15
- yield (offset, x, clean_bin(part, False))
+ yield (offset, x, strutils.clean_bin(part, False))
def setbit(byte, offset, value):
@@ -173,50 +122,3 @@ def hostport(scheme, host, port):
return b"%s:%d" % (host, port)
else:
return "%s:%d" % (host, port)
-
-
-def safe_subn(pattern, repl, target, *args, **kwargs):
- """
- There are Unicode conversion problems with re.subn. We try to smooth
- that over by casting the pattern and replacement to strings. We really
- need a better solution that is aware of the actual content ecoding.
- """
- return re.subn(str(pattern), str(repl), target, *args, **kwargs)
-
-
-def bytes_to_escaped_str(data):
- """
- Take bytes and return a safe string that can be displayed to the user.
-
- Single quotes are always escaped, double quotes are never escaped:
- "'" + bytes_to_escaped_str(...) + "'"
- gives a valid Python string.
- """
- # TODO: We may want to support multi-byte characters without escaping them.
- # One way to do would be calling .decode("utf8", "backslashreplace") first
- # and then escaping UTF8 control chars (see clean_bin).
-
- if not isinstance(data, bytes):
- raise ValueError("data must be bytes, but is {}".format(data.__class__.__name__))
- # We always insert a double-quote here so that we get a single-quoted string back
- # https://stackoverflow.com/questions/29019340/why-does-python-use-different-quotes-for-representing-strings-depending-on-their
- return repr(b'"' + data).lstrip("b")[2:-1]
-
-
-def escaped_str_to_bytes(data):
- """
- Take an escaped string and return the unescaped bytes equivalent.
- """
- if not isinstance(data, six.string_types):
- if six.PY2:
- raise ValueError("data must be str or unicode, but is {}".format(data.__class__.__name__))
- raise ValueError("data must be str, but is {}".format(data.__class__.__name__))
-
- if six.PY2:
- if isinstance(data, unicode):
- data = data.encode("utf8")
- return data.decode("string-escape")
-
- # This one is difficult - we use an undocumented Python API here
- # as per http://stackoverflow.com/a/23151714/934719
- return codecs.escape_decode(data)[0]
diff --git a/netlib/websockets/frame.py b/netlib/websockets/frame.py
index deb0ce33..42196ffb 100644
--- a/netlib/websockets/frame.py
+++ b/netlib/websockets/frame.py
@@ -7,6 +7,7 @@ import warnings
import six
from netlib import tcp
+from netlib import strutils
from netlib import utils
from netlib import human
from netlib.websockets import protocol
@@ -254,7 +255,7 @@ class Frame(object):
def __repr__(self):
ret = repr(self.header)
if self.payload:
- ret = ret + "\nPayload:\n" + utils.clean_bin(self.payload).decode("ascii")
+ ret = ret + "\nPayload:\n" + strutils.clean_bin(self.payload).decode("ascii")
return ret
def human_readable(self):
diff --git a/netlib/wsgi.py b/netlib/wsgi.py
index 7661388b..c66fddc2 100644
--- a/netlib/wsgi.py
+++ b/netlib/wsgi.py
@@ -6,7 +6,7 @@ import six
from io import BytesIO
from six.moves import urllib
-from netlib import http, tcp, utils
+from netlib import http, tcp, strutils
class ClientConn(object):
@@ -54,38 +54,38 @@ class WSGIAdaptor(object):
self.app, self.domain, self.port, self.sversion = app, domain, port, sversion
def make_environ(self, flow, errsoc, **extra):
- path = utils.native(flow.request.path, "latin-1")
+ path = strutils.native(flow.request.path, "latin-1")
if '?' in path:
- path_info, query = utils.native(path, "latin-1").split('?', 1)
+ path_info, query = strutils.native(path, "latin-1").split('?', 1)
else:
path_info = path
query = ''
environ = {
'wsgi.version': (1, 0),
- 'wsgi.url_scheme': utils.native(flow.request.scheme, "latin-1"),
+ 'wsgi.url_scheme': strutils.native(flow.request.scheme, "latin-1"),
'wsgi.input': BytesIO(flow.request.content or b""),
'wsgi.errors': errsoc,
'wsgi.multithread': True,
'wsgi.multiprocess': False,
'wsgi.run_once': False,
'SERVER_SOFTWARE': self.sversion,
- 'REQUEST_METHOD': utils.native(flow.request.method, "latin-1"),
+ 'REQUEST_METHOD': strutils.native(flow.request.method, "latin-1"),
'SCRIPT_NAME': '',
'PATH_INFO': urllib.parse.unquote(path_info),
'QUERY_STRING': query,
- 'CONTENT_TYPE': utils.native(flow.request.headers.get('Content-Type', ''), "latin-1"),
- 'CONTENT_LENGTH': utils.native(flow.request.headers.get('Content-Length', ''), "latin-1"),
+ 'CONTENT_TYPE': strutils.native(flow.request.headers.get('Content-Type', ''), "latin-1"),
+ 'CONTENT_LENGTH': strutils.native(flow.request.headers.get('Content-Length', ''), "latin-1"),
'SERVER_NAME': self.domain,
'SERVER_PORT': str(self.port),
- 'SERVER_PROTOCOL': utils.native(flow.request.http_version, "latin-1"),
+ 'SERVER_PROTOCOL': strutils.native(flow.request.http_version, "latin-1"),
}
environ.update(extra)
if flow.client_conn.address:
- environ["REMOTE_ADDR"] = utils.native(flow.client_conn.address.host, "latin-1")
+ environ["REMOTE_ADDR"] = strutils.native(flow.client_conn.address.host, "latin-1")
environ["REMOTE_PORT"] = flow.client_conn.address.port
for key, value in flow.request.headers.items():
- key = 'HTTP_' + utils.native(key, "latin-1").upper().replace('-', '_')
+ key = 'HTTP_' + strutils.native(key, "latin-1").upper().replace('-', '_')
if key not in ('HTTP_CONTENT_TYPE', 'HTTP_CONTENT_LENGTH'):
environ[key] = value
return environ
@@ -139,7 +139,7 @@ class WSGIAdaptor(object):
elif state["status"]:
raise AssertionError('Response already started')
state["status"] = status
- state["headers"] = http.Headers([[utils.always_bytes(k), utils.always_bytes(v)] for k, v in headers])
+ state["headers"] = http.Headers([[strutils.always_bytes(k), strutils.always_bytes(v)] for k, v in headers])
if exc_info:
self.error_page(soc, state["headers_sent"], traceback.format_tb(exc_info[2]))
state["headers_sent"] = True