aboutsummaryrefslogtreecommitdiffstats
path: root/cryptography/hazmat/bindings/openssl/backend.py
diff options
context:
space:
mode:
Diffstat (limited to 'cryptography/hazmat/bindings/openssl/backend.py')
-rw-r--r--cryptography/hazmat/bindings/openssl/backend.py274
1 files changed, 145 insertions, 129 deletions
diff --git a/cryptography/hazmat/bindings/openssl/backend.py b/cryptography/hazmat/bindings/openssl/backend.py
index 0c3d22d5..92cd3868 100644
--- a/cryptography/hazmat/bindings/openssl/backend.py
+++ b/cryptography/hazmat/bindings/openssl/backend.py
@@ -18,10 +18,11 @@ import sys
import cffi
+from cryptography import utils
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.primitives import interfaces
from cryptography.hazmat.primitives.ciphers.algorithms import (
- AES, Blowfish, Camellia, CAST5, TripleDES,
+ AES, Blowfish, Camellia, CAST5, TripleDES, ARC4,
)
from cryptography.hazmat.primitives.ciphers.modes import (
CBC, CTR, ECB, OFB, CFB
@@ -63,9 +64,8 @@ class Backend(object):
def __init__(self):
self._ensure_ffi_initialized()
- self.ciphers = Ciphers(self)
- self.hashes = Hashes(self)
- self.hmacs = HMACs(self)
+ self._cipher_registry = {}
+ self._register_default_ciphers()
@classmethod
def _ensure_ffi_initialized(cls):
@@ -123,6 +123,70 @@ class Backend(object):
"""
return self.ffi.string(self.lib.OPENSSL_VERSION_TEXT).decode("ascii")
+ def create_hmac_ctx(self, key, algorithm):
+ return _HMACContext(self, key, algorithm)
+
+ def hash_supported(self, algorithm):
+ digest = self.lib.EVP_get_digestbyname(algorithm.name.encode("ascii"))
+ return digest != self.ffi.NULL
+
+ def create_hash_ctx(self, algorithm):
+ return _HashContext(self, algorithm)
+
+ def cipher_supported(self, cipher, mode):
+ try:
+ adapter = self._cipher_registry[type(cipher), type(mode)]
+ except KeyError:
+ return False
+ evp_cipher = adapter(self, cipher, mode)
+ return self.ffi.NULL != evp_cipher
+
+ def register_cipher_adapter(self, cipher_cls, mode_cls, adapter):
+ if (cipher_cls, mode_cls) in self._cipher_registry:
+ raise ValueError("Duplicate registration for: {0} {1}".format(
+ cipher_cls, mode_cls)
+ )
+ self._cipher_registry[cipher_cls, mode_cls] = adapter
+
+ def _register_default_ciphers(self):
+ for cipher_cls, mode_cls in itertools.product(
+ [AES, Camellia],
+ [CBC, CTR, ECB, OFB, CFB],
+ ):
+ self.register_cipher_adapter(
+ cipher_cls,
+ mode_cls,
+ GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}")
+ )
+ for mode_cls in [CBC, CFB, OFB]:
+ self.register_cipher_adapter(
+ TripleDES,
+ mode_cls,
+ GetCipherByName("des-ede3-{mode.name}")
+ )
+ for mode_cls in [CBC, CFB, OFB, ECB]:
+ self.register_cipher_adapter(
+ Blowfish,
+ mode_cls,
+ GetCipherByName("bf-{mode.name}")
+ )
+ self.register_cipher_adapter(
+ CAST5,
+ ECB,
+ GetCipherByName("cast5-ecb")
+ )
+ self.register_cipher_adapter(
+ ARC4,
+ type(None),
+ GetCipherByName("rc4")
+ )
+
+ def create_symmetric_encryption_ctx(self, cipher, mode):
+ return _CipherContext(self, cipher, mode, _CipherContext._ENCRYPT)
+
+ def create_symmetric_decryption_ctx(self, cipher, mode):
+ return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT)
+
class GetCipherByName(object):
def __init__(self, fmt):
@@ -133,18 +197,19 @@ class GetCipherByName(object):
return backend.lib.EVP_get_cipherbyname(cipher_name.encode("ascii"))
-@interfaces.register(interfaces.CipherContext)
+@utils.register_interface(interfaces.CipherContext)
class _CipherContext(object):
_ENCRYPT = 1
_DECRYPT = 0
def __init__(self, backend, cipher, mode, operation):
self._backend = backend
+ self._cipher = cipher
ctx = self._backend.lib.EVP_CIPHER_CTX_new()
ctx = self._backend.ffi.gc(ctx, self._backend.lib.EVP_CIPHER_CTX_free)
- registry = self._backend.ciphers._cipher_registry
+ registry = self._backend._cipher_registry
try:
adapter = registry[type(cipher), type(mode)]
except KeyError:
@@ -185,9 +250,8 @@ class _CipherContext(object):
self._ctx = ctx
def update(self, data):
- block_size = self._backend.lib.EVP_CIPHER_CTX_block_size(self._ctx)
buf = self._backend.ffi.new("unsigned char[]",
- len(data) + block_size - 1)
+ len(data) + self._cipher.block_size - 1)
outlen = self._backend.ffi.new("int *")
res = self._backend.lib.EVP_CipherUpdate(self._ctx, buf, outlen, data,
len(data))
@@ -195,8 +259,7 @@ class _CipherContext(object):
return self._backend.ffi.buffer(buf)[:outlen[0]]
def finalize(self):
- block_size = self._backend.lib.EVP_CIPHER_CTX_block_size(self._ctx)
- buf = self._backend.ffi.new("unsigned char[]", block_size)
+ buf = self._backend.ffi.new("unsigned char[]", self._cipher.block_size)
outlen = self._backend.ffi.new("int *")
res = self._backend.lib.EVP_CipherFinal_ex(self._ctx, buf, outlen)
assert res != 0
@@ -205,146 +268,99 @@ class _CipherContext(object):
return self._backend.ffi.buffer(buf)[:outlen[0]]
-class Ciphers(object):
- def __init__(self, backend):
- super(Ciphers, self).__init__()
- self._backend = backend
- self._cipher_registry = {}
- self._register_default_ciphers()
+@utils.register_interface(interfaces.HashContext)
+class _HashContext(object):
+ def __init__(self, backend, algorithm, ctx=None):
+ self.algorithm = algorithm
- def supported(self, cipher, mode):
- try:
- adapter = self._cipher_registry[type(cipher), type(mode)]
- except KeyError:
- return False
- evp_cipher = adapter(self._backend, cipher, mode)
- return self._backend.ffi.NULL != evp_cipher
-
- def register_cipher_adapter(self, cipher_cls, mode_cls, adapter):
- if (cipher_cls, mode_cls) in self._cipher_registry:
- raise ValueError("Duplicate registration for: {0} {1}".format(
- cipher_cls, mode_cls)
- )
- self._cipher_registry[cipher_cls, mode_cls] = adapter
-
- def _register_default_ciphers(self):
- for cipher_cls, mode_cls in itertools.product(
- [AES, Camellia],
- [CBC, CTR, ECB, OFB, CFB],
- ):
- self.register_cipher_adapter(
- cipher_cls,
- mode_cls,
- GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}")
- )
- for mode_cls in [CBC, CFB, OFB]:
- self.register_cipher_adapter(
- TripleDES,
- mode_cls,
- GetCipherByName("des-ede3-{mode.name}")
- )
- for mode_cls in [CBC, CFB, OFB, ECB]:
- self.register_cipher_adapter(
- Blowfish,
- mode_cls,
- GetCipherByName("bf-{mode.name}")
- )
- self.register_cipher_adapter(
- CAST5,
- ECB,
- GetCipherByName("cast5-ecb")
- )
-
- def create_encrypt_ctx(self, cipher, mode):
- return _CipherContext(self._backend, cipher, mode,
- _CipherContext._ENCRYPT)
-
- def create_decrypt_ctx(self, cipher, mode):
- return _CipherContext(self._backend, cipher, mode,
- _CipherContext._DECRYPT)
+ self._backend = backend
+ if ctx is None:
+ ctx = self._backend.lib.EVP_MD_CTX_create()
+ ctx = self._backend.ffi.gc(ctx,
+ self._backend.lib.EVP_MD_CTX_destroy)
+ evp_md = self._backend.lib.EVP_get_digestbyname(
+ algorithm.name.encode("ascii"))
+ assert evp_md != self._backend.ffi.NULL
+ res = self._backend.lib.EVP_DigestInit_ex(ctx, evp_md,
+ self._backend.ffi.NULL)
+ assert res != 0
-class Hashes(object):
- def __init__(self, backend):
- super(Hashes, self).__init__()
- self._backend = backend
+ self._ctx = ctx
- def supported(self, hash_cls):
- return (self._backend.ffi.NULL !=
- self._backend.lib.EVP_get_digestbyname(
- hash_cls.name.encode("ascii")))
-
- def create_ctx(self, hashobject):
- ctx = self._backend.lib.EVP_MD_CTX_create()
- ctx = self._backend.ffi.gc(ctx, self._backend.lib.EVP_MD_CTX_destroy)
- evp_md = self._backend.lib.EVP_get_digestbyname(
- hashobject.name.encode("ascii"))
- assert evp_md != self._backend.ffi.NULL
- res = self._backend.lib.EVP_DigestInit_ex(ctx, evp_md,
- self._backend.ffi.NULL)
+ def copy(self):
+ copied_ctx = self._backend.lib.EVP_MD_CTX_create()
+ copied_ctx = self._backend.ffi.gc(copied_ctx,
+ self._backend.lib.EVP_MD_CTX_destroy)
+ res = self._backend.lib.EVP_MD_CTX_copy_ex(copied_ctx, self._ctx)
assert res != 0
- return ctx
+ return _HashContext(self._backend, self.algorithm, ctx=copied_ctx)
- def update_ctx(self, ctx, data):
- res = self._backend.lib.EVP_DigestUpdate(ctx, data, len(data))
+ def update(self, data):
+ res = self._backend.lib.EVP_DigestUpdate(self._ctx, data, len(data))
assert res != 0
- def finalize_ctx(self, ctx, digest_size):
- buf = self._backend.ffi.new("unsigned char[]", digest_size)
- res = self._backend.lib.EVP_DigestFinal_ex(ctx, buf,
+ def finalize(self):
+ buf = self._backend.ffi.new("unsigned char[]",
+ self.algorithm.digest_size)
+ res = self._backend.lib.EVP_DigestFinal_ex(self._ctx, buf,
self._backend.ffi.NULL)
assert res != 0
- res = self._backend.lib.EVP_MD_CTX_cleanup(ctx)
+ res = self._backend.lib.EVP_MD_CTX_cleanup(self._ctx)
assert res == 1
- return self._backend.ffi.buffer(buf)[:digest_size]
-
- def copy_ctx(self, ctx):
- copied_ctx = self._backend.lib.EVP_MD_CTX_create()
- copied_ctx = self._backend.ffi.gc(copied_ctx,
- self._backend.lib.EVP_MD_CTX_destroy)
- res = self._backend.lib.EVP_MD_CTX_copy_ex(copied_ctx, ctx)
- assert res != 0
- return copied_ctx
+ return self._backend.ffi.buffer(buf)[:]
-class HMACs(object):
- def __init__(self, backend):
- super(HMACs, self).__init__()
+@utils.register_interface(interfaces.HashContext)
+class _HMACContext(object):
+ def __init__(self, backend, key, algorithm, ctx=None):
+ self.algorithm = algorithm
self._backend = backend
- def create_ctx(self, key, hash_cls):
- ctx = self._backend.ffi.new("HMAC_CTX *")
- self._backend.lib.HMAC_CTX_init(ctx)
- ctx = self._backend.ffi.gc(ctx, self._backend.lib.HMAC_CTX_cleanup)
- evp_md = self._backend.lib.EVP_get_digestbyname(
- hash_cls.name.encode('ascii'))
- assert evp_md != self._backend.ffi.NULL
- res = self._backend.lib.Cryptography_HMAC_Init_ex(
- ctx, key, len(key), evp_md, self._backend.ffi.NULL
- )
- assert res != 0
- return ctx
+ if ctx is None:
+ ctx = self._backend.ffi.new("HMAC_CTX *")
+ self._backend.lib.HMAC_CTX_init(ctx)
+ ctx = self._backend.ffi.gc(ctx, self._backend.lib.HMAC_CTX_cleanup)
+ evp_md = self._backend.lib.EVP_get_digestbyname(
+ algorithm.name.encode('ascii'))
+ assert evp_md != self._backend.ffi.NULL
+ res = self._backend.lib.Cryptography_HMAC_Init_ex(
+ ctx, key, len(key), evp_md, self._backend.ffi.NULL
+ )
+ assert res != 0
+
+ self._ctx = ctx
+ self._key = key
- def update_ctx(self, ctx, data):
- res = self._backend.lib.Cryptography_HMAC_Update(ctx, data, len(data))
+ def copy(self):
+ copied_ctx = self._backend.ffi.new("HMAC_CTX *")
+ self._backend.lib.HMAC_CTX_init(copied_ctx)
+ copied_ctx = self._backend.ffi.gc(
+ copied_ctx, self._backend.lib.HMAC_CTX_cleanup
+ )
+ res = self._backend.lib.Cryptography_HMAC_CTX_copy(
+ copied_ctx, self._ctx
+ )
assert res != 0
+ return _HMACContext(
+ self._backend, self._key, self.algorithm, ctx=copied_ctx
+ )
- def finalize_ctx(self, ctx, digest_size):
- buf = self._backend.ffi.new("unsigned char[]", digest_size)
- buflen = self._backend.ffi.new("unsigned int *", digest_size)
- res = self._backend.lib.Cryptography_HMAC_Final(ctx, buf, buflen)
+ def update(self, data):
+ res = self._backend.lib.Cryptography_HMAC_Update(
+ self._ctx, data, len(data)
+ )
assert res != 0
- self._backend.lib.HMAC_CTX_cleanup(ctx)
- return self._backend.ffi.buffer(buf)[:digest_size]
- def copy_ctx(self, ctx):
- copied_ctx = self._backend.ffi.new("HMAC_CTX *")
- self._backend.lib.HMAC_CTX_init(copied_ctx)
- copied_ctx = self._backend.ffi.gc(copied_ctx,
- self._backend.lib.HMAC_CTX_cleanup)
- res = self._backend.lib.Cryptography_HMAC_CTX_copy(copied_ctx, ctx)
+ def finalize(self):
+ buf = self._backend.ffi.new("unsigned char[]",
+ self.algorithm.digest_size)
+ buflen = self._backend.ffi.new("unsigned int *",
+ self.algorithm.digest_size)
+ res = self._backend.lib.Cryptography_HMAC_Final(self._ctx, buf, buflen)
assert res != 0
- return copied_ctx
+ self._backend.lib.HMAC_CTX_cleanup(self._ctx)
+ return self._backend.ffi.buffer(buf)[:]
backend = Backend()