diff options
Diffstat (limited to 'mitmproxy/net')
-rw-r--r-- | mitmproxy/net/tcp.py | 77 | ||||
-rw-r--r-- | mitmproxy/net/tls.py | 130 |
2 files changed, 106 insertions, 101 deletions
diff --git a/mitmproxy/net/tcp.py b/mitmproxy/net/tcp.py index 35d3388f..47c80e80 100644 --- a/mitmproxy/net/tcp.py +++ b/mitmproxy/net/tcp.py @@ -5,8 +5,6 @@ import sys import threading import time import traceback -from ssl import match_hostname -from ssl import CertificateError from typing import Optional # noqa @@ -365,10 +363,13 @@ class TCPClient(_Connection): self.source_address = source_address self.cert = None self.server_certs = [] - self.ssl_verification_error = None # type: Optional[exceptions.InvalidCertificateException] self.sni = None self.spoof_source_address = spoof_source_address + @property + def ssl_verification_error(self) -> Optional[exceptions.InvalidCertificateException]: + return getattr(self.connection, "cert_error", None) + def close(self): # Make sure to close the real socket, not the SSL proxy. # OpenSSL is really good at screwing up, i.e. when trying to recv from a failed connection, @@ -380,29 +381,8 @@ class TCPClient(_Connection): else: close_socket(self.connection) - def create_ssl_context(self, **sslctx_kwargs): - def store_err(e): - self.ssl_verification_error = e - - return tls.create_client_context( - verify_error_callback=store_err, - **sslctx_kwargs, - ) - def convert_to_ssl(self, sni=None, alpn_protos=None, **sslctx_kwargs): - """ - cert: Path to a file containing both client cert and private key. - - options: A bit field consisting of OpenSSL.SSL.OP_* values - verify_options: A bit field consisting of OpenSSL.SSL.VERIFY_* values - ca_path: Path to a directory of trusted CA certificates prepared using the c_rehash tool - ca_pemfile: Path to a PEM formatted trusted CA certificate - """ - verification_mode = sslctx_kwargs.get('verify_options', None) - if verification_mode == SSL.VERIFY_PEER and not sni: - raise exceptions.TlsException("Cannot validate certificate hostname without SNI") - - context = self.create_ssl_context( + context = tls.create_client_context( alpn_protos=alpn_protos, sni=sni, **sslctx_kwargs @@ -426,33 +406,6 @@ class TCPClient(_Connection): for i in self.connection.get_peer_cert_chain(): self.server_certs.append(certs.SSLCert(i)) - # Validate TLS Hostname - try: - crt = dict( - subjectAltName=[("DNS", x.decode("ascii", "strict")) for x in self.cert.altnames] - ) - if self.cert.cn: - crt["subject"] = [[["commonName", self.cert.cn.decode("ascii", "strict")]]] - if sni: - # SNI hostnames allow support of IDN by using ASCII-Compatible Encoding - # Conversion algorithm is in RFC 3490 which is implemented by idna codec - # https://docs.python.org/3/library/codecs.html#text-encodings - # https://tools.ietf.org/html/rfc6066#section-3 - # https://tools.ietf.org/html/rfc4985#section-3 - hostname = sni.encode("idna").decode("ascii") - else: - hostname = "no-hostname" - match_hostname(crt, hostname) - except (ValueError, CertificateError) as e: - self.ssl_verification_error = exceptions.InvalidCertificateException( - "Certificate Verification Error for {}: {}".format( - sni or repr(self.address), - str(e) - ) - ) - if verification_mode == SSL.VERIFY_PEER: - raise self.ssl_verification_error - self.ssl_established = True self.rfile.set_descriptor(self.connection) self.wfile.set_descriptor(self.connection) @@ -538,28 +491,13 @@ class BaseHandler(_Connection): self.server = server self.clientcert = None - def create_ssl_context(self, **kwargs): - if kwargs.get("request_client_cert", None) is True: - def store_clientcert(cert): - self.clientcert = cert - - kwargs["request_client_cert"] = store_clientcert - - def store_err(e): - self.ssl_verification_error = e - - return tls.create_server_context( - **kwargs, - verify_error_callback=store_err, - ) - def convert_to_ssl(self, cert, key, **sslctx_kwargs): """ Convert connection to SSL. For a list of parameters, see tls.create_server_context(...) """ - context = self.create_ssl_context( + context = tls.create_server_context( cert=cert, key=key, **sslctx_kwargs) @@ -570,6 +508,9 @@ class BaseHandler(_Connection): except SSL.Error as v: raise exceptions.TlsException("SSL handshake error: %s" % repr(v)) self.ssl_established = True + cert = self.connection.get_peer_certificate() + if cert: + self.clientcert = certs.SSLCert(cert) self.rfile.set_descriptor(self.connection) self.wfile.set_descriptor(self.connection) diff --git a/mitmproxy/net/tls.py b/mitmproxy/net/tls.py index ccff36b8..74911f1e 100644 --- a/mitmproxy/net/tls.py +++ b/mitmproxy/net/tls.py @@ -5,6 +5,7 @@ import binascii import os import threading import typing +from ssl import match_hostname, CertificateError import certifi from OpenSSL import SSL @@ -94,23 +95,23 @@ log_master_secret = MasterSecretLogger.create_logfun( def _create_ssl_context( method: int = DEFAULT_METHOD, options: int = DEFAULT_OPTIONS, - verify_options: int = SSL.VERIFY_NONE, ca_path: str = None, ca_pemfile: str = None, cipher_list: str = None, alpn_protos: typing.Iterable[bytes] = None, alpn_select=None, alpn_select_callback: typing.Callable[[typing.Any, typing.Any], bytes] = None, - sni=None, - verify_error_callback: typing.Callable[ - [exceptions.InvalidCertificateException], None] = None, + verify: int = SSL.VERIFY_PEER, + verify_callback: typing.Optional[ + typing.Callable[[SSL.Connection, SSL.X509, int, int, bool], bool] + ] = None, ) -> SSL.Context: """ Creates an SSL Context. :param method: One of SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD, TLSv1_1_METHOD, or TLSv1_2_METHOD :param options: A bit field consisting of OpenSSL.SSL.OP_* values - :param verify_options: A bit field consisting of OpenSSL.SSL.VERIFY_* values + :param verify: A bit field consisting of OpenSSL.SSL.VERIFY_* values :param ca_path: Path to a directory of trusted CA certificates prepared using the c_rehash tool :param ca_pemfile: Path to a PEM formatted trusted CA certificate :param cipher_list: A textual OpenSSL cipher list, see https://www.openssl.org/docs/apps/ciphers.html @@ -132,22 +133,8 @@ def _create_ssl_context( context.set_options(options) # Verify Options (NONE/PEER and trusted CAs) - if verify_options is not None: - def verify_cert(conn, x509, errno, err_depth, is_cert_verified): - if not is_cert_verified: - if verify_error_callback: - e = exceptions.InvalidCertificateException( - "Certificate Verification Error for {}: {} (errno: {}, depth: {})".format( - sni, - SSL._ffi.string(SSL._lib.X509_verify_cert_error_string(errno)).decode(), - errno, - err_depth - ) - ) - verify_error_callback(e) - return is_cert_verified - - context.set_verify(verify_options, verify_cert) + if verify is not None: + context.set_verify(verify, verify_callback) if ca_path is None and ca_pemfile is None: ca_pemfile = certifi.where() try: @@ -201,11 +188,77 @@ def _create_ssl_context( def create_client_context( cert: str = None, + sni: str = None, + address: str=None, + verify: int = SSL.VERIFY_NONE, **sslctx_kwargs ) -> SSL.Context: + """ + Args: + cert: Path to a file containing both client cert and private key. + sni: Server Name Indication. Required for VERIFY_PEER + address: server address, used for expressive error messages only + verify: A bit field consisting of OpenSSL.SSL.VERIFY_* values + """ + + if sni is None and verify != SSL.VERIFY_NONE: + raise exceptions.TlsException("Cannot validate certificate hostname without SNI") + + def verify_callback( + conn: SSL.Connection, + x509: SSL.X509, + errno: int, + depth: int, + is_cert_verified: bool + ) -> bool: + if is_cert_verified and depth == 0: + # Verify hostname of leaf certificate. + cert = certs.SSLCert(x509) + try: + crt = dict( + subjectAltName=[("DNS", x.decode("ascii", "strict")) for x in cert.altnames] + ) # type: typing.Dict[str, typing.Any] + if cert.cn: + crt["subject"] = [[["commonName", cert.cn.decode("ascii", "strict")]]] + if sni: + # SNI hostnames allow support of IDN by using ASCII-Compatible Encoding + # Conversion algorithm is in RFC 3490 which is implemented by idna codec + # https://docs.python.org/3/library/codecs.html#text-encodings + # https://tools.ietf.org/html/rfc6066#section-3 + # https://tools.ietf.org/html/rfc4985#section-3 + hostname = sni.encode("idna").decode("ascii") + else: + hostname = "no-hostname" + match_hostname(crt, hostname) + except (ValueError, CertificateError) as e: + conn.cert_error = exceptions.InvalidCertificateException( + "Certificate verification error for {}: {}".format( + sni or repr(address), + str(e) + ) + ) + is_cert_verified = False + elif is_cert_verified: + pass + else: + conn.cert_error = exceptions.InvalidCertificateException( + "Certificate verification error for {}: {} (errno: {}, depth: {})".format( + sni, + SSL._ffi.string(SSL._lib.X509_verify_cert_error_string(errno)).decode(), + errno, + depth + ) + ) + + # SSL_VERIFY_NONE: The handshake will be continued regardless of the verification result. + return is_cert_verified + context = _create_ssl_context( - **sslctx_kwargs + verify=verify, + verify_callback=verify_callback, + **sslctx_kwargs, ) + # Client Certs if cert: try: @@ -220,7 +273,7 @@ def create_server_context( cert: typing.Union[certs.SSLCert, str], key: SSL.PKey, handle_sni: typing.Optional[typing.Callable[[SSL.Connection], None]] = None, - request_client_cert: typing.Optional[typing.Callable[[certs.SSLCert], None]] = None, + request_client_cert: bool = False, chain_file=None, dhparams=None, extra_chain_certs: typing.Iterable[certs.SSLCert] = None, @@ -245,7 +298,27 @@ def create_server_context( until then we're conservative. """ - context = _create_ssl_context(ca_pemfile=chain_file, **sslctx_kwargs) + def accept_all( + conn_: SSL.Connection, + x509: SSL.X509, + errno: int, + err_depth: int, + is_cert_verified: bool, + ) -> bool: + # Return true to prevent cert verification error + return True + + if request_client_cert: + verify = SSL.VERIFY_PEER + else: + verify = SSL.VERIFY_NONE + + context = _create_ssl_context( + ca_pemfile=chain_file, + verify=verify, + verify_callback=accept_all, + **sslctx_kwargs, + ) context.use_privatekey(key) if isinstance(cert, certs.SSLCert): @@ -261,15 +334,6 @@ def create_server_context( # SNI callback happens during do_handshake() context.set_tlsext_servername_callback(handle_sni) - if request_client_cert: - def save_cert(conn_, x509, errno_, depth_, preverify_ok_): - cert = certs.SSLCert(x509) - request_client_cert(cert) - # Return true to prevent cert verification error - return True - - context.set_verify(SSL.VERIFY_PEER, save_cert) - if dhparams: SSL._lib.SSL_CTX_set_tmp_dh(context._context, dhparams) |