aboutsummaryrefslogtreecommitdiffstats
path: root/tests/hazmat/bindings/test_openssl.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/hazmat/bindings/test_openssl.py')
-rw-r--r--tests/hazmat/bindings/test_openssl.py106
1 files changed, 106 insertions, 0 deletions
diff --git a/tests/hazmat/bindings/test_openssl.py b/tests/hazmat/bindings/test_openssl.py
index d1e85058..b2264fb5 100644
--- a/tests/hazmat/bindings/test_openssl.py
+++ b/tests/hazmat/bindings/test_openssl.py
@@ -11,6 +11,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import threading
+import time
+
+import pytest
+
from cryptography.hazmat.bindings.openssl.binding import Binding
@@ -23,3 +28,104 @@ class TestOpenSSL(object):
def test_is_available(self):
assert Binding.is_available() is True
+
+ def test_crypto_lock_init(self):
+ b = Binding()
+ b.init_static_locks()
+ lock_cb = b.lib.CRYPTO_get_locking_callback()
+ assert lock_cb != b.ffi.NULL
+
+ def test_our_crypto_lock(self, capfd):
+ b = Binding()
+ b.init_static_locks()
+
+ # only run this test if we are using our locking cb
+ original_cb = b.lib.CRYPTO_get_locking_callback()
+ if original_cb != b._lock_cb_handle:
+ pytest.skip("Not using Python locking callback implementation")
+
+ # check that the lock state changes appropriately
+ lock = b._locks[b.lib.CRYPTO_LOCK_SSL]
+
+ assert lock.acquire(False)
+
+ lock.release()
+
+ b.lib.CRYPTO_lock(
+ b.lib.CRYPTO_LOCK | b.lib.CRYPTO_READ,
+ b.lib.CRYPTO_LOCK_SSL,
+ b.ffi.NULL,
+ 0
+ )
+
+ assert not lock.acquire(False)
+
+ b.lib.CRYPTO_lock(
+ b.lib.CRYPTO_UNLOCK | b.lib.CRYPTO_READ,
+ b.lib.CRYPTO_LOCK_SSL,
+ b.ffi.NULL,
+ 0
+ )
+
+ assert lock.acquire(False)
+ lock.release()
+
+ # force the error path to run.
+
+ b.lib.CRYPTO_lock(
+ 0,
+ b.lib.CRYPTO_LOCK_SSL,
+ b.ffi.NULL,
+ 0
+ )
+
+ lock.acquire(False)
+ lock.release()
+
+ out, err = capfd.readouterr()
+ assert "RuntimeError: Unknown lock mode" in err
+
+ def test_crypto_lock_mutex(self):
+ b = Binding()
+ b.init_static_locks()
+
+ # make sure whatever locking system we end up with actually acts
+ # like a mutex.
+
+ self._shared_value = 0
+
+ def critical_loop():
+ for i in range(10):
+ b.lib.CRYPTO_lock(
+ b.lib.CRYPTO_LOCK | b.lib.CRYPTO_READ,
+ b.lib.CRYPTO_LOCK_SSL,
+ b.ffi.NULL,
+ 0
+ )
+
+ assert self._shared_value == 0
+ self._shared_value += 1
+ time.sleep(0.01)
+ assert self._shared_value == 1
+ self._shared_value = 0
+
+ b.lib.CRYPTO_lock(
+ b.lib.CRYPTO_UNLOCK | b.lib.CRYPTO_READ,
+ b.lib.CRYPTO_LOCK_SSL,
+ b.ffi.NULL,
+ 0
+ )
+
+ threads = []
+ for x in range(10):
+ t = threading.Thread(target=critical_loop)
+ t.daemon = True
+ t.start()
+
+ threads.append(t)
+
+ while threads:
+ for t in threads:
+ t.join(0.1)
+ if not t.is_alive():
+ threads.remove(t)