aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy/net/tls.py
diff options
context:
space:
mode:
Diffstat (limited to 'mitmproxy/net/tls.py')
-rw-r--r--mitmproxy/net/tls.py130
1 files changed, 97 insertions, 33 deletions
diff --git a/mitmproxy/net/tls.py b/mitmproxy/net/tls.py
index ccff36b8..74911f1e 100644
--- a/mitmproxy/net/tls.py
+++ b/mitmproxy/net/tls.py
@@ -5,6 +5,7 @@ import binascii
import os
import threading
import typing
+from ssl import match_hostname, CertificateError
import certifi
from OpenSSL import SSL
@@ -94,23 +95,23 @@ log_master_secret = MasterSecretLogger.create_logfun(
def _create_ssl_context(
method: int = DEFAULT_METHOD,
options: int = DEFAULT_OPTIONS,
- verify_options: int = SSL.VERIFY_NONE,
ca_path: str = None,
ca_pemfile: str = None,
cipher_list: str = None,
alpn_protos: typing.Iterable[bytes] = None,
alpn_select=None,
alpn_select_callback: typing.Callable[[typing.Any, typing.Any], bytes] = None,
- sni=None,
- verify_error_callback: typing.Callable[
- [exceptions.InvalidCertificateException], None] = None,
+ verify: int = SSL.VERIFY_PEER,
+ verify_callback: typing.Optional[
+ typing.Callable[[SSL.Connection, SSL.X509, int, int, bool], bool]
+ ] = None,
) -> SSL.Context:
"""
Creates an SSL Context.
:param method: One of SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD, TLSv1_1_METHOD, or TLSv1_2_METHOD
:param options: A bit field consisting of OpenSSL.SSL.OP_* values
- :param verify_options: A bit field consisting of OpenSSL.SSL.VERIFY_* values
+ :param verify: A bit field consisting of OpenSSL.SSL.VERIFY_* values
:param ca_path: Path to a directory of trusted CA certificates prepared using the c_rehash tool
:param ca_pemfile: Path to a PEM formatted trusted CA certificate
:param cipher_list: A textual OpenSSL cipher list, see https://www.openssl.org/docs/apps/ciphers.html
@@ -132,22 +133,8 @@ def _create_ssl_context(
context.set_options(options)
# Verify Options (NONE/PEER and trusted CAs)
- if verify_options is not None:
- def verify_cert(conn, x509, errno, err_depth, is_cert_verified):
- if not is_cert_verified:
- if verify_error_callback:
- e = exceptions.InvalidCertificateException(
- "Certificate Verification Error for {}: {} (errno: {}, depth: {})".format(
- sni,
- SSL._ffi.string(SSL._lib.X509_verify_cert_error_string(errno)).decode(),
- errno,
- err_depth
- )
- )
- verify_error_callback(e)
- return is_cert_verified
-
- context.set_verify(verify_options, verify_cert)
+ if verify is not None:
+ context.set_verify(verify, verify_callback)
if ca_path is None and ca_pemfile is None:
ca_pemfile = certifi.where()
try:
@@ -201,11 +188,77 @@ def _create_ssl_context(
def create_client_context(
cert: str = None,
+ sni: str = None,
+ address: str=None,
+ verify: int = SSL.VERIFY_NONE,
**sslctx_kwargs
) -> SSL.Context:
+ """
+ Args:
+ cert: Path to a file containing both client cert and private key.
+ sni: Server Name Indication. Required for VERIFY_PEER
+ address: server address, used for expressive error messages only
+ verify: A bit field consisting of OpenSSL.SSL.VERIFY_* values
+ """
+
+ if sni is None and verify != SSL.VERIFY_NONE:
+ raise exceptions.TlsException("Cannot validate certificate hostname without SNI")
+
+ def verify_callback(
+ conn: SSL.Connection,
+ x509: SSL.X509,
+ errno: int,
+ depth: int,
+ is_cert_verified: bool
+ ) -> bool:
+ if is_cert_verified and depth == 0:
+ # Verify hostname of leaf certificate.
+ cert = certs.SSLCert(x509)
+ try:
+ crt = dict(
+ subjectAltName=[("DNS", x.decode("ascii", "strict")) for x in cert.altnames]
+ ) # type: typing.Dict[str, typing.Any]
+ if cert.cn:
+ crt["subject"] = [[["commonName", cert.cn.decode("ascii", "strict")]]]
+ if sni:
+ # SNI hostnames allow support of IDN by using ASCII-Compatible Encoding
+ # Conversion algorithm is in RFC 3490 which is implemented by idna codec
+ # https://docs.python.org/3/library/codecs.html#text-encodings
+ # https://tools.ietf.org/html/rfc6066#section-3
+ # https://tools.ietf.org/html/rfc4985#section-3
+ hostname = sni.encode("idna").decode("ascii")
+ else:
+ hostname = "no-hostname"
+ match_hostname(crt, hostname)
+ except (ValueError, CertificateError) as e:
+ conn.cert_error = exceptions.InvalidCertificateException(
+ "Certificate verification error for {}: {}".format(
+ sni or repr(address),
+ str(e)
+ )
+ )
+ is_cert_verified = False
+ elif is_cert_verified:
+ pass
+ else:
+ conn.cert_error = exceptions.InvalidCertificateException(
+ "Certificate verification error for {}: {} (errno: {}, depth: {})".format(
+ sni,
+ SSL._ffi.string(SSL._lib.X509_verify_cert_error_string(errno)).decode(),
+ errno,
+ depth
+ )
+ )
+
+ # SSL_VERIFY_NONE: The handshake will be continued regardless of the verification result.
+ return is_cert_verified
+
context = _create_ssl_context(
- **sslctx_kwargs
+ verify=verify,
+ verify_callback=verify_callback,
+ **sslctx_kwargs,
)
+
# Client Certs
if cert:
try:
@@ -220,7 +273,7 @@ def create_server_context(
cert: typing.Union[certs.SSLCert, str],
key: SSL.PKey,
handle_sni: typing.Optional[typing.Callable[[SSL.Connection], None]] = None,
- request_client_cert: typing.Optional[typing.Callable[[certs.SSLCert], None]] = None,
+ request_client_cert: bool = False,
chain_file=None,
dhparams=None,
extra_chain_certs: typing.Iterable[certs.SSLCert] = None,
@@ -245,7 +298,27 @@ def create_server_context(
until then we're conservative.
"""
- context = _create_ssl_context(ca_pemfile=chain_file, **sslctx_kwargs)
+ def accept_all(
+ conn_: SSL.Connection,
+ x509: SSL.X509,
+ errno: int,
+ err_depth: int,
+ is_cert_verified: bool,
+ ) -> bool:
+ # Return true to prevent cert verification error
+ return True
+
+ if request_client_cert:
+ verify = SSL.VERIFY_PEER
+ else:
+ verify = SSL.VERIFY_NONE
+
+ context = _create_ssl_context(
+ ca_pemfile=chain_file,
+ verify=verify,
+ verify_callback=accept_all,
+ **sslctx_kwargs,
+ )
context.use_privatekey(key)
if isinstance(cert, certs.SSLCert):
@@ -261,15 +334,6 @@ def create_server_context(
# SNI callback happens during do_handshake()
context.set_tlsext_servername_callback(handle_sni)
- if request_client_cert:
- def save_cert(conn_, x509, errno_, depth_, preverify_ok_):
- cert = certs.SSLCert(x509)
- request_client_cert(cert)
- # Return true to prevent cert verification error
- return True
-
- context.set_verify(SSL.VERIFY_PEER, save_cert)
-
if dhparams:
SSL._lib.SSL_CTX_set_tmp_dh(context._context, dhparams)