aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy/net/tcp.py
diff options
context:
space:
mode:
authorMaximilian Hils <git@maximilianhils.com>2017-09-03 03:02:13 +0200
committerMaximilian Hils <git@maximilianhils.com>2017-09-03 03:06:15 +0200
commit2b4f58eb4416beff3d0a246b8cfb55b5eb8f735b (patch)
treeff55220efeb1fd6c6f5a704a0aa5c63b4b8a6eee /mitmproxy/net/tcp.py
parent781369a3269c05d40fa8d37cfc0eae0401558d8d (diff)
downloadmitmproxy-2b4f58eb4416beff3d0a246b8cfb55b5eb8f735b.tar.gz
mitmproxy-2b4f58eb4416beff3d0a246b8cfb55b5eb8f735b.tar.bz2
mitmproxy-2b4f58eb4416beff3d0a246b8cfb55b5eb8f735b.zip
split TLS parts from net.tcp into net.tls
Diffstat (limited to 'mitmproxy/net/tcp.py')
-rw-r--r--mitmproxy/net/tcp.py284
1 files changed, 23 insertions, 261 deletions
diff --git a/mitmproxy/net/tcp.py b/mitmproxy/net/tcp.py
index e109236e..35d3388f 100644
--- a/mitmproxy/net/tcp.py
+++ b/mitmproxy/net/tcp.py
@@ -5,15 +5,13 @@ import sys
import threading
import time
import traceback
-import binascii
from ssl import match_hostname
from ssl import CertificateError
from typing import Optional # noqa
-from mitmproxy.utils import strutils
+from mitmproxy.net import tls
-import certifi
from OpenSSL import SSL
from mitmproxy import certs
@@ -28,90 +26,6 @@ IPPROTO_IPV6 = getattr(socket, "IPPROTO_IPV6", 41)
EINTR = 4
-# To enable all SSL methods use: SSLv23
-# then add options to disable certain methods
-# https://bugs.launchpad.net/pyopenssl/+bug/1020632/comments/3
-SSL_BASIC_OPTIONS = (
- SSL.OP_CIPHER_SERVER_PREFERENCE
-)
-if hasattr(SSL, "OP_NO_COMPRESSION"):
- SSL_BASIC_OPTIONS |= SSL.OP_NO_COMPRESSION
-
-SSL_DEFAULT_METHOD = SSL.SSLv23_METHOD
-SSL_DEFAULT_OPTIONS = (
- SSL.OP_NO_SSLv2 |
- SSL.OP_NO_SSLv3 |
- SSL_BASIC_OPTIONS
-)
-if hasattr(SSL, "OP_NO_COMPRESSION"):
- SSL_DEFAULT_OPTIONS |= SSL.OP_NO_COMPRESSION
-
-"""
-Map a reasonable SSL version specification into the format OpenSSL expects.
-Don't ask...
-https://bugs.launchpad.net/pyopenssl/+bug/1020632/comments/3
-"""
-sslversion_choices = {
- "all": (SSL.SSLv23_METHOD, SSL_BASIC_OPTIONS),
- # SSLv23_METHOD + NO_SSLv2 + NO_SSLv3 == TLS 1.0+
- # TLSv1_METHOD would be TLS 1.0 only
- "secure": (SSL.SSLv23_METHOD, (SSL.OP_NO_SSLv2 | SSL.OP_NO_SSLv3 | SSL_BASIC_OPTIONS)),
- "SSLv2": (SSL.SSLv2_METHOD, SSL_BASIC_OPTIONS),
- "SSLv3": (SSL.SSLv3_METHOD, SSL_BASIC_OPTIONS),
- "TLSv1": (SSL.TLSv1_METHOD, SSL_BASIC_OPTIONS),
- "TLSv1_1": (SSL.TLSv1_1_METHOD, SSL_BASIC_OPTIONS),
- "TLSv1_2": (SSL.TLSv1_2_METHOD, SSL_BASIC_OPTIONS),
-}
-
-ssl_method_names = {
- SSL.SSLv2_METHOD: "SSLv2",
- SSL.SSLv3_METHOD: "SSLv3",
- SSL.SSLv23_METHOD: "SSLv23",
- SSL.TLSv1_METHOD: "TLSv1",
- SSL.TLSv1_1_METHOD: "TLSv1.1",
- SSL.TLSv1_2_METHOD: "TLSv1.2",
-}
-
-
-class SSLKeyLogger:
-
- def __init__(self, filename):
- self.filename = filename
- self.f = None
- self.lock = threading.Lock()
-
- # required for functools.wraps, which pyOpenSSL uses.
- __name__ = "SSLKeyLogger"
-
- def __call__(self, connection, where, ret):
- if where == SSL.SSL_CB_HANDSHAKE_DONE and ret == 1:
- with self.lock:
- if not self.f:
- d = os.path.dirname(self.filename)
- if not os.path.isdir(d):
- os.makedirs(d)
- self.f = open(self.filename, "ab")
- self.f.write(b"\r\n")
- client_random = binascii.hexlify(connection.client_random())
- masterkey = binascii.hexlify(connection.master_key())
- self.f.write(b"CLIENT_RANDOM %s %s\r\n" % (client_random, masterkey))
- self.f.flush()
-
- def close(self):
- with self.lock:
- if self.f:
- self.f.close()
-
- @staticmethod
- def create_logfun(filename):
- if filename:
- return SSLKeyLogger(filename)
- return False
-
-
-log_ssl_key = SSLKeyLogger.create_logfun(
- os.getenv("MITMPROXY_SSLKEYLOGFILE") or os.getenv("SSLKEYLOGFILE"))
-
class _FileLike:
BLOCKSIZE = 1024 * 32
@@ -422,107 +336,6 @@ class _Connection:
except SSL.Error:
pass
- def _create_ssl_context(self,
- method=SSL_DEFAULT_METHOD,
- options=SSL_DEFAULT_OPTIONS,
- verify_options=SSL.VERIFY_NONE,
- ca_path=None,
- ca_pemfile=None,
- cipher_list=None,
- alpn_protos=None,
- alpn_select=None,
- alpn_select_callback=None,
- sni=None,
- ):
- """
- Creates an SSL Context.
-
- :param method: One of SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD, TLSv1_1_METHOD, or TLSv1_2_METHOD
- :param options: A bit field consisting of OpenSSL.SSL.OP_* values
- :param verify_options: A bit field consisting of OpenSSL.SSL.VERIFY_* values
- :param ca_path: Path to a directory of trusted CA certificates prepared using the c_rehash tool
- :param ca_pemfile: Path to a PEM formatted trusted CA certificate
- :param cipher_list: A textual OpenSSL cipher list, see https://www.openssl.org/docs/apps/ciphers.html
- :rtype : SSL.Context
- """
- try:
- context = SSL.Context(method)
- except ValueError as e:
- method_name = ssl_method_names.get(method, "unknown")
- raise exceptions.TlsException(
- "SSL method \"%s\" is most likely not supported "
- "or disabled (for security reasons) in your libssl. "
- "Please refer to https://github.com/mitmproxy/mitmproxy/issues/1101 "
- "for more details." % method_name
- )
-
- # Options (NO_SSLv2/3)
- if options is not None:
- context.set_options(options)
-
- # Verify Options (NONE/PEER and trusted CAs)
- if verify_options is not None:
- def verify_cert(conn, x509, errno, err_depth, is_cert_verified):
- if not is_cert_verified:
- self.ssl_verification_error = exceptions.InvalidCertificateException(
- "Certificate Verification Error for {}: {} (errno: {}, depth: {})".format(
- sni,
- strutils.always_str(SSL._ffi.string(SSL._lib.X509_verify_cert_error_string(errno)), "utf8"),
- errno,
- err_depth
- )
- )
- return is_cert_verified
-
- context.set_verify(verify_options, verify_cert)
- if ca_path is None and ca_pemfile is None:
- ca_pemfile = certifi.where()
- try:
- context.load_verify_locations(ca_pemfile, ca_path)
- except SSL.Error:
- raise exceptions.TlsException(
- "Cannot load trusted certificates ({}, {}).".format(
- ca_pemfile, ca_path
- )
- )
-
- # Workaround for
- # https://github.com/pyca/pyopenssl/issues/190
- # https://github.com/mitmproxy/mitmproxy/issues/472
- # Options already set before are not cleared.
- context.set_mode(SSL._lib.SSL_MODE_AUTO_RETRY)
-
- # Cipher List
- if cipher_list:
- try:
- context.set_cipher_list(cipher_list.encode())
- except SSL.Error as v:
- raise exceptions.TlsException("SSL cipher specification error: %s" % str(v))
-
- # SSLKEYLOGFILE
- if log_ssl_key:
- context.set_info_callback(log_ssl_key)
-
- if alpn_protos is not None:
- # advertise application layer protocols
- context.set_alpn_protos(alpn_protos)
- elif alpn_select is not None and alpn_select_callback is None:
- # select application layer protocol
- def alpn_select_callback(conn_, options):
- if alpn_select in options:
- return bytes(alpn_select)
- else: # pragma: no cover
- return options[0]
- context.set_alpn_select_callback(alpn_select_callback)
- elif alpn_select_callback is not None and alpn_select is None:
- if not callable(alpn_select_callback):
- raise exceptions.TlsException("ALPN error: alpn_select_callback must be a function.")
- context.set_alpn_select_callback(alpn_select_callback)
- elif alpn_select_callback is not None and alpn_select is not None:
- raise exceptions.TlsException("ALPN error: only define alpn_select (string) OR alpn_select_callback (function).")
-
- return context
-
class ConnectionCloser:
def __init__(self, conn):
@@ -567,18 +380,14 @@ class TCPClient(_Connection):
else:
close_socket(self.connection)
- def create_ssl_context(self, cert=None, alpn_protos=None, **sslctx_kwargs):
- context = self._create_ssl_context(
- alpn_protos=alpn_protos,
- **sslctx_kwargs)
- # Client Certs
- if cert:
- try:
- context.use_privatekey_file(cert)
- context.use_certificate_file(cert)
- except SSL.Error as v:
- raise exceptions.TlsException("SSL client certificate error: %s" % str(v))
- return context
+ def create_ssl_context(self, **sslctx_kwargs):
+ def store_err(e):
+ self.ssl_verification_error = e
+
+ return tls.create_client_context(
+ verify_error_callback=store_err,
+ **sslctx_kwargs,
+ )
def convert_to_ssl(self, sni=None, alpn_protos=None, **sslctx_kwargs):
"""
@@ -729,77 +538,30 @@ class BaseHandler(_Connection):
self.server = server
self.clientcert = None
- def create_ssl_context(self,
- cert, key,
- handle_sni=None,
- request_client_cert=None,
- chain_file=None,
- dhparams=None,
- extra_chain_certs=None,
- **sslctx_kwargs):
- """
- cert: A certs.SSLCert object or the path to a certificate
- chain file.
-
- handle_sni: SNI handler, should take a connection object. Server
- name can be retrieved like this:
-
- connection.get_servername()
+ def create_ssl_context(self, **kwargs):
+ if kwargs.get("request_client_cert", None) is True:
+ def store_clientcert(cert):
+ self.clientcert = cert
- And you can specify the connection keys as follows:
+ kwargs["request_client_cert"] = store_clientcert
- new_context = Context(TLSv1_METHOD)
- new_context.use_privatekey(key)
- new_context.use_certificate(cert)
- connection.set_context(new_context)
+ def store_err(e):
+ self.ssl_verification_error = e
- The request_client_cert argument requires some explanation. We're
- supposed to be able to do this with no negative effects - if the
- client has no cert to present, we're notified and proceed as usual.
- Unfortunately, Android seems to have a bug (tested on 4.2.2) - when
- an Android client is asked to present a certificate it does not
- have, it hangs up, which is frankly bogus. Some time down the track
- we may be able to make the proper behaviour the default again, but
- until then we're conservative.
- """
-
- context = self._create_ssl_context(ca_pemfile=chain_file, **sslctx_kwargs)
-
- context.use_privatekey(key)
- if isinstance(cert, certs.SSLCert):
- context.use_certificate(cert.x509)
- else:
- context.use_certificate_chain_file(cert)
-
- if extra_chain_certs:
- for i in extra_chain_certs:
- context.add_extra_chain_cert(i.x509)
-
- if handle_sni:
- # SNI callback happens during do_handshake()
- context.set_tlsext_servername_callback(handle_sni)
-
- if request_client_cert:
- def save_cert(conn_, cert, errno_, depth_, preverify_ok_):
- self.clientcert = certs.SSLCert(cert)
- # Return true to prevent cert verification error
- return True
- context.set_verify(SSL.VERIFY_PEER, save_cert)
-
- if dhparams:
- SSL._lib.SSL_CTX_set_tmp_dh(context._context, dhparams)
-
- return context
+ return tls.create_server_context(
+ **kwargs,
+ verify_error_callback=store_err,
+ )
def convert_to_ssl(self, cert, key, **sslctx_kwargs):
"""
Convert connection to SSL.
- For a list of parameters, see BaseHandler._create_ssl_context(...)
+ For a list of parameters, see tls.create_server_context(...)
"""
context = self.create_ssl_context(
- cert,
- key,
+ cert=cert,
+ key=key,
**sslctx_kwargs)
self.connection = SSL.Connection(context, self.connection)
self.connection.set_accept_state()