# This file is dual licensed under the terms of the Apache License, Version # 2.0, and the BSD License. See the LICENSE file in the root of this repository # for complete details. from __future__ import absolute_import, division, print_function import datetime import ipaddress import six from cryptography import x509 from cryptography.hazmat._der import DERReader, INTEGER, NULL, SEQUENCE from cryptography.x509.extensions import _TLS_FEATURE_TYPE_TO_ENUM from cryptography.x509.name import _ASN1_TYPE_TO_ENUM from cryptography.x509.oid import ( CRLEntryExtensionOID, CertificatePoliciesOID, ExtensionOID, OCSPExtensionOID, ) def _obj2txt(backend, obj): # Set to 80 on the recommendation of # https://www.openssl.org/docs/crypto/OBJ_nid2ln.html#return_values # # But OIDs longer than this occur in real life (e.g. Active # Directory makes some very long OIDs). So we need to detect # and properly handle the case where the default buffer is not # big enough. # buf_len = 80 buf = backend._ffi.new("char[]", buf_len) # 'res' is the number of bytes that *would* be written if the # buffer is large enough. If 'res' > buf_len - 1, we need to # alloc a big-enough buffer and go again. res = backend._lib.OBJ_obj2txt(buf, buf_len, obj, 1) if res > buf_len - 1: # account for terminating null byte buf_len = res + 1 buf = backend._ffi.new("char[]", buf_len) res = backend._lib.OBJ_obj2txt(buf, buf_len, obj, 1) backend.openssl_assert(res > 0) return backend._ffi.buffer(buf, res)[:].decode() def _decode_x509_name_entry(backend, x509_name_entry): obj = backend._lib.X509_NAME_ENTRY_get_object(x509_name_entry) backend.openssl_assert(obj != backend._ffi.NULL) data = backend._lib.X509_NAME_ENTRY_get_data(x509_name_entry) backend.openssl_assert(data != backend._ffi.NULL) value = _asn1_string_to_utf8(backend, data) oid = _obj2txt(backend, obj) type = _ASN1_TYPE_TO_ENUM[data.type] return x509.NameAttribute(x509.ObjectIdentifier(oid), value, type) def _decode_x509_name(backend, x509_name): count = backend._lib.X509_NAME_entry_count(x509_name) attributes = [] prev_set_id = -1 for x in range(count): entry = backend._lib.X509_NAME_get_entry(x509_name, x) attribute = _decode_x509_name_entry(backend, entry) set_id = backend._lib.Cryptography_X509_NAME_ENTRY_set(entry) if set_id != prev_set_id: attributes.append({attribute}) else: # is in the same RDN a previous entry attributes[-1].add(attribute) prev_set_id = set_id return x509.Name(x509.RelativeDistinguishedName(rdn) for rdn in attributes) def _decode_general_names(backend, gns): num = backend._lib.sk_GENERAL_NAME_num(gns) names = [] for i in range(num): gn = backend._lib.sk_GENERAL_NAME_value(gns, i) backend.openssl_assert(gn != backend._ffi.NULL) names.append(_decode_general_name(backend, gn)) return names def _decode_general_name(backend, gn): if gn.type == backend._lib.GEN_DNS: # Convert to bytes and then decode to utf8. We don't use # asn1_string_to_utf8 here because it doesn't properly convert # utf8 from ia5strings. data = _asn1_string_to_bytes(backend, gn.d.dNSName).decode("utf8") # We don't use the constructor for DNSName so we can bypass validation # This allows us to create DNSName objects that have unicode chars # when a certificate (against the RFC) contains them. return x509.DNSName._init_without_validation(data) elif gn.type == backend._lib.GEN_URI: # Convert to bytes and then decode to utf8. We don't use # asn1_string_to_utf8 here because it doesn't properly convert # utf8 from ia5strings. data = _asn1_string_to_bytes( backend, gn.d.uniformResourceIdentifier ).decode("utf8") # We don't use the constructor for URI so we can bypass validation # This allows us to create URI objects that have unicode chars # when a certificate (against the RFC) contains them. return x509.UniformResourceIdentifier._init_without_validation(data) elif gn.type == backend._lib.GEN_RID: oid = _obj2txt(backend, gn.d.registeredID) return x509.RegisteredID(x509.ObjectIdentifier(oid)) elif gn.type == backend._lib.GEN_IPADD: data = _asn1_string_to_bytes(backend, gn.d.iPAddress) data_len = len(data) if data_len == 8 or data_len == 32: # This is an IPv4 or IPv6 Network and not a single IP. This # type of data appears in Name Constraints. Unfortunately, # ipaddress doesn't support packed bytes + netmask. Additionally, # IPv6Network can only handle CIDR rather than the full 16 byte # netmask. To handle this we convert the netmask to integer, then # find the first 0 bit, which will be the prefix. If another 1 # bit is present after that the netmask is invalid. base = ipaddress.ip_address(data[:data_len // 2]) netmask = ipaddress.ip_address(data[data_len // 2:]) bits = bin(int(netmask))[2:] prefix = bits.find('0') # If no 0 bits are found it is a /32 or /128 if prefix == -1: prefix = len(bits) if "1" in bits[prefix:]: raise ValueError("Invalid netmask") ip = ipaddress.ip_network(base.exploded + u"/{}".format(prefix)) else: ip = ipaddress.ip_address(data) return x509.IPAddress(ip) elif gn.type == backend._lib.GEN_DIRNAME: return x509.DirectoryName( _decode_x509_name(backend, gn.d.directoryName) ) elif gn.type == backend._lib.GEN_EMAIL: # Convert to bytes and then decode to utf8. We don't use # asn1_string_to_utf8 here because it doesn't properly convert # utf8 from ia5strings. data = _asn1_string_to_bytes(backend, gn.d.rfc822Name).decode("utf8") # We don't use the constructor for RFC822Name so we can bypass # validation. This allows us to create RFC822Name objects that have # unicode chars when a certificate (against the RFC) contains them. return x509.RFC822Name._init_without_validation(data) elif gn.type == backend._lib.GEN_OTHERNAME: type_id = _obj2txt(backend, gn.d.otherName.type_id) value = _asn1_to_der(backend, gn.d.otherName.value) return x509.OtherName(x509.ObjectIdentifier(type_id), value) else: # x400Address or ediPartyName raise x509.UnsupportedGeneralNameType( "{} is not a supported type".format( x509._GENERAL_NAMES.get(gn.type, gn.type) ), gn.type ) def _decode_ocsp_no_check(backend, ext): return x509.OCSPNoCheck() def _decode_crl_number(backend, ext): asn1_int = backend._ffi.cast("ASN1_INTEGER *", ext) asn1_int = backend._ffi.gc(asn1_int, backend._lib.ASN1_INTEGER_free) return x509.CRLNumber(_asn1_integer_to_int(backend, asn1_int)) def _decode_delta_crl_indicator(backend, ext): asn1_int = backend._ffi.cast("ASN1_INTEGER *", ext) asn1_int = backend._ffi.gc(asn1_int, backend._lib.ASN1_INTEGER_free) return x509.DeltaCRLIndicator(_asn1_integer_to_int(backend, asn1_int)) class _X509ExtensionParser(object): def __init__(self, ext_count, get_ext, handlers): self.ext_count = ext_count self.get_ext = get_ext self.handlers = handlers def parse(self, backend, x509_obj): extensions = [] seen_oids = set() for i in range(self.ext_count(backend, x509_obj)): ext = self.get_ext(backend, x509_obj, i) backend.openssl_assert(ext != backend._ffi.NULL) crit = backend._lib.X509_EXTENSION_get_critical(ext) critical = crit == 1 oid = x509.ObjectIdentifier( _obj2txt(backend, backend._lib.X509_EXTENSION_get_object(ext)) ) if oid in seen_oids: raise x509.DuplicateExtension( "Duplicate {} extension found".format(oid), oid ) # These OIDs are only supported in OpenSSL 1.1.0+ but we want # to support them in all versions of OpenSSL so we decode them # ourselves. if oid == ExtensionOID.TLS_FEATURE: # The extension contents are a SEQUENCE OF INTEGERs. data = backend._lib.X509_EXTENSION_get_data(ext) data_bytes = _asn1_string_to_bytes(backend, data) features = DERReader(data_bytes).read_single_element(SEQUENCE) parsed = [] while not features.is_empty(): parsed.append(features.read_element(INTEGER).as_integer()) # Map the features to their enum value. value = x509.TLSFeature( [_TLS_FEATURE_TYPE_TO_ENUM[x] for x in parsed] ) extensions.append(x509.Extension(oid, critical, value)) seen_oids.add(oid) continue elif oid == ExtensionOID.PRECERT_POISON: data = backend._lib.X509_EXTENSION_get_data(ext) # The contents of the extension must be an ASN.1 NULL. reader = DERReader(_asn1_string_to_bytes(backend, data)) reader.read_single_element(NULL).check_empty() extensions.append(x509.Extension( oid, critical, x509.PrecertPoison() )) seen_oids.add(oid) continue try: handler = self.handlers[oid] except KeyError: # Dump the DER payload into an UnrecognizedExtension object data = backend._lib.X509_EXTENSION_get_data(ext) backend.openssl_assert(data != backend._ffi.NULL) der = backend._ffi.buffer(data.data, data.length)[:] unrecognized = x509.UnrecognizedExtension(oid, der) extensions.append( x509.Extension(oid, critical, unrecognized) ) else: ext_data = backend._lib.X509V3_EXT_d2i(ext) if ext_data == backend._ffi.NULL: backend._consume_errors() raise ValueError( "The {} extension is invalid and can't be " "parsed".format(oid) ) value = handler(backend, ext_data) extensions.append(x509.Extension(oid, critical, value)) seen_oids.add(oid) return x509.Extensions(extensions) def _decode_certificate_policies(backend, cp): cp = backend._ffi.cast("Cryptography_STACK_OF_POLICYINFO *", cp) cp = backend._ffi.gc(cp, backend._lib.CERTIFICATEPOLICIES_free) num = backend._lib.sk_POLICYINFO_num(cp) certificate_policies = [] for i in range(num): qualifiers = None pi = backend._lib.sk_POLICYINFO_value(cp, i) oid = x509.ObjectIdentifier(_obj2txt(backend, pi.policyid)) if pi.qualifiers != backend._ffi.NULL: qnum = backend._lib.sk_POLICYQUALINFO_num(pi.qualifiers) qualifiers = [] for j in range(qnum): pqi = backend._lib.sk_POLICYQUALINFO_value( pi.qualifiers, j ) pqualid = x509.ObjectIdentifier( _obj2txt(backend, pqi.pqualid) ) if pqualid == CertificatePoliciesOID.CPS_QUALIFIER: cpsuri = backend._ffi.buffer( pqi.d.cpsuri.data, pqi.d.cpsuri.length )[:].decode('ascii') qualifiers.append(cpsuri) else: assert pqualid == CertificatePoliciesOID.CPS_USER_NOTICE user_notice = _decode_user_notice( backend, pqi.d.usernotice ) qualifiers.append(user_notice) certificate_policies.append( x509.PolicyInformation(oid, qualifiers) ) return x509.CertificatePolicies(certificate_policies) def _decode_user_notice(backend, un): explicit_text = None notice_reference = None if un.exptext != backend._ffi.NULL: explicit_text = _asn1_string_to_utf8(backend, un.exptext) if un.noticeref != backend._ffi.NULL: organization = _asn1_string_to_utf8( backend, un.noticeref.organization ) num = backend._lib.sk_ASN1_INTEGER_num( un.noticeref.noticenos ) notice_numbers = [] for i in range(num): asn1_int = backend._lib.sk_ASN1_INTEGER_value( un.noticeref.noticenos, i ) notice_num = _asn1_integer_to_int(backend, asn1_int) notice_numbers.append(notice_num) notice_reference = x509.NoticeReference( organization, notice_numbers ) return x509.UserNotice(notice_reference, explicit_text) def _decode_basic_constraints(backend, bc_st): basic_constraints = backend._ffi.cast("BASIC_CONSTRAINTS *", bc_st) basic_constraints = backend._ffi.gc( basic_constraints, backend._lib.BASIC_CONSTRAINTS_free ) # The byte representation of an ASN.1 boolean true is \xff. OpenSSL # chooses to just map this to its ordinal value, so true is 255 and # false is 0. ca = basic_constraints.ca == 255 path_length = _asn1_integer_to_int_or_none( backend, basic_constraints.pathlen ) return x509.BasicConstraints(ca, path_length) def _decode_subject_key_identifier(backend, asn1_string): asn1_string = backend._ffi.cast("ASN1_OCTET_STRING *", asn1_string) asn1_string = backend._ffi.gc( asn1_string, backend._lib.ASN1_OCTET_STRING_free ) return x509.SubjectKeyIdentifier( backend._ffi.buffer(asn1_string.data, asn1_string.length)[:] ) def _decode_authority_key_identifier(backend, akid): akid = backend._ffi.cast("AUTHORITY_KEYID *", akid) akid = backend._ffi.gc(akid, backend._lib.AUTHORITY_KEYID_free) key_identifier = None authority_cert_issuer = None if akid.keyid != backend._ffi.NULL: key_identifier = backend._ffi.buffer( akid.keyid.data, akid.keyid.length )[:] if akid.issuer != backend._ffi.NULL: authority_cert_issuer = _decode_general_names( backend, akid.issuer ) authority_cert_serial_number = _asn1_integer_to_int_or_none( backend, akid.serial ) return x509.AuthorityKeyIdentifier( key_identifier, authority_cert_issuer, authority_cert_serial_number ) def _decode_authority_information_access(backend, aia): aia = backend._ffi.cast("Cryptography_STACK_OF_ACCESS_DESCRIPTION *", aia) aia = backend._ffi.gc( aia, lambda x: backend._lib.sk_ACCESS_DESCRIPTION_pop_free( x, backend._ffi.addressof( backend._lib._original_lib, "ACCESS_DESCRIPTION_free" ) ) ) num = backend._lib.sk_ACCESS_DESCRIPTION_num(aia) access_descriptions = [] for i in range(num): ad = backend._lib.sk_ACCESS_DESCRIPTION_value(aia, i) backend.openssl_assert(ad.method != backend._ffi.NULL) oid = x509.ObjectIdentifier(_obj2txt(backend, ad.method)) backend.openssl_assert(ad.location != backend._ffi.NULL) gn = _decode_general_name(backend, ad.location) access_descriptions.append(x509.AccessDescription(oid, gn)) return x509.AuthorityInformationAccess(access_descriptions) def _decode_key_usage(backend, bit_string): bit_string = backend._ffi.cast("ASN1_BIT_STRING *", bit_string) bit_string = backend._ffi.gc(bit_string, backend._lib.ASN1_BIT_STRING_free) get_bit = backend._lib.ASN1_BIT_STRING_get_bit digital_signature = get_bit(bit_string, 0) == 1 content_commitment = get_bit(bit_string, 1) == 1 key_encipherment = get_bit(bit_string, 2) == 1 data_encipherment = get_bit(bit_string, 3) == 1 key_agreement = get_bit(bit_string, 4) == 1 key_cert_sign = get_bit(bit_string, 5) == 1 crl_sign = get_bit(bit_string, 6) == 1 encipher_only = get_bit(bit_string, 7) == 1 decipher_only = get_bit(bit_string, 8) == 1 return x509.KeyUsage( digital_signature, content_commitment, key_encipherment, data_encipherment, key_agreement, key_cert_sign, crl_sign, encipher_only, decipher_only ) def _decode_general_names_extension(backend, gns): gns = backend._ffi.cast("GENERAL_NAMES *", gns) gns = backend._ffi.gc(gns, backend._lib.GENERAL_NAMES_free) general_names = _decode_general_names(backend, gns) return general_names def _decode_subject_alt_name(backend, ext): return x509.SubjectAlternativeName( _decode_general_names_extension(backend, ext) ) def _decode_issuer_alt_name(backend, ext): return x509.IssuerAlternativeName( _decode_general_names_extension(backend, ext) ) def _decode_name_constraints(backend, nc): nc = backend._ffi.cast("NAME_CONSTRAINTS *", nc) nc = backend._ffi.gc(nc, backend._lib.NAME_CONSTRAINTS_free) permitted = _decode_general_subtrees(backend, nc.permittedSubtrees) excluded = _decode_general_subtrees(backend, nc.excludedSubtrees) return x509.NameConstraints( permitted_subtrees=permitted, excluded_subtrees=excluded ) def _decode_general_subtrees(backend, stack_subtrees): if stack_subtrees == backend._ffi.NULL: return None num = backend._lib.sk_GENERAL_SUBTREE_num(stack_subtrees) subtrees = [] for i in range(num): obj = backend._lib.sk_GENERAL_SUBTREE_value(stack_subtrees, i) backend.openssl_assert(obj != backend._ffi.NULL) name = _decode_general_name(backend, obj.base) subtrees.append(name) return subtrees def _decode_issuing_dist_point(backend, idp): idp = backend._ffi.cast("ISSUING_DIST_POINT *", idp) idp = backend._ffi.gc(idp, backend._lib.ISSUING_DIST_POINT_free) if idp.distpoint != backend._ffi.NULL: full_name, relative_name = _decode_distpoint(backend, idp.distpoint) else: full_name = None relative_name = None only_user = idp.onlyuser == 255 only_ca = idp.onlyCA == 255 indirect_crl = idp.indirectCRL == 255 only_attr = idp.onlyattr == 255 if idp.onlysomereasons != backend._ffi.NULL: only_some_reasons = _decode_reasons(backend, idp.onlysomereasons) else: only_some_reasons = None return x509.IssuingDistributionPoint( full_name, relative_name, only_user, only_ca, only_some_reasons, indirect_crl, only_attr ) def _decode_policy_constraints(backend, pc): pc = backend._ffi.cast("POLICY_CONSTRAINTS *", pc) pc = backend._ffi.gc(pc, backend._lib.POLICY_CONSTRAINTS_free) require_explicit_policy = _asn1_integer_to_int_or_none( backend, pc.requireExplicitPolicy ) inhibit_policy_mapping = _asn1_integer_to_int_or_none( backend, pc.inhibitPolicyMapping ) return x509.PolicyConstraints( require_explicit_policy, inhibit_policy_mapping ) def _decode_extended_key_usage(backend, sk): sk = backend._ffi.cast("Cryptography_STACK_OF_ASN1_OBJECT *", sk) sk = backend._ffi.gc(sk, backend._lib.sk_ASN1_OBJECT_free) num = backend._lib.sk_ASN1_OBJECT_num(sk) ekus = [] for i in range(num): obj = backend._lib.sk_ASN1_OBJECT_value(sk, i) backend.openssl_assert(obj != backend._ffi.NULL) oid = x509.ObjectIdentifier(_obj2txt(backend, obj)) ekus.append(oid) return x509.ExtendedKeyUsage(ekus) _DISTPOINT_TYPE_FULLNAME = 0 _DISTPOINT_TYPE_RELATIVENAME = 1 def _decode_dist_points(backend, cdps): cdps = backend._ffi.cast("Cryptography_STACK_OF_DIST_POINT *", cdps) cdps = backend._ffi.gc(cdps, backend._lib.CRL_DIST_POINTS_free) num = backend._lib.sk_DIST_POINT_num(cdps) dist_points = [] for i in range(num): full_name = None relative_name = None crl_issuer = None reasons = None cdp = backend._lib.sk_DIST_POINT_value(cdps, i) if cdp.reasons != backend._ffi.NULL: reasons = _decode_reasons(backend, cdp.reasons) if cdp.CRLissuer != backend._ffi.NULL: crl_issuer = _decode_general_names(backend, cdp.CRLissuer) # Certificates may have a crl_issuer/reasons and no distribution # point so make sure it's not null. if cdp.distpoint != backend._ffi.NULL: full_name, relative_name = _decode_distpoint( backend, cdp.distpoint ) dist_points.append( x509.DistributionPoint( full_name, relative_name, reasons, crl_issuer ) ) return dist_points # ReasonFlags ::= BIT STRING { # unused (0), # keyCompromise (1), # cACompromise (2), # affiliationChanged (3), # superseded (4), # cessationOfOperation (5), # certificateHold (6), # privilegeWithdrawn (7), # aACompromise (8) } _REASON_BIT_MAPPING = { 1: x509.ReasonFlags.key_compromise, 2: x509.ReasonFlags.ca_compromise, 3: x509.ReasonFlags.affiliation_changed, 4: x509.ReasonFlags.superseded, 5: x509.ReasonFlags.cessation_of_operation, 6: x509.ReasonFlags.certificate_hold, 7: x509.ReasonFlags.privilege_withdrawn, 8: x509.ReasonFlags.aa_compromise, } def _decode_reasons(backend, reasons): # We will check each bit from RFC 5280 enum_reasons = [] for bit_position, reason in six.iteritems(_REASON_BIT_MAPPING): if backend._lib.ASN1_BIT_STRING_get_bit(reasons, bit_position): enum_reasons.append(reason) return frozenset(enum_reasons) def _decode_distpoint(backend, distpoint): if distpoint.type == _DISTPOINT_TYPE_FULLNAME: full_name = _decode_general_names(backend, distpoint.name.fullname) return full_name, None # OpenSSL code doesn't test for a specific type for # relativename, everything that isn't fullname is considered # relativename. Per RFC 5280: # # DistributionPointName ::= CHOICE { # fullName [0] GeneralNames, # nameRelativeToCRLIssuer [1] RelativeDistinguishedName } rns = distpoint.name.relativename rnum = backend._lib.sk_X509_NAME_ENTRY_num(rns) attributes = set() for i in range(rnum): rn = backend._lib.sk_X509_NAME_ENTRY_value( rns, i ) backend.openssl_assert(rn != backend._ffi.NULL) attributes.add( _decode_x509_name_entry(backend, rn) ) relative_name = x509.RelativeDistinguishedName(attributes) return None, relative_name def _decode_crl_distribution_points(backend, cdps): dist_points = _decode_dist_points(backend, cdps) return x509.CRLDistributionPoints(dist_points) def _decode_freshest_crl(backend, cdps): dist_points = _decode_dist_points(backend, cdps) return x509.FreshestCRL(dist_points) def _decode_inhibit_any_policy(backend, asn1_int): asn1_int = backend._ffi.cast("ASN1_INTEGER *", asn1_int) asn1_int = backend._ffi.gc(asn1_int, backend._lib.ASN1_INTEGER_free) skip_certs = _asn1_integer_to_int(backend, asn1_int) return x509.InhibitAnyPolicy(skip_certs) def _decode_precert_signed_certificate_timestamps(backend, asn1_scts): from cryptography.hazmat.backends.openssl.x509 import ( _SignedCertificateTimestamp ) asn1_scts = backend._ffi.cast("Cryptography_STACK_OF_SCT *", asn1_scts) asn1_scts = backend._ffi.gc(asn1_scts, backend._lib.SCT_LIST_free) scts = [] for i in range(backend._lib.sk_SCT_num(asn1_scts)): sct = backend._lib.sk_SCT_value(asn1_scts, i) scts.append(_SignedCertificateTimestamp(backend, asn1_scts, sct)) return x509.PrecertificateSignedCertificateTimestamps(scts) # CRLReason ::= ENUMERATED { # unspecified (0), # keyCompromise (1), # cACompromise (2), # affiliationChanged (3), # superseded (4), # cessationOfOperation (5), # certificateHold (6), # -- value 7 is not used # removeFromCRL (8), # privilegeWithdrawn (9), # aACompromise (10) } _CRL_ENTRY_REASON_CODE_TO_ENUM = { 0: x509.ReasonFlags.unspecified, 1: x509.ReasonFlags.key_compromise, 2: x509.ReasonFlags.ca_compromise, 3: x509.ReasonFlags.affiliation_changed, 4: x509.ReasonFlags.superseded, 5: x509.ReasonFlags.cessation_of_operation, 6: x509.ReasonFlags.certificate_hold, 8: x509.ReasonFlags.remove_from_crl, 9: x509.ReasonFlags.privilege_withdrawn, 10: x509.ReasonFlags.aa_compromise, } _CRL_ENTRY_REASON_ENUM_TO_CODE = { x509.ReasonFlags.unspecified: 0, x509.ReasonFlags.key_compromise: 1, x509.ReasonFlags.ca_compromise: 2, x509.ReasonFlags.affiliation_changed: 3, x509.ReasonFlags.superseded: 4, x509.ReasonFlags.cessation_of_operation: 5, x509.ReasonFlags.certificate_hold: 6, x509.ReasonFlags.remove_from_crl: 8, x509.ReasonFlags.privilege_withdrawn: 9, x509.ReasonFlags.aa_compromise: 10 } def _decode_crl_reason(backend, enum): enum = backend._ffi.cast("ASN1_ENUMERATED *", enum) enum = backend._ffi.gc(enum, backend._lib.ASN1_ENUMERATED_free) code = backend._lib.ASN1_ENUMERATED_get(enum) try: return x509.CRLReason(_CRL_ENTRY_REASON_CODE_TO_ENUM[code]) except KeyError: raise ValueError("Unsupported reason code: {}".format(code)) def _decode_invalidity_date(backend, inv_date): generalized_time = backend._ffi.cast( "ASN1_GENERALIZEDTIME *", inv_date ) generalized_time = backend._ffi.gc( generalized_time, backend._lib.ASN1_GENERALIZEDTIME_free ) return x509.InvalidityDate( _parse_asn1_generalized_time(backend, generalized_time) ) def _decode_cert_issuer(backend, gns): gns = backend._ffi.cast("GENERAL_NAMES *", gns) gns = backend._ffi.gc(gns, backend._lib.GENERAL_NAMES_free) general_names = _decode_general_names(backend, gns) return x509.CertificateIssuer(general_names) def _asn1_to_der(backend, asn1_type): buf = backend._ffi.new("unsigned char **") res = backend._lib.i2d_ASN1_TYPE(asn1_type, buf) backend.openssl_assert(res >= 0) backend.openssl_assert(buf[0] != backend._ffi.NULL) buf = backend._ffi.gc( buf, lambda buffer: backend._lib.OPENSSL_free(buffer[0]) ) return backend._ffi.buffer(buf[0], res)[:] def _asn1_integer_to_int(backend, asn1_int): bn = backend._lib.ASN1_INTEGER_to_BN(asn1_int, backend._ffi.NULL) backend.openssl_assert(bn != backend._ffi.NULL) bn = backend._ffi.gc(bn, backend._lib.BN_free) return backend._bn_to_int(bn) def _asn1_integer_to_int_or_none(backend, asn1_int): if asn1_int == backend._ffi.NULL: return None else: return _asn1_integer_to_int(backend, asn1_int) def _asn1_string_to_bytes(backend, asn1_string): return backend._ffi.buffer(asn1_string.data, asn1_string.length)[:] def _asn1_string_to_ascii(backend, asn1_string): return _asn1_string_to_bytes(backend, asn1_string).decode("ascii") def _asn1_string_to_utf8(backend, asn1_string): buf = backend._ffi.new("unsigned char **") res = backend._lib.ASN1_STRING_to_UTF8(buf, asn1_string) if res == -1: raise ValueError( "Unsupported ASN1 string type. Type: {}".format(asn1_string.type) ) backend.openssl_assert(buf[0] != backend._ffi.NULL) buf = backend._ffi.gc( buf, lambda buffer: backend._lib.OPENSSL_free(buffer[0]) ) return backend._ffi.buffer(buf[0], res)[:].decode('utf8') def _parse_asn1_time(backend, asn1_time): backend.openssl_assert(asn1_time != backend._ffi.NULL) generalized_time = backend._lib.ASN1_TIME_to_generalizedtime( asn1_time, backend._ffi.NULL ) if generalized_time == backend._ffi.NULL: raise ValueError( "Couldn't parse ASN.1 time as generalizedtime {!r}".format( _asn1_string_to_bytes(backend, asn1_time) ) ) generalized_time = backend._ffi.gc( generalized_time, backend._lib.ASN1_GENERALIZEDTIME_free ) return _parse_asn1_generalized_time(backend, generalized_time) def _parse_asn1_generalized_time(backend, generalized_time): time = _asn1_string_to_ascii( backend, backend._ffi.cast("ASN1_STRING *", generalized_time) ) return datetime.datetime.strptime(time, "%Y%m%d%H%M%SZ") def _decode_nonce(backend, nonce): nonce = backend._ffi.cast("ASN1_OCTET_STRING *", nonce) nonce = backend._ffi.gc(nonce, backend._lib.ASN1_OCTET_STRING_free) return x509.OCSPNonce(_asn1_string_to_bytes(backend, nonce)) _EXTENSION_HANDLERS_NO_SCT = { ExtensionOID.BASIC_CONSTRAINTS: _decode_basic_constraints, ExtensionOID.SUBJECT_KEY_IDENTIFIER: _decode_subject_key_identifier, ExtensionOID.KEY_USAGE: _decode_key_usage, ExtensionOID.SUBJECT_ALTERNATIVE_NAME: _decode_subject_alt_name, ExtensionOID.EXTENDED_KEY_USAGE: _decode_extended_key_usage, ExtensionOID.AUTHORITY_KEY_IDENTIFIER: _decode_authority_key_identifier, ExtensionOID.AUTHORITY_INFORMATION_ACCESS: ( _decode_authority_information_access ), ExtensionOID.CERTIFICATE_POLICIES: _decode_certificate_policies, ExtensionOID.CRL_DISTRIBUTION_POINTS: _decode_crl_distribution_points, ExtensionOID.FRESHEST_CRL: _decode_freshest_crl, ExtensionOID.OCSP_NO_CHECK: _decode_ocsp_no_check, ExtensionOID.INHIBIT_ANY_POLICY: _decode_inhibit_any_policy, ExtensionOID.ISSUER_ALTERNATIVE_NAME: _decode_issuer_alt_name, ExtensionOID.NAME_CONSTRAINTS: _decode_name_constraints, ExtensionOID.POLICY_CONSTRAINTS: _decode_policy_constraints, } _EXTENSION_HANDLERS = _EXTENSION_HANDLERS_NO_SCT.copy() _EXTENSION_HANDLERS[ ExtensionOID.PRECERT_SIGNED_CERTIFICATE_TIMESTAMPS ] = _decode_precert_signed_certificate_timestamps _REVOKED_EXTENSION_HANDLERS = { CRLEntryExtensionOID.CRL_REASON: _decode_crl_reason, CRLEntryExtensionOID.INVALIDITY_DATE: _decode_invalidity_date, CRLEntryExtensionOID.CERTIFICATE_ISSUER: _decode_cert_issuer, } _CRL_EXTENSION_HANDLERS = { ExtensionOID.CRL_NUMBER: _decode_crl_number, ExtensionOID.DELTA_CRL_INDICATOR: _decode_delta_crl_indicator, ExtensionOID.AUTHORITY_KEY_IDENTIFIER: _decode_authority_key_identifier, ExtensionOID.ISSUER_ALTERNATIVE_NAME: _decode_issuer_alt_name, ExtensionOID.AUTHORITY_INFORMATION_ACCESS: ( _decode_authority_information_access ), ExtensionOID.ISSUING_DISTRIBUTION_POINT: _decode_issuing_dist_point, ExtensionOID.FRESHEST_CRL: _decode_freshest_crl, } _OCSP_REQ_EXTENSION_HANDLERS = { OCSPExtensionOID.NONCE: _decode_nonce, } _OCSP_BASICRESP_EXTENSION_HANDLERS = { OCSPExtensionOID.NONCE: _decode_nonce, } # All revoked extensions are valid single response extensions, see: # https://tools.ietf.org/html/rfc6960#section-4.4.5 _OCSP_SINGLERESP_EXTENSION_HANDLERS = _REVOKED_EXTENSION_HANDLERS.copy() _CERTIFICATE_EXTENSION_PARSER_NO_SCT = _X509ExtensionParser( ext_count=lambda backend, x: backend._lib.X509_get_ext_count(x), get_ext=lambda backend, x, i: backend._lib.X509_get_ext(x, i), handlers=_EXTENSION_HANDLERS_NO_SCT ) _CERTIFICATE_EXTENSION_PARSER = _X509ExtensionParser( ext_count=lambda backend, x: backend._lib.X509_get_ext_count(x), get_ext=lambda backend, x, i: backend._lib.X509_get_ext(x, i), handlers=_EXTENSION_HANDLERS ) _CSR_EXTENSION_PARSER = _X509ExtensionParser( ext_count=lambda backend, x: backend._lib.sk_X509_EXTENSION_num(x), get_ext=lambda backend, x, i: backend._lib.sk_X509_EXTENSION_value(x, i), handlers=_EXTENSION_HANDLERS ) _REVOKED_CERTIFICATE_EXTENSION_PARSER = _X509ExtensionParser( ext_count=lambda backend, x: backend._lib.X509_REVOKED_get_ext_count(x), get_ext=lambda backend, x, i: backend._lib.X509_REVOKED_get_ext(x, i), handlers=_REVOKED_EXTENSION_HANDLERS, ) _CRL_EXTENSION_PARSER = _X509ExtensionParser( ext_count=lambda backend, x: backend._lib.X509_CRL_get_ext_count(x), get_ext=lambda backend, x, i: backend._lib.X509_CRL_get_ext(x, i), handlers=_CRL_EXTENSION_HANDLERS, ) _OCSP_REQ_EXT_PARSER = _X509ExtensionParser( ext_count=lambda backend, x: backend._lib.OCSP_REQUEST_get_ext_count(x), get_ext=lambda backend, x, i: backend._lib.OCSP_REQUEST_get_ext(x, i), handlers=_OCSP_REQ_EXTENSION_HANDLERS, ) _OCSP_BASICRESP_EXT_PARSER = _X509ExtensionParser( ext_count=lambda backend, x: backend._lib.OCSP_BASICRESP_get_ext_count(x), get_ext=lambda backend, x, i: backend._lib.OCSP_BASICRESP_get_ext(x, i), handlers=_OCSP_BASICRESP_EXTENSION_HANDLERS, ) _OCSP_SINGLERESP_EXT_PARSER = _X509ExtensionParser( ext_count=lambda backend, x: backend._lib.OCSP_SINGLERESP_get_ext_count(x), get_ext=lambda backend, x, i: backend._lib.OCSP_SINGLERESP_get_ext(x, i), handlers=_OCSP_SINGLERESP_EXTENSION_HANDLERS, )