aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy/contrib
diff options
context:
space:
mode:
authorAldo Cortesi <aldo@corte.si>2017-03-20 12:50:09 +1300
committerAldo Cortesi <aldo@corte.si>2017-03-20 12:50:09 +1300
commit4ca78604af2a8ddb596e2f4e95090dabc8495bfe (patch)
treefff817d49cd5f4d8a3989f64be94b13cac17fd67 /mitmproxy/contrib
parent3a8da31835db37d65637058935f144ece62c1bdd (diff)
downloadmitmproxy-4ca78604af2a8ddb596e2f4e95090dabc8495bfe.tar.gz
mitmproxy-4ca78604af2a8ddb596e2f4e95090dabc8495bfe.tar.bz2
mitmproxy-4ca78604af2a8ddb596e2f4e95090dabc8495bfe.zip
Factor out an io module
Include tnetstring - we've made enough changes that this no longer belongs in contrib.
Diffstat (limited to 'mitmproxy/contrib')
-rw-r--r--mitmproxy/contrib/tnetstring.py250
1 files changed, 0 insertions, 250 deletions
diff --git a/mitmproxy/contrib/tnetstring.py b/mitmproxy/contrib/tnetstring.py
deleted file mode 100644
index 24ce6ce8..00000000
--- a/mitmproxy/contrib/tnetstring.py
+++ /dev/null
@@ -1,250 +0,0 @@
-"""
-tnetstring: data serialization using typed netstrings
-======================================================
-
-This is a custom Python 3 implementation of tnetstrings.
-Compared to other implementations, the main difference
-is that this implementation supports a custom unicode datatype.
-
-An ordinary tnetstring is a blob of data prefixed with its length and postfixed
-with its type. Here are some examples:
-
- >>> tnetstring.dumps("hello world")
- 11:hello world,
- >>> tnetstring.dumps(12345)
- 5:12345#
- >>> tnetstring.dumps([12345, True, 0])
- 19:5:12345#4:true!1:0#]
-
-This module gives you the following functions:
-
- :dump: dump an object as a tnetstring to a file
- :dumps: dump an object as a tnetstring to a string
- :load: load a tnetstring-encoded object from a file
- :loads: load a tnetstring-encoded object from a string
-
-Note that since parsing a tnetstring requires reading all the data into memory
-at once, there's no efficiency gain from using the file-based versions of these
-functions. They're only here so you can use load() to read precisely one
-item from a file or socket without consuming any extra data.
-
-The tnetstrings specification explicitly states that strings are binary blobs
-and forbids the use of unicode at the protocol level.
-**This implementation decodes dictionary keys as surrogate-escaped ASCII**,
-all other strings are returned as plain bytes.
-
-:Copyright: (c) 2012-2013 by Ryan Kelly <ryan@rfk.id.au>.
-:Copyright: (c) 2014 by Carlo Pires <carlopires@gmail.com>.
-:Copyright: (c) 2016 by Maximilian Hils <tnetstring3@maximilianhils.com>.
-
-:License: MIT
-"""
-
-import collections
-from typing import io, Union, Tuple
-
-TSerializable = Union[None, bool, int, float, bytes, list, tuple, dict]
-
-
-def dumps(value: TSerializable) -> bytes:
- """
- This function dumps a python object as a tnetstring.
- """
- # This uses a deque to collect output fragments in reverse order,
- # then joins them together at the end. It's measurably faster
- # than creating all the intermediate strings.
- q = collections.deque()
- _rdumpq(q, 0, value)
- return b''.join(q)
-
-
-def dump(value: TSerializable, file_handle: io.BinaryIO) -> None:
- """
- This function dumps a python object as a tnetstring and
- writes it to the given file.
- """
- file_handle.write(dumps(value))
-
-
-def _rdumpq(q: collections.deque, size: int, value: TSerializable) -> int:
- """
- Dump value as a tnetstring, to a deque instance, last chunks first.
-
- This function generates the tnetstring representation of the given value,
- pushing chunks of the output onto the given deque instance. It pushes
- the last chunk first, then recursively generates more chunks.
-
- When passed in the current size of the string in the queue, it will return
- the new size of the string in the queue.
-
- Operating last-chunk-first makes it easy to calculate the size written
- for recursive structures without having to build their representation as
- a string. This is measurably faster than generating the intermediate
- strings, especially on deeply nested structures.
- """
- write = q.appendleft
- if value is None:
- write(b'0:~')
- return size + 3
- elif value is True:
- write(b'4:true!')
- return size + 7
- elif value is False:
- write(b'5:false!')
- return size + 8
- elif isinstance(value, int):
- data = str(value).encode()
- ldata = len(data)
- span = str(ldata).encode()
- write(b'%s:%s#' % (span, data))
- return size + 2 + len(span) + ldata
- elif isinstance(value, float):
- # Use repr() for float rather than str().
- # It round-trips more accurately.
- # Probably unnecessary in later python versions that
- # use David Gay's ftoa routines.
- data = repr(value).encode()
- ldata = len(data)
- span = str(ldata).encode()
- write(b'%s:%s^' % (span, data))
- return size + 2 + len(span) + ldata
- elif isinstance(value, bytes):
- data = value
- ldata = len(data)
- span = str(ldata).encode()
- write(b',')
- write(data)
- write(b':')
- write(span)
- return size + 2 + len(span) + ldata
- elif isinstance(value, str):
- data = value.encode("utf8")
- ldata = len(data)
- span = str(ldata).encode()
- write(b';')
- write(data)
- write(b':')
- write(span)
- return size + 2 + len(span) + ldata
- elif isinstance(value, (list, tuple)):
- write(b']')
- init_size = size = size + 1
- for item in reversed(value):
- size = _rdumpq(q, size, item)
- span = str(size - init_size).encode()
- write(b':')
- write(span)
- return size + 1 + len(span)
- elif isinstance(value, dict):
- write(b'}')
- init_size = size = size + 1
- for (k, v) in value.items():
- size = _rdumpq(q, size, v)
- size = _rdumpq(q, size, k)
- span = str(size - init_size).encode()
- write(b':')
- write(span)
- return size + 1 + len(span)
- else:
- raise ValueError("unserializable object: {} ({})".format(value, type(value)))
-
-
-def loads(string: bytes) -> TSerializable:
- """
- This function parses a tnetstring into a python object.
- """
- return pop(string)[0]
-
-
-def load(file_handle: io.BinaryIO) -> TSerializable:
- """load(file) -> object
-
- This function reads a tnetstring from a file and parses it into a
- python object. The file must support the read() method, and this
- function promises not to read more data than necessary.
- """
- # Read the length prefix one char at a time.
- # Note that the netstring spec explicitly forbids padding zeros.
- c = file_handle.read(1)
- if c == b"": # we want to detect this special case.
- raise ValueError("not a tnetstring: empty file")
- data_length = b""
- while c.isdigit():
- data_length += c
- if len(data_length) > 9:
- raise ValueError("not a tnetstring: absurdly large length prefix")
- c = file_handle.read(1)
- if c != b":":
- raise ValueError("not a tnetstring: missing or invalid length prefix")
-
- data = file_handle.read(int(data_length))
- data_type = file_handle.read(1)[0]
-
- return parse(data_type, data)
-
-
-def parse(data_type: int, data: bytes) -> TSerializable:
- if data_type == ord(b','):
- return data
- if data_type == ord(b';'):
- return data.decode("utf8")
- if data_type == ord(b'#'):
- try:
- return int(data)
- except ValueError:
- raise ValueError("not a tnetstring: invalid integer literal: {}".format(data))
- if data_type == ord(b'^'):
- try:
- return float(data)
- except ValueError:
- raise ValueError("not a tnetstring: invalid float literal: {}".format(data))
- if data_type == ord(b'!'):
- if data == b'true':
- return True
- elif data == b'false':
- return False
- else:
- raise ValueError("not a tnetstring: invalid boolean literal: {}".format(data))
- if data_type == ord(b'~'):
- if data:
- raise ValueError("not a tnetstring: invalid null literal")
- return None
- if data_type == ord(b']'):
- l = []
- while data:
- item, data = pop(data)
- l.append(item)
- return l
- if data_type == ord(b'}'):
- d = {}
- while data:
- key, data = pop(data)
- val, data = pop(data)
- d[key] = val
- return d
- raise ValueError("unknown type tag: {}".format(data_type))
-
-
-def pop(data: bytes) -> Tuple[TSerializable, bytes]:
- """
- This function parses a tnetstring into a python object.
- It returns a tuple giving the parsed object and a string
- containing any unparsed data from the end of the string.
- """
- # Parse out data length, type and remaining string.
- try:
- length, data = data.split(b':', 1)
- length = int(length)
- except ValueError:
- raise ValueError("not a tnetstring: missing or invalid length prefix: {}".format(data))
- try:
- data, data_type, remain = data[:length], data[length], data[length + 1:]
- except IndexError:
- # This fires if len(data) < dlen, meaning we don't need
- # to further validate that data is the right length.
- raise ValueError("not a tnetstring: invalid length prefix: {}".format(length))
- # Parse the data based on the type tag.
- return parse(data_type, data), remain
-
-
-__all__ = ["dump", "dumps", "load", "loads", "pop"]