diff options
author | Alex Stapleton <alexs@prol.etari.at> | 2014-01-21 20:19:17 +0000 |
---|---|---|
committer | Alex Stapleton <alexs@prol.etari.at> | 2014-01-24 08:11:43 +0000 |
commit | 17ed58daa9573458157b02f822f5dc471d954298 (patch) | |
tree | 4e5a4fe5516384bccac219bad8a0e4872b0b67b7 /tests/hazmat | |
parent | adbe0801270ac5ba247b78568ad42381419193d2 (diff) | |
download | cryptography-17ed58daa9573458157b02f822f5dc471d954298.tar.gz cryptography-17ed58daa9573458157b02f822f5dc471d954298.tar.bz2 cryptography-17ed58daa9573458157b02f822f5dc471d954298.zip |
Python implementation of OpenSSL locking callback
Diffstat (limited to 'tests/hazmat')
-rw-r--r-- | tests/hazmat/bindings/test_openssl.py | 106 |
1 files changed, 106 insertions, 0 deletions
diff --git a/tests/hazmat/bindings/test_openssl.py b/tests/hazmat/bindings/test_openssl.py index d1e85058..b2264fb5 100644 --- a/tests/hazmat/bindings/test_openssl.py +++ b/tests/hazmat/bindings/test_openssl.py @@ -11,6 +11,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import threading +import time + +import pytest + from cryptography.hazmat.bindings.openssl.binding import Binding @@ -23,3 +28,104 @@ class TestOpenSSL(object): def test_is_available(self): assert Binding.is_available() is True + + def test_crypto_lock_init(self): + b = Binding() + b.init_static_locks() + lock_cb = b.lib.CRYPTO_get_locking_callback() + assert lock_cb != b.ffi.NULL + + def test_our_crypto_lock(self, capfd): + b = Binding() + b.init_static_locks() + + # only run this test if we are using our locking cb + original_cb = b.lib.CRYPTO_get_locking_callback() + if original_cb != b._lock_cb_handle: + pytest.skip("Not using Python locking callback implementation") + + # check that the lock state changes appropriately + lock = b._locks[b.lib.CRYPTO_LOCK_SSL] + + assert lock.acquire(False) + + lock.release() + + b.lib.CRYPTO_lock( + b.lib.CRYPTO_LOCK | b.lib.CRYPTO_READ, + b.lib.CRYPTO_LOCK_SSL, + b.ffi.NULL, + 0 + ) + + assert not lock.acquire(False) + + b.lib.CRYPTO_lock( + b.lib.CRYPTO_UNLOCK | b.lib.CRYPTO_READ, + b.lib.CRYPTO_LOCK_SSL, + b.ffi.NULL, + 0 + ) + + assert lock.acquire(False) + lock.release() + + # force the error path to run. + + b.lib.CRYPTO_lock( + 0, + b.lib.CRYPTO_LOCK_SSL, + b.ffi.NULL, + 0 + ) + + lock.acquire(False) + lock.release() + + out, err = capfd.readouterr() + assert "RuntimeError: Unknown lock mode" in err + + def test_crypto_lock_mutex(self): + b = Binding() + b.init_static_locks() + + # make sure whatever locking system we end up with actually acts + # like a mutex. + + self._shared_value = 0 + + def critical_loop(): + for i in range(10): + b.lib.CRYPTO_lock( + b.lib.CRYPTO_LOCK | b.lib.CRYPTO_READ, + b.lib.CRYPTO_LOCK_SSL, + b.ffi.NULL, + 0 + ) + + assert self._shared_value == 0 + self._shared_value += 1 + time.sleep(0.01) + assert self._shared_value == 1 + self._shared_value = 0 + + b.lib.CRYPTO_lock( + b.lib.CRYPTO_UNLOCK | b.lib.CRYPTO_READ, + b.lib.CRYPTO_LOCK_SSL, + b.ffi.NULL, + 0 + ) + + threads = [] + for x in range(10): + t = threading.Thread(target=critical_loop) + t.daemon = True + t.start() + + threads.append(t) + + while threads: + for t in threads: + t.join(0.1) + if not t.is_alive(): + threads.remove(t) |