aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAldo Cortesi <aldo@corte.si>2016-06-02 13:14:56 +1200
committerAldo Cortesi <aldo@corte.si>2016-06-02 13:14:56 +1200
commit07f7905f9182ad0098a5a2e37d40ce9b27298380 (patch)
tree76ba04357353645f49e0f4bd8d835e471239b54b
parenteaa3b308f7bb48256ccf56ea07d008fa5f9dd6ad (diff)
parent09da1febbd9beac5ef5650274899439f5ce10e98 (diff)
downloadmitmproxy-07f7905f9182ad0098a5a2e37d40ce9b27298380.tar.gz
mitmproxy-07f7905f9182ad0098a5a2e37d40ce9b27298380.tar.bz2
mitmproxy-07f7905f9182ad0098a5a2e37d40ce9b27298380.zip
Merge pull request #1191 from cortesi/utils
Utils reorganisation
-rw-r--r--examples/custom_contentviews.py5
-rw-r--r--examples/tcp_message.py4
-rw-r--r--mitmproxy/contentviews.py14
-rw-r--r--mitmproxy/dump.py14
-rw-r--r--mitmproxy/flow/master.py4
-rw-r--r--mitmproxy/utils.py38
-rw-r--r--netlib/http/headers.py4
-rw-r--r--netlib/http/message.py6
-rw-r--r--netlib/http/request.py4
-rw-r--r--netlib/odict.py6
-rw-r--r--netlib/strutils.py154
-rw-r--r--netlib/utils.py113
-rw-r--r--netlib/websockets/frame.py3
-rw-r--r--netlib/wsgi.py22
-rw-r--r--pathod/app.py5
-rw-r--r--pathod/language/base.py14
-rw-r--r--pathod/log.py8
-rw-r--r--pathod/pathoc.py9
-rw-r--r--pathod/utils.py18
-rw-r--r--test/mitmproxy/test_utils.py19
-rw-r--r--test/netlib/test_strutils.py63
-rw-r--r--test/netlib/test_utils.py41
-rw-r--r--test/pathod/test_utils.py11
23 files changed, 281 insertions, 298 deletions
diff --git a/examples/custom_contentviews.py b/examples/custom_contentviews.py
index 6cc9314c..034f356c 100644
--- a/examples/custom_contentviews.py
+++ b/examples/custom_contentviews.py
@@ -1,7 +1,8 @@
import string
import lxml.html
import lxml.etree
-from mitmproxy import utils, contentviews
+from mitmproxy import contentviews
+from netlib import strutils
class ViewPigLatin(contentviews.View):
@@ -10,7 +11,7 @@ class ViewPigLatin(contentviews.View):
content_types = ["text/html"]
def __call__(self, data, **metadata):
- if utils.isXML(data):
+ if strutils.isXML(data):
parser = lxml.etree.HTMLParser(
strip_cdata=True,
remove_blank_text=True
diff --git a/examples/tcp_message.py b/examples/tcp_message.py
index 52471ae9..2c210618 100644
--- a/examples/tcp_message.py
+++ b/examples/tcp_message.py
@@ -8,7 +8,7 @@ tcp_message Inline Script Hook API Demonstration
example cmdline invocation:
mitmdump -T --host --tcp ".*" -q -s examples/tcp_message.py
'''
-from netlib.utils import clean_bin
+from netlib import strutils
def tcp_message(ctx, tcp_msg):
@@ -22,4 +22,4 @@ def tcp_message(ctx, tcp_msg):
"client" if tcp_msg.sender == tcp_msg.client_conn else "server",
tcp_msg.sender.address,
"server" if tcp_msg.receiver == tcp_msg.server_conn else "client",
- tcp_msg.receiver.address, clean_bin(tcp_msg.message)))
+ tcp_msg.receiver.address, strutils.clean_bin(tcp_msg.message)))
diff --git a/mitmproxy/contentviews.py b/mitmproxy/contentviews.py
index 87cb2aa0..42061a8c 100644
--- a/mitmproxy/contentviews.py
+++ b/mitmproxy/contentviews.py
@@ -36,7 +36,7 @@ from netlib import encoding
from netlib import http
from netlib import odict
from netlib.http import url
-import netlib.utils
+from netlib import strutils
try:
import pyamf
@@ -129,11 +129,11 @@ class ViewAuto(View):
ct = "%s/%s" % (ct[0], ct[1])
if ct in content_types_map:
return content_types_map[ct][0](data, **metadata)
- elif mitmproxy.utils.isXML(data):
+ elif strutils.isXML(data):
return get("XML")(data, **metadata)
if metadata.get("query"):
return get("Query")(data, **metadata)
- if data and mitmproxy.utils.isMostlyBin(data):
+ if data and strutils.isMostlyBin(data):
return get("Hex")(data)
if not data:
return "No content", []
@@ -156,7 +156,7 @@ class ViewHex(View):
@staticmethod
def _format(data):
- for offset, hexa, s in netlib.utils.hexdump(data):
+ for offset, hexa, s in strutils.hexdump(data):
yield [
("offset", offset + " "),
("text", hexa + " "),
@@ -226,7 +226,7 @@ class ViewHTML(View):
content_types = ["text/html"]
def __call__(self, data, **metadata):
- if mitmproxy.utils.isXML(data):
+ if strutils.isXML(data):
parser = lxml.etree.HTMLParser(
strip_cdata=True,
remove_blank_text=True
@@ -581,9 +581,9 @@ def safe_to_print(lines, encoding="utf8"):
clean_line = []
for (style, text) in line:
try:
- text = netlib.utils.clean_bin(text.decode(encoding, "strict"))
+ text = strutils.clean_bin(text.decode(encoding, "strict"))
except UnicodeDecodeError:
- text = netlib.utils.clean_bin(text).decode(encoding, "strict")
+ text = strutils.clean_bin(text).decode(encoding, "strict")
clean_line.append((style, text))
yield clean_line
diff --git a/mitmproxy/dump.py b/mitmproxy/dump.py
index f5d6725c..b1005ee7 100644
--- a/mitmproxy/dump.py
+++ b/mitmproxy/dump.py
@@ -13,7 +13,7 @@ from mitmproxy import filt
from mitmproxy import flow
from netlib import human
from netlib import tcp
-from netlib import utils
+from netlib import strutils
class DumpError(Exception):
@@ -181,8 +181,8 @@ class DumpMaster(flow.FlowMaster):
if self.o.flow_detail >= 2:
headers = "\r\n".join(
"{}: {}".format(
- click.style(utils.bytes_to_escaped_str(k), fg="blue", bold=True),
- click.style(utils.bytes_to_escaped_str(v), fg="blue"))
+ click.style(strutils.bytes_to_escaped_str(k), fg="blue", bold=True),
+ click.style(strutils.bytes_to_escaped_str(v), fg="blue"))
for k, v in message.headers.fields
)
self.echo(headers, indent=4)
@@ -244,7 +244,7 @@ class DumpMaster(flow.FlowMaster):
stickycookie = ""
if flow.client_conn:
- client = click.style(utils.bytes_to_escaped_str(flow.client_conn.address.host), bold=True)
+ client = click.style(strutils.bytes_to_escaped_str(flow.client_conn.address.host), bold=True)
else:
client = click.style("[replay]", fg="yellow", bold=True)
@@ -253,12 +253,12 @@ class DumpMaster(flow.FlowMaster):
GET="green",
DELETE="red"
).get(method.upper(), "magenta")
- method = click.style(utils.bytes_to_escaped_str(method), fg=method_color, bold=True)
+ method = click.style(strutils.bytes_to_escaped_str(method), fg=method_color, bold=True)
if self.showhost:
url = flow.request.pretty_url
else:
url = flow.request.url
- url = click.style(utils.bytes_to_escaped_str(url), bold=True)
+ url = click.style(strutils.bytes_to_escaped_str(url), bold=True)
httpversion = ""
if flow.request.http_version not in ("HTTP/1.1", "HTTP/1.0"):
@@ -288,7 +288,7 @@ class DumpMaster(flow.FlowMaster):
elif 400 <= code < 600:
code_color = "red"
code = click.style(str(code), fg=code_color, bold=True, blink=(code == 418))
- reason = click.style(utils.bytes_to_escaped_str(flow.response.reason), fg=code_color, bold=True)
+ reason = click.style(strutils.bytes_to_escaped_str(flow.response.reason), fg=code_color, bold=True)
if flow.response.content is None:
size = "(content missing)"
diff --git a/mitmproxy/flow/master.py b/mitmproxy/flow/master.py
index a198018e..ec0bf36d 100644
--- a/mitmproxy/flow/master.py
+++ b/mitmproxy/flow/master.py
@@ -16,7 +16,7 @@ from mitmproxy.flow import modules
from mitmproxy.onboarding import app
from mitmproxy.protocol import http_replay
from mitmproxy.proxy.config import HostMatcher
-from netlib import utils
+from netlib import strutils
class FlowMaster(controller.Master):
@@ -499,7 +499,7 @@ class FlowMaster(controller.Master):
server=repr(flow.server_conn.address),
direction=direction,
), "info")
- self.add_event(utils.clean_bin(message.content), "debug")
+ self.add_event(strutils.clean_bin(message.content), "debug")
@controller.handler
def tcp_error(self, flow):
diff --git a/mitmproxy/utils.py b/mitmproxy/utils.py
index 672805d0..680bc495 100644
--- a/mitmproxy/utils.py
+++ b/mitmproxy/utils.py
@@ -25,32 +25,6 @@ def format_timestamp_with_milli(s):
return d.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
-def isBin(s):
- """
- Does this string have any non-ASCII characters?
- """
- for i in s:
- i = ord(i)
- if i < 9 or 13 < i < 32 or 126 < i:
- return True
- return False
-
-
-def isMostlyBin(s):
- s = s[:100]
- return sum(isBin(ch) for ch in s) / len(s) > 0.3
-
-
-def isXML(s):
- for i in s:
- if i in "\n \t":
- continue
- elif i == "<":
- return True
- else:
- return False
-
-
def pretty_json(s):
try:
p = json.loads(s)
@@ -92,15 +66,3 @@ class LRUCache:
d = self.cacheList.pop()
self.cache.pop(d)
return ret
-
-
-def clean_hanging_newline(t):
- """
- Many editors will silently add a newline to the final line of a
- document (I'm looking at you, Vim). This function fixes this common
- problem at the risk of removing a hanging newline in the rare cases
- where the user actually intends it.
- """
- if t and t[-1] == "\n":
- return t[:-1]
- return t
diff --git a/netlib/http/headers.py b/netlib/http/headers.py
index 9bf4b69d..14888ea9 100644
--- a/netlib/http/headers.py
+++ b/netlib/http/headers.py
@@ -4,7 +4,7 @@ import re
import six
from netlib import multidict
-from netlib import utils
+from netlib import strutils
# See also: http://lucumr.pocoo.org/2013/7/2/the-updated-guide-to-unicode/
@@ -20,7 +20,7 @@ else:
return x.decode("utf-8", "surrogateescape")
def _always_bytes(x):
- return utils.always_bytes(x, "utf-8", "surrogateescape")
+ return strutils.always_bytes(x, "utf-8", "surrogateescape")
class Headers(multidict.MultiDict):
diff --git a/netlib/http/message.py b/netlib/http/message.py
index c51f16a2..b633b671 100644
--- a/netlib/http/message.py
+++ b/netlib/http/message.py
@@ -4,7 +4,7 @@ import warnings
import six
-from netlib import encoding, utils, basetypes
+from netlib import encoding, strutils, basetypes
from netlib.http import headers
if six.PY2: # pragma: no cover
@@ -19,7 +19,7 @@ else:
return x.decode("utf-8", "surrogateescape")
def _always_bytes(x):
- return utils.always_bytes(x, "utf-8", "surrogateescape")
+ return strutils.always_bytes(x, "utf-8", "surrogateescape")
class MessageData(basetypes.Serializable):
@@ -200,7 +200,7 @@ class Message(basetypes.Serializable):
replacements = 0
if self.content:
with decoded(self):
- self.content, replacements = utils.safe_subn(
+ self.content, replacements = strutils.safe_subn(
pattern, repl, self.content, flags=flags
)
replacements += self.headers.replace(pattern, repl, flags)
diff --git a/netlib/http/request.py b/netlib/http/request.py
index 890cf593..91d5f020 100644
--- a/netlib/http/request.py
+++ b/netlib/http/request.py
@@ -7,7 +7,7 @@ from six.moves import urllib
from netlib import encoding
from netlib import multidict
-from netlib import utils
+from netlib import strutils
from netlib.http import multipart
from netlib.http import cookies
from netlib.http import headers as nheaders
@@ -67,7 +67,7 @@ class Request(message.Message):
"""
# TODO: Proper distinction between text and bytes.
c = super(Request, self).replace(pattern, repl, flags)
- self.path, pc = utils.safe_subn(
+ self.path, pc = strutils.safe_subn(
pattern, repl, self.path, flags=flags
)
c += pc
diff --git a/netlib/odict.py b/netlib/odict.py
index 0cd58f65..f9f55991 100644
--- a/netlib/odict.py
+++ b/netlib/odict.py
@@ -3,7 +3,7 @@ import copy
import six
-from netlib import basetypes, utils
+from netlib import basetypes, strutils
class ODict(basetypes.Serializable):
@@ -139,9 +139,9 @@ class ODict(basetypes.Serializable):
"""
new, count = [], 0
for k, v in self.lst:
- k, c = utils.safe_subn(pattern, repl, k, *args, **kwargs)
+ k, c = strutils.safe_subn(pattern, repl, k, *args, **kwargs)
count += c
- v, c = utils.safe_subn(pattern, repl, v, *args, **kwargs)
+ v, c = strutils.safe_subn(pattern, repl, v, *args, **kwargs)
count += c
new.append([k, v])
self.lst = new
diff --git a/netlib/strutils.py b/netlib/strutils.py
new file mode 100644
index 00000000..03b371f5
--- /dev/null
+++ b/netlib/strutils.py
@@ -0,0 +1,154 @@
+import re
+import unicodedata
+import codecs
+
+import six
+
+
+def always_bytes(unicode_or_bytes, *encode_args):
+ if isinstance(unicode_or_bytes, six.text_type):
+ return unicode_or_bytes.encode(*encode_args)
+ return unicode_or_bytes
+
+
+def native(s, *encoding_opts):
+ """
+ Convert :py:class:`bytes` or :py:class:`unicode` to the native
+ :py:class:`str` type, using latin1 encoding if conversion is necessary.
+
+ https://www.python.org/dev/peps/pep-3333/#a-note-on-string-types
+ """
+ if not isinstance(s, (six.binary_type, six.text_type)):
+ raise TypeError("%r is neither bytes nor unicode" % s)
+ if six.PY3:
+ if isinstance(s, six.binary_type):
+ return s.decode(*encoding_opts)
+ else:
+ if isinstance(s, six.text_type):
+ return s.encode(*encoding_opts)
+ return s
+
+
+def clean_bin(s, keep_spacing=True):
+ """
+ Cleans binary data to make it safe to display.
+
+ Args:
+ keep_spacing: If False, tabs and newlines will also be replaced.
+ """
+ if isinstance(s, six.text_type):
+ if keep_spacing:
+ keep = u" \n\r\t"
+ else:
+ keep = u" "
+ return u"".join(
+ ch if (unicodedata.category(ch)[0] not in "CZ" or ch in keep) else u"."
+ for ch in s
+ )
+ else:
+ if keep_spacing:
+ keep = (9, 10, 13) # \t, \n, \r,
+ else:
+ keep = ()
+ return b"".join(
+ six.int2byte(ch) if (31 < ch < 127 or ch in keep) else b"."
+ for ch in six.iterbytes(s)
+ )
+
+
+def safe_subn(pattern, repl, target, *args, **kwargs):
+ """
+ There are Unicode conversion problems with re.subn. We try to smooth
+ that over by casting the pattern and replacement to strings. We really
+ need a better solution that is aware of the actual content ecoding.
+ """
+ return re.subn(str(pattern), str(repl), target, *args, **kwargs)
+
+
+def bytes_to_escaped_str(data):
+ """
+ Take bytes and return a safe string that can be displayed to the user.
+
+ Single quotes are always escaped, double quotes are never escaped:
+ "'" + bytes_to_escaped_str(...) + "'"
+ gives a valid Python string.
+ """
+ # TODO: We may want to support multi-byte characters without escaping them.
+ # One way to do would be calling .decode("utf8", "backslashreplace") first
+ # and then escaping UTF8 control chars (see clean_bin).
+
+ if not isinstance(data, bytes):
+ raise ValueError("data must be bytes, but is {}".format(data.__class__.__name__))
+ # We always insert a double-quote here so that we get a single-quoted string back
+ # https://stackoverflow.com/questions/29019340/why-does-python-use-different-quotes-for-representing-strings-depending-on-their
+ return repr(b'"' + data).lstrip("b")[2:-1]
+
+
+def escaped_str_to_bytes(data):
+ """
+ Take an escaped string and return the unescaped bytes equivalent.
+ """
+ if not isinstance(data, six.string_types):
+ if six.PY2:
+ raise ValueError("data must be str or unicode, but is {}".format(data.__class__.__name__))
+ raise ValueError("data must be str, but is {}".format(data.__class__.__name__))
+
+ if six.PY2:
+ if isinstance(data, unicode):
+ data = data.encode("utf8")
+ return data.decode("string-escape")
+
+ # This one is difficult - we use an undocumented Python API here
+ # as per http://stackoverflow.com/a/23151714/934719
+ return codecs.escape_decode(data)[0]
+
+
+def isBin(s):
+ """
+ Does this string have any non-ASCII characters?
+ """
+ for i in s:
+ i = ord(i)
+ if i < 9 or 13 < i < 32 or 126 < i:
+ return True
+ return False
+
+
+def isMostlyBin(s):
+ s = s[:100]
+ return sum(isBin(ch) for ch in s) / len(s) > 0.3
+
+
+def isXML(s):
+ for i in s:
+ if i in "\n \t":
+ continue
+ elif i == "<":
+ return True
+ else:
+ return False
+
+
+def clean_hanging_newline(t):
+ """
+ Many editors will silently add a newline to the final line of a
+ document (I'm looking at you, Vim). This function fixes this common
+ problem at the risk of removing a hanging newline in the rare cases
+ where the user actually intends it.
+ """
+ if t and t[-1] == "\n":
+ return t[:-1]
+ return t
+
+
+def hexdump(s):
+ """
+ Returns:
+ A generator of (offset, hex, str) tuples
+ """
+ for i in range(0, len(s), 16):
+ offset = "{:0=10x}".format(i).encode()
+ part = s[i:i + 16]
+ x = b" ".join("{:0=2x}".format(i).encode() for i in six.iterbytes(part))
+ x = x.ljust(47) # 16*2 + 15
+ yield (offset, x, clean_bin(part, False))
diff --git a/netlib/utils.py b/netlib/utils.py
index b8408d1d..b4b99679 100644
--- a/netlib/utils.py
+++ b/netlib/utils.py
@@ -1,78 +1,12 @@
from __future__ import absolute_import, print_function, division
import os.path
import re
-import codecs
-import unicodedata
import importlib
import inspect
import six
-def always_bytes(unicode_or_bytes, *encode_args):
- if isinstance(unicode_or_bytes, six.text_type):
- return unicode_or_bytes.encode(*encode_args)
- return unicode_or_bytes
-
-
-def native(s, *encoding_opts):
- """
- Convert :py:class:`bytes` or :py:class:`unicode` to the native
- :py:class:`str` type, using latin1 encoding if conversion is necessary.
-
- https://www.python.org/dev/peps/pep-3333/#a-note-on-string-types
- """
- if not isinstance(s, (six.binary_type, six.text_type)):
- raise TypeError("%r is neither bytes nor unicode" % s)
- if six.PY3:
- if isinstance(s, six.binary_type):
- return s.decode(*encoding_opts)
- else:
- if isinstance(s, six.text_type):
- return s.encode(*encoding_opts)
- return s
-
-
-def clean_bin(s, keep_spacing=True):
- """
- Cleans binary data to make it safe to display.
-
- Args:
- keep_spacing: If False, tabs and newlines will also be replaced.
- """
- if isinstance(s, six.text_type):
- if keep_spacing:
- keep = u" \n\r\t"
- else:
- keep = u" "
- return u"".join(
- ch if (unicodedata.category(ch)[0] not in "CZ" or ch in keep) else u"."
- for ch in s
- )
- else:
- if keep_spacing:
- keep = (9, 10, 13) # \t, \n, \r,
- else:
- keep = ()
- return b"".join(
- six.int2byte(ch) if (31 < ch < 127 or ch in keep) else b"."
- for ch in six.iterbytes(s)
- )
-
-
-def hexdump(s):
- """
- Returns:
- A generator of (offset, hex, str) tuples
- """
- for i in range(0, len(s), 16):
- offset = "{:0=10x}".format(i).encode()
- part = s[i:i + 16]
- x = b" ".join("{:0=2x}".format(i).encode() for i in six.iterbytes(part))
- x = x.ljust(47) # 16*2 + 15
- yield (offset, x, clean_bin(part, False))
-
-
def setbit(byte, offset, value):
"""
Set a bit in a byte to 1 if value is truthy, 0 if not.
@@ -173,50 +107,3 @@ def hostport(scheme, host, port):
return b"%s:%d" % (host, port)
else:
return "%s:%d" % (host, port)
-
-
-def safe_subn(pattern, repl, target, *args, **kwargs):
- """
- There are Unicode conversion problems with re.subn. We try to smooth
- that over by casting the pattern and replacement to strings. We really
- need a better solution that is aware of the actual content ecoding.
- """
- return re.subn(str(pattern), str(repl), target, *args, **kwargs)
-
-
-def bytes_to_escaped_str(data):
- """
- Take bytes and return a safe string that can be displayed to the user.
-
- Single quotes are always escaped, double quotes are never escaped:
- "'" + bytes_to_escaped_str(...) + "'"
- gives a valid Python string.
- """
- # TODO: We may want to support multi-byte characters without escaping them.
- # One way to do would be calling .decode("utf8", "backslashreplace") first
- # and then escaping UTF8 control chars (see clean_bin).
-
- if not isinstance(data, bytes):
- raise ValueError("data must be bytes, but is {}".format(data.__class__.__name__))
- # We always insert a double-quote here so that we get a single-quoted string back
- # https://stackoverflow.com/questions/29019340/why-does-python-use-different-quotes-for-representing-strings-depending-on-their
- return repr(b'"' + data).lstrip("b")[2:-1]
-
-
-def escaped_str_to_bytes(data):
- """
- Take an escaped string and return the unescaped bytes equivalent.
- """
- if not isinstance(data, six.string_types):
- if six.PY2:
- raise ValueError("data must be str or unicode, but is {}".format(data.__class__.__name__))
- raise ValueError("data must be str, but is {}".format(data.__class__.__name__))
-
- if six.PY2:
- if isinstance(data, unicode):
- data = data.encode("utf8")
- return data.decode("string-escape")
-
- # This one is difficult - we use an undocumented Python API here
- # as per http://stackoverflow.com/a/23151714/934719
- return codecs.escape_decode(data)[0]
diff --git a/netlib/websockets/frame.py b/netlib/websockets/frame.py
index deb0ce33..42196ffb 100644
--- a/netlib/websockets/frame.py
+++ b/netlib/websockets/frame.py
@@ -7,6 +7,7 @@ import warnings
import six
from netlib import tcp
+from netlib import strutils
from netlib import utils
from netlib import human
from netlib.websockets import protocol
@@ -254,7 +255,7 @@ class Frame(object):
def __repr__(self):
ret = repr(self.header)
if self.payload:
- ret = ret + "\nPayload:\n" + utils.clean_bin(self.payload).decode("ascii")
+ ret = ret + "\nPayload:\n" + strutils.clean_bin(self.payload).decode("ascii")
return ret
def human_readable(self):
diff --git a/netlib/wsgi.py b/netlib/wsgi.py
index 7661388b..c66fddc2 100644
--- a/netlib/wsgi.py
+++ b/netlib/wsgi.py
@@ -6,7 +6,7 @@ import six
from io import BytesIO
from six.moves import urllib
-from netlib import http, tcp, utils
+from netlib import http, tcp, strutils
class ClientConn(object):
@@ -54,38 +54,38 @@ class WSGIAdaptor(object):
self.app, self.domain, self.port, self.sversion = app, domain, port, sversion
def make_environ(self, flow, errsoc, **extra):
- path = utils.native(flow.request.path, "latin-1")
+ path = strutils.native(flow.request.path, "latin-1")
if '?' in path:
- path_info, query = utils.native(path, "latin-1").split('?', 1)
+ path_info, query = strutils.native(path, "latin-1").split('?', 1)
else:
path_info = path
query = ''
environ = {
'wsgi.version': (1, 0),
- 'wsgi.url_scheme': utils.native(flow.request.scheme, "latin-1"),
+ 'wsgi.url_scheme': strutils.native(flow.request.scheme, "latin-1"),
'wsgi.input': BytesIO(flow.request.content or b""),
'wsgi.errors': errsoc,
'wsgi.multithread': True,
'wsgi.multiprocess': False,
'wsgi.run_once': False,
'SERVER_SOFTWARE': self.sversion,
- 'REQUEST_METHOD': utils.native(flow.request.method, "latin-1"),
+ 'REQUEST_METHOD': strutils.native(flow.request.method, "latin-1"),
'SCRIPT_NAME': '',
'PATH_INFO': urllib.parse.unquote(path_info),
'QUERY_STRING': query,
- 'CONTENT_TYPE': utils.native(flow.request.headers.get('Content-Type', ''), "latin-1"),
- 'CONTENT_LENGTH': utils.native(flow.request.headers.get('Content-Length', ''), "latin-1"),
+ 'CONTENT_TYPE': strutils.native(flow.request.headers.get('Content-Type', ''), "latin-1"),
+ 'CONTENT_LENGTH': strutils.native(flow.request.headers.get('Content-Length', ''), "latin-1"),
'SERVER_NAME': self.domain,
'SERVER_PORT': str(self.port),
- 'SERVER_PROTOCOL': utils.native(flow.request.http_version, "latin-1"),
+ 'SERVER_PROTOCOL': strutils.native(flow.request.http_version, "latin-1"),
}
environ.update(extra)
if flow.client_conn.address:
- environ["REMOTE_ADDR"] = utils.native(flow.client_conn.address.host, "latin-1")
+ environ["REMOTE_ADDR"] = strutils.native(flow.client_conn.address.host, "latin-1")
environ["REMOTE_PORT"] = flow.client_conn.address.port
for key, value in flow.request.headers.items():
- key = 'HTTP_' + utils.native(key, "latin-1").upper().replace('-', '_')
+ key = 'HTTP_' + strutils.native(key, "latin-1").upper().replace('-', '_')
if key not in ('HTTP_CONTENT_TYPE', 'HTTP_CONTENT_LENGTH'):
environ[key] = value
return environ
@@ -139,7 +139,7 @@ class WSGIAdaptor(object):
elif state["status"]:
raise AssertionError('Response already started')
state["status"] = status
- state["headers"] = http.Headers([[utils.always_bytes(k), utils.always_bytes(v)] for k, v in headers])
+ state["headers"] = http.Headers([[strutils.always_bytes(k), strutils.always_bytes(v)] for k, v in headers])
if exc_info:
self.error_page(soc, state["headers_sent"], traceback.format_tb(exc_info[2]))
state["headers_sent"] = True
diff --git a/pathod/app.py b/pathod/app.py
index 7e9860b9..e3216c58 100644
--- a/pathod/app.py
+++ b/pathod/app.py
@@ -3,8 +3,9 @@ import pprint
import io
import copy
from flask import Flask, jsonify, render_template, request, abort, make_response
-from . import version, language, utils
+from . import version, language
from netlib.http import user_agents
+from netlib import strutils
logging.basicConfig(level="DEBUG")
EXAMPLE_HOST = "example.com"
@@ -166,7 +167,7 @@ def make_app(noapi, debug):
settings.websocket_key = EXAMPLE_WEBSOCKET_KEY
language.serve(safe, s, settings)
- args["output"] = utils.escape_unprintables(s.getvalue())
+ args["output"] = strutils.bytes_to_escaped_str(s.getvalue())
return render(template, False, **args)
@app.route('/response_preview')
diff --git a/pathod/language/base.py b/pathod/language/base.py
index 1f4edb6f..11ee0623 100644
--- a/pathod/language/base.py
+++ b/pathod/language/base.py
@@ -5,7 +5,7 @@ import pyparsing as pp
import six
from six.moves import reduce
-from netlib.utils import escaped_str_to_bytes, bytes_to_escaped_str
+from netlib import strutils
from netlib import human
from . import generators, exceptions
@@ -110,7 +110,7 @@ class Token(object):
class _TokValueLiteral(Token):
def __init__(self, val):
- self.val = escaped_str_to_bytes(val)
+ self.val = strutils.escaped_str_to_bytes(val)
def get_generator(self, settings_):
return self.val
@@ -135,7 +135,7 @@ class TokValueLiteral(_TokValueLiteral):
return v
def spec(self):
- inner = bytes_to_escaped_str(self.val)
+ inner = strutils.bytes_to_escaped_str(self.val)
inner = inner.replace(r"\'", r"\x27")
return "'" + inner + "'"
@@ -148,7 +148,7 @@ class TokValueNakedLiteral(_TokValueLiteral):
return e.setParseAction(lambda x: cls(*x))
def spec(self):
- return bytes_to_escaped_str(self.val)
+ return strutils.bytes_to_escaped_str(self.val)
class TokValueGenerate(Token):
@@ -166,7 +166,7 @@ class TokValueGenerate(Token):
def freeze(self, settings):
g = self.get_generator(settings)
- return TokValueLiteral(bytes_to_escaped_str(g[:]))
+ return TokValueLiteral(strutils.bytes_to_escaped_str(g[:]))
@classmethod
def expr(cls):
@@ -226,7 +226,7 @@ class TokValueFile(Token):
return generators.FileGenerator(s)
def spec(self):
- return "<'%s'" % bytes_to_escaped_str(self.path)
+ return "<'%s'" % strutils.bytes_to_escaped_str(self.path)
TokValue = pp.MatchFirst(
@@ -578,4 +578,4 @@ class NestedMessage(Token):
def freeze(self, settings):
f = self.parsed.freeze(settings).spec()
- return self.__class__(TokValueLiteral(bytes_to_escaped_str(f)))
+ return self.__class__(TokValueLiteral(strutils.bytes_to_escaped_str(f)))
diff --git a/pathod/log.py b/pathod/log.py
index 3f6aaea0..5bf55de4 100644
--- a/pathod/log.py
+++ b/pathod/log.py
@@ -2,9 +2,7 @@ import datetime
import six
-import netlib.utils
-import netlib.tcp
-import netlib.http
+from netlib import strutils
TIMEFMT = '%d-%m-%y %H:%M:%S'
@@ -62,10 +60,10 @@ class LogCtx(object):
def dump(self, data, hexdump):
if hexdump:
- for line in netlib.utils.hexdump(data):
+ for line in strutils.hexdump(data):
self("\t%s %s %s" % line)
else:
- for i in netlib.utils.clean_bin(data).split("\n"):
+ for i in strutils.clean_bin(data).split("\n"):
self("\t%s" % i)
def __call__(self, line):
diff --git a/pathod/pathoc.py b/pathod/pathoc.py
index 910387b6..2b7d053c 100644
--- a/pathod/pathoc.py
+++ b/pathod/pathoc.py
@@ -18,14 +18,19 @@ from netlib.exceptions import HttpException, TcpDisconnect, TcpTimeout, TlsExcep
NetlibException
from netlib.http import http1, http2
-from . import utils, log, language
+from . import log, language
import logging
from netlib.tutils import treq
+from netlib import strutils
logging.getLogger("hpack").setLevel(logging.WARNING)
+def xrepr(s):
+ return repr(s)[1:-1]
+
+
class PathocError(Exception):
pass
@@ -423,7 +428,7 @@ class Pathoc(tcp.TCPClient):
finally:
if resp:
lg("<< %s %s: %s bytes" % (
- resp.status_code, utils.xrepr(resp.reason), len(resp.content)
+ resp.status_code, strutils.bytes_to_escaped_str(resp.reason), len(resp.content)
))
if resp.status_code in self.ignorecodes:
lg.suppress()
diff --git a/pathod/utils.py b/pathod/utils.py
index fe12f541..3276198a 100644
--- a/pathod/utils.py
+++ b/pathod/utils.py
@@ -2,8 +2,6 @@ import os
import sys
import netlib.utils
-from netlib.utils import bytes_to_escaped_str
-
class MemBool(object):
@@ -28,22 +26,6 @@ def parse_anchor_spec(s):
return tuple(s.split("=", 1))
-def xrepr(s):
- return repr(s)[1:-1]
-
-
-def escape_unprintables(s):
- """
- Like inner_repr, but preserves line breaks.
- """
- s = s.replace(b"\r\n", b"PATHOD_MARKER_RN")
- s = s.replace(b"\n", b"PATHOD_MARKER_N")
- s = bytes_to_escaped_str(s)
- s = s.replace("PATHOD_MARKER_RN", "\n")
- s = s.replace("PATHOD_MARKER_N", "\n")
- return s
-
-
data = netlib.utils.Data(__name__)
diff --git a/test/mitmproxy/test_utils.py b/test/mitmproxy/test_utils.py
index 2af7a332..c01b5f2a 100644
--- a/test/mitmproxy/test_utils.py
+++ b/test/mitmproxy/test_utils.py
@@ -13,25 +13,6 @@ def test_format_timestamp_with_milli():
assert utils.format_timestamp_with_milli(utils.timestamp())
-def test_isBin():
- assert not utils.isBin("testing\n\r")
- assert utils.isBin("testing\x01")
- assert utils.isBin("testing\x0e")
- assert utils.isBin("testing\x7f")
-
-
-def test_isXml():
- assert not utils.isXML("foo")
- assert utils.isXML("<foo")
- assert utils.isXML(" \n<foo")
-
-
-def test_clean_hanging_newline():
- s = "foo\n"
- assert utils.clean_hanging_newline(s) == "foo"
- assert utils.clean_hanging_newline("foo") == "foo"
-
-
def test_pkg_data():
assert utils.pkg_data.path("console")
tutils.raises("does not exist", utils.pkg_data.path, "nonexistent")
diff --git a/test/netlib/test_strutils.py b/test/netlib/test_strutils.py
new file mode 100644
index 00000000..734265c4
--- /dev/null
+++ b/test/netlib/test_strutils.py
@@ -0,0 +1,63 @@
+# coding=utf-8
+
+from netlib import strutils
+
+
+def test_clean_bin():
+ assert strutils.clean_bin(b"one") == b"one"
+ assert strutils.clean_bin(b"\00ne") == b".ne"
+ assert strutils.clean_bin(b"\nne") == b"\nne"
+ assert strutils.clean_bin(b"\nne", False) == b".ne"
+ assert strutils.clean_bin(u"\u2605".encode("utf8")) == b"..."
+
+ assert strutils.clean_bin(u"one") == u"one"
+ assert strutils.clean_bin(u"\00ne") == u".ne"
+ assert strutils.clean_bin(u"\nne") == u"\nne"
+ assert strutils.clean_bin(u"\nne", False) == u".ne"
+ assert strutils.clean_bin(u"\u2605") == u"\u2605"
+
+
+def test_safe_subn():
+ assert strutils.safe_subn("foo", u"bar", "\xc2foo")
+
+
+def test_bytes_to_escaped_str():
+ assert strutils.bytes_to_escaped_str(b"foo") == "foo"
+ assert strutils.bytes_to_escaped_str(b"\b") == r"\x08"
+ assert strutils.bytes_to_escaped_str(br"&!?=\)") == r"&!?=\\)"
+ assert strutils.bytes_to_escaped_str(b'\xc3\xbc') == r"\xc3\xbc"
+ assert strutils.bytes_to_escaped_str(b"'") == r"\'"
+ assert strutils.bytes_to_escaped_str(b'"') == r'"'
+
+
+def test_escaped_str_to_bytes():
+ assert strutils.escaped_str_to_bytes("foo") == b"foo"
+ assert strutils.escaped_str_to_bytes("\x08") == b"\b"
+ assert strutils.escaped_str_to_bytes("&!?=\\\\)") == br"&!?=\)"
+ assert strutils.escaped_str_to_bytes("ü") == b'\xc3\xbc'
+ assert strutils.escaped_str_to_bytes(u"\\x08") == b"\b"
+ assert strutils.escaped_str_to_bytes(u"&!?=\\\\)") == br"&!?=\)"
+ assert strutils.escaped_str_to_bytes(u"ü") == b'\xc3\xbc'
+
+
+def test_isBin():
+ assert not strutils.isBin("testing\n\r")
+ assert strutils.isBin("testing\x01")
+ assert strutils.isBin("testing\x0e")
+ assert strutils.isBin("testing\x7f")
+
+
+def test_isXml():
+ assert not strutils.isXML("foo")
+ assert strutils.isXML("<foo")
+ assert strutils.isXML(" \n<foo")
+
+
+def test_clean_hanging_newline():
+ s = "foo\n"
+ assert strutils.clean_hanging_newline(s) == "foo"
+ assert strutils.clean_hanging_newline("foo") == "foo"
+
+
+def test_hexdump():
+ assert list(strutils.hexdump(b"one\0" * 10))
diff --git a/test/netlib/test_utils.py b/test/netlib/test_utils.py
index e13029cb..eaa66f13 100644
--- a/test/netlib/test_utils.py
+++ b/test/netlib/test_utils.py
@@ -10,44 +10,3 @@ def test_bidi():
assert b.get_name(5) is None
tutils.raises(AttributeError, getattr, b, "c")
tutils.raises(ValueError, utils.BiDi, one=1, two=1)
-
-
-def test_hexdump():
- assert list(utils.hexdump(b"one\0" * 10))
-
-
-def test_clean_bin():
- assert utils.clean_bin(b"one") == b"one"
- assert utils.clean_bin(b"\00ne") == b".ne"
- assert utils.clean_bin(b"\nne") == b"\nne"
- assert utils.clean_bin(b"\nne", False) == b".ne"
- assert utils.clean_bin(u"\u2605".encode("utf8")) == b"..."
-
- assert utils.clean_bin(u"one") == u"one"
- assert utils.clean_bin(u"\00ne") == u".ne"
- assert utils.clean_bin(u"\nne") == u"\nne"
- assert utils.clean_bin(u"\nne", False) == u".ne"
- assert utils.clean_bin(u"\u2605") == u"\u2605"
-
-
-def test_safe_subn():
- assert utils.safe_subn("foo", u"bar", "\xc2foo")
-
-
-def test_bytes_to_escaped_str():
- assert utils.bytes_to_escaped_str(b"foo") == "foo"
- assert utils.bytes_to_escaped_str(b"\b") == r"\x08"
- assert utils.bytes_to_escaped_str(br"&!?=\)") == r"&!?=\\)"
- assert utils.bytes_to_escaped_str(b'\xc3\xbc') == r"\xc3\xbc"
- assert utils.bytes_to_escaped_str(b"'") == r"\'"
- assert utils.bytes_to_escaped_str(b'"') == r'"'
-
-
-def test_escaped_str_to_bytes():
- assert utils.escaped_str_to_bytes("foo") == b"foo"
- assert utils.escaped_str_to_bytes("\x08") == b"\b"
- assert utils.escaped_str_to_bytes("&!?=\\\\)") == br"&!?=\)"
- assert utils.escaped_str_to_bytes("ü") == b'\xc3\xbc'
- assert utils.escaped_str_to_bytes(u"\\x08") == b"\b"
- assert utils.escaped_str_to_bytes(u"&!?=\\\\)") == br"&!?=\)"
- assert utils.escaped_str_to_bytes(u"ü") == b'\xc3\xbc'
diff --git a/test/pathod/test_utils.py b/test/pathod/test_utils.py
index ab4abbae..a46a523a 100644
--- a/test/pathod/test_utils.py
+++ b/test/pathod/test_utils.py
@@ -1,8 +1,6 @@
from pathod import utils
import tutils
-import six
-
def test_membool():
m = utils.MemBool()
@@ -20,12 +18,3 @@ def test_parse_anchor_spec():
def test_data_path():
tutils.raises(ValueError, utils.data.path, "nonexistent")
-
-
-def test_escape_unprintables():
- s = bytes(range(256))
- if six.PY2:
- s = "".join([chr(i) for i in range(255)])
- e = utils.escape_unprintables(s)
- assert e.encode('ascii')
- assert "PATHOD_MARKER" not in e