aboutsummaryrefslogtreecommitdiffstats
path: root/tests/hazmat/backends
diff options
context:
space:
mode:
Diffstat (limited to 'tests/hazmat/backends')
-rw-r--r--tests/hazmat/backends/test_commoncrypto.py61
-rw-r--r--tests/hazmat/backends/test_multibackend.py498
-rw-r--r--tests/hazmat/backends/test_openssl.py472
-rw-r--r--tests/hazmat/backends/test_openssl_memleak.py451
4 files changed, 739 insertions, 743 deletions
diff --git a/tests/hazmat/backends/test_commoncrypto.py b/tests/hazmat/backends/test_commoncrypto.py
deleted file mode 100644
index f7200016..00000000
--- a/tests/hazmat/backends/test_commoncrypto.py
+++ /dev/null
@@ -1,61 +0,0 @@
-# This file is dual licensed under the terms of the Apache License, Version
-# 2.0, and the BSD License. See the LICENSE file in the root of this repository
-# for complete details.
-
-from __future__ import absolute_import, division, print_function
-
-import pytest
-
-from cryptography import utils
-from cryptography.exceptions import InternalError, _Reasons
-from cryptography.hazmat.backends import _available_backends
-from cryptography.hazmat.primitives.ciphers import Cipher, CipherAlgorithm
-from cryptography.hazmat.primitives.ciphers.algorithms import AES
-from cryptography.hazmat.primitives.ciphers.modes import CBC, GCM
-
-from ...utils import raises_unsupported_algorithm
-
-
-@utils.register_interface(CipherAlgorithm)
-class DummyCipher(object):
- name = "dummy-cipher"
- block_size = None
- key_size = None
-
-
-@pytest.mark.skipif("commoncrypto" not in
- [i.name for i in _available_backends()],
- reason="CommonCrypto not available")
-class TestCommonCrypto(object):
- def test_supports_cipher(self):
- from cryptography.hazmat.backends.commoncrypto.backend import backend
- assert backend.cipher_supported(None, None) is False
-
- def test_register_duplicate_cipher_adapter(self):
- from cryptography.hazmat.backends.commoncrypto.backend import backend
- with pytest.raises(ValueError):
- backend._register_cipher_adapter(
- AES, backend._lib.kCCAlgorithmAES128,
- CBC, backend._lib.kCCModeCBC
- )
-
- def test_handle_response(self):
- from cryptography.hazmat.backends.commoncrypto.backend import backend
-
- with pytest.raises(ValueError):
- backend._check_cipher_response(backend._lib.kCCAlignmentError)
-
- with pytest.raises(InternalError):
- backend._check_cipher_response(backend._lib.kCCMemoryFailure)
-
- with pytest.raises(InternalError):
- backend._check_cipher_response(backend._lib.kCCDecodeError)
-
- def test_nonexistent_aead_cipher(self):
- from cryptography.hazmat.backends.commoncrypto.backend import Backend
- b = Backend()
- cipher = Cipher(
- DummyCipher(), GCM(b"fake_iv_here"), backend=b,
- )
- with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_CIPHER):
- cipher.encryptor()
diff --git a/tests/hazmat/backends/test_multibackend.py b/tests/hazmat/backends/test_multibackend.py
deleted file mode 100644
index 3c05cdfa..00000000
--- a/tests/hazmat/backends/test_multibackend.py
+++ /dev/null
@@ -1,498 +0,0 @@
-# This file is dual licensed under the terms of the Apache License, Version
-# 2.0, and the BSD License. See the LICENSE file in the root of this repository
-# for complete details.
-
-from __future__ import absolute_import, division, print_function
-
-from cryptography import utils
-from cryptography.exceptions import (
- UnsupportedAlgorithm, _Reasons
-)
-from cryptography.hazmat.backends.interfaces import (
- CMACBackend, CipherBackend, DERSerializationBackend, DSABackend,
- EllipticCurveBackend, HMACBackend, HashBackend, PBKDF2HMACBackend,
- PEMSerializationBackend, RSABackend, X509Backend
-)
-from cryptography.hazmat.backends.multibackend import MultiBackend
-from cryptography.hazmat.primitives import cmac, hashes, hmac
-from cryptography.hazmat.primitives.asymmetric import ec, padding
-from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
-
-from ...utils import raises_unsupported_algorithm
-
-
-@utils.register_interface(CipherBackend)
-class DummyCipherBackend(object):
- def __init__(self, supported_ciphers):
- self._ciphers = supported_ciphers
-
- def cipher_supported(self, cipher, mode):
- return (type(cipher), type(mode)) in self._ciphers
-
- def create_symmetric_encryption_ctx(self, cipher, mode):
- if not self.cipher_supported(cipher, mode):
- raise UnsupportedAlgorithm("", _Reasons.UNSUPPORTED_CIPHER)
-
- def create_symmetric_decryption_ctx(self, cipher, mode):
- if not self.cipher_supported(cipher, mode):
- raise UnsupportedAlgorithm("", _Reasons.UNSUPPORTED_CIPHER)
-
-
-@utils.register_interface(HashBackend)
-class DummyHashBackend(object):
- def __init__(self, supported_algorithms):
- self._algorithms = supported_algorithms
-
- def hash_supported(self, algorithm):
- return type(algorithm) in self._algorithms
-
- def create_hash_ctx(self, algorithm):
- if not self.hash_supported(algorithm):
- raise UnsupportedAlgorithm("", _Reasons.UNSUPPORTED_HASH)
-
-
-@utils.register_interface(HMACBackend)
-class DummyHMACBackend(object):
- def __init__(self, supported_algorithms):
- self._algorithms = supported_algorithms
-
- def hmac_supported(self, algorithm):
- return type(algorithm) in self._algorithms
-
- def create_hmac_ctx(self, key, algorithm):
- if not self.hmac_supported(algorithm):
- raise UnsupportedAlgorithm("", _Reasons.UNSUPPORTED_HASH)
-
-
-@utils.register_interface(PBKDF2HMACBackend)
-class DummyPBKDF2HMACBackend(object):
- def __init__(self, supported_algorithms):
- self._algorithms = supported_algorithms
-
- def pbkdf2_hmac_supported(self, algorithm):
- return type(algorithm) in self._algorithms
-
- def derive_pbkdf2_hmac(self, algorithm, length, salt, iterations,
- key_material):
- if not self.pbkdf2_hmac_supported(algorithm):
- raise UnsupportedAlgorithm("", _Reasons.UNSUPPORTED_HASH)
-
-
-@utils.register_interface(RSABackend)
-class DummyRSABackend(object):
- def generate_rsa_private_key(self, public_exponent, key_size):
- pass
-
- def rsa_padding_supported(self, padding):
- pass
-
- def generate_rsa_parameters_supported(self, public_exponent, key_size):
- pass
-
- def load_rsa_private_numbers(self, numbers):
- pass
-
- def load_rsa_public_numbers(self, numbers):
- pass
-
-
-@utils.register_interface(DSABackend)
-class DummyDSABackend(object):
- def generate_dsa_parameters(self, key_size):
- pass
-
- def generate_dsa_private_key(self, parameters):
- pass
-
- def generate_dsa_private_key_and_parameters(self, key_size):
- pass
-
- def dsa_hash_supported(self, algorithm):
- pass
-
- def dsa_parameters_supported(self, p, q, g):
- pass
-
- def load_dsa_private_numbers(self, numbers):
- pass
-
- def load_dsa_public_numbers(self, numbers):
- pass
-
- def load_dsa_parameter_numbers(self, numbers):
- pass
-
-
-@utils.register_interface(CMACBackend)
-class DummyCMACBackend(object):
- def __init__(self, supported_algorithms):
- self._algorithms = supported_algorithms
-
- def cmac_algorithm_supported(self, algorithm):
- return type(algorithm) in self._algorithms
-
- def create_cmac_ctx(self, algorithm):
- if not self.cmac_algorithm_supported(algorithm):
- raise UnsupportedAlgorithm("", _Reasons.UNSUPPORTED_CIPHER)
-
-
-@utils.register_interface(EllipticCurveBackend)
-class DummyEllipticCurveBackend(object):
- def __init__(self, supported_curves):
- self._curves = supported_curves
-
- def elliptic_curve_supported(self, curve):
- return any(
- isinstance(curve, curve_type)
- for curve_type in self._curves
- )
-
- def elliptic_curve_signature_algorithm_supported(
- self, signature_algorithm, curve
- ):
- return (
- isinstance(signature_algorithm, ec.ECDSA) and
- any(
- isinstance(curve, curve_type)
- for curve_type in self._curves
- )
- )
-
- def generate_elliptic_curve_private_key(self, curve):
- if not self.elliptic_curve_supported(curve):
- raise UnsupportedAlgorithm(_Reasons.UNSUPPORTED_ELLIPTIC_CURVE)
-
- def load_elliptic_curve_private_numbers(self, numbers):
- if not self.elliptic_curve_supported(numbers.public_numbers.curve):
- raise UnsupportedAlgorithm(_Reasons.UNSUPPORTED_ELLIPTIC_CURVE)
-
- def load_elliptic_curve_public_numbers(self, numbers):
- if not self.elliptic_curve_supported(numbers.curve):
- raise UnsupportedAlgorithm(_Reasons.UNSUPPORTED_ELLIPTIC_CURVE)
-
-
-@utils.register_interface(PEMSerializationBackend)
-class DummyPEMSerializationBackend(object):
- def load_pem_private_key(self, data, password):
- pass
-
- def load_pem_public_key(self, data):
- pass
-
-
-@utils.register_interface(DERSerializationBackend)
-class DummyDERSerializationBackend(object):
- def load_der_private_key(self, data, password):
- pass
-
- def load_der_public_key(self, data):
- pass
-
-
-@utils.register_interface(X509Backend)
-class DummyX509Backend(object):
- def load_pem_x509_certificate(self, data):
- pass
-
- def load_der_x509_certificate(self, data):
- pass
-
- def load_pem_x509_csr(self, data):
- pass
-
- def load_der_x509_csr(self, data):
- pass
-
- def create_x509_csr(self, builder, private_key, algorithm):
- pass
-
-
-class TestMultiBackend(object):
- def test_ciphers(self):
- backend = MultiBackend([
- DummyHashBackend([]),
- DummyCipherBackend([
- (algorithms.AES, modes.CBC),
- ])
- ])
- assert backend.cipher_supported(
- algorithms.AES(b"\x00" * 16), modes.CBC(b"\x00" * 16)
- )
-
- cipher = Cipher(
- algorithms.AES(b"\x00" * 16),
- modes.CBC(b"\x00" * 16),
- backend=backend
- )
- cipher.encryptor()
- cipher.decryptor()
-
- cipher = Cipher(
- algorithms.Camellia(b"\x00" * 16),
- modes.CBC(b"\x00" * 16),
- backend=backend
- )
- with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_CIPHER):
- cipher.encryptor()
- with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_CIPHER):
- cipher.decryptor()
-
- def test_hashes(self):
- backend = MultiBackend([
- DummyHashBackend([hashes.MD5])
- ])
- assert backend.hash_supported(hashes.MD5())
-
- hashes.Hash(hashes.MD5(), backend=backend)
-
- with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_HASH):
- hashes.Hash(hashes.SHA1(), backend=backend)
-
- def test_hmac(self):
- backend = MultiBackend([
- DummyHMACBackend([hashes.MD5])
- ])
- assert backend.hmac_supported(hashes.MD5())
-
- hmac.HMAC(b"", hashes.MD5(), backend=backend)
-
- with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_HASH):
- hmac.HMAC(b"", hashes.SHA1(), backend=backend)
-
- def test_pbkdf2(self):
- backend = MultiBackend([
- DummyPBKDF2HMACBackend([hashes.MD5])
- ])
- assert backend.pbkdf2_hmac_supported(hashes.MD5())
-
- backend.derive_pbkdf2_hmac(hashes.MD5(), 10, b"", 10, b"")
-
- with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_HASH):
- backend.derive_pbkdf2_hmac(hashes.SHA1(), 10, b"", 10, b"")
-
- def test_rsa(self):
- backend = MultiBackend([
- DummyRSABackend()
- ])
-
- backend.generate_rsa_private_key(
- key_size=1024, public_exponent=65537
- )
-
- backend.rsa_padding_supported(padding.PKCS1v15())
-
- backend.generate_rsa_parameters_supported(65537, 1024)
-
- backend.load_rsa_private_numbers("private_numbers")
-
- backend.load_rsa_public_numbers("public_numbers")
-
- backend = MultiBackend([])
- with raises_unsupported_algorithm(
- _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM
- ):
- backend.generate_rsa_private_key(key_size=1024, public_exponent=3)
-
- with raises_unsupported_algorithm(
- _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM
- ):
- backend.rsa_padding_supported(padding.PKCS1v15())
-
- with raises_unsupported_algorithm(
- _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM
- ):
- backend.generate_rsa_parameters_supported(65537, 1024)
-
- with raises_unsupported_algorithm(
- _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM
- ):
- backend.load_rsa_private_numbers("private_numbers")
-
- with raises_unsupported_algorithm(
- _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM
- ):
- backend.load_rsa_public_numbers("public_numbers")
-
- def test_dsa(self):
- backend = MultiBackend([
- DummyDSABackend()
- ])
-
- backend.generate_dsa_parameters(key_size=1024)
-
- parameters = object()
- backend.generate_dsa_private_key(parameters)
- backend.generate_dsa_private_key_and_parameters(key_size=1024)
-
- backend.dsa_hash_supported(hashes.SHA1())
- backend.dsa_parameters_supported(1, 2, 3)
- backend.load_dsa_private_numbers("numbers")
- backend.load_dsa_public_numbers("numbers")
- backend.load_dsa_parameter_numbers("numbers")
-
- backend = MultiBackend([])
- with raises_unsupported_algorithm(
- _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM
- ):
- backend.generate_dsa_parameters(key_size=1024)
-
- with raises_unsupported_algorithm(
- _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM
- ):
- backend.generate_dsa_private_key(parameters)
-
- with raises_unsupported_algorithm(
- _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM
- ):
- backend.generate_dsa_private_key_and_parameters(key_size=1024)
-
- with raises_unsupported_algorithm(
- _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM
- ):
- backend.dsa_hash_supported(hashes.SHA1())
-
- with raises_unsupported_algorithm(
- _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM
- ):
- backend.dsa_parameters_supported('p', 'q', 'g')
-
- with raises_unsupported_algorithm(
- _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM
- ):
- backend.load_dsa_private_numbers("numbers")
-
- with raises_unsupported_algorithm(
- _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM
- ):
- backend.load_dsa_public_numbers("numbers")
-
- with raises_unsupported_algorithm(
- _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM
- ):
- backend.load_dsa_parameter_numbers("numbers")
-
- def test_cmac(self):
- backend = MultiBackend([
- DummyCMACBackend([algorithms.AES])
- ])
-
- fake_key = b"\x00" * 16
-
- assert backend.cmac_algorithm_supported(
- algorithms.AES(fake_key)) is True
-
- cmac.CMAC(algorithms.AES(fake_key), backend)
-
- with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_CIPHER):
- cmac.CMAC(algorithms.TripleDES(fake_key), backend)
-
- def test_elliptic_curve(self):
- backend = MultiBackend([
- DummyEllipticCurveBackend([
- ec.SECT283K1
- ])
- ])
-
- assert backend.elliptic_curve_supported(ec.SECT283K1()) is True
-
- assert backend.elliptic_curve_signature_algorithm_supported(
- ec.ECDSA(hashes.SHA256()),
- ec.SECT283K1()
- ) is True
-
- backend.generate_elliptic_curve_private_key(ec.SECT283K1())
-
- backend.load_elliptic_curve_private_numbers(
- ec.EllipticCurvePrivateNumbers(
- 1,
- ec.EllipticCurvePublicNumbers(
- 2,
- 3,
- ec.SECT283K1()
- )
- )
- )
-
- backend.load_elliptic_curve_public_numbers(
- ec.EllipticCurvePublicNumbers(
- 2,
- 3,
- ec.SECT283K1()
- )
- )
-
- assert backend.elliptic_curve_supported(ec.SECT163K1()) is False
-
- assert backend.elliptic_curve_signature_algorithm_supported(
- ec.ECDSA(hashes.SHA256()),
- ec.SECT163K1()
- ) is False
-
- with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_ELLIPTIC_CURVE):
- backend.generate_elliptic_curve_private_key(ec.SECT163K1())
-
- with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_ELLIPTIC_CURVE):
- backend.load_elliptic_curve_private_numbers(
- ec.EllipticCurvePrivateNumbers(
- 1,
- ec.EllipticCurvePublicNumbers(
- 2,
- 3,
- ec.SECT163K1()
- )
- )
- )
-
- with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_ELLIPTIC_CURVE):
- backend.load_elliptic_curve_public_numbers(
- ec.EllipticCurvePublicNumbers(
- 2,
- 3,
- ec.SECT163K1()
- )
- )
-
- def test_pem_serialization_backend(self):
- backend = MultiBackend([DummyPEMSerializationBackend()])
-
- backend.load_pem_private_key(b"keydata", None)
- backend.load_pem_public_key(b"keydata")
-
- backend = MultiBackend([])
- with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_SERIALIZATION):
- backend.load_pem_private_key(b"keydata", None)
- with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_SERIALIZATION):
- backend.load_pem_public_key(b"keydata")
-
- def test_der_serialization_backend(self):
- backend = MultiBackend([DummyDERSerializationBackend()])
-
- backend.load_der_private_key(b"keydata", None)
- backend.load_der_public_key(b"keydata")
-
- backend = MultiBackend([])
- with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_SERIALIZATION):
- backend.load_der_private_key(b"keydata", None)
- with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_SERIALIZATION):
- backend.load_der_public_key(b"keydata")
-
- def test_x509_backend(self):
- backend = MultiBackend([DummyX509Backend()])
-
- backend.load_pem_x509_certificate(b"certdata")
- backend.load_der_x509_certificate(b"certdata")
- backend.load_pem_x509_csr(b"reqdata")
- backend.load_der_x509_csr(b"reqdata")
- backend.create_x509_csr(object(), b"privatekey", hashes.SHA1())
-
- backend = MultiBackend([])
- with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_X509):
- backend.load_pem_x509_certificate(b"certdata")
- with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_X509):
- backend.load_der_x509_certificate(b"certdata")
- with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_X509):
- backend.load_pem_x509_csr(b"reqdata")
- with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_X509):
- backend.load_der_x509_csr(b"reqdata")
- with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_X509):
- backend.create_x509_csr(object(), b"privatekey", hashes.SHA1())
diff --git a/tests/hazmat/backends/test_openssl.py b/tests/hazmat/backends/test_openssl.py
index 6a2e8a77..44fd3db4 100644
--- a/tests/hazmat/backends/test_openssl.py
+++ b/tests/hazmat/backends/test_openssl.py
@@ -4,60 +4,49 @@
from __future__ import absolute_import, division, print_function
+import itertools
import os
import subprocess
import sys
import textwrap
-import pretend
-
import pytest
-from cryptography import utils
+from cryptography import x509
from cryptography.exceptions import InternalError, _Reasons
-from cryptography.hazmat.backends.interfaces import RSABackend
+from cryptography.hazmat.backends.interfaces import DHBackend, RSABackend
from cryptography.hazmat.backends.openssl.backend import (
Backend, backend
)
from cryptography.hazmat.backends.openssl.ec import _sn_to_elliptic_curve
from cryptography.hazmat.primitives import hashes, serialization
-from cryptography.hazmat.primitives.asymmetric import dsa, ec, padding
-from cryptography.hazmat.primitives.ciphers import (
- BlockCipherAlgorithm, Cipher, CipherAlgorithm
-)
+from cryptography.hazmat.primitives.asymmetric import dh, dsa, padding
+from cryptography.hazmat.primitives.ciphers import Cipher
from cryptography.hazmat.primitives.ciphers.algorithms import AES
-from cryptography.hazmat.primitives.ciphers.modes import CBC, CTR, Mode
+from cryptography.hazmat.primitives.ciphers.modes import CBC
-from ..primitives.fixtures_dsa import DSA_KEY_2048
from ..primitives.fixtures_rsa import RSA_KEY_2048, RSA_KEY_512
-from ..primitives.test_ec import _skip_curve_unsupported
-from ...utils import load_vectors_from_file, raises_unsupported_algorithm
-
-
-@utils.register_interface(Mode)
-class DummyMode(object):
- name = "dummy-mode"
-
- def validate_for_algorithm(self, algorithm):
- pass
-
+from ...doubles import (
+ DummyAsymmetricPadding, DummyCipherAlgorithm, DummyHashAlgorithm, DummyMode
+)
+from ...utils import (
+ load_nist_vectors, load_vectors_from_file, raises_unsupported_algorithm
+)
+from ...x509.test_x509 import _load_cert
-@utils.register_interface(CipherAlgorithm)
-class DummyCipher(object):
- name = "dummy-cipher"
- key_size = None
+def skip_if_libre_ssl(openssl_version):
+ if u'LibreSSL' in openssl_version:
+ pytest.skip("LibreSSL hard-codes RAND_bytes to use arc4random.")
-@utils.register_interface(padding.AsymmetricPadding)
-class DummyPadding(object):
- name = "dummy-cipher"
+class TestLibreSkip(object):
+ def test_skip_no(self):
+ assert skip_if_libre_ssl(u"OpenSSL 1.0.2h 3 May 2016") is None
-@utils.register_interface(hashes.HashAlgorithm)
-class DummyHash(object):
- name = "dummy-hash"
- block_size = None
- digest_size = None
+ def test_skip_yes(self):
+ with pytest.raises(pytest.skip.Exception):
+ skip_if_libre_ssl(u"LibreSSL 2.1.6")
class DummyMGF(object):
@@ -82,14 +71,12 @@ class TestOpenSSL(object):
backend.openssl_version_text().startswith("LibreSSL")
)
+ def test_openssl_version_number(self):
+ assert backend.openssl_version_number() > 0
+
def test_supports_cipher(self):
assert backend.cipher_supported(None, None) is False
- def test_aes_ctr_always_available(self):
- # AES CTR should always be available in both 0.9.8 and 1.0.0+
- assert backend.cipher_supported(AES(b"\x00" * 16),
- CTR(b"\x00" * 16)) is True
-
def test_register_duplicate_cipher_adapter(self):
with pytest.raises(ValueError):
backend.register_cipher_adapter(AES, CBC, None)
@@ -98,16 +85,21 @@ class TestOpenSSL(object):
def test_nonexistent_cipher(self, mode):
b = Backend()
b.register_cipher_adapter(
- DummyCipher,
+ DummyCipherAlgorithm,
type(mode),
lambda backend, cipher, mode: backend._ffi.NULL
)
cipher = Cipher(
- DummyCipher(), mode, backend=b,
+ DummyCipherAlgorithm(), mode, backend=b,
)
with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_CIPHER):
cipher.encryptor()
+ def test_openssl_assert(self):
+ backend.openssl_assert(True)
+ with pytest.raises(InternalError):
+ backend.openssl_assert(False)
+
def test_consume_errors(self):
for i in range(10):
backend._lib.ERR_put_error(backend._lib.ERR_LIB_EVP, 0, 0,
@@ -120,25 +112,8 @@ class TestOpenSSL(object):
assert backend._lib.ERR_peek_error() == 0
assert len(errors) == 10
- def test_openssl_error_string(self):
- backend._lib.ERR_put_error(
- backend._lib.ERR_LIB_EVP,
- backend._lib.EVP_F_EVP_DECRYPTFINAL_EX,
- 0,
- b"test_openssl.py",
- -1
- )
-
- errors = backend._consume_errors()
- exc = backend._unknown_error(errors[0])
-
- assert (
- "digital envelope routines:"
- "EVP_DecryptFinal_ex:digital envelope routines" in str(exc)
- )
-
def test_ssl_ciphers_registered(self):
- meth = backend._lib.TLSv1_method()
+ meth = backend._lib.SSLv23_method()
ctx = backend._lib.SSL_CTX_new(meth)
assert ctx != backend._ffi.NULL
backend._lib.SSL_CTX_free(ctx)
@@ -148,12 +123,9 @@ class TestOpenSSL(object):
assert cipher != backend._ffi.NULL
def test_error_strings_loaded(self):
- # returns a value in a static buffer
- err = backend._lib.ERR_error_string(101183626, backend._ffi.NULL)
- assert backend._ffi.string(err) == (
- b"error:0607F08A:digital envelope routines:EVP_EncryptFinal_ex:"
- b"data not multiple of block length"
- )
+ buf = backend._ffi.new("char[]", 256)
+ backend._lib.ERR_error_string_n(101183626, buf, len(buf))
+ assert b"data not multiple of block length" in backend._ffi.string(buf)
def test_unknown_error_in_cipher_finalize(self):
cipher = Cipher(AES(b"\0" * 16), CBC(b"\0" * 16), backend=backend)
@@ -164,40 +136,19 @@ class TestOpenSSL(object):
with pytest.raises(InternalError):
enc.finalize()
- def test_derive_pbkdf2_raises_unsupported_on_old_openssl(self):
- if backend.pbkdf2_hmac_supported(hashes.SHA256()):
- pytest.skip("Requires an older OpenSSL")
- with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_HASH):
- backend.derive_pbkdf2_hmac(hashes.SHA256(), 10, b"", 1000, b"")
-
- @pytest.mark.skipif(
- backend._lib.OPENSSL_VERSION_NUMBER >= 0x1000000f,
- reason="Requires an older OpenSSL. Must be < 1.0.0"
- )
- def test_large_key_size_on_old_openssl(self):
- with pytest.raises(ValueError):
- dsa.generate_parameters(2048, backend=backend)
-
- with pytest.raises(ValueError):
- dsa.generate_parameters(3072, backend=backend)
-
- @pytest.mark.skipif(
- backend._lib.OPENSSL_VERSION_NUMBER < 0x1000000f,
- reason="Requires a newer OpenSSL. Must be >= 1.0.0"
- )
def test_large_key_size_on_new_openssl(self):
parameters = dsa.generate_parameters(2048, backend)
param_num = parameters.parameter_numbers()
- assert utils.bit_length(param_num.p) == 2048
+ assert param_num.p.bit_length() == 2048
parameters = dsa.generate_parameters(3072, backend)
param_num = parameters.parameter_numbers()
- assert utils.bit_length(param_num.p) == 3072
+ assert param_num.p.bit_length() == 3072
def test_int_to_bn(self):
value = (2 ** 4242) - 4242
bn = backend._int_to_bn(value)
assert bn != backend._ffi.NULL
- bn = backend._ffi.gc(bn, backend._lib.BN_free)
+ bn = backend._ffi.gc(bn, backend._lib.BN_clear_free)
assert bn
assert backend._bn_to_int(bn) == value
@@ -217,15 +168,29 @@ class TestOpenSSL(object):
assert backend._bn_to_int(bn) == 0
+@pytest.mark.skipif(
+ backend._lib.Cryptography_HAS_ENGINE == 0,
+ reason="Requires OpenSSL with ENGINE support")
class TestOpenSSLRandomEngine(object):
- def teardown_method(self, method):
+ def setup(self):
+ # The default RAND engine is global and shared between
+ # tests. We make sure that the default engine is osrandom
+ # before we start each test and restore the global state to
+ # that engine in teardown.
+ current_default = backend._lib.ENGINE_get_default_RAND()
+ name = backend._lib.ENGINE_get_name(current_default)
+ assert name == backend._lib.Cryptography_osrandom_engine_name
+
+ def teardown(self):
# we need to reset state to being default. backend is a shared global
# for all these tests.
backend.activate_osrandom_engine()
current_default = backend._lib.ENGINE_get_default_RAND()
name = backend._lib.ENGINE_get_name(current_default)
- assert name == backend._binding._osrandom_engine_name
+ assert name == backend._lib.Cryptography_osrandom_engine_name
+ @pytest.mark.skipif(sys.executable is None,
+ reason="No Python interpreter available.")
def test_osrandom_engine_is_default(self, tmpdir):
engine_printer = textwrap.dedent(
"""
@@ -254,18 +219,19 @@ class TestOpenSSLRandomEngine(object):
subprocess.check_call(
[sys.executable, "-c", engine_printer],
env=env,
- stdout=out
+ stdout=out,
+ stderr=subprocess.PIPE,
)
osrandom_engine_name = backend._ffi.string(
- backend._binding._osrandom_engine_name
+ backend._lib.Cryptography_osrandom_engine_name
)
assert engine_name.read().encode('ascii') == osrandom_engine_name
def test_osrandom_sanity_check(self):
# This test serves as a check against catastrophic failure.
- buf = backend._ffi.new("char[]", 500)
+ buf = backend._ffi.new("unsigned char[]", 500)
res = backend._lib.RAND_bytes(buf, 500)
assert res == 1
assert backend._ffi.buffer(buf)[:] != "\x00" * 500
@@ -277,7 +243,7 @@ class TestOpenSSLRandomEngine(object):
backend.activate_osrandom_engine()
e = backend._lib.ENGINE_get_default_RAND()
name = backend._lib.ENGINE_get_name(e)
- assert name == backend._binding._osrandom_engine_name
+ assert name == backend._lib.Cryptography_osrandom_engine_name
res = backend._lib.ENGINE_free(e)
assert res == 1
@@ -285,7 +251,7 @@ class TestOpenSSLRandomEngine(object):
e = backend._lib.ENGINE_get_default_RAND()
assert e != backend._ffi.NULL
name = backend._lib.ENGINE_get_name(e)
- assert name == backend._binding._osrandom_engine_name
+ assert name == backend._lib.Cryptography_osrandom_engine_name
res = backend._lib.ENGINE_free(e)
assert res == 1
backend.activate_builtin_random()
@@ -300,20 +266,50 @@ class TestOpenSSLRandomEngine(object):
e = backend._lib.ENGINE_get_default_RAND()
assert e == backend._ffi.NULL
+ def test_osrandom_engine_implementation(self):
+ name = backend.osrandom_engine_implementation()
+ assert name in ['/dev/urandom', 'CryptGenRandom', 'getentropy',
+ 'getrandom']
+ if sys.platform.startswith('linux'):
+ assert name in ['getrandom', '/dev/urandom']
+ if sys.platform == 'darwin':
+ assert name in ['getentropy', '/dev/urandom']
+ if sys.platform == 'win32':
+ assert name == 'CryptGenRandom'
+
def test_activate_osrandom_already_default(self):
e = backend._lib.ENGINE_get_default_RAND()
name = backend._lib.ENGINE_get_name(e)
- assert name == backend._binding._osrandom_engine_name
+ assert name == backend._lib.Cryptography_osrandom_engine_name
res = backend._lib.ENGINE_free(e)
assert res == 1
backend.activate_osrandom_engine()
e = backend._lib.ENGINE_get_default_RAND()
name = backend._lib.ENGINE_get_name(e)
- assert name == backend._binding._osrandom_engine_name
+ assert name == backend._lib.Cryptography_osrandom_engine_name
res = backend._lib.ENGINE_free(e)
assert res == 1
+@pytest.mark.skipif(
+ backend._lib.Cryptography_HAS_ENGINE == 1,
+ reason="Requires OpenSSL without ENGINE support")
+class TestOpenSSLNoEngine(object):
+ def test_no_engine_support(self):
+ assert backend._ffi.string(
+ backend._lib.Cryptography_osrandom_engine_id
+ ) == b"no-engine-support"
+ assert backend._ffi.string(
+ backend._lib.Cryptography_osrandom_engine_name
+ ) == b"osrandom_engine disabled due to no engine support"
+
+ def test_activate_builtin_random_does_nothing(self):
+ backend.activate_builtin_random()
+
+ def test_activate_osrandom_does_nothing(self):
+ backend.activate_osrandom_engine()
+
+
class TestOpenSSLRSA(object):
def test_generate_rsa_parameters_supported(self):
assert backend.generate_rsa_parameters_supported(1, 1024) is False
@@ -337,42 +333,13 @@ class TestOpenSSLRSA(object):
backend.generate_rsa_private_key(public_exponent=65537,
key_size=256)
- @pytest.mark.skipif(
- backend._lib.OPENSSL_VERSION_NUMBER >= 0x1000100f,
- reason="Requires an older OpenSSL. Must be < 1.0.1"
- )
- def test_non_sha1_pss_mgf1_hash_algorithm_on_old_openssl(self):
- private_key = RSA_KEY_512.private_key(backend)
- with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_HASH):
- private_key.signer(
- padding.PSS(
- mgf=padding.MGF1(
- algorithm=hashes.SHA256(),
- ),
- salt_length=padding.PSS.MAX_LENGTH
- ),
- hashes.SHA1()
- )
- public_key = private_key.public_key()
- with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_HASH):
- public_key.verifier(
- b"sig",
- padding.PSS(
- mgf=padding.MGF1(
- algorithm=hashes.SHA256(),
- ),
- salt_length=padding.PSS.MAX_LENGTH
- ),
- hashes.SHA1()
- )
-
def test_rsa_padding_unsupported_pss_mgf1_hash(self):
assert backend.rsa_padding_supported(
- padding.PSS(mgf=padding.MGF1(DummyHash()), salt_length=0)
+ padding.PSS(mgf=padding.MGF1(DummyHashAlgorithm()), salt_length=0)
) is False
def test_rsa_padding_unsupported(self):
- assert backend.rsa_padding_supported(DummyPadding()) is False
+ assert backend.rsa_padding_supported(DummyAsymmetricPadding()) is False
def test_rsa_padding_supported_pkcs1v15(self):
assert backend.rsa_padding_supported(padding.PKCS1v15()) is True
@@ -391,6 +358,27 @@ class TestOpenSSLRSA(object):
),
) is True
+ @pytest.mark.skipif(
+ backend._lib.Cryptography_HAS_RSA_OAEP_MD == 0,
+ reason="Requires OpenSSL with rsa_oaep_md (1.0.2+)"
+ )
+ def test_rsa_padding_supported_oaep_sha2_combinations(self):
+ hashalgs = [
+ hashes.SHA1(),
+ hashes.SHA224(),
+ hashes.SHA256(),
+ hashes.SHA384(),
+ hashes.SHA512(),
+ ]
+ for mgf1alg, oaepalg in itertools.product(hashalgs, hashalgs):
+ assert backend.rsa_padding_supported(
+ padding.OAEP(
+ mgf=padding.MGF1(algorithm=mgf1alg),
+ algorithm=oaepalg,
+ label=None
+ ),
+ ) is True
+
def test_rsa_padding_unsupported_mgf(self):
assert backend.rsa_padding_supported(
padding.OAEP(
@@ -404,9 +392,13 @@ class TestOpenSSLRSA(object):
padding.PSS(mgf=DummyMGF(), salt_length=0)
) is False
+ @pytest.mark.skipif(
+ backend._lib.Cryptography_HAS_RSA_OAEP_MD == 1,
+ reason="Requires OpenSSL without rsa_oaep_md (< 1.0.2)"
+ )
def test_unsupported_mgf1_hash_algorithm_decrypt(self):
private_key = RSA_KEY_512.private_key(backend)
- with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_HASH):
+ with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_PADDING):
private_key.decrypt(
b"0" * 64,
padding.OAEP(
@@ -416,9 +408,13 @@ class TestOpenSSLRSA(object):
)
)
+ @pytest.mark.skipif(
+ backend._lib.Cryptography_HAS_RSA_OAEP_MD == 1,
+ reason="Requires OpenSSL without rsa_oaep_md (< 1.0.2)"
+ )
def test_unsupported_oaep_hash_algorithm_decrypt(self):
private_key = RSA_KEY_512.private_key(backend)
- with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_HASH):
+ with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_PADDING):
private_key.decrypt(
b"0" * 64,
padding.OAEP(
@@ -428,70 +424,96 @@ class TestOpenSSLRSA(object):
)
)
- def test_unsupported_oaep_label_decrypt(self):
+ def test_unsupported_mgf1_hash_algorithm_md5_decrypt(self):
private_key = RSA_KEY_512.private_key(backend)
- with pytest.raises(ValueError):
+ with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_PADDING):
private_key.decrypt(
b"0" * 64,
padding.OAEP(
- mgf=padding.MGF1(algorithm=hashes.SHA1()),
- algorithm=hashes.SHA1(),
- label=b"label"
+ mgf=padding.MGF1(algorithm=hashes.MD5()),
+ algorithm=hashes.MD5(),
+ label=None
)
)
-@pytest.mark.skipif(
- backend._lib.OPENSSL_VERSION_NUMBER <= 0x10001000,
- reason="Requires an OpenSSL version >= 1.0.1"
-)
class TestOpenSSLCMAC(object):
def test_unsupported_cipher(self):
- @utils.register_interface(BlockCipherAlgorithm)
- class FakeAlgorithm(object):
- block_size = 64
-
with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_CIPHER):
- backend.create_cmac_ctx(FakeAlgorithm())
+ backend.create_cmac_ctx(DummyCipherAlgorithm())
-class TestOpenSSLCreateX509CSR(object):
- @pytest.mark.skipif(
- backend._lib.OPENSSL_VERSION_NUMBER >= 0x10001000,
- reason="Requires an older OpenSSL. Must be < 1.0.1"
- )
- def test_unsupported_dsa_keys(self):
- private_key = DSA_KEY_2048.private_key(backend)
+class TestOpenSSLSignX509Certificate(object):
+ def test_requires_certificate_builder(self):
+ private_key = RSA_KEY_2048.private_key(backend)
- with pytest.raises(NotImplementedError):
- backend.create_x509_csr(object(), private_key, hashes.SHA1())
+ with pytest.raises(TypeError):
+ backend.create_x509_certificate(
+ object(), private_key, DummyHashAlgorithm()
+ )
+
+
+class TestOpenSSLSignX509CSR(object):
+ def test_requires_csr_builder(self):
+ private_key = RSA_KEY_2048.private_key(backend)
+
+ with pytest.raises(TypeError):
+ backend.create_x509_csr(
+ object(), private_key, DummyHashAlgorithm()
+ )
- @pytest.mark.skipif(
- backend._lib.OPENSSL_VERSION_NUMBER >= 0x10001000,
- reason="Requires an older OpenSSL. Must be < 1.0.1"
- )
- def test_unsupported_ec_keys(self):
- _skip_curve_unsupported(backend, ec.SECP256R1())
- private_key = ec.generate_private_key(ec.SECP256R1(), backend)
- with pytest.raises(NotImplementedError):
- backend.create_x509_csr(object(), private_key, hashes.SHA1())
+class TestOpenSSLSignX509CertificateRevocationList(object):
+ def test_invalid_builder(self):
+ private_key = RSA_KEY_2048.private_key(backend)
+ with pytest.raises(TypeError):
+ backend.create_x509_crl(object(), private_key, hashes.SHA256())
-class TestOpenSSLSerialisationWithOpenSSL(object):
- def test_pem_password_cb_buffer_too_small(self):
- ffi_cb, cb = backend._pem_password_cb(b"aa")
- assert cb(None, 1, False, None) == 0
+
+class TestOpenSSLCreateRevokedCertificate(object):
+ def test_invalid_builder(self):
+ with pytest.raises(TypeError):
+ backend.create_x509_revoked_certificate(object())
+
+
+class TestOpenSSLSerializationWithOpenSSL(object):
+ def test_pem_password_cb(self):
+ userdata = backend._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *")
+ pw = b"abcdefg"
+ password = backend._ffi.new("char []", pw)
+ userdata.password = password
+ userdata.length = len(pw)
+ buflen = 10
+ buf = backend._ffi.new("char []", buflen)
+ res = backend._lib.Cryptography_pem_password_cb(
+ buf, buflen, 0, userdata
+ )
+ assert res == len(pw)
+ assert userdata.called == 1
+ assert backend._ffi.buffer(buf, len(pw))[:] == pw
+ assert userdata.maxsize == buflen
+ assert userdata.error == 0
+
+ def test_pem_password_cb_no_password(self):
+ userdata = backend._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *")
+ buflen = 10
+ buf = backend._ffi.new("char []", buflen)
+ res = backend._lib.Cryptography_pem_password_cb(
+ buf, buflen, 0, userdata
+ )
+ assert res == 0
+ assert userdata.error == -1
def test_unsupported_evp_pkey_type(self):
- key = pretend.stub(type="unsupported")
+ key = backend._create_evp_pkey_gc()
with raises_unsupported_algorithm(None):
backend._evp_pkey_to_private_key(key)
with raises_unsupported_algorithm(None):
backend._evp_pkey_to_public_key(key)
def test_very_long_pem_serialization_password(self):
- password = "x" * 1024
+ password = b"x" * 1024
with pytest.raises(ValueError):
load_vectors_from_file(
@@ -507,23 +529,7 @@ class TestOpenSSLSerialisationWithOpenSSL(object):
)
-class DummyLibrary(object):
- Cryptography_HAS_EC = 0
-
-
class TestOpenSSLEllipticCurve(object):
- def test_elliptic_curve_supported(self, monkeypatch):
- monkeypatch.setattr(backend, "_lib", DummyLibrary())
-
- assert backend.elliptic_curve_supported(None) is False
-
- def test_elliptic_curve_signature_algorithm_supported(self, monkeypatch):
- monkeypatch.setattr(backend, "_lib", DummyLibrary())
-
- assert backend.elliptic_curve_signature_algorithm_supported(
- None, None
- ) is False
-
def test_sn_to_elliptic_curve_not_supported(self):
with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_ELLIPTIC_CURVE):
_sn_to_elliptic_curve(backend, b"fake")
@@ -540,3 +546,101 @@ class TestRSAPEMSerialization(object):
serialization.PrivateFormat.PKCS8,
serialization.BestAvailableEncryption(password)
)
+
+
+class TestGOSTCertificate(object):
+ def test_numeric_string_x509_name_entry(self):
+ cert = _load_cert(
+ os.path.join("x509", "e-trust.ru.der"),
+ x509.load_der_x509_certificate,
+ backend
+ )
+ if backend._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_102I:
+ with pytest.raises(ValueError) as exc:
+ cert.subject
+
+ # We assert on the message in this case because if the certificate
+ # fails to load it will also raise a ValueError and this test could
+ # erroneously pass.
+ assert str(exc.value) == "Unsupported ASN1 string type. Type: 18"
+ else:
+ assert cert.subject.get_attributes_for_oid(
+ x509.ObjectIdentifier("1.2.643.3.131.1.1")
+ )[0].value == "007710474375"
+
+
+@pytest.mark.skipif(
+ backend._lib.Cryptography_HAS_EVP_PKEY_DHX == 1,
+ reason="Requires OpenSSL without EVP_PKEY_DHX (< 1.0.2)")
+@pytest.mark.requires_backend_interface(interface=DHBackend)
+class TestOpenSSLDHSerialization(object):
+
+ @pytest.mark.parametrize(
+ "vector",
+ load_vectors_from_file(
+ os.path.join("asymmetric", "DH", "RFC5114.txt"),
+ load_nist_vectors))
+ def test_dh_serialization_with_q_unsupported(self, backend, vector):
+ parameters = dh.DHParameterNumbers(int(vector["p"], 16),
+ int(vector["g"], 16),
+ int(vector["q"], 16))
+ public = dh.DHPublicNumbers(int(vector["ystatcavs"], 16), parameters)
+ private = dh.DHPrivateNumbers(int(vector["xstatcavs"], 16), public)
+ private_key = private.private_key(backend)
+ public_key = private_key.public_key()
+ with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_SERIALIZATION):
+ private_key.private_bytes(serialization.Encoding.PEM,
+ serialization.PrivateFormat.PKCS8,
+ serialization.NoEncryption())
+ with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_SERIALIZATION):
+ public_key.public_bytes(
+ serialization.Encoding.PEM,
+ serialization.PublicFormat.SubjectPublicKeyInfo)
+ with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_SERIALIZATION):
+ parameters.parameters(backend).parameter_bytes(
+ serialization.Encoding.PEM,
+ serialization.ParameterFormat.PKCS3)
+
+ @pytest.mark.parametrize(
+ ("key_path", "loader_func"),
+ [
+ (
+ os.path.join("asymmetric", "DH", "dhkey_rfc5114_2.pem"),
+ serialization.load_pem_private_key,
+ ),
+ (
+ os.path.join("asymmetric", "DH", "dhkey_rfc5114_2.der"),
+ serialization.load_der_private_key,
+ )
+ ]
+ )
+ def test_private_load_dhx_unsupported(self, key_path, loader_func,
+ backend):
+ key_bytes = load_vectors_from_file(
+ key_path,
+ lambda pemfile: pemfile.read(), mode="rb"
+ )
+ with pytest.raises(ValueError):
+ loader_func(key_bytes, None, backend)
+
+ @pytest.mark.parametrize(
+ ("key_path", "loader_func"),
+ [
+ (
+ os.path.join("asymmetric", "DH", "dhpub_rfc5114_2.pem"),
+ serialization.load_pem_public_key,
+ ),
+ (
+ os.path.join("asymmetric", "DH", "dhpub_rfc5114_2.der"),
+ serialization.load_der_public_key,
+ )
+ ]
+ )
+ def test_public_load_dhx_unsupported(self, key_path, loader_func,
+ backend):
+ key_bytes = load_vectors_from_file(
+ key_path,
+ lambda pemfile: pemfile.read(), mode="rb"
+ )
+ with pytest.raises(ValueError):
+ loader_func(key_bytes, backend)
diff --git a/tests/hazmat/backends/test_openssl_memleak.py b/tests/hazmat/backends/test_openssl_memleak.py
new file mode 100644
index 00000000..935ea3df
--- /dev/null
+++ b/tests/hazmat/backends/test_openssl_memleak.py
@@ -0,0 +1,451 @@
+# This file is dual licensed under the terms of the Apache License, Version
+# 2.0, and the BSD License. See the LICENSE file in the root of this repository
+# for complete details.
+
+from __future__ import absolute_import, division, print_function
+
+import json
+import os
+import subprocess
+import sys
+import textwrap
+
+import pytest
+
+from cryptography.hazmat.bindings.openssl.binding import Binding
+
+
+MEMORY_LEAK_SCRIPT = """
+import sys
+
+
+def main(argv):
+ import gc
+ import json
+
+ import cffi
+
+ from cryptography.hazmat.bindings._openssl import ffi, lib
+
+ heap = {}
+
+ BACKTRACE_ENABLED = False
+ if BACKTRACE_ENABLED:
+ backtrace_ffi = cffi.FFI()
+ backtrace_ffi.cdef('''
+ int backtrace(void **, int);
+ char **backtrace_symbols(void *const *, int);
+ ''')
+ backtrace_lib = backtrace_ffi.dlopen(None)
+
+ def backtrace():
+ buf = backtrace_ffi.new("void*[]", 24)
+ length = backtrace_lib.backtrace(buf, len(buf))
+ return (buf, length)
+
+ def symbolize_backtrace(trace):
+ (buf, length) = trace
+ symbols = backtrace_lib.backtrace_symbols(buf, length)
+ stack = [
+ backtrace_ffi.string(symbols[i]).decode()
+ for i in range(length)
+ ]
+ lib.Cryptography_free_wrapper(symbols, backtrace_ffi.NULL, 0)
+ return stack
+ else:
+ def backtrace():
+ return None
+
+ def symbolize_backtrace(trace):
+ return None
+
+ @ffi.callback("void *(size_t, const char *, int)")
+ def malloc(size, path, line):
+ ptr = lib.Cryptography_malloc_wrapper(size, path, line)
+ heap[ptr] = (size, path, line, backtrace())
+ return ptr
+
+ @ffi.callback("void *(void *, size_t, const char *, int)")
+ def realloc(ptr, size, path, line):
+ if ptr != ffi.NULL:
+ del heap[ptr]
+ new_ptr = lib.Cryptography_realloc_wrapper(ptr, size, path, line)
+ heap[new_ptr] = (size, path, line, backtrace())
+ return new_ptr
+
+ @ffi.callback("void(void *, const char *, int)")
+ def free(ptr, path, line):
+ if ptr != ffi.NULL:
+ del heap[ptr]
+ lib.Cryptography_free_wrapper(ptr, path, line)
+
+ result = lib.Cryptography_CRYPTO_set_mem_functions(malloc, realloc, free)
+ assert result == 1
+
+ # Trigger a bunch of initialization stuff.
+ import cryptography.hazmat.backends.openssl
+
+ start_heap = set(heap)
+
+ func(*argv[1:])
+ gc.collect()
+ gc.collect()
+ gc.collect()
+
+ if lib.Cryptography_HAS_OPENSSL_CLEANUP:
+ lib.OPENSSL_cleanup()
+
+ # Swap back to the original functions so that if OpenSSL tries to free
+ # something from its atexit handle it won't be going through a Python
+ # function, which will be deallocated when this function returns
+ result = lib.Cryptography_CRYPTO_set_mem_functions(
+ ffi.addressof(lib, "Cryptography_malloc_wrapper"),
+ ffi.addressof(lib, "Cryptography_realloc_wrapper"),
+ ffi.addressof(lib, "Cryptography_free_wrapper"),
+ )
+ assert result == 1
+
+ remaining = set(heap) - start_heap
+
+ if remaining:
+ sys.stdout.write(json.dumps(dict(
+ (int(ffi.cast("size_t", ptr)), {
+ "size": heap[ptr][0],
+ "path": ffi.string(heap[ptr][1]).decode(),
+ "line": heap[ptr][2],
+ "backtrace": symbolize_backtrace(heap[ptr][3]),
+ })
+ for ptr in remaining
+ )))
+ sys.stdout.flush()
+ sys.exit(255)
+
+main(sys.argv)
+"""
+
+
+def assert_no_memory_leaks(s, argv=[]):
+ env = os.environ.copy()
+ env["PYTHONPATH"] = os.pathsep.join(sys.path)
+ argv = [
+ sys.executable, "-c", "{}\n\n{}".format(s, MEMORY_LEAK_SCRIPT)
+ ] + argv
+ # Shell out to a fresh Python process because OpenSSL does not allow you to
+ # install new memory hooks after the first malloc/free occurs.
+ proc = subprocess.Popen(
+ argv,
+ env=env,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
+ try:
+ proc.wait()
+ if proc.returncode == 255:
+ # 255 means there was a leak, load the info about what mallocs
+ # weren't freed.
+ out = json.loads(proc.stdout.read().decode())
+ raise AssertionError(out)
+ elif proc.returncode != 0:
+ # Any exception type will do to be honest
+ raise ValueError(proc.stdout.read(), proc.stderr.read())
+ finally:
+ proc.stdout.close()
+ proc.stderr.close()
+
+
+def skip_if_memtesting_not_supported():
+ return pytest.mark.skipif(
+ not Binding().lib.Cryptography_HAS_MEM_FUNCTIONS,
+ reason="Requires OpenSSL memory functions (>=1.1.0)"
+ )
+
+
+@skip_if_memtesting_not_supported()
+class TestAssertNoMemoryLeaks(object):
+ def test_no_leak_no_malloc(self):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func():
+ pass
+ """))
+
+ def test_no_leak_free(self):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func():
+ from cryptography.hazmat.bindings.openssl.binding import Binding
+ b = Binding()
+ name = b.lib.X509_NAME_new()
+ b.lib.X509_NAME_free(name)
+ """))
+
+ def test_no_leak_gc(self):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func():
+ from cryptography.hazmat.bindings.openssl.binding import Binding
+ b = Binding()
+ name = b.lib.X509_NAME_new()
+ b.ffi.gc(name, b.lib.X509_NAME_free)
+ """))
+
+ def test_leak(self):
+ with pytest.raises(AssertionError):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func():
+ from cryptography.hazmat.bindings.openssl.binding import (
+ Binding
+ )
+ b = Binding()
+ b.lib.X509_NAME_new()
+ """))
+
+ def test_errors(self):
+ with pytest.raises(ValueError):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func():
+ raise ZeroDivisionError
+ """))
+
+
+@skip_if_memtesting_not_supported()
+class TestOpenSSLMemoryLeaks(object):
+ @pytest.mark.parametrize("path", [
+ "x509/PKITS_data/certs/ValidcRLIssuerTest28EE.crt",
+ ])
+ def test_der_x509_certificate_extensions(self, path):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func(path):
+ from cryptography import x509
+ from cryptography.hazmat.backends.openssl import backend
+
+ import cryptography_vectors
+
+ with cryptography_vectors.open_vector_file(path, "rb") as f:
+ cert = x509.load_der_x509_certificate(
+ f.read(), backend
+ )
+
+ cert.extensions
+ """), [path])
+
+ @pytest.mark.parametrize("path", [
+ "x509/cryptography.io.pem",
+ ])
+ def test_pem_x509_certificate_extensions(self, path):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func(path):
+ from cryptography import x509
+ from cryptography.hazmat.backends.openssl import backend
+
+ import cryptography_vectors
+
+ with cryptography_vectors.open_vector_file(path, "rb") as f:
+ cert = x509.load_pem_x509_certificate(
+ f.read(), backend
+ )
+
+ cert.extensions
+ """), [path])
+
+ def test_x509_csr_extensions(self):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func():
+ from cryptography import x509
+ from cryptography.hazmat.backends.openssl import backend
+ from cryptography.hazmat.primitives import hashes
+ from cryptography.hazmat.primitives.asymmetric import rsa
+
+ private_key = rsa.generate_private_key(
+ key_size=2048, public_exponent=65537, backend=backend
+ )
+ cert = x509.CertificateSigningRequestBuilder().subject_name(
+ x509.Name([])
+ ).add_extension(
+ x509.OCSPNoCheck(), critical=False
+ ).sign(private_key, hashes.SHA256(), backend)
+
+ cert.extensions
+ """))
+
+ def test_ec_private_numbers_private_key(self):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func():
+ from cryptography.hazmat.backends.openssl import backend
+ from cryptography.hazmat.primitives.asymmetric import ec
+
+ ec.EllipticCurvePrivateNumbers(
+ private_value=int(
+ '280814107134858470598753916394807521398239633534281633982576099083'
+ '35787109896602102090002196616273211495718603965098'
+ ),
+ public_numbers=ec.EllipticCurvePublicNumbers(
+ curve=ec.SECP384R1(),
+ x=int(
+ '10036914308591746758780165503819213553101287571902957054148542'
+ '504671046744460374996612408381962208627004841444205030'
+ ),
+ y=int(
+ '17337335659928075994560513699823544906448896792102247714689323'
+ '575406618073069185107088229463828921069465902299522926'
+ )
+ )
+ ).private_key(backend)
+ """))
+
+ def test_ec_derive_private_key(self):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func():
+ from cryptography.hazmat.backends.openssl import backend
+ from cryptography.hazmat.primitives.asymmetric import ec
+ ec.derive_private_key(1, ec.SECP256R1(), backend)
+ """))
+
+ def test_x25519_pubkey_from_private_key(self):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func():
+ from cryptography.hazmat.primitives.asymmetric import x25519
+ private_key = x25519.X25519PrivateKey.generate()
+ private_key.public_key()
+ """))
+
+ def test_create_ocsp_request(self):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func():
+ from cryptography import x509
+ from cryptography.hazmat.backends.openssl import backend
+ from cryptography.hazmat.primitives import hashes
+ from cryptography.x509 import ocsp
+ import cryptography_vectors
+
+ path = "x509/PKITS_data/certs/ValidcRLIssuerTest28EE.crt"
+ with cryptography_vectors.open_vector_file(path, "rb") as f:
+ cert = x509.load_der_x509_certificate(
+ f.read(), backend
+ )
+ builder = ocsp.OCSPRequestBuilder()
+ builder = builder.add_certificate(
+ cert, cert, hashes.SHA1()
+ ).add_extension(x509.OCSPNonce(b"0000"), False)
+ req = builder.build()
+ """))
+
+ @pytest.mark.parametrize("path", [
+ "pkcs12/cert-aes256cbc-no-key.p12",
+ "pkcs12/cert-key-aes256cbc.p12",
+ ])
+ def test_load_pkcs12_key_and_certificates(self, path):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func(path):
+ from cryptography import x509
+ from cryptography.hazmat.backends.openssl import backend
+ from cryptography.hazmat.primitives.serialization import pkcs12
+ import cryptography_vectors
+
+ with cryptography_vectors.open_vector_file(path, "rb") as f:
+ pkcs12.load_key_and_certificates(
+ f.read(), b"cryptography", backend
+ )
+ """), [path])
+
+ def test_create_crl_with_idp(self):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func():
+ import datetime
+ from cryptography import x509
+ from cryptography.hazmat.backends.openssl import backend
+ from cryptography.hazmat.primitives import hashes
+ from cryptography.hazmat.primitives.asymmetric import ec
+ from cryptography.x509.oid import NameOID
+
+ key = ec.generate_private_key(ec.SECP256R1(), backend)
+ last_update = datetime.datetime(2002, 1, 1, 12, 1)
+ next_update = datetime.datetime(2030, 1, 1, 12, 1)
+ idp = x509.IssuingDistributionPoint(
+ full_name=None,
+ relative_name=x509.RelativeDistinguishedName([
+ x509.NameAttribute(
+ oid=x509.NameOID.ORGANIZATION_NAME, value=u"PyCA")
+ ]),
+ only_contains_user_certs=False,
+ only_contains_ca_certs=True,
+ only_some_reasons=None,
+ indirect_crl=False,
+ only_contains_attribute_certs=False,
+ )
+ builder = x509.CertificateRevocationListBuilder().issuer_name(
+ x509.Name([
+ x509.NameAttribute(
+ NameOID.COMMON_NAME, u"cryptography.io CA"
+ )
+ ])
+ ).last_update(
+ last_update
+ ).next_update(
+ next_update
+ ).add_extension(
+ idp, True
+ )
+
+ crl = builder.sign(key, hashes.SHA256(), backend)
+ crl.extensions.get_extension_for_class(
+ x509.IssuingDistributionPoint
+ )
+ """))
+
+ def test_create_certificate_with_extensions(self):
+ assert_no_memory_leaks(textwrap.dedent("""
+ def func():
+ import datetime
+
+ from cryptography import x509
+ from cryptography.hazmat.backends.openssl import backend
+ from cryptography.hazmat.primitives import hashes
+ from cryptography.hazmat.primitives.asymmetric import ec
+ from cryptography.x509.oid import (
+ AuthorityInformationAccessOID, ExtendedKeyUsageOID, NameOID
+ )
+
+ private_key = ec.generate_private_key(ec.SECP256R1(), backend)
+
+ not_valid_before = datetime.datetime.now()
+ not_valid_after = not_valid_before + datetime.timedelta(days=365)
+
+ aia = x509.AuthorityInformationAccess([
+ x509.AccessDescription(
+ AuthorityInformationAccessOID.OCSP,
+ x509.UniformResourceIdentifier(u"http://ocsp.domain.com")
+ ),
+ x509.AccessDescription(
+ AuthorityInformationAccessOID.CA_ISSUERS,
+ x509.UniformResourceIdentifier(u"http://domain.com/ca.crt")
+ )
+ ])
+ sans = [u'*.example.org', u'foobar.example.net']
+ san = x509.SubjectAlternativeName(list(map(x509.DNSName, sans)))
+
+ ski = x509.SubjectKeyIdentifier.from_public_key(
+ private_key.public_key()
+ )
+ eku = x509.ExtendedKeyUsage([
+ ExtendedKeyUsageOID.CLIENT_AUTH,
+ ExtendedKeyUsageOID.SERVER_AUTH,
+ ExtendedKeyUsageOID.CODE_SIGNING,
+ ])
+
+ builder = x509.CertificateBuilder().serial_number(
+ 777
+ ).issuer_name(x509.Name([
+ x509.NameAttribute(NameOID.COUNTRY_NAME, u'US'),
+ ])).subject_name(x509.Name([
+ x509.NameAttribute(NameOID.COUNTRY_NAME, u'US'),
+ ])).public_key(
+ private_key.public_key()
+ ).add_extension(
+ aia, critical=False
+ ).not_valid_before(
+ not_valid_before
+ ).not_valid_after(
+ not_valid_after
+ )
+
+ cert = builder.sign(private_key, hashes.SHA256(), backend)
+ cert.extensions
+ """))