diff options
-rw-r--r-- | mitmproxy/net/tcp.py | 77 | ||||
-rw-r--r-- | mitmproxy/net/tls.py | 130 | ||||
-rw-r--r-- | mitmproxy/proxy/protocol/tls.py | 2 | ||||
-rw-r--r-- | setup.cfg | 1 | ||||
-rw-r--r-- | test/mitmproxy/net/test_tcp.py | 50 | ||||
-rw-r--r-- | test/mitmproxy/net/test_tls.py | 19 | ||||
-rw-r--r-- | test/mitmproxy/proxy/test_server.py | 2 |
7 files changed, 145 insertions, 136 deletions
diff --git a/mitmproxy/net/tcp.py b/mitmproxy/net/tcp.py index 35d3388f..47c80e80 100644 --- a/mitmproxy/net/tcp.py +++ b/mitmproxy/net/tcp.py @@ -5,8 +5,6 @@ import sys import threading import time import traceback -from ssl import match_hostname -from ssl import CertificateError from typing import Optional # noqa @@ -365,10 +363,13 @@ class TCPClient(_Connection): self.source_address = source_address self.cert = None self.server_certs = [] - self.ssl_verification_error = None # type: Optional[exceptions.InvalidCertificateException] self.sni = None self.spoof_source_address = spoof_source_address + @property + def ssl_verification_error(self) -> Optional[exceptions.InvalidCertificateException]: + return getattr(self.connection, "cert_error", None) + def close(self): # Make sure to close the real socket, not the SSL proxy. # OpenSSL is really good at screwing up, i.e. when trying to recv from a failed connection, @@ -380,29 +381,8 @@ class TCPClient(_Connection): else: close_socket(self.connection) - def create_ssl_context(self, **sslctx_kwargs): - def store_err(e): - self.ssl_verification_error = e - - return tls.create_client_context( - verify_error_callback=store_err, - **sslctx_kwargs, - ) - def convert_to_ssl(self, sni=None, alpn_protos=None, **sslctx_kwargs): - """ - cert: Path to a file containing both client cert and private key. - - options: A bit field consisting of OpenSSL.SSL.OP_* values - verify_options: A bit field consisting of OpenSSL.SSL.VERIFY_* values - ca_path: Path to a directory of trusted CA certificates prepared using the c_rehash tool - ca_pemfile: Path to a PEM formatted trusted CA certificate - """ - verification_mode = sslctx_kwargs.get('verify_options', None) - if verification_mode == SSL.VERIFY_PEER and not sni: - raise exceptions.TlsException("Cannot validate certificate hostname without SNI") - - context = self.create_ssl_context( + context = tls.create_client_context( alpn_protos=alpn_protos, sni=sni, **sslctx_kwargs @@ -426,33 +406,6 @@ class TCPClient(_Connection): for i in self.connection.get_peer_cert_chain(): self.server_certs.append(certs.SSLCert(i)) - # Validate TLS Hostname - try: - crt = dict( - subjectAltName=[("DNS", x.decode("ascii", "strict")) for x in self.cert.altnames] - ) - if self.cert.cn: - crt["subject"] = [[["commonName", self.cert.cn.decode("ascii", "strict")]]] - if sni: - # SNI hostnames allow support of IDN by using ASCII-Compatible Encoding - # Conversion algorithm is in RFC 3490 which is implemented by idna codec - # https://docs.python.org/3/library/codecs.html#text-encodings - # https://tools.ietf.org/html/rfc6066#section-3 - # https://tools.ietf.org/html/rfc4985#section-3 - hostname = sni.encode("idna").decode("ascii") - else: - hostname = "no-hostname" - match_hostname(crt, hostname) - except (ValueError, CertificateError) as e: - self.ssl_verification_error = exceptions.InvalidCertificateException( - "Certificate Verification Error for {}: {}".format( - sni or repr(self.address), - str(e) - ) - ) - if verification_mode == SSL.VERIFY_PEER: - raise self.ssl_verification_error - self.ssl_established = True self.rfile.set_descriptor(self.connection) self.wfile.set_descriptor(self.connection) @@ -538,28 +491,13 @@ class BaseHandler(_Connection): self.server = server self.clientcert = None - def create_ssl_context(self, **kwargs): - if kwargs.get("request_client_cert", None) is True: - def store_clientcert(cert): - self.clientcert = cert - - kwargs["request_client_cert"] = store_clientcert - - def store_err(e): - self.ssl_verification_error = e - - return tls.create_server_context( - **kwargs, - verify_error_callback=store_err, - ) - def convert_to_ssl(self, cert, key, **sslctx_kwargs): """ Convert connection to SSL. For a list of parameters, see tls.create_server_context(...) """ - context = self.create_ssl_context( + context = tls.create_server_context( cert=cert, key=key, **sslctx_kwargs) @@ -570,6 +508,9 @@ class BaseHandler(_Connection): except SSL.Error as v: raise exceptions.TlsException("SSL handshake error: %s" % repr(v)) self.ssl_established = True + cert = self.connection.get_peer_certificate() + if cert: + self.clientcert = certs.SSLCert(cert) self.rfile.set_descriptor(self.connection) self.wfile.set_descriptor(self.connection) diff --git a/mitmproxy/net/tls.py b/mitmproxy/net/tls.py index ccff36b8..74911f1e 100644 --- a/mitmproxy/net/tls.py +++ b/mitmproxy/net/tls.py @@ -5,6 +5,7 @@ import binascii import os import threading import typing +from ssl import match_hostname, CertificateError import certifi from OpenSSL import SSL @@ -94,23 +95,23 @@ log_master_secret = MasterSecretLogger.create_logfun( def _create_ssl_context( method: int = DEFAULT_METHOD, options: int = DEFAULT_OPTIONS, - verify_options: int = SSL.VERIFY_NONE, ca_path: str = None, ca_pemfile: str = None, cipher_list: str = None, alpn_protos: typing.Iterable[bytes] = None, alpn_select=None, alpn_select_callback: typing.Callable[[typing.Any, typing.Any], bytes] = None, - sni=None, - verify_error_callback: typing.Callable[ - [exceptions.InvalidCertificateException], None] = None, + verify: int = SSL.VERIFY_PEER, + verify_callback: typing.Optional[ + typing.Callable[[SSL.Connection, SSL.X509, int, int, bool], bool] + ] = None, ) -> SSL.Context: """ Creates an SSL Context. :param method: One of SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD, TLSv1_1_METHOD, or TLSv1_2_METHOD :param options: A bit field consisting of OpenSSL.SSL.OP_* values - :param verify_options: A bit field consisting of OpenSSL.SSL.VERIFY_* values + :param verify: A bit field consisting of OpenSSL.SSL.VERIFY_* values :param ca_path: Path to a directory of trusted CA certificates prepared using the c_rehash tool :param ca_pemfile: Path to a PEM formatted trusted CA certificate :param cipher_list: A textual OpenSSL cipher list, see https://www.openssl.org/docs/apps/ciphers.html @@ -132,22 +133,8 @@ def _create_ssl_context( context.set_options(options) # Verify Options (NONE/PEER and trusted CAs) - if verify_options is not None: - def verify_cert(conn, x509, errno, err_depth, is_cert_verified): - if not is_cert_verified: - if verify_error_callback: - e = exceptions.InvalidCertificateException( - "Certificate Verification Error for {}: {} (errno: {}, depth: {})".format( - sni, - SSL._ffi.string(SSL._lib.X509_verify_cert_error_string(errno)).decode(), - errno, - err_depth - ) - ) - verify_error_callback(e) - return is_cert_verified - - context.set_verify(verify_options, verify_cert) + if verify is not None: + context.set_verify(verify, verify_callback) if ca_path is None and ca_pemfile is None: ca_pemfile = certifi.where() try: @@ -201,11 +188,77 @@ def _create_ssl_context( def create_client_context( cert: str = None, + sni: str = None, + address: str=None, + verify: int = SSL.VERIFY_NONE, **sslctx_kwargs ) -> SSL.Context: + """ + Args: + cert: Path to a file containing both client cert and private key. + sni: Server Name Indication. Required for VERIFY_PEER + address: server address, used for expressive error messages only + verify: A bit field consisting of OpenSSL.SSL.VERIFY_* values + """ + + if sni is None and verify != SSL.VERIFY_NONE: + raise exceptions.TlsException("Cannot validate certificate hostname without SNI") + + def verify_callback( + conn: SSL.Connection, + x509: SSL.X509, + errno: int, + depth: int, + is_cert_verified: bool + ) -> bool: + if is_cert_verified and depth == 0: + # Verify hostname of leaf certificate. + cert = certs.SSLCert(x509) + try: + crt = dict( + subjectAltName=[("DNS", x.decode("ascii", "strict")) for x in cert.altnames] + ) # type: typing.Dict[str, typing.Any] + if cert.cn: + crt["subject"] = [[["commonName", cert.cn.decode("ascii", "strict")]]] + if sni: + # SNI hostnames allow support of IDN by using ASCII-Compatible Encoding + # Conversion algorithm is in RFC 3490 which is implemented by idna codec + # https://docs.python.org/3/library/codecs.html#text-encodings + # https://tools.ietf.org/html/rfc6066#section-3 + # https://tools.ietf.org/html/rfc4985#section-3 + hostname = sni.encode("idna").decode("ascii") + else: + hostname = "no-hostname" + match_hostname(crt, hostname) + except (ValueError, CertificateError) as e: + conn.cert_error = exceptions.InvalidCertificateException( + "Certificate verification error for {}: {}".format( + sni or repr(address), + str(e) + ) + ) + is_cert_verified = False + elif is_cert_verified: + pass + else: + conn.cert_error = exceptions.InvalidCertificateException( + "Certificate verification error for {}: {} (errno: {}, depth: {})".format( + sni, + SSL._ffi.string(SSL._lib.X509_verify_cert_error_string(errno)).decode(), + errno, + depth + ) + ) + + # SSL_VERIFY_NONE: The handshake will be continued regardless of the verification result. + return is_cert_verified + context = _create_ssl_context( - **sslctx_kwargs + verify=verify, + verify_callback=verify_callback, + **sslctx_kwargs, ) + # Client Certs if cert: try: @@ -220,7 +273,7 @@ def create_server_context( cert: typing.Union[certs.SSLCert, str], key: SSL.PKey, handle_sni: typing.Optional[typing.Callable[[SSL.Connection], None]] = None, - request_client_cert: typing.Optional[typing.Callable[[certs.SSLCert], None]] = None, + request_client_cert: bool = False, chain_file=None, dhparams=None, extra_chain_certs: typing.Iterable[certs.SSLCert] = None, @@ -245,7 +298,27 @@ def create_server_context( until then we're conservative. """ - context = _create_ssl_context(ca_pemfile=chain_file, **sslctx_kwargs) + def accept_all( + conn_: SSL.Connection, + x509: SSL.X509, + errno: int, + err_depth: int, + is_cert_verified: bool, + ) -> bool: + # Return true to prevent cert verification error + return True + + if request_client_cert: + verify = SSL.VERIFY_PEER + else: + verify = SSL.VERIFY_NONE + + context = _create_ssl_context( + ca_pemfile=chain_file, + verify=verify, + verify_callback=accept_all, + **sslctx_kwargs, + ) context.use_privatekey(key) if isinstance(cert, certs.SSLCert): @@ -261,15 +334,6 @@ def create_server_context( # SNI callback happens during do_handshake() context.set_tlsext_servername_callback(handle_sni) - if request_client_cert: - def save_cert(conn_, x509, errno_, depth_, preverify_ok_): - cert = certs.SSLCert(x509) - request_client_cert(cert) - # Return true to prevent cert verification error - return True - - context.set_verify(SSL.VERIFY_PEER, save_cert) - if dhparams: SSL._lib.SSL_CTX_set_tmp_dh(context._context, dhparams) diff --git a/mitmproxy/proxy/protocol/tls.py b/mitmproxy/proxy/protocol/tls.py index 10eea4ae..21bf1417 100644 --- a/mitmproxy/proxy/protocol/tls.py +++ b/mitmproxy/proxy/protocol/tls.py @@ -548,7 +548,7 @@ class TlsLayer(base.Layer): self.server_sni, method=self.config.openssl_method_server, options=self.config.openssl_options_server, - verify_options=self.config.openssl_verification_mode_server, + verify=self.config.openssl_verification_mode_server, ca_path=self.config.options.ssl_verify_upstream_trusted_cadir, ca_pemfile=self.config.options.ssl_verify_upstream_trusted_ca, cipher_list=ciphers_server, @@ -50,6 +50,7 @@ exclude = mitmproxy/net/http/multipart.py mitmproxy/net/http/url.py mitmproxy/net/tcp.py + mitmproxy/net/tls.py mitmproxy/options.py mitmproxy/proxy/config.py mitmproxy/proxy/modes/http_proxy.py diff --git a/test/mitmproxy/net/test_tcp.py b/test/mitmproxy/net/test_tcp.py index f0e8b776..9d521533 100644 --- a/test/mitmproxy/net/test_tcp.py +++ b/test/mitmproxy/net/test_tcp.py @@ -206,7 +206,7 @@ class TestInvalidTrustFile(tservers.ServerTestBase): with pytest.raises(exceptions.TlsException): c.convert_to_ssl( sni="example.mitmproxy.org", - verify_options=SSL.VERIFY_PEER, + verify=SSL.VERIFY_PEER, ca_pemfile=tutils.test_data.path("mitmproxy/net/data/verificationcerts/generate.py") ) @@ -236,7 +236,7 @@ class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase): def test_mode_none_should_pass(self): c = tcp.TCPClient(("127.0.0.1", self.port)) with c.connect(): - c.convert_to_ssl(verify_options=SSL.VERIFY_NONE) + c.convert_to_ssl(verify=SSL.VERIFY_NONE) # Verification errors should be saved even if connection isn't aborted assert c.ssl_verification_error @@ -252,7 +252,7 @@ class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase): with pytest.raises(exceptions.InvalidCertificateException): c.convert_to_ssl( sni="example.mitmproxy.org", - verify_options=SSL.VERIFY_PEER, + verify=SSL.VERIFY_PEER, ca_pemfile=tutils.test_data.path("mitmproxy/net/data/verificationcerts/trusted-root.crt") ) @@ -276,17 +276,27 @@ class TestSSLUpstreamCertVerificationWBadHostname(tservers.ServerTestBase): with c.connect(): with pytest.raises(exceptions.TlsException): c.convert_to_ssl( - verify_options=SSL.VERIFY_PEER, + verify=SSL.VERIFY_PEER, ca_pemfile=tutils.test_data.path("mitmproxy/net/data/verificationcerts/trusted-root.crt") ) + def test_mode_none_should_pass_without_sni(self): + c = tcp.TCPClient(("127.0.0.1", self.port)) + with c.connect(): + c.convert_to_ssl( + verify=SSL.VERIFY_NONE, + ca_path=tutils.test_data.path("mitmproxy/net/data/verificationcerts/") + ) + + assert "'no-hostname' doesn't match" in str(c.ssl_verification_error) + def test_should_fail(self): c = tcp.TCPClient(("127.0.0.1", self.port)) with c.connect(): with pytest.raises(exceptions.InvalidCertificateException): c.convert_to_ssl( sni="mitmproxy.org", - verify_options=SSL.VERIFY_PEER, + verify=SSL.VERIFY_PEER, ca_pemfile=tutils.test_data.path("mitmproxy/net/data/verificationcerts/trusted-root.crt") ) assert c.ssl_verification_error @@ -305,7 +315,7 @@ class TestSSLUpstreamCertVerificationWValidCertChain(tservers.ServerTestBase): with c.connect(): c.convert_to_ssl( sni="example.mitmproxy.org", - verify_options=SSL.VERIFY_PEER, + verify=SSL.VERIFY_PEER, ca_pemfile=tutils.test_data.path("mitmproxy/net/data/verificationcerts/trusted-root.crt") ) @@ -321,7 +331,7 @@ class TestSSLUpstreamCertVerificationWValidCertChain(tservers.ServerTestBase): with c.connect(): c.convert_to_ssl( sni="example.mitmproxy.org", - verify_options=SSL.VERIFY_PEER, + verify=SSL.VERIFY_PEER, ca_path=tutils.test_data.path("mitmproxy/net/data/verificationcerts/") ) @@ -774,10 +784,7 @@ class TestPeek(tservers.ServerTestBase): c.close() with pytest.raises(exceptions.NetlibException): - if c.rfile.peek(1) == b"": - # Workaround for Python 2 on Unix: - # Peeking a closed connection does not raise an exception here. - raise exceptions.NetlibException() + c.rfile.peek(1) class TestPeekSSL(TestPeek): @@ -787,24 +794,3 @@ class TestPeekSSL(TestPeek): with c.connect() as conn: c.convert_to_ssl() return conn.pop() - - -class TestSSLInvalid(tservers.ServerTestBase): - handler = EchoHandler - ssl = True - - def test_invalid_ssl_method_should_fail(self): - fake_ssl_method = 100500 - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - with pytest.raises(exceptions.TlsException): - c.convert_to_ssl(method=fake_ssl_method) - - def test_alpn_error(self): - c = tcp.TCPClient(("127.0.0.1", self.port)) - with c.connect(): - with pytest.raises(exceptions.TlsException, match="must be a function"): - c.create_ssl_context(alpn_select_callback="foo") - - with pytest.raises(exceptions.TlsException, match="ALPN error"): - c.create_ssl_context(alpn_select="foo", alpn_select_callback="bar") diff --git a/test/mitmproxy/net/test_tls.py b/test/mitmproxy/net/test_tls.py index 473163aa..d0583d34 100644 --- a/test/mitmproxy/net/test_tls.py +++ b/test/mitmproxy/net/test_tls.py @@ -1,10 +1,13 @@ +import pytest + +from mitmproxy import exceptions from mitmproxy.net import tls from mitmproxy.net.tcp import TCPClient from test.mitmproxy.net.test_tcp import EchoHandler from . import tservers -class TestSSLKeyLogger(tservers.ServerTestBase): +class TestMasterSecretLogger(tservers.ServerTestBase): handler = EchoHandler ssl = dict( cipher_list="AES256-SHA" @@ -36,3 +39,17 @@ class TestSSLKeyLogger(tservers.ServerTestBase): tls.MasterSecretLogger.create_logfun("test"), tls.MasterSecretLogger) assert not tls.MasterSecretLogger.create_logfun(False) + + +class TestTLSInvalid: + def test_invalid_ssl_method_should_fail(self): + fake_ssl_method = 100500 + with pytest.raises(exceptions.TlsException): + tls.create_client_context(method=fake_ssl_method) + + def test_alpn_error(self): + with pytest.raises(exceptions.TlsException, match="must be a function"): + tls.create_client_context(alpn_select_callback="foo") + + with pytest.raises(exceptions.TlsException, match="ALPN error"): + tls.create_client_context(alpn_select="foo", alpn_select_callback="bar") diff --git a/test/mitmproxy/proxy/test_server.py b/test/mitmproxy/proxy/test_server.py index 562f822c..affdf221 100644 --- a/test/mitmproxy/proxy/test_server.py +++ b/test/mitmproxy/proxy/test_server.py @@ -468,7 +468,7 @@ class TestHTTPSUpstreamServerVerificationWBadCert(tservers.HTTPProxyTest): self.options.ssl_insecure = False r = self._request() assert r.status_code == 502 - assert b"Certificate Verification Error" in r.raw_content + assert b"Certificate verification error" in r.raw_content class TestHTTPSNoCommonName(tservers.HTTPProxyTest): |