From 301d52d9d05f2c5f074fe68c73acc1c32e518020 Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Thu, 20 Oct 2016 11:06:57 +1300 Subject: netlib.encoding -> netlib.http.encoding Encoding is highly specific to http, and only used within this module. --- netlib/encoding.py | 175 ------------------------------------------------ netlib/http/encoding.py | 175 ++++++++++++++++++++++++++++++++++++++++++++++++ netlib/http/message.py | 2 +- 3 files changed, 176 insertions(+), 176 deletions(-) delete mode 100644 netlib/encoding.py create mode 100644 netlib/http/encoding.py (limited to 'netlib') diff --git a/netlib/encoding.py b/netlib/encoding.py deleted file mode 100644 index e123a033..00000000 --- a/netlib/encoding.py +++ /dev/null @@ -1,175 +0,0 @@ -""" -Utility functions for decoding response bodies. -""" - -import codecs -import collections -from io import BytesIO - -import gzip -import zlib -import brotli - -from typing import Union - - -# We have a shared single-element cache for encoding and decoding. -# This is quite useful in practice, e.g. -# flow.request.content = flow.request.content.replace(b"foo", b"bar") -# does not require an .encode() call if content does not contain b"foo" -CachedDecode = collections.namedtuple("CachedDecode", "encoded encoding errors decoded") -_cache = CachedDecode(None, None, None, None) - - -def decode(encoded: Union[str, bytes], encoding: str, errors: str='strict') -> Union[str, bytes]: - """ - Decode the given input object - - Returns: - The decoded value - - Raises: - ValueError, if decoding fails. - """ - if len(encoded) == 0: - return encoded - - global _cache - cached = ( - isinstance(encoded, bytes) and - _cache.encoded == encoded and - _cache.encoding == encoding and - _cache.errors == errors - ) - if cached: - return _cache.decoded - try: - try: - decoded = custom_decode[encoding](encoded) - except KeyError: - decoded = codecs.decode(encoded, encoding, errors) - if encoding in ("gzip", "deflate", "br"): - _cache = CachedDecode(encoded, encoding, errors, decoded) - return decoded - except TypeError: - raise - except Exception as e: - raise ValueError("{} when decoding {} with {}: {}".format( - type(e).__name__, - repr(encoded)[:10], - repr(encoding), - repr(e), - )) - - -def encode(decoded: Union[str, bytes], encoding: str, errors: str='strict') -> Union[str, bytes]: - """ - Encode the given input object - - Returns: - The encoded value - - Raises: - ValueError, if encoding fails. - """ - if len(decoded) == 0: - return decoded - - global _cache - cached = ( - isinstance(decoded, bytes) and - _cache.decoded == decoded and - _cache.encoding == encoding and - _cache.errors == errors - ) - if cached: - return _cache.encoded - try: - try: - value = decoded - if isinstance(value, str): - value = decoded.encode() - encoded = custom_encode[encoding](value) - except KeyError: - encoded = codecs.encode(decoded, encoding, errors) - if encoding in ("gzip", "deflate", "br"): - _cache = CachedDecode(encoded, encoding, errors, decoded) - return encoded - except TypeError: - raise - except Exception as e: - raise ValueError("{} when encoding {} with {}: {}".format( - type(e).__name__, - repr(decoded)[:10], - repr(encoding), - repr(e), - )) - - -def identity(content): - """ - Returns content unchanged. Identity is the default value of - Accept-Encoding headers. - """ - return content - - -def decode_gzip(content): - gfile = gzip.GzipFile(fileobj=BytesIO(content)) - return gfile.read() - - -def encode_gzip(content): - s = BytesIO() - gf = gzip.GzipFile(fileobj=s, mode='wb') - gf.write(content) - gf.close() - return s.getvalue() - - -def decode_brotli(content): - return brotli.decompress(content) - - -def encode_brotli(content): - return brotli.compress(content) - - -def decode_deflate(content): - """ - Returns decompressed data for DEFLATE. Some servers may respond with - compressed data without a zlib header or checksum. An undocumented - feature of zlib permits the lenient decompression of data missing both - values. - - http://bugs.python.org/issue5784 - """ - try: - return zlib.decompress(content) - except zlib.error: - return zlib.decompress(content, -15) - - -def encode_deflate(content): - """ - Returns compressed content, always including zlib header and checksum. - """ - return zlib.compress(content) - - -custom_decode = { - "none": identity, - "identity": identity, - "gzip": decode_gzip, - "deflate": decode_deflate, - "br": decode_brotli, -} -custom_encode = { - "none": identity, - "identity": identity, - "gzip": encode_gzip, - "deflate": encode_deflate, - "br": encode_brotli, -} - -__all__ = ["encode", "decode"] diff --git a/netlib/http/encoding.py b/netlib/http/encoding.py new file mode 100644 index 00000000..e123a033 --- /dev/null +++ b/netlib/http/encoding.py @@ -0,0 +1,175 @@ +""" +Utility functions for decoding response bodies. +""" + +import codecs +import collections +from io import BytesIO + +import gzip +import zlib +import brotli + +from typing import Union + + +# We have a shared single-element cache for encoding and decoding. +# This is quite useful in practice, e.g. +# flow.request.content = flow.request.content.replace(b"foo", b"bar") +# does not require an .encode() call if content does not contain b"foo" +CachedDecode = collections.namedtuple("CachedDecode", "encoded encoding errors decoded") +_cache = CachedDecode(None, None, None, None) + + +def decode(encoded: Union[str, bytes], encoding: str, errors: str='strict') -> Union[str, bytes]: + """ + Decode the given input object + + Returns: + The decoded value + + Raises: + ValueError, if decoding fails. + """ + if len(encoded) == 0: + return encoded + + global _cache + cached = ( + isinstance(encoded, bytes) and + _cache.encoded == encoded and + _cache.encoding == encoding and + _cache.errors == errors + ) + if cached: + return _cache.decoded + try: + try: + decoded = custom_decode[encoding](encoded) + except KeyError: + decoded = codecs.decode(encoded, encoding, errors) + if encoding in ("gzip", "deflate", "br"): + _cache = CachedDecode(encoded, encoding, errors, decoded) + return decoded + except TypeError: + raise + except Exception as e: + raise ValueError("{} when decoding {} with {}: {}".format( + type(e).__name__, + repr(encoded)[:10], + repr(encoding), + repr(e), + )) + + +def encode(decoded: Union[str, bytes], encoding: str, errors: str='strict') -> Union[str, bytes]: + """ + Encode the given input object + + Returns: + The encoded value + + Raises: + ValueError, if encoding fails. + """ + if len(decoded) == 0: + return decoded + + global _cache + cached = ( + isinstance(decoded, bytes) and + _cache.decoded == decoded and + _cache.encoding == encoding and + _cache.errors == errors + ) + if cached: + return _cache.encoded + try: + try: + value = decoded + if isinstance(value, str): + value = decoded.encode() + encoded = custom_encode[encoding](value) + except KeyError: + encoded = codecs.encode(decoded, encoding, errors) + if encoding in ("gzip", "deflate", "br"): + _cache = CachedDecode(encoded, encoding, errors, decoded) + return encoded + except TypeError: + raise + except Exception as e: + raise ValueError("{} when encoding {} with {}: {}".format( + type(e).__name__, + repr(decoded)[:10], + repr(encoding), + repr(e), + )) + + +def identity(content): + """ + Returns content unchanged. Identity is the default value of + Accept-Encoding headers. + """ + return content + + +def decode_gzip(content): + gfile = gzip.GzipFile(fileobj=BytesIO(content)) + return gfile.read() + + +def encode_gzip(content): + s = BytesIO() + gf = gzip.GzipFile(fileobj=s, mode='wb') + gf.write(content) + gf.close() + return s.getvalue() + + +def decode_brotli(content): + return brotli.decompress(content) + + +def encode_brotli(content): + return brotli.compress(content) + + +def decode_deflate(content): + """ + Returns decompressed data for DEFLATE. Some servers may respond with + compressed data without a zlib header or checksum. An undocumented + feature of zlib permits the lenient decompression of data missing both + values. + + http://bugs.python.org/issue5784 + """ + try: + return zlib.decompress(content) + except zlib.error: + return zlib.decompress(content, -15) + + +def encode_deflate(content): + """ + Returns compressed content, always including zlib header and checksum. + """ + return zlib.compress(content) + + +custom_decode = { + "none": identity, + "identity": identity, + "gzip": decode_gzip, + "deflate": decode_deflate, + "br": decode_brotli, +} +custom_encode = { + "none": identity, + "identity": identity, + "gzip": encode_gzip, + "deflate": encode_deflate, + "br": encode_brotli, +} + +__all__ = ["encode", "decode"] diff --git a/netlib/http/message.py b/netlib/http/message.py index 62c3aa38..772a124e 100644 --- a/netlib/http/message.py +++ b/netlib/http/message.py @@ -3,7 +3,7 @@ import warnings from typing import Optional from mitmproxy.utils import strutils -from netlib import encoding +from netlib.http import encoding from mitmproxy.types import serializable from netlib.http import headers -- cgit v1.2.3