diff options
author | Alex Gaynor <alex.gaynor@gmail.com> | 2014-01-01 16:18:53 -0800 |
---|---|---|
committer | Alex Gaynor <alex.gaynor@gmail.com> | 2014-01-01 16:18:53 -0800 |
commit | 604afb5d5531da3f93ed83526e81f09ba76b3c60 (patch) | |
tree | 668a11d9426077331c04ec7eb4e91a4494bce73c /cryptography | |
parent | 92217366e331ab5bcdb7e64dccc14048647bc5c7 (diff) | |
download | cryptography-604afb5d5531da3f93ed83526e81f09ba76b3c60.tar.gz cryptography-604afb5d5531da3f93ed83526e81f09ba76b3c60.tar.bz2 cryptography-604afb5d5531da3f93ed83526e81f09ba76b3c60.zip |
Make lib and ffi be private on backend
Because now they are public on a binding instance if you need them!
Diffstat (limited to 'cryptography')
-rw-r--r-- | cryptography/hazmat/backends/openssl/backend.py | 202 |
1 files changed, 107 insertions, 95 deletions
diff --git a/cryptography/hazmat/backends/openssl/backend.py b/cryptography/hazmat/backends/openssl/backend.py index b25d86d0..470aa399 100644 --- a/cryptography/hazmat/backends/openssl/backend.py +++ b/cryptography/hazmat/backends/openssl/backend.py @@ -40,11 +40,11 @@ class Backend(object): def __init__(self): self._binding = Binding() - self.ffi = self._binding.ffi - self.lib = self._binding.lib + self._ffi = self._binding.ffi + self._lib = self._binding.lib - self.lib.OpenSSL_add_all_algorithms() - self.lib.SSL_load_error_strings() + self._lib.OpenSSL_add_all_algorithms() + self._lib.SSL_load_error_strings() self._cipher_registry = {} self._register_default_ciphers() @@ -55,14 +55,14 @@ class Backend(object): Example: OpenSSL 1.0.1e 11 Feb 2013 """ - return self.ffi.string(self.lib.OPENSSL_VERSION_TEXT).decode("ascii") + return self._ffi.string(self._lib.OPENSSL_VERSION_TEXT).decode("ascii") def create_hmac_ctx(self, key, algorithm): return _HMACContext(self, key, algorithm) def hash_supported(self, algorithm): - digest = self.lib.EVP_get_digestbyname(algorithm.name.encode("ascii")) - return digest != self.ffi.NULL + digest = self._lib.EVP_get_digestbyname(algorithm.name.encode("ascii")) + return digest != self._ffi.NULL def hmac_supported(self, algorithm): return self.hash_supported(algorithm) @@ -76,7 +76,7 @@ class Backend(object): except KeyError: return False evp_cipher = adapter(self, cipher, mode) - return self.ffi.NULL != evp_cipher + return self._ffi.NULL != evp_cipher def register_cipher_adapter(self, cipher_cls, mode_cls, adapter): if (cipher_cls, mode_cls) in self._cipher_registry: @@ -130,25 +130,25 @@ class Backend(object): return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT) def _handle_error(self, mode): - code = self.lib.ERR_get_error() + code = self._lib.ERR_get_error() if not code and isinstance(mode, GCM): raise InvalidTag assert code != 0 - lib = self.lib.ERR_GET_LIB(code) - func = self.lib.ERR_GET_FUNC(code) - reason = self.lib.ERR_GET_REASON(code) + lib = self._lib.ERR_GET_LIB(code) + func = self._lib.ERR_GET_FUNC(code) + reason = self._lib.ERR_GET_REASON(code) return self._handle_error_code(lib, func, reason) def _handle_error_code(self, lib, func, reason): - if lib == self.lib.ERR_LIB_EVP: - if func == self.lib.EVP_F_EVP_ENCRYPTFINAL_EX: - if reason == self.lib.EVP_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH: + if lib == self._lib.ERR_LIB_EVP: + if func == self._lib.EVP_F_EVP_ENCRYPTFINAL_EX: + if reason == self._lib.EVP_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH: raise ValueError( "The length of the provided data is not a multiple of " "the block length" ) - elif func == self.lib.EVP_F_EVP_DECRYPTFINAL_EX: - if reason == self.lib.EVP_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH: + elif func == self._lib.EVP_F_EVP_DECRYPTFINAL_EX: + if reason == self._lib.EVP_R_DATA_NOT_MULTIPLE_OF_BLOCK_LENGTH: raise ValueError( "The length of the provided data is not a multiple of " "the block length" @@ -165,7 +165,7 @@ class GetCipherByName(object): def __call__(self, backend, cipher, mode): cipher_name = self._fmt.format(cipher=cipher, mode=mode).lower() - return backend.lib.EVP_get_cipherbyname(cipher_name.encode("ascii")) + return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii")) @utils.register_interface(interfaces.CipherContext) @@ -187,8 +187,10 @@ class _CipherContext(object): else: self._block_size = 1 - ctx = self._backend.lib.EVP_CIPHER_CTX_new() - ctx = self._backend.ffi.gc(ctx, self._backend.lib.EVP_CIPHER_CTX_free) + ctx = self._backend._lib.EVP_CIPHER_CTX_new() + ctx = self._backend._ffi.gc( + ctx, self._backend._lib.EVP_CIPHER_CTX_free + ) registry = self._backend._cipher_registry try: @@ -201,7 +203,7 @@ class _CipherContext(object): ) evp_cipher = adapter(self._backend, cipher, mode) - if evp_cipher == self._backend.ffi.NULL: + if evp_cipher == self._backend._ffi.NULL: raise UnsupportedAlgorithm( "cipher {0} in {1} mode is not supported " "by this backend".format( @@ -213,31 +215,31 @@ class _CipherContext(object): elif isinstance(mode, interfaces.ModeWithNonce): iv_nonce = mode.nonce else: - iv_nonce = self._backend.ffi.NULL + iv_nonce = self._backend._ffi.NULL # begin init with cipher and operation type - res = self._backend.lib.EVP_CipherInit_ex(ctx, evp_cipher, - self._backend.ffi.NULL, - self._backend.ffi.NULL, - self._backend.ffi.NULL, - operation) + res = self._backend._lib.EVP_CipherInit_ex(ctx, evp_cipher, + self._backend._ffi.NULL, + self._backend._ffi.NULL, + self._backend._ffi.NULL, + operation) assert res != 0 # set the key length to handle variable key ciphers - res = self._backend.lib.EVP_CIPHER_CTX_set_key_length( + res = self._backend._lib.EVP_CIPHER_CTX_set_key_length( ctx, len(cipher.key) ) assert res != 0 if isinstance(mode, GCM): - res = self._backend.lib.EVP_CIPHER_CTX_ctrl( - ctx, self._backend.lib.EVP_CTRL_GCM_SET_IVLEN, - len(iv_nonce), self._backend.ffi.NULL + res = self._backend._lib.EVP_CIPHER_CTX_ctrl( + ctx, self._backend._lib.EVP_CTRL_GCM_SET_IVLEN, + len(iv_nonce), self._backend._ffi.NULL ) assert res != 0 if operation == self._DECRYPT: if not mode.tag or len(mode.tag) < 4: raise ValueError("Authentication tag must be provided and " "be 4 bytes or longer when decrypting") - res = self._backend.lib.EVP_CIPHER_CTX_ctrl( - ctx, self._backend.lib.EVP_CTRL_GCM_SET_TAG, + res = self._backend._lib.EVP_CIPHER_CTX_ctrl( + ctx, self._backend._lib.EVP_CTRL_GCM_SET_TAG, len(mode.tag), mode.tag ) assert res != 0 @@ -247,52 +249,57 @@ class _CipherContext(object): "encrypting") # pass key/iv - res = self._backend.lib.EVP_CipherInit_ex(ctx, self._backend.ffi.NULL, - self._backend.ffi.NULL, - cipher.key, - iv_nonce, - operation) + res = self._backend._lib.EVP_CipherInit_ex( + ctx, + self._backend._ffi.NULL, + self._backend._ffi.NULL, + cipher.key, + iv_nonce, + operation + ) assert res != 0 # We purposely disable padding here as it's handled higher up in the # API. - self._backend.lib.EVP_CIPHER_CTX_set_padding(ctx, 0) + self._backend._lib.EVP_CIPHER_CTX_set_padding(ctx, 0) self._ctx = ctx def update(self, data): - buf = self._backend.ffi.new("unsigned char[]", - len(data) + self._block_size - 1) - outlen = self._backend.ffi.new("int *") - res = self._backend.lib.EVP_CipherUpdate(self._ctx, buf, outlen, data, - len(data)) + buf = self._backend._ffi.new("unsigned char[]", + len(data) + self._block_size - 1) + outlen = self._backend._ffi.new("int *") + res = self._backend._lib.EVP_CipherUpdate(self._ctx, buf, outlen, data, + len(data)) assert res != 0 - return self._backend.ffi.buffer(buf)[:outlen[0]] + return self._backend._ffi.buffer(buf)[:outlen[0]] def finalize(self): - buf = self._backend.ffi.new("unsigned char[]", self._block_size) - outlen = self._backend.ffi.new("int *") - res = self._backend.lib.EVP_CipherFinal_ex(self._ctx, buf, outlen) + buf = self._backend._ffi.new("unsigned char[]", self._block_size) + outlen = self._backend._ffi.new("int *") + res = self._backend._lib.EVP_CipherFinal_ex(self._ctx, buf, outlen) if res == 0: self._backend._handle_error(self._mode) if (isinstance(self._mode, GCM) and self._operation == self._ENCRYPT): block_byte_size = self._block_size // 8 - tag_buf = self._backend.ffi.new("unsigned char[]", block_byte_size) - res = self._backend.lib.EVP_CIPHER_CTX_ctrl( - self._ctx, self._backend.lib.EVP_CTRL_GCM_GET_TAG, + tag_buf = self._backend._ffi.new( + "unsigned char[]", block_byte_size + ) + res = self._backend._lib.EVP_CIPHER_CTX_ctrl( + self._ctx, self._backend._lib.EVP_CTRL_GCM_GET_TAG, block_byte_size, tag_buf ) assert res != 0 - self._tag = self._backend.ffi.buffer(tag_buf)[:] + self._tag = self._backend._ffi.buffer(tag_buf)[:] - res = self._backend.lib.EVP_CIPHER_CTX_cleanup(self._ctx) + res = self._backend._lib.EVP_CIPHER_CTX_cleanup(self._ctx) assert res == 1 - return self._backend.ffi.buffer(buf)[:outlen[0]] + return self._backend._ffi.buffer(buf)[:outlen[0]] def authenticate_additional_data(self, data): - outlen = self._backend.ffi.new("int *") - res = self._backend.lib.EVP_CipherUpdate( - self._ctx, self._backend.ffi.NULL, outlen, data, len(data) + outlen = self._backend._ffi.new("int *") + res = self._backend._lib.EVP_CipherUpdate( + self._ctx, self._backend._ffi.NULL, outlen, data, len(data) ) assert res != 0 @@ -309,43 +316,44 @@ class _HashContext(object): self._backend = backend if ctx is None: - ctx = self._backend.lib.EVP_MD_CTX_create() - ctx = self._backend.ffi.gc(ctx, - self._backend.lib.EVP_MD_CTX_destroy) - evp_md = self._backend.lib.EVP_get_digestbyname( + ctx = self._backend._lib.EVP_MD_CTX_create() + ctx = self._backend._ffi.gc(ctx, + self._backend._lib.EVP_MD_CTX_destroy) + evp_md = self._backend._lib.EVP_get_digestbyname( algorithm.name.encode("ascii")) - if evp_md == self._backend.ffi.NULL: + if evp_md == self._backend._ffi.NULL: raise UnsupportedAlgorithm( "{0} is not a supported hash on this backend".format( algorithm.name) ) - res = self._backend.lib.EVP_DigestInit_ex(ctx, evp_md, - self._backend.ffi.NULL) + res = self._backend._lib.EVP_DigestInit_ex(ctx, evp_md, + self._backend._ffi.NULL) assert res != 0 self._ctx = ctx def copy(self): - copied_ctx = self._backend.lib.EVP_MD_CTX_create() - copied_ctx = self._backend.ffi.gc(copied_ctx, - self._backend.lib.EVP_MD_CTX_destroy) - res = self._backend.lib.EVP_MD_CTX_copy_ex(copied_ctx, self._ctx) + copied_ctx = self._backend._lib.EVP_MD_CTX_create() + copied_ctx = self._backend._ffi.gc( + copied_ctx, self._backend._lib.EVP_MD_CTX_destroy + ) + res = self._backend._lib.EVP_MD_CTX_copy_ex(copied_ctx, self._ctx) assert res != 0 return _HashContext(self._backend, self.algorithm, ctx=copied_ctx) def update(self, data): - res = self._backend.lib.EVP_DigestUpdate(self._ctx, data, len(data)) + res = self._backend._lib.EVP_DigestUpdate(self._ctx, data, len(data)) assert res != 0 def finalize(self): - buf = self._backend.ffi.new("unsigned char[]", - self.algorithm.digest_size) - res = self._backend.lib.EVP_DigestFinal_ex(self._ctx, buf, - self._backend.ffi.NULL) + buf = self._backend._ffi.new("unsigned char[]", + self.algorithm.digest_size) + res = self._backend._lib.EVP_DigestFinal_ex(self._ctx, buf, + self._backend._ffi.NULL) assert res != 0 - res = self._backend.lib.EVP_MD_CTX_cleanup(self._ctx) + res = self._backend._lib.EVP_MD_CTX_cleanup(self._ctx) assert res == 1 - return self._backend.ffi.buffer(buf)[:] + return self._backend._ffi.buffer(buf)[:] @utils.register_interface(interfaces.HashContext) @@ -355,18 +363,20 @@ class _HMACContext(object): self._backend = backend if ctx is None: - ctx = self._backend.ffi.new("HMAC_CTX *") - self._backend.lib.HMAC_CTX_init(ctx) - ctx = self._backend.ffi.gc(ctx, self._backend.lib.HMAC_CTX_cleanup) - evp_md = self._backend.lib.EVP_get_digestbyname( + ctx = self._backend._ffi.new("HMAC_CTX *") + self._backend._lib.HMAC_CTX_init(ctx) + ctx = self._backend._ffi.gc( + ctx, self._backend._lib.HMAC_CTX_cleanup + ) + evp_md = self._backend._lib.EVP_get_digestbyname( algorithm.name.encode('ascii')) - if evp_md == self._backend.ffi.NULL: + if evp_md == self._backend._ffi.NULL: raise UnsupportedAlgorithm( "{0} is not a supported hash on this backend".format( algorithm.name) ) - res = self._backend.lib.Cryptography_HMAC_Init_ex( - ctx, key, len(key), evp_md, self._backend.ffi.NULL + res = self._backend._lib.Cryptography_HMAC_Init_ex( + ctx, key, len(key), evp_md, self._backend._ffi.NULL ) assert res != 0 @@ -374,12 +384,12 @@ class _HMACContext(object): self._key = key def copy(self): - copied_ctx = self._backend.ffi.new("HMAC_CTX *") - self._backend.lib.HMAC_CTX_init(copied_ctx) - copied_ctx = self._backend.ffi.gc( - copied_ctx, self._backend.lib.HMAC_CTX_cleanup + copied_ctx = self._backend._ffi.new("HMAC_CTX *") + self._backend._lib.HMAC_CTX_init(copied_ctx) + copied_ctx = self._backend._ffi.gc( + copied_ctx, self._backend._lib.HMAC_CTX_cleanup ) - res = self._backend.lib.Cryptography_HMAC_CTX_copy( + res = self._backend._lib.Cryptography_HMAC_CTX_copy( copied_ctx, self._ctx ) assert res != 0 @@ -388,20 +398,22 @@ class _HMACContext(object): ) def update(self, data): - res = self._backend.lib.Cryptography_HMAC_Update( + res = self._backend._lib.Cryptography_HMAC_Update( self._ctx, data, len(data) ) assert res != 0 def finalize(self): - buf = self._backend.ffi.new("unsigned char[]", - self.algorithm.digest_size) - buflen = self._backend.ffi.new("unsigned int *", - self.algorithm.digest_size) - res = self._backend.lib.Cryptography_HMAC_Final(self._ctx, buf, buflen) + buf = self._backend._ffi.new("unsigned char[]", + self.algorithm.digest_size) + buflen = self._backend._ffi.new("unsigned int *", + self.algorithm.digest_size) + res = self._backend._lib.Cryptography_HMAC_Final( + self._ctx, buf, buflen + ) assert res != 0 - self._backend.lib.HMAC_CTX_cleanup(self._ctx) - return self._backend.ffi.buffer(buf)[:] + self._backend._lib.HMAC_CTX_cleanup(self._ctx) + return self._backend._ffi.buffer(buf)[:] backend = Backend() |