From 17ed58daa9573458157b02f822f5dc471d954298 Mon Sep 17 00:00:00 2001 From: Alex Stapleton Date: Tue, 21 Jan 2014 20:19:17 +0000 Subject: Python implementation of OpenSSL locking callback --- tests/hazmat/bindings/test_openssl.py | 106 ++++++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) (limited to 'tests') diff --git a/tests/hazmat/bindings/test_openssl.py b/tests/hazmat/bindings/test_openssl.py index d1e85058..b2264fb5 100644 --- a/tests/hazmat/bindings/test_openssl.py +++ b/tests/hazmat/bindings/test_openssl.py @@ -11,6 +11,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import threading +import time + +import pytest + from cryptography.hazmat.bindings.openssl.binding import Binding @@ -23,3 +28,104 @@ class TestOpenSSL(object): def test_is_available(self): assert Binding.is_available() is True + + def test_crypto_lock_init(self): + b = Binding() + b.init_static_locks() + lock_cb = b.lib.CRYPTO_get_locking_callback() + assert lock_cb != b.ffi.NULL + + def test_our_crypto_lock(self, capfd): + b = Binding() + b.init_static_locks() + + # only run this test if we are using our locking cb + original_cb = b.lib.CRYPTO_get_locking_callback() + if original_cb != b._lock_cb_handle: + pytest.skip("Not using Python locking callback implementation") + + # check that the lock state changes appropriately + lock = b._locks[b.lib.CRYPTO_LOCK_SSL] + + assert lock.acquire(False) + + lock.release() + + b.lib.CRYPTO_lock( + b.lib.CRYPTO_LOCK | b.lib.CRYPTO_READ, + b.lib.CRYPTO_LOCK_SSL, + b.ffi.NULL, + 0 + ) + + assert not lock.acquire(False) + + b.lib.CRYPTO_lock( + b.lib.CRYPTO_UNLOCK | b.lib.CRYPTO_READ, + b.lib.CRYPTO_LOCK_SSL, + b.ffi.NULL, + 0 + ) + + assert lock.acquire(False) + lock.release() + + # force the error path to run. + + b.lib.CRYPTO_lock( + 0, + b.lib.CRYPTO_LOCK_SSL, + b.ffi.NULL, + 0 + ) + + lock.acquire(False) + lock.release() + + out, err = capfd.readouterr() + assert "RuntimeError: Unknown lock mode" in err + + def test_crypto_lock_mutex(self): + b = Binding() + b.init_static_locks() + + # make sure whatever locking system we end up with actually acts + # like a mutex. + + self._shared_value = 0 + + def critical_loop(): + for i in range(10): + b.lib.CRYPTO_lock( + b.lib.CRYPTO_LOCK | b.lib.CRYPTO_READ, + b.lib.CRYPTO_LOCK_SSL, + b.ffi.NULL, + 0 + ) + + assert self._shared_value == 0 + self._shared_value += 1 + time.sleep(0.01) + assert self._shared_value == 1 + self._shared_value = 0 + + b.lib.CRYPTO_lock( + b.lib.CRYPTO_UNLOCK | b.lib.CRYPTO_READ, + b.lib.CRYPTO_LOCK_SSL, + b.ffi.NULL, + 0 + ) + + threads = [] + for x in range(10): + t = threading.Thread(target=critical_loop) + t.daemon = True + t.start() + + threads.append(t) + + while threads: + for t in threads: + t.join(0.1) + if not t.is_alive(): + threads.remove(t) -- cgit v1.2.3