aboutsummaryrefslogtreecommitdiffstats
path: root/src/cryptography/hazmat/backends/openssl
diff options
context:
space:
mode:
authorPaul Kehrer <paul.l.kehrer@gmail.com>2016-01-10 23:55:12 -0600
committerPaul Kehrer <paul.l.kehrer@gmail.com>2016-01-10 23:55:12 -0600
commit54baf9bd14cda0e122e9b9477d1152ee81f61195 (patch)
treea94b2292f0290fbd002ad2be9abceff42b0c5787 /src/cryptography/hazmat/backends/openssl
parent0c2feae1b35131f270a17af3a8573b5cffb54e6c (diff)
downloadcryptography-54baf9bd14cda0e122e9b9477d1152ee81f61195.tar.gz
cryptography-54baf9bd14cda0e122e9b9477d1152ee81f61195.tar.bz2
cryptography-54baf9bd14cda0e122e9b9477d1152ee81f61195.zip
move more functions out of the openssl backend class
Diffstat (limited to 'src/cryptography/hazmat/backends/openssl')
-rw-r--r--src/cryptography/hazmat/backends/openssl/backend.py50
-rw-r--r--src/cryptography/hazmat/backends/openssl/decode_asn1.py89
-rw-r--r--src/cryptography/hazmat/backends/openssl/x509.py25
3 files changed, 87 insertions, 77 deletions
diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py
index 57d36acd..4a8fda99 100644
--- a/src/cryptography/hazmat/backends/openssl/backend.py
+++ b/src/cryptography/hazmat/backends/openssl/backend.py
@@ -6,7 +6,6 @@ from __future__ import absolute_import, division, print_function
import calendar
import collections
-import datetime
import itertools
from contextlib import contextmanager
@@ -1658,55 +1657,6 @@ class Backend(object):
self.openssl_assert(res == 1)
return self._read_mem_bio(bio)
- def _asn1_integer_to_int(self, asn1_int):
- bn = self._lib.ASN1_INTEGER_to_BN(asn1_int, self._ffi.NULL)
- self.openssl_assert(bn != self._ffi.NULL)
- bn = self._ffi.gc(bn, self._lib.BN_free)
- return self._bn_to_int(bn)
-
- def _asn1_string_to_bytes(self, asn1_string):
- return self._ffi.buffer(asn1_string.data, asn1_string.length)[:]
-
- def _asn1_string_to_ascii(self, asn1_string):
- return self._asn1_string_to_bytes(asn1_string).decode("ascii")
-
- def _asn1_string_to_utf8(self, asn1_string):
- buf = self._ffi.new("unsigned char **")
- res = self._lib.ASN1_STRING_to_UTF8(buf, asn1_string)
- self.openssl_assert(res >= 0)
- self.openssl_assert(buf[0] != self._ffi.NULL)
- buf = self._ffi.gc(
- buf, lambda buffer: self._lib.OPENSSL_free(buffer[0])
- )
- return self._ffi.buffer(buf[0], res)[:].decode('utf8')
-
- def _asn1_to_der(self, asn1_type):
- buf = self._ffi.new("unsigned char **")
- res = self._lib.i2d_ASN1_TYPE(asn1_type, buf)
- self.openssl_assert(res >= 0)
- self.openssl_assert(buf[0] != self._ffi.NULL)
- buf = self._ffi.gc(
- buf, lambda buffer: self._lib.OPENSSL_free(buffer[0])
- )
- return self._ffi.buffer(buf[0], res)[:]
-
- def _parse_asn1_time(self, asn1_time):
- self.openssl_assert(asn1_time != self._ffi.NULL)
- generalized_time = self._lib.ASN1_TIME_to_generalizedtime(
- asn1_time, self._ffi.NULL
- )
- self.openssl_assert(generalized_time != self._ffi.NULL)
- generalized_time = self._ffi.gc(
- generalized_time, self._lib.ASN1_GENERALIZEDTIME_free
- )
- return self._parse_asn1_generalized_time(generalized_time)
-
- def _parse_asn1_generalized_time(self, generalized_time):
- time = self._asn1_string_to_ascii(
- self._ffi.cast("ASN1_STRING *", generalized_time)
- )
- return datetime.datetime.strptime(time, "%Y%m%d%H%M%SZ")
-
class GetCipherByName(object):
def __init__(self, fmt):
diff --git a/src/cryptography/hazmat/backends/openssl/decode_asn1.py b/src/cryptography/hazmat/backends/openssl/decode_asn1.py
index c0cc01a8..42d6c858 100644
--- a/src/cryptography/hazmat/backends/openssl/decode_asn1.py
+++ b/src/cryptography/hazmat/backends/openssl/decode_asn1.py
@@ -4,6 +4,7 @@
from __future__ import absolute_import, division, print_function
+import datetime
import ipaddress
from email.utils import parseaddr
@@ -35,7 +36,7 @@ def _decode_x509_name_entry(backend, x509_name_entry):
backend.openssl_assert(obj != backend._ffi.NULL)
data = backend._lib.X509_NAME_ENTRY_get_data(x509_name_entry)
backend.openssl_assert(data != backend._ffi.NULL)
- value = backend._asn1_string_to_utf8(data)
+ value = _asn1_string_to_utf8(backend, data)
oid = _obj2txt(backend, obj)
return x509.NameAttribute(x509.ObjectIdentifier(oid), value)
@@ -64,7 +65,7 @@ def _decode_general_names(backend, gns):
def _decode_general_name(backend, gn):
if gn.type == backend._lib.GEN_DNS:
- data = backend._asn1_string_to_bytes(gn.d.dNSName)
+ data = _asn1_string_to_bytes(backend, gn.d.dNSName)
if not data:
decoded = u""
elif data.startswith(b"*."):
@@ -83,7 +84,7 @@ def _decode_general_name(backend, gn):
return x509.DNSName(decoded)
elif gn.type == backend._lib.GEN_URI:
- data = backend._asn1_string_to_ascii(gn.d.uniformResourceIdentifier)
+ data = _asn1_string_to_ascii(backend, gn.d.uniformResourceIdentifier)
parsed = urllib_parse.urlparse(data)
if parsed.hostname:
hostname = idna.decode(parsed.hostname)
@@ -110,7 +111,7 @@ def _decode_general_name(backend, gn):
oid = _obj2txt(backend, gn.d.registeredID)
return x509.RegisteredID(x509.ObjectIdentifier(oid))
elif gn.type == backend._lib.GEN_IPADD:
- data = backend._asn1_string_to_bytes(gn.d.iPAddress)
+ data = _asn1_string_to_bytes(backend, gn.d.iPAddress)
data_len = len(data)
if data_len == 8 or data_len == 32:
# This is an IPv4 or IPv6 Network and not a single IP. This
@@ -141,7 +142,7 @@ def _decode_general_name(backend, gn):
_decode_x509_name(backend, gn.d.directoryName)
)
elif gn.type == backend._lib.GEN_EMAIL:
- data = backend._asn1_string_to_ascii(gn.d.rfc822Name)
+ data = _asn1_string_to_ascii(backend, gn.d.rfc822Name)
name, address = parseaddr(data)
parts = address.split(u"@")
if name or not address:
@@ -160,7 +161,7 @@ def _decode_general_name(backend, gn):
)
elif gn.type == backend._lib.GEN_OTHERNAME:
type_id = _obj2txt(backend, gn.d.otherName.type_id)
- value = backend._asn1_to_der(gn.d.otherName.value)
+ value = _asn1_to_der(backend, gn.d.otherName.value)
return x509.OtherName(x509.ObjectIdentifier(type_id), value)
else:
# x400Address or ediPartyName
@@ -179,7 +180,7 @@ def _decode_ocsp_no_check(backend, ext):
def _decode_crl_number(backend, ext):
asn1_int = backend._ffi.cast("ASN1_INTEGER *", ext)
asn1_int = backend._ffi.gc(asn1_int, backend._lib.ASN1_INTEGER_free)
- return x509.CRLNumber(backend._asn1_integer_to_int(asn1_int))
+ return x509.CRLNumber(_asn1_integer_to_int(backend, asn1_int))
class _X509ExtensionParser(object):
@@ -285,10 +286,12 @@ def _decode_user_notice(backend, un):
notice_reference = None
if un.exptext != backend._ffi.NULL:
- explicit_text = backend._asn1_string_to_utf8(un.exptext)
+ explicit_text = _asn1_string_to_utf8(backend, un.exptext)
if un.noticeref != backend._ffi.NULL:
- organization = backend._asn1_string_to_utf8(un.noticeref.organization)
+ organization = _asn1_string_to_utf8(
+ backend, un.noticeref.organization
+ )
num = backend._lib.sk_ASN1_INTEGER_num(
un.noticeref.noticenos
@@ -298,7 +301,7 @@ def _decode_user_notice(backend, un):
asn1_int = backend._lib.sk_ASN1_INTEGER_value(
un.noticeref.noticenos, i
)
- notice_num = backend._asn1_integer_to_int(asn1_int)
+ notice_num = _asn1_integer_to_int(backend, asn1_int)
notice_numbers.append(notice_num)
notice_reference = x509.NoticeReference(
@@ -320,7 +323,7 @@ def _decode_basic_constraints(backend, bc_st):
if basic_constraints.pathlen == backend._ffi.NULL:
path_length = None
else:
- path_length = backend._asn1_integer_to_int(basic_constraints.pathlen)
+ path_length = _asn1_integer_to_int(backend, basic_constraints.pathlen)
return x509.BasicConstraints(ca, path_length)
@@ -353,8 +356,8 @@ def _decode_authority_key_identifier(backend, akid):
)
if akid.serial != backend._ffi.NULL:
- authority_cert_serial_number = backend._asn1_integer_to_int(
- akid.serial
+ authority_cert_serial_number = _asn1_integer_to_int(
+ backend, akid.serial
)
return x509.AuthorityKeyIdentifier(
@@ -561,7 +564,7 @@ def _decode_crl_distribution_points(backend, cdps):
def _decode_inhibit_any_policy(backend, asn1_int):
asn1_int = backend._ffi.cast("ASN1_INTEGER *", asn1_int)
asn1_int = backend._ffi.gc(asn1_int, backend._lib.ASN1_INTEGER_free)
- skip_certs = backend._asn1_integer_to_int(asn1_int)
+ skip_certs = _asn1_integer_to_int(backend, asn1_int)
return x509.InhibitAnyPolicy(skip_certs)
@@ -624,7 +627,7 @@ def _decode_invalidity_date(backend, inv_date):
generalized_time, backend._lib.ASN1_GENERALIZEDTIME_free
)
return x509.InvalidityDate(
- backend._parse_asn1_generalized_time(generalized_time)
+ _parse_asn1_generalized_time(backend, generalized_time)
)
@@ -654,6 +657,62 @@ def _decode_cert_issuer(backend, ext):
return x509.CertificateIssuer(_decode_general_names(backend, gns))
+def _asn1_to_der(backend, asn1_type):
+ buf = backend._ffi.new("unsigned char **")
+ res = backend._lib.i2d_ASN1_TYPE(asn1_type, buf)
+ backend.openssl_assert(res >= 0)
+ backend.openssl_assert(buf[0] != backend._ffi.NULL)
+ buf = backend._ffi.gc(
+ buf, lambda buffer: backend._lib.OPENSSL_free(buffer[0])
+ )
+ return backend._ffi.buffer(buf[0], res)[:]
+
+
+def _asn1_integer_to_int(backend, asn1_int):
+ bn = backend._lib.ASN1_INTEGER_to_BN(asn1_int, backend._ffi.NULL)
+ backend.openssl_assert(bn != backend._ffi.NULL)
+ bn = backend._ffi.gc(bn, backend._lib.BN_free)
+ return backend._bn_to_int(bn)
+
+
+def _asn1_string_to_bytes(backend, asn1_string):
+ return backend._ffi.buffer(asn1_string.data, asn1_string.length)[:]
+
+
+def _asn1_string_to_ascii(backend, asn1_string):
+ return _asn1_string_to_bytes(backend, asn1_string).decode("ascii")
+
+
+def _asn1_string_to_utf8(backend, asn1_string):
+ buf = backend._ffi.new("unsigned char **")
+ res = backend._lib.ASN1_STRING_to_UTF8(buf, asn1_string)
+ backend.openssl_assert(res >= 0)
+ backend.openssl_assert(buf[0] != backend._ffi.NULL)
+ buf = backend._ffi.gc(
+ buf, lambda buffer: backend._lib.OPENSSL_free(buffer[0])
+ )
+ return backend._ffi.buffer(buf[0], res)[:].decode('utf8')
+
+
+def _parse_asn1_time(backend, asn1_time):
+ backend.openssl_assert(asn1_time != backend._ffi.NULL)
+ generalized_time = backend._lib.ASN1_TIME_to_generalizedtime(
+ asn1_time, backend._ffi.NULL
+ )
+ backend.openssl_assert(generalized_time != backend._ffi.NULL)
+ generalized_time = backend._ffi.gc(
+ generalized_time, backend._lib.ASN1_GENERALIZEDTIME_free
+ )
+ return _parse_asn1_generalized_time(backend, generalized_time)
+
+
+def _parse_asn1_generalized_time(backend, generalized_time):
+ time = _asn1_string_to_ascii(
+ backend, backend._ffi.cast("ASN1_STRING *", generalized_time)
+ )
+ return datetime.datetime.strptime(time, "%Y%m%d%H%M%SZ")
+
+
_EXTENSION_HANDLERS = {
ExtensionOID.BASIC_CONSTRAINTS: _decode_basic_constraints,
ExtensionOID.SUBJECT_KEY_IDENTIFIER: _decode_subject_key_identifier,
diff --git a/src/cryptography/hazmat/backends/openssl/x509.py b/src/cryptography/hazmat/backends/openssl/x509.py
index 529ffa3e..a6f7d69e 100644
--- a/src/cryptography/hazmat/backends/openssl/x509.py
+++ b/src/cryptography/hazmat/backends/openssl/x509.py
@@ -11,7 +11,8 @@ from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.backends.openssl.decode_asn1 import (
_CERTIFICATE_EXTENSION_PARSER, _CRL_EXTENSION_PARSER,
_CSR_EXTENSION_PARSER, _REVOKED_CERTIFICATE_EXTENSION_PARSER,
- _decode_x509_name, _obj2txt,
+ _asn1_integer_to_int, _asn1_string_to_bytes, _decode_x509_name, _obj2txt,
+ _parse_asn1_time
)
from cryptography.hazmat.primitives import hashes, serialization
@@ -59,7 +60,7 @@ class _Certificate(object):
def serial(self):
asn1_int = self._backend._lib.X509_get_serialNumber(self._x509)
self._backend.openssl_assert(asn1_int != self._backend._ffi.NULL)
- return self._backend._asn1_integer_to_int(asn1_int)
+ return _asn1_integer_to_int(self._backend, asn1_int)
def public_key(self):
pkey = self._backend._lib.X509_get_pubkey(self._x509)
@@ -75,12 +76,12 @@ class _Certificate(object):
@property
def not_valid_before(self):
asn1_time = self._backend._lib.X509_get_notBefore(self._x509)
- return self._backend._parse_asn1_time(asn1_time)
+ return _parse_asn1_time(self._backend, asn1_time)
@property
def not_valid_after(self):
asn1_time = self._backend._lib.X509_get_notAfter(self._x509)
- return self._backend._parse_asn1_time(asn1_time)
+ return _parse_asn1_time(self._backend, asn1_time)
@property
def issuer(self):
@@ -110,7 +111,7 @@ class _Certificate(object):
@property
def signature(self):
- return self._backend._asn1_string_to_bytes(self._x509.signature)
+ return _asn1_string_to_bytes(self._backend, self._x509.signature)
@property
def tbs_certificate_bytes(self):
@@ -154,12 +155,12 @@ class _RevokedCertificate(object):
def serial_number(self):
asn1_int = self._x509_revoked.serialNumber
self._backend.openssl_assert(asn1_int != self._backend._ffi.NULL)
- return self._backend._asn1_integer_to_int(asn1_int)
+ return _asn1_integer_to_int(self._backend, asn1_int)
@property
def revocation_date(self):
- return self._backend._parse_asn1_time(
- self._x509_revoked.revocationDate)
+ return _parse_asn1_time(
+ self._backend, self._x509_revoked.revocationDate)
@property
def extensions(self):
@@ -215,17 +216,17 @@ class _CertificateRevocationList(object):
def next_update(self):
nu = self._backend._lib.X509_CRL_get_nextUpdate(self._x509_crl)
self._backend.openssl_assert(nu != self._backend._ffi.NULL)
- return self._backend._parse_asn1_time(nu)
+ return _parse_asn1_time(self._backend, nu)
@property
def last_update(self):
lu = self._backend._lib.X509_CRL_get_lastUpdate(self._x509_crl)
self._backend.openssl_assert(lu != self._backend._ffi.NULL)
- return self._backend._parse_asn1_time(lu)
+ return _parse_asn1_time(self._backend, lu)
@property
def signature(self):
- return self._backend._asn1_string_to_bytes(self._x509_crl.signature)
+ return _asn1_string_to_bytes(self._backend, self._x509_crl.signature)
@property
def tbs_certlist_bytes(self):
@@ -360,4 +361,4 @@ class _CertificateSigningRequest(object):
@property
def signature(self):
- return self._backend._asn1_string_to_bytes(self._x509_req.signature)
+ return _asn1_string_to_bytes(self._backend, self._x509_req.signature)