aboutsummaryrefslogtreecommitdiffstats
path: root/cryptography/hazmat/bindings/openssl/backend.py
diff options
context:
space:
mode:
Diffstat (limited to 'cryptography/hazmat/bindings/openssl/backend.py')
-rw-r--r--cryptography/hazmat/bindings/openssl/backend.py217
1 files changed, 108 insertions, 109 deletions
diff --git a/cryptography/hazmat/bindings/openssl/backend.py b/cryptography/hazmat/bindings/openssl/backend.py
index 71b94abe..844e175f 100644
--- a/cryptography/hazmat/bindings/openssl/backend.py
+++ b/cryptography/hazmat/bindings/openssl/backend.py
@@ -63,9 +63,8 @@ class Backend(object):
def __init__(self):
self._ensure_ffi_initialized()
- self.ciphers = Ciphers(self)
- self.hashes = Hashes(self)
- self.hmacs = HMACs(self)
+ self._cipher_registry = {}
+ self._register_default_ciphers()
@classmethod
def _ensure_ffi_initialized(cls):
@@ -123,6 +122,70 @@ class Backend(object):
"""
return self.ffi.string(self.lib.OPENSSL_VERSION_TEXT).decode("ascii")
+ def create_hmac_ctx(self, key, algorithm):
+ return _HMACContext(self, key, algorithm)
+
+ def hash_supported(self, algorithm):
+ digest = self.lib.EVP_get_digestbyname(algorithm.name.encode("ascii"))
+ return digest != self.ffi.NULL
+
+ def create_hash_ctx(self, algorithm):
+ return _HashContext(self, algorithm)
+
+ def cipher_supported(self, cipher, mode):
+ try:
+ adapter = self._cipher_registry[type(cipher), type(mode)]
+ except KeyError:
+ return False
+ evp_cipher = adapter(self, cipher, mode)
+ return self.ffi.NULL != evp_cipher
+
+ def register_cipher_adapter(self, cipher_cls, mode_cls, adapter):
+ if (cipher_cls, mode_cls) in self._cipher_registry:
+ raise ValueError("Duplicate registration for: {0} {1}".format(
+ cipher_cls, mode_cls)
+ )
+ self._cipher_registry[cipher_cls, mode_cls] = adapter
+
+ def _register_default_ciphers(self):
+ for cipher_cls, mode_cls in itertools.product(
+ [AES, Camellia],
+ [CBC, CTR, ECB, OFB, CFB],
+ ):
+ self.register_cipher_adapter(
+ cipher_cls,
+ mode_cls,
+ GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}")
+ )
+ for mode_cls in [CBC, CFB, OFB]:
+ self.register_cipher_adapter(
+ TripleDES,
+ mode_cls,
+ GetCipherByName("des-ede3-{mode.name}")
+ )
+ for mode_cls in [CBC, CFB, OFB, ECB]:
+ self.register_cipher_adapter(
+ Blowfish,
+ mode_cls,
+ GetCipherByName("bf-{mode.name}")
+ )
+ self.register_cipher_adapter(
+ CAST5,
+ ECB,
+ GetCipherByName("cast5-ecb")
+ )
+ self.register_cipher_adapter(
+ ARC4,
+ type(None),
+ GetCipherByName("rc4")
+ )
+
+ def create_symmetric_encryption_ctx(self, cipher, mode):
+ return _CipherContext(self, cipher, mode, _CipherContext._ENCRYPT)
+
+ def create_symmetric_decryption_ctx(self, cipher, mode):
+ return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT)
+
class GetCipherByName(object):
def __init__(self, fmt):
@@ -145,7 +208,7 @@ class _CipherContext(object):
ctx = self._backend.lib.EVP_CIPHER_CTX_new()
ctx = self._backend.ffi.gc(ctx, self._backend.lib.EVP_CIPHER_CTX_free)
- registry = self._backend.ciphers._cipher_registry
+ registry = self._backend._cipher_registry
try:
adapter = registry[type(cipher), type(mode)]
except KeyError:
@@ -204,69 +267,6 @@ class _CipherContext(object):
return self._backend.ffi.buffer(buf)[:outlen[0]]
-class Ciphers(object):
- def __init__(self, backend):
- self._backend = backend
- self._cipher_registry = {}
- self._register_default_ciphers()
-
- def supported(self, cipher, mode):
- try:
- adapter = self._cipher_registry[type(cipher), type(mode)]
- except KeyError:
- return False
- evp_cipher = adapter(self._backend, cipher, mode)
- return self._backend.ffi.NULL != evp_cipher
-
- def register_cipher_adapter(self, cipher_cls, mode_cls, adapter):
- if (cipher_cls, mode_cls) in self._cipher_registry:
- raise ValueError("Duplicate registration for: {0} {1}".format(
- cipher_cls, mode_cls)
- )
- self._cipher_registry[cipher_cls, mode_cls] = adapter
-
- def _register_default_ciphers(self):
- for cipher_cls, mode_cls in itertools.product(
- [AES, Camellia],
- [CBC, CTR, ECB, OFB, CFB],
- ):
- self.register_cipher_adapter(
- cipher_cls,
- mode_cls,
- GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}")
- )
- for mode_cls in [CBC, CFB, OFB]:
- self.register_cipher_adapter(
- TripleDES,
- mode_cls,
- GetCipherByName("des-ede3-{mode.name}")
- )
- for mode_cls in [CBC, CFB, OFB, ECB]:
- self.register_cipher_adapter(
- Blowfish,
- mode_cls,
- GetCipherByName("bf-{mode.name}")
- )
- self.register_cipher_adapter(
- CAST5,
- ECB,
- GetCipherByName("cast5-ecb")
- )
- self.register_cipher_adapter(
- ARC4,
- type(None),
- GetCipherByName("rc4")
- )
-
- def create_encrypt_ctx(self, cipher, mode):
- return _CipherContext(self._backend, cipher, mode,
- _CipherContext._ENCRYPT)
-
- def create_decrypt_ctx(self, cipher, mode):
- return _CipherContext(self._backend, cipher, mode,
- _CipherContext._DECRYPT)
-
-
@interfaces.register(interfaces.HashContext)
class _HashContext(object):
def __init__(self, backend, algorithm, ctx=None):
@@ -307,60 +307,59 @@ class _HashContext(object):
assert res != 0
res = self._backend.lib.EVP_MD_CTX_cleanup(self._ctx)
assert res == 1
- return self._backend.ffi.buffer(buf)[:self.algorithm.digest_size]
+ return self._backend.ffi.buffer(buf)[:]
-class Hashes(object):
- def __init__(self, backend):
+@interfaces.register(interfaces.HashContext)
+class _HMACContext(object):
+ def __init__(self, backend, key, algorithm, ctx=None):
+ self.algorithm = algorithm
self._backend = backend
- def supported(self, algorithm):
- digest = self._backend.lib.EVP_get_digestbyname(
- algorithm.name.encode("ascii")
- )
- return digest != self._backend.ffi.NULL
-
- def create_ctx(self, algorithm):
- return _HashContext(self._backend, algorithm)
-
+ if ctx is None:
+ ctx = self._backend.ffi.new("HMAC_CTX *")
+ self._backend.lib.HMAC_CTX_init(ctx)
+ ctx = self._backend.ffi.gc(ctx, self._backend.lib.HMAC_CTX_cleanup)
+ evp_md = self._backend.lib.EVP_get_digestbyname(
+ algorithm.name.encode('ascii'))
+ assert evp_md != self._backend.ffi.NULL
+ res = self._backend.lib.Cryptography_HMAC_Init_ex(
+ ctx, key, len(key), evp_md, self._backend.ffi.NULL
+ )
+ assert res != 0
-class HMACs(object):
- def __init__(self, backend):
- self._backend = backend
+ self._ctx = ctx
+ self._key = key
- def create_ctx(self, key, hash_cls):
- ctx = self._backend.ffi.new("HMAC_CTX *")
- self._backend.lib.HMAC_CTX_init(ctx)
- ctx = self._backend.ffi.gc(ctx, self._backend.lib.HMAC_CTX_cleanup)
- evp_md = self._backend.lib.EVP_get_digestbyname(
- hash_cls.name.encode('ascii'))
- assert evp_md != self._backend.ffi.NULL
- res = self._backend.lib.Cryptography_HMAC_Init_ex(
- ctx, key, len(key), evp_md, self._backend.ffi.NULL
+ def copy(self):
+ copied_ctx = self._backend.ffi.new("HMAC_CTX *")
+ self._backend.lib.HMAC_CTX_init(copied_ctx)
+ copied_ctx = self._backend.ffi.gc(
+ copied_ctx, self._backend.lib.HMAC_CTX_cleanup
+ )
+ res = self._backend.lib.Cryptography_HMAC_CTX_copy(
+ copied_ctx, self._ctx
)
assert res != 0
- return ctx
-
- def update_ctx(self, ctx, data):
- res = self._backend.lib.Cryptography_HMAC_Update(ctx, data, len(data))
- assert res != 0
+ return _HMACContext(
+ self._backend, self._key, self.algorithm, ctx=copied_ctx
+ )
- def finalize_ctx(self, ctx, digest_size):
- buf = self._backend.ffi.new("unsigned char[]", digest_size)
- buflen = self._backend.ffi.new("unsigned int *", digest_size)
- res = self._backend.lib.Cryptography_HMAC_Final(ctx, buf, buflen)
+ def update(self, data):
+ res = self._backend.lib.Cryptography_HMAC_Update(
+ self._ctx, data, len(data)
+ )
assert res != 0
- self._backend.lib.HMAC_CTX_cleanup(ctx)
- return self._backend.ffi.buffer(buf)[:digest_size]
- def copy_ctx(self, ctx):
- copied_ctx = self._backend.ffi.new("HMAC_CTX *")
- self._backend.lib.HMAC_CTX_init(copied_ctx)
- copied_ctx = self._backend.ffi.gc(copied_ctx,
- self._backend.lib.HMAC_CTX_cleanup)
- res = self._backend.lib.Cryptography_HMAC_CTX_copy(copied_ctx, ctx)
+ def finalize(self):
+ buf = self._backend.ffi.new("unsigned char[]",
+ self.algorithm.digest_size)
+ buflen = self._backend.ffi.new("unsigned int *",
+ self.algorithm.digest_size)
+ res = self._backend.lib.Cryptography_HMAC_Final(self._ctx, buf, buflen)
assert res != 0
- return copied_ctx
+ self._backend.lib.HMAC_CTX_cleanup(self._ctx)
+ return self._backend.ffi.buffer(buf)[:]
backend = Backend()