diff options
Diffstat (limited to 'cryptography/hazmat/bindings/openssl/backend.py')
-rw-r--r-- | cryptography/hazmat/bindings/openssl/backend.py | 217 |
1 files changed, 108 insertions, 109 deletions
diff --git a/cryptography/hazmat/bindings/openssl/backend.py b/cryptography/hazmat/bindings/openssl/backend.py index 71b94abe..844e175f 100644 --- a/cryptography/hazmat/bindings/openssl/backend.py +++ b/cryptography/hazmat/bindings/openssl/backend.py @@ -63,9 +63,8 @@ class Backend(object): def __init__(self): self._ensure_ffi_initialized() - self.ciphers = Ciphers(self) - self.hashes = Hashes(self) - self.hmacs = HMACs(self) + self._cipher_registry = {} + self._register_default_ciphers() @classmethod def _ensure_ffi_initialized(cls): @@ -123,6 +122,70 @@ class Backend(object): """ return self.ffi.string(self.lib.OPENSSL_VERSION_TEXT).decode("ascii") + def create_hmac_ctx(self, key, algorithm): + return _HMACContext(self, key, algorithm) + + def hash_supported(self, algorithm): + digest = self.lib.EVP_get_digestbyname(algorithm.name.encode("ascii")) + return digest != self.ffi.NULL + + def create_hash_ctx(self, algorithm): + return _HashContext(self, algorithm) + + def cipher_supported(self, cipher, mode): + try: + adapter = self._cipher_registry[type(cipher), type(mode)] + except KeyError: + return False + evp_cipher = adapter(self, cipher, mode) + return self.ffi.NULL != evp_cipher + + def register_cipher_adapter(self, cipher_cls, mode_cls, adapter): + if (cipher_cls, mode_cls) in self._cipher_registry: + raise ValueError("Duplicate registration for: {0} {1}".format( + cipher_cls, mode_cls) + ) + self._cipher_registry[cipher_cls, mode_cls] = adapter + + def _register_default_ciphers(self): + for cipher_cls, mode_cls in itertools.product( + [AES, Camellia], + [CBC, CTR, ECB, OFB, CFB], + ): + self.register_cipher_adapter( + cipher_cls, + mode_cls, + GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}") + ) + for mode_cls in [CBC, CFB, OFB]: + self.register_cipher_adapter( + TripleDES, + mode_cls, + GetCipherByName("des-ede3-{mode.name}") + ) + for mode_cls in [CBC, CFB, OFB, ECB]: + self.register_cipher_adapter( + Blowfish, + mode_cls, + GetCipherByName("bf-{mode.name}") + ) + self.register_cipher_adapter( + CAST5, + ECB, + GetCipherByName("cast5-ecb") + ) + self.register_cipher_adapter( + ARC4, + type(None), + GetCipherByName("rc4") + ) + + def create_symmetric_encryption_ctx(self, cipher, mode): + return _CipherContext(self, cipher, mode, _CipherContext._ENCRYPT) + + def create_symmetric_decryption_ctx(self, cipher, mode): + return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT) + class GetCipherByName(object): def __init__(self, fmt): @@ -145,7 +208,7 @@ class _CipherContext(object): ctx = self._backend.lib.EVP_CIPHER_CTX_new() ctx = self._backend.ffi.gc(ctx, self._backend.lib.EVP_CIPHER_CTX_free) - registry = self._backend.ciphers._cipher_registry + registry = self._backend._cipher_registry try: adapter = registry[type(cipher), type(mode)] except KeyError: @@ -204,69 +267,6 @@ class _CipherContext(object): return self._backend.ffi.buffer(buf)[:outlen[0]] -class Ciphers(object): - def __init__(self, backend): - self._backend = backend - self._cipher_registry = {} - self._register_default_ciphers() - - def supported(self, cipher, mode): - try: - adapter = self._cipher_registry[type(cipher), type(mode)] - except KeyError: - return False - evp_cipher = adapter(self._backend, cipher, mode) - return self._backend.ffi.NULL != evp_cipher - - def register_cipher_adapter(self, cipher_cls, mode_cls, adapter): - if (cipher_cls, mode_cls) in self._cipher_registry: - raise ValueError("Duplicate registration for: {0} {1}".format( - cipher_cls, mode_cls) - ) - self._cipher_registry[cipher_cls, mode_cls] = adapter - - def _register_default_ciphers(self): - for cipher_cls, mode_cls in itertools.product( - [AES, Camellia], - [CBC, CTR, ECB, OFB, CFB], - ): - self.register_cipher_adapter( - cipher_cls, - mode_cls, - GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}") - ) - for mode_cls in [CBC, CFB, OFB]: - self.register_cipher_adapter( - TripleDES, - mode_cls, - GetCipherByName("des-ede3-{mode.name}") - ) - for mode_cls in [CBC, CFB, OFB, ECB]: - self.register_cipher_adapter( - Blowfish, - mode_cls, - GetCipherByName("bf-{mode.name}") - ) - self.register_cipher_adapter( - CAST5, - ECB, - GetCipherByName("cast5-ecb") - ) - self.register_cipher_adapter( - ARC4, - type(None), - GetCipherByName("rc4") - ) - - def create_encrypt_ctx(self, cipher, mode): - return _CipherContext(self._backend, cipher, mode, - _CipherContext._ENCRYPT) - - def create_decrypt_ctx(self, cipher, mode): - return _CipherContext(self._backend, cipher, mode, - _CipherContext._DECRYPT) - - @interfaces.register(interfaces.HashContext) class _HashContext(object): def __init__(self, backend, algorithm, ctx=None): @@ -307,60 +307,59 @@ class _HashContext(object): assert res != 0 res = self._backend.lib.EVP_MD_CTX_cleanup(self._ctx) assert res == 1 - return self._backend.ffi.buffer(buf)[:self.algorithm.digest_size] + return self._backend.ffi.buffer(buf)[:] -class Hashes(object): - def __init__(self, backend): +@interfaces.register(interfaces.HashContext) +class _HMACContext(object): + def __init__(self, backend, key, algorithm, ctx=None): + self.algorithm = algorithm self._backend = backend - def supported(self, algorithm): - digest = self._backend.lib.EVP_get_digestbyname( - algorithm.name.encode("ascii") - ) - return digest != self._backend.ffi.NULL - - def create_ctx(self, algorithm): - return _HashContext(self._backend, algorithm) - + if ctx is None: + ctx = self._backend.ffi.new("HMAC_CTX *") + self._backend.lib.HMAC_CTX_init(ctx) + ctx = self._backend.ffi.gc(ctx, self._backend.lib.HMAC_CTX_cleanup) + evp_md = self._backend.lib.EVP_get_digestbyname( + algorithm.name.encode('ascii')) + assert evp_md != self._backend.ffi.NULL + res = self._backend.lib.Cryptography_HMAC_Init_ex( + ctx, key, len(key), evp_md, self._backend.ffi.NULL + ) + assert res != 0 -class HMACs(object): - def __init__(self, backend): - self._backend = backend + self._ctx = ctx + self._key = key - def create_ctx(self, key, hash_cls): - ctx = self._backend.ffi.new("HMAC_CTX *") - self._backend.lib.HMAC_CTX_init(ctx) - ctx = self._backend.ffi.gc(ctx, self._backend.lib.HMAC_CTX_cleanup) - evp_md = self._backend.lib.EVP_get_digestbyname( - hash_cls.name.encode('ascii')) - assert evp_md != self._backend.ffi.NULL - res = self._backend.lib.Cryptography_HMAC_Init_ex( - ctx, key, len(key), evp_md, self._backend.ffi.NULL + def copy(self): + copied_ctx = self._backend.ffi.new("HMAC_CTX *") + self._backend.lib.HMAC_CTX_init(copied_ctx) + copied_ctx = self._backend.ffi.gc( + copied_ctx, self._backend.lib.HMAC_CTX_cleanup + ) + res = self._backend.lib.Cryptography_HMAC_CTX_copy( + copied_ctx, self._ctx ) assert res != 0 - return ctx - - def update_ctx(self, ctx, data): - res = self._backend.lib.Cryptography_HMAC_Update(ctx, data, len(data)) - assert res != 0 + return _HMACContext( + self._backend, self._key, self.algorithm, ctx=copied_ctx + ) - def finalize_ctx(self, ctx, digest_size): - buf = self._backend.ffi.new("unsigned char[]", digest_size) - buflen = self._backend.ffi.new("unsigned int *", digest_size) - res = self._backend.lib.Cryptography_HMAC_Final(ctx, buf, buflen) + def update(self, data): + res = self._backend.lib.Cryptography_HMAC_Update( + self._ctx, data, len(data) + ) assert res != 0 - self._backend.lib.HMAC_CTX_cleanup(ctx) - return self._backend.ffi.buffer(buf)[:digest_size] - def copy_ctx(self, ctx): - copied_ctx = self._backend.ffi.new("HMAC_CTX *") - self._backend.lib.HMAC_CTX_init(copied_ctx) - copied_ctx = self._backend.ffi.gc(copied_ctx, - self._backend.lib.HMAC_CTX_cleanup) - res = self._backend.lib.Cryptography_HMAC_CTX_copy(copied_ctx, ctx) + def finalize(self): + buf = self._backend.ffi.new("unsigned char[]", + self.algorithm.digest_size) + buflen = self._backend.ffi.new("unsigned int *", + self.algorithm.digest_size) + res = self._backend.lib.Cryptography_HMAC_Final(self._ctx, buf, buflen) assert res != 0 - return copied_ctx + self._backend.lib.HMAC_CTX_cleanup(self._ctx) + return self._backend.ffi.buffer(buf)[:] backend = Backend() |