diff options
-rw-r--r-- | cryptography/hazmat/backends/commoncrypto/backend.py | 187 | ||||
-rw-r--r-- | docs/changelog.rst | 2 | ||||
-rw-r--r-- | tests/hazmat/backends/test_commoncrypto.py | 46 |
3 files changed, 233 insertions, 2 deletions
diff --git a/cryptography/hazmat/backends/commoncrypto/backend.py b/cryptography/hazmat/backends/commoncrypto/backend.py index 603edc40..8b9fe084 100644 --- a/cryptography/hazmat/backends/commoncrypto/backend.py +++ b/cryptography/hazmat/backends/commoncrypto/backend.py @@ -18,10 +18,16 @@ from collections import namedtuple from cryptography import utils from cryptography.exceptions import UnsupportedAlgorithm from cryptography.hazmat.backends.interfaces import ( - HashBackend, HMACBackend, + HashBackend, HMACBackend, CipherBackend ) from cryptography.hazmat.bindings.commoncrypto.binding import Binding from cryptography.hazmat.primitives import interfaces +from cryptography.hazmat.primitives.ciphers.algorithms import ( + AES, Blowfish, TripleDES, ARC4 +) +from cryptography.hazmat.primitives.ciphers.modes import ( + CBC, CTR, ECB, OFB, CFB +) HashMethods = namedtuple( @@ -29,6 +35,7 @@ HashMethods = namedtuple( ) +@utils.register_interface(CipherBackend) @utils.register_interface(HashBackend) @utils.register_interface(HMACBackend) class Backend(object): @@ -42,6 +49,8 @@ class Backend(object): self._ffi = self._binding.ffi self._lib = self._binding.lib + self._cipher_registry = {} + self._register_default_ciphers() self._hash_mapping = { "md5": HashMethods( "CC_MD5_CTX *", self._lib.CC_MD5_Init, @@ -100,6 +109,182 @@ class Backend(object): def create_hmac_ctx(self, key, algorithm): return _HMACContext(self, key, algorithm) + def cipher_supported(self, cipher, mode): + try: + self._cipher_registry[type(cipher), type(mode)] + except KeyError: + return False + return True + + def create_symmetric_encryption_ctx(self, cipher, mode): + return _CipherContext(self, cipher, mode, _CipherContext._ENCRYPT) + + def create_symmetric_decryption_ctx(self, cipher, mode): + return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT) + + def _register_cipher_adapter(self, cipher_cls, cipher_const, mode_cls, + mode_const): + if (cipher_cls, mode_cls) in self._cipher_registry: + raise ValueError("Duplicate registration for: {0} {1}".format( + cipher_cls, mode_cls) + ) + self._cipher_registry[cipher_cls, mode_cls] = (cipher_const, + mode_const) + + def _register_default_ciphers(self): + for mode_cls, mode_const in [ + (CBC, self._lib.kCCModeCBC), (ECB, self._lib.kCCModeECB), + (CFB, self._lib.kCCModeCFB), (OFB, self._lib.kCCModeOFB), + (CTR, self._lib.kCCModeCTR) + ]: + self._register_cipher_adapter( + AES, + self._lib.kCCAlgorithmAES128, + mode_cls, + mode_const + ) + for mode_cls, mode_const in [ + (CBC, self._lib.kCCModeCBC), (CFB, self._lib.kCCModeCFB), + (OFB, self._lib.kCCModeOFB), + ]: + self._register_cipher_adapter( + TripleDES, + self._lib.kCCAlgorithm3DES, + mode_cls, + mode_const + ) + for mode_cls, mode_const in [ + (CBC, self._lib.kCCModeCBC), (ECB, self._lib.kCCModeECB), + (CFB, self._lib.kCCModeCFB), (OFB, self._lib.kCCModeOFB), + ]: + self._register_cipher_adapter( + Blowfish, + self._lib.kCCAlgorithmBlowfish, + mode_cls, + mode_const + ) + self._register_cipher_adapter( + ARC4, + self._lib.kCCAlgorithmRC4, + type(None), + self._lib.kCCModeRC4 + ) + + def _check_response(self, response): + if response == self._lib.kCCSuccess: + return + elif response == self._lib.kCCAlignmentError: + # This error is not currently triggered due to a bug filed as + # rdar://15589470 + raise ValueError( + "The length of the provided data is not a multiple of " + "the block length" + ) + else: + raise SystemError( + "The backend returned an error. Code: {0}".format(response) + ) + + +def _release_cipher_ctx(ctx): + """ + Called by the garbage collector and used to safely dereference and + release the context. + """ + if ctx[0] != backend._ffi.NULL: + res = backend._lib.CCCryptorRelease(ctx[0]) + backend._check_response(res) + ctx[0] = backend._ffi.NULL + + +@utils.register_interface(interfaces.CipherContext) +class _CipherContext(object): + _ENCRYPT = 0 # kCCEncrypt + _DECRYPT = 1 # kCCDecrypt + + def __init__(self, backend, cipher, mode, operation): + self._backend = backend + self._cipher = cipher + self._mode = mode + self._operation = operation + # There is a bug in CommonCrypto where block ciphers do not raise + # kCCAlignmentError when finalizing if you supply non-block aligned + # data. To work around this we need to keep track of the block + # alignment ourselves, but only for alg+mode combos that require + # block alignment. OFB, CFB, and CTR make a block cipher algorithm + # into a stream cipher so we don't need to track them (and thus their + # block size is effectively 1 byte just like OpenSSL/CommonCrypto + # treat RC4 and other stream cipher block sizes). + # This bug has been filed as rdar://15589470 + self._bytes_processed = 0 + if (isinstance(cipher, interfaces.BlockCipherAlgorithm) and not + isinstance(mode, (OFB, CFB, CTR))): + self._byte_block_size = cipher.block_size // 8 + else: + self._byte_block_size = 1 + + registry = self._backend._cipher_registry + try: + cipher_enum, mode_enum = registry[type(cipher), type(mode)] + except KeyError: + raise UnsupportedAlgorithm( + "cipher {0} in {1} mode is not supported " + "by this backend".format( + cipher.name, mode.name if mode else mode) + ) + + ctx = self._backend._ffi.new("CCCryptorRef *") + ctx = self._backend._ffi.gc(ctx, _release_cipher_ctx) + + if isinstance(mode, interfaces.ModeWithInitializationVector): + iv_nonce = mode.initialization_vector + elif isinstance(mode, interfaces.ModeWithNonce): + iv_nonce = mode.nonce + else: + iv_nonce = self._backend._ffi.NULL + + if isinstance(mode, CTR): + mode_option = self._backend._lib.kCCModeOptionCTR_BE + else: + mode_option = 0 + + res = self._backend._lib.CCCryptorCreateWithMode( + operation, + mode_enum, cipher_enum, + self._backend._lib.ccNoPadding, iv_nonce, + cipher.key, len(cipher.key), + self._backend._ffi.NULL, 0, 0, mode_option, ctx) + self._backend._check_response(res) + + self._ctx = ctx + + def update(self, data): + # Count bytes processed to handle block alignment. + self._bytes_processed += len(data) + buf = self._backend._ffi.new( + "unsigned char[]", len(data) + self._byte_block_size - 1) + outlen = self._backend._ffi.new("size_t *") + res = self._backend._lib.CCCryptorUpdate( + self._ctx[0], data, len(data), buf, + len(data) + self._byte_block_size - 1, outlen) + self._backend._check_response(res) + return self._backend._ffi.buffer(buf)[:outlen[0]] + + def finalize(self): + # Raise error if block alignment is wrong. + if self._bytes_processed % self._byte_block_size: + raise ValueError( + "The length of the provided data is not a multiple of " + "the block length" + ) + buf = self._backend._ffi.new("unsigned char[]", self._byte_block_size) + outlen = self._backend._ffi.new("size_t *") + res = self._backend._lib.CCCryptorFinal( + self._ctx[0], buf, len(buf), outlen) + self._backend._check_response(res) + _release_cipher_ctx(self._ctx) + return self._backend._ffi.buffer(buf)[:outlen[0]] + @utils.register_interface(interfaces.HashContext) class _HashContext(object): diff --git a/docs/changelog.rst b/docs/changelog.rst index 819b426a..c3859c25 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -6,7 +6,7 @@ Changelog **In development** -* Added :doc:`/hazmat/backends/commoncrypto` with hash and HMAC support. +* Added :doc:`/hazmat/backends/commoncrypto`. * Added initial :doc:`/hazmat/bindings/commoncrypto`. 0.1 - 2014-01-08 diff --git a/tests/hazmat/backends/test_commoncrypto.py b/tests/hazmat/backends/test_commoncrypto.py new file mode 100644 index 00000000..68ab6bc1 --- /dev/null +++ b/tests/hazmat/backends/test_commoncrypto.py @@ -0,0 +1,46 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from cryptography.hazmat.bindings.commoncrypto.binding import Binding +from cryptography.hazmat.primitives.ciphers.algorithms import AES +from cryptography.hazmat.primitives.ciphers.modes import CBC + + +@pytest.mark.skipif(not Binding.is_available(), + reason="CommonCrypto not available") +class TestCommonCrypto(object): + def test_supports_cipher(self): + from cryptography.hazmat.backends.commoncrypto.backend import backend + assert backend.cipher_supported(None, None) is False + + def test_register_duplicate_cipher_adapter(self): + from cryptography.hazmat.backends.commoncrypto.backend import backend + with pytest.raises(ValueError): + backend._register_cipher_adapter( + AES, backend._lib.kCCAlgorithmAES128, + CBC, backend._lib.kCCModeCBC + ) + + def test_handle_response(self): + from cryptography.hazmat.backends.commoncrypto.backend import backend + + with pytest.raises(ValueError): + backend._check_response(backend._lib.kCCAlignmentError) + + with pytest.raises(SystemError): + backend._check_response(backend._lib.kCCMemoryFailure) + + with pytest.raises(SystemError): + backend._check_response(backend._lib.kCCDecodeError) |