diff options
| -rw-r--r-- | .azure-pipelines/wheel-builder.yml | 6 | ||||
| -rw-r--r-- | setup.py | 1 | ||||
| -rw-r--r-- | src/cryptography/hazmat/_der.py | 150 | ||||
| -rw-r--r-- | src/cryptography/hazmat/backends/openssl/backend.py | 18 | ||||
| -rw-r--r-- | src/cryptography/hazmat/backends/openssl/decode_asn1.py | 24 | ||||
| -rw-r--r-- | src/cryptography/hazmat/primitives/asymmetric/utils.py | 26 | ||||
| -rw-r--r-- | src/cryptography/x509/extensions.py | 24 | ||||
| -rw-r--r-- | tests/hazmat/primitives/test_asym_utils.py | 9 | ||||
| -rw-r--r-- | tests/hazmat/test_der.py | 217 | ||||
| -rw-r--r-- | tests/x509/test_x509.py | 98 | 
10 files changed, 509 insertions, 64 deletions
| diff --git a/.azure-pipelines/wheel-builder.yml b/.azure-pipelines/wheel-builder.yml index edd1dd51..b3ec8ee0 100644 --- a/.azure-pipelines/wheel-builder.yml +++ b/.azure-pipelines/wheel-builder.yml @@ -37,7 +37,7 @@ jobs:              displayName: Update wheel to the latest version            - script: .venv/bin/pip install -U pip==10.0.1              displayName: Downgrade pip lol -          - script: .venv/bin/pip install cffi six asn1crypto ipaddress "enum34; python_version < '3'" +          - script: .venv/bin/pip install cffi six ipaddress "enum34; python_version < '3'"              displayName: Install our Python dependencies            - script: | @@ -106,7 +106,7 @@ jobs:              displayName: Create virtualenv            - script: .venv/bin/pip install -U pip==10.0.1              displayName: Downgrade pip lol -          - script: .venv/bin/pip install cffi six asn1crypto ipaddress enum34 +          - script: .venv/bin/pip install cffi six ipaddress enum34              displayName: Install our Python dependencies            - script: |                set -e @@ -207,7 +207,7 @@ jobs:        steps:            - script: '"C:/Python%PYTHON_VERSION%/python.exe" -m pip install -U pip==10.0.1'              displayName: Downgrade pip lol -          - script: '"C:/Python%PYTHON_VERSION%/Scripts/pip" install wheel cffi six asn1crypto ipaddress enum34' +          - script: '"C:/Python%PYTHON_VERSION%/Scripts/pip" install wheel cffi six ipaddress enum34'              displayName: Install wheel and our Python dependencies            - script: |                set INCLUDE=C:/%OPENSSL_DIR%/include;%INCLUDE% @@ -233,7 +233,6 @@ setup(      python_requires='>=2.7,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*',      install_requires=[ -        "asn1crypto >= 0.21.0",          "six >= 1.4.1",      ] + setup_requirements,      extras_require={ diff --git a/src/cryptography/hazmat/_der.py b/src/cryptography/hazmat/_der.py new file mode 100644 index 00000000..3a121a85 --- /dev/null +++ b/src/cryptography/hazmat/_der.py @@ -0,0 +1,150 @@ +# This file is dual licensed under the terms of the Apache License, Version +# 2.0, and the BSD License. See the LICENSE file in the root of this repository +# for complete details. + +from __future__ import absolute_import, division, print_function + +import six + +from cryptography.utils import int_from_bytes, int_to_bytes + + +# This module contains a lightweight DER encoder and decoder. See X.690 for the +# specification. This module intentionally does not implement the more complex +# BER encoding, only DER. +# +# Note this implementation treats an element's constructed bit as part of the +# tag. This is fine for DER, where the bit is always computable from the type. + + +CONSTRUCTED = 0x20 +CONTEXT_SPECIFIC = 0x80 + +INTEGER = 0x02 +BIT_STRING = 0x03 +OCTET_STRING = 0x04 +NULL = 0x05 +OBJECT_IDENTIFIER = 0x06 +SEQUENCE = 0x10 | CONSTRUCTED +SET = 0x11 | CONSTRUCTED +PRINTABLE_STRING = 0x13 +UTC_TIME = 0x17 +GENERALIZED_TIME = 0x18 + + +class DERReader(object): +    def __init__(self, data): +        self.data = memoryview(data) + +    def is_empty(self): +        return len(self.data) == 0 + +    def check_empty(self): +        if not self.is_empty(): +            raise ValueError("Invalid DER input: trailing data") + +    def read_byte(self): +        if len(self.data) < 1: +            raise ValueError("Invalid DER input: insufficient data") +        ret = six.indexbytes(self.data, 0) +        self.data = self.data[1:] +        return ret + +    def read_bytes(self, n): +        if len(self.data) < n: +            raise ValueError("Invalid DER input: insufficient data") +        ret = self.data[:n] +        self.data = self.data[n:] +        return ret + +    def read_any_element(self): +        tag = self.read_byte() +        # Tag numbers 31 or higher are stored in multiple bytes. No supported +        # ASN.1 types use such tags, so reject these. +        if tag & 0x1f == 0x1f: +            raise ValueError("Invalid DER input: unexpected high tag number") +        length_byte = self.read_byte() +        if length_byte & 0x80 == 0: +            # If the high bit is clear, the first length byte is the length. +            length = length_byte +        else: +            # If the high bit is set, the first length byte encodes the length +            # of the length. +            length_byte &= 0x7f +            if length_byte == 0: +                raise ValueError( +                    "Invalid DER input: indefinite length form is not allowed " +                    "in DER" +                ) +            length = 0 +            for i in range(length_byte): +                length <<= 8 +                length |= self.read_byte() +                if length == 0: +                    raise ValueError( +                        "Invalid DER input: length was not minimally-encoded" +                    ) +            if length < 0x80: +                # If the length could have been encoded in short form, it must +                # not use long form. +                raise ValueError( +                    "Invalid DER input: length was not minimally-encoded" +                ) +        body = self.read_bytes(length) +        return tag, DERReader(body) + +    def read_element(self, expected_tag): +        tag, body = self.read_any_element() +        if tag != expected_tag: +            raise ValueError("Invalid DER input: unexpected tag") +        return body + +    def read_single_element(self, expected_tag): +        ret = self.read_element(expected_tag) +        self.check_empty() +        return ret + +    def read_optional_element(self, expected_tag): +        if len(self.data) > 0 and six.indexbytes(self.data, 0) == expected_tag: +            return self.read_element(expected_tag) +        return None + +    def as_integer(self): +        if len(self.data) == 0: +            raise ValueError("Invalid DER input: empty integer contents") +        first = six.indexbytes(self.data, 0) +        if first & 0x80 == 0x80: +            raise ValueError("Negative DER integers are not supported") +        # The first 9 bits must not all be zero or all be ones. Otherwise, the +        # encoding should have been one byte shorter. +        if len(self.data) > 1: +            second = six.indexbytes(self.data, 1) +            if first == 0 and second & 0x80 == 0: +                raise ValueError( +                    "Invalid DER input: integer not minimally-encoded" +                ) +        return int_from_bytes(self.data, "big") + + +def encode_der_integer(x): +    if not isinstance(x, six.integer_types): +        raise ValueError("Value must be an integer") +    if x < 0: +        raise ValueError("Negative integers are not supported") +    n = x.bit_length() // 8 + 1 +    return int_to_bytes(x, n) + + +def encode_der(tag, *children): +    length = 0 +    for child in children: +        length += len(child) +    chunks = [six.int2byte(tag)] +    if length < 0x80: +        chunks.append(six.int2byte(length)) +    else: +        length_bytes = int_to_bytes(length) +        chunks.append(six.int2byte(0x80 | len(length_bytes))) +        chunks.append(length_bytes) +    chunks.extend(children) +    return b"".join(chunks) diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py index ca8b1b62..eb6654b0 100644 --- a/src/cryptography/hazmat/backends/openssl/backend.py +++ b/src/cryptography/hazmat/backends/openssl/backend.py @@ -10,13 +10,14 @@ import contextlib  import itertools  from contextlib import contextmanager -import asn1crypto.core -  import six  from six.moves import range  from cryptography import utils, x509  from cryptography.exceptions import UnsupportedAlgorithm, _Reasons +from cryptography.hazmat._der import ( +    INTEGER, NULL, SEQUENCE, encode_der, encode_der_integer +)  from cryptography.hazmat.backends.interfaces import (      CMACBackend, CipherBackend, DERSerializationBackend, DHBackend, DSABackend,      EllipticCurveBackend, HMACBackend, HashBackend, PBKDF2HMACBackend, @@ -26,7 +27,7 @@ from cryptography.hazmat.backends.openssl import aead  from cryptography.hazmat.backends.openssl.ciphers import _CipherContext  from cryptography.hazmat.backends.openssl.cmac import _CMACContext  from cryptography.hazmat.backends.openssl.decode_asn1 import ( -    _CRL_ENTRY_REASON_ENUM_TO_CODE, _Integers +    _CRL_ENTRY_REASON_ENUM_TO_CODE  )  from cryptography.hazmat.backends.openssl.dh import (      _DHParameters, _DHPrivateKey, _DHPublicKey, _dh_params_dup @@ -1008,12 +1009,17 @@ class Backend(object):              value = _encode_asn1_str_gc(self, extension.value.value)              return self._create_raw_x509_extension(extension, value)          elif isinstance(extension.value, x509.TLSFeature): -            asn1 = _Integers([x.value for x in extension.value]).dump() +            asn1 = encode_der( +                SEQUENCE, +                *[ +                    encode_der(INTEGER, encode_der_integer(x.value)) +                    for x in extension.value +                ] +            )              value = _encode_asn1_str_gc(self, asn1)              return self._create_raw_x509_extension(extension, value)          elif isinstance(extension.value, x509.PrecertPoison): -            asn1 = asn1crypto.core.Null().dump() -            value = _encode_asn1_str_gc(self, asn1) +            value = _encode_asn1_str_gc(self, encode_der(NULL))              return self._create_raw_x509_extension(extension, value)          else:              try: diff --git a/src/cryptography/hazmat/backends/openssl/decode_asn1.py b/src/cryptography/hazmat/backends/openssl/decode_asn1.py index 75d5844b..35295ce3 100644 --- a/src/cryptography/hazmat/backends/openssl/decode_asn1.py +++ b/src/cryptography/hazmat/backends/openssl/decode_asn1.py @@ -7,11 +7,10 @@ from __future__ import absolute_import, division, print_function  import datetime  import ipaddress -import asn1crypto.core -  import six  from cryptography import x509 +from cryptography.hazmat._der import DERReader, INTEGER, NULL, SEQUENCE  from cryptography.x509.extensions import _TLS_FEATURE_TYPE_TO_ENUM  from cryptography.x509.name import _ASN1_TYPE_TO_ENUM  from cryptography.x509.oid import ( @@ -20,10 +19,6 @@ from cryptography.x509.oid import (  ) -class _Integers(asn1crypto.core.SequenceOf): -    _child_spec = asn1crypto.core.Integer - -  def _obj2txt(backend, obj):      # Set to 80 on the recommendation of      # https://www.openssl.org/docs/crypto/OBJ_nid2ln.html#return_values @@ -209,20 +204,25 @@ class _X509ExtensionParser(object):              # to support them in all versions of OpenSSL so we decode them              # ourselves.              if oid == ExtensionOID.TLS_FEATURE: +                # The extension contents are a SEQUENCE OF INTEGERs.                  data = backend._lib.X509_EXTENSION_get_data(ext) -                parsed = _Integers.load(_asn1_string_to_bytes(backend, data)) +                data_bytes = _asn1_string_to_bytes(backend, data) +                features = DERReader(data_bytes).read_single_element(SEQUENCE) +                parsed = [] +                while not features.is_empty(): +                    parsed.append(features.read_element(INTEGER).as_integer()) +                # Map the features to their enum value.                  value = x509.TLSFeature( -                    [_TLS_FEATURE_TYPE_TO_ENUM[x.native] for x in parsed] +                    [_TLS_FEATURE_TYPE_TO_ENUM[x] for x in parsed]                  )                  extensions.append(x509.Extension(oid, critical, value))                  seen_oids.add(oid)                  continue              elif oid == ExtensionOID.PRECERT_POISON:                  data = backend._lib.X509_EXTENSION_get_data(ext) -                parsed = asn1crypto.core.Null.load( -                    _asn1_string_to_bytes(backend, data) -                ) -                assert parsed == asn1crypto.core.Null() +                # The contents of the extension must be an ASN.1 NULL. +                reader = DERReader(_asn1_string_to_bytes(backend, data)) +                reader.read_single_element(NULL).check_empty()                  extensions.append(x509.Extension(                      oid, critical, x509.PrecertPoison()                  )) diff --git a/src/cryptography/hazmat/primitives/asymmetric/utils.py b/src/cryptography/hazmat/primitives/asymmetric/utils.py index 274c1f41..43d5b9bf 100644 --- a/src/cryptography/hazmat/primitives/asymmetric/utils.py +++ b/src/cryptography/hazmat/primitives/asymmetric/utils.py @@ -4,27 +4,27 @@  from __future__ import absolute_import, division, print_function -from asn1crypto.algos import DSASignature - -import six -  from cryptography import utils +from cryptography.hazmat._der import ( +    DERReader, INTEGER, SEQUENCE, encode_der, encode_der_integer +)  from cryptography.hazmat.primitives import hashes  def decode_dss_signature(signature): -    data = DSASignature.load(signature, strict=True).native -    return data['r'], data['s'] +    seq = DERReader(signature).read_single_element(SEQUENCE) +    r = seq.read_element(INTEGER).as_integer() +    s = seq.read_element(INTEGER).as_integer() +    seq.check_empty() +    return r, s  def encode_dss_signature(r, s): -    if ( -        not isinstance(r, six.integer_types) or -        not isinstance(s, six.integer_types) -    ): -        raise ValueError("Both r and s must be integers") - -    return DSASignature({'r': r, 's': s}).dump() +    return encode_der( +        SEQUENCE, +        encode_der(INTEGER, encode_der_integer(r)), +        encode_der(INTEGER, encode_der_integer(s)), +    )  class Prehashed(object): diff --git a/src/cryptography/x509/extensions.py b/src/cryptography/x509/extensions.py index d25131b8..c78c76c2 100644 --- a/src/cryptography/x509/extensions.py +++ b/src/cryptography/x509/extensions.py @@ -11,11 +11,12 @@ import ipaddress  import warnings  from enum import Enum -from asn1crypto.keys import PublicKeyInfo -  import six  from cryptography import utils +from cryptography.hazmat._der import ( +    BIT_STRING, DERReader, OBJECT_IDENTIFIER, SEQUENCE +)  from cryptography.hazmat.primitives import constant_time, serialization  from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey  from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey @@ -47,7 +48,24 @@ def _key_identifier_from_public_key(public_key):              serialization.PublicFormat.SubjectPublicKeyInfo          ) -        data = bytes(PublicKeyInfo.load(serialized)['public_key']) +        public_key_info = DERReader(serialized).read_single_element(SEQUENCE) +        algorithm = public_key_info.read_element(SEQUENCE) +        public_key = public_key_info.read_element(BIT_STRING) +        public_key_info.check_empty() + +        # Double-check the algorithm structure. +        algorithm.read_element(OBJECT_IDENTIFIER) +        if not algorithm.is_empty(): +            # Skip the optional parameters field. +            algorithm.read_any_element() +        algorithm.check_empty() + +        # BIT STRING contents begin with the number of padding bytes added. It +        # must be zero for SubjectPublicKeyInfo structures. +        if public_key.read_byte() != 0: +            raise ValueError('Invalid public key encoding') + +        data = public_key.data      return hashlib.sha1(data).digest() diff --git a/tests/hazmat/primitives/test_asym_utils.py b/tests/hazmat/primitives/test_asym_utils.py index d817c651..b49ca3f2 100644 --- a/tests/hazmat/primitives/test_asym_utils.py +++ b/tests/hazmat/primitives/test_asym_utils.py @@ -31,10 +31,6 @@ def test_dss_signature():      assert sig3 == b"0\x06\x02\x01\x00\x02\x01\x00"      assert decode_dss_signature(sig3) == (0, 0) -    sig4 = encode_dss_signature(-1, 0) -    assert sig4 == b"0\x06\x02\x01\xFF\x02\x01\x00" -    assert decode_dss_signature(sig4) == (-1, 0) -  def test_encode_dss_non_integer():      with pytest.raises(ValueError): @@ -53,6 +49,11 @@ def test_encode_dss_non_integer():          encode_dss_signature("hello", "world") +def test_encode_dss_negative(): +    with pytest.raises(ValueError): +        encode_dss_signature(-1, 0) + +  def test_decode_dss_trailing_bytes():      with pytest.raises(ValueError):          decode_dss_signature(b"0\x06\x02\x01\x01\x02\x01\x01\x00\x00\x00") diff --git a/tests/hazmat/test_der.py b/tests/hazmat/test_der.py new file mode 100644 index 00000000..d81c0d3e --- /dev/null +++ b/tests/hazmat/test_der.py @@ -0,0 +1,217 @@ +# This file is dual licensed under the terms of the Apache License, Version +# 2.0, and the BSD License. See the LICENSE file in the root of this repository +# for complete details. + +from __future__ import absolute_import, division, print_function + +import pytest + +from cryptography.hazmat._der import ( +    DERReader, INTEGER, NULL, OCTET_STRING, SEQUENCE, encode_der, +    encode_der_integer +) + + +def test_der_reader_basic(): +    reader = DERReader(b"123456789") +    assert reader.read_byte() == ord(b"1") +    assert reader.read_bytes(1).tobytes() == b"2" +    assert reader.read_bytes(4).tobytes() == b"3456" + +    with pytest.raises(ValueError): +        reader.read_bytes(4) + +    assert reader.read_bytes(3).tobytes() == b"789" + +    # The input is now empty. +    with pytest.raises(ValueError): +        reader.read_bytes(1) +    with pytest.raises(ValueError): +        reader.read_byte() + + +def test_der(): +    # This input is the following structure, using +    # https://github.com/google/der-ascii +    # +    # SEQUENCE { +    #   SEQUENCE { +    #     NULL {} +    #     INTEGER { 42 } +    #     OCTET_STRING { "hello" } +    #   } +    # } +    der = b"\x30\x0e\x30\x0c\x05\x00\x02\x01\x2a\x04\x05\x68\x65\x6c\x6c\x6f" +    reader = DERReader(der) +    with pytest.raises(ValueError): +        reader.check_empty() + +    # Parse the outer element. +    outer = reader.read_element(SEQUENCE) +    reader.check_empty() +    assert outer.data.tobytes() == der[2:] + +    # Parse the outer element with read_any_element. +    reader = DERReader(der) +    tag, outer2 = reader.read_any_element() +    reader.check_empty() +    assert tag == SEQUENCE +    assert outer2.data.tobytes() == der[2:] + +    # Parse the outer element with read_single_element. +    outer3 = DERReader(der).read_single_element(SEQUENCE) +    assert outer3.data.tobytes() == der[2:] + +    # read_single_element rejects trailing data. +    with pytest.raises(ValueError): +        DERReader(der + der).read_single_element(SEQUENCE) + +    # Continue parsing the structure. +    inner = outer.read_element(SEQUENCE) +    outer.check_empty() + +    # Parsing a missing optional element should work. +    assert inner.read_optional_element(INTEGER) is None + +    null = inner.read_element(NULL) +    null.check_empty() + +    # Parsing a present optional element should work. +    integer = inner.read_optional_element(INTEGER) +    assert integer.as_integer() == 42 + +    octet_string = inner.read_element(OCTET_STRING) +    assert octet_string.data.tobytes() == b"hello" + +    # Parsing a missing optional element should work when the input is empty. +    inner.check_empty() +    assert inner.read_optional_element(INTEGER) is None + +    # Re-encode the same structure. +    der2 = encode_der( +        SEQUENCE, +        encode_der( +            SEQUENCE, +            encode_der(NULL), +            encode_der(INTEGER, encode_der_integer(42)), +            encode_der(OCTET_STRING, b"hello"), +        ) +    ) +    assert der2 == der + + +@pytest.mark.parametrize( +    "length,header", +    [ +        # Single-byte lengths. +        (0, b"\x04\x00"), +        (1, b"\x04\x01"), +        (2, b"\x04\x02"), +        (127, b"\x04\x7f"), +        # Long-form lengths. +        (128, b"\x04\x81\x80"), +        (129, b"\x04\x81\x81"), +        (255, b"\x04\x81\xff"), +        (0x100, b"\x04\x82\x01\x00"), +        (0x101, b"\x04\x82\x01\x01"), +        (0xffff, b"\x04\x82\xff\xff"), +        (0x10000, b"\x04\x83\x01\x00\x00"), +    ] +) +def test_der_lengths(length, header): +    body = length * b"a" +    der = header + body + +    reader = DERReader(der) +    element = reader.read_element(OCTET_STRING) +    reader.check_empty() +    assert element.data.tobytes() == body + +    assert encode_der(OCTET_STRING, body) == der + + +@pytest.mark.parametrize( +    "bad_input", +    [ +        # The input ended before the tag. +        b"", +        # The input ended before the length. +        b"\x30", +        # The input ended before the second byte of the length. +        b"\x30\x81", +        # The input ended before the body. +        b"\x30\x01", +        # The length used long form when it should be short form. +        b"\x30\x81\x01\x00", +        # The length was not minimally-encoded. +        b"\x30\x82\x00\x80" + (0x80 * b"a"), +        # Indefinite-length encoding is not valid DER. +        b"\x30\x80\x00\x00" +        # Tag number (the bottom 5 bits) 31 indicates long form tags, which we +        # do not support. +        b"\x1f\x00", +        b"\x9f\x00", +        b"\xbf\x00", +        b"\xff\x00", +    ] +) +def test_der_reader_bad_input(bad_input): +    reader = DERReader(bad_input) +    with pytest.raises(ValueError): +        reader.read_any_element() + + +def test_der_reader_wrong_tag(): +    reader = DERReader(b"\x04\x00") +    with pytest.raises(ValueError): +        reader.read_element(SEQUENCE) + + +@pytest.mark.parametrize( +    "value,der", +    [ +        (0, b'\x00'), +        (1, b'\x01'), +        (2, b'\x02'), +        (3, b'\x03'), +        (127, b'\x7f'), +        (128, b'\x00\x80'), +        (0x112233445566778899aabbccddeeff, +         b'\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff'), +    ] +) +def test_integer(value, der): +    assert encode_der_integer(value) == der +    assert DERReader(der).as_integer() == value + + +@pytest.mark.parametrize( +    "bad_input", +    [ +        # Zero is encoded as b"\x00", not the empty string. +        b"", +        # Too many leading zeros. +        b"\x00\x00", +        b"\x00\x7f", +        # Too many leading ones. +        b"\xff\xff", +        b"\xff\x80", +        # Negative integers are not supported. +        b"\x80", +        b"\x81", +        b"\x80\x00\x00", +        b"\xff", +    ] +) +def test_invalid_integer(bad_input): +    reader = DERReader(bad_input) +    with pytest.raises(ValueError): +        reader.as_integer() + + +def test_invalid_integer_encode(): +    with pytest.raises(ValueError): +        encode_der_integer(-1) + +    with pytest.raises(ValueError): +        encode_der_integer("not an integer") diff --git a/tests/x509/test_x509.py b/tests/x509/test_x509.py index 20e23d5f..bb0ad022 100644 --- a/tests/x509/test_x509.py +++ b/tests/x509/test_x509.py @@ -6,12 +6,11 @@  from __future__ import absolute_import, division, print_function  import binascii +import collections  import datetime  import ipaddress  import os -from asn1crypto.x509 import Certificate -  import pytest  import pytz @@ -20,6 +19,10 @@ import six  from cryptography import utils, x509  from cryptography.exceptions import UnsupportedAlgorithm +from cryptography.hazmat._der import ( +    BIT_STRING, CONSTRUCTED, CONTEXT_SPECIFIC, DERReader, GENERALIZED_TIME, +    INTEGER, OBJECT_IDENTIFIER, PRINTABLE_STRING, SEQUENCE, SET, UTC_TIME +)  from cryptography.hazmat.backends.interfaces import (      DSABackend, EllipticCurveBackend, RSABackend, X509Backend  ) @@ -65,6 +68,53 @@ def _load_cert(filename, loader, backend):      return cert +ParsedCertificate = collections.namedtuple( +    "ParsedCertificate", +    ["not_before_tag", "not_after_tag", "issuer", "subject"] +) + + +def _parse_cert(der): +    # See the Certificate structured, defined in RFC 5280. +    cert = DERReader(der).read_single_element(SEQUENCE) +    tbs_cert = cert.read_element(SEQUENCE) +    # Skip outer signature algorithm +    _ = cert.read_element(SEQUENCE) +    # Skip signature +    _ = cert.read_element(BIT_STRING) +    cert.check_empty() + +    # Skip version +    _ = tbs_cert.read_optional_element(CONTEXT_SPECIFIC | CONSTRUCTED | 0) +    # Skip serialNumber +    _ = tbs_cert.read_element(INTEGER) +    # Skip inner signature algorithm +    _ = tbs_cert.read_element(SEQUENCE) +    issuer = tbs_cert.read_element(SEQUENCE) +    validity = tbs_cert.read_element(SEQUENCE) +    subject = tbs_cert.read_element(SEQUENCE) +    # Skip subjectPublicKeyInfo +    _ = tbs_cert.read_element(SEQUENCE) +    # Skip issuerUniqueID +    _ = tbs_cert.read_optional_element(CONTEXT_SPECIFIC | CONSTRUCTED | 1) +    # Skip subjectUniqueID +    _ = tbs_cert.read_optional_element(CONTEXT_SPECIFIC | CONSTRUCTED | 2) +    # Skip extensions +    _ = tbs_cert.read_optional_element(CONTEXT_SPECIFIC | CONSTRUCTED | 3) +    tbs_cert.check_empty() + +    not_before_tag, _ = validity.read_any_element() +    not_after_tag, _ = validity.read_any_element() +    validity.check_empty() + +    return ParsedCertificate( +        not_before_tag=not_before_tag, +        not_after_tag=not_after_tag, +        issuer=issuer, +        subject=subject, +    ) + +  @pytest.mark.requires_backend_interface(interface=X509Backend)  class TestCertificateRevocationList(object):      def test_load_pem_crl(self, backend): @@ -1587,12 +1637,24 @@ class TestRSACertificateRequest(object):          cert = builder.sign(issuer_private_key, hashes.SHA256(), backend) -        parsed = Certificate.load( -            cert.public_bytes(serialization.Encoding.DER)) +        parsed = _parse_cert(cert.public_bytes(serialization.Encoding.DER)) +        subject = parsed.subject +        issuer = parsed.issuer + +        def read_next_rdn_value_tag(reader): +            rdn = reader.read_element(SET) +            attribute = rdn.read_element(SEQUENCE) +            # Assume each RDN has a single attribute. +            rdn.check_empty() + +            _ = attribute.read_element(OBJECT_IDENTIFIER) +            tag, value = attribute.read_any_element() +            attribute.check_empty() +            return tag          # Check that each value was encoded as an ASN.1 PRINTABLESTRING. -        assert parsed.subject.chosen[0][0]['value'].chosen.tag == 19 -        assert parsed.issuer.chosen[0][0]['value'].chosen.tag == 19 +        assert read_next_rdn_value_tag(subject) == PRINTABLE_STRING +        assert read_next_rdn_value_tag(issuer) == PRINTABLE_STRING          if (              # This only works correctly in OpenSSL 1.1.0f+ and 1.0.2l+              backend._lib.CRYPTOGRAPHY_OPENSSL_110F_OR_GREATER or ( @@ -1600,8 +1662,8 @@ class TestRSACertificateRequest(object):                  not backend._lib.CRYPTOGRAPHY_OPENSSL_110_OR_GREATER              )          ): -            assert parsed.subject.chosen[1][0]['value'].chosen.tag == 19 -            assert parsed.issuer.chosen[1][0]['value'].chosen.tag == 19 +            assert read_next_rdn_value_tag(subject) == PRINTABLE_STRING +            assert read_next_rdn_value_tag(issuer) == PRINTABLE_STRING  class TestCertificateBuilder(object): @@ -1738,13 +1800,9 @@ class TestCertificateBuilder(object):          cert = builder.sign(private_key, hashes.SHA256(), backend)          assert cert.not_valid_before == not_valid_before          assert cert.not_valid_after == not_valid_after -        parsed = Certificate.load( -            cert.public_bytes(serialization.Encoding.DER) -        ) -        not_before = parsed['tbs_certificate']['validity']['not_before'] -        not_after = parsed['tbs_certificate']['validity']['not_after'] -        assert not_before.chosen.tag == 23  # UTCTime -        assert not_after.chosen.tag == 24  # GeneralizedTime +        parsed = _parse_cert(cert.public_bytes(serialization.Encoding.DER)) +        assert parsed.not_before_tag == UTC_TIME +        assert parsed.not_after_tag == GENERALIZED_TIME      @pytest.mark.requires_backend_interface(interface=RSABackend)      @pytest.mark.requires_backend_interface(interface=X509Backend) @@ -2038,13 +2096,9 @@ class TestCertificateBuilder(object):          cert = cert_builder.sign(private_key, hashes.SHA256(), backend)          assert cert.not_valid_before == time          assert cert.not_valid_after == time -        parsed = Certificate.load( -            cert.public_bytes(serialization.Encoding.DER) -        ) -        not_before = parsed['tbs_certificate']['validity']['not_before'] -        not_after = parsed['tbs_certificate']['validity']['not_after'] -        assert not_before.chosen.tag == 23  # UTCTime -        assert not_after.chosen.tag == 23  # UTCTime +        parsed = _parse_cert(cert.public_bytes(serialization.Encoding.DER)) +        assert parsed.not_before_tag == UTC_TIME +        assert parsed.not_after_tag == UTC_TIME      def test_invalid_not_valid_after(self):          with pytest.raises(TypeError): | 
