From 9470f67a3086e4c003ab27ca6a2209dae9b1a9e6 Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Mon, 25 May 2020 21:17:25 +0300 Subject: Cleanup serialize (#5149) * Additional tests for public/private_bytes They expose few places that raise TypeError and AssertionError! before, and ValueError later. * Cleanup of private_bytes() backend Also pass key itself down to backend. * Cleanup of public_bytes() backend * Test handling of unsupported key type --- .../hazmat/backends/openssl/backend.py | 200 ++++++++++----------- 1 file changed, 95 insertions(+), 105 deletions(-) (limited to 'src/cryptography/hazmat/backends/openssl/backend.py') diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py index 6fd191f0..e5b64e3d 100644 --- a/src/cryptography/hazmat/backends/openssl/backend.py +++ b/src/cryptography/hazmat/backends/openssl/backend.py @@ -1746,26 +1746,14 @@ class Backend(object): return ctx def _private_key_bytes(self, encoding, format, encryption_algorithm, - evp_pkey, cdata): + key, evp_pkey, cdata): + # validate argument types + if not isinstance(encoding, serialization.Encoding): + raise TypeError("encoding must be an item from the Encoding enum") if not isinstance(format, serialization.PrivateFormat): raise TypeError( "format must be an item from the PrivateFormat enum" ) - - # X9.62 encoding is only valid for EC public keys - if encoding is serialization.Encoding.X962: - raise ValueError("X9.62 format is only valid for EC public keys") - - # Raw format and encoding are only valid for X25519, Ed25519, X448, and - # Ed448 keys. We capture those cases before this method is called so if - # we see those enum values here it means the caller has passed them to - # a key that doesn't support raw type - if format is serialization.PrivateFormat.Raw: - raise ValueError("raw format is invalid with this key or encoding") - - if encoding is serialization.Encoding.Raw: - raise ValueError("raw encoding is invalid with this key or format") - if not isinstance(encryption_algorithm, serialization.KeySerializationEncryption): raise TypeError( @@ -1773,19 +1761,13 @@ class Backend(object): "instance" ) + # validate password if isinstance(encryption_algorithm, serialization.NoEncryption): password = b"" - passlen = 0 - evp_cipher = self._ffi.NULL elif isinstance(encryption_algorithm, serialization.BestAvailableEncryption): - # This is a curated value that we will update over time. - evp_cipher = self._lib.EVP_get_cipherbyname( - b"aes-256-cbc" - ) password = encryption_algorithm.password - passlen = len(password) - if passlen > 1023: + if len(password) > 1023: raise ValueError( "Passwords longer than 1023 bytes are not supported by " "this backend" @@ -1793,127 +1775,135 @@ class Backend(object): else: raise ValueError("Unsupported encryption type") - key_type = self._lib.EVP_PKEY_id(evp_pkey) - if encoding is serialization.Encoding.PEM: - if format is serialization.PrivateFormat.PKCS8: + # PKCS8 + PEM/DER + if format is serialization.PrivateFormat.PKCS8: + if encoding is serialization.Encoding.PEM: write_bio = self._lib.PEM_write_bio_PKCS8PrivateKey - key = evp_pkey + elif encoding is serialization.Encoding.DER: + write_bio = self._lib.i2d_PKCS8PrivateKey_bio else: - assert format is serialization.PrivateFormat.TraditionalOpenSSL + raise ValueError("Unsupported encoding for PKCS8") + return self._private_key_bytes_via_bio( + write_bio, evp_pkey, password + ) + + # TraditionalOpenSSL + PEM/DER + if format is serialization.PrivateFormat.TraditionalOpenSSL: + key_type = self._lib.EVP_PKEY_id(evp_pkey) + + if encoding is serialization.Encoding.PEM: if key_type == self._lib.EVP_PKEY_RSA: write_bio = self._lib.PEM_write_bio_RSAPrivateKey elif key_type == self._lib.EVP_PKEY_DSA: write_bio = self._lib.PEM_write_bio_DSAPrivateKey - else: - assert key_type == self._lib.EVP_PKEY_EC + elif key_type == self._lib.EVP_PKEY_EC: write_bio = self._lib.PEM_write_bio_ECPrivateKey + else: + raise ValueError( + "Unsupported key type for TraditionalOpenSSL" + ) + return self._private_key_bytes_via_bio( + write_bio, cdata, password + ) - key = cdata - elif encoding is serialization.Encoding.DER: - if format is serialization.PrivateFormat.TraditionalOpenSSL: - if not isinstance( - encryption_algorithm, serialization.NoEncryption - ): + if encoding is serialization.Encoding.DER: + if password: raise ValueError( "Encryption is not supported for DER encoded " "traditional OpenSSL keys" ) + if key_type == self._lib.EVP_PKEY_RSA: + write_bio = self._lib.i2d_RSAPrivateKey_bio + elif key_type == self._lib.EVP_PKEY_EC: + write_bio = self._lib.i2d_ECPrivateKey_bio + elif key_type == self._lib.EVP_PKEY_DSA: + write_bio = self._lib.i2d_DSAPrivateKey_bio + else: + raise ValueError( + "Unsupported key type for TraditionalOpenSSL" + ) + return self._bio_func_output(write_bio, cdata) - return self._private_key_bytes_traditional_der(key_type, cdata) - else: - assert format is serialization.PrivateFormat.PKCS8 - write_bio = self._lib.i2d_PKCS8PrivateKey_bio - key = evp_pkey + raise ValueError( + "Unsupported encoding for TraditionalOpenSSL" + ) + + # Anything that key-specific code was supposed to handle earlier, + # like Raw. + raise ValueError("format is invalid with this key") + + def _private_key_bytes_via_bio(self, write_bio, evp_pkey, password): + if not password: + evp_cipher = self._ffi.NULL else: - raise TypeError("encoding must be Encoding.PEM or Encoding.DER") + # This is a curated value that we will update over time. + evp_cipher = self._lib.EVP_get_cipherbyname(b"aes-256-cbc") - bio = self._create_mem_bio_gc() - res = write_bio( - bio, - key, + return self._bio_func_output( + write_bio, + evp_pkey, evp_cipher, password, - passlen, + len(password), self._ffi.NULL, self._ffi.NULL ) - self.openssl_assert(res == 1) - return self._read_mem_bio(bio) - - def _private_key_bytes_traditional_der(self, key_type, cdata): - if key_type == self._lib.EVP_PKEY_RSA: - write_bio = self._lib.i2d_RSAPrivateKey_bio - elif key_type == self._lib.EVP_PKEY_EC: - write_bio = self._lib.i2d_ECPrivateKey_bio - else: - self.openssl_assert(key_type == self._lib.EVP_PKEY_DSA) - write_bio = self._lib.i2d_DSAPrivateKey_bio + def _bio_func_output(self, write_bio, *args): bio = self._create_mem_bio_gc() - res = write_bio(bio, cdata) + res = write_bio(bio, *args) self.openssl_assert(res == 1) return self._read_mem_bio(bio) def _public_key_bytes(self, encoding, format, key, evp_pkey, cdata): if not isinstance(encoding, serialization.Encoding): raise TypeError("encoding must be an item from the Encoding enum") + if not isinstance(format, serialization.PublicFormat): + raise TypeError( + "format must be an item from the PublicFormat enum" + ) - # Compressed/UncompressedPoint are only valid for EC keys and those - # cases are handled by the ECPublicKey public_bytes method before this - # method is called - if format in (serialization.PublicFormat.UncompressedPoint, - serialization.PublicFormat.CompressedPoint): - raise ValueError("Point formats are not valid for this key type") - - # Raw format and encoding are only valid for X25519, Ed25519, X448, and - # Ed448 keys. We capture those cases before this method is called so if - # we see those enum values here it means the caller has passed them to - # a key that doesn't support raw type - if format is serialization.PublicFormat.Raw: - raise ValueError("raw format is invalid with this key or encoding") - - if encoding is serialization.Encoding.Raw: - raise ValueError("raw encoding is invalid with this key or format") - - if ( - format is serialization.PublicFormat.OpenSSH or - encoding is serialization.Encoding.OpenSSH - ): - if ( - format is not serialization.PublicFormat.OpenSSH or - encoding is not serialization.Encoding.OpenSSH - ): - raise ValueError( - "OpenSSH format must be used with OpenSSH encoding" - ) - return self._openssh_public_key_bytes(key) - elif format is serialization.PublicFormat.SubjectPublicKeyInfo: + # SubjectPublicKeyInfo + PEM/DER + if format is serialization.PublicFormat.SubjectPublicKeyInfo: if encoding is serialization.Encoding.PEM: write_bio = self._lib.PEM_write_bio_PUBKEY - else: - assert encoding is serialization.Encoding.DER + elif encoding is serialization.Encoding.DER: write_bio = self._lib.i2d_PUBKEY_bio + else: + raise ValueError( + "SubjectPublicKeyInfo works only with PEM or DER encoding" + ) + return self._bio_func_output(write_bio, evp_pkey) - key = evp_pkey - elif format is serialization.PublicFormat.PKCS1: + # PKCS1 + PEM/DER + if format is serialization.PublicFormat.PKCS1: # Only RSA is supported here. - assert self._lib.EVP_PKEY_id(evp_pkey) == self._lib.EVP_PKEY_RSA + key_type = self._lib.EVP_PKEY_id(evp_pkey) + if key_type != self._lib.EVP_PKEY_RSA: + raise ValueError("PKCS1 format is supported only for RSA keys") + if encoding is serialization.Encoding.PEM: write_bio = self._lib.PEM_write_bio_RSAPublicKey - else: - assert encoding is serialization.Encoding.DER + elif encoding is serialization.Encoding.DER: write_bio = self._lib.i2d_RSAPublicKey_bio + else: + raise ValueError( + "PKCS1 works only with PEM or DER encoding" + ) + return self._bio_func_output(write_bio, cdata) - key = cdata - else: - raise TypeError( - "format must be an item from the PublicFormat enum" + # OpenSSH + OpenSSH + if format is serialization.PublicFormat.OpenSSH: + if encoding is serialization.Encoding.OpenSSH: + return self._openssh_public_key_bytes(key) + + raise ValueError( + "OpenSSH format must be used with OpenSSH encoding" ) - bio = self._create_mem_bio_gc() - res = write_bio(bio, key) - self.openssl_assert(res == 1) - return self._read_mem_bio(bio) + # Anything that key-specific code was supposed to handle earlier, + # like Raw, CompressedPoint, UncompressedPoint + raise ValueError("format is invalid with this key") def _openssh_public_key_bytes(self, key): if isinstance(key, rsa.RSAPublicKey): -- cgit v1.2.3