diff options
| -rw-r--r-- | mitmproxy/contrib/py2/__init__.py | 0 | ||||
| -rw-r--r-- | mitmproxy/contrib/py2/tnetstring.py | 375 | ||||
| -rw-r--r-- | mitmproxy/contrib/py3/__init__.py | 0 | ||||
| -rw-r--r-- | mitmproxy/contrib/py3/tnetstring.py | 237 | ||||
| -rw-r--r-- | mitmproxy/contrib/py3/tnetstring_tests.py | 133 | ||||
| -rw-r--r-- | mitmproxy/contrib/tnetstring.py | 256 | 
6 files changed, 251 insertions, 750 deletions
| diff --git a/mitmproxy/contrib/py2/__init__.py b/mitmproxy/contrib/py2/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/mitmproxy/contrib/py2/__init__.py +++ /dev/null diff --git a/mitmproxy/contrib/py2/tnetstring.py b/mitmproxy/contrib/py2/tnetstring.py deleted file mode 100644 index 9bf20b09..00000000 --- a/mitmproxy/contrib/py2/tnetstring.py +++ /dev/null @@ -1,375 +0,0 @@ -# imported from the tnetstring project: https://github.com/rfk/tnetstring -# -# Copyright (c) 2011 Ryan Kelly -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -""" -tnetstring:  data serialization using typed netstrings -====================================================== - - -This is a data serialization library. It's a lot like JSON but it uses a -new syntax called "typed netstrings" that Zed has proposed for use in the -Mongrel2 webserver.  It's designed to be simpler and easier to implement -than JSON, with a happy consequence of also being faster in many cases. - -An ordinary netstring is a blob of data prefixed with its length and postfixed -with a sanity-checking comma.  The string "hello world" encodes like this:: - -    11:hello world, - -Typed netstrings add other datatypes by replacing the comma with a type tag. -Here's the integer 12345 encoded as a tnetstring:: - -    5:12345# - -And here's the list [12345,True,0] which mixes integers and bools:: - -    19:5:12345#4:true!1:0#] - -Simple enough?  This module gives you the following functions: - -    :dump:    dump an object as a tnetstring to a file -    :dumps:   dump an object as a tnetstring to a string -    :load:    load a tnetstring-encoded object from a file -    :loads:   load a tnetstring-encoded object from a string -    :pop:     pop a tnetstring-encoded object from the front of a string - -Note that since parsing a tnetstring requires reading all the data into memory -at once, there's no efficiency gain from using the file-based versions of these -functions.  They're only here so you can use load() to read precisely one -item from a file or socket without consuming any extra data. - -By default tnetstrings work only with byte strings, not unicode.  If you want -unicode strings then pass an optional encoding to the various functions, -like so:: - -    >>> print(repr(tnetstring.loads("2:\\xce\\xb1,"))) -    '\\xce\\xb1' -    >>> -    >>> print(repr(tnetstring.loads("2:\\xce\\xb1,","utf8"))) -    u'\u03b1' - -""" -from collections import deque - -import six - -__ver_major__ = 0 -__ver_minor__ = 2 -__ver_patch__ = 0 -__ver_sub__ = "" -__version__ = "%d.%d.%d%s" % ( -    __ver_major__, __ver_minor__, __ver_patch__, __ver_sub__) - - -def dumps(value): -    """ -    This function dumps a python object as a tnetstring. -    """ -    #  This uses a deque to collect output fragments in reverse order, -    #  then joins them together at the end.  It's measurably faster -    #  than creating all the intermediate strings. -    #  If you're reading this to get a handle on the tnetstring format, -    #  consider the _gdumps() function instead; it's a standard top-down -    #  generator that's simpler to understand but much less efficient. -    q = deque() -    _rdumpq(q, 0, value) -    return b''.join(q) - - -def dump(value, file_handle): -    """ -    This function dumps a python object as a tnetstring and -    writes it to the given file. -    """ -    file_handle.write(dumps(value)) - - -def _rdumpq(q, size, value): -    """ -    Dump value as a tnetstring, to a deque instance, last chunks first. - -    This function generates the tnetstring representation of the given value, -    pushing chunks of the output onto the given deque instance.  It pushes -    the last chunk first, then recursively generates more chunks. - -    When passed in the current size of the string in the queue, it will return -    the new size of the string in the queue. - -    Operating last-chunk-first makes it easy to calculate the size written -    for recursive structures without having to build their representation as -    a string.  This is measurably faster than generating the intermediate -    strings, especially on deeply nested structures. -    """ -    write = q.appendleft -    if value is None: -        write(b'0:~') -        return size + 3 -    elif value is True: -        write(b'4:true!') -        return size + 7 -    elif value is False: -        write(b'5:false!') -        return size + 8 -    elif isinstance(value, six.integer_types): -        data = str(value).encode() -        ldata = len(data) -        span = str(ldata).encode() -        write(b'#') -        write(data) -        write(b':') -        write(span) -        return size + 2 + len(span) + ldata -    elif isinstance(value, float): -        #  Use repr() for float rather than str(). -        #  It round-trips more accurately. -        #  Probably unnecessary in later python versions that -        #  use David Gay's ftoa routines. -        data = repr(value).encode() -        ldata = len(data) -        span = str(ldata).encode() -        write(b'^') -        write(data) -        write(b':') -        write(span) -        return size + 2 + len(span) + ldata -    elif isinstance(value, bytes): -        lvalue = len(value) -        span = str(lvalue).encode() -        write(b',') -        write(value) -        write(b':') -        write(span) -        return size + 2 + len(span) + lvalue -    elif isinstance(value, (list, tuple)): -        write(b']') -        init_size = size = size + 1 -        for item in reversed(value): -            size = _rdumpq(q, size, item) -        span = str(size - init_size).encode() -        write(b':') -        write(span) -        return size + 1 + len(span) -    elif isinstance(value, dict): -        write(b'}') -        init_size = size = size + 1 -        for (k, v) in value.items(): -            size = _rdumpq(q, size, v) -            size = _rdumpq(q, size, k) -        span = str(size - init_size).encode() -        write(b':') -        write(span) -        return size + 1 + len(span) -    else: -        raise ValueError("unserializable object: {} ({})".format(value, type(value))) - - -def _gdumps(value): -    """ -    Generate fragments of value dumped as a tnetstring. - -    This is the naive dumping algorithm, implemented as a generator so that -    it's easy to pass to "".join() without building a new list. - -    This is mainly here for comparison purposes; the _rdumpq version is -    measurably faster as it doesn't have to build intermediate strins. -    """ -    if value is None: -        yield b'0:~' -    elif value is True: -        yield b'4:true!' -    elif value is False: -        yield b'5:false!' -    elif isinstance(value, six.integer_types): -        data = str(value).encode() -        yield str(len(data)).encode() -        yield b':' -        yield data -        yield b'#' -    elif isinstance(value, float): -        data = repr(value).encode() -        yield str(len(data)).encode() -        yield b':' -        yield data -        yield b'^' -    elif isinstance(value, bytes): -        yield str(len(value)).encode() -        yield b':' -        yield value -        yield b',' -    elif isinstance(value, (list, tuple)): -        sub = [] -        for item in value: -            sub.extend(_gdumps(item)) -        sub = b''.join(sub) -        yield str(len(sub)).encode() -        yield b':' -        yield sub -        yield b']' -    elif isinstance(value, (dict,)): -        sub = [] -        for (k, v) in value.items(): -            sub.extend(_gdumps(k)) -            sub.extend(_gdumps(v)) -        sub = b''.join(sub) -        yield str(len(sub)).encode() -        yield b':' -        yield sub -        yield b'}' -    else: -        raise ValueError("unserializable object") - - -def loads(string): -    """ -    This function parses a tnetstring into a python object. -    """ -    #  No point duplicating effort here.  In the C-extension version, -    #  loads() is measurably faster then pop() since it can avoid -    #  the overhead of building a second string. -    return pop(string)[0] - - -def load(file_handle): -    """load(file) -> object - -    This function reads a tnetstring from a file and parses it into a -    python object.  The file must support the read() method, and this -    function promises not to read more data than necessary. -    """ -    #  Read the length prefix one char at a time. -    #  Note that the netstring spec explicitly forbids padding zeros. -    c = file_handle.read(1) -    if not c.isdigit(): -        raise ValueError("not a tnetstring: missing or invalid length prefix") -    datalen = ord(c) - ord('0') -    c = file_handle.read(1) -    if datalen != 0: -        while c.isdigit(): -            datalen = (10 * datalen) + (ord(c) - ord('0')) -            if datalen > 999999999: -                errmsg = "not a tnetstring: absurdly large length prefix" -                raise ValueError(errmsg) -            c = file_handle.read(1) -    if c != b':': -        raise ValueError("not a tnetstring: missing or invalid length prefix") -    #  Now we can read and parse the payload. -    #  This repeats the dispatch logic of pop() so we can avoid -    #  re-constructing the outermost tnetstring. -    data = file_handle.read(datalen) -    if len(data) != datalen: -        raise ValueError("not a tnetstring: length prefix too big") -    tns_type = file_handle.read(1) -    if tns_type == b',': -        return data -    if tns_type == b'#': -        try: -            return int(data) -        except ValueError: -            raise ValueError("not a tnetstring: invalid integer literal") -    if tns_type == b'^': -        try: -            return float(data) -        except ValueError: -            raise ValueError("not a tnetstring: invalid float literal") -    if tns_type == b'!': -        if data == b'true': -            return True -        elif data == b'false': -            return False -        else: -            raise ValueError("not a tnetstring: invalid boolean literal") -    if tns_type == b'~': -        if data: -            raise ValueError("not a tnetstring: invalid null literal") -        return None -    if tns_type == b']': -        l = [] -        while data: -            item, data = pop(data) -            l.append(item) -        return l -    if tns_type == b'}': -        d = {} -        while data: -            key, data = pop(data) -            val, data = pop(data) -            d[key] = val -        return d -    raise ValueError("unknown type tag") - - -def pop(string): -    """pop(string,encoding='utf_8') -> (object, remain) - -    This function parses a tnetstring into a python object. -    It returns a tuple giving the parsed object and a string -    containing any unparsed data from the end of the string. -    """ -    #  Parse out data length, type and remaining string. -    try: -        dlen, rest = string.split(b':', 1) -        dlen = int(dlen) -    except ValueError: -        raise ValueError("not a tnetstring: missing or invalid length prefix: {}".format(string)) -    try: -        data, tns_type, remain = rest[:dlen], rest[dlen:dlen + 1], rest[dlen + 1:] -    except IndexError: -        #  This fires if len(rest) < dlen, meaning we don't need -        #  to further validate that data is the right length. -        raise ValueError("not a tnetstring: invalid length prefix: {}".format(dlen)) -    #  Parse the data based on the type tag. -    if tns_type == b',': -        return data, remain -    if tns_type == b'#': -        try: -            return int(data), remain -        except ValueError: -            raise ValueError("not a tnetstring: invalid integer literal: {}".format(data)) -    if tns_type == b'^': -        try: -            return float(data), remain -        except ValueError: -            raise ValueError("not a tnetstring: invalid float literal: {}".format(data)) -    if tns_type == b'!': -        if data == b'true': -            return True, remain -        elif data == b'false': -            return False, remain -        else: -            raise ValueError("not a tnetstring: invalid boolean literal: {}".format(data)) -    if tns_type == b'~': -        if data: -            raise ValueError("not a tnetstring: invalid null literal") -        return None, remain -    if tns_type == b']': -        l = [] -        while data: -            item, data = pop(data) -            l.append(item) -        return (l, remain) -    if tns_type == b'}': -        d = {} -        while data: -            key, data = pop(data) -            val, data = pop(data) -            d[key] = val -        return d, remain -    raise ValueError("unknown type tag: {}".format(tns_type)) diff --git a/mitmproxy/contrib/py3/__init__.py b/mitmproxy/contrib/py3/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/mitmproxy/contrib/py3/__init__.py +++ /dev/null diff --git a/mitmproxy/contrib/py3/tnetstring.py b/mitmproxy/contrib/py3/tnetstring.py deleted file mode 100644 index 6998fc82..00000000 --- a/mitmproxy/contrib/py3/tnetstring.py +++ /dev/null @@ -1,237 +0,0 @@ -""" -tnetstring:  data serialization using typed netstrings -====================================================== - -This is a custom Python 3 implementation of tnetstrings. -Compared to other implementations, the main difference -is the conversion of dictionary keys to str. - -An ordinary tnetstring is a blob of data prefixed with its length and postfixed -with its type. Here are some examples: - -    >>> tnetstring.dumps("hello world") -    11:hello world, -    >>> tnetstring.dumps(12345) -    5:12345# -    >>> tnetstring.dumps([12345, True, 0]) -    19:5:12345#4:true!1:0#] - -This module gives you the following functions: - -    :dump:    dump an object as a tnetstring to a file -    :dumps:   dump an object as a tnetstring to a string -    :load:    load a tnetstring-encoded object from a file -    :loads:   load a tnetstring-encoded object from a string - -Note that since parsing a tnetstring requires reading all the data into memory -at once, there's no efficiency gain from using the file-based versions of these -functions.  They're only here so you can use load() to read precisely one -item from a file or socket without consuming any extra data. - -The tnetstrings specification explicitly states that strings are binary blobs -and forbids the use of unicode at the protocol level. -**This implementation decodes dictionary keys as surrogate-escaped ASCII**, -all other strings are returned as plain bytes. - -:Copyright: (c) 2012-2013 by Ryan Kelly <ryan@rfk.id.au>. -:Copyright: (c) 2014 by Carlo Pires <carlopires@gmail.com>. -:Copyright: (c) 2016 by Maximilian Hils <tnetstring3@maximilianhils.com>. - -:License: MIT -""" - -import collections -from typing import io, Union, Tuple - -TSerializable = Union[None, bool, int, float, bytes, list, tuple, dict] - - -def dumps(value: TSerializable) -> bytes: -    """ -    This function dumps a python object as a tnetstring. -    """ -    #  This uses a deque to collect output fragments in reverse order, -    #  then joins them together at the end.  It's measurably faster -    #  than creating all the intermediate strings. -    q = collections.deque() -    _rdumpq(q, 0, value) -    return b''.join(q) - - -def dump(value: TSerializable, file_handle: io.BinaryIO) -> None: -    """ -    This function dumps a python object as a tnetstring and -    writes it to the given file. -    """ -    file_handle.write(dumps(value)) - - -def _rdumpq(q: collections.deque, size: int, value: TSerializable) -> int: -    """ -    Dump value as a tnetstring, to a deque instance, last chunks first. - -    This function generates the tnetstring representation of the given value, -    pushing chunks of the output onto the given deque instance.  It pushes -    the last chunk first, then recursively generates more chunks. - -    When passed in the current size of the string in the queue, it will return -    the new size of the string in the queue. - -    Operating last-chunk-first makes it easy to calculate the size written -    for recursive structures without having to build their representation as -    a string.  This is measurably faster than generating the intermediate -    strings, especially on deeply nested structures. -    """ -    write = q.appendleft -    if value is None: -        write(b'0:~') -        return size + 3 -    elif value is True: -        write(b'4:true!') -        return size + 7 -    elif value is False: -        write(b'5:false!') -        return size + 8 -    elif isinstance(value, int): -        data = str(value).encode() -        ldata = len(data) -        span = str(ldata).encode() -        write(b'%s:%s#' % (span, data)) -        return size + 2 + len(span) + ldata -    elif isinstance(value, float): -        #  Use repr() for float rather than str(). -        #  It round-trips more accurately. -        #  Probably unnecessary in later python versions that -        #  use David Gay's ftoa routines. -        data = repr(value).encode() -        ldata = len(data) -        span = str(ldata).encode() -        write(b'%s:%s^' % (span, data)) -        return size + 2 + len(span) + ldata -    elif isinstance(value, bytes): -        lvalue = len(value) -        span = str(lvalue).encode() -        write(b'%s:%s,' % (span, value)) -        return size + 2 + len(span) + lvalue -    elif isinstance(value, (list, tuple)): -        write(b']') -        init_size = size = size + 1 -        for item in reversed(value): -            size = _rdumpq(q, size, item) -        span = str(size - init_size).encode() -        write(b':') -        write(span) -        return size + 1 + len(span) -    elif isinstance(value, dict): -        write(b'}') -        init_size = size = size + 1 -        for (k, v) in value.items(): -            if isinstance(k, str): -                k = k.encode("ascii", "strict") -            size = _rdumpq(q, size, v) -            size = _rdumpq(q, size, k) -        span = str(size - init_size).encode() -        write(b':') -        write(span) -        return size + 1 + len(span) -    else: -        raise ValueError("unserializable object: {} ({})".format(value, type(value))) - - -def loads(string: bytes) -> TSerializable: -    """ -    This function parses a tnetstring into a python object. -    """ -    return pop(string)[0] - - -def load(file_handle: io.BinaryIO) -> TSerializable: -    """load(file) -> object - -    This function reads a tnetstring from a file and parses it into a -    python object.  The file must support the read() method, and this -    function promises not to read more data than necessary. -    """ -    #  Read the length prefix one char at a time. -    #  Note that the netstring spec explicitly forbids padding zeros. -    c = file_handle.read(1) -    data_length = b"" -    while c.isdigit(): -        data_length += c -        if len(data_length) > 9: -            raise ValueError("not a tnetstring: absurdly large length prefix") -        c = file_handle.read(1) -    if c != b":": -        raise ValueError("not a tnetstring: missing or invalid length prefix") - -    data = file_handle.read(int(data_length)) -    data_type = file_handle.read(1)[0] - -    return parse(data_type, data) - - -def parse(data_type: int, data: bytes) -> TSerializable: -    if data_type == ord(b','): -        return data -    if data_type == ord(b'#'): -        try: -            return int(data) -        except ValueError: -            raise ValueError("not a tnetstring: invalid integer literal: {}".format(data)) -    if data_type == ord(b'^'): -        try: -            return float(data) -        except ValueError: -            raise ValueError("not a tnetstring: invalid float literal: {}".format(data)) -    if data_type == ord(b'!'): -        if data == b'true': -            return True -        elif data == b'false': -            return False -        else: -            raise ValueError("not a tnetstring: invalid boolean literal: {}".format(data)) -    if data_type == ord(b'~'): -        if data: -            raise ValueError("not a tnetstring: invalid null literal") -        return None -    if data_type == ord(b']'): -        l = [] -        while data: -            item, data = pop(data) -            l.append(item) -        return l -    if data_type == ord(b'}'): -        d = {} -        while data: -            key, data = pop(data) -            if isinstance(key, bytes): -                key = key.decode("ascii", "strict") -            val, data = pop(data) -            d[key] = val -        return d -    raise ValueError("unknown type tag: {}".format(data_type)) - - -def pop(data: bytes) -> Tuple[TSerializable, bytes]: -    """ -    This function parses a tnetstring into a python object. -    It returns a tuple giving the parsed object and a string -    containing any unparsed data from the end of the string. -    """ -    #  Parse out data length, type and remaining string. -    try: -        length, data = data.split(b':', 1) -        length = int(length) -    except ValueError: -        raise ValueError("not a tnetstring: missing or invalid length prefix: {}".format(data)) -    try: -        data, data_type, remain = data[:length], data[length], data[length + 1:] -    except IndexError: -        #  This fires if len(data) < dlen, meaning we don't need -        #  to further validate that data is the right length. -        raise ValueError("not a tnetstring: invalid length prefix: {}".format(length)) -    # Parse the data based on the type tag. -    return parse(data_type, data), remain - - -__all__ = ["dump", "dumps", "load", "loads", "pop"] diff --git a/mitmproxy/contrib/py3/tnetstring_tests.py b/mitmproxy/contrib/py3/tnetstring_tests.py deleted file mode 100644 index 4ee184d5..00000000 --- a/mitmproxy/contrib/py3/tnetstring_tests.py +++ /dev/null @@ -1,133 +0,0 @@ -import unittest -import random -import math -import io -from . import tnetstring -import struct - -MAXINT = 2 ** (struct.Struct('i').size * 8 - 1) - 1 - -FORMAT_EXAMPLES = { -    b'0:}': {}, -    b'0:]': [], -    b'51:5:hello,39:11:12345678901#4:this,4:true!0:~4:\x00\x00\x00\x00,]}': -            {'hello': [12345678901, b'this', True, None, b'\x00\x00\x00\x00']}, -    b'5:12345#': 12345, -    b'12:this is cool,': b'this is cool', -    b'0:,': b'', -    b'0:~': None, -    b'4:true!': True, -    b'5:false!': False, -    b'10:\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00,': b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', -    b'24:5:12345#5:67890#5:xxxxx,]': [12345, 67890, b'xxxxx'], -    b'18:3:0.1^3:0.2^3:0.3^]': [0.1, 0.2, 0.3], -    b'243:238:233:228:223:218:213:208:203:198:193:188:183:178:173:168:163:158:153:148:143:138:133:128:123:118:113:108:103:99:95:91:87:83:79:75:71:67:63:59:55:51:47:43:39:35:31:27:23:19:15:11:hello-there,]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]': [[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[[b'hello-there']]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]]] -} - -def get_random_object(random=random, depth=0): -    """Generate a random serializable object.""" -    #  The probability of generating a scalar value increases as the depth increase. -    #  This ensures that we bottom out eventually. -    if random.randint(depth,10) <= 4: -        what = random.randint(0,1) -        if what == 0: -            n = random.randint(0,10) -            l = [] -            for _ in range(n): -                l.append(get_random_object(random,depth+1)) -            return l -        if what == 1: -            n = random.randint(0,10) -            d = {} -            for _ in range(n): -                n = random.randint(0,100) -                k = str([random.randint(32,126) for _ in range(n)]) -                d[k] = get_random_object(random,depth+1) -            return d -    else: -        what = random.randint(0,4) -        if what == 0: -            return None -        if what == 1: -            return True -        if what == 2: -            return False -        if what == 3: -            if random.randint(0,1) == 0: -                return random.randint(0,MAXINT) -            else: -                return -1 * random.randint(0,MAXINT) -        n = random.randint(0,100) -        return bytes([random.randint(32,126) for _ in range(n)]) - -class Test_Format(unittest.TestCase): -    def test_roundtrip_format_examples(self): -        for data, expect in FORMAT_EXAMPLES.items(): -            self.assertEqual(expect,tnetstring.loads(data)) -            self.assertEqual(expect,tnetstring.loads(tnetstring.dumps(expect))) -            self.assertEqual((expect,b''),tnetstring.pop(data)) - -    def test_roundtrip_format_random(self): -        for _ in range(500): -            v = get_random_object() -            self.assertEqual(v,tnetstring.loads(tnetstring.dumps(v))) -            self.assertEqual((v,b""),tnetstring.pop(tnetstring.dumps(v))) - -    def test_unicode_handling(self): -        with self.assertRaises(ValueError): -            tnetstring.dumps("hello") -        self.assertEqual(tnetstring.dumps("hello".encode()),b"5:hello,") -        self.assertEqual(type(tnetstring.loads(b"5:hello,")),bytes) - -    def test_roundtrip_format_unicode(self): -        for _ in range(500): -            v = get_random_object() -            self.assertEqual(v,tnetstring.loads(tnetstring.dumps(v))) -            self.assertEqual((v,b''),tnetstring.pop(tnetstring.dumps(v))) - -    def test_roundtrip_big_integer(self): -        i1 = math.factorial(30000) -        s = tnetstring.dumps(i1) -        i2 = tnetstring.loads(s) -        self.assertEqual(i1, i2) - -class Test_FileLoading(unittest.TestCase): -    def test_roundtrip_file_examples(self): -        for data, expect in FORMAT_EXAMPLES.items(): -            s = io.BytesIO() -            s.write(data) -            s.write(b'OK') -            s.seek(0) -            self.assertEqual(expect,tnetstring.load(s)) -            self.assertEqual(b'OK',s.read()) -            s = io.BytesIO() -            tnetstring.dump(expect,s) -            s.write(b'OK') -            s.seek(0) -            self.assertEqual(expect,tnetstring.load(s)) -            self.assertEqual(b'OK',s.read()) - -    def test_roundtrip_file_random(self): -        for _ in range(500): -            v = get_random_object() -            s = io.BytesIO() -            tnetstring.dump(v,s) -            s.write(b'OK') -            s.seek(0) -            self.assertEqual(v,tnetstring.load(s)) -            self.assertEqual(b'OK',s.read()) - -    def test_error_on_absurd_lengths(self): -        s = io.BytesIO() -        s.write(b'1000000000:pwned!,') -        s.seek(0) -        with self.assertRaises(ValueError): -            tnetstring.load(s) -        self.assertEqual(s.read(1),b':') - -def suite(): -    loader = unittest.TestLoader() -    suite = unittest.TestSuite() -    suite.addTest(loader.loadTestsFromTestCase(Test_Format)) -    suite.addTest(loader.loadTestsFromTestCase(Test_FileLoading)) -    return suite
\ No newline at end of file diff --git a/mitmproxy/contrib/tnetstring.py b/mitmproxy/contrib/tnetstring.py index 1ebaba21..5fc26b45 100644 --- a/mitmproxy/contrib/tnetstring.py +++ b/mitmproxy/contrib/tnetstring.py @@ -1,8 +1,254 @@ +""" +tnetstring:  data serialization using typed netstrings +====================================================== + +This is a custom Python 3 implementation of tnetstrings. +Compared to other implementations, the main difference +is that this implementation supports a custom unicode datatype. + +An ordinary tnetstring is a blob of data prefixed with its length and postfixed +with its type. Here are some examples: + +    >>> tnetstring.dumps("hello world") +    11:hello world, +    >>> tnetstring.dumps(12345) +    5:12345# +    >>> tnetstring.dumps([12345, True, 0]) +    19:5:12345#4:true!1:0#] + +This module gives you the following functions: + +    :dump:    dump an object as a tnetstring to a file +    :dumps:   dump an object as a tnetstring to a string +    :load:    load a tnetstring-encoded object from a file +    :loads:   load a tnetstring-encoded object from a string + +Note that since parsing a tnetstring requires reading all the data into memory +at once, there's no efficiency gain from using the file-based versions of these +functions.  They're only here so you can use load() to read precisely one +item from a file or socket without consuming any extra data. + +The tnetstrings specification explicitly states that strings are binary blobs +and forbids the use of unicode at the protocol level. +**This implementation decodes dictionary keys as surrogate-escaped ASCII**, +all other strings are returned as plain bytes. + +:Copyright: (c) 2012-2013 by Ryan Kelly <ryan@rfk.id.au>. +:Copyright: (c) 2014 by Carlo Pires <carlopires@gmail.com>. +:Copyright: (c) 2016 by Maximilian Hils <tnetstring3@maximilianhils.com>. + +:License: MIT +""" + +import collections  import six +from typing import io, Union, Tuple  # noqa + +TSerializable = Union[None, bool, int, float, bytes, list, tuple, dict] + + +def dumps(value): +    # type: (TSerializable) -> bytes +    """ +    This function dumps a python object as a tnetstring. +    """ +    #  This uses a deque to collect output fragments in reverse order, +    #  then joins them together at the end.  It's measurably faster +    #  than creating all the intermediate strings. +    q = collections.deque() +    _rdumpq(q, 0, value) +    return b''.join(q) + + +def dump(value, file_handle): +    # type: (TSerializable, io.BinaryIO) -> None +    """ +    This function dumps a python object as a tnetstring and +    writes it to the given file. +    """ +    file_handle.write(dumps(value)) + + +def _rdumpq(q, size, value): +    # type: (collections.deque, int, TSerializable) -> int +    """ +    Dump value as a tnetstring, to a deque instance, last chunks first. + +    This function generates the tnetstring representation of the given value, +    pushing chunks of the output onto the given deque instance.  It pushes +    the last chunk first, then recursively generates more chunks. + +    When passed in the current size of the string in the queue, it will return +    the new size of the string in the queue. + +    Operating last-chunk-first makes it easy to calculate the size written +    for recursive structures without having to build their representation as +    a string.  This is measurably faster than generating the intermediate +    strings, especially on deeply nested structures. +    """ +    write = q.appendleft +    if value is None: +        write(b'0:~') +        return size + 3 +    elif value is True: +        write(b'4:true!') +        return size + 7 +    elif value is False: +        write(b'5:false!') +        return size + 8 +    elif isinstance(value, int): +        data = str(value).encode() +        ldata = len(data) +        span = str(ldata).encode() +        write(b'%s:%s#' % (span, data)) +        return size + 2 + len(span) + ldata +    elif isinstance(value, float): +        #  Use repr() for float rather than str(). +        #  It round-trips more accurately. +        #  Probably unnecessary in later python versions that +        #  use David Gay's ftoa routines. +        data = repr(value).encode() +        ldata = len(data) +        span = str(ldata).encode() +        write(b'%s:%s^' % (span, data)) +        return size + 2 + len(span) + ldata +    elif isinstance(value, bytes): +        data = value +        ldata = len(data) +        span = str(ldata).encode() +        write(b'%s:%s,' % (span, data)) +        return size + 2 + len(span) + ldata +    elif isinstance(value, six.text_type): +        data = value.encode() +        ldata = len(data) +        span = str(ldata).encode() +        write(b'%s:%s;' % (span, data)) +        return size + 2 + len(span) + ldata +    elif isinstance(value, (list, tuple)): +        write(b']') +        init_size = size = size + 1 +        for item in reversed(value): +            size = _rdumpq(q, size, item) +        span = str(size - init_size).encode() +        write(b':') +        write(span) +        return size + 1 + len(span) +    elif isinstance(value, dict): +        write(b'}') +        init_size = size = size + 1 +        for (k, v) in value.items(): +            if isinstance(k, str): +                k = k.encode("ascii", "strict") +            size = _rdumpq(q, size, v) +            size = _rdumpq(q, size, k) +        span = str(size - init_size).encode() +        write(b':') +        write(span) +        return size + 1 + len(span) +    else: +        raise ValueError("unserializable object: {} ({})".format(value, type(value))) + + +def loads(string): +    # type: (bytes) -> TSerializable +    """ +    This function parses a tnetstring into a python object. +    """ +    return pop(string)[0] + + +def load(file_handle): +    # type: (io.BinaryIO) -> TSerializable +    """load(file) -> object + +    This function reads a tnetstring from a file and parses it into a +    python object.  The file must support the read() method, and this +    function promises not to read more data than necessary. +    """ +    #  Read the length prefix one char at a time. +    #  Note that the netstring spec explicitly forbids padding zeros. +    c = file_handle.read(1) +    data_length = b"" +    while c.isdigit(): +        data_length += c +        if len(data_length) > 9: +            raise ValueError("not a tnetstring: absurdly large length prefix") +        c = file_handle.read(1) +    if c != b":": +        raise ValueError("not a tnetstring: missing or invalid length prefix") + +    data = file_handle.read(int(data_length)) +    data_type = file_handle.read(1)[0] + +    return parse(data_type, data) + + +def parse(data_type, data): +    # type: (int, bytes) -> TSerializable +    if data_type == ord(b','): +        return data +    if data_type == ord(b';'): +        return data.decode() +    if data_type == ord(b'#'): +        try: +            return int(data) +        except ValueError: +            raise ValueError("not a tnetstring: invalid integer literal: {}".format(data)) +    if data_type == ord(b'^'): +        try: +            return float(data) +        except ValueError: +            raise ValueError("not a tnetstring: invalid float literal: {}".format(data)) +    if data_type == ord(b'!'): +        if data == b'true': +            return True +        elif data == b'false': +            return False +        else: +            raise ValueError("not a tnetstring: invalid boolean literal: {}".format(data)) +    if data_type == ord(b'~'): +        if data: +            raise ValueError("not a tnetstring: invalid null literal") +        return None +    if data_type == ord(b']'): +        l = [] +        while data: +            item, data = pop(data) +            l.append(item) +        return l +    if data_type == ord(b'}'): +        d = {} +        while data: +            key, data = pop(data) +            if isinstance(key, bytes): +                key = key.decode("ascii", "strict") +            val, data = pop(data) +            d[key] = val +        return d +    raise ValueError("unknown type tag: {}".format(data_type)) + + +def pop(data): +    # type: (bytes) -> Tuple[TSerializable, bytes] +    """ +    This function parses a tnetstring into a python object. +    It returns a tuple giving the parsed object and a string +    containing any unparsed data from the end of the string. +    """ +    #  Parse out data length, type and remaining string. +    try: +        length, data = data.split(b':', 1) +        length = int(length) +    except ValueError: +        raise ValueError("not a tnetstring: missing or invalid length prefix: {}".format(data)) +    try: +        data, data_type, remain = data[:length], data[length], data[length + 1:] +    except IndexError: +        #  This fires if len(data) < dlen, meaning we don't need +        #  to further validate that data is the right length. +        raise ValueError("not a tnetstring: invalid length prefix: {}".format(length)) +    # Parse the data based on the type tag. +    return parse(data_type, data), remain -if six.PY2: -    from .py2.tnetstring import load, loads, dump, dumps, pop -else: -    from .py3.tnetstring import load, loads, dump, dumps, pop -__all__ = ["load", "loads", "dump", "dumps", "pop"] +__all__ = ["dump", "dumps", "load", "loads", "pop"] | 
