aboutsummaryrefslogtreecommitdiffstats
path: root/tests/hazmat/backends/test_openssl_memleak.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/hazmat/backends/test_openssl_memleak.py')
-rw-r--r--tests/hazmat/backends/test_openssl_memleak.py451
1 files changed, 451 insertions, 0 deletions
diff --git a/tests/hazmat/backends/test_openssl_memleak.py b/tests/hazmat/backends/test_openssl_memleak.py
new file mode 100644
index 00000000..935ea3df
--- /dev/null
+++ b/tests/hazmat/backends/test_openssl_memleak.py
@@ -0,0 +1,451 @@
+# This file is dual licensed under the terms of the Apache License, Version
+# 2.0, and the BSD License. See the LICENSE file in the root of this repository
+# for complete details.
+
+from __future__ import absolute_import, division, print_function
+
+import json
+import os
+import subprocess
+import sys
+import textwrap
+
+import pytest
+
+from cryptography.hazmat.bindings.openssl.binding import Binding
+
+
+MEMORY_LEAK_SCRIPT = """
+import sys
+
+
+def main(argv):
+ import gc
+ import json
+
+ import cffi
+
+ from cryptography.hazmat.bindings._openssl import ffi, lib
+
+ heap = {}
+
+ BACKTRACE_ENABLED = False
+ if BACKTRACE_ENABLED:
+ backtrace_ffi = cffi.FFI()
+ backtrace_ffi.cdef('''
+ int backtrace(void **, int);
+ char **backtrace_symbols(void *const *, int);
+ ''')
+ backtrace_lib = backtrace_ffi.dlopen(None)
+
+ def backtrace():
+ buf = backtrace_ffi.new("void*[]", 24)
+ length = backtrace_lib.backtrace(buf, len(buf))
+ return (buf, length)
+
+ def symbolize_backtrace(trace):
+ (buf, length) = trace
+ symbols = backtrace_lib.backtrace_symbols(buf, length)
+ stack = [
+ backtrace_ffi.string(symbols[i]).decode()
+ for i in range(length)
+ ]
+ lib.Cryptography_free_wrapper(symbols, backtrace_ffi.NULL, 0)
+ return stack
+ else:
+ def backtrace():
+ return None
+
+ def symbolize_backtrace(trace):
+ return None
+
+ @ffi.callback("void *(size_t, const char *, int)")
+ def malloc(size, path, line):
+ ptr = lib.Cryptography_malloc_wrapper(size, path, line)
+ heap[ptr] = (size, path, line, backtrace())
+ return ptr
+
+ @ffi.callback("void *(void *, size_t, const char *, int)")
+ def realloc(ptr, size, path, line):
+ if ptr != ffi.NULL:
+ del heap[ptr]
+ new_ptr = lib.Cryptography_realloc_wrapper(ptr, size, path, line)
+ heap[new_ptr] = (size, path, line, backtrace())
+ return new_ptr
+
+ @ffi.callback("void(void *, const char *, int)")
+ def free(ptr, path, line):
+ if ptr != ffi.NULL:
+ del heap[ptr]
+ lib.Cryptography_free_wrapper(ptr, path, line)
+
+ result = lib.Cryptography_CRYPTO_set_mem_functions(malloc, realloc, free)
+ assert result == 1
+
+ # Trigger a bunch of initialization stuff.
+ import cryptography.hazmat.backends.openssl
+
+ start_heap = set(heap)
+
+ func(*argv[1:])
+ gc.collect()
+ gc.collect()
+ gc.collect()
+
+ if lib.Cryptography_HAS_OPENSSL_CLEANUP:
+ lib.OPENSSL_cleanup()
+
+ # Swap back to the original functions so that if OpenSSL tries to free
+ # something from its atexit handle it won't be going through a Python
+ # function, which will be deallocated when this function returns
+ result = lib.Cryptography_CRYPTO_set_mem_functions(
+ ffi.addressof(lib, "Cryptography_malloc_wrapper"),
+ ffi.addressof(lib, "Cryptography_realloc_wrapper"),
+ ffi.addressof(lib, "Cryptography_free_wrapper"),
+ )
+ assert result == 1
+
+ remaining = set(heap) - start_heap
+
+ if remaining:
+ sys.stdout.write(json.dumps(dict(
+ (int(ffi.cast("size_t", ptr)), {
+ "size": heap[ptr][0],
+ "path": ffi.string(heap[ptr][1]).decode(),
+ "line": heap[ptr][2],
+ "backtrace": symbolize_backtrace(heap[ptr][3]),
+ })
+ for ptr in remaining
+ )))
+ sys.stdout.flush()
+ sys.exit(255)
+
+main(sys.argv)
+"""
+
+
+def assert_no_memory_leaks(s, argv=[]):
+ env = os.environ.copy()
+ env["PYTHONPATH"] = os.pathsep.join(sys.path)
+ argv = [
+ sys.executable, "-c", "{}\n\n{}".format(s, MEMORY_LEAK_SCRIPT)
+ ] + argv
+ # Shell out to a fresh Python process because OpenSSL does not allow you to
+ # install new memory hooks after the first malloc/free occurs.
+ proc = subprocess.Popen(
+ argv,
+ env=env,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
+ try:
+ proc.wait()
+ if proc.returncode == 255:
+ # 255 means there was a leak, load the info about what mallocs
+ # weren't freed.
+ out = json.loads(proc.stdout.read().decode())
+ raise AssertionError(out)
+ elif proc.returncode != 0:
+ # Any exception type will do to be honest
+ raise ValueError(proc.stdout.read(), proc.stderr.read())
+ finally:
+ proc.stdout.close()
+ proc.stderr.close()
+
+
+def skip_if_memtesting_not_supported():
+ return pytest.mark.skipif(
+ not Binding().lib.Cryptography_HAS_MEM_FUNCTIONS,
+ reason="Requires OpenSSL memory functions (>=1.1.0)"
+ )
+
+
+@skip_if_memtesting_not_supported()
+class TestAssertNoMemoryLeaks(object):
+ def test_no_leak_no_malloc(self):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func():
+ pass
+ """))
+
+ def test_no_leak_free(self):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func():
+ from cryptography.hazmat.bindings.openssl.binding import Binding
+ b = Binding()
+ name = b.lib.X509_NAME_new()
+ b.lib.X509_NAME_free(name)
+ """))
+
+ def test_no_leak_gc(self):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func():
+ from cryptography.hazmat.bindings.openssl.binding import Binding
+ b = Binding()
+ name = b.lib.X509_NAME_new()
+ b.ffi.gc(name, b.lib.X509_NAME_free)
+ """))
+
+ def test_leak(self):
+ with pytest.raises(AssertionError):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func():
+ from cryptography.hazmat.bindings.openssl.binding import (
+ Binding
+ )
+ b = Binding()
+ b.lib.X509_NAME_new()
+ """))
+
+ def test_errors(self):
+ with pytest.raises(ValueError):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func():
+ raise ZeroDivisionError
+ """))
+
+
+@skip_if_memtesting_not_supported()
+class TestOpenSSLMemoryLeaks(object):
+ @pytest.mark.parametrize("path", [
+ "x509/PKITS_data/certs/ValidcRLIssuerTest28EE.crt",
+ ])
+ def test_der_x509_certificate_extensions(self, path):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func(path):
+ from cryptography import x509
+ from cryptography.hazmat.backends.openssl import backend
+
+ import cryptography_vectors
+
+ with cryptography_vectors.open_vector_file(path, "rb") as f:
+ cert = x509.load_der_x509_certificate(
+ f.read(), backend
+ )
+
+ cert.extensions
+ """), [path])
+
+ @pytest.mark.parametrize("path", [
+ "x509/cryptography.io.pem",
+ ])
+ def test_pem_x509_certificate_extensions(self, path):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func(path):
+ from cryptography import x509
+ from cryptography.hazmat.backends.openssl import backend
+
+ import cryptography_vectors
+
+ with cryptography_vectors.open_vector_file(path, "rb") as f:
+ cert = x509.load_pem_x509_certificate(
+ f.read(), backend
+ )
+
+ cert.extensions
+ """), [path])
+
+ def test_x509_csr_extensions(self):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func():
+ from cryptography import x509
+ from cryptography.hazmat.backends.openssl import backend
+ from cryptography.hazmat.primitives import hashes
+ from cryptography.hazmat.primitives.asymmetric import rsa
+
+ private_key = rsa.generate_private_key(
+ key_size=2048, public_exponent=65537, backend=backend
+ )
+ cert = x509.CertificateSigningRequestBuilder().subject_name(
+ x509.Name([])
+ ).add_extension(
+ x509.OCSPNoCheck(), critical=False
+ ).sign(private_key, hashes.SHA256(), backend)
+
+ cert.extensions
+ """))
+
+ def test_ec_private_numbers_private_key(self):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func():
+ from cryptography.hazmat.backends.openssl import backend
+ from cryptography.hazmat.primitives.asymmetric import ec
+
+ ec.EllipticCurvePrivateNumbers(
+ private_value=int(
+ '280814107134858470598753916394807521398239633534281633982576099083'
+ '35787109896602102090002196616273211495718603965098'
+ ),
+ public_numbers=ec.EllipticCurvePublicNumbers(
+ curve=ec.SECP384R1(),
+ x=int(
+ '10036914308591746758780165503819213553101287571902957054148542'
+ '504671046744460374996612408381962208627004841444205030'
+ ),
+ y=int(
+ '17337335659928075994560513699823544906448896792102247714689323'
+ '575406618073069185107088229463828921069465902299522926'
+ )
+ )
+ ).private_key(backend)
+ """))
+
+ def test_ec_derive_private_key(self):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func():
+ from cryptography.hazmat.backends.openssl import backend
+ from cryptography.hazmat.primitives.asymmetric import ec
+ ec.derive_private_key(1, ec.SECP256R1(), backend)
+ """))
+
+ def test_x25519_pubkey_from_private_key(self):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func():
+ from cryptography.hazmat.primitives.asymmetric import x25519
+ private_key = x25519.X25519PrivateKey.generate()
+ private_key.public_key()
+ """))
+
+ def test_create_ocsp_request(self):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func():
+ from cryptography import x509
+ from cryptography.hazmat.backends.openssl import backend
+ from cryptography.hazmat.primitives import hashes
+ from cryptography.x509 import ocsp
+ import cryptography_vectors
+
+ path = "x509/PKITS_data/certs/ValidcRLIssuerTest28EE.crt"
+ with cryptography_vectors.open_vector_file(path, "rb") as f:
+ cert = x509.load_der_x509_certificate(
+ f.read(), backend
+ )
+ builder = ocsp.OCSPRequestBuilder()
+ builder = builder.add_certificate(
+ cert, cert, hashes.SHA1()
+ ).add_extension(x509.OCSPNonce(b"0000"), False)
+ req = builder.build()
+ """))
+
+ @pytest.mark.parametrize("path", [
+ "pkcs12/cert-aes256cbc-no-key.p12",
+ "pkcs12/cert-key-aes256cbc.p12",
+ ])
+ def test_load_pkcs12_key_and_certificates(self, path):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func(path):
+ from cryptography import x509
+ from cryptography.hazmat.backends.openssl import backend
+ from cryptography.hazmat.primitives.serialization import pkcs12
+ import cryptography_vectors
+
+ with cryptography_vectors.open_vector_file(path, "rb") as f:
+ pkcs12.load_key_and_certificates(
+ f.read(), b"cryptography", backend
+ )
+ """), [path])
+
+ def test_create_crl_with_idp(self):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func():
+ import datetime
+ from cryptography import x509
+ from cryptography.hazmat.backends.openssl import backend
+ from cryptography.hazmat.primitives import hashes
+ from cryptography.hazmat.primitives.asymmetric import ec
+ from cryptography.x509.oid import NameOID
+
+ key = ec.generate_private_key(ec.SECP256R1(), backend)
+ last_update = datetime.datetime(2002, 1, 1, 12, 1)
+ next_update = datetime.datetime(2030, 1, 1, 12, 1)
+ idp = x509.IssuingDistributionPoint(
+ full_name=None,
+ relative_name=x509.RelativeDistinguishedName([
+ x509.NameAttribute(
+ oid=x509.NameOID.ORGANIZATION_NAME, value=u"PyCA")
+ ]),
+ only_contains_user_certs=False,
+ only_contains_ca_certs=True,
+ only_some_reasons=None,
+ indirect_crl=False,
+ only_contains_attribute_certs=False,
+ )
+ builder = x509.CertificateRevocationListBuilder().issuer_name(
+ x509.Name([
+ x509.NameAttribute(
+ NameOID.COMMON_NAME, u"cryptography.io CA"
+ )
+ ])
+ ).last_update(
+ last_update
+ ).next_update(
+ next_update
+ ).add_extension(
+ idp, True
+ )
+
+ crl = builder.sign(key, hashes.SHA256(), backend)
+ crl.extensions.get_extension_for_class(
+ x509.IssuingDistributionPoint
+ )
+ """))
+
+ def test_create_certificate_with_extensions(self):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func():
+ import datetime
+
+ from cryptography import x509
+ from cryptography.hazmat.backends.openssl import backend
+ from cryptography.hazmat.primitives import hashes
+ from cryptography.hazmat.primitives.asymmetric import ec
+ from cryptography.x509.oid import (
+ AuthorityInformationAccessOID, ExtendedKeyUsageOID, NameOID
+ )
+
+ private_key = ec.generate_private_key(ec.SECP256R1(), backend)
+
+ not_valid_before = datetime.datetime.now()
+ not_valid_after = not_valid_before + datetime.timedelta(days=365)
+
+ aia = x509.AuthorityInformationAccess([
+ x509.AccessDescription(
+ AuthorityInformationAccessOID.OCSP,
+ x509.UniformResourceIdentifier(u"http://ocsp.domain.com")
+ ),
+ x509.AccessDescription(
+ AuthorityInformationAccessOID.CA_ISSUERS,
+ x509.UniformResourceIdentifier(u"http://domain.com/ca.crt")
+ )
+ ])
+ sans = [u'*.example.org', u'foobar.example.net']
+ san = x509.SubjectAlternativeName(list(map(x509.DNSName, sans)))
+
+ ski = x509.SubjectKeyIdentifier.from_public_key(
+ private_key.public_key()
+ )
+ eku = x509.ExtendedKeyUsage([
+ ExtendedKeyUsageOID.CLIENT_AUTH,
+ ExtendedKeyUsageOID.SERVER_AUTH,
+ ExtendedKeyUsageOID.CODE_SIGNING,
+ ])
+
+ builder = x509.CertificateBuilder().serial_number(
+ 777
+ ).issuer_name(x509.Name([
+ x509.NameAttribute(NameOID.COUNTRY_NAME, u'US'),
+ ])).subject_name(x509.Name([
+ x509.NameAttribute(NameOID.COUNTRY_NAME, u'US'),
+ ])).public_key(
+ private_key.public_key()
+ ).add_extension(
+ aia, critical=False
+ ).not_valid_before(
+ not_valid_before
+ ).not_valid_after(
+ not_valid_after
+ )
+
+ cert = builder.sign(private_key, hashes.SHA256(), backend)
+ cert.extensions
+ """))