aboutsummaryrefslogtreecommitdiffstats
path: root/cryptography/hazmat/bindings/openssl/backend.py
diff options
context:
space:
mode:
Diffstat (limited to 'cryptography/hazmat/bindings/openssl/backend.py')
-rw-r--r--cryptography/hazmat/bindings/openssl/backend.py262
1 files changed, 262 insertions, 0 deletions
diff --git a/cryptography/hazmat/bindings/openssl/backend.py b/cryptography/hazmat/bindings/openssl/backend.py
new file mode 100644
index 00000000..494430ba
--- /dev/null
+++ b/cryptography/hazmat/bindings/openssl/backend.py
@@ -0,0 +1,262 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import, division, print_function
+
+import itertools
+import sys
+
+import cffi
+
+from cryptography.hazmat.primitives import interfaces
+from cryptography.hazmat.primitives.block.ciphers import (
+ AES, Camellia, TripleDES,
+)
+from cryptography.hazmat.primitives.block.modes import CBC, CTR, ECB, OFB, CFB
+
+
+class Backend(object):
+ """
+ OpenSSL API wrapper.
+ """
+ _modules = [
+ "asn1",
+ "bignum",
+ "bio",
+ "conf",
+ "crypto",
+ "dh",
+ "dsa",
+ "engine",
+ "err",
+ "evp",
+ "hmac",
+ "nid",
+ "opensslv",
+ "pem",
+ "pkcs7",
+ "pkcs12",
+ "rand",
+ "rsa",
+ "ssl",
+ "x509",
+ "x509name",
+ "x509v3",
+ ]
+
+ def __init__(self):
+ self.ffi = cffi.FFI()
+ includes = []
+ functions = []
+ macros = []
+ for name in self._modules:
+ module_name = "cryptography.hazmat.bindings.openssl." + name
+ __import__(module_name)
+ module = sys.modules[module_name]
+
+ self.ffi.cdef(module.TYPES)
+
+ macros.append(module.MACROS)
+ functions.append(module.FUNCTIONS)
+ includes.append(module.INCLUDES)
+
+ # loop over the functions & macros after declaring all the types
+ # so we can set interdependent types in different files and still
+ # have them all defined before we parse the funcs & macros
+ for func in functions:
+ self.ffi.cdef(func)
+ for macro in macros:
+ self.ffi.cdef(macro)
+
+ # We include functions here so that if we got any of their definitions
+ # wrong, the underlying C compiler will explode. In C you are allowed
+ # to re-declare a function if it has the same signature. That is:
+ # int foo(int);
+ # int foo(int);
+ # is legal, but the following will fail to compile:
+ # int foo(int);
+ # int foo(short);
+ self.lib = self.ffi.verify(
+ source="\n".join(includes + functions),
+ libraries=["crypto", "ssl"],
+ )
+
+ self.lib.OpenSSL_add_all_algorithms()
+ self.lib.SSL_load_error_strings()
+
+ self.ciphers = Ciphers(self)
+ self.hashes = Hashes(self)
+
+ def openssl_version_text(self):
+ """
+ Friendly string name of linked OpenSSL.
+
+ Example: OpenSSL 1.0.1e 11 Feb 2013
+ """
+ return self.ffi.string(self.lib.OPENSSL_VERSION_TEXT).decode("ascii")
+
+
+class GetCipherByName(object):
+ def __init__(self, fmt):
+ self._fmt = fmt
+
+ def __call__(self, backend, cipher, mode):
+ cipher_name = self._fmt.format(cipher=cipher, mode=mode).lower()
+ return backend.lib.EVP_get_cipherbyname(cipher_name.encode("ascii"))
+
+
+@interfaces.register(interfaces.CipherContext)
+class _CipherContext(object):
+ _ENCRYPT = 1
+ _DECRYPT = 0
+
+ def __init__(self, backend, cipher, mode, operation):
+ self._backend = backend
+
+ ctx = self._backend.lib.EVP_CIPHER_CTX_new()
+ ctx = self._backend.ffi.gc(ctx, self._backend.lib.EVP_CIPHER_CTX_free)
+
+ registry = self._backend.ciphers._cipher_registry
+ evp_cipher = registry[type(cipher), type(mode)](
+ self._backend, cipher, mode
+ )
+ assert evp_cipher != self._backend.ffi.NULL
+ if isinstance(mode, interfaces.ModeWithInitializationVector):
+ iv_nonce = mode.initialization_vector
+ elif isinstance(mode, interfaces.ModeWithNonce):
+ iv_nonce = mode.nonce
+ else:
+ iv_nonce = self._backend.ffi.NULL
+ res = self._backend.lib.EVP_CipherInit_ex(ctx, evp_cipher,
+ self._backend.ffi.NULL,
+ cipher.key, iv_nonce,
+ operation)
+ assert res != 0
+ # We purposely disable padding here as it's handled higher up in the
+ # API.
+ self._backend.lib.EVP_CIPHER_CTX_set_padding(ctx, 0)
+ self._ctx = ctx
+
+ def update(self, data):
+ block_size = self._backend.lib.EVP_CIPHER_CTX_block_size(self._ctx)
+ buf = self._backend.ffi.new("unsigned char[]",
+ len(data) + block_size - 1)
+ outlen = self._backend.ffi.new("int *")
+ res = self._backend.lib.EVP_CipherUpdate(self._ctx, buf, outlen, data,
+ len(data))
+ assert res != 0
+ return self._backend.ffi.buffer(buf)[:outlen[0]]
+
+ def finalize(self):
+ block_size = self._backend.lib.EVP_CIPHER_CTX_block_size(self._ctx)
+ buf = self._backend.ffi.new("unsigned char[]", block_size)
+ outlen = self._backend.ffi.new("int *")
+ res = self._backend.lib.EVP_CipherFinal_ex(self._ctx, buf, outlen)
+ assert res != 0
+ res = self._backend.lib.EVP_CIPHER_CTX_cleanup(self._ctx)
+ assert res == 1
+ return self._backend.ffi.buffer(buf)[:outlen[0]]
+
+
+class Ciphers(object):
+ def __init__(self, backend):
+ super(Ciphers, self).__init__()
+ self._backend = backend
+ self._cipher_registry = {}
+ self._register_default_ciphers()
+
+ def supported(self, cipher, mode):
+ try:
+ adapter = self._cipher_registry[type(cipher), type(mode)]
+ except KeyError:
+ return False
+ evp_cipher = adapter(self._backend, cipher, mode)
+ return self._backend.ffi.NULL != evp_cipher
+
+ def register_cipher_adapter(self, cipher_cls, mode_cls, adapter):
+ if (cipher_cls, mode_cls) in self._cipher_registry:
+ raise ValueError("Duplicate registration for: {0} {1}".format(
+ cipher_cls, mode_cls)
+ )
+ self._cipher_registry[cipher_cls, mode_cls] = adapter
+
+ def _register_default_ciphers(self):
+ for cipher_cls, mode_cls in itertools.product(
+ [AES, Camellia],
+ [CBC, CTR, ECB, OFB, CFB],
+ ):
+ self.register_cipher_adapter(
+ cipher_cls,
+ mode_cls,
+ GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}")
+ )
+ for mode_cls in [CBC, CFB, OFB]:
+ self.register_cipher_adapter(
+ TripleDES,
+ mode_cls,
+ GetCipherByName("des-ede3-{mode.name}")
+ )
+
+ def create_encrypt_ctx(self, cipher, mode):
+ return _CipherContext(self._backend, cipher, mode,
+ _CipherContext._ENCRYPT)
+
+ def create_decrypt_ctx(self, cipher, mode):
+ return _CipherContext(self._backend, cipher, mode,
+ _CipherContext._DECRYPT)
+
+
+class Hashes(object):
+ def __init__(self, backend):
+ super(Hashes, self).__init__()
+ self._backend = backend
+
+ def supported(self, hash_cls):
+ return (self._backend.ffi.NULL !=
+ self._backend.lib.EVP_get_digestbyname(
+ hash_cls.name.encode("ascii")))
+
+ def create_ctx(self, hashobject):
+ ctx = self._backend.lib.EVP_MD_CTX_create()
+ ctx = self._backend.ffi.gc(ctx, self._backend.lib.EVP_MD_CTX_destroy)
+ evp_md = self._backend.lib.EVP_get_digestbyname(
+ hashobject.name.encode("ascii"))
+ assert evp_md != self._backend.ffi.NULL
+ res = self._backend.lib.EVP_DigestInit_ex(ctx, evp_md,
+ self._backend.ffi.NULL)
+ assert res != 0
+ return ctx
+
+ def update_ctx(self, ctx, data):
+ res = self._backend.lib.EVP_DigestUpdate(ctx, data, len(data))
+ assert res != 0
+
+ def finalize_ctx(self, ctx, digest_size):
+ buf = self._backend.ffi.new("unsigned char[]", digest_size)
+ res = self._backend.lib.EVP_DigestFinal_ex(ctx, buf,
+ self._backend.ffi.NULL)
+ assert res != 0
+ res = self._backend.lib.EVP_MD_CTX_cleanup(ctx)
+ assert res == 1
+ return self._backend.ffi.buffer(buf)[:digest_size]
+
+ def copy_ctx(self, ctx):
+ copied_ctx = self._backend.lib.EVP_MD_CTX_create()
+ copied_ctx = self._backend.ffi.gc(copied_ctx,
+ self._backend.lib.EVP_MD_CTX_destroy)
+ res = self._backend.lib.EVP_MD_CTX_copy_ex(copied_ctx, ctx)
+ assert res != 0
+ return copied_ctx
+
+
+backend = Backend()