aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorAlex Gaynor <alex.gaynor@gmail.com>2015-06-28 11:34:02 -0400
committerAlex Gaynor <alex.gaynor@gmail.com>2015-06-28 11:34:02 -0400
commit70add5fef5274c697f79c0e5628c49540bd8e52b (patch)
tree40474bacfedc30c7890a0bc0eebaa714dfd5e45a /src
parent5432f818890a67a8c6c53ebdea47fd9dd7ec1ae9 (diff)
downloadcryptography-70add5fef5274c697f79c0e5628c49540bd8e52b.tar.gz
cryptography-70add5fef5274c697f79c0e5628c49540bd8e52b.tar.bz2
cryptography-70add5fef5274c697f79c0e5628c49540bd8e52b.zip
Initial stab at unifying the extension parsing code fro CSRs and certificates
Diffstat (limited to 'src')
-rw-r--r--src/cryptography/hazmat/backends/openssl/x509.py154
1 files changed, 75 insertions, 79 deletions
diff --git a/src/cryptography/hazmat/backends/openssl/x509.py b/src/cryptography/hazmat/backends/openssl/x509.py
index 8e361fa2..b7693bc1 100644
--- a/src/cryptography/hazmat/backends/openssl/x509.py
+++ b/src/cryptography/hazmat/backends/openssl/x509.py
@@ -163,6 +163,44 @@ def _decode_general_name(backend, gn):
)
+def _decode_ocsp_no_check(backend, ext):
+ return x509.OCSPNoCheck()
+
+
+class _X509ExtensionParser(object):
+ def __init__(self, ext_count, get_ext, handlers):
+ self.ext_count = ext_count
+ self.get_ext = get_ext
+ self.handlers = handlers
+
+ def parse(self, backend, x509_obj):
+ extensions = []
+ seen_oids = set()
+ for i in range(self.ext_count(x509_obj)):
+ ext = self.get_ext(x509_obj, i)
+ assert ext != backend._ffi.NULL
+ crit = backend._lib.X509_EXTENSION_get_critical(ext)
+ critical = crit == 1
+ oid = x509.ObjectIdentifier(_obj2txt(backend, ext.object))
+ if oid in seen_oids:
+ raise x509.DuplicateExtension(
+ "Duplicate {0} extension found".format(oid), oid
+ )
+ for handler_oid, f in self.handlers:
+ if handler_oid == oid:
+ value = f(backend, ext)
+ extensions.append(x509.Extension(oid, critical, value))
+ break
+ else:
+ if critical:
+ raise x509.UnsupportedExtension(
+ "{0} is not currently supported".format(oid), oid
+ )
+ seen_oids.add(oid)
+
+ return x509.Extensions(extensions)
+
+
@utils.register_interface(x509.Certificate)
class _Certificate(object):
def __init__(self, backend, x509):
@@ -268,58 +306,36 @@ class _Certificate(object):
@property
def extensions(self):
- extensions = []
- seen_oids = set()
- extcount = self._backend._lib.X509_get_ext_count(self._x509)
- for i in range(0, extcount):
- ext = self._backend._lib.X509_get_ext(self._x509, i)
- assert ext != self._backend._ffi.NULL
- crit = self._backend._lib.X509_EXTENSION_get_critical(ext)
- critical = crit == 1
- oid = x509.ObjectIdentifier(_obj2txt(self._backend, ext.object))
- if oid in seen_oids:
- raise x509.DuplicateExtension(
- "Duplicate {0} extension found".format(oid), oid
- )
- elif oid == x509.OID_BASIC_CONSTRAINTS:
- value = _decode_basic_constraints(self._backend, ext)
- elif oid == x509.OID_SUBJECT_KEY_IDENTIFIER:
- value = _decode_subject_key_identifier(self._backend, ext)
- elif oid == x509.OID_KEY_USAGE:
- value = _decode_key_usage(self._backend, ext)
- elif oid == x509.OID_SUBJECT_ALTERNATIVE_NAME:
- value = _decode_subject_alt_name(self._backend, ext)
- elif oid == x509.OID_EXTENDED_KEY_USAGE:
- value = _decode_extended_key_usage(self._backend, ext)
- elif oid == x509.OID_AUTHORITY_KEY_IDENTIFIER:
- value = _decode_authority_key_identifier(self._backend, ext)
- elif oid == x509.OID_AUTHORITY_INFORMATION_ACCESS:
- value = _decode_authority_information_access(
- self._backend, ext
- )
- elif oid == x509.OID_CERTIFICATE_POLICIES:
- value = _decode_certificate_policies(self._backend, ext)
- elif oid == x509.OID_CRL_DISTRIBUTION_POINTS:
- value = _decode_crl_distribution_points(self._backend, ext)
- elif oid == x509.OID_OCSP_NO_CHECK:
- value = x509.OCSPNoCheck()
- elif oid == x509.OID_INHIBIT_ANY_POLICY:
- value = _decode_inhibit_any_policy(self._backend, ext)
- elif oid == x509.OID_ISSUER_ALTERNATIVE_NAME:
- value = _decode_issuer_alt_name(self._backend, ext)
- elif critical:
- raise x509.UnsupportedExtension(
- "{0} is not currently supported".format(oid), oid
- )
- else:
- # Unsupported non-critical extension, silently skipping for now
- seen_oids.add(oid)
- continue
-
- seen_oids.add(oid)
- extensions.append(x509.Extension(oid, critical, value))
-
- return x509.Extensions(extensions)
+ return _X509ExtensionParser(
+ ext_count=self._backend._lib.X509_get_ext_count,
+ get_ext=self._backend._lib.X509_get_ext,
+ handlers=[
+ (x509.OID_BASIC_CONSTRAINTS, _decode_basic_constraints),
+ (
+ x509.OID_SUBJECT_KEY_IDENTIFIER,
+ _decode_subject_key_identifier
+ ),
+ (x509.OID_KEY_USAGE, _decode_key_usage),
+ (x509.OID_SUBJECT_ALTERNATIVE_NAME, _decode_subject_alt_name),
+ (x509.OID_EXTENDED_KEY_USAGE, _decode_extended_key_usage),
+ (
+ x509.OID_AUTHORITY_KEY_IDENTIFIER,
+ _decode_authority_key_identifier
+ ),
+ (
+ x509.OID_AUTHORITY_INFORMATION_ACCESS,
+ _decode_authority_information_access
+ ),
+ (x509.OID_CERTIFICATE_POLICIES, _decode_certificate_policies),
+ (
+ x509.OID_CRL_DISTRIBUTION_POINTS,
+ _decode_crl_distribution_points
+ ),
+ (x509.OID_OCSP_NO_CHECK, _decode_ocsp_no_check),
+ (x509.OID_INHIBIT_ANY_POLICY, _decode_inhibit_any_policy),
+ (x509.OID_ISSUER_ALTERNATIVE_NAME, _decode_issuer_alt_name),
+ ]
+ ).parse(self._backend, self._x509)
def public_bytes(self, encoding):
bio = self._backend._create_mem_bio()
@@ -704,35 +720,15 @@ class _CertificateSigningRequest(object):
@property
def extensions(self):
- extensions = []
- seen_oids = set()
x509_exts = self._backend._lib.X509_REQ_get_extensions(self._x509_req)
- extcount = self._backend._lib.sk_X509_EXTENSION_num(x509_exts)
- for i in range(0, extcount):
- ext = self._backend._lib.sk_X509_EXTENSION_value(x509_exts, i)
- assert ext != self._backend._ffi.NULL
- crit = self._backend._lib.X509_EXTENSION_get_critical(ext)
- critical = crit == 1
- oid = x509.ObjectIdentifier(_obj2txt(self._backend, ext.object))
- if oid in seen_oids:
- raise x509.DuplicateExtension(
- "Duplicate {0} extension found".format(oid), oid
- )
- elif oid == x509.OID_BASIC_CONSTRAINTS:
- value = _decode_basic_constraints(self._backend, ext)
- elif critical:
- raise x509.UnsupportedExtension(
- "{0} is not currently supported".format(oid), oid
- )
- else:
- # Unsupported non-critical extension, silently skipping for now
- seen_oids.add(oid)
- continue
- seen_oids.add(oid)
- extensions.append(x509.Extension(oid, critical, value))
-
- return x509.Extensions(extensions)
+ return _X509ExtensionParser(
+ ext_count=self._backend._lib.sk_X509_EXTENSION_num,
+ get_ext=self._backend._lib.sk_X509_EXTENSION_value,
+ handlers=[
+ (x509.OID_BASIC_CONSTRAINTS, _decode_basic_constraints),
+ ]
+ ).parse(self._backend, x509_exts)
def public_bytes(self, encoding):
bio = self._backend._create_mem_bio()