diff options
Diffstat (limited to 'tests/hazmat/backends')
| -rw-r--r-- | tests/hazmat/backends/test_commoncrypto.py | 61 | ||||
| -rw-r--r-- | tests/hazmat/backends/test_multibackend.py | 498 | ||||
| -rw-r--r-- | tests/hazmat/backends/test_openssl.py | 472 | ||||
| -rw-r--r-- | tests/hazmat/backends/test_openssl_memleak.py | 451 |
4 files changed, 739 insertions, 743 deletions
diff --git a/tests/hazmat/backends/test_commoncrypto.py b/tests/hazmat/backends/test_commoncrypto.py deleted file mode 100644 index f7200016..00000000 --- a/tests/hazmat/backends/test_commoncrypto.py +++ /dev/null @@ -1,61 +0,0 @@ -# This file is dual licensed under the terms of the Apache License, Version -# 2.0, and the BSD License. See the LICENSE file in the root of this repository -# for complete details. - -from __future__ import absolute_import, division, print_function - -import pytest - -from cryptography import utils -from cryptography.exceptions import InternalError, _Reasons -from cryptography.hazmat.backends import _available_backends -from cryptography.hazmat.primitives.ciphers import Cipher, CipherAlgorithm -from cryptography.hazmat.primitives.ciphers.algorithms import AES -from cryptography.hazmat.primitives.ciphers.modes import CBC, GCM - -from ...utils import raises_unsupported_algorithm - - -@utils.register_interface(CipherAlgorithm) -class DummyCipher(object): - name = "dummy-cipher" - block_size = None - key_size = None - - -@pytest.mark.skipif("commoncrypto" not in - [i.name for i in _available_backends()], - reason="CommonCrypto not available") -class TestCommonCrypto(object): - def test_supports_cipher(self): - from cryptography.hazmat.backends.commoncrypto.backend import backend - assert backend.cipher_supported(None, None) is False - - def test_register_duplicate_cipher_adapter(self): - from cryptography.hazmat.backends.commoncrypto.backend import backend - with pytest.raises(ValueError): - backend._register_cipher_adapter( - AES, backend._lib.kCCAlgorithmAES128, - CBC, backend._lib.kCCModeCBC - ) - - def test_handle_response(self): - from cryptography.hazmat.backends.commoncrypto.backend import backend - - with pytest.raises(ValueError): - backend._check_cipher_response(backend._lib.kCCAlignmentError) - - with pytest.raises(InternalError): - backend._check_cipher_response(backend._lib.kCCMemoryFailure) - - with pytest.raises(InternalError): - backend._check_cipher_response(backend._lib.kCCDecodeError) - - def test_nonexistent_aead_cipher(self): - from cryptography.hazmat.backends.commoncrypto.backend import Backend - b = Backend() - cipher = Cipher( - DummyCipher(), GCM(b"fake_iv_here"), backend=b, - ) - with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_CIPHER): - cipher.encryptor() diff --git a/tests/hazmat/backends/test_multibackend.py b/tests/hazmat/backends/test_multibackend.py deleted file mode 100644 index 3c05cdfa..00000000 --- a/tests/hazmat/backends/test_multibackend.py +++ /dev/null @@ -1,498 +0,0 @@ -# This file is dual licensed under the terms of the Apache License, Version -# 2.0, and the BSD License. See the LICENSE file in the root of this repository -# for complete details. - -from __future__ import absolute_import, division, print_function - -from cryptography import utils -from cryptography.exceptions import ( - UnsupportedAlgorithm, _Reasons -) -from cryptography.hazmat.backends.interfaces import ( - CMACBackend, CipherBackend, DERSerializationBackend, DSABackend, - EllipticCurveBackend, HMACBackend, HashBackend, PBKDF2HMACBackend, - PEMSerializationBackend, RSABackend, X509Backend -) -from cryptography.hazmat.backends.multibackend import MultiBackend -from cryptography.hazmat.primitives import cmac, hashes, hmac -from cryptography.hazmat.primitives.asymmetric import ec, padding -from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes - -from ...utils import raises_unsupported_algorithm - - -@utils.register_interface(CipherBackend) -class DummyCipherBackend(object): - def __init__(self, supported_ciphers): - self._ciphers = supported_ciphers - - def cipher_supported(self, cipher, mode): - return (type(cipher), type(mode)) in self._ciphers - - def create_symmetric_encryption_ctx(self, cipher, mode): - if not self.cipher_supported(cipher, mode): - raise UnsupportedAlgorithm("", _Reasons.UNSUPPORTED_CIPHER) - - def create_symmetric_decryption_ctx(self, cipher, mode): - if not self.cipher_supported(cipher, mode): - raise UnsupportedAlgorithm("", _Reasons.UNSUPPORTED_CIPHER) - - -@utils.register_interface(HashBackend) -class DummyHashBackend(object): - def __init__(self, supported_algorithms): - self._algorithms = supported_algorithms - - def hash_supported(self, algorithm): - return type(algorithm) in self._algorithms - - def create_hash_ctx(self, algorithm): - if not self.hash_supported(algorithm): - raise UnsupportedAlgorithm("", _Reasons.UNSUPPORTED_HASH) - - -@utils.register_interface(HMACBackend) -class DummyHMACBackend(object): - def __init__(self, supported_algorithms): - self._algorithms = supported_algorithms - - def hmac_supported(self, algorithm): - return type(algorithm) in self._algorithms - - def create_hmac_ctx(self, key, algorithm): - if not self.hmac_supported(algorithm): - raise UnsupportedAlgorithm("", _Reasons.UNSUPPORTED_HASH) - - -@utils.register_interface(PBKDF2HMACBackend) -class DummyPBKDF2HMACBackend(object): - def __init__(self, supported_algorithms): - self._algorithms = supported_algorithms - - def pbkdf2_hmac_supported(self, algorithm): - return type(algorithm) in self._algorithms - - def derive_pbkdf2_hmac(self, algorithm, length, salt, iterations, - key_material): - if not self.pbkdf2_hmac_supported(algorithm): - raise UnsupportedAlgorithm("", _Reasons.UNSUPPORTED_HASH) - - -@utils.register_interface(RSABackend) -class DummyRSABackend(object): - def generate_rsa_private_key(self, public_exponent, key_size): - pass - - def rsa_padding_supported(self, padding): - pass - - def generate_rsa_parameters_supported(self, public_exponent, key_size): - pass - - def load_rsa_private_numbers(self, numbers): - pass - - def load_rsa_public_numbers(self, numbers): - pass - - -@utils.register_interface(DSABackend) -class DummyDSABackend(object): - def generate_dsa_parameters(self, key_size): - pass - - def generate_dsa_private_key(self, parameters): - pass - - def generate_dsa_private_key_and_parameters(self, key_size): - pass - - def dsa_hash_supported(self, algorithm): - pass - - def dsa_parameters_supported(self, p, q, g): - pass - - def load_dsa_private_numbers(self, numbers): - pass - - def load_dsa_public_numbers(self, numbers): - pass - - def load_dsa_parameter_numbers(self, numbers): - pass - - -@utils.register_interface(CMACBackend) -class DummyCMACBackend(object): - def __init__(self, supported_algorithms): - self._algorithms = supported_algorithms - - def cmac_algorithm_supported(self, algorithm): - return type(algorithm) in self._algorithms - - def create_cmac_ctx(self, algorithm): - if not self.cmac_algorithm_supported(algorithm): - raise UnsupportedAlgorithm("", _Reasons.UNSUPPORTED_CIPHER) - - -@utils.register_interface(EllipticCurveBackend) -class DummyEllipticCurveBackend(object): - def __init__(self, supported_curves): - self._curves = supported_curves - - def elliptic_curve_supported(self, curve): - return any( - isinstance(curve, curve_type) - for curve_type in self._curves - ) - - def elliptic_curve_signature_algorithm_supported( - self, signature_algorithm, curve - ): - return ( - isinstance(signature_algorithm, ec.ECDSA) and - any( - isinstance(curve, curve_type) - for curve_type in self._curves - ) - ) - - def generate_elliptic_curve_private_key(self, curve): - if not self.elliptic_curve_supported(curve): - raise UnsupportedAlgorithm(_Reasons.UNSUPPORTED_ELLIPTIC_CURVE) - - def load_elliptic_curve_private_numbers(self, numbers): - if not self.elliptic_curve_supported(numbers.public_numbers.curve): - raise UnsupportedAlgorithm(_Reasons.UNSUPPORTED_ELLIPTIC_CURVE) - - def load_elliptic_curve_public_numbers(self, numbers): - if not self.elliptic_curve_supported(numbers.curve): - raise UnsupportedAlgorithm(_Reasons.UNSUPPORTED_ELLIPTIC_CURVE) - - -@utils.register_interface(PEMSerializationBackend) -class DummyPEMSerializationBackend(object): - def load_pem_private_key(self, data, password): - pass - - def load_pem_public_key(self, data): - pass - - -@utils.register_interface(DERSerializationBackend) -class DummyDERSerializationBackend(object): - def load_der_private_key(self, data, password): - pass - - def load_der_public_key(self, data): - pass - - -@utils.register_interface(X509Backend) -class DummyX509Backend(object): - def load_pem_x509_certificate(self, data): - pass - - def load_der_x509_certificate(self, data): - pass - - def load_pem_x509_csr(self, data): - pass - - def load_der_x509_csr(self, data): - pass - - def create_x509_csr(self, builder, private_key, algorithm): - pass - - -class TestMultiBackend(object): - def test_ciphers(self): - backend = MultiBackend([ - DummyHashBackend([]), - DummyCipherBackend([ - (algorithms.AES, modes.CBC), - ]) - ]) - assert backend.cipher_supported( - algorithms.AES(b"\x00" * 16), modes.CBC(b"\x00" * 16) - ) - - cipher = Cipher( - algorithms.AES(b"\x00" * 16), - modes.CBC(b"\x00" * 16), - backend=backend - ) - cipher.encryptor() - cipher.decryptor() - - cipher = Cipher( - algorithms.Camellia(b"\x00" * 16), - modes.CBC(b"\x00" * 16), - backend=backend - ) - with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_CIPHER): - cipher.encryptor() - with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_CIPHER): - cipher.decryptor() - - def test_hashes(self): - backend = MultiBackend([ - DummyHashBackend([hashes.MD5]) - ]) - assert backend.hash_supported(hashes.MD5()) - - hashes.Hash(hashes.MD5(), backend=backend) - - with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_HASH): - hashes.Hash(hashes.SHA1(), backend=backend) - - def test_hmac(self): - backend = MultiBackend([ - DummyHMACBackend([hashes.MD5]) - ]) - assert backend.hmac_supported(hashes.MD5()) - - hmac.HMAC(b"", hashes.MD5(), backend=backend) - - with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_HASH): - hmac.HMAC(b"", hashes.SHA1(), backend=backend) - - def test_pbkdf2(self): - backend = MultiBackend([ - DummyPBKDF2HMACBackend([hashes.MD5]) - ]) - assert backend.pbkdf2_hmac_supported(hashes.MD5()) - - backend.derive_pbkdf2_hmac(hashes.MD5(), 10, b"", 10, b"") - - with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_HASH): - backend.derive_pbkdf2_hmac(hashes.SHA1(), 10, b"", 10, b"") - - def test_rsa(self): - backend = MultiBackend([ - DummyRSABackend() - ]) - - backend.generate_rsa_private_key( - key_size=1024, public_exponent=65537 - ) - - backend.rsa_padding_supported(padding.PKCS1v15()) - - backend.generate_rsa_parameters_supported(65537, 1024) - - backend.load_rsa_private_numbers("private_numbers") - - backend.load_rsa_public_numbers("public_numbers") - - backend = MultiBackend([]) - with raises_unsupported_algorithm( - _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM - ): - backend.generate_rsa_private_key(key_size=1024, public_exponent=3) - - with raises_unsupported_algorithm( - _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM - ): - backend.rsa_padding_supported(padding.PKCS1v15()) - - with raises_unsupported_algorithm( - _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM - ): - backend.generate_rsa_parameters_supported(65537, 1024) - - with raises_unsupported_algorithm( - _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM - ): - backend.load_rsa_private_numbers("private_numbers") - - with raises_unsupported_algorithm( - _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM - ): - backend.load_rsa_public_numbers("public_numbers") - - def test_dsa(self): - backend = MultiBackend([ - DummyDSABackend() - ]) - - backend.generate_dsa_parameters(key_size=1024) - - parameters = object() - backend.generate_dsa_private_key(parameters) - backend.generate_dsa_private_key_and_parameters(key_size=1024) - - backend.dsa_hash_supported(hashes.SHA1()) - backend.dsa_parameters_supported(1, 2, 3) - backend.load_dsa_private_numbers("numbers") - backend.load_dsa_public_numbers("numbers") - backend.load_dsa_parameter_numbers("numbers") - - backend = MultiBackend([]) - with raises_unsupported_algorithm( - _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM - ): - backend.generate_dsa_parameters(key_size=1024) - - with raises_unsupported_algorithm( - _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM - ): - backend.generate_dsa_private_key(parameters) - - with raises_unsupported_algorithm( - _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM - ): - backend.generate_dsa_private_key_and_parameters(key_size=1024) - - with raises_unsupported_algorithm( - _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM - ): - backend.dsa_hash_supported(hashes.SHA1()) - - with raises_unsupported_algorithm( - _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM - ): - backend.dsa_parameters_supported('p', 'q', 'g') - - with raises_unsupported_algorithm( - _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM - ): - backend.load_dsa_private_numbers("numbers") - - with raises_unsupported_algorithm( - _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM - ): - backend.load_dsa_public_numbers("numbers") - - with raises_unsupported_algorithm( - _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM - ): - backend.load_dsa_parameter_numbers("numbers") - - def test_cmac(self): - backend = MultiBackend([ - DummyCMACBackend([algorithms.AES]) - ]) - - fake_key = b"\x00" * 16 - - assert backend.cmac_algorithm_supported( - algorithms.AES(fake_key)) is True - - cmac.CMAC(algorithms.AES(fake_key), backend) - - with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_CIPHER): - cmac.CMAC(algorithms.TripleDES(fake_key), backend) - - def test_elliptic_curve(self): - backend = MultiBackend([ - DummyEllipticCurveBackend([ - ec.SECT283K1 - ]) - ]) - - assert backend.elliptic_curve_supported(ec.SECT283K1()) is True - - assert backend.elliptic_curve_signature_algorithm_supported( - ec.ECDSA(hashes.SHA256()), - ec.SECT283K1() - ) is True - - backend.generate_elliptic_curve_private_key(ec.SECT283K1()) - - backend.load_elliptic_curve_private_numbers( - ec.EllipticCurvePrivateNumbers( - 1, - ec.EllipticCurvePublicNumbers( - 2, - 3, - ec.SECT283K1() - ) - ) - ) - - backend.load_elliptic_curve_public_numbers( - ec.EllipticCurvePublicNumbers( - 2, - 3, - ec.SECT283K1() - ) - ) - - assert backend.elliptic_curve_supported(ec.SECT163K1()) is False - - assert backend.elliptic_curve_signature_algorithm_supported( - ec.ECDSA(hashes.SHA256()), - ec.SECT163K1() - ) is False - - with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_ELLIPTIC_CURVE): - backend.generate_elliptic_curve_private_key(ec.SECT163K1()) - - with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_ELLIPTIC_CURVE): - backend.load_elliptic_curve_private_numbers( - ec.EllipticCurvePrivateNumbers( - 1, - ec.EllipticCurvePublicNumbers( - 2, - 3, - ec.SECT163K1() - ) - ) - ) - - with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_ELLIPTIC_CURVE): - backend.load_elliptic_curve_public_numbers( - ec.EllipticCurvePublicNumbers( - 2, - 3, - ec.SECT163K1() - ) - ) - - def test_pem_serialization_backend(self): - backend = MultiBackend([DummyPEMSerializationBackend()]) - - backend.load_pem_private_key(b"keydata", None) - backend.load_pem_public_key(b"keydata") - - backend = MultiBackend([]) - with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_SERIALIZATION): - backend.load_pem_private_key(b"keydata", None) - with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_SERIALIZATION): - backend.load_pem_public_key(b"keydata") - - def test_der_serialization_backend(self): - backend = MultiBackend([DummyDERSerializationBackend()]) - - backend.load_der_private_key(b"keydata", None) - backend.load_der_public_key(b"keydata") - - backend = MultiBackend([]) - with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_SERIALIZATION): - backend.load_der_private_key(b"keydata", None) - with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_SERIALIZATION): - backend.load_der_public_key(b"keydata") - - def test_x509_backend(self): - backend = MultiBackend([DummyX509Backend()]) - - backend.load_pem_x509_certificate(b"certdata") - backend.load_der_x509_certificate(b"certdata") - backend.load_pem_x509_csr(b"reqdata") - backend.load_der_x509_csr(b"reqdata") - backend.create_x509_csr(object(), b"privatekey", hashes.SHA1()) - - backend = MultiBackend([]) - with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_X509): - backend.load_pem_x509_certificate(b"certdata") - with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_X509): - backend.load_der_x509_certificate(b"certdata") - with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_X509): - backend.load_pem_x509_csr(b"reqdata") - with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_X509): - backend.load_der_x509_csr(b"reqdata") - with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_X509): - backend.create_x509_csr(object(), b"privatekey", hashes.SHA1()) diff --git a/tests/hazmat/backends/test_openssl.py b/tests/hazmat/backends/test_openssl.py index 6a2e8a77..44fd3db4 100644 --- a/tests/hazmat/backends/test_openssl.py +++ b/tests/hazmat/backends/test_openssl.py @@ -4,60 +4,49 @@ from __future__ import absolute_import, division, print_function +import itertools import os import subprocess import sys import textwrap -import pretend - import pytest -from cryptography import utils +from cryptography import x509 from cryptography.exceptions import InternalError, _Reasons -from cryptography.hazmat.backends.interfaces import RSABackend +from cryptography.hazmat.backends.interfaces import DHBackend, RSABackend from cryptography.hazmat.backends.openssl.backend import ( Backend, backend ) from cryptography.hazmat.backends.openssl.ec import _sn_to_elliptic_curve from cryptography.hazmat.primitives import hashes, serialization -from cryptography.hazmat.primitives.asymmetric import dsa, ec, padding -from cryptography.hazmat.primitives.ciphers import ( - BlockCipherAlgorithm, Cipher, CipherAlgorithm -) +from cryptography.hazmat.primitives.asymmetric import dh, dsa, padding +from cryptography.hazmat.primitives.ciphers import Cipher from cryptography.hazmat.primitives.ciphers.algorithms import AES -from cryptography.hazmat.primitives.ciphers.modes import CBC, CTR, Mode +from cryptography.hazmat.primitives.ciphers.modes import CBC -from ..primitives.fixtures_dsa import DSA_KEY_2048 from ..primitives.fixtures_rsa import RSA_KEY_2048, RSA_KEY_512 -from ..primitives.test_ec import _skip_curve_unsupported -from ...utils import load_vectors_from_file, raises_unsupported_algorithm - - -@utils.register_interface(Mode) -class DummyMode(object): - name = "dummy-mode" - - def validate_for_algorithm(self, algorithm): - pass - +from ...doubles import ( + DummyAsymmetricPadding, DummyCipherAlgorithm, DummyHashAlgorithm, DummyMode +) +from ...utils import ( + load_nist_vectors, load_vectors_from_file, raises_unsupported_algorithm +) +from ...x509.test_x509 import _load_cert -@utils.register_interface(CipherAlgorithm) -class DummyCipher(object): - name = "dummy-cipher" - key_size = None +def skip_if_libre_ssl(openssl_version): + if u'LibreSSL' in openssl_version: + pytest.skip("LibreSSL hard-codes RAND_bytes to use arc4random.") -@utils.register_interface(padding.AsymmetricPadding) -class DummyPadding(object): - name = "dummy-cipher" +class TestLibreSkip(object): + def test_skip_no(self): + assert skip_if_libre_ssl(u"OpenSSL 1.0.2h 3 May 2016") is None -@utils.register_interface(hashes.HashAlgorithm) -class DummyHash(object): - name = "dummy-hash" - block_size = None - digest_size = None + def test_skip_yes(self): + with pytest.raises(pytest.skip.Exception): + skip_if_libre_ssl(u"LibreSSL 2.1.6") class DummyMGF(object): @@ -82,14 +71,12 @@ class TestOpenSSL(object): backend.openssl_version_text().startswith("LibreSSL") ) + def test_openssl_version_number(self): + assert backend.openssl_version_number() > 0 + def test_supports_cipher(self): assert backend.cipher_supported(None, None) is False - def test_aes_ctr_always_available(self): - # AES CTR should always be available in both 0.9.8 and 1.0.0+ - assert backend.cipher_supported(AES(b"\x00" * 16), - CTR(b"\x00" * 16)) is True - def test_register_duplicate_cipher_adapter(self): with pytest.raises(ValueError): backend.register_cipher_adapter(AES, CBC, None) @@ -98,16 +85,21 @@ class TestOpenSSL(object): def test_nonexistent_cipher(self, mode): b = Backend() b.register_cipher_adapter( - DummyCipher, + DummyCipherAlgorithm, type(mode), lambda backend, cipher, mode: backend._ffi.NULL ) cipher = Cipher( - DummyCipher(), mode, backend=b, + DummyCipherAlgorithm(), mode, backend=b, ) with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_CIPHER): cipher.encryptor() + def test_openssl_assert(self): + backend.openssl_assert(True) + with pytest.raises(InternalError): + backend.openssl_assert(False) + def test_consume_errors(self): for i in range(10): backend._lib.ERR_put_error(backend._lib.ERR_LIB_EVP, 0, 0, @@ -120,25 +112,8 @@ class TestOpenSSL(object): assert backend._lib.ERR_peek_error() == 0 assert len(errors) == 10 - def test_openssl_error_string(self): - backend._lib.ERR_put_error( - backend._lib.ERR_LIB_EVP, - backend._lib.EVP_F_EVP_DECRYPTFINAL_EX, - 0, - b"test_openssl.py", - -1 - ) - - errors = backend._consume_errors() - exc = backend._unknown_error(errors[0]) - - assert ( - "digital envelope routines:" - "EVP_DecryptFinal_ex:digital envelope routines" in str(exc) - ) - def test_ssl_ciphers_registered(self): - meth = backend._lib.TLSv1_method() + meth = backend._lib.SSLv23_method() ctx = backend._lib.SSL_CTX_new(meth) assert ctx != backend._ffi.NULL backend._lib.SSL_CTX_free(ctx) @@ -148,12 +123,9 @@ class TestOpenSSL(object): assert cipher != backend._ffi.NULL def test_error_strings_loaded(self): - # returns a value in a static buffer - err = backend._lib.ERR_error_string(101183626, backend._ffi.NULL) - assert backend._ffi.string(err) == ( - b"error:0607F08A:digital envelope routines:EVP_EncryptFinal_ex:" - b"data not multiple of block length" - ) + buf = backend._ffi.new("char[]", 256) + backend._lib.ERR_error_string_n(101183626, buf, len(buf)) + assert b"data not multiple of block length" in backend._ffi.string(buf) def test_unknown_error_in_cipher_finalize(self): cipher = Cipher(AES(b"\0" * 16), CBC(b"\0" * 16), backend=backend) @@ -164,40 +136,19 @@ class TestOpenSSL(object): with pytest.raises(InternalError): enc.finalize() - def test_derive_pbkdf2_raises_unsupported_on_old_openssl(self): - if backend.pbkdf2_hmac_supported(hashes.SHA256()): - pytest.skip("Requires an older OpenSSL") - with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_HASH): - backend.derive_pbkdf2_hmac(hashes.SHA256(), 10, b"", 1000, b"") - - @pytest.mark.skipif( - backend._lib.OPENSSL_VERSION_NUMBER >= 0x1000000f, - reason="Requires an older OpenSSL. Must be < 1.0.0" - ) - def test_large_key_size_on_old_openssl(self): - with pytest.raises(ValueError): - dsa.generate_parameters(2048, backend=backend) - - with pytest.raises(ValueError): - dsa.generate_parameters(3072, backend=backend) - - @pytest.mark.skipif( - backend._lib.OPENSSL_VERSION_NUMBER < 0x1000000f, - reason="Requires a newer OpenSSL. Must be >= 1.0.0" - ) def test_large_key_size_on_new_openssl(self): parameters = dsa.generate_parameters(2048, backend) param_num = parameters.parameter_numbers() - assert utils.bit_length(param_num.p) == 2048 + assert param_num.p.bit_length() == 2048 parameters = dsa.generate_parameters(3072, backend) param_num = parameters.parameter_numbers() - assert utils.bit_length(param_num.p) == 3072 + assert param_num.p.bit_length() == 3072 def test_int_to_bn(self): value = (2 ** 4242) - 4242 bn = backend._int_to_bn(value) assert bn != backend._ffi.NULL - bn = backend._ffi.gc(bn, backend._lib.BN_free) + bn = backend._ffi.gc(bn, backend._lib.BN_clear_free) assert bn assert backend._bn_to_int(bn) == value @@ -217,15 +168,29 @@ class TestOpenSSL(object): assert backend._bn_to_int(bn) == 0 +@pytest.mark.skipif( + backend._lib.Cryptography_HAS_ENGINE == 0, + reason="Requires OpenSSL with ENGINE support") class TestOpenSSLRandomEngine(object): - def teardown_method(self, method): + def setup(self): + # The default RAND engine is global and shared between + # tests. We make sure that the default engine is osrandom + # before we start each test and restore the global state to + # that engine in teardown. + current_default = backend._lib.ENGINE_get_default_RAND() + name = backend._lib.ENGINE_get_name(current_default) + assert name == backend._lib.Cryptography_osrandom_engine_name + + def teardown(self): # we need to reset state to being default. backend is a shared global # for all these tests. backend.activate_osrandom_engine() current_default = backend._lib.ENGINE_get_default_RAND() name = backend._lib.ENGINE_get_name(current_default) - assert name == backend._binding._osrandom_engine_name + assert name == backend._lib.Cryptography_osrandom_engine_name + @pytest.mark.skipif(sys.executable is None, + reason="No Python interpreter available.") def test_osrandom_engine_is_default(self, tmpdir): engine_printer = textwrap.dedent( """ @@ -254,18 +219,19 @@ class TestOpenSSLRandomEngine(object): subprocess.check_call( [sys.executable, "-c", engine_printer], env=env, - stdout=out + stdout=out, + stderr=subprocess.PIPE, ) osrandom_engine_name = backend._ffi.string( - backend._binding._osrandom_engine_name + backend._lib.Cryptography_osrandom_engine_name ) assert engine_name.read().encode('ascii') == osrandom_engine_name def test_osrandom_sanity_check(self): # This test serves as a check against catastrophic failure. - buf = backend._ffi.new("char[]", 500) + buf = backend._ffi.new("unsigned char[]", 500) res = backend._lib.RAND_bytes(buf, 500) assert res == 1 assert backend._ffi.buffer(buf)[:] != "\x00" * 500 @@ -277,7 +243,7 @@ class TestOpenSSLRandomEngine(object): backend.activate_osrandom_engine() e = backend._lib.ENGINE_get_default_RAND() name = backend._lib.ENGINE_get_name(e) - assert name == backend._binding._osrandom_engine_name + assert name == backend._lib.Cryptography_osrandom_engine_name res = backend._lib.ENGINE_free(e) assert res == 1 @@ -285,7 +251,7 @@ class TestOpenSSLRandomEngine(object): e = backend._lib.ENGINE_get_default_RAND() assert e != backend._ffi.NULL name = backend._lib.ENGINE_get_name(e) - assert name == backend._binding._osrandom_engine_name + assert name == backend._lib.Cryptography_osrandom_engine_name res = backend._lib.ENGINE_free(e) assert res == 1 backend.activate_builtin_random() @@ -300,20 +266,50 @@ class TestOpenSSLRandomEngine(object): e = backend._lib.ENGINE_get_default_RAND() assert e == backend._ffi.NULL + def test_osrandom_engine_implementation(self): + name = backend.osrandom_engine_implementation() + assert name in ['/dev/urandom', 'CryptGenRandom', 'getentropy', + 'getrandom'] + if sys.platform.startswith('linux'): + assert name in ['getrandom', '/dev/urandom'] + if sys.platform == 'darwin': + assert name in ['getentropy', '/dev/urandom'] + if sys.platform == 'win32': + assert name == 'CryptGenRandom' + def test_activate_osrandom_already_default(self): e = backend._lib.ENGINE_get_default_RAND() name = backend._lib.ENGINE_get_name(e) - assert name == backend._binding._osrandom_engine_name + assert name == backend._lib.Cryptography_osrandom_engine_name res = backend._lib.ENGINE_free(e) assert res == 1 backend.activate_osrandom_engine() e = backend._lib.ENGINE_get_default_RAND() name = backend._lib.ENGINE_get_name(e) - assert name == backend._binding._osrandom_engine_name + assert name == backend._lib.Cryptography_osrandom_engine_name res = backend._lib.ENGINE_free(e) assert res == 1 +@pytest.mark.skipif( + backend._lib.Cryptography_HAS_ENGINE == 1, + reason="Requires OpenSSL without ENGINE support") +class TestOpenSSLNoEngine(object): + def test_no_engine_support(self): + assert backend._ffi.string( + backend._lib.Cryptography_osrandom_engine_id + ) == b"no-engine-support" + assert backend._ffi.string( + backend._lib.Cryptography_osrandom_engine_name + ) == b"osrandom_engine disabled due to no engine support" + + def test_activate_builtin_random_does_nothing(self): + backend.activate_builtin_random() + + def test_activate_osrandom_does_nothing(self): + backend.activate_osrandom_engine() + + class TestOpenSSLRSA(object): def test_generate_rsa_parameters_supported(self): assert backend.generate_rsa_parameters_supported(1, 1024) is False @@ -337,42 +333,13 @@ class TestOpenSSLRSA(object): backend.generate_rsa_private_key(public_exponent=65537, key_size=256) - @pytest.mark.skipif( - backend._lib.OPENSSL_VERSION_NUMBER >= 0x1000100f, - reason="Requires an older OpenSSL. Must be < 1.0.1" - ) - def test_non_sha1_pss_mgf1_hash_algorithm_on_old_openssl(self): - private_key = RSA_KEY_512.private_key(backend) - with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_HASH): - private_key.signer( - padding.PSS( - mgf=padding.MGF1( - algorithm=hashes.SHA256(), - ), - salt_length=padding.PSS.MAX_LENGTH - ), - hashes.SHA1() - ) - public_key = private_key.public_key() - with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_HASH): - public_key.verifier( - b"sig", - padding.PSS( - mgf=padding.MGF1( - algorithm=hashes.SHA256(), - ), - salt_length=padding.PSS.MAX_LENGTH - ), - hashes.SHA1() - ) - def test_rsa_padding_unsupported_pss_mgf1_hash(self): assert backend.rsa_padding_supported( - padding.PSS(mgf=padding.MGF1(DummyHash()), salt_length=0) + padding.PSS(mgf=padding.MGF1(DummyHashAlgorithm()), salt_length=0) ) is False def test_rsa_padding_unsupported(self): - assert backend.rsa_padding_supported(DummyPadding()) is False + assert backend.rsa_padding_supported(DummyAsymmetricPadding()) is False def test_rsa_padding_supported_pkcs1v15(self): assert backend.rsa_padding_supported(padding.PKCS1v15()) is True @@ -391,6 +358,27 @@ class TestOpenSSLRSA(object): ), ) is True + @pytest.mark.skipif( + backend._lib.Cryptography_HAS_RSA_OAEP_MD == 0, + reason="Requires OpenSSL with rsa_oaep_md (1.0.2+)" + ) + def test_rsa_padding_supported_oaep_sha2_combinations(self): + hashalgs = [ + hashes.SHA1(), + hashes.SHA224(), + hashes.SHA256(), + hashes.SHA384(), + hashes.SHA512(), + ] + for mgf1alg, oaepalg in itertools.product(hashalgs, hashalgs): + assert backend.rsa_padding_supported( + padding.OAEP( + mgf=padding.MGF1(algorithm=mgf1alg), + algorithm=oaepalg, + label=None + ), + ) is True + def test_rsa_padding_unsupported_mgf(self): assert backend.rsa_padding_supported( padding.OAEP( @@ -404,9 +392,13 @@ class TestOpenSSLRSA(object): padding.PSS(mgf=DummyMGF(), salt_length=0) ) is False + @pytest.mark.skipif( + backend._lib.Cryptography_HAS_RSA_OAEP_MD == 1, + reason="Requires OpenSSL without rsa_oaep_md (< 1.0.2)" + ) def test_unsupported_mgf1_hash_algorithm_decrypt(self): private_key = RSA_KEY_512.private_key(backend) - with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_HASH): + with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_PADDING): private_key.decrypt( b"0" * 64, padding.OAEP( @@ -416,9 +408,13 @@ class TestOpenSSLRSA(object): ) ) + @pytest.mark.skipif( + backend._lib.Cryptography_HAS_RSA_OAEP_MD == 1, + reason="Requires OpenSSL without rsa_oaep_md (< 1.0.2)" + ) def test_unsupported_oaep_hash_algorithm_decrypt(self): private_key = RSA_KEY_512.private_key(backend) - with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_HASH): + with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_PADDING): private_key.decrypt( b"0" * 64, padding.OAEP( @@ -428,70 +424,96 @@ class TestOpenSSLRSA(object): ) ) - def test_unsupported_oaep_label_decrypt(self): + def test_unsupported_mgf1_hash_algorithm_md5_decrypt(self): private_key = RSA_KEY_512.private_key(backend) - with pytest.raises(ValueError): + with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_PADDING): private_key.decrypt( b"0" * 64, padding.OAEP( - mgf=padding.MGF1(algorithm=hashes.SHA1()), - algorithm=hashes.SHA1(), - label=b"label" + mgf=padding.MGF1(algorithm=hashes.MD5()), + algorithm=hashes.MD5(), + label=None ) ) -@pytest.mark.skipif( - backend._lib.OPENSSL_VERSION_NUMBER <= 0x10001000, - reason="Requires an OpenSSL version >= 1.0.1" -) class TestOpenSSLCMAC(object): def test_unsupported_cipher(self): - @utils.register_interface(BlockCipherAlgorithm) - class FakeAlgorithm(object): - block_size = 64 - with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_CIPHER): - backend.create_cmac_ctx(FakeAlgorithm()) + backend.create_cmac_ctx(DummyCipherAlgorithm()) -class TestOpenSSLCreateX509CSR(object): - @pytest.mark.skipif( - backend._lib.OPENSSL_VERSION_NUMBER >= 0x10001000, - reason="Requires an older OpenSSL. Must be < 1.0.1" - ) - def test_unsupported_dsa_keys(self): - private_key = DSA_KEY_2048.private_key(backend) +class TestOpenSSLSignX509Certificate(object): + def test_requires_certificate_builder(self): + private_key = RSA_KEY_2048.private_key(backend) - with pytest.raises(NotImplementedError): - backend.create_x509_csr(object(), private_key, hashes.SHA1()) + with pytest.raises(TypeError): + backend.create_x509_certificate( + object(), private_key, DummyHashAlgorithm() + ) + + +class TestOpenSSLSignX509CSR(object): + def test_requires_csr_builder(self): + private_key = RSA_KEY_2048.private_key(backend) + + with pytest.raises(TypeError): + backend.create_x509_csr( + object(), private_key, DummyHashAlgorithm() + ) - @pytest.mark.skipif( - backend._lib.OPENSSL_VERSION_NUMBER >= 0x10001000, - reason="Requires an older OpenSSL. Must be < 1.0.1" - ) - def test_unsupported_ec_keys(self): - _skip_curve_unsupported(backend, ec.SECP256R1()) - private_key = ec.generate_private_key(ec.SECP256R1(), backend) - with pytest.raises(NotImplementedError): - backend.create_x509_csr(object(), private_key, hashes.SHA1()) +class TestOpenSSLSignX509CertificateRevocationList(object): + def test_invalid_builder(self): + private_key = RSA_KEY_2048.private_key(backend) + with pytest.raises(TypeError): + backend.create_x509_crl(object(), private_key, hashes.SHA256()) -class TestOpenSSLSerialisationWithOpenSSL(object): - def test_pem_password_cb_buffer_too_small(self): - ffi_cb, cb = backend._pem_password_cb(b"aa") - assert cb(None, 1, False, None) == 0 + +class TestOpenSSLCreateRevokedCertificate(object): + def test_invalid_builder(self): + with pytest.raises(TypeError): + backend.create_x509_revoked_certificate(object()) + + +class TestOpenSSLSerializationWithOpenSSL(object): + def test_pem_password_cb(self): + userdata = backend._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *") + pw = b"abcdefg" + password = backend._ffi.new("char []", pw) + userdata.password = password + userdata.length = len(pw) + buflen = 10 + buf = backend._ffi.new("char []", buflen) + res = backend._lib.Cryptography_pem_password_cb( + buf, buflen, 0, userdata + ) + assert res == len(pw) + assert userdata.called == 1 + assert backend._ffi.buffer(buf, len(pw))[:] == pw + assert userdata.maxsize == buflen + assert userdata.error == 0 + + def test_pem_password_cb_no_password(self): + userdata = backend._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *") + buflen = 10 + buf = backend._ffi.new("char []", buflen) + res = backend._lib.Cryptography_pem_password_cb( + buf, buflen, 0, userdata + ) + assert res == 0 + assert userdata.error == -1 def test_unsupported_evp_pkey_type(self): - key = pretend.stub(type="unsupported") + key = backend._create_evp_pkey_gc() with raises_unsupported_algorithm(None): backend._evp_pkey_to_private_key(key) with raises_unsupported_algorithm(None): backend._evp_pkey_to_public_key(key) def test_very_long_pem_serialization_password(self): - password = "x" * 1024 + password = b"x" * 1024 with pytest.raises(ValueError): load_vectors_from_file( @@ -507,23 +529,7 @@ class TestOpenSSLSerialisationWithOpenSSL(object): ) -class DummyLibrary(object): - Cryptography_HAS_EC = 0 - - class TestOpenSSLEllipticCurve(object): - def test_elliptic_curve_supported(self, monkeypatch): - monkeypatch.setattr(backend, "_lib", DummyLibrary()) - - assert backend.elliptic_curve_supported(None) is False - - def test_elliptic_curve_signature_algorithm_supported(self, monkeypatch): - monkeypatch.setattr(backend, "_lib", DummyLibrary()) - - assert backend.elliptic_curve_signature_algorithm_supported( - None, None - ) is False - def test_sn_to_elliptic_curve_not_supported(self): with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_ELLIPTIC_CURVE): _sn_to_elliptic_curve(backend, b"fake") @@ -540,3 +546,101 @@ class TestRSAPEMSerialization(object): serialization.PrivateFormat.PKCS8, serialization.BestAvailableEncryption(password) ) + + +class TestGOSTCertificate(object): + def test_numeric_string_x509_name_entry(self): + cert = _load_cert( + os.path.join("x509", "e-trust.ru.der"), + x509.load_der_x509_certificate, + backend + ) + if backend._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_102I: + with pytest.raises(ValueError) as exc: + cert.subject + + # We assert on the message in this case because if the certificate + # fails to load it will also raise a ValueError and this test could + # erroneously pass. + assert str(exc.value) == "Unsupported ASN1 string type. Type: 18" + else: + assert cert.subject.get_attributes_for_oid( + x509.ObjectIdentifier("1.2.643.3.131.1.1") + )[0].value == "007710474375" + + +@pytest.mark.skipif( + backend._lib.Cryptography_HAS_EVP_PKEY_DHX == 1, + reason="Requires OpenSSL without EVP_PKEY_DHX (< 1.0.2)") +@pytest.mark.requires_backend_interface(interface=DHBackend) +class TestOpenSSLDHSerialization(object): + + @pytest.mark.parametrize( + "vector", + load_vectors_from_file( + os.path.join("asymmetric", "DH", "RFC5114.txt"), + load_nist_vectors)) + def test_dh_serialization_with_q_unsupported(self, backend, vector): + parameters = dh.DHParameterNumbers(int(vector["p"], 16), + int(vector["g"], 16), + int(vector["q"], 16)) + public = dh.DHPublicNumbers(int(vector["ystatcavs"], 16), parameters) + private = dh.DHPrivateNumbers(int(vector["xstatcavs"], 16), public) + private_key = private.private_key(backend) + public_key = private_key.public_key() + with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_SERIALIZATION): + private_key.private_bytes(serialization.Encoding.PEM, + serialization.PrivateFormat.PKCS8, + serialization.NoEncryption()) + with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_SERIALIZATION): + public_key.public_bytes( + serialization.Encoding.PEM, + serialization.PublicFormat.SubjectPublicKeyInfo) + with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_SERIALIZATION): + parameters.parameters(backend).parameter_bytes( + serialization.Encoding.PEM, + serialization.ParameterFormat.PKCS3) + + @pytest.mark.parametrize( + ("key_path", "loader_func"), + [ + ( + os.path.join("asymmetric", "DH", "dhkey_rfc5114_2.pem"), + serialization.load_pem_private_key, + ), + ( + os.path.join("asymmetric", "DH", "dhkey_rfc5114_2.der"), + serialization.load_der_private_key, + ) + ] + ) + def test_private_load_dhx_unsupported(self, key_path, loader_func, + backend): + key_bytes = load_vectors_from_file( + key_path, + lambda pemfile: pemfile.read(), mode="rb" + ) + with pytest.raises(ValueError): + loader_func(key_bytes, None, backend) + + @pytest.mark.parametrize( + ("key_path", "loader_func"), + [ + ( + os.path.join("asymmetric", "DH", "dhpub_rfc5114_2.pem"), + serialization.load_pem_public_key, + ), + ( + os.path.join("asymmetric", "DH", "dhpub_rfc5114_2.der"), + serialization.load_der_public_key, + ) + ] + ) + def test_public_load_dhx_unsupported(self, key_path, loader_func, + backend): + key_bytes = load_vectors_from_file( + key_path, + lambda pemfile: pemfile.read(), mode="rb" + ) + with pytest.raises(ValueError): + loader_func(key_bytes, backend) diff --git a/tests/hazmat/backends/test_openssl_memleak.py b/tests/hazmat/backends/test_openssl_memleak.py new file mode 100644 index 00000000..935ea3df --- /dev/null +++ b/tests/hazmat/backends/test_openssl_memleak.py @@ -0,0 +1,451 @@ +# This file is dual licensed under the terms of the Apache License, Version +# 2.0, and the BSD License. See the LICENSE file in the root of this repository +# for complete details. + +from __future__ import absolute_import, division, print_function + +import json +import os +import subprocess +import sys +import textwrap + +import pytest + +from cryptography.hazmat.bindings.openssl.binding import Binding + + +MEMORY_LEAK_SCRIPT = """ +import sys + + +def main(argv): + import gc + import json + + import cffi + + from cryptography.hazmat.bindings._openssl import ffi, lib + + heap = {} + + BACKTRACE_ENABLED = False + if BACKTRACE_ENABLED: + backtrace_ffi = cffi.FFI() + backtrace_ffi.cdef(''' + int backtrace(void **, int); + char **backtrace_symbols(void *const *, int); + ''') + backtrace_lib = backtrace_ffi.dlopen(None) + + def backtrace(): + buf = backtrace_ffi.new("void*[]", 24) + length = backtrace_lib.backtrace(buf, len(buf)) + return (buf, length) + + def symbolize_backtrace(trace): + (buf, length) = trace + symbols = backtrace_lib.backtrace_symbols(buf, length) + stack = [ + backtrace_ffi.string(symbols[i]).decode() + for i in range(length) + ] + lib.Cryptography_free_wrapper(symbols, backtrace_ffi.NULL, 0) + return stack + else: + def backtrace(): + return None + + def symbolize_backtrace(trace): + return None + + @ffi.callback("void *(size_t, const char *, int)") + def malloc(size, path, line): + ptr = lib.Cryptography_malloc_wrapper(size, path, line) + heap[ptr] = (size, path, line, backtrace()) + return ptr + + @ffi.callback("void *(void *, size_t, const char *, int)") + def realloc(ptr, size, path, line): + if ptr != ffi.NULL: + del heap[ptr] + new_ptr = lib.Cryptography_realloc_wrapper(ptr, size, path, line) + heap[new_ptr] = (size, path, line, backtrace()) + return new_ptr + + @ffi.callback("void(void *, const char *, int)") + def free(ptr, path, line): + if ptr != ffi.NULL: + del heap[ptr] + lib.Cryptography_free_wrapper(ptr, path, line) + + result = lib.Cryptography_CRYPTO_set_mem_functions(malloc, realloc, free) + assert result == 1 + + # Trigger a bunch of initialization stuff. + import cryptography.hazmat.backends.openssl + + start_heap = set(heap) + + func(*argv[1:]) + gc.collect() + gc.collect() + gc.collect() + + if lib.Cryptography_HAS_OPENSSL_CLEANUP: + lib.OPENSSL_cleanup() + + # Swap back to the original functions so that if OpenSSL tries to free + # something from its atexit handle it won't be going through a Python + # function, which will be deallocated when this function returns + result = lib.Cryptography_CRYPTO_set_mem_functions( + ffi.addressof(lib, "Cryptography_malloc_wrapper"), + ffi.addressof(lib, "Cryptography_realloc_wrapper"), + ffi.addressof(lib, "Cryptography_free_wrapper"), + ) + assert result == 1 + + remaining = set(heap) - start_heap + + if remaining: + sys.stdout.write(json.dumps(dict( + (int(ffi.cast("size_t", ptr)), { + "size": heap[ptr][0], + "path": ffi.string(heap[ptr][1]).decode(), + "line": heap[ptr][2], + "backtrace": symbolize_backtrace(heap[ptr][3]), + }) + for ptr in remaining + ))) + sys.stdout.flush() + sys.exit(255) + +main(sys.argv) +""" + + +def assert_no_memory_leaks(s, argv=[]): + env = os.environ.copy() + env["PYTHONPATH"] = os.pathsep.join(sys.path) + argv = [ + sys.executable, "-c", "{}\n\n{}".format(s, MEMORY_LEAK_SCRIPT) + ] + argv + # Shell out to a fresh Python process because OpenSSL does not allow you to + # install new memory hooks after the first malloc/free occurs. + proc = subprocess.Popen( + argv, + env=env, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + try: + proc.wait() + if proc.returncode == 255: + # 255 means there was a leak, load the info about what mallocs + # weren't freed. + out = json.loads(proc.stdout.read().decode()) + raise AssertionError(out) + elif proc.returncode != 0: + # Any exception type will do to be honest + raise ValueError(proc.stdout.read(), proc.stderr.read()) + finally: + proc.stdout.close() + proc.stderr.close() + + +def skip_if_memtesting_not_supported(): + return pytest.mark.skipif( + not Binding().lib.Cryptography_HAS_MEM_FUNCTIONS, + reason="Requires OpenSSL memory functions (>=1.1.0)" + ) + + +@skip_if_memtesting_not_supported() +class TestAssertNoMemoryLeaks(object): + def test_no_leak_no_malloc(self): + assert_no_memory_leaks(textwrap.dedent(""" + def func(): + pass + """)) + + def test_no_leak_free(self): + assert_no_memory_leaks(textwrap.dedent(""" + def func(): + from cryptography.hazmat.bindings.openssl.binding import Binding + b = Binding() + name = b.lib.X509_NAME_new() + b.lib.X509_NAME_free(name) + """)) + + def test_no_leak_gc(self): + assert_no_memory_leaks(textwrap.dedent(""" + def func(): + from cryptography.hazmat.bindings.openssl.binding import Binding + b = Binding() + name = b.lib.X509_NAME_new() + b.ffi.gc(name, b.lib.X509_NAME_free) + """)) + + def test_leak(self): + with pytest.raises(AssertionError): + assert_no_memory_leaks(textwrap.dedent(""" + def func(): + from cryptography.hazmat.bindings.openssl.binding import ( + Binding + ) + b = Binding() + b.lib.X509_NAME_new() + """)) + + def test_errors(self): + with pytest.raises(ValueError): + assert_no_memory_leaks(textwrap.dedent(""" + def func(): + raise ZeroDivisionError + """)) + + +@skip_if_memtesting_not_supported() +class TestOpenSSLMemoryLeaks(object): + @pytest.mark.parametrize("path", [ + "x509/PKITS_data/certs/ValidcRLIssuerTest28EE.crt", + ]) + def test_der_x509_certificate_extensions(self, path): + assert_no_memory_leaks(textwrap.dedent(""" + def func(path): + from cryptography import x509 + from cryptography.hazmat.backends.openssl import backend + + import cryptography_vectors + + with cryptography_vectors.open_vector_file(path, "rb") as f: + cert = x509.load_der_x509_certificate( + f.read(), backend + ) + + cert.extensions + """), [path]) + + @pytest.mark.parametrize("path", [ + "x509/cryptography.io.pem", + ]) + def test_pem_x509_certificate_extensions(self, path): + assert_no_memory_leaks(textwrap.dedent(""" + def func(path): + from cryptography import x509 + from cryptography.hazmat.backends.openssl import backend + + import cryptography_vectors + + with cryptography_vectors.open_vector_file(path, "rb") as f: + cert = x509.load_pem_x509_certificate( + f.read(), backend + ) + + cert.extensions + """), [path]) + + def test_x509_csr_extensions(self): + assert_no_memory_leaks(textwrap.dedent(""" + def func(): + from cryptography import x509 + from cryptography.hazmat.backends.openssl import backend + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.asymmetric import rsa + + private_key = rsa.generate_private_key( + key_size=2048, public_exponent=65537, backend=backend + ) + cert = x509.CertificateSigningRequestBuilder().subject_name( + x509.Name([]) + ).add_extension( + x509.OCSPNoCheck(), critical=False + ).sign(private_key, hashes.SHA256(), backend) + + cert.extensions + """)) + + def test_ec_private_numbers_private_key(self): + assert_no_memory_leaks(textwrap.dedent(""" + def func(): + from cryptography.hazmat.backends.openssl import backend + from cryptography.hazmat.primitives.asymmetric import ec + + ec.EllipticCurvePrivateNumbers( + private_value=int( + '280814107134858470598753916394807521398239633534281633982576099083' + '35787109896602102090002196616273211495718603965098' + ), + public_numbers=ec.EllipticCurvePublicNumbers( + curve=ec.SECP384R1(), + x=int( + '10036914308591746758780165503819213553101287571902957054148542' + '504671046744460374996612408381962208627004841444205030' + ), + y=int( + '17337335659928075994560513699823544906448896792102247714689323' + '575406618073069185107088229463828921069465902299522926' + ) + ) + ).private_key(backend) + """)) + + def test_ec_derive_private_key(self): + assert_no_memory_leaks(textwrap.dedent(""" + def func(): + from cryptography.hazmat.backends.openssl import backend + from cryptography.hazmat.primitives.asymmetric import ec + ec.derive_private_key(1, ec.SECP256R1(), backend) + """)) + + def test_x25519_pubkey_from_private_key(self): + assert_no_memory_leaks(textwrap.dedent(""" + def func(): + from cryptography.hazmat.primitives.asymmetric import x25519 + private_key = x25519.X25519PrivateKey.generate() + private_key.public_key() + """)) + + def test_create_ocsp_request(self): + assert_no_memory_leaks(textwrap.dedent(""" + def func(): + from cryptography import x509 + from cryptography.hazmat.backends.openssl import backend + from cryptography.hazmat.primitives import hashes + from cryptography.x509 import ocsp + import cryptography_vectors + + path = "x509/PKITS_data/certs/ValidcRLIssuerTest28EE.crt" + with cryptography_vectors.open_vector_file(path, "rb") as f: + cert = x509.load_der_x509_certificate( + f.read(), backend + ) + builder = ocsp.OCSPRequestBuilder() + builder = builder.add_certificate( + cert, cert, hashes.SHA1() + ).add_extension(x509.OCSPNonce(b"0000"), False) + req = builder.build() + """)) + + @pytest.mark.parametrize("path", [ + "pkcs12/cert-aes256cbc-no-key.p12", + "pkcs12/cert-key-aes256cbc.p12", + ]) + def test_load_pkcs12_key_and_certificates(self, path): + assert_no_memory_leaks(textwrap.dedent(""" + def func(path): + from cryptography import x509 + from cryptography.hazmat.backends.openssl import backend + from cryptography.hazmat.primitives.serialization import pkcs12 + import cryptography_vectors + + with cryptography_vectors.open_vector_file(path, "rb") as f: + pkcs12.load_key_and_certificates( + f.read(), b"cryptography", backend + ) + """), [path]) + + def test_create_crl_with_idp(self): + assert_no_memory_leaks(textwrap.dedent(""" + def func(): + import datetime + from cryptography import x509 + from cryptography.hazmat.backends.openssl import backend + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.asymmetric import ec + from cryptography.x509.oid import NameOID + + key = ec.generate_private_key(ec.SECP256R1(), backend) + last_update = datetime.datetime(2002, 1, 1, 12, 1) + next_update = datetime.datetime(2030, 1, 1, 12, 1) + idp = x509.IssuingDistributionPoint( + full_name=None, + relative_name=x509.RelativeDistinguishedName([ + x509.NameAttribute( + oid=x509.NameOID.ORGANIZATION_NAME, value=u"PyCA") + ]), + only_contains_user_certs=False, + only_contains_ca_certs=True, + only_some_reasons=None, + indirect_crl=False, + only_contains_attribute_certs=False, + ) + builder = x509.CertificateRevocationListBuilder().issuer_name( + x509.Name([ + x509.NameAttribute( + NameOID.COMMON_NAME, u"cryptography.io CA" + ) + ]) + ).last_update( + last_update + ).next_update( + next_update + ).add_extension( + idp, True + ) + + crl = builder.sign(key, hashes.SHA256(), backend) + crl.extensions.get_extension_for_class( + x509.IssuingDistributionPoint + ) + """)) + + def test_create_certificate_with_extensions(self): + assert_no_memory_leaks(textwrap.dedent(""" + def func(): + import datetime + + from cryptography import x509 + from cryptography.hazmat.backends.openssl import backend + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.asymmetric import ec + from cryptography.x509.oid import ( + AuthorityInformationAccessOID, ExtendedKeyUsageOID, NameOID + ) + + private_key = ec.generate_private_key(ec.SECP256R1(), backend) + + not_valid_before = datetime.datetime.now() + not_valid_after = not_valid_before + datetime.timedelta(days=365) + + aia = x509.AuthorityInformationAccess([ + x509.AccessDescription( + AuthorityInformationAccessOID.OCSP, + x509.UniformResourceIdentifier(u"http://ocsp.domain.com") + ), + x509.AccessDescription( + AuthorityInformationAccessOID.CA_ISSUERS, + x509.UniformResourceIdentifier(u"http://domain.com/ca.crt") + ) + ]) + sans = [u'*.example.org', u'foobar.example.net'] + san = x509.SubjectAlternativeName(list(map(x509.DNSName, sans))) + + ski = x509.SubjectKeyIdentifier.from_public_key( + private_key.public_key() + ) + eku = x509.ExtendedKeyUsage([ + ExtendedKeyUsageOID.CLIENT_AUTH, + ExtendedKeyUsageOID.SERVER_AUTH, + ExtendedKeyUsageOID.CODE_SIGNING, + ]) + + builder = x509.CertificateBuilder().serial_number( + 777 + ).issuer_name(x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, u'US'), + ])).subject_name(x509.Name([ + x509.NameAttribute(NameOID.COUNTRY_NAME, u'US'), + ])).public_key( + private_key.public_key() + ).add_extension( + aia, critical=False + ).not_valid_before( + not_valid_before + ).not_valid_after( + not_valid_after + ) + + cert = builder.sign(private_key, hashes.SHA256(), backend) + cert.extensions + """)) |
