From e3675af0f42e1f3117b61984805c192c1937a64f Mon Sep 17 00:00:00 2001 From: Paul Kehrer Date: Fri, 25 Sep 2015 14:34:43 -0500 Subject: start converting asserts to a function call This prevents situations where asserts are bypassed when running python with -O. --- .../hazmat/backends/openssl/backend.py | 351 +++++++++++---------- 1 file changed, 185 insertions(+), 166 deletions(-) diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py index c74d90d4..81d191eb 100644 --- a/src/cryptography/hazmat/backends/openssl/backend.py +++ b/src/cryptography/hazmat/backends/openssl/backend.py @@ -62,6 +62,12 @@ _OpenSSLError = collections.namedtuple("_OpenSSLError", ["code", "lib", "func", "reason"]) +class UnhandledOpenSSLError(Exception): + def __init__(self, msg, errors): + super(UnhandledOpenSSLError, self).__init__(msg) + self.errors = errors + + def _encode_asn1_int(backend, x): """ Converts a python integer to a ASN1_INTEGER. The returned ASN1_INTEGER will @@ -78,7 +84,7 @@ def _encode_asn1_int(backend, x): # Wrap in a ASN.1 integer. Don't GC -- as documented. i = backend._lib.BN_to_ASN1_INTEGER(i, backend._ffi.NULL) - assert i != backend._ffi.NULL + backend.openssl_assert(i != backend._ffi.NULL) return i @@ -94,7 +100,7 @@ def _encode_asn1_str(backend, data, length): """ s = backend._lib.ASN1_OCTET_STRING_new() res = backend._lib.ASN1_OCTET_STRING_set(s, data, length) - assert res == 1 + backend.openssl_assert(res == 1) return s @@ -108,7 +114,7 @@ def _encode_inhibit_any_policy(backend, inhibit_any_policy): asn1int = _encode_asn1_int_gc(backend, inhibit_any_policy.skip_certs) pp = backend._ffi.new('unsigned char **') r = backend._lib.i2d_ASN1_INTEGER(asn1int, pp) - assert r > 0 + backend.openssl_assert(r > 0) pp = backend._ffi.gc( pp, lambda pointer: backend._lib.OPENSSL_free(pointer[0]) ) @@ -130,7 +136,7 @@ def _encode_name(backend, attributes): value, -1, -1, 0, ) - assert res == 1 + backend.openssl_assert(res == 1) return subject @@ -147,7 +153,7 @@ def _txt2obj(backend, name): """ name = name.encode('ascii') obj = backend._lib.OBJ_txt2obj(name, 1) - assert obj != backend._ffi.NULL + backend.openssl_assert(obj != backend._ffi.NULL) return obj @@ -170,33 +176,33 @@ def _encode_key_usage(backend, key_usage): ku = backend._lib.ASN1_BIT_STRING_new() ku = backend._ffi.gc(ku, backend._lib.ASN1_BIT_STRING_free) res = set_bit(ku, 0, key_usage.digital_signature) - assert res == 1 + backend.openssl_assert(res == 1) res = set_bit(ku, 1, key_usage.content_commitment) - assert res == 1 + backend.openssl_assert(res == 1) res = set_bit(ku, 2, key_usage.key_encipherment) - assert res == 1 + backend.openssl_assert(res == 1) res = set_bit(ku, 3, key_usage.data_encipherment) - assert res == 1 + backend.openssl_assert(res == 1) res = set_bit(ku, 4, key_usage.key_agreement) - assert res == 1 + backend.openssl_assert(res == 1) res = set_bit(ku, 5, key_usage.key_cert_sign) - assert res == 1 + backend.openssl_assert(res == 1) res = set_bit(ku, 6, key_usage.crl_sign) - assert res == 1 + backend.openssl_assert(res == 1) if key_usage.key_agreement: res = set_bit(ku, 7, key_usage.encipher_only) - assert res == 1 + backend.openssl_assert(res == 1) res = set_bit(ku, 8, key_usage.decipher_only) - assert res == 1 + backend.openssl_assert(res == 1) else: res = set_bit(ku, 7, 0) - assert res == 1 + backend.openssl_assert(res == 1) res = set_bit(ku, 8, 0) - assert res == 1 + backend.openssl_assert(res == 1) pp = backend._ffi.new('unsigned char **') r = backend._lib.i2d_ASN1_BIT_STRING(ku, pp) - assert r > 0 + backend.openssl_assert(r > 0) pp = backend._ffi.gc( pp, lambda pointer: backend._lib.OPENSSL_free(pointer[0]) ) @@ -205,7 +211,7 @@ def _encode_key_usage(backend, key_usage): def _encode_authority_key_identifier(backend, authority_keyid): akid = backend._lib.AUTHORITY_KEYID_new() - assert akid != backend._ffi.NULL + backend.openssl_assert(akid != backend._ffi.NULL) akid = backend._ffi.gc(akid, backend._lib.AUTHORITY_KEYID_free) if authority_keyid.key_identifier is not None: akid.keyid = _encode_asn1_str( @@ -226,7 +232,7 @@ def _encode_authority_key_identifier(backend, authority_keyid): pp = backend._ffi.new('unsigned char **') r = backend._lib.i2d_AUTHORITY_KEYID(akid, pp) - assert r > 0 + backend.openssl_assert(r > 0) pp = backend._ffi.gc( pp, lambda pointer: backend._lib.OPENSSL_free(pointer[0]) ) @@ -247,7 +253,7 @@ def _encode_basic_constraints(backend, basic_constraints): # Fetch the encoded payload. pp = backend._ffi.new('unsigned char **') r = backend._lib.i2d_BASIC_CONSTRAINTS(constraints, pp) - assert r > 0 + backend.openssl_assert(r > 0) pp = backend._ffi.gc( pp, lambda pointer: backend._lib.OPENSSL_free(pointer[0]) ) @@ -256,7 +262,7 @@ def _encode_basic_constraints(backend, basic_constraints): def _encode_authority_information_access(backend, authority_info_access): aia = backend._lib.sk_ACCESS_DESCRIPTION_new_null() - assert aia != backend._ffi.NULL + backend.openssl_assert(aia != backend._ffi.NULL) aia = backend._ffi.gc( aia, backend._lib.sk_ACCESS_DESCRIPTION_free ) @@ -269,11 +275,11 @@ def _encode_authority_information_access(backend, authority_info_access): ad.method = method ad.location = gn res = backend._lib.sk_ACCESS_DESCRIPTION_push(aia, ad) - assert res >= 1 + backend.openssl_assert(res >= 1) pp = backend._ffi.new('unsigned char **') r = backend._lib.i2d_AUTHORITY_INFO_ACCESS(aia, pp) - assert r > 0 + backend.openssl_assert(r > 0) pp = backend._ffi.gc( pp, lambda pointer: backend._lib.OPENSSL_free(pointer[0]) ) @@ -282,11 +288,11 @@ def _encode_authority_information_access(backend, authority_info_access): def _encode_general_names(backend, names): general_names = backend._lib.GENERAL_NAMES_new() - assert general_names != backend._ffi.NULL + backend.openssl_assert(general_names != backend._ffi.NULL) for name in names: gn = _encode_general_name(backend, name) res = backend._lib.sk_GENERAL_NAME_push(general_names, gn) - assert res != 0 + backend.openssl_assert(res != 0) return general_names @@ -298,7 +304,7 @@ def _encode_alt_name(backend, san): ) pp = backend._ffi.new("unsigned char **") r = backend._lib.i2d_GENERAL_NAMES(general_names, pp) - assert r > 0 + backend.openssl_assert(r > 0) pp = backend._ffi.gc( pp, lambda pointer: backend._lib.OPENSSL_free(pointer[0]) ) @@ -309,7 +315,7 @@ def _encode_subject_key_identifier(backend, ski): asn1_str = _encode_asn1_str_gc(backend, ski.digest, len(ski.digest)) pp = backend._ffi.new("unsigned char **") r = backend._lib.i2d_ASN1_OCTET_STRING(asn1_str, pp) - assert r > 0 + backend.openssl_assert(r > 0) pp = backend._ffi.gc( pp, lambda pointer: backend._lib.OPENSSL_free(pointer[0]) ) @@ -319,11 +325,11 @@ def _encode_subject_key_identifier(backend, ski): def _encode_general_name(backend, name): if isinstance(name, x509.DNSName): gn = backend._lib.GENERAL_NAME_new() - assert gn != backend._ffi.NULL + backend.openssl_assert(gn != backend._ffi.NULL) gn.type = backend._lib.GEN_DNS ia5 = backend._lib.ASN1_IA5STRING_new() - assert ia5 != backend._ffi.NULL + backend.openssl_assert(ia5 != backend._ffi.NULL) if name.value.startswith(u"*."): value = b"*." + idna.encode(name.value[2:]) @@ -331,26 +337,26 @@ def _encode_general_name(backend, name): value = idna.encode(name.value) res = backend._lib.ASN1_STRING_set(ia5, value, len(value)) - assert res == 1 + backend.openssl_assert(res == 1) gn.d.dNSName = ia5 elif isinstance(name, x509.RegisteredID): gn = backend._lib.GENERAL_NAME_new() - assert gn != backend._ffi.NULL + backend.openssl_assert(gn != backend._ffi.NULL) gn.type = backend._lib.GEN_RID obj = backend._lib.OBJ_txt2obj( name.value.dotted_string.encode('ascii'), 1 ) - assert obj != backend._ffi.NULL + backend.openssl_assert(obj != backend._ffi.NULL) gn.d.registeredID = obj elif isinstance(name, x509.DirectoryName): gn = backend._lib.GENERAL_NAME_new() - assert gn != backend._ffi.NULL + backend.openssl_assert(gn != backend._ffi.NULL) dir_name = _encode_name(backend, name.value) gn.type = backend._lib.GEN_DIRNAME gn.d.directoryName = dir_name elif isinstance(name, x509.IPAddress): gn = backend._lib.GENERAL_NAME_new() - assert gn != backend._ffi.NULL + backend.openssl_assert(gn != backend._ffi.NULL) ipaddr = _encode_asn1_str( backend, name.value.packed, len(name.value.packed) ) @@ -358,14 +364,14 @@ def _encode_general_name(backend, name): gn.d.iPAddress = ipaddr elif isinstance(name, x509.OtherName): gn = backend._lib.GENERAL_NAME_new() - assert gn != backend._ffi.NULL + backend.openssl_assert(gn != backend._ffi.NULL) other_name = backend._lib.OTHERNAME_new() - assert other_name != backend._ffi.NULL + backend.openssl_assert(other_name != backend._ffi.NULL) type_id = backend._lib.OBJ_txt2obj( name.type_id.dotted_string.encode('ascii'), 1 ) - assert type_id != backend._ffi.NULL + backend.openssl_assert(type_id != backend._ffi.NULL) data = backend._ffi.new("unsigned char[]", name.value) data_ptr_ptr = backend._ffi.new("unsigned char **") data_ptr_ptr[0] = data @@ -381,7 +387,7 @@ def _encode_general_name(backend, name): gn.d.otherName = other_name elif isinstance(name, x509.RFC822Name): gn = backend._lib.GENERAL_NAME_new() - assert gn != backend._ffi.NULL + backend.openssl_assert(gn != backend._ffi.NULL) asn1_str = _encode_asn1_str( backend, name._encoded, len(name._encoded) ) @@ -389,7 +395,7 @@ def _encode_general_name(backend, name): gn.d.rfc822Name = asn1_str elif isinstance(name, x509.UniformResourceIdentifier): gn = backend._lib.GENERAL_NAME_new() - assert gn != backend._ffi.NULL + backend.openssl_assert(gn != backend._ffi.NULL) asn1_str = _encode_asn1_str( backend, name._encoded, len(name._encoded) ) @@ -409,13 +415,13 @@ def _encode_extended_key_usage(backend, extended_key_usage): for oid in extended_key_usage: obj = _txt2obj(backend, oid.dotted_string) res = backend._lib.sk_ASN1_OBJECT_push(eku, obj) - assert res >= 1 + backend.openssl_assert(res >= 1) pp = backend._ffi.new('unsigned char **') r = backend._lib.i2d_EXTENDED_KEY_USAGE( backend._ffi.cast("EXTENDED_KEY_USAGE *", eku), pp ) - assert r > 0 + backend.openssl_assert(r > 0) pp = backend._ffi.gc( pp, lambda pointer: backend._lib.OPENSSL_free(pointer[0]) ) @@ -439,32 +445,32 @@ def _encode_crl_distribution_points(backend, crl_distribution_points): cdp = backend._ffi.gc(cdp, backend._lib.sk_DIST_POINT_free) for point in crl_distribution_points: dp = backend._lib.DIST_POINT_new() - assert dp != backend._ffi.NULL + backend.openssl_assert(dp != backend._ffi.NULL) if point.reasons: bitmask = backend._lib.ASN1_BIT_STRING_new() - assert bitmask != backend._ffi.NULL + backend.openssl_assert(bitmask != backend._ffi.NULL) dp.reasons = bitmask for reason in point.reasons: res = backend._lib.ASN1_BIT_STRING_set_bit( bitmask, _CRLREASONFLAGS[reason], 1 ) - assert res == 1 + backend.openssl_assert(res == 1) if point.full_name: dpn = backend._lib.DIST_POINT_NAME_new() - assert dpn != backend._ffi.NULL + backend.openssl_assert(dpn != backend._ffi.NULL) dpn.type = _DISTPOINT_TYPE_FULLNAME dpn.name.fullname = _encode_general_names(backend, point.full_name) dp.distpoint = dpn if point.relative_name: dpn = backend._lib.DIST_POINT_NAME_new() - assert dpn != backend._ffi.NULL + backend.openssl_assert(dpn != backend._ffi.NULL) dpn.type = _DISTPOINT_TYPE_RELATIVENAME name = _encode_name_gc(backend, point.relative_name) relativename = backend._lib.sk_X509_NAME_ENTRY_dup(name.entries) - assert relativename != backend._ffi.NULL + backend.openssl_assert(relativename != backend._ffi.NULL) dpn.name.relativename = relativename dp.distpoint = dpn @@ -472,11 +478,11 @@ def _encode_crl_distribution_points(backend, crl_distribution_points): dp.CRLissuer = _encode_general_names(backend, point.crl_issuer) res = backend._lib.sk_DIST_POINT_push(cdp, dp) - assert res >= 1 + backend.openssl_assert(res >= 1) pp = backend._ffi.new('unsigned char **') r = backend._lib.i2d_CRL_DIST_POINTS(cdp, pp) - assert r > 0 + backend.openssl_assert(r > 0) pp = backend._ffi.gc( pp, lambda pointer: backend._lib.OPENSSL_free(pointer[0]) ) @@ -526,7 +532,7 @@ class Backend(object): # is the default for newer OpenSSLs for several years and is # recommended in RFC 2459. res = self._lib.ASN1_STRING_set_default_mask_asc(b"utf8only") - assert res == 1 + self.openssl_assert(res == 1) self._binding.init_static_locks() @@ -534,6 +540,11 @@ class Backend(object): self._register_default_ciphers() self.activate_osrandom_engine() + def openssl_assert(self, ok): + if not ok: + errors = self._consume_errors() + raise UnhandledOpenSSLError("Unknown OpenSSL error", errors) + def activate_builtin_random(self): # Obtain a new structural reference. e = self._lib.ENGINE_get_default_RAND() @@ -543,7 +554,7 @@ class Backend(object): self._lib.RAND_cleanup() # decrement the structural reference from get_default_RAND res = self._lib.ENGINE_finish(e) - assert res == 1 + self.openssl_assert(res == 1) def activate_osrandom_engine(self): # Unregister and free the current engine. @@ -551,19 +562,19 @@ class Backend(object): # Fetches an engine by id and returns it. This creates a structural # reference. e = self._lib.ENGINE_by_id(self._binding._osrandom_engine_id) - assert e != self._ffi.NULL + self.openssl_assert(e != self._ffi.NULL) # Initialize the engine for use. This adds a functional reference. res = self._lib.ENGINE_init(e) - assert res == 1 + self.openssl_assert(res == 1) # Set the engine as the default RAND provider. res = self._lib.ENGINE_set_default_RAND(e) - assert res == 1 + self.openssl_assert(res == 1) # Decrement the structural ref incremented by ENGINE_by_id. res = self._lib.ENGINE_free(e) - assert res == 1 + self.openssl_assert(res == 1) # Decrement the functional ref incremented by ENGINE_init. res = self._lib.ENGINE_finish(e) - assert res == 1 + self.openssl_assert(res == 1) # Reset the RNG to use the new engine. self._lib.RAND_cleanup() @@ -705,7 +716,7 @@ class Backend(object): if self._lib.Cryptography_HAS_PBKDF2_HMAC: evp_md = self._lib.EVP_get_digestbyname( algorithm.name.encode("ascii")) - assert evp_md != self._ffi.NULL + self.openssl_assert(evp_md != self._ffi.NULL) res = self._lib.PKCS5_PBKDF2_HMAC( key_material, len(key_material), @@ -716,7 +727,7 @@ class Backend(object): length, buf ) - assert res == 1 + self.openssl_assert(res == 1) else: if not isinstance(algorithm, hashes.SHA1): raise UnsupportedAlgorithm( @@ -733,7 +744,7 @@ class Backend(object): length, buf ) - assert res == 1 + self.openssl_assert(res == 1) return self._ffi.buffer(buf)[:] @@ -765,7 +776,7 @@ class Backend(object): ) def _bn_to_int(self, bn): - assert bn != self._ffi.NULL + self.openssl_assert(bn != self._ffi.NULL) if six.PY3: # Python 3 has constant time from_bytes, so use that. @@ -773,15 +784,15 @@ class Backend(object): bin_ptr = self._ffi.new("unsigned char[]", bn_num_bytes) bin_len = self._lib.BN_bn2bin(bn, bin_ptr) # A zero length means the BN has value 0 - assert bin_len >= 0 - assert bin_ptr != self._ffi.NULL + self.openssl_assert(bin_len >= 0) + self.openssl_assert(bin_ptr != self._ffi.NULL) return int.from_bytes(self._ffi.buffer(bin_ptr)[:bin_len], "big") else: # Under Python 2 the best we can do is hex() hex_cdata = self._lib.BN_bn2hex(bn) - assert hex_cdata != self._ffi.NULL + self.openssl_assert(hex_cdata != self._ffi.NULL) hex_str = self._ffi.string(hex_cdata) self._lib.OPENSSL_free(hex_cdata) return int(hex_str, 16) @@ -793,7 +804,7 @@ class Backend(object): ownership of the object). Be sure to register it for GC if it will be discarded after use. """ - assert bn is None or bn != self._ffi.NULL + self.openssl_assert(bn is None or bn != self._ffi.NULL) if bn is None: bn = self._ffi.NULL @@ -803,7 +814,7 @@ class Backend(object): binary = num.to_bytes(int(num.bit_length() / 8.0 + 1), "big") bn_ptr = self._lib.BN_bin2bn(binary, len(binary), bn) - assert bn_ptr != self._ffi.NULL + self.openssl_assert(bn_ptr != self._ffi.NULL) return bn_ptr else: @@ -813,15 +824,15 @@ class Backend(object): bn_ptr = self._ffi.new("BIGNUM **") bn_ptr[0] = bn res = self._lib.BN_hex2bn(bn_ptr, hex_num) - assert res != 0 - assert bn_ptr[0] != self._ffi.NULL + self.openssl_assert(res != 0) + self.openssl_assert(bn_ptr[0] != self._ffi.NULL) return bn_ptr[0] def generate_rsa_private_key(self, public_exponent, key_size): rsa._verify_rsa_parameters(public_exponent, key_size) rsa_cdata = self._lib.RSA_new() - assert rsa_cdata != self._ffi.NULL + self.openssl_assert(rsa_cdata != self._ffi.NULL) rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) bn = self._int_to_bn(public_exponent) @@ -830,7 +841,7 @@ class Backend(object): res = self._lib.RSA_generate_key_ex( rsa_cdata, key_size, bn, self._ffi.NULL ) - assert res == 1 + self.openssl_assert(res == 1) evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata) return _RSAPrivateKey(self, rsa_cdata, evp_pkey) @@ -851,7 +862,7 @@ class Backend(object): numbers.public_numbers.n ) rsa_cdata = self._lib.RSA_new() - assert rsa_cdata != self._ffi.NULL + self.openssl_assert(rsa_cdata != self._ffi.NULL) rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) rsa_cdata.p = self._int_to_bn(numbers.p) rsa_cdata.q = self._int_to_bn(numbers.q) @@ -862,7 +873,7 @@ class Backend(object): rsa_cdata.e = self._int_to_bn(numbers.public_numbers.e) rsa_cdata.n = self._int_to_bn(numbers.public_numbers.n) res = self._lib.RSA_blinding_on(rsa_cdata, self._ffi.NULL) - assert res == 1 + self.openssl_assert(res == 1) evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata) return _RSAPrivateKey(self, rsa_cdata, evp_pkey) @@ -870,22 +881,22 @@ class Backend(object): def load_rsa_public_numbers(self, numbers): rsa._check_public_key_components(numbers.e, numbers.n) rsa_cdata = self._lib.RSA_new() - assert rsa_cdata != self._ffi.NULL + self.openssl_assert(rsa_cdata != self._ffi.NULL) rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) rsa_cdata.e = self._int_to_bn(numbers.e) rsa_cdata.n = self._int_to_bn(numbers.n) res = self._lib.RSA_blinding_on(rsa_cdata, self._ffi.NULL) - assert res == 1 + self.openssl_assert(res == 1) evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata) return _RSAPublicKey(self, rsa_cdata, evp_pkey) def _rsa_cdata_to_evp_pkey(self, rsa_cdata): evp_pkey = self._lib.EVP_PKEY_new() - assert evp_pkey != self._ffi.NULL + self.openssl_assert(evp_pkey != self._ffi.NULL) evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) res = self._lib.EVP_PKEY_set1_RSA(evp_pkey, rsa_cdata) - assert res == 1 + self.openssl_assert(res == 1) return evp_pkey def _bytes_to_bio(self, data): @@ -899,7 +910,7 @@ class Backend(object): bio = self._lib.BIO_new_mem_buf( data_char_p, len(data) ) - assert bio != self._ffi.NULL + self.openssl_assert(bio != self._ffi.NULL) return _MemoryBIO(self._ffi.gc(bio, self._lib.BIO_free), data_char_p) @@ -908,9 +919,9 @@ class Backend(object): Creates an empty memory BIO. """ bio_method = self._lib.BIO_s_mem() - assert bio_method != self._ffi.NULL + self.openssl_assert(bio_method != self._ffi.NULL) bio = self._lib.BIO_new(bio_method) - assert bio != self._ffi.NULL + self.openssl_assert(bio != self._ffi.NULL) bio = self._ffi.gc(bio, self._lib.BIO_free) return bio @@ -920,8 +931,8 @@ class Backend(object): """ buf = self._ffi.new("char **") buf_len = self._lib.BIO_get_mem_data(bio, buf) - assert buf_len > 0 - assert buf[0] != self._ffi.NULL + self.openssl_assert(buf_len > 0) + self.openssl_assert(buf[0] != self._ffi.NULL) bio_data = self._ffi.buffer(buf[0], buf_len)[:] return bio_data @@ -935,18 +946,18 @@ class Backend(object): if key_type == self._lib.EVP_PKEY_RSA: rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey) - assert rsa_cdata != self._ffi.NULL + self.openssl_assert(rsa_cdata != self._ffi.NULL) rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) return _RSAPrivateKey(self, rsa_cdata, evp_pkey) elif key_type == self._lib.EVP_PKEY_DSA: dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey) - assert dsa_cdata != self._ffi.NULL + self.openssl_assert(dsa_cdata != self._ffi.NULL) dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) return _DSAPrivateKey(self, dsa_cdata, evp_pkey) elif (self._lib.Cryptography_HAS_EC == 1 and key_type == self._lib.EVP_PKEY_EC): ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey) - assert ec_cdata != self._ffi.NULL + self.openssl_assert(ec_cdata != self._ffi.NULL) ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free) return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey) else: @@ -962,18 +973,18 @@ class Backend(object): if key_type == self._lib.EVP_PKEY_RSA: rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey) - assert rsa_cdata != self._ffi.NULL + self.openssl_assert(rsa_cdata != self._ffi.NULL) rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) return _RSAPublicKey(self, rsa_cdata, evp_pkey) elif key_type == self._lib.EVP_PKEY_DSA: dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey) - assert dsa_cdata != self._ffi.NULL + self.openssl_assert(dsa_cdata != self._ffi.NULL) dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) return _DSAPublicKey(self, dsa_cdata, evp_pkey) elif (self._lib.Cryptography_HAS_EC == 1 and key_type == self._lib.EVP_PKEY_EC): ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey) - assert ec_cdata != self._ffi.NULL + self.openssl_assert(ec_cdata != self._ffi.NULL) ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free) return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey) else: @@ -1047,7 +1058,7 @@ class Backend(object): "support larger key sizes.") ctx = self._lib.DSA_new() - assert ctx != self._ffi.NULL + self.openssl_assert(ctx != self._ffi.NULL) ctx = self._ffi.gc(ctx, self._lib.DSA_free) res = self._lib.DSA_generate_parameters_ex( @@ -1055,13 +1066,13 @@ class Backend(object): self._ffi.NULL, self._ffi.NULL, self._ffi.NULL ) - assert res == 1 + self.openssl_assert(res == 1) return _DSAParameters(self, ctx) def generate_dsa_private_key(self, parameters): ctx = self._lib.DSA_new() - assert ctx != self._ffi.NULL + self.openssl_assert(ctx != self._ffi.NULL) ctx = self._ffi.gc(ctx, self._lib.DSA_free) ctx.p = self._lib.BN_dup(parameters._dsa_cdata.p) ctx.q = self._lib.BN_dup(parameters._dsa_cdata.q) @@ -1081,7 +1092,7 @@ class Backend(object): parameter_numbers = numbers.public_numbers.parameter_numbers dsa_cdata = self._lib.DSA_new() - assert dsa_cdata != self._ffi.NULL + self.openssl_assert(dsa_cdata != self._ffi.NULL) dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) dsa_cdata.p = self._int_to_bn(parameter_numbers.p) @@ -1097,7 +1108,7 @@ class Backend(object): def load_dsa_public_numbers(self, numbers): dsa._check_dsa_parameters(numbers.parameter_numbers) dsa_cdata = self._lib.DSA_new() - assert dsa_cdata != self._ffi.NULL + self.openssl_assert(dsa_cdata != self._ffi.NULL) dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) dsa_cdata.p = self._int_to_bn(numbers.parameter_numbers.p) @@ -1112,7 +1123,7 @@ class Backend(object): def load_dsa_parameter_numbers(self, numbers): dsa._check_dsa_parameters(numbers) dsa_cdata = self._lib.DSA_new() - assert dsa_cdata != self._ffi.NULL + self.openssl_assert(dsa_cdata != self._ffi.NULL) dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) dsa_cdata.p = self._int_to_bn(numbers.p) @@ -1123,10 +1134,10 @@ class Backend(object): def _dsa_cdata_to_evp_pkey(self, dsa_cdata): evp_pkey = self._lib.EVP_PKEY_new() - assert evp_pkey != self._ffi.NULL + self.openssl_assert(evp_pkey != self._ffi.NULL) evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) res = self._lib.EVP_PKEY_set1_DSA(evp_pkey, dsa_cdata) - assert res == 1 + self.openssl_assert(res == 1) return evp_pkey def dsa_hash_supported(self, algorithm): @@ -1172,33 +1183,33 @@ class Backend(object): evp_md = self._lib.EVP_get_digestbyname( algorithm.name.encode('ascii') ) - assert evp_md != self._ffi.NULL + self.openssl_assert(evp_md != self._ffi.NULL) # Create an empty request. x509_req = self._lib.X509_REQ_new() - assert x509_req != self._ffi.NULL + self.openssl_assert(x509_req != self._ffi.NULL) x509_req = self._ffi.gc(x509_req, self._lib.X509_REQ_free) # Set x509 version. res = self._lib.X509_REQ_set_version(x509_req, x509.Version.v1.value) - assert res == 1 + self.openssl_assert(res == 1) # Set subject name. res = self._lib.X509_REQ_set_subject_name( x509_req, _encode_name_gc(self, builder._subject_name) ) - assert res == 1 + self.openssl_assert(res == 1) # Set subject public key. public_key = private_key.public_key() res = self._lib.X509_REQ_set_pubkey( x509_req, public_key._evp_pkey ) - assert res == 1 + self.openssl_assert(res == 1) # Add extensions. extensions = self._lib.sk_X509_EXTENSION_new_null() - assert extensions != self._ffi.NULL + self.openssl_assert(extensions != self._ffi.NULL) extensions = self._ffi.gc( extensions, self._lib.sk_X509_EXTENSION_free, @@ -1217,11 +1228,11 @@ class Backend(object): 1 if extension.critical else 0, _encode_asn1_str_gc(self, pp[0], r), ) - assert extension != self._ffi.NULL + self.openssl_assert(extension != self._ffi.NULL) res = self._lib.sk_X509_EXTENSION_push(extensions, extension) - assert res >= 1 + self.openssl_assert(res >= 1) res = self._lib.X509_REQ_add_extensions(x509_req, extensions) - assert res == 1 + self.openssl_assert(res == 1) # Sign the request using the requester's private key. res = self._lib.X509_REQ_sign( @@ -1229,8 +1240,10 @@ class Backend(object): ) if res == 0: errors = self._consume_errors() - assert errors[0][1] == self._lib.ERR_LIB_RSA - assert errors[0][3] == self._lib.RSA_R_DIGEST_TOO_BIG_FOR_RSA_KEY + self.openssl_assert(errors[0][1] == self._lib.ERR_LIB_RSA) + self.openssl_assert( + errors[0][3] == self._lib.RSA_R_DIGEST_TOO_BIG_FOR_RSA_KEY + ) raise ValueError("Digest too big for RSA key") return _CertificateSigningRequest(self, x509_req) @@ -1257,7 +1270,7 @@ class Backend(object): evp_md = self._lib.EVP_get_digestbyname( algorithm.name.encode('ascii') ) - assert evp_md != self._ffi.NULL + self.openssl_assert(evp_md != self._ffi.NULL) # Create an empty certificate. x509_cert = self._lib.X509_new() @@ -1265,38 +1278,38 @@ class Backend(object): # Set the x509 version. res = self._lib.X509_set_version(x509_cert, builder._version.value) - assert res == 1 + self.openssl_assert(res == 1) # Set the subject's name. res = self._lib.X509_set_subject_name( x509_cert, _encode_name(self, list(builder._subject_name)) ) - assert res == 1 + self.openssl_assert(res == 1) # Set the subject's public key. res = self._lib.X509_set_pubkey( x509_cert, builder._public_key._evp_pkey ) - assert res == 1 + self.openssl_assert(res == 1) # Set the certificate serial number. serial_number = _encode_asn1_int_gc(self, builder._serial_number) res = self._lib.X509_set_serialNumber(x509_cert, serial_number) - assert res == 1 + self.openssl_assert(res == 1) # Set the "not before" time. res = self._lib.ASN1_TIME_set( self._lib.X509_get_notBefore(x509_cert), calendar.timegm(builder._not_valid_before.timetuple()) ) - assert res != self._ffi.NULL + self.openssl_assert(res != self._ffi.NULL) # Set the "not after" time. res = self._lib.ASN1_TIME_set( self._lib.X509_get_notAfter(x509_cert), calendar.timegm(builder._not_valid_after.timetuple()) ) - assert res != self._ffi.NULL + self.openssl_assert(res != self._ffi.NULL) # Add extensions. for i, extension in enumerate(builder._extensions): @@ -1313,16 +1326,16 @@ class Backend(object): 1 if extension.critical else 0, _encode_asn1_str_gc(self, pp[0], r) ) - assert extension != self._ffi.NULL + self.openssl_assert(extension != self._ffi.NULL) extension = self._ffi.gc(extension, self._lib.X509_EXTENSION_free) res = self._lib.X509_add_ext(x509_cert, extension, i) - assert res == 1 + self.openssl_assert(res == 1) # Set the issuer name. res = self._lib.X509_set_issuer_name( x509_cert, _encode_name(self, list(builder._issuer_name)) ) - assert res == 1 + self.openssl_assert(res == 1) # Sign the certificate with the issuer's private key. res = self._lib.X509_sign( @@ -1330,8 +1343,10 @@ class Backend(object): ) if res == 0: errors = self._consume_errors() - assert errors[0][1] == self._lib.ERR_LIB_RSA - assert errors[0][3] == self._lib.RSA_R_DIGEST_TOO_BIG_FOR_RSA_KEY + self.openssl_assert(errors[0][1] == self._lib.ERR_LIB_RSA) + self.openssl_assert( + errors[0][3] == self._lib.RSA_R_DIGEST_TOO_BIG_FOR_RSA_KEY + ) raise ValueError("Digest too big for RSA key") return _Certificate(self, x509_cert) @@ -1358,7 +1373,7 @@ class Backend(object): # embedded in a subjectPublicKeyInfo) self._consume_errors() res = self._lib.BIO_reset(mem_bio.bio) - assert res == 1 + self.openssl_assert(res == 1) rsa_cdata = self._lib.PEM_read_bio_RSAPublicKey( mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL ) @@ -1382,7 +1397,7 @@ class Backend(object): # PKCS8 keys using d2i_PKCS8PrivateKey_bio so we do this instead. # Reset the memory BIO so we can read the data again. res = self._lib.BIO_reset(bio_data.bio) - assert res == 1 + self.openssl_assert(res == 1) key = self._evp_pkey_from_der_unencrypted_pkcs8(bio_data, password) if key: @@ -1418,7 +1433,7 @@ class Backend(object): info = self._ffi.gc(info, self._lib.PKCS8_PRIV_KEY_INFO_free) if info != self._ffi.NULL: key = self._lib.EVP_PKCS82PKEY(info) - assert key != self._ffi.NULL + self.openssl_assert(key != self._ffi.NULL) key = self._ffi.gc(key, self._lib.EVP_PKEY_free) if password is not None: raise TypeError( @@ -1441,7 +1456,7 @@ class Backend(object): # embedded in a subjectPublicKeyInfo) self._consume_errors() res = self._lib.BIO_reset(mem_bio.bio) - assert res == 1 + self.openssl_assert(res == 1) rsa_cdata = self._lib.d2i_RSAPublicKey_bio( mem_bio.bio, self._ffi.NULL ) @@ -1511,7 +1526,7 @@ class Backend(object): if evp_pkey == self._ffi.NULL: if password_func.exception is not None: errors = self._consume_errors() - assert errors + self.openssl_assert(errors) raise password_func.exception else: self._handle_key_loading_error() @@ -1522,7 +1537,7 @@ class Backend(object): raise TypeError( "Password was given but private key is not encrypted.") - assert ( + self.openssl_assert( (password is not None and password_func.called == 1) or password is None ) @@ -1581,11 +1596,11 @@ class Backend(object): ) else: - assert errors[0][1] in ( + self.openssl_assert(errors[0][1] in ( self._lib.ERR_LIB_EVP, self._lib.ERR_LIB_PEM, self._lib.ERR_LIB_ASN1, - ) + )) raise ValueError("Could not unserialize key data.") def elliptic_curve_supported(self, curve): @@ -1601,7 +1616,7 @@ class Backend(object): if ctx == self._ffi.NULL: errors = self._consume_errors() - assert ( + self.openssl_assert( curve_nid == self._lib.NID_undef or errors[0][1:] == ( self._lib.ERR_LIB_EC, @@ -1611,7 +1626,7 @@ class Backend(object): ) return False else: - assert curve_nid != self._lib.NID_undef + self.openssl_assert(curve_nid != self._lib.NID_undef) self._lib.EC_GROUP_free(ctx) return True @@ -1643,14 +1658,14 @@ class Backend(object): curve_nid = self._elliptic_curve_to_nid(curve) ec_cdata = self._lib.EC_KEY_new_by_curve_name(curve_nid) - assert ec_cdata != self._ffi.NULL + self.openssl_assert(ec_cdata != self._ffi.NULL) ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free) res = self._lib.EC_KEY_generate_key(ec_cdata) - assert res == 1 + self.openssl_assert(res == 1) res = self._lib.EC_KEY_check_key(ec_cdata) - assert res == 1 + self.openssl_assert(res == 1) evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata) @@ -1667,7 +1682,7 @@ class Backend(object): curve_nid = self._elliptic_curve_to_nid(public.curve) ec_cdata = self._lib.EC_KEY_new_by_curve_name(curve_nid) - assert ec_cdata != self._ffi.NULL + self.openssl_assert(ec_cdata != self._ffi.NULL) ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free) ec_cdata = self._ec_key_set_public_key_affine_coordinates( @@ -1675,7 +1690,7 @@ class Backend(object): res = self._lib.EC_KEY_set_private_key( ec_cdata, self._int_to_bn(numbers.private_value)) - assert res == 1 + self.openssl_assert(res == 1) evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata) return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey) @@ -1684,7 +1699,7 @@ class Backend(object): curve_nid = self._elliptic_curve_to_nid(numbers.curve) ec_cdata = self._lib.EC_KEY_new_by_curve_name(curve_nid) - assert ec_cdata != self._ffi.NULL + self.openssl_assert(ec_cdata != self._ffi.NULL) ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free) ec_cdata = self._ec_key_set_public_key_affine_coordinates( @@ -1695,10 +1710,10 @@ class Backend(object): def _ec_cdata_to_evp_pkey(self, ec_cdata): evp_pkey = self._lib.EVP_PKEY_new() - assert evp_pkey != self._ffi.NULL + self.openssl_assert(evp_pkey != self._ffi.NULL) evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) res = self._lib.EVP_PKEY_set1_EC_KEY(evp_pkey, ec_cdata) - assert res == 1 + self.openssl_assert(res == 1) return evp_pkey def _elliptic_curve_to_nid(self, curve): @@ -1724,7 +1739,7 @@ class Backend(object): @contextmanager def _tmp_bn_ctx(self): bn_ctx = self._lib.BN_CTX_new() - assert bn_ctx != self._ffi.NULL + self.openssl_assert(bn_ctx != self._ffi.NULL) bn_ctx = self._ffi.gc(bn_ctx, self._lib.BN_CTX_free) self._lib.BN_CTX_start(bn_ctx) try: @@ -1737,19 +1752,19 @@ class Backend(object): Given an EC_KEY determine the group and what methods are required to get/set point coordinates. """ - assert ctx != self._ffi.NULL + self.openssl_assert(ctx != self._ffi.NULL) nid_two_field = self._lib.OBJ_sn2nid(b"characteristic-two-field") - assert nid_two_field != self._lib.NID_undef + self.openssl_assert(nid_two_field != self._lib.NID_undef) group = self._lib.EC_KEY_get0_group(ctx) - assert group != self._ffi.NULL + self.openssl_assert(group != self._ffi.NULL) method = self._lib.EC_GROUP_method_of(group) - assert method != self._ffi.NULL + self.openssl_assert(method != self._ffi.NULL) nid = self._lib.EC_METHOD_get_field_type(method) - assert nid != self._lib.NID_undef + self.openssl_assert(nid != self._lib.NID_undef) if nid == nid_two_field and self._lib.Cryptography_HAS_EC2M: set_func = self._lib.EC_POINT_set_affine_coordinates_GF2m @@ -1758,7 +1773,7 @@ class Backend(object): set_func = self._lib.EC_POINT_set_affine_coordinates_GFp get_func = self._lib.EC_POINT_get_affine_coordinates_GFp - assert set_func and get_func + self.openssl_assert(set_func and get_func) return set_func, get_func, group @@ -1781,7 +1796,7 @@ class Backend(object): ) point = self._lib.EC_POINT_new(group) - assert point != self._ffi.NULL + self.openssl_assert(point != self._ffi.NULL) point = self._ffi.gc(point, self._lib.EC_POINT_free) bn_x = self._int_to_bn(x) @@ -1792,18 +1807,18 @@ class Backend(object): check_y = self._lib.BN_CTX_get(bn_ctx) res = set_func(group, point, bn_x, bn_y, bn_ctx) - assert res == 1 + self.openssl_assert(res == 1) res = get_func(group, point, check_x, check_y, bn_ctx) - assert res == 1 + self.openssl_assert(res == 1) res = self._lib.BN_cmp(bn_x, check_x) - assert res == 0 + self.openssl_assert(res == 0) res = self._lib.BN_cmp(bn_y, check_y) - assert res == 0 + self.openssl_assert(res == 0) res = self._lib.EC_KEY_set_public_key(ctx, point) - assert res == 1 + self.openssl_assert(res == 1) res = self._lib.EC_KEY_check_key(ctx) if res != 1: @@ -1851,14 +1866,16 @@ class Backend(object): write_bio = self._lib.PEM_write_bio_PKCS8PrivateKey key = evp_pkey else: - assert format is serialization.PrivateFormat.TraditionalOpenSSL + self.openssl_assert( + format is serialization.PrivateFormat.TraditionalOpenSSL + ) if evp_pkey.type == self._lib.EVP_PKEY_RSA: write_bio = self._lib.PEM_write_bio_RSAPrivateKey elif evp_pkey.type == self._lib.EVP_PKEY_DSA: write_bio = self._lib.PEM_write_bio_DSAPrivateKey else: - assert self._lib.Cryptography_HAS_EC == 1 - assert evp_pkey.type == self._lib.EVP_PKEY_EC + self.openssl_assert(self._lib.Cryptography_HAS_EC == 1) + self.openssl_assert(evp_pkey.type == self._lib.EVP_PKEY_EC) write_bio = self._lib.PEM_write_bio_ECPrivateKey key = cdata @@ -1876,7 +1893,9 @@ class Backend(object): evp_pkey.type, cdata ) else: - assert format is serialization.PrivateFormat.PKCS8 + self.openssl_assert( + format is serialization.PrivateFormat.PKCS8 + ) write_bio = self._lib.i2d_PKCS8PrivateKey_bio key = evp_pkey else: @@ -1892,7 +1911,7 @@ class Backend(object): self._ffi.NULL, self._ffi.NULL ) - assert res == 1 + self.openssl_assert(res == 1) return self._read_mem_bio(bio) def _private_key_bytes_traditional_der(self, key_type, cdata): @@ -1902,12 +1921,12 @@ class Backend(object): key_type == self._lib.EVP_PKEY_EC): write_bio = self._lib.i2d_ECPrivateKey_bio else: - assert key_type == self._lib.EVP_PKEY_DSA + self.openssl_assert(key_type == self._lib.EVP_PKEY_DSA) write_bio = self._lib.i2d_DSAPrivateKey_bio bio = self._create_mem_bio() res = write_bio(bio, cdata) - assert res == 1 + self.openssl_assert(res == 1) return self._read_mem_bio(bio) def _public_key_bytes(self, encoding, format, evp_pkey, cdata): @@ -1918,17 +1937,17 @@ class Backend(object): if encoding is serialization.Encoding.PEM: write_bio = self._lib.PEM_write_bio_PUBKEY else: - assert encoding is serialization.Encoding.DER + self.openssl_assert(encoding is serialization.Encoding.DER) write_bio = self._lib.i2d_PUBKEY_bio key = evp_pkey elif format is serialization.PublicFormat.PKCS1: # Only RSA is supported here. - assert evp_pkey.type == self._lib.EVP_PKEY_RSA + self.openssl_assert(evp_pkey.type == self._lib.EVP_PKEY_RSA) if encoding is serialization.Encoding.PEM: write_bio = self._lib.PEM_write_bio_RSAPublicKey else: - assert encoding is serialization.Encoding.DER + self.openssl_assert(encoding is serialization.Encoding.DER) write_bio = self._lib.i2d_RSAPublicKey_bio key = cdata @@ -1939,12 +1958,12 @@ class Backend(object): bio = self._create_mem_bio() res = write_bio(bio, key) - assert res == 1 + self.openssl_assert(res == 1) return self._read_mem_bio(bio) def _asn1_integer_to_int(self, asn1_int): bn = self._lib.ASN1_INTEGER_to_BN(asn1_int, self._ffi.NULL) - assert bn != self._ffi.NULL + self.openssl_assert(bn != self._ffi.NULL) bn = self._ffi.gc(bn, self._lib.BN_free) return self._bn_to_int(bn) @@ -1957,8 +1976,8 @@ class Backend(object): def _asn1_string_to_utf8(self, asn1_string): buf = self._ffi.new("unsigned char **") res = self._lib.ASN1_STRING_to_UTF8(buf, asn1_string) - assert res >= 0 - assert buf[0] != self._ffi.NULL + self.openssl_assert(res >= 0) + self.openssl_assert(buf[0] != self._ffi.NULL) buf = self._ffi.gc( buf, lambda buffer: self._lib.OPENSSL_free(buffer[0]) ) @@ -1967,19 +1986,19 @@ class Backend(object): def _asn1_to_der(self, asn1_type): buf = self._ffi.new("unsigned char **") res = self._lib.i2d_ASN1_TYPE(asn1_type, buf) - assert res >= 0 - assert buf[0] != self._ffi.NULL + self.openssl_assert(res >= 0) + self.openssl_assert(buf[0] != self._ffi.NULL) buf = self._ffi.gc( buf, lambda buffer: self._lib.OPENSSL_free(buffer[0]) ) return self._ffi.buffer(buf[0], res)[:] def _parse_asn1_time(self, asn1_time): - assert asn1_time != self._ffi.NULL + self.openssl_assert(asn1_time != self._ffi.NULL) generalized_time = self._lib.ASN1_TIME_to_generalizedtime( asn1_time, self._ffi.NULL ) - assert generalized_time != self._ffi.NULL + self.openssl_assert(generalized_time != self._ffi.NULL) generalized_time = self._ffi.gc( generalized_time, self._lib.ASN1_GENERALIZEDTIME_free ) -- cgit v1.2.3 From 3c39eba249bfd4582cfb4f169d7c47492b5369e3 Mon Sep 17 00:00:00 2001 From: Paul Kehrer Date: Fri, 25 Sep 2015 15:22:36 -0500 Subject: change some asserts back since they're not openssl specific plus bonus better exception msg --- .../hazmat/backends/openssl/backend.py | 37 +++++++++++----------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py index 81d191eb..58de3d3e 100644 --- a/src/cryptography/hazmat/backends/openssl/backend.py +++ b/src/cryptography/hazmat/backends/openssl/backend.py @@ -543,7 +543,12 @@ class Backend(object): def openssl_assert(self, ok): if not ok: errors = self._consume_errors() - raise UnhandledOpenSSLError("Unknown OpenSSL error", errors) + raise UnhandledOpenSSLError( + "Unknown OpenSSL error. Please file an issue at https://github" + ".com/pyca/cryptography/issues with information on how to " + "reproduce this.", + errors + ) def activate_builtin_random(self): # Obtain a new structural reference. @@ -776,7 +781,7 @@ class Backend(object): ) def _bn_to_int(self, bn): - self.openssl_assert(bn != self._ffi.NULL) + assert bn != self._ffi.NULL if six.PY3: # Python 3 has constant time from_bytes, so use that. @@ -804,7 +809,7 @@ class Backend(object): ownership of the object). Be sure to register it for GC if it will be discarded after use. """ - self.openssl_assert(bn is None or bn != self._ffi.NULL) + assert bn is None or bn != self._ffi.NULL if bn is None: bn = self._ffi.NULL @@ -1537,7 +1542,7 @@ class Backend(object): raise TypeError( "Password was given but private key is not encrypted.") - self.openssl_assert( + assert ( (password is not None and password_func.called == 1) or password is None ) @@ -1596,11 +1601,11 @@ class Backend(object): ) else: - self.openssl_assert(errors[0][1] in ( + assert errors[0][1] in ( self._lib.ERR_LIB_EVP, self._lib.ERR_LIB_PEM, self._lib.ERR_LIB_ASN1, - )) + ) raise ValueError("Could not unserialize key data.") def elliptic_curve_supported(self, curve): @@ -1773,7 +1778,7 @@ class Backend(object): set_func = self._lib.EC_POINT_set_affine_coordinates_GFp get_func = self._lib.EC_POINT_get_affine_coordinates_GFp - self.openssl_assert(set_func and get_func) + assert set_func and get_func return set_func, get_func, group @@ -1866,16 +1871,14 @@ class Backend(object): write_bio = self._lib.PEM_write_bio_PKCS8PrivateKey key = evp_pkey else: - self.openssl_assert( - format is serialization.PrivateFormat.TraditionalOpenSSL - ) + assert format is serialization.PrivateFormat.TraditionalOpenSSL if evp_pkey.type == self._lib.EVP_PKEY_RSA: write_bio = self._lib.PEM_write_bio_RSAPrivateKey elif evp_pkey.type == self._lib.EVP_PKEY_DSA: write_bio = self._lib.PEM_write_bio_DSAPrivateKey else: - self.openssl_assert(self._lib.Cryptography_HAS_EC == 1) - self.openssl_assert(evp_pkey.type == self._lib.EVP_PKEY_EC) + assert self._lib.Cryptography_HAS_EC == 1 + assert evp_pkey.type == self._lib.EVP_PKEY_EC write_bio = self._lib.PEM_write_bio_ECPrivateKey key = cdata @@ -1893,9 +1896,7 @@ class Backend(object): evp_pkey.type, cdata ) else: - self.openssl_assert( - format is serialization.PrivateFormat.PKCS8 - ) + assert format is serialization.PrivateFormat.PKCS8 write_bio = self._lib.i2d_PKCS8PrivateKey_bio key = evp_pkey else: @@ -1937,17 +1938,17 @@ class Backend(object): if encoding is serialization.Encoding.PEM: write_bio = self._lib.PEM_write_bio_PUBKEY else: - self.openssl_assert(encoding is serialization.Encoding.DER) + assert encoding is serialization.Encoding.DER write_bio = self._lib.i2d_PUBKEY_bio key = evp_pkey elif format is serialization.PublicFormat.PKCS1: # Only RSA is supported here. - self.openssl_assert(evp_pkey.type == self._lib.EVP_PKEY_RSA) + assert evp_pkey.type == self._lib.EVP_PKEY_RSA if encoding is serialization.Encoding.PEM: write_bio = self._lib.PEM_write_bio_RSAPublicKey else: - self.openssl_assert(encoding is serialization.Encoding.DER) + assert encoding is serialization.Encoding.DER write_bio = self._lib.i2d_RSAPublicKey_bio key = cdata -- cgit v1.2.3 From 0c27fe5a43a7a95ee7816cc4c8ae4348cf559551 Mon Sep 17 00:00:00 2001 From: Paul Kehrer Date: Fri, 25 Sep 2015 15:24:48 -0500 Subject: remove unnecessary check --- src/cryptography/hazmat/backends/openssl/backend.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py index 58de3d3e..a476b1e9 100644 --- a/src/cryptography/hazmat/backends/openssl/backend.py +++ b/src/cryptography/hazmat/backends/openssl/backend.py @@ -790,7 +790,6 @@ class Backend(object): bin_len = self._lib.BN_bn2bin(bn, bin_ptr) # A zero length means the BN has value 0 self.openssl_assert(bin_len >= 0) - self.openssl_assert(bin_ptr != self._ffi.NULL) return int.from_bytes(self._ffi.buffer(bin_ptr)[:bin_len], "big") else: -- cgit v1.2.3 From 7712edc5fa2bc5244221c35cf97e1b58f5981446 Mon Sep 17 00:00:00 2001 From: Paul Kehrer Date: Fri, 25 Sep 2015 16:24:10 -0500 Subject: add test for openssl_assert --- tests/hazmat/backends/test_openssl.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/hazmat/backends/test_openssl.py b/tests/hazmat/backends/test_openssl.py index debea5a2..5264ba55 100644 --- a/tests/hazmat/backends/test_openssl.py +++ b/tests/hazmat/backends/test_openssl.py @@ -17,7 +17,7 @@ from cryptography import utils from cryptography.exceptions import InternalError, _Reasons from cryptography.hazmat.backends.interfaces import RSABackend from cryptography.hazmat.backends.openssl.backend import ( - Backend, backend + Backend, UnhandledOpenSSLError, backend ) from cryptography.hazmat.backends.openssl.ec import _sn_to_elliptic_curve from cryptography.hazmat.primitives import hashes, serialization @@ -122,6 +122,11 @@ class TestOpenSSL(object): with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_CIPHER): cipher.encryptor() + def test_openssl_assert(self): + backend.openssl_assert(True) + with pytest.raises(UnhandledOpenSSLError): + backend.openssl_assert(False) + def test_consume_errors(self): for i in range(10): backend._lib.ERR_put_error(backend._lib.ERR_LIB_EVP, 0, 0, -- cgit v1.2.3