aboutsummaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
authorAlex Stapleton <alexs@prol.etari.at>2014-01-21 20:19:17 +0000
committerAlex Stapleton <alexs@prol.etari.at>2014-01-24 08:11:43 +0000
commit17ed58daa9573458157b02f822f5dc471d954298 (patch)
tree4e5a4fe5516384bccac219bad8a0e4872b0b67b7 /tests
parentadbe0801270ac5ba247b78568ad42381419193d2 (diff)
downloadcryptography-17ed58daa9573458157b02f822f5dc471d954298.tar.gz
cryptography-17ed58daa9573458157b02f822f5dc471d954298.tar.bz2
cryptography-17ed58daa9573458157b02f822f5dc471d954298.zip
Python implementation of OpenSSL locking callback
Diffstat (limited to 'tests')
-rw-r--r--tests/hazmat/bindings/test_openssl.py106
1 files changed, 106 insertions, 0 deletions
diff --git a/tests/hazmat/bindings/test_openssl.py b/tests/hazmat/bindings/test_openssl.py
index d1e85058..b2264fb5 100644
--- a/tests/hazmat/bindings/test_openssl.py
+++ b/tests/hazmat/bindings/test_openssl.py
@@ -11,6 +11,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import threading
+import time
+
+import pytest
+
from cryptography.hazmat.bindings.openssl.binding import Binding
@@ -23,3 +28,104 @@ class TestOpenSSL(object):
def test_is_available(self):
assert Binding.is_available() is True
+
+ def test_crypto_lock_init(self):
+ b = Binding()
+ b.init_static_locks()
+ lock_cb = b.lib.CRYPTO_get_locking_callback()
+ assert lock_cb != b.ffi.NULL
+
+ def test_our_crypto_lock(self, capfd):
+ b = Binding()
+ b.init_static_locks()
+
+ # only run this test if we are using our locking cb
+ original_cb = b.lib.CRYPTO_get_locking_callback()
+ if original_cb != b._lock_cb_handle:
+ pytest.skip("Not using Python locking callback implementation")
+
+ # check that the lock state changes appropriately
+ lock = b._locks[b.lib.CRYPTO_LOCK_SSL]
+
+ assert lock.acquire(False)
+
+ lock.release()
+
+ b.lib.CRYPTO_lock(
+ b.lib.CRYPTO_LOCK | b.lib.CRYPTO_READ,
+ b.lib.CRYPTO_LOCK_SSL,
+ b.ffi.NULL,
+ 0
+ )
+
+ assert not lock.acquire(False)
+
+ b.lib.CRYPTO_lock(
+ b.lib.CRYPTO_UNLOCK | b.lib.CRYPTO_READ,
+ b.lib.CRYPTO_LOCK_SSL,
+ b.ffi.NULL,
+ 0
+ )
+
+ assert lock.acquire(False)
+ lock.release()
+
+ # force the error path to run.
+
+ b.lib.CRYPTO_lock(
+ 0,
+ b.lib.CRYPTO_LOCK_SSL,
+ b.ffi.NULL,
+ 0
+ )
+
+ lock.acquire(False)
+ lock.release()
+
+ out, err = capfd.readouterr()
+ assert "RuntimeError: Unknown lock mode" in err
+
+ def test_crypto_lock_mutex(self):
+ b = Binding()
+ b.init_static_locks()
+
+ # make sure whatever locking system we end up with actually acts
+ # like a mutex.
+
+ self._shared_value = 0
+
+ def critical_loop():
+ for i in range(10):
+ b.lib.CRYPTO_lock(
+ b.lib.CRYPTO_LOCK | b.lib.CRYPTO_READ,
+ b.lib.CRYPTO_LOCK_SSL,
+ b.ffi.NULL,
+ 0
+ )
+
+ assert self._shared_value == 0
+ self._shared_value += 1
+ time.sleep(0.01)
+ assert self._shared_value == 1
+ self._shared_value = 0
+
+ b.lib.CRYPTO_lock(
+ b.lib.CRYPTO_UNLOCK | b.lib.CRYPTO_READ,
+ b.lib.CRYPTO_LOCK_SSL,
+ b.ffi.NULL,
+ 0
+ )
+
+ threads = []
+ for x in range(10):
+ t = threading.Thread(target=critical_loop)
+ t.daemon = True
+ t.start()
+
+ threads.append(t)
+
+ while threads:
+ for t in threads:
+ t.join(0.1)
+ if not t.is_alive():
+ threads.remove(t)