diff options
Diffstat (limited to 'netlib/http')
-rw-r--r-- | netlib/http/__init__.py | 15 | ||||
-rw-r--r-- | netlib/http/authentication.py | 176 | ||||
-rw-r--r-- | netlib/http/cookies.py | 384 | ||||
-rw-r--r-- | netlib/http/encoding.py | 175 | ||||
-rw-r--r-- | netlib/http/headers.py | 221 | ||||
-rw-r--r-- | netlib/http/http1/__init__.py | 24 | ||||
-rw-r--r-- | netlib/http/http1/assemble.py | 100 | ||||
-rw-r--r-- | netlib/http/http1/read.py | 377 | ||||
-rw-r--r-- | netlib/http/http2/__init__.py | 8 | ||||
-rw-r--r-- | netlib/http/http2/framereader.py | 25 | ||||
-rw-r--r-- | netlib/http/http2/utils.py | 37 | ||||
-rw-r--r-- | netlib/http/message.py | 300 | ||||
-rw-r--r-- | netlib/http/multipart.py | 32 | ||||
-rw-r--r-- | netlib/http/request.py | 405 | ||||
-rw-r--r-- | netlib/http/response.py | 192 | ||||
-rw-r--r-- | netlib/http/status_codes.py | 104 | ||||
-rw-r--r-- | netlib/http/url.py | 127 | ||||
-rw-r--r-- | netlib/http/user_agents.py | 50 |
18 files changed, 0 insertions, 2752 deletions
diff --git a/netlib/http/__init__.py b/netlib/http/__init__.py deleted file mode 100644 index 315f61ac..00000000 --- a/netlib/http/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -from netlib.http.request import Request -from netlib.http.response import Response -from netlib.http.message import Message -from netlib.http.headers import Headers, parse_content_type -from netlib.http.message import decoded -from netlib.http import http1, http2, status_codes, multipart - -__all__ = [ - "Request", - "Response", - "Message", - "Headers", "parse_content_type", - "decoded", - "http1", "http2", "status_codes", "multipart", -] diff --git a/netlib/http/authentication.py b/netlib/http/authentication.py deleted file mode 100644 index a65279e4..00000000 --- a/netlib/http/authentication.py +++ /dev/null @@ -1,176 +0,0 @@ -import argparse -import binascii - - -def parse_http_basic_auth(s): - words = s.split() - if len(words) != 2: - return None - scheme = words[0] - try: - user = binascii.a2b_base64(words[1]).decode("utf8", "replace") - except binascii.Error: - return None - parts = user.split(':') - if len(parts) != 2: - return None - return scheme, parts[0], parts[1] - - -def assemble_http_basic_auth(scheme, username, password): - v = binascii.b2a_base64((username + ":" + password).encode("utf8")).decode("ascii") - return scheme + " " + v - - -class NullProxyAuth: - - """ - No proxy auth at all (returns empty challange headers) - """ - - def __init__(self, password_manager): - self.password_manager = password_manager - - def clean(self, headers_): - """ - Clean up authentication headers, so they're not passed upstream. - """ - - def authenticate(self, headers_): - """ - Tests that the user is allowed to use the proxy - """ - return True - - def auth_challenge_headers(self): - """ - Returns a dictionary containing the headers require to challenge the user - """ - return {} - - -class BasicAuth(NullProxyAuth): - CHALLENGE_HEADER = None - AUTH_HEADER = None - - def __init__(self, password_manager, realm): - NullProxyAuth.__init__(self, password_manager) - self.realm = realm - - def clean(self, headers): - del headers[self.AUTH_HEADER] - - def authenticate(self, headers): - auth_value = headers.get(self.AUTH_HEADER) - if not auth_value: - return False - parts = parse_http_basic_auth(auth_value) - if not parts: - return False - scheme, username, password = parts - if scheme.lower() != 'basic': - return False - if not self.password_manager.test(username, password): - return False - self.username = username - return True - - def auth_challenge_headers(self): - return {self.CHALLENGE_HEADER: 'Basic realm="%s"' % self.realm} - - -class BasicWebsiteAuth(BasicAuth): - CHALLENGE_HEADER = 'WWW-Authenticate' - AUTH_HEADER = 'Authorization' - - -class BasicProxyAuth(BasicAuth): - CHALLENGE_HEADER = 'Proxy-Authenticate' - AUTH_HEADER = 'Proxy-Authorization' - - -class PassMan: - - def test(self, username_, password_token_): - return False - - -class PassManNonAnon(PassMan): - - """ - Ensure the user specifies a username, accept any password. - """ - - def test(self, username, password_token_): - if username: - return True - return False - - -class PassManHtpasswd(PassMan): - - """ - Read usernames and passwords from an htpasswd file - """ - - def __init__(self, path): - """ - Raises ValueError if htpasswd file is invalid. - """ - import passlib.apache - self.htpasswd = passlib.apache.HtpasswdFile(path) - - def test(self, username, password_token): - return bool(self.htpasswd.check_password(username, password_token)) - - -class PassManSingleUser(PassMan): - - def __init__(self, username, password): - self.username, self.password = username, password - - def test(self, username, password_token): - return self.username == username and self.password == password_token - - -class AuthAction(argparse.Action): - - """ - Helper class to allow seamless integration int argparse. Example usage: - parser.add_argument( - "--nonanonymous", - action=NonanonymousAuthAction, nargs=0, - help="Allow access to any user long as a credentials are specified." - ) - """ - - def __call__(self, parser, namespace, values, option_string=None): - passman = self.getPasswordManager(values) - authenticator = BasicProxyAuth(passman, "mitmproxy") - setattr(namespace, self.dest, authenticator) - - def getPasswordManager(self, s): # pragma: no cover - raise NotImplementedError() - - -class SingleuserAuthAction(AuthAction): - - def getPasswordManager(self, s): - if len(s.split(':')) != 2: - raise argparse.ArgumentTypeError( - "Invalid single-user specification. Please use the format username:password" - ) - username, password = s.split(':') - return PassManSingleUser(username, password) - - -class NonanonymousAuthAction(AuthAction): - - def getPasswordManager(self, s): - return PassManNonAnon() - - -class HtpasswdAuthAction(AuthAction): - - def getPasswordManager(self, s): - return PassManHtpasswd(s) diff --git a/netlib/http/cookies.py b/netlib/http/cookies.py deleted file mode 100644 index 9f32fa5e..00000000 --- a/netlib/http/cookies.py +++ /dev/null @@ -1,384 +0,0 @@ -import collections -import email.utils -import re -import time - -from mitmproxy.types import multidict - -""" -A flexible module for cookie parsing and manipulation. - -This module differs from usual standards-compliant cookie modules in a number -of ways. We try to be as permissive as possible, and to retain even mal-formed -information. Duplicate cookies are preserved in parsing, and can be set in -formatting. We do attempt to escape and quote values where needed, but will not -reject data that violate the specs. - -Parsing accepts the formats in RFC6265 and partially RFC2109 and RFC2965. We -also parse the comma-separated variant of Set-Cookie that allows multiple -cookies to be set in a single header. Serialization follows RFC6265. - - http://tools.ietf.org/html/rfc6265 - http://tools.ietf.org/html/rfc2109 - http://tools.ietf.org/html/rfc2965 -""" - -_cookie_params = set(( - 'expires', 'path', 'comment', 'max-age', - 'secure', 'httponly', 'version', -)) - -ESCAPE = re.compile(r"([\"\\])") - - -class CookieAttrs(multidict.ImmutableMultiDict): - @staticmethod - def _kconv(key): - return key.lower() - - @staticmethod - def _reduce_values(values): - # See the StickyCookieTest for a weird cookie that only makes sense - # if we take the last part. - return values[-1] - -SetCookie = collections.namedtuple("SetCookie", ["value", "attrs"]) - - -def _read_until(s, start, term): - """ - Read until one of the characters in term is reached. - """ - if start == len(s): - return "", start + 1 - for i in range(start, len(s)): - if s[i] in term: - return s[start:i], i - return s[start:i + 1], i + 1 - - -def _read_quoted_string(s, start): - """ - start: offset to the first quote of the string to be read - - A sort of loose super-set of the various quoted string specifications. - - RFC6265 disallows backslashes or double quotes within quoted strings. - Prior RFCs use backslashes to escape. This leaves us free to apply - backslash escaping by default and be compatible with everything. - """ - escaping = False - ret = [] - # Skip the first quote - i = start # initialize in case the loop doesn't run. - for i in range(start + 1, len(s)): - if escaping: - ret.append(s[i]) - escaping = False - elif s[i] == '"': - break - elif s[i] == "\\": - escaping = True - else: - ret.append(s[i]) - return "".join(ret), i + 1 - - -def _read_key(s, start, delims=";="): - """ - Read a key - the LHS of a token/value pair in a cookie. - """ - return _read_until(s, start, delims) - - -def _read_value(s, start, delims): - """ - Reads a value - the RHS of a token/value pair in a cookie. - """ - if start >= len(s): - return "", start - elif s[start] == '"': - return _read_quoted_string(s, start) - else: - return _read_until(s, start, delims) - - -def _read_cookie_pairs(s, off=0): - """ - Read pairs of lhs=rhs values from Cookie headers. - - off: start offset - """ - pairs = [] - - while True: - lhs, off = _read_key(s, off) - lhs = lhs.lstrip() - - if lhs: - rhs = None - if off < len(s) and s[off] == "=": - rhs, off = _read_value(s, off + 1, ";") - - pairs.append([lhs, rhs]) - - off += 1 - - if not off < len(s): - break - - return pairs, off - - -def _read_set_cookie_pairs(s, off=0): - """ - Read pairs of lhs=rhs values from SetCookie headers while handling multiple cookies. - - off: start offset - specials: attributes that are treated specially - """ - cookies = [] - pairs = [] - - while True: - lhs, off = _read_key(s, off, ";=,") - lhs = lhs.lstrip() - - if lhs: - rhs = None - if off < len(s) and s[off] == "=": - rhs, off = _read_value(s, off + 1, ";,") - - # Special handliing of attributes - if lhs.lower() == "expires": - # 'expires' values can contain commas in them so they need to - # be handled separately. - - # We actually bank on the fact that the expires value WILL - # contain a comma. Things will fail, if they don't. - - # '3' is just a heuristic we use to determine whether we've - # only read a part of the expires value and we should read more. - if len(rhs) <= 3: - trail, off = _read_value(s, off + 1, ";,") - rhs = rhs + "," + trail - - pairs.append([lhs, rhs]) - - # comma marks the beginning of a new cookie - if off < len(s) and s[off] == ",": - cookies.append(pairs) - pairs = [] - - off += 1 - - if not off < len(s): - break - - if pairs or not cookies: - cookies.append(pairs) - - return cookies, off - - -def _has_special(s): - for i in s: - if i in '",;\\': - return True - o = ord(i) - if o < 0x21 or o > 0x7e: - return True - return False - - -def _format_pairs(pairs, specials=(), sep="; "): - """ - specials: A lower-cased list of keys that will not be quoted. - """ - vals = [] - for k, v in pairs: - if v is None: - vals.append(k) - else: - if k.lower() not in specials and _has_special(v): - v = ESCAPE.sub(r"\\\1", v) - v = '"%s"' % v - vals.append("%s=%s" % (k, v)) - return sep.join(vals) - - -def _format_set_cookie_pairs(lst): - return _format_pairs( - lst, - specials=("expires", "path") - ) - - -def parse_cookie_header(line): - """ - Parse a Cookie header value. - Returns a list of (lhs, rhs) tuples. - """ - pairs, off_ = _read_cookie_pairs(line) - return pairs - - -def parse_cookie_headers(cookie_headers): - cookie_list = [] - for header in cookie_headers: - cookie_list.extend(parse_cookie_header(header)) - return cookie_list - - -def format_cookie_header(lst): - """ - Formats a Cookie header value. - """ - return _format_pairs(lst) - - -def parse_set_cookie_header(line): - """ - Parse a Set-Cookie header value - - Returns a list of (name, value, attrs) tuples, where attrs is a - CookieAttrs dict of attributes. No attempt is made to parse attribute - values - they are treated purely as strings. - """ - cookie_pairs, off = _read_set_cookie_pairs(line) - cookies = [ - (pairs[0][0], pairs[0][1], CookieAttrs(tuple(x) for x in pairs[1:])) - for pairs in cookie_pairs if pairs - ] - return cookies - - -def parse_set_cookie_headers(headers): - rv = [] - for header in headers: - cookies = parse_set_cookie_header(header) - if cookies: - for name, value, attrs in cookies: - rv.append((name, SetCookie(value, attrs))) - return rv - - -def format_set_cookie_header(set_cookies): - """ - Formats a Set-Cookie header value. - """ - - rv = [] - - for set_cookie in set_cookies: - name, value, attrs = set_cookie - - pairs = [(name, value)] - pairs.extend( - attrs.fields if hasattr(attrs, "fields") else attrs - ) - - rv.append(_format_set_cookie_pairs(pairs)) - - return ", ".join(rv) - - -def refresh_set_cookie_header(c, delta): - """ - Args: - c: A Set-Cookie string - delta: Time delta in seconds - Returns: - A refreshed Set-Cookie string - """ - - name, value, attrs = parse_set_cookie_header(c)[0] - if not name or not value: - raise ValueError("Invalid Cookie") - - if "expires" in attrs: - e = email.utils.parsedate_tz(attrs["expires"]) - if e: - f = email.utils.mktime_tz(e) + delta - attrs = attrs.with_set_all("expires", [email.utils.formatdate(f)]) - else: - # This can happen when the expires tag is invalid. - # reddit.com sends a an expires tag like this: "Thu, 31 Dec - # 2037 23:59:59 GMT", which is valid RFC 1123, but not - # strictly correct according to the cookie spec. Browsers - # appear to parse this tolerantly - maybe we should too. - # For now, we just ignore this. - attrs = attrs.with_delitem("expires") - - rv = format_set_cookie_header([(name, value, attrs)]) - if not rv: - raise ValueError("Invalid Cookie") - return rv - - -def get_expiration_ts(cookie_attrs): - """ - Determines the time when the cookie will be expired. - - Considering both 'expires' and 'max-age' parameters. - - Returns: timestamp of when the cookie will expire. - None, if no expiration time is set. - """ - if 'expires' in cookie_attrs: - e = email.utils.parsedate_tz(cookie_attrs["expires"]) - if e: - return email.utils.mktime_tz(e) - - elif 'max-age' in cookie_attrs: - try: - max_age = int(cookie_attrs['Max-Age']) - except ValueError: - pass - else: - now_ts = time.time() - return now_ts + max_age - - return None - - -def is_expired(cookie_attrs): - """ - Determines whether a cookie has expired. - - Returns: boolean - """ - - exp_ts = get_expiration_ts(cookie_attrs) - now_ts = time.time() - - # If no expiration information was provided with the cookie - if exp_ts is None: - return False - else: - return exp_ts <= now_ts - - -def group_cookies(pairs): - """ - Converts a list of pairs to a (name, value, attrs) for each cookie. - """ - - if not pairs: - return [] - - cookie_list = [] - - # First pair is always a new cookie - name, value = pairs[0] - attrs = [] - - for k, v in pairs[1:]: - if k.lower() in _cookie_params: - attrs.append((k, v)) - else: - cookie_list.append((name, value, CookieAttrs(attrs))) - name, value, attrs = k, v, [] - - cookie_list.append((name, value, CookieAttrs(attrs))) - return cookie_list diff --git a/netlib/http/encoding.py b/netlib/http/encoding.py deleted file mode 100644 index e123a033..00000000 --- a/netlib/http/encoding.py +++ /dev/null @@ -1,175 +0,0 @@ -""" -Utility functions for decoding response bodies. -""" - -import codecs -import collections -from io import BytesIO - -import gzip -import zlib -import brotli - -from typing import Union - - -# We have a shared single-element cache for encoding and decoding. -# This is quite useful in practice, e.g. -# flow.request.content = flow.request.content.replace(b"foo", b"bar") -# does not require an .encode() call if content does not contain b"foo" -CachedDecode = collections.namedtuple("CachedDecode", "encoded encoding errors decoded") -_cache = CachedDecode(None, None, None, None) - - -def decode(encoded: Union[str, bytes], encoding: str, errors: str='strict') -> Union[str, bytes]: - """ - Decode the given input object - - Returns: - The decoded value - - Raises: - ValueError, if decoding fails. - """ - if len(encoded) == 0: - return encoded - - global _cache - cached = ( - isinstance(encoded, bytes) and - _cache.encoded == encoded and - _cache.encoding == encoding and - _cache.errors == errors - ) - if cached: - return _cache.decoded - try: - try: - decoded = custom_decode[encoding](encoded) - except KeyError: - decoded = codecs.decode(encoded, encoding, errors) - if encoding in ("gzip", "deflate", "br"): - _cache = CachedDecode(encoded, encoding, errors, decoded) - return decoded - except TypeError: - raise - except Exception as e: - raise ValueError("{} when decoding {} with {}: {}".format( - type(e).__name__, - repr(encoded)[:10], - repr(encoding), - repr(e), - )) - - -def encode(decoded: Union[str, bytes], encoding: str, errors: str='strict') -> Union[str, bytes]: - """ - Encode the given input object - - Returns: - The encoded value - - Raises: - ValueError, if encoding fails. - """ - if len(decoded) == 0: - return decoded - - global _cache - cached = ( - isinstance(decoded, bytes) and - _cache.decoded == decoded and - _cache.encoding == encoding and - _cache.errors == errors - ) - if cached: - return _cache.encoded - try: - try: - value = decoded - if isinstance(value, str): - value = decoded.encode() - encoded = custom_encode[encoding](value) - except KeyError: - encoded = codecs.encode(decoded, encoding, errors) - if encoding in ("gzip", "deflate", "br"): - _cache = CachedDecode(encoded, encoding, errors, decoded) - return encoded - except TypeError: - raise - except Exception as e: - raise ValueError("{} when encoding {} with {}: {}".format( - type(e).__name__, - repr(decoded)[:10], - repr(encoding), - repr(e), - )) - - -def identity(content): - """ - Returns content unchanged. Identity is the default value of - Accept-Encoding headers. - """ - return content - - -def decode_gzip(content): - gfile = gzip.GzipFile(fileobj=BytesIO(content)) - return gfile.read() - - -def encode_gzip(content): - s = BytesIO() - gf = gzip.GzipFile(fileobj=s, mode='wb') - gf.write(content) - gf.close() - return s.getvalue() - - -def decode_brotli(content): - return brotli.decompress(content) - - -def encode_brotli(content): - return brotli.compress(content) - - -def decode_deflate(content): - """ - Returns decompressed data for DEFLATE. Some servers may respond with - compressed data without a zlib header or checksum. An undocumented - feature of zlib permits the lenient decompression of data missing both - values. - - http://bugs.python.org/issue5784 - """ - try: - return zlib.decompress(content) - except zlib.error: - return zlib.decompress(content, -15) - - -def encode_deflate(content): - """ - Returns compressed content, always including zlib header and checksum. - """ - return zlib.compress(content) - - -custom_decode = { - "none": identity, - "identity": identity, - "gzip": decode_gzip, - "deflate": decode_deflate, - "br": decode_brotli, -} -custom_encode = { - "none": identity, - "identity": identity, - "gzip": encode_gzip, - "deflate": encode_deflate, - "br": encode_brotli, -} - -__all__ = ["encode", "decode"] diff --git a/netlib/http/headers.py b/netlib/http/headers.py deleted file mode 100644 index 8fc0cd43..00000000 --- a/netlib/http/headers.py +++ /dev/null @@ -1,221 +0,0 @@ -import re - -import collections -from mitmproxy.types import multidict -from mitmproxy.utils import strutils - -# See also: http://lucumr.pocoo.org/2013/7/2/the-updated-guide-to-unicode/ - - -# While headers _should_ be ASCII, it's not uncommon for certain headers to be utf-8 encoded. -def _native(x): - return x.decode("utf-8", "surrogateescape") - - -def _always_bytes(x): - return strutils.always_bytes(x, "utf-8", "surrogateescape") - - -class Headers(multidict.MultiDict): - """ - Header class which allows both convenient access to individual headers as well as - direct access to the underlying raw data. Provides a full dictionary interface. - - Example: - - .. code-block:: python - - # Create headers with keyword arguments - >>> h = Headers(host="example.com", content_type="application/xml") - - # Headers mostly behave like a normal dict. - >>> h["Host"] - "example.com" - - # HTTP Headers are case insensitive - >>> h["host"] - "example.com" - - # Headers can also be created from a list of raw (header_name, header_value) byte tuples - >>> h = Headers([ - (b"Host",b"example.com"), - (b"Accept",b"text/html"), - (b"accept",b"application/xml") - ]) - - # Multiple headers are folded into a single header as per RFC7230 - >>> h["Accept"] - "text/html, application/xml" - - # Setting a header removes all existing headers with the same name. - >>> h["Accept"] = "application/text" - >>> h["Accept"] - "application/text" - - # bytes(h) returns a HTTP1 header block. - >>> print(bytes(h)) - Host: example.com - Accept: application/text - - # For full control, the raw header fields can be accessed - >>> h.fields - - Caveats: - For use with the "Set-Cookie" header, see :py:meth:`get_all`. - """ - - def __init__(self, fields=(), **headers): - """ - Args: - fields: (optional) list of ``(name, value)`` header byte tuples, - e.g. ``[(b"Host", b"example.com")]``. All names and values must be bytes. - **headers: Additional headers to set. Will overwrite existing values from `fields`. - For convenience, underscores in header names will be transformed to dashes - - this behaviour does not extend to other methods. - If ``**headers`` contains multiple keys that have equal ``.lower()`` s, - the behavior is undefined. - """ - super().__init__(fields) - - for key, value in self.fields: - if not isinstance(key, bytes) or not isinstance(value, bytes): - raise TypeError("Header fields must be bytes.") - - # content_type -> content-type - headers = { - _always_bytes(name).replace(b"_", b"-"): _always_bytes(value) - for name, value in headers.items() - } - self.update(headers) - - @staticmethod - def _reduce_values(values): - # Headers can be folded - return ", ".join(values) - - @staticmethod - def _kconv(key): - # Headers are case-insensitive - return key.lower() - - def __bytes__(self): - if self.fields: - return b"\r\n".join(b": ".join(field) for field in self.fields) + b"\r\n" - else: - return b"" - - def __delitem__(self, key): - key = _always_bytes(key) - super().__delitem__(key) - - def __iter__(self): - for x in super().__iter__(): - yield _native(x) - - def get_all(self, name): - """ - Like :py:meth:`get`, but does not fold multiple headers into a single one. - This is useful for Set-Cookie headers, which do not support folding. - See also: https://tools.ietf.org/html/rfc7230#section-3.2.2 - """ - name = _always_bytes(name) - return [ - _native(x) for x in - super().get_all(name) - ] - - def set_all(self, name, values): - """ - Explicitly set multiple headers for the given key. - See: :py:meth:`get_all` - """ - name = _always_bytes(name) - values = [_always_bytes(x) for x in values] - return super().set_all(name, values) - - def insert(self, index, key, value): - key = _always_bytes(key) - value = _always_bytes(value) - super().insert(index, key, value) - - def items(self, multi=False): - if multi: - return ( - (_native(k), _native(v)) - for k, v in self.fields - ) - else: - return super().items() - - def replace(self, pattern, repl, flags=0, count=0): - """ - Replaces a regular expression pattern with repl in each "name: value" - header line. - - Returns: - The number of replacements made. - """ - if isinstance(pattern, str): - pattern = strutils.escaped_str_to_bytes(pattern) - if isinstance(repl, str): - repl = strutils.escaped_str_to_bytes(repl) - pattern = re.compile(pattern, flags) - replacements = 0 - flag_count = count > 0 - fields = [] - for name, value in self.fields: - line, n = pattern.subn(repl, name + b": " + value, count=count) - try: - name, value = line.split(b": ", 1) - except ValueError: - # We get a ValueError if the replacement removed the ": " - # There's not much we can do about this, so we just keep the header as-is. - pass - else: - replacements += n - if flag_count: - count -= n - if count == 0: - break - fields.append((name, value)) - self.fields = tuple(fields) - return replacements - - -def parse_content_type(c): - """ - A simple parser for content-type values. Returns a (type, subtype, - parameters) tuple, where type and subtype are strings, and parameters - is a dict. If the string could not be parsed, return None. - - E.g. the following string: - - text/html; charset=UTF-8 - - Returns: - - ("text", "html", {"charset": "UTF-8"}) - """ - parts = c.split(";", 1) - ts = parts[0].split("/", 1) - if len(ts) != 2: - return None - d = collections.OrderedDict() - if len(parts) == 2: - for i in parts[1].split(";"): - clause = i.split("=", 1) - if len(clause) == 2: - d[clause[0].strip()] = clause[1].strip() - return ts[0].lower(), ts[1].lower(), d - - -def assemble_content_type(type, subtype, parameters): - if not parameters: - return "{}/{}".format(type, subtype) - params = "; ".join( - "{}={}".format(k, v) - for k, v in parameters.items() - ) - return "{}/{}; {}".format( - type, subtype, params - ) diff --git a/netlib/http/http1/__init__.py b/netlib/http/http1/__init__.py deleted file mode 100644 index e4bf01c5..00000000 --- a/netlib/http/http1/__init__.py +++ /dev/null @@ -1,24 +0,0 @@ -from .read import ( - read_request, read_request_head, - read_response, read_response_head, - read_body, - connection_close, - expected_http_body_size, -) -from .assemble import ( - assemble_request, assemble_request_head, - assemble_response, assemble_response_head, - assemble_body, -) - - -__all__ = [ - "read_request", "read_request_head", - "read_response", "read_response_head", - "read_body", - "connection_close", - "expected_http_body_size", - "assemble_request", "assemble_request_head", - "assemble_response", "assemble_response_head", - "assemble_body", -] diff --git a/netlib/http/http1/assemble.py b/netlib/http/http1/assemble.py deleted file mode 100644 index e0a91ad8..00000000 --- a/netlib/http/http1/assemble.py +++ /dev/null @@ -1,100 +0,0 @@ -import netlib.http.url -from mitmproxy import exceptions - - -def assemble_request(request): - if request.data.content is None: - raise exceptions.HttpException("Cannot assemble flow with missing content") - head = assemble_request_head(request) - body = b"".join(assemble_body(request.data.headers, [request.data.content])) - return head + body - - -def assemble_request_head(request): - first_line = _assemble_request_line(request.data) - headers = _assemble_request_headers(request.data) - return b"%s\r\n%s\r\n" % (first_line, headers) - - -def assemble_response(response): - if response.data.content is None: - raise exceptions.HttpException("Cannot assemble flow with missing content") - head = assemble_response_head(response) - body = b"".join(assemble_body(response.data.headers, [response.data.content])) - return head + body - - -def assemble_response_head(response): - first_line = _assemble_response_line(response.data) - headers = _assemble_response_headers(response.data) - return b"%s\r\n%s\r\n" % (first_line, headers) - - -def assemble_body(headers, body_chunks): - if "chunked" in headers.get("transfer-encoding", "").lower(): - for chunk in body_chunks: - if chunk: - yield b"%x\r\n%s\r\n" % (len(chunk), chunk) - yield b"0\r\n\r\n" - else: - for chunk in body_chunks: - yield chunk - - -def _assemble_request_line(request_data): - """ - Args: - request_data (netlib.http.request.RequestData) - """ - form = request_data.first_line_format - if form == "relative": - return b"%s %s %s" % ( - request_data.method, - request_data.path, - request_data.http_version - ) - elif form == "authority": - return b"%s %s:%d %s" % ( - request_data.method, - request_data.host, - request_data.port, - request_data.http_version - ) - elif form == "absolute": - return b"%s %s://%s:%d%s %s" % ( - request_data.method, - request_data.scheme, - request_data.host, - request_data.port, - request_data.path, - request_data.http_version - ) - else: - raise RuntimeError("Invalid request form") - - -def _assemble_request_headers(request_data): - """ - Args: - request_data (netlib.http.request.RequestData) - """ - headers = request_data.headers.copy() - if "host" not in headers and request_data.scheme and request_data.host and request_data.port: - headers["host"] = netlib.http.url.hostport( - request_data.scheme, - request_data.host, - request_data.port - ) - return bytes(headers) - - -def _assemble_response_line(response_data): - return b"%s %d %s" % ( - response_data.http_version, - response_data.status_code, - response_data.reason, - ) - - -def _assemble_response_headers(response): - return bytes(response.headers) diff --git a/netlib/http/http1/read.py b/netlib/http/http1/read.py deleted file mode 100644 index e6b22863..00000000 --- a/netlib/http/http1/read.py +++ /dev/null @@ -1,377 +0,0 @@ -import time -import sys -import re - -from netlib.http import request -from netlib.http import response -from netlib.http import headers -from netlib.http import url -from netlib import check -from mitmproxy import exceptions - - -def get_header_tokens(headers, key): - """ - Retrieve all tokens for a header key. A number of different headers - follow a pattern where each header line can containe comma-separated - tokens, and headers can be set multiple times. - """ - if key not in headers: - return [] - tokens = headers[key].split(",") - return [token.strip() for token in tokens] - - -def read_request(rfile, body_size_limit=None): - request = read_request_head(rfile) - expected_body_size = expected_http_body_size(request) - request.data.content = b"".join(read_body(rfile, expected_body_size, limit=body_size_limit)) - request.timestamp_end = time.time() - return request - - -def read_request_head(rfile): - """ - Parse an HTTP request head (request line + headers) from an input stream - - Args: - rfile: The input stream - - Returns: - The HTTP request object (without body) - - Raises: - exceptions.HttpReadDisconnect: No bytes can be read from rfile. - exceptions.HttpSyntaxException: The input is malformed HTTP. - exceptions.HttpException: Any other error occured. - """ - timestamp_start = time.time() - if hasattr(rfile, "reset_timestamps"): - rfile.reset_timestamps() - - form, method, scheme, host, port, path, http_version = _read_request_line(rfile) - headers = _read_headers(rfile) - - if hasattr(rfile, "first_byte_timestamp"): - # more accurate timestamp_start - timestamp_start = rfile.first_byte_timestamp - - return request.Request( - form, method, scheme, host, port, path, http_version, headers, None, timestamp_start - ) - - -def read_response(rfile, request, body_size_limit=None): - response = read_response_head(rfile) - expected_body_size = expected_http_body_size(request, response) - response.data.content = b"".join(read_body(rfile, expected_body_size, body_size_limit)) - response.timestamp_end = time.time() - return response - - -def read_response_head(rfile): - """ - Parse an HTTP response head (response line + headers) from an input stream - - Args: - rfile: The input stream - - Returns: - The HTTP request object (without body) - - Raises: - exceptions.HttpReadDisconnect: No bytes can be read from rfile. - exceptions.HttpSyntaxException: The input is malformed HTTP. - exceptions.HttpException: Any other error occured. - """ - - timestamp_start = time.time() - if hasattr(rfile, "reset_timestamps"): - rfile.reset_timestamps() - - http_version, status_code, message = _read_response_line(rfile) - headers = _read_headers(rfile) - - if hasattr(rfile, "first_byte_timestamp"): - # more accurate timestamp_start - timestamp_start = rfile.first_byte_timestamp - - return response.Response(http_version, status_code, message, headers, None, timestamp_start) - - -def read_body(rfile, expected_size, limit=None, max_chunk_size=4096): - """ - Read an HTTP message body - - Args: - rfile: The input stream - expected_size: The expected body size (see :py:meth:`expected_body_size`) - limit: Maximum body size - max_chunk_size: Maximium chunk size that gets yielded - - Returns: - A generator that yields byte chunks of the content. - - Raises: - exceptions.HttpException, if an error occurs - - Caveats: - max_chunk_size is not considered if the transfer encoding is chunked. - """ - if not limit or limit < 0: - limit = sys.maxsize - if not max_chunk_size: - max_chunk_size = limit - - if expected_size is None: - for x in _read_chunked(rfile, limit): - yield x - elif expected_size >= 0: - if limit is not None and expected_size > limit: - raise exceptions.HttpException( - "HTTP Body too large. " - "Limit is {}, content length was advertised as {}".format(limit, expected_size) - ) - bytes_left = expected_size - while bytes_left: - chunk_size = min(bytes_left, max_chunk_size) - content = rfile.read(chunk_size) - if len(content) < chunk_size: - raise exceptions.HttpException("Unexpected EOF") - yield content - bytes_left -= chunk_size - else: - bytes_left = limit - while bytes_left: - chunk_size = min(bytes_left, max_chunk_size) - content = rfile.read(chunk_size) - if not content: - return - yield content - bytes_left -= chunk_size - not_done = rfile.read(1) - if not_done: - raise exceptions.HttpException("HTTP body too large. Limit is {}.".format(limit)) - - -def connection_close(http_version, headers): - """ - Checks the message to see if the client connection should be closed - according to RFC 2616 Section 8.1. - """ - # At first, check if we have an explicit Connection header. - if "connection" in headers: - tokens = get_header_tokens(headers, "connection") - if "close" in tokens: - return True - elif "keep-alive" in tokens: - return False - - # If we don't have a Connection header, HTTP 1.1 connections are assumed to - # be persistent - return http_version != "HTTP/1.1" and http_version != b"HTTP/1.1" # FIXME: Remove one case. - - -def expected_http_body_size(request, response=None): - """ - Returns: - The expected body length: - - a positive integer, if the size is known in advance - - None, if the size in unknown in advance (chunked encoding) - - -1, if all data should be read until end of stream. - - Raises: - exceptions.HttpSyntaxException, if the content length header is invalid - """ - # Determine response size according to - # http://tools.ietf.org/html/rfc7230#section-3.3 - if not response: - headers = request.headers - response_code = None - is_request = True - else: - headers = response.headers - response_code = response.status_code - is_request = False - - if is_request: - if headers.get("expect", "").lower() == "100-continue": - return 0 - else: - if request.method.upper() == "HEAD": - return 0 - if 100 <= response_code <= 199: - return 0 - if response_code == 200 and request.method.upper() == "CONNECT": - return 0 - if response_code in (204, 304): - return 0 - - if "chunked" in headers.get("transfer-encoding", "").lower(): - return None - if "content-length" in headers: - try: - size = int(headers["content-length"]) - if size < 0: - raise ValueError() - return size - except ValueError: - raise exceptions.HttpSyntaxException("Unparseable Content Length") - if is_request: - return 0 - return -1 - - -def _get_first_line(rfile): - try: - line = rfile.readline() - if line == b"\r\n" or line == b"\n": - # Possible leftover from previous message - line = rfile.readline() - except exceptions.TcpDisconnect: - raise exceptions.HttpReadDisconnect("Remote disconnected") - if not line: - raise exceptions.HttpReadDisconnect("Remote disconnected") - return line.strip() - - -def _read_request_line(rfile): - try: - line = _get_first_line(rfile) - except exceptions.HttpReadDisconnect: - # We want to provide a better error message. - raise exceptions.HttpReadDisconnect("Client disconnected") - - try: - method, path, http_version = line.split() - - if path == b"*" or path.startswith(b"/"): - form = "relative" - scheme, host, port = None, None, None - elif method == b"CONNECT": - form = "authority" - host, port = _parse_authority_form(path) - scheme, path = None, None - else: - form = "absolute" - scheme, host, port, path = url.parse(path) - - _check_http_version(http_version) - except ValueError: - raise exceptions.HttpSyntaxException("Bad HTTP request line: {}".format(line)) - - return form, method, scheme, host, port, path, http_version - - -def _parse_authority_form(hostport): - """ - Returns (host, port) if hostport is a valid authority-form host specification. - http://tools.ietf.org/html/draft-luotonen-web-proxy-tunneling-01 section 3.1 - - Raises: - ValueError, if the input is malformed - """ - try: - host, port = hostport.split(b":") - port = int(port) - if not check.is_valid_host(host) or not check.is_valid_port(port): - raise ValueError() - except ValueError: - raise exceptions.HttpSyntaxException("Invalid host specification: {}".format(hostport)) - - return host, port - - -def _read_response_line(rfile): - try: - line = _get_first_line(rfile) - except exceptions.HttpReadDisconnect: - # We want to provide a better error message. - raise exceptions.HttpReadDisconnect("Server disconnected") - - try: - parts = line.split(None, 2) - if len(parts) == 2: # handle missing message gracefully - parts.append(b"") - - http_version, status_code, message = parts - status_code = int(status_code) - _check_http_version(http_version) - - except ValueError: - raise exceptions.HttpSyntaxException("Bad HTTP response line: {}".format(line)) - - return http_version, status_code, message - - -def _check_http_version(http_version): - if not re.match(br"^HTTP/\d\.\d$", http_version): - raise exceptions.HttpSyntaxException("Unknown HTTP version: {}".format(http_version)) - - -def _read_headers(rfile): - """ - Read a set of headers. - Stop once a blank line is reached. - - Returns: - A headers object - - Raises: - exceptions.HttpSyntaxException - """ - ret = [] - while True: - line = rfile.readline() - if not line or line == b"\r\n" or line == b"\n": - break - if line[0] in b" \t": - if not ret: - raise exceptions.HttpSyntaxException("Invalid headers") - # continued header - ret[-1] = (ret[-1][0], ret[-1][1] + b'\r\n ' + line.strip()) - else: - try: - name, value = line.split(b":", 1) - value = value.strip() - if not name: - raise ValueError() - ret.append((name, value)) - except ValueError: - raise exceptions.HttpSyntaxException( - "Invalid header line: %s" % repr(line) - ) - return headers.Headers(ret) - - -def _read_chunked(rfile, limit=sys.maxsize): - """ - Read a HTTP body with chunked transfer encoding. - - Args: - rfile: the input file - limit: A positive integer - """ - total = 0 - while True: - line = rfile.readline(128) - if line == b"": - raise exceptions.HttpException("Connection closed prematurely") - if line != b"\r\n" and line != b"\n": - try: - length = int(line, 16) - except ValueError: - raise exceptions.HttpSyntaxException("Invalid chunked encoding length: {}".format(line)) - total += length - if total > limit: - raise exceptions.HttpException( - "HTTP Body too large. Limit is {}, " - "chunked content longer than {}".format(limit, total) - ) - chunk = rfile.read(length) - suffix = rfile.readline(5) - if suffix != b"\r\n": - raise exceptions.HttpSyntaxException("Malformed chunked body") - if length == 0: - return - yield chunk diff --git a/netlib/http/http2/__init__.py b/netlib/http/http2/__init__.py deleted file mode 100644 index 20cc63a0..00000000 --- a/netlib/http/http2/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -from netlib.http.http2.framereader import read_raw_frame, parse_frame -from netlib.http.http2.utils import parse_headers - -__all__ = [ - "read_raw_frame", - "parse_frame", - "parse_headers", -] diff --git a/netlib/http/http2/framereader.py b/netlib/http/http2/framereader.py deleted file mode 100644 index 6a164919..00000000 --- a/netlib/http/http2/framereader.py +++ /dev/null @@ -1,25 +0,0 @@ -import codecs - -import hyperframe -from mitmproxy import exceptions - - -def read_raw_frame(rfile): - header = rfile.safe_read(9) - length = int(codecs.encode(header[:3], 'hex_codec'), 16) - - if length == 4740180: - raise exceptions.HttpException("Length field looks more like HTTP/1.1:\n{}".format(rfile.read(-1))) - - body = rfile.safe_read(length) - return [header, body] - - -def parse_frame(header, body=None): - if body is None: - body = header[9:] - header = header[:9] - - frame, length = hyperframe.frame.Frame.parse_frame_header(header) - frame.parse_body(memoryview(body)) - return frame diff --git a/netlib/http/http2/utils.py b/netlib/http/http2/utils.py deleted file mode 100644 index 164bacc8..00000000 --- a/netlib/http/http2/utils.py +++ /dev/null @@ -1,37 +0,0 @@ -from netlib.http import url - - -def parse_headers(headers): - authority = headers.get(':authority', '').encode() - method = headers.get(':method', 'GET').encode() - scheme = headers.get(':scheme', 'https').encode() - path = headers.get(':path', '/').encode() - - headers.pop(":method", None) - headers.pop(":scheme", None) - headers.pop(":path", None) - - host = None - port = None - - if path == b'*' or path.startswith(b"/"): - first_line_format = "relative" - elif method == b'CONNECT': # pragma: no cover - raise NotImplementedError("CONNECT over HTTP/2 is not implemented.") - else: # pragma: no cover - first_line_format = "absolute" - # FIXME: verify if path or :host contains what we need - scheme, host, port, _ = url.parse(path) - - if authority: - host, _, port = authority.partition(b':') - - if not host: - host = b'localhost' - - if not port: - port = 443 if scheme == b'https' else 80 - - port = int(port) - - return first_line_format, method, scheme, host, port, path diff --git a/netlib/http/message.py b/netlib/http/message.py deleted file mode 100644 index 772a124e..00000000 --- a/netlib/http/message.py +++ /dev/null @@ -1,300 +0,0 @@ -import re -import warnings -from typing import Optional - -from mitmproxy.utils import strutils -from netlib.http import encoding -from mitmproxy.types import serializable -from netlib.http import headers - - -# While headers _should_ be ASCII, it's not uncommon for certain headers to be utf-8 encoded. -def _native(x): - return x.decode("utf-8", "surrogateescape") - - -def _always_bytes(x): - return strutils.always_bytes(x, "utf-8", "surrogateescape") - - -class MessageData(serializable.Serializable): - def __eq__(self, other): - if isinstance(other, MessageData): - return self.__dict__ == other.__dict__ - return False - - def __ne__(self, other): - return not self.__eq__(other) - - def set_state(self, state): - for k, v in state.items(): - if k == "headers": - v = headers.Headers.from_state(v) - setattr(self, k, v) - - def get_state(self): - state = vars(self).copy() - state["headers"] = state["headers"].get_state() - return state - - @classmethod - def from_state(cls, state): - state["headers"] = headers.Headers.from_state(state["headers"]) - return cls(**state) - - -class Message(serializable.Serializable): - def __eq__(self, other): - if isinstance(other, Message): - return self.data == other.data - return False - - def __ne__(self, other): - return not self.__eq__(other) - - def get_state(self): - return self.data.get_state() - - def set_state(self, state): - self.data.set_state(state) - - @classmethod - def from_state(cls, state): - state["headers"] = headers.Headers.from_state(state["headers"]) - return cls(**state) - - @property - def headers(self): - """ - Message headers object - - Returns: - netlib.http.Headers - """ - return self.data.headers - - @headers.setter - def headers(self, h): - self.data.headers = h - - @property - def raw_content(self) -> bytes: - """ - The raw (encoded) HTTP message body - - See also: :py:attr:`content`, :py:class:`text` - """ - return self.data.content - - @raw_content.setter - def raw_content(self, content): - self.data.content = content - - def get_content(self, strict: bool=True) -> bytes: - """ - The HTTP message body decoded with the content-encoding header (e.g. gzip) - - Raises: - ValueError, when the content-encoding is invalid and strict is True. - - See also: :py:class:`raw_content`, :py:attr:`text` - """ - if self.raw_content is None: - return None - ce = self.headers.get("content-encoding") - if ce: - try: - return encoding.decode(self.raw_content, ce) - except ValueError: - if strict: - raise - return self.raw_content - else: - return self.raw_content - - def set_content(self, value): - if value is None: - self.raw_content = None - return - if not isinstance(value, bytes): - raise TypeError( - "Message content must be bytes, not {}. " - "Please use .text if you want to assign a str." - .format(type(value).__name__) - ) - ce = self.headers.get("content-encoding") - try: - self.raw_content = encoding.encode(value, ce or "identity") - except ValueError: - # So we have an invalid content-encoding? - # Let's remove it! - del self.headers["content-encoding"] - self.raw_content = value - self.headers["content-length"] = str(len(self.raw_content)) - - content = property(get_content, set_content) - - @property - def http_version(self): - """ - Version string, e.g. "HTTP/1.1" - """ - return _native(self.data.http_version) - - @http_version.setter - def http_version(self, http_version): - self.data.http_version = _always_bytes(http_version) - - @property - def timestamp_start(self): - """ - First byte timestamp - """ - return self.data.timestamp_start - - @timestamp_start.setter - def timestamp_start(self, timestamp_start): - self.data.timestamp_start = timestamp_start - - @property - def timestamp_end(self): - """ - Last byte timestamp - """ - return self.data.timestamp_end - - @timestamp_end.setter - def timestamp_end(self, timestamp_end): - self.data.timestamp_end = timestamp_end - - def _get_content_type_charset(self) -> Optional[str]: - ct = headers.parse_content_type(self.headers.get("content-type", "")) - if ct: - return ct[2].get("charset") - - def _guess_encoding(self) -> str: - enc = self._get_content_type_charset() - if enc: - return enc - - if "json" in self.headers.get("content-type", ""): - return "utf8" - else: - # We may also want to check for HTML meta tags here at some point. - return "latin-1" - - def get_text(self, strict: bool=True) -> str: - """ - The HTTP message body decoded with both content-encoding header (e.g. gzip) - and content-type header charset. - - Raises: - ValueError, when either content-encoding or charset is invalid and strict is True. - - See also: :py:attr:`content`, :py:class:`raw_content` - """ - if self.raw_content is None: - return None - enc = self._guess_encoding() - - content = self.get_content(strict) - try: - return encoding.decode(content, enc) - except ValueError: - if strict: - raise - return content.decode("utf8", "surrogateescape") - - def set_text(self, text): - if text is None: - self.content = None - return - enc = self._guess_encoding() - - try: - self.content = encoding.encode(text, enc) - except ValueError: - # Fall back to UTF-8 and update the content-type header. - ct = headers.parse_content_type(self.headers.get("content-type", "")) or ("text", "plain", {}) - ct[2]["charset"] = "utf-8" - self.headers["content-type"] = headers.assemble_content_type(*ct) - enc = "utf8" - self.content = text.encode(enc, "surrogateescape") - - text = property(get_text, set_text) - - def decode(self, strict=True): - """ - Decodes body based on the current Content-Encoding header, then - removes the header. If there is no Content-Encoding header, no - action is taken. - - Raises: - ValueError, when the content-encoding is invalid and strict is True. - """ - self.raw_content = self.get_content(strict) - self.headers.pop("content-encoding", None) - - def encode(self, e): - """ - Encodes body with the encoding e, where e is "gzip", "deflate", "identity", or "br". - Any existing content-encodings are overwritten, - the content is not decoded beforehand. - - Raises: - ValueError, when the specified content-encoding is invalid. - """ - self.headers["content-encoding"] = e - self.content = self.raw_content - if "content-encoding" not in self.headers: - raise ValueError("Invalid content encoding {}".format(repr(e))) - - def replace(self, pattern, repl, flags=0, count=0): - """ - Replaces a regular expression pattern with repl in both the headers - and the body of the message. Encoded body will be decoded - before replacement, and re-encoded afterwards. - - Returns: - The number of replacements made. - """ - if isinstance(pattern, str): - pattern = strutils.escaped_str_to_bytes(pattern) - if isinstance(repl, str): - repl = strutils.escaped_str_to_bytes(repl) - replacements = 0 - if self.content: - self.content, replacements = re.subn( - pattern, repl, self.content, flags=flags, count=count - ) - replacements += self.headers.replace(pattern, repl, flags=flags, count=count) - return replacements - - # Legacy - - @property - def body(self): # pragma: no cover - warnings.warn(".body is deprecated, use .content instead.", DeprecationWarning) - return self.content - - @body.setter - def body(self, body): # pragma: no cover - warnings.warn(".body is deprecated, use .content instead.", DeprecationWarning) - self.content = body - - -class decoded: - """ - Deprecated: You can now directly use :py:attr:`content`. - :py:attr:`raw_content` has the encoded content. - """ - - def __init__(self, message): # pragma no cover - warnings.warn("decoded() is deprecated, you can now directly use .content instead. " - ".raw_content has the encoded content.", DeprecationWarning) - - def __enter__(self): # pragma no cover - pass - - def __exit__(self, type, value, tb): # pragma no cover - pass diff --git a/netlib/http/multipart.py b/netlib/http/multipart.py deleted file mode 100644 index 536b2809..00000000 --- a/netlib/http/multipart.py +++ /dev/null @@ -1,32 +0,0 @@ -import re - -from netlib.http import headers - - -def decode(hdrs, content): - """ - Takes a multipart boundary encoded string and returns list of (key, value) tuples. - """ - v = hdrs.get("content-type") - if v: - v = headers.parse_content_type(v) - if not v: - return [] - try: - boundary = v[2]["boundary"].encode("ascii") - except (KeyError, UnicodeError): - return [] - - rx = re.compile(br'\bname="([^"]+)"') - r = [] - - for i in content.split(b"--" + boundary): - parts = i.splitlines() - if len(parts) > 1 and parts[0][0:2] != b"--": - match = rx.search(parts[1]) - if match: - key = match.group(1) - value = b"".join(parts[3 + parts[2:].index(b""):]) - r.append((key, value)) - return r - return [] diff --git a/netlib/http/request.py b/netlib/http/request.py deleted file mode 100644 index 16b0c986..00000000 --- a/netlib/http/request.py +++ /dev/null @@ -1,405 +0,0 @@ -import re -import urllib - -from mitmproxy.types import multidict -from mitmproxy.utils import strutils -from netlib.http import multipart -from netlib.http import cookies -from netlib.http import headers as nheaders -from netlib.http import message -import netlib.http.url - -# This regex extracts & splits the host header into host and port. -# Handles the edge case of IPv6 addresses containing colons. -# https://bugzilla.mozilla.org/show_bug.cgi?id=45891 -host_header_re = re.compile(r"^(?P<host>[^:]+|\[.+\])(?::(?P<port>\d+))?$") - - -class RequestData(message.MessageData): - def __init__( - self, - first_line_format, - method, - scheme, - host, - port, - path, - http_version, - headers=(), - content=None, - timestamp_start=None, - timestamp_end=None - ): - if isinstance(method, str): - method = method.encode("ascii", "strict") - if isinstance(scheme, str): - scheme = scheme.encode("ascii", "strict") - if isinstance(host, str): - host = host.encode("idna", "strict") - if isinstance(path, str): - path = path.encode("ascii", "strict") - if isinstance(http_version, str): - http_version = http_version.encode("ascii", "strict") - if not isinstance(headers, nheaders.Headers): - headers = nheaders.Headers(headers) - if isinstance(content, str): - raise ValueError("Content must be bytes, not {}".format(type(content).__name__)) - - self.first_line_format = first_line_format - self.method = method - self.scheme = scheme - self.host = host - self.port = port - self.path = path - self.http_version = http_version - self.headers = headers - self.content = content - self.timestamp_start = timestamp_start - self.timestamp_end = timestamp_end - - -class Request(message.Message): - """ - An HTTP request. - """ - def __init__(self, *args, **kwargs): - super().__init__() - self.data = RequestData(*args, **kwargs) - - def __repr__(self): - if self.host and self.port: - hostport = "{}:{}".format(self.host, self.port) - else: - hostport = "" - path = self.path or "" - return "Request({} {}{})".format( - self.method, hostport, path - ) - - def replace(self, pattern, repl, flags=0, count=0): - """ - Replaces a regular expression pattern with repl in the headers, the - request path and the body of the request. Encoded content will be - decoded before replacement, and re-encoded afterwards. - - Returns: - The number of replacements made. - """ - if isinstance(pattern, str): - pattern = strutils.escaped_str_to_bytes(pattern) - if isinstance(repl, str): - repl = strutils.escaped_str_to_bytes(repl) - - c = super().replace(pattern, repl, flags, count) - self.path, pc = re.subn( - pattern, repl, self.data.path, flags=flags, count=count - ) - c += pc - return c - - @property - def first_line_format(self): - """ - HTTP request form as defined in `RFC7230 <https://tools.ietf.org/html/rfc7230#section-5.3>`_. - - origin-form and asterisk-form are subsumed as "relative". - """ - return self.data.first_line_format - - @first_line_format.setter - def first_line_format(self, first_line_format): - self.data.first_line_format = first_line_format - - @property - def method(self): - """ - HTTP request method, e.g. "GET". - """ - return message._native(self.data.method).upper() - - @method.setter - def method(self, method): - self.data.method = message._always_bytes(method) - - @property - def scheme(self): - """ - HTTP request scheme, which should be "http" or "https". - """ - if not self.data.scheme: - return self.data.scheme - return message._native(self.data.scheme) - - @scheme.setter - def scheme(self, scheme): - self.data.scheme = message._always_bytes(scheme) - - @property - def host(self): - """ - Target host. This may be parsed from the raw request - (e.g. from a ``GET http://example.com/ HTTP/1.1`` request line) - or inferred from the proxy mode (e.g. an IP in transparent mode). - - Setting the host attribute also updates the host header, if present. - """ - if not self.data.host: - return self.data.host - try: - return self.data.host.decode("idna") - except UnicodeError: - return self.data.host.decode("utf8", "surrogateescape") - - @host.setter - def host(self, host): - if isinstance(host, str): - try: - # There's no non-strict mode for IDNA encoding. - # We don't want this operation to fail though, so we try - # utf8 as a last resort. - host = host.encode("idna", "strict") - except UnicodeError: - host = host.encode("utf8", "surrogateescape") - - self.data.host = host - - # Update host header - if "host" in self.headers: - if host: - self.headers["host"] = host - else: - self.headers.pop("host") - - @property - def port(self): - """ - Target port - """ - return self.data.port - - @port.setter - def port(self, port): - self.data.port = port - - @property - def path(self): - """ - HTTP request path, e.g. "/index.html". - Guaranteed to start with a slash, except for OPTIONS requests, which may just be "*". - """ - if self.data.path is None: - return None - else: - return message._native(self.data.path) - - @path.setter - def path(self, path): - self.data.path = message._always_bytes(path) - - @property - def url(self): - """ - The URL string, constructed from the request's URL components - """ - if self.first_line_format == "authority": - return "%s:%d" % (self.host, self.port) - return netlib.http.url.unparse(self.scheme, self.host, self.port, self.path) - - @url.setter - def url(self, url): - self.scheme, self.host, self.port, self.path = netlib.http.url.parse(url) - - def _parse_host_header(self): - """Extract the host and port from Host header""" - if "host" not in self.headers: - return None, None - host, port = self.headers["host"], None - m = host_header_re.match(host) - if m: - host = m.group("host").strip("[]") - if m.group("port"): - port = int(m.group("port")) - return host, port - - @property - def pretty_host(self): - """ - Similar to :py:attr:`host`, but using the Host headers as an additional preferred data source. - This is useful in transparent mode where :py:attr:`host` is only an IP address, - but may not reflect the actual destination as the Host header could be spoofed. - """ - host, port = self._parse_host_header() - if not host: - return self.host - if not port: - port = 443 if self.scheme == 'https' else 80 - # Prefer the original address if host header has an unexpected form - return host if port == self.port else self.host - - @property - def pretty_url(self): - """ - Like :py:attr:`url`, but using :py:attr:`pretty_host` instead of :py:attr:`host`. - """ - if self.first_line_format == "authority": - return "%s:%d" % (self.pretty_host, self.port) - return netlib.http.url.unparse(self.scheme, self.pretty_host, self.port, self.path) - - @property - def query(self) -> multidict.MultiDictView: - """ - The request query string as an :py:class:`~netlib.multidict.MultiDictView` object. - """ - return multidict.MultiDictView( - self._get_query, - self._set_query - ) - - def _get_query(self): - query = urllib.parse.urlparse(self.url).query - return tuple(netlib.http.url.decode(query)) - - def _set_query(self, query_data): - query = netlib.http.url.encode(query_data) - _, _, path, params, _, fragment = urllib.parse.urlparse(self.url) - self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment]) - - @query.setter - def query(self, value): - self._set_query(value) - - @property - def cookies(self) -> multidict.MultiDictView: - """ - The request cookies. - - An empty :py:class:`~netlib.multidict.MultiDictView` object if the cookie monster ate them all. - """ - return multidict.MultiDictView( - self._get_cookies, - self._set_cookies - ) - - def _get_cookies(self): - h = self.headers.get_all("Cookie") - return tuple(cookies.parse_cookie_headers(h)) - - def _set_cookies(self, value): - self.headers["cookie"] = cookies.format_cookie_header(value) - - @cookies.setter - def cookies(self, value): - self._set_cookies(value) - - @property - def path_components(self): - """ - The URL's path components as a tuple of strings. - Components are unquoted. - """ - path = urllib.parse.urlparse(self.url).path - # This needs to be a tuple so that it's immutable. - # Otherwise, this would fail silently: - # request.path_components.append("foo") - return tuple(netlib.http.url.unquote(i) for i in path.split("/") if i) - - @path_components.setter - def path_components(self, components): - components = map(lambda x: netlib.http.url.quote(x, safe=""), components) - path = "/" + "/".join(components) - _, _, _, params, query, fragment = urllib.parse.urlparse(self.url) - self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment]) - - def anticache(self): - """ - Modifies this request to remove headers that might produce a cached - response. That is, we remove ETags and If-Modified-Since headers. - """ - delheaders = [ - "if-modified-since", - "if-none-match", - ] - for i in delheaders: - self.headers.pop(i, None) - - def anticomp(self): - """ - Modifies this request to remove headers that will compress the - resource's data. - """ - self.headers["accept-encoding"] = "identity" - - def constrain_encoding(self): - """ - Limits the permissible Accept-Encoding values, based on what we can - decode appropriately. - """ - accept_encoding = self.headers.get("accept-encoding") - if accept_encoding: - self.headers["accept-encoding"] = ( - ', '.join( - e - for e in {"gzip", "identity", "deflate", "br"} - if e in accept_encoding - ) - ) - - @property - def urlencoded_form(self): - """ - The URL-encoded form data as an :py:class:`~netlib.multidict.MultiDictView` object. - An empty multidict.MultiDictView if the content-type indicates non-form data - or the content could not be parsed. - """ - return multidict.MultiDictView( - self._get_urlencoded_form, - self._set_urlencoded_form - ) - - def _get_urlencoded_form(self): - is_valid_content_type = "application/x-www-form-urlencoded" in self.headers.get("content-type", "").lower() - if is_valid_content_type: - try: - return tuple(netlib.http.url.decode(self.content)) - except ValueError: - pass - return () - - def _set_urlencoded_form(self, form_data): - """ - Sets the body to the URL-encoded form data, and adds the appropriate content-type header. - This will overwrite the existing content if there is one. - """ - self.headers["content-type"] = "application/x-www-form-urlencoded" - self.content = netlib.http.url.encode(form_data).encode() - - @urlencoded_form.setter - def urlencoded_form(self, value): - self._set_urlencoded_form(value) - - @property - def multipart_form(self): - """ - The multipart form data as an :py:class:`~netlib.multidict.MultiDictView` object. - None if the content-type indicates non-form data. - """ - return multidict.MultiDictView( - self._get_multipart_form, - self._set_multipart_form - ) - - def _get_multipart_form(self): - is_valid_content_type = "multipart/form-data" in self.headers.get("content-type", "").lower() - if is_valid_content_type: - try: - return multipart.decode(self.headers, self.content) - except ValueError: - pass - return () - - def _set_multipart_form(self, value): - raise NotImplementedError() - - @multipart_form.setter - def multipart_form(self, value): - self._set_multipart_form(value) diff --git a/netlib/http/response.py b/netlib/http/response.py deleted file mode 100644 index 4d1d5d24..00000000 --- a/netlib/http/response.py +++ /dev/null @@ -1,192 +0,0 @@ -import time -from email.utils import parsedate_tz, formatdate, mktime_tz -from mitmproxy.utils import human -from mitmproxy.types import multidict -from netlib.http import cookies -from netlib.http import headers as nheaders -from netlib.http import message -from netlib.http import status_codes -from typing import AnyStr -from typing import Dict -from typing import Iterable -from typing import Tuple -from typing import Union - - -class ResponseData(message.MessageData): - def __init__( - self, - http_version, - status_code, - reason=None, - headers=(), - content=None, - timestamp_start=None, - timestamp_end=None - ): - if isinstance(http_version, str): - http_version = http_version.encode("ascii", "strict") - if isinstance(reason, str): - reason = reason.encode("ascii", "strict") - if not isinstance(headers, nheaders.Headers): - headers = nheaders.Headers(headers) - if isinstance(content, str): - raise ValueError("Content must be bytes, not {}".format(type(content).__name__)) - - self.http_version = http_version - self.status_code = status_code - self.reason = reason - self.headers = headers - self.content = content - self.timestamp_start = timestamp_start - self.timestamp_end = timestamp_end - - -class Response(message.Message): - """ - An HTTP response. - """ - def __init__(self, *args, **kwargs): - super().__init__() - self.data = ResponseData(*args, **kwargs) - - def __repr__(self): - if self.raw_content: - details = "{}, {}".format( - self.headers.get("content-type", "unknown content type"), - human.pretty_size(len(self.raw_content)) - ) - else: - details = "no content" - return "Response({status_code} {reason}, {details})".format( - status_code=self.status_code, - reason=self.reason, - details=details - ) - - @classmethod - def make( - cls, - status_code: int=200, - content: AnyStr=b"", - headers: Union[Dict[AnyStr, AnyStr], Iterable[Tuple[bytes, bytes]]]=() - ): - """ - Simplified API for creating response objects. - """ - resp = cls( - b"HTTP/1.1", - status_code, - status_codes.RESPONSES.get(status_code, "").encode(), - (), - None - ) - - # Headers can be list or dict, we differentiate here. - if isinstance(headers, dict): - resp.headers = nheaders.Headers(**headers) - elif isinstance(headers, Iterable): - resp.headers = nheaders.Headers(headers) - else: - raise TypeError("Expected headers to be an iterable or dict, but is {}.".format( - type(headers).__name__ - )) - - # Assign this manually to update the content-length header. - if isinstance(content, bytes): - resp.content = content - elif isinstance(content, str): - resp.text = content - else: - raise TypeError("Expected content to be str or bytes, but is {}.".format( - type(content).__name__ - )) - - return resp - - @property - def status_code(self): - """ - HTTP Status Code, e.g. ``200``. - """ - return self.data.status_code - - @status_code.setter - def status_code(self, status_code): - self.data.status_code = status_code - - @property - def reason(self): - """ - HTTP Reason Phrase, e.g. "Not Found". - This is always :py:obj:`None` for HTTP2 requests, because HTTP2 responses do not contain a reason phrase. - """ - return message._native(self.data.reason) - - @reason.setter - def reason(self, reason): - self.data.reason = message._always_bytes(reason) - - @property - def cookies(self) -> multidict.MultiDictView: - """ - The response cookies. A possibly empty - :py:class:`~netlib.multidict.MultiDictView`, where the keys are cookie - name strings, and values are (value, attr) tuples. Value is a string, - and attr is an MultiDictView containing cookie attributes. Within - attrs, unary attributes (e.g. HTTPOnly) are indicated by a Null value. - - Caveats: - Updating the attr - """ - return multidict.MultiDictView( - self._get_cookies, - self._set_cookies - ) - - def _get_cookies(self): - h = self.headers.get_all("set-cookie") - return tuple(cookies.parse_set_cookie_headers(h)) - - def _set_cookies(self, value): - cookie_headers = [] - for k, v in value: - header = cookies.format_set_cookie_header([(k, v[0], v[1])]) - cookie_headers.append(header) - self.headers.set_all("set-cookie", cookie_headers) - - @cookies.setter - def cookies(self, value): - self._set_cookies(value) - - def refresh(self, now=None): - """ - This fairly complex and heuristic function refreshes a server - response for replay. - - - It adjusts date, expires and last-modified headers. - - It adjusts cookie expiration. - """ - if not now: - now = time.time() - delta = now - self.timestamp_start - refresh_headers = [ - "date", - "expires", - "last-modified", - ] - for i in refresh_headers: - if i in self.headers: - d = parsedate_tz(self.headers[i]) - if d: - new = mktime_tz(d) + delta - self.headers[i] = formatdate(new) - c = [] - for set_cookie_header in self.headers.get_all("set-cookie"): - try: - refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta) - except ValueError: - refreshed = set_cookie_header - c.append(refreshed) - if c: - self.headers.set_all("set-cookie", c) diff --git a/netlib/http/status_codes.py b/netlib/http/status_codes.py deleted file mode 100644 index 5a83cd73..00000000 --- a/netlib/http/status_codes.py +++ /dev/null @@ -1,104 +0,0 @@ -CONTINUE = 100 -SWITCHING = 101 -OK = 200 -CREATED = 201 -ACCEPTED = 202 -NON_AUTHORITATIVE_INFORMATION = 203 -NO_CONTENT = 204 -RESET_CONTENT = 205 -PARTIAL_CONTENT = 206 -MULTI_STATUS = 207 - -MULTIPLE_CHOICE = 300 -MOVED_PERMANENTLY = 301 -FOUND = 302 -SEE_OTHER = 303 -NOT_MODIFIED = 304 -USE_PROXY = 305 -TEMPORARY_REDIRECT = 307 - -BAD_REQUEST = 400 -UNAUTHORIZED = 401 -PAYMENT_REQUIRED = 402 -FORBIDDEN = 403 -NOT_FOUND = 404 -NOT_ALLOWED = 405 -NOT_ACCEPTABLE = 406 -PROXY_AUTH_REQUIRED = 407 -REQUEST_TIMEOUT = 408 -CONFLICT = 409 -GONE = 410 -LENGTH_REQUIRED = 411 -PRECONDITION_FAILED = 412 -REQUEST_ENTITY_TOO_LARGE = 413 -REQUEST_URI_TOO_LONG = 414 -UNSUPPORTED_MEDIA_TYPE = 415 -REQUESTED_RANGE_NOT_SATISFIABLE = 416 -EXPECTATION_FAILED = 417 -IM_A_TEAPOT = 418 - -INTERNAL_SERVER_ERROR = 500 -NOT_IMPLEMENTED = 501 -BAD_GATEWAY = 502 -SERVICE_UNAVAILABLE = 503 -GATEWAY_TIMEOUT = 504 -HTTP_VERSION_NOT_SUPPORTED = 505 -INSUFFICIENT_STORAGE_SPACE = 507 -NOT_EXTENDED = 510 - -RESPONSES = { - # 100 - CONTINUE: "Continue", - SWITCHING: "Switching Protocols", - - # 200 - OK: "OK", - CREATED: "Created", - ACCEPTED: "Accepted", - NON_AUTHORITATIVE_INFORMATION: "Non-Authoritative Information", - NO_CONTENT: "No Content", - RESET_CONTENT: "Reset Content.", - PARTIAL_CONTENT: "Partial Content", - MULTI_STATUS: "Multi-Status", - - # 300 - MULTIPLE_CHOICE: "Multiple Choices", - MOVED_PERMANENTLY: "Moved Permanently", - FOUND: "Found", - SEE_OTHER: "See Other", - NOT_MODIFIED: "Not Modified", - USE_PROXY: "Use Proxy", - # 306 not defined?? - TEMPORARY_REDIRECT: "Temporary Redirect", - - # 400 - BAD_REQUEST: "Bad Request", - UNAUTHORIZED: "Unauthorized", - PAYMENT_REQUIRED: "Payment Required", - FORBIDDEN: "Forbidden", - NOT_FOUND: "Not Found", - NOT_ALLOWED: "Method Not Allowed", - NOT_ACCEPTABLE: "Not Acceptable", - PROXY_AUTH_REQUIRED: "Proxy Authentication Required", - REQUEST_TIMEOUT: "Request Time-out", - CONFLICT: "Conflict", - GONE: "Gone", - LENGTH_REQUIRED: "Length Required", - PRECONDITION_FAILED: "Precondition Failed", - REQUEST_ENTITY_TOO_LARGE: "Request Entity Too Large", - REQUEST_URI_TOO_LONG: "Request-URI Too Long", - UNSUPPORTED_MEDIA_TYPE: "Unsupported Media Type", - REQUESTED_RANGE_NOT_SATISFIABLE: "Requested Range not satisfiable", - EXPECTATION_FAILED: "Expectation Failed", - IM_A_TEAPOT: "I'm a teapot", - - # 500 - INTERNAL_SERVER_ERROR: "Internal Server Error", - NOT_IMPLEMENTED: "Not Implemented", - BAD_GATEWAY: "Bad Gateway", - SERVICE_UNAVAILABLE: "Service Unavailable", - GATEWAY_TIMEOUT: "Gateway Time-out", - HTTP_VERSION_NOT_SUPPORTED: "HTTP Version not supported", - INSUFFICIENT_STORAGE_SPACE: "Insufficient Storage Space", - NOT_EXTENDED: "Not Extended" -} diff --git a/netlib/http/url.py b/netlib/http/url.py deleted file mode 100644 index 3ca58120..00000000 --- a/netlib/http/url.py +++ /dev/null @@ -1,127 +0,0 @@ -import urllib -from typing import Sequence -from typing import Tuple - -from netlib import check - - -# PY2 workaround -def decode_parse_result(result, enc): - if hasattr(result, "decode"): - return result.decode(enc) - else: - return urllib.parse.ParseResult(*[x.decode(enc) for x in result]) - - -# PY2 workaround -def encode_parse_result(result, enc): - if hasattr(result, "encode"): - return result.encode(enc) - else: - return urllib.parse.ParseResult(*[x.encode(enc) for x in result]) - - -def parse(url): - """ - URL-parsing function that checks that - - port is an integer 0-65535 - - host is a valid IDNA-encoded hostname with no null-bytes - - path is valid ASCII - - Args: - A URL (as bytes or as unicode) - - Returns: - A (scheme, host, port, path) tuple - - Raises: - ValueError, if the URL is not properly formatted. - """ - parsed = urllib.parse.urlparse(url) - - if not parsed.hostname: - raise ValueError("No hostname given") - - if isinstance(url, bytes): - host = parsed.hostname - - # this should not raise a ValueError, - # but we try to be very forgiving here and accept just everything. - # decode_parse_result(parsed, "ascii") - else: - host = parsed.hostname.encode("idna") - parsed = encode_parse_result(parsed, "ascii") - - port = parsed.port - if not port: - port = 443 if parsed.scheme == b"https" else 80 - - full_path = urllib.parse.urlunparse( - (b"", b"", parsed.path, parsed.params, parsed.query, parsed.fragment) - ) - if not full_path.startswith(b"/"): - full_path = b"/" + full_path - - if not check.is_valid_host(host): - raise ValueError("Invalid Host") - if not check.is_valid_port(port): - raise ValueError("Invalid Port") - - return parsed.scheme, host, port, full_path - - -def unparse(scheme, host, port, path=""): - """ - Returns a URL string, constructed from the specified components. - - Args: - All args must be str. - """ - if path == "*": - path = "" - return "%s://%s%s" % (scheme, hostport(scheme, host, port), path) - - -def encode(s: Sequence[Tuple[str, str]]) -> str: - """ - Takes a list of (key, value) tuples and returns a urlencoded string. - """ - return urllib.parse.urlencode(s, False, errors="surrogateescape") - - -def decode(s): - """ - Takes a urlencoded string and returns a list of surrogate-escaped (key, value) tuples. - """ - return urllib.parse.parse_qsl(s, keep_blank_values=True, errors='surrogateescape') - - -def quote(b: str, safe: str="/") -> str: - """ - Returns: - An ascii-encodable str. - """ - return urllib.parse.quote(b, safe=safe, errors="surrogateescape") - - -def unquote(s: str) -> str: - """ - Args: - s: A surrogate-escaped str - Returns: - A surrogate-escaped str - """ - return urllib.parse.unquote(s, errors="surrogateescape") - - -def hostport(scheme, host, port): - """ - Returns the host component, with a port specifcation if needed. - """ - if (port, scheme) in [(80, "http"), (443, "https"), (80, b"http"), (443, b"https")]: - return host - else: - if isinstance(host, bytes): - return b"%s:%d" % (host, port) - else: - return "%s:%d" % (host, port) diff --git a/netlib/http/user_agents.py b/netlib/http/user_agents.py deleted file mode 100644 index d0ca2f21..00000000 --- a/netlib/http/user_agents.py +++ /dev/null @@ -1,50 +0,0 @@ -""" - A small collection of useful user-agent header strings. These should be - kept reasonably current to reflect common usage. -""" - -# pylint: line-too-long - -# A collection of (name, shortcut, string) tuples. - -UASTRINGS = [ - ("android", - "a", - "Mozilla/5.0 (Linux; U; Android 4.1.1; en-gb; Nexus 7 Build/JRO03D) AFL/01.04.02"), # noqa - ("blackberry", - "l", - "Mozilla/5.0 (BlackBerry; U; BlackBerry 9900; en) AppleWebKit/534.11+ (KHTML, like Gecko) Version/7.1.0.346 Mobile Safari/534.11+"), # noqa - ("bingbot", - "b", - "Mozilla/5.0 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)"), # noqa - ("chrome", - "c", - "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1"), # noqa - ("firefox", - "f", - "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:14.0) Gecko/20120405 Firefox/14.0a1"), # noqa - ("googlebot", - "g", - "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"), # noqa - ("ie9", - "i", - "Mozilla/5.0 (Windows; U; MSIE 9.0; WIndows NT 9.0; en-US)"), # noqa - ("ipad", - "p", - "Mozilla/5.0 (iPad; CPU OS 5_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9B176 Safari/7534.48.3"), # noqa - ("iphone", - "h", - "Mozilla/5.0 (iPhone; CPU iPhone OS 4_2_1 like Mac OS X) AppleWebKit/533.17.9 (KHTML, like Gecko) Version/5.0.2 Mobile/8C148a Safari/6533.18.5"), # noqa - ("safari", - "s", - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_3) AppleWebKit/534.55.3 (KHTML, like Gecko) Version/5.1.3 Safari/534.53.10"), # noqa -] - - -def get_by_shortcut(s): - """ - Retrieve a user agent entry by shortcut. - """ - for i in UASTRINGS: - if s == i[1]: - return i |