From c8b893f453f7ac47ff6d64ca1467099b0d1d252c Mon Sep 17 00:00:00 2001 From: Christian Heimes Date: Thu, 19 Nov 2015 15:14:47 +0100 Subject: Change password callback to use userdata pointer Instead of a closure the pem_password_cb now uses the void *userdata argument to exchange data with the callback function. It's a necessary step to port all callbacks to new static callbacks. See: #2477 Signed-off-by: Christian Heimes --- .../hazmat/backends/openssl/backend.py | 81 +++++++++++++--------- tests/hazmat/backends/test_openssl.py | 17 ++++- 2 files changed, 63 insertions(+), 35 deletions(-) diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py index 768559cf..e69554f9 100644 --- a/src/cryptography/hazmat/backends/openssl/backend.py +++ b/src/cryptography/hazmat/backends/openssl/backend.py @@ -40,6 +40,7 @@ from cryptography.hazmat.backends.openssl.x509 import ( _Certificate, _CertificateRevocationList, _CertificateSigningRequest, _DISTPOINT_TYPE_FULLNAME, _DISTPOINT_TYPE_RELATIVENAME ) +from cryptography.hazmat.bindings._openssl import ffi as _ffi from cryptography.hazmat.bindings.openssl import binding from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa @@ -625,6 +626,34 @@ _EXTENSION_ENCODE_HANDLERS = { } +class _PasswordUserdata(object): + def __init__(self, password): + self.password = password + self.called = 0 + self.exception = None + + +def _pem_password_cb(buf, size, writing, userdata_handle): + ud = _ffi.from_handle(userdata_handle) + ud.called += 1 + + if not ud.password: + ud.exception = TypeError( + "Password was not given but private key is encrypted." + ) + return -1 + elif len(ud.password) < size: + pw_buf = _ffi.buffer(buf, size) + pw_buf[:len(ud.password)] = ud.password + return len(ud.password) + else: + ud.exception = ValueError( + "Passwords longer than {0} bytes are not supported " + "by this backend.".format(size - 1) + ) + return 0 + + @utils.register_interface(CipherBackend) @utils.register_interface(CMACBackend) @utils.register_interface(DERSerializationBackend) @@ -1090,37 +1119,22 @@ class Backend(object): Useful for decrypting PKCS8 files and so on. - Returns a tuple of (cdata function pointer, callback function). + Returns a tuple of (cdata function pointer, userdata). """ + # Forward compatibility for new static callbacks: + # _pem_password_cb is not a nested function because closures don't + # work well with static callbacks. Static callbacks are registered + # globally. The backend is passed in as userdata argument. - def pem_password_cb(buf, size, writing, userdata): - pem_password_cb.called += 1 + userdata = _PasswordUserdata(password=password) - if not password: - pem_password_cb.exception = TypeError( - "Password was not given but private key is encrypted." - ) - return 0 - elif len(password) < size: - pw_buf = self._ffi.buffer(buf, size) - pw_buf[:len(password)] = password - return len(password) - else: - pem_password_cb.exception = ValueError( - "Passwords longer than {0} bytes are not supported " - "by this backend.".format(size - 1) - ) - return 0 - - pem_password_cb.called = 0 - pem_password_cb.exception = None - - return ( - self._ffi.callback("int (char *, int, int, void *)", - pem_password_cb), - pem_password_cb + pem_password_cb = self._ffi.callback( + "int (char *, int, int, void *)", + _pem_password_cb, ) + return pem_password_cb, userdata + def _mgf1_hash_supported(self, algorithm): if self._lib.Cryptography_HAS_MGF1_MD: return self.hash_supported(algorithm) @@ -1626,31 +1640,32 @@ class Backend(object): def _load_key(self, openssl_read_func, convert_func, data, password): mem_bio = self._bytes_to_bio(data) - password_callback, password_func = self._pem_password_cb(password) + password_cb, userdata = self._pem_password_cb(password) + userdata_handle = self._ffi.new_handle(userdata) evp_pkey = openssl_read_func( mem_bio.bio, self._ffi.NULL, - password_callback, - self._ffi.NULL + password_cb, + userdata_handle, ) if evp_pkey == self._ffi.NULL: - if password_func.exception is not None: + if userdata.exception is not None: errors = self._consume_errors() self.openssl_assert(errors) - raise password_func.exception + raise userdata.exception else: self._handle_key_loading_error() evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) - if password is not None and password_func.called == 0: + if password is not None and userdata.called == 0: raise TypeError( "Password was given but private key is not encrypted.") assert ( - (password is not None and password_func.called == 1) or + (password is not None and userdata.called == 1) or password is None ) diff --git a/tests/hazmat/backends/test_openssl.py b/tests/hazmat/backends/test_openssl.py index 85331595..d048fe68 100644 --- a/tests/hazmat/backends/test_openssl.py +++ b/tests/hazmat/backends/test_openssl.py @@ -503,8 +503,21 @@ class TestOpenSSLSignX509Certificate(object): class TestOpenSSLSerialisationWithOpenSSL(object): def test_pem_password_cb_buffer_too_small(self): - ffi_cb, cb = backend._pem_password_cb(b"aa") - assert cb(None, 1, False, None) == 0 + ffi_cb, userdata = backend._pem_password_cb(b"aa") + handle = backend._ffi.new_handle(userdata) + buf = backend._ffi.new('char *') + assert ffi_cb(buf, 1, False, handle) == 0 + assert userdata.called == 1 + assert isinstance(userdata.exception, ValueError) + + def test_pem_password_cb(self): + password = b'abcdefg' + ffi_cb, userdata = backend._pem_password_cb(password) + handle = backend._ffi.new_handle(userdata) + buf = backend._ffi.new('char *') + assert ffi_cb(buf, len(password) + 1, False, handle) == len(password) + assert userdata.called == 1 + assert backend._ffi.string(buf, len(password)) == password def test_unsupported_evp_pkey_type(self): key = pretend.stub(type="unsupported") -- cgit v1.2.3