From cb0e3287090786fad566feb67ac07b8ef361b2c3 Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Tue, 16 Feb 2010 17:09:07 +1300 Subject: Initial checkin. --- libmproxy/utils.py | 277 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 277 insertions(+) create mode 100644 libmproxy/utils.py (limited to 'libmproxy/utils.py') diff --git a/libmproxy/utils.py b/libmproxy/utils.py new file mode 100644 index 00000000..82ddf8fc --- /dev/null +++ b/libmproxy/utils.py @@ -0,0 +1,277 @@ +# Copyright (C) 2010 Aldo Cortesi +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import re, os, subprocess + +def isBin(s): + """ + Does this string have any non-ASCII characters? + """ + for i in s: + i = ord(i) + if i < 9: + return True + elif i > 13 and i < 32: + return True + elif i > 126: + return True + return False + + +def cleanBin(s): + parts = [] + for i in s: + o = ord(i) + if o > 31 and o < 127: + parts.append(i) + else: + parts.append(".") + return "".join(parts) + + +def hexdump(s): + """ + Returns a set of typles: + (offset, hex, str) + """ + parts = [] + for i in range(0, len(s), 16): + o = "%.10x"%i + part = s[i:i+16] + x = " ".join(["%.2x"%ord(i) for i in part]) + if len(part) < 16: + x += " " + x += " ".join([" " for i in range(16-len(part))]) + parts.append( + (o, x, cleanBin(part)) + ) + return parts + + +def isStringLike(anobj): + try: + # Avoid succeeding expensively if anobj is large. + anobj[:0]+'' + except: + return 0 + else: + return 1 + + +def isSequenceLike(anobj): + """ + Is anobj a non-string sequence type (list, tuple, iterator, or + similar)? Crude, but mostly effective. + """ + if not hasattr(anobj, "next"): + if isStringLike(anobj): + return 0 + try: + anobj[:0] + except: + return 0 + return 1 + + +def _caseless(s): + return s.lower() + + +class MultiDict: + """ + Simple wrapper around a dictionary to make holding multiple objects per + key easier. + + Note that this class assumes that keys are strings. + + Keys have no order, but the order in which values are added to a key is + preserved. + """ + # This ridiculous bit of subterfuge is needed to prevent the class from + # treating this as a bound method. + _helper = (str,) + def __init__(self): + self._d = dict() + + def copy(self): + m = self.__class__() + m._d = self._d.copy() + return m + + def clear(self): + return self._d.clear() + + def get(self, key, d=None): + key = self._helper[0](key) + return self._d.get(key, d) + + def __eq__(self, other): + return dict(self) == dict(other) + + def __delitem__(self, key): + self._d.__delitem__(key) + + def __getitem__(self, key): + key = self._helper[0](key) + return self._d.__getitem__(key) + + def __setitem__(self, key, value): + if not isSequenceLike(value): + raise ValueError, "Cannot insert non-sequence." + key = self._helper[0](key) + return self._d.__setitem__(key, value) + + def has_key(self, key): + key = self._helper[0](key) + return self._d.has_key(key) + + def keys(self): + return self._d.keys() + + def extend(self, key, value): + if not self.has_key(key): + self[key] = [] + self[key].extend(value) + + def append(self, key, value): + self.extend(key, [value]) + + def itemPairs(self): + """ + Yield all possible pairs of items. + """ + for i in self.keys(): + for j in self[i]: + yield (i, j) + + +class Headers(MultiDict): + """ + A dictionary-like class for keeping track of HTTP headers. + + It is case insensitive, and __repr__ formats the headers correcty for + output to the server. + """ + _helper = (_caseless,) + def __repr__(self): + """ + Returns a string containing a formatted header string. + """ + headerElements = [] + for key in self.keys(): + for val in self[key]: + headerElements.append(key + ": " + val) + headerElements.append("") + return "\r\n".join(headerElements) + + def match_re(self, expr): + """ + Match the regular expression against each header (key, value) pair. + """ + for k, v in self.itemPairs(): + s = "%s: %s"%(k, v) + if re.search(expr, s): + return True + return False + + def read(self, fp): + """ + Read a set of headers from a file pointer. Stop once a blank line + is reached. + """ + name = '' + while 1: + line = fp.readline() + if not line or line == '\r\n' or line == '\n': + break + if line[0] in ' \t': + # continued header + self[name][-1] = self[name][-1] + '\r\n ' + line.strip() + else: + i = line.find(':') + # We're being liberal in what we accept, here. + if i > 0: + name = line[:i] + value = line[i+1:].strip() + if self.has_key(name): + # merge value + self.append(name, value) + else: + self[name] = [value] + + +def pretty_size(size): + suffixes = [ + ("B", 2**10), + ("kB", 2**20), + ("M", 2**30), + ] + for suf, lim in suffixes: + if size >= lim: + continue + else: + x = round(size/float(lim/2**10), 2) + if x == int(x): + x = int(x) + return str(x) + suf + + +class Data: + def __init__(self, name): + m = __import__(name) + dirname, _ = os.path.split(m.__file__) + self.dirname = os.path.abspath(dirname) + + def path(self, path): + """ + Returns a path to the package data housed at 'path' under this + module.Path can be a path to a file, or to a directory. + + This function will raise ValueError if the path does not exist. + """ + fullpath = os.path.join(self.dirname, path) + if not os.path.exists(fullpath): + raise ValueError, "dataPath: %s does not exist."%fullpath + return fullpath +data = Data(__name__) + + +def make_bogus_cert(path): + # Generates a bogus certificate like so: + # openssl req -config template -x509 -nodes -days 9999 -newkey rsa:1024 \ + # -keyout cert.pem -out cert.pem + + d = os.path.dirname(path) + if not os.path.exists(d): + os.makedirs(d) + + cmd = [ + "openssl", + "req", + "-config", data.path("resources/bogus_template"), + "-x509" , + "-nodes", + "-days", "9999", + "-newkey", "rsa:1024", + "-keyout", path, + "-out", path, + ] + subprocess.call( + cmd, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + stdin=subprocess.PIPE + ) + -- cgit v1.2.3