aboutsummaryrefslogtreecommitdiffstats
path: root/libmproxy
diff options
context:
space:
mode:
authorAldo Cortesi <aldo@nullcube.com>2012-06-27 16:43:33 +1200
committerAldo Cortesi <aldo@nullcube.com>2012-06-27 16:43:33 +1200
commit35ee0c098f1a9c492ddec34bf03f4f6741ac6d41 (patch)
tree199fdfc33243451d026871f75d6b758f7c9938ec /libmproxy
parent49dedd361c887ee1aae5b40c881d54d00b30e870 (diff)
downloadmitmproxy-35ee0c098f1a9c492ddec34bf03f4f6741ac6d41.tar.gz
mitmproxy-35ee0c098f1a9c492ddec34bf03f4f6741ac6d41.tar.bz2
mitmproxy-35ee0c098f1a9c492ddec34bf03f4f6741ac6d41.zip
Remove certutils from mitmproxy.
Diffstat (limited to 'libmproxy')
-rw-r--r--libmproxy/certutils.py219
-rw-r--r--libmproxy/flow.py4
-rw-r--r--libmproxy/proxy.py4
3 files changed, 4 insertions, 223 deletions
diff --git a/libmproxy/certutils.py b/libmproxy/certutils.py
deleted file mode 100644
index 31b1fa08..00000000
--- a/libmproxy/certutils.py
+++ /dev/null
@@ -1,219 +0,0 @@
-import os, ssl, hashlib, socket, time, datetime
-from pyasn1.type import univ, constraint, char, namedtype, tag
-from pyasn1.codec.der.decoder import decode
-import OpenSSL
-
-CERT_SLEEP_TIME = 1
-CERT_EXPIRY = str(365 * 3)
-
-
-def create_ca():
- key = OpenSSL.crypto.PKey()
- key.generate_key(OpenSSL.crypto.TYPE_RSA, 1024)
- ca = OpenSSL.crypto.X509()
- ca.set_serial_number(int(time.time()*10000))
- ca.set_version(2)
- ca.get_subject().CN = "mitmproxy"
- ca.get_subject().O = "mitmproxy"
- ca.gmtime_adj_notBefore(0)
- ca.gmtime_adj_notAfter(24 * 60 * 60 * 720)
- ca.set_issuer(ca.get_subject())
- ca.set_pubkey(key)
- ca.add_extensions([
- OpenSSL.crypto.X509Extension("basicConstraints", True,
- "CA:TRUE"),
- OpenSSL.crypto.X509Extension("nsCertType", True,
- "sslCA"),
- OpenSSL.crypto.X509Extension("extendedKeyUsage", True,
- "serverAuth,clientAuth,emailProtection,timeStamping,msCodeInd,msCodeCom,msCTLSign,msSGC,msEFS,nsSGC"
- ),
- OpenSSL.crypto.X509Extension("keyUsage", False,
- "keyCertSign, cRLSign"),
- OpenSSL.crypto.X509Extension("subjectKeyIdentifier", False, "hash",
- subject=ca),
- ])
- ca.sign(key, "sha1")
- return key, ca
-
-
-def dummy_ca(path):
- dirname = os.path.dirname(path)
- if not os.path.exists(dirname):
- os.makedirs(dirname)
- if path.endswith(".pem"):
- basename, _ = os.path.splitext(path)
- else:
- basename = path
-
- key, ca = create_ca()
-
- # Dump the CA plus private key
- f = open(path, "w")
- f.write(OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key))
- f.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, ca))
- f.close()
-
- # Dump the certificate in PEM format
- f = open(os.path.join(dirname, basename + "-cert.pem"), "w")
- f.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, ca))
- f.close()
-
- # Create a .cer file with the same contents for Android
- f = open(os.path.join(dirname, basename + "-cert.cer"), "w")
- f.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, ca))
- f.close()
-
- # Dump the certificate in PKCS12 format for Windows devices
- f = open(os.path.join(dirname, basename + "-cert.p12"), "w")
- p12 = OpenSSL.crypto.PKCS12()
- p12.set_certificate(ca)
- p12.set_privatekey(key)
- f.write(p12.export())
- f.close()
- return True
-
-
-def dummy_cert(certdir, ca, commonname, sans):
- """
- certdir: Certificate directory.
- ca: Path to the certificate authority file, or None.
- commonname: Common name for the generated certificate.
-
- Returns cert path if operation succeeded, None if not.
- """
- namehash = hashlib.sha256(commonname).hexdigest()
- certpath = os.path.join(certdir, namehash + ".pem")
- if os.path.exists(certpath):
- return certpath
-
- ss = []
- for i in sans:
- ss.append("DNS: %s"%i)
- ss = ", ".join(ss)
-
- if ca:
- raw = file(ca, "r").read()
- ca = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, raw)
- key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, raw)
- else:
- key, ca = create_ca()
-
- req = OpenSSL.crypto.X509Req()
- subj = req.get_subject()
- subj.CN = commonname
- req.set_pubkey(ca.get_pubkey())
- req.sign(key, "sha1")
- if ss:
- req.add_extensions([OpenSSL.crypto.X509Extension("subjectAltName", True, ss)])
-
- cert = OpenSSL.crypto.X509()
- cert.gmtime_adj_notBefore(0)
- cert.gmtime_adj_notAfter(60 * 60 * 24 * 30)
- cert.set_issuer(ca.get_subject())
- cert.set_subject(req.get_subject())
- cert.set_serial_number(int(time.time()*10000))
- if ss:
- cert.add_extensions([OpenSSL.crypto.X509Extension("subjectAltName", True, ss)])
- cert.set_pubkey(req.get_pubkey())
- cert.sign(key, "sha1")
-
- f = open(certpath, "w")
- f.write(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert))
- f.close()
-
- return certpath
-
-
-class _GeneralName(univ.Choice):
- # We are only interested in dNSNames. We use a default handler to ignore
- # other types.
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('dNSName', char.IA5String().subtype(
- implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2)
- )
- ),
- )
-
-
-class _GeneralNames(univ.SequenceOf):
- componentType = _GeneralName()
- sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, 1024)
-
-
-class SSLCert:
- def __init__(self, pemtxt):
- """
- Returns a (common name, [subject alternative names]) tuple.
- """
- self.cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pemtxt)
-
- @classmethod
- def from_der(klass, der):
- pem = ssl.DER_cert_to_PEM_cert(der)
- return klass(pem)
-
- def digest(self, name):
- return self.cert.digest(name)
-
- @property
- def issuer(self):
- return self.cert.get_issuer().get_components()
-
- @property
- def notbefore(self):
- t = self.cert.get_notBefore()
- return datetime.datetime.strptime(t, "%Y%m%d%H%M%SZ")
-
- @property
- def notafter(self):
- t = self.cert.get_notAfter()
- return datetime.datetime.strptime(t, "%Y%m%d%H%M%SZ")
-
- @property
- def has_expired(self):
- return self.cert.has_expired()
-
- @property
- def subject(self):
- return self.cert.get_subject().get_components()
-
- @property
- def serial(self):
- return self.cert.get_serial_number()
-
- @property
- def keyinfo(self):
- pk = self.cert.get_pubkey()
- types = {
- OpenSSL.crypto.TYPE_RSA: "RSA",
- OpenSSL.crypto.TYPE_DSA: "DSA",
- }
- return (
- types.get(pk.type(), "UNKNOWN"),
- pk.bits()
- )
-
- @property
- def cn(self):
- cn = None
- for i in self.subject:
- if i[0] == "CN":
- cn = i[1]
- return cn
-
- @property
- def altnames(self):
- altnames = []
- for i in range(self.cert.get_extension_count()):
- ext = self.cert.get_extension(i)
- if ext.get_short_name() == "subjectAltName":
- dec = decode(ext.get_data(), asn1Spec=_GeneralNames())
- for i in dec[0]:
- altnames.append(i[0].asOctets())
- return altnames
-
-
-def get_remote_cert(host, port): # pragma: no cover
- addr = socket.gethostbyname(host)
- s = ssl.get_server_certificate((addr, port))
- return SSLCert(s)
diff --git a/libmproxy/flow.py b/libmproxy/flow.py
index 0620ab18..06300784 100644
--- a/libmproxy/flow.py
+++ b/libmproxy/flow.py
@@ -21,8 +21,8 @@ import hashlib, Cookie, cookielib, copy, re, urlparse
import time
import tnetstring, filt, script, utils, encoding, proxy
from email.utils import parsedate_tz, formatdate, mktime_tz
-from netlib import odict, http
-import controller, version, certutils
+from netlib import odict, http, certutils
+import controller, version
HDR_FORM_URLENCODED = "application/x-www-form-urlencoded"
CONTENT_MISSING = 0
diff --git a/libmproxy/proxy.py b/libmproxy/proxy.py
index bbccb713..0f68fe25 100644
--- a/libmproxy/proxy.py
+++ b/libmproxy/proxy.py
@@ -16,8 +16,8 @@ import sys, os, string, socket, time
import shutil, tempfile, threading
import optparse, SocketServer
from OpenSSL import SSL
-from netlib import odict, tcp, http, wsgi
-import utils, flow, certutils, version, platform
+from netlib import odict, tcp, http, wsgi, certutils
+import utils, flow, version, platform
class ProxyError(Exception):