aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.azure-pipelines/wheel-builder.yml6
-rw-r--r--setup.py1
-rw-r--r--src/cryptography/hazmat/_der.py150
-rw-r--r--src/cryptography/hazmat/backends/openssl/backend.py18
-rw-r--r--src/cryptography/hazmat/backends/openssl/decode_asn1.py24
-rw-r--r--src/cryptography/hazmat/primitives/asymmetric/utils.py26
-rw-r--r--src/cryptography/x509/extensions.py24
-rw-r--r--tests/hazmat/primitives/test_asym_utils.py9
-rw-r--r--tests/hazmat/test_der.py217
-rw-r--r--tests/x509/test_x509.py98
10 files changed, 509 insertions, 64 deletions
diff --git a/.azure-pipelines/wheel-builder.yml b/.azure-pipelines/wheel-builder.yml
index edd1dd51..b3ec8ee0 100644
--- a/.azure-pipelines/wheel-builder.yml
+++ b/.azure-pipelines/wheel-builder.yml
@@ -37,7 +37,7 @@ jobs:
displayName: Update wheel to the latest version
- script: .venv/bin/pip install -U pip==10.0.1
displayName: Downgrade pip lol
- - script: .venv/bin/pip install cffi six asn1crypto ipaddress "enum34; python_version < '3'"
+ - script: .venv/bin/pip install cffi six ipaddress "enum34; python_version < '3'"
displayName: Install our Python dependencies
- script: |
@@ -106,7 +106,7 @@ jobs:
displayName: Create virtualenv
- script: .venv/bin/pip install -U pip==10.0.1
displayName: Downgrade pip lol
- - script: .venv/bin/pip install cffi six asn1crypto ipaddress enum34
+ - script: .venv/bin/pip install cffi six ipaddress enum34
displayName: Install our Python dependencies
- script: |
set -e
@@ -207,7 +207,7 @@ jobs:
steps:
- script: '"C:/Python%PYTHON_VERSION%/python.exe" -m pip install -U pip==10.0.1'
displayName: Downgrade pip lol
- - script: '"C:/Python%PYTHON_VERSION%/Scripts/pip" install wheel cffi six asn1crypto ipaddress enum34'
+ - script: '"C:/Python%PYTHON_VERSION%/Scripts/pip" install wheel cffi six ipaddress enum34'
displayName: Install wheel and our Python dependencies
- script: |
set INCLUDE=C:/%OPENSSL_DIR%/include;%INCLUDE%
diff --git a/setup.py b/setup.py
index bf560dd9..8ca985dd 100644
--- a/setup.py
+++ b/setup.py
@@ -233,7 +233,6 @@ setup(
python_requires='>=2.7,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*',
install_requires=[
- "asn1crypto >= 0.21.0",
"six >= 1.4.1",
] + setup_requirements,
extras_require={
diff --git a/src/cryptography/hazmat/_der.py b/src/cryptography/hazmat/_der.py
new file mode 100644
index 00000000..3a121a85
--- /dev/null
+++ b/src/cryptography/hazmat/_der.py
@@ -0,0 +1,150 @@
+# This file is dual licensed under the terms of the Apache License, Version
+# 2.0, and the BSD License. See the LICENSE file in the root of this repository
+# for complete details.
+
+from __future__ import absolute_import, division, print_function
+
+import six
+
+from cryptography.utils import int_from_bytes, int_to_bytes
+
+
+# This module contains a lightweight DER encoder and decoder. See X.690 for the
+# specification. This module intentionally does not implement the more complex
+# BER encoding, only DER.
+#
+# Note this implementation treats an element's constructed bit as part of the
+# tag. This is fine for DER, where the bit is always computable from the type.
+
+
+CONSTRUCTED = 0x20
+CONTEXT_SPECIFIC = 0x80
+
+INTEGER = 0x02
+BIT_STRING = 0x03
+OCTET_STRING = 0x04
+NULL = 0x05
+OBJECT_IDENTIFIER = 0x06
+SEQUENCE = 0x10 | CONSTRUCTED
+SET = 0x11 | CONSTRUCTED
+PRINTABLE_STRING = 0x13
+UTC_TIME = 0x17
+GENERALIZED_TIME = 0x18
+
+
+class DERReader(object):
+ def __init__(self, data):
+ self.data = memoryview(data)
+
+ def is_empty(self):
+ return len(self.data) == 0
+
+ def check_empty(self):
+ if not self.is_empty():
+ raise ValueError("Invalid DER input: trailing data")
+
+ def read_byte(self):
+ if len(self.data) < 1:
+ raise ValueError("Invalid DER input: insufficient data")
+ ret = six.indexbytes(self.data, 0)
+ self.data = self.data[1:]
+ return ret
+
+ def read_bytes(self, n):
+ if len(self.data) < n:
+ raise ValueError("Invalid DER input: insufficient data")
+ ret = self.data[:n]
+ self.data = self.data[n:]
+ return ret
+
+ def read_any_element(self):
+ tag = self.read_byte()
+ # Tag numbers 31 or higher are stored in multiple bytes. No supported
+ # ASN.1 types use such tags, so reject these.
+ if tag & 0x1f == 0x1f:
+ raise ValueError("Invalid DER input: unexpected high tag number")
+ length_byte = self.read_byte()
+ if length_byte & 0x80 == 0:
+ # If the high bit is clear, the first length byte is the length.
+ length = length_byte
+ else:
+ # If the high bit is set, the first length byte encodes the length
+ # of the length.
+ length_byte &= 0x7f
+ if length_byte == 0:
+ raise ValueError(
+ "Invalid DER input: indefinite length form is not allowed "
+ "in DER"
+ )
+ length = 0
+ for i in range(length_byte):
+ length <<= 8
+ length |= self.read_byte()
+ if length == 0:
+ raise ValueError(
+ "Invalid DER input: length was not minimally-encoded"
+ )
+ if length < 0x80:
+ # If the length could have been encoded in short form, it must
+ # not use long form.
+ raise ValueError(
+ "Invalid DER input: length was not minimally-encoded"
+ )
+ body = self.read_bytes(length)
+ return tag, DERReader(body)
+
+ def read_element(self, expected_tag):
+ tag, body = self.read_any_element()
+ if tag != expected_tag:
+ raise ValueError("Invalid DER input: unexpected tag")
+ return body
+
+ def read_single_element(self, expected_tag):
+ ret = self.read_element(expected_tag)
+ self.check_empty()
+ return ret
+
+ def read_optional_element(self, expected_tag):
+ if len(self.data) > 0 and six.indexbytes(self.data, 0) == expected_tag:
+ return self.read_element(expected_tag)
+ return None
+
+ def as_integer(self):
+ if len(self.data) == 0:
+ raise ValueError("Invalid DER input: empty integer contents")
+ first = six.indexbytes(self.data, 0)
+ if first & 0x80 == 0x80:
+ raise ValueError("Negative DER integers are not supported")
+ # The first 9 bits must not all be zero or all be ones. Otherwise, the
+ # encoding should have been one byte shorter.
+ if len(self.data) > 1:
+ second = six.indexbytes(self.data, 1)
+ if first == 0 and second & 0x80 == 0:
+ raise ValueError(
+ "Invalid DER input: integer not minimally-encoded"
+ )
+ return int_from_bytes(self.data, "big")
+
+
+def encode_der_integer(x):
+ if not isinstance(x, six.integer_types):
+ raise ValueError("Value must be an integer")
+ if x < 0:
+ raise ValueError("Negative integers are not supported")
+ n = x.bit_length() // 8 + 1
+ return int_to_bytes(x, n)
+
+
+def encode_der(tag, *children):
+ length = 0
+ for child in children:
+ length += len(child)
+ chunks = [six.int2byte(tag)]
+ if length < 0x80:
+ chunks.append(six.int2byte(length))
+ else:
+ length_bytes = int_to_bytes(length)
+ chunks.append(six.int2byte(0x80 | len(length_bytes)))
+ chunks.append(length_bytes)
+ chunks.extend(children)
+ return b"".join(chunks)
diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py
index ca8b1b62..eb6654b0 100644
--- a/src/cryptography/hazmat/backends/openssl/backend.py
+++ b/src/cryptography/hazmat/backends/openssl/backend.py
@@ -10,13 +10,14 @@ import contextlib
import itertools
from contextlib import contextmanager
-import asn1crypto.core
-
import six
from six.moves import range
from cryptography import utils, x509
from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
+from cryptography.hazmat._der import (
+ INTEGER, NULL, SEQUENCE, encode_der, encode_der_integer
+)
from cryptography.hazmat.backends.interfaces import (
CMACBackend, CipherBackend, DERSerializationBackend, DHBackend, DSABackend,
EllipticCurveBackend, HMACBackend, HashBackend, PBKDF2HMACBackend,
@@ -26,7 +27,7 @@ from cryptography.hazmat.backends.openssl import aead
from cryptography.hazmat.backends.openssl.ciphers import _CipherContext
from cryptography.hazmat.backends.openssl.cmac import _CMACContext
from cryptography.hazmat.backends.openssl.decode_asn1 import (
- _CRL_ENTRY_REASON_ENUM_TO_CODE, _Integers
+ _CRL_ENTRY_REASON_ENUM_TO_CODE
)
from cryptography.hazmat.backends.openssl.dh import (
_DHParameters, _DHPrivateKey, _DHPublicKey, _dh_params_dup
@@ -1008,12 +1009,17 @@ class Backend(object):
value = _encode_asn1_str_gc(self, extension.value.value)
return self._create_raw_x509_extension(extension, value)
elif isinstance(extension.value, x509.TLSFeature):
- asn1 = _Integers([x.value for x in extension.value]).dump()
+ asn1 = encode_der(
+ SEQUENCE,
+ *[
+ encode_der(INTEGER, encode_der_integer(x.value))
+ for x in extension.value
+ ]
+ )
value = _encode_asn1_str_gc(self, asn1)
return self._create_raw_x509_extension(extension, value)
elif isinstance(extension.value, x509.PrecertPoison):
- asn1 = asn1crypto.core.Null().dump()
- value = _encode_asn1_str_gc(self, asn1)
+ value = _encode_asn1_str_gc(self, encode_der(NULL))
return self._create_raw_x509_extension(extension, value)
else:
try:
diff --git a/src/cryptography/hazmat/backends/openssl/decode_asn1.py b/src/cryptography/hazmat/backends/openssl/decode_asn1.py
index 75d5844b..35295ce3 100644
--- a/src/cryptography/hazmat/backends/openssl/decode_asn1.py
+++ b/src/cryptography/hazmat/backends/openssl/decode_asn1.py
@@ -7,11 +7,10 @@ from __future__ import absolute_import, division, print_function
import datetime
import ipaddress
-import asn1crypto.core
-
import six
from cryptography import x509
+from cryptography.hazmat._der import DERReader, INTEGER, NULL, SEQUENCE
from cryptography.x509.extensions import _TLS_FEATURE_TYPE_TO_ENUM
from cryptography.x509.name import _ASN1_TYPE_TO_ENUM
from cryptography.x509.oid import (
@@ -20,10 +19,6 @@ from cryptography.x509.oid import (
)
-class _Integers(asn1crypto.core.SequenceOf):
- _child_spec = asn1crypto.core.Integer
-
-
def _obj2txt(backend, obj):
# Set to 80 on the recommendation of
# https://www.openssl.org/docs/crypto/OBJ_nid2ln.html#return_values
@@ -209,20 +204,25 @@ class _X509ExtensionParser(object):
# to support them in all versions of OpenSSL so we decode them
# ourselves.
if oid == ExtensionOID.TLS_FEATURE:
+ # The extension contents are a SEQUENCE OF INTEGERs.
data = backend._lib.X509_EXTENSION_get_data(ext)
- parsed = _Integers.load(_asn1_string_to_bytes(backend, data))
+ data_bytes = _asn1_string_to_bytes(backend, data)
+ features = DERReader(data_bytes).read_single_element(SEQUENCE)
+ parsed = []
+ while not features.is_empty():
+ parsed.append(features.read_element(INTEGER).as_integer())
+ # Map the features to their enum value.
value = x509.TLSFeature(
- [_TLS_FEATURE_TYPE_TO_ENUM[x.native] for x in parsed]
+ [_TLS_FEATURE_TYPE_TO_ENUM[x] for x in parsed]
)
extensions.append(x509.Extension(oid, critical, value))
seen_oids.add(oid)
continue
elif oid == ExtensionOID.PRECERT_POISON:
data = backend._lib.X509_EXTENSION_get_data(ext)
- parsed = asn1crypto.core.Null.load(
- _asn1_string_to_bytes(backend, data)
- )
- assert parsed == asn1crypto.core.Null()
+ # The contents of the extension must be an ASN.1 NULL.
+ reader = DERReader(_asn1_string_to_bytes(backend, data))
+ reader.read_single_element(NULL).check_empty()
extensions.append(x509.Extension(
oid, critical, x509.PrecertPoison()
))
diff --git a/src/cryptography/hazmat/primitives/asymmetric/utils.py b/src/cryptography/hazmat/primitives/asymmetric/utils.py
index 274c1f41..43d5b9bf 100644
--- a/src/cryptography/hazmat/primitives/asymmetric/utils.py
+++ b/src/cryptography/hazmat/primitives/asymmetric/utils.py
@@ -4,27 +4,27 @@
from __future__ import absolute_import, division, print_function
-from asn1crypto.algos import DSASignature
-
-import six
-
from cryptography import utils
+from cryptography.hazmat._der import (
+ DERReader, INTEGER, SEQUENCE, encode_der, encode_der_integer
+)
from cryptography.hazmat.primitives import hashes
def decode_dss_signature(signature):
- data = DSASignature.load(signature, strict=True).native
- return data['r'], data['s']
+ seq = DERReader(signature).read_single_element(SEQUENCE)
+ r = seq.read_element(INTEGER).as_integer()
+ s = seq.read_element(INTEGER).as_integer()
+ seq.check_empty()
+ return r, s
def encode_dss_signature(r, s):
- if (
- not isinstance(r, six.integer_types) or
- not isinstance(s, six.integer_types)
- ):
- raise ValueError("Both r and s must be integers")
-
- return DSASignature({'r': r, 's': s}).dump()
+ return encode_der(
+ SEQUENCE,
+ encode_der(INTEGER, encode_der_integer(r)),
+ encode_der(INTEGER, encode_der_integer(s)),
+ )
class Prehashed(object):
diff --git a/src/cryptography/x509/extensions.py b/src/cryptography/x509/extensions.py
index d25131b8..c78c76c2 100644
--- a/src/cryptography/x509/extensions.py
+++ b/src/cryptography/x509/extensions.py
@@ -11,11 +11,12 @@ import ipaddress
import warnings
from enum import Enum
-from asn1crypto.keys import PublicKeyInfo
-
import six
from cryptography import utils
+from cryptography.hazmat._der import (
+ BIT_STRING, DERReader, OBJECT_IDENTIFIER, SEQUENCE
+)
from cryptography.hazmat.primitives import constant_time, serialization
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
@@ -47,7 +48,24 @@ def _key_identifier_from_public_key(public_key):
serialization.PublicFormat.SubjectPublicKeyInfo
)
- data = bytes(PublicKeyInfo.load(serialized)['public_key'])
+ public_key_info = DERReader(serialized).read_single_element(SEQUENCE)
+ algorithm = public_key_info.read_element(SEQUENCE)
+ public_key = public_key_info.read_element(BIT_STRING)
+ public_key_info.check_empty()
+
+ # Double-check the algorithm structure.
+ algorithm.read_element(OBJECT_IDENTIFIER)
+ if not algorithm.is_empty():
+ # Skip the optional parameters field.
+ algorithm.read_any_element()
+ algorithm.check_empty()
+
+ # BIT STRING contents begin with the number of padding bytes added. It
+ # must be zero for SubjectPublicKeyInfo structures.
+ if public_key.read_byte() != 0:
+ raise ValueError('Invalid public key encoding')
+
+ data = public_key.data
return hashlib.sha1(data).digest()
diff --git a/tests/hazmat/primitives/test_asym_utils.py b/tests/hazmat/primitives/test_asym_utils.py
index d817c651..b49ca3f2 100644
--- a/tests/hazmat/primitives/test_asym_utils.py
+++ b/tests/hazmat/primitives/test_asym_utils.py
@@ -31,10 +31,6 @@ def test_dss_signature():
assert sig3 == b"0\x06\x02\x01\x00\x02\x01\x00"
assert decode_dss_signature(sig3) == (0, 0)
- sig4 = encode_dss_signature(-1, 0)
- assert sig4 == b"0\x06\x02\x01\xFF\x02\x01\x00"
- assert decode_dss_signature(sig4) == (-1, 0)
-
def test_encode_dss_non_integer():
with pytest.raises(ValueError):
@@ -53,6 +49,11 @@ def test_encode_dss_non_integer():
encode_dss_signature("hello", "world")
+def test_encode_dss_negative():
+ with pytest.raises(ValueError):
+ encode_dss_signature(-1, 0)
+
+
def test_decode_dss_trailing_bytes():
with pytest.raises(ValueError):
decode_dss_signature(b"0\x06\x02\x01\x01\x02\x01\x01\x00\x00\x00")
diff --git a/tests/hazmat/test_der.py b/tests/hazmat/test_der.py
new file mode 100644
index 00000000..d81c0d3e
--- /dev/null
+++ b/tests/hazmat/test_der.py
@@ -0,0 +1,217 @@
+# This file is dual licensed under the terms of the Apache License, Version
+# 2.0, and the BSD License. See the LICENSE file in the root of this repository
+# for complete details.
+
+from __future__ import absolute_import, division, print_function
+
+import pytest
+
+from cryptography.hazmat._der import (
+ DERReader, INTEGER, NULL, OCTET_STRING, SEQUENCE, encode_der,
+ encode_der_integer
+)
+
+
+def test_der_reader_basic():
+ reader = DERReader(b"123456789")
+ assert reader.read_byte() == ord(b"1")
+ assert reader.read_bytes(1).tobytes() == b"2"
+ assert reader.read_bytes(4).tobytes() == b"3456"
+
+ with pytest.raises(ValueError):
+ reader.read_bytes(4)
+
+ assert reader.read_bytes(3).tobytes() == b"789"
+
+ # The input is now empty.
+ with pytest.raises(ValueError):
+ reader.read_bytes(1)
+ with pytest.raises(ValueError):
+ reader.read_byte()
+
+
+def test_der():
+ # This input is the following structure, using
+ # https://github.com/google/der-ascii
+ #
+ # SEQUENCE {
+ # SEQUENCE {
+ # NULL {}
+ # INTEGER { 42 }
+ # OCTET_STRING { "hello" }
+ # }
+ # }
+ der = b"\x30\x0e\x30\x0c\x05\x00\x02\x01\x2a\x04\x05\x68\x65\x6c\x6c\x6f"
+ reader = DERReader(der)
+ with pytest.raises(ValueError):
+ reader.check_empty()
+
+ # Parse the outer element.
+ outer = reader.read_element(SEQUENCE)
+ reader.check_empty()
+ assert outer.data.tobytes() == der[2:]
+
+ # Parse the outer element with read_any_element.
+ reader = DERReader(der)
+ tag, outer2 = reader.read_any_element()
+ reader.check_empty()
+ assert tag == SEQUENCE
+ assert outer2.data.tobytes() == der[2:]
+
+ # Parse the outer element with read_single_element.
+ outer3 = DERReader(der).read_single_element(SEQUENCE)
+ assert outer3.data.tobytes() == der[2:]
+
+ # read_single_element rejects trailing data.
+ with pytest.raises(ValueError):
+ DERReader(der + der).read_single_element(SEQUENCE)
+
+ # Continue parsing the structure.
+ inner = outer.read_element(SEQUENCE)
+ outer.check_empty()
+
+ # Parsing a missing optional element should work.
+ assert inner.read_optional_element(INTEGER) is None
+
+ null = inner.read_element(NULL)
+ null.check_empty()
+
+ # Parsing a present optional element should work.
+ integer = inner.read_optional_element(INTEGER)
+ assert integer.as_integer() == 42
+
+ octet_string = inner.read_element(OCTET_STRING)
+ assert octet_string.data.tobytes() == b"hello"
+
+ # Parsing a missing optional element should work when the input is empty.
+ inner.check_empty()
+ assert inner.read_optional_element(INTEGER) is None
+
+ # Re-encode the same structure.
+ der2 = encode_der(
+ SEQUENCE,
+ encode_der(
+ SEQUENCE,
+ encode_der(NULL),
+ encode_der(INTEGER, encode_der_integer(42)),
+ encode_der(OCTET_STRING, b"hello"),
+ )
+ )
+ assert der2 == der
+
+
+@pytest.mark.parametrize(
+ "length,header",
+ [
+ # Single-byte lengths.
+ (0, b"\x04\x00"),
+ (1, b"\x04\x01"),
+ (2, b"\x04\x02"),
+ (127, b"\x04\x7f"),
+ # Long-form lengths.
+ (128, b"\x04\x81\x80"),
+ (129, b"\x04\x81\x81"),
+ (255, b"\x04\x81\xff"),
+ (0x100, b"\x04\x82\x01\x00"),
+ (0x101, b"\x04\x82\x01\x01"),
+ (0xffff, b"\x04\x82\xff\xff"),
+ (0x10000, b"\x04\x83\x01\x00\x00"),
+ ]
+)
+def test_der_lengths(length, header):
+ body = length * b"a"
+ der = header + body
+
+ reader = DERReader(der)
+ element = reader.read_element(OCTET_STRING)
+ reader.check_empty()
+ assert element.data.tobytes() == body
+
+ assert encode_der(OCTET_STRING, body) == der
+
+
+@pytest.mark.parametrize(
+ "bad_input",
+ [
+ # The input ended before the tag.
+ b"",
+ # The input ended before the length.
+ b"\x30",
+ # The input ended before the second byte of the length.
+ b"\x30\x81",
+ # The input ended before the body.
+ b"\x30\x01",
+ # The length used long form when it should be short form.
+ b"\x30\x81\x01\x00",
+ # The length was not minimally-encoded.
+ b"\x30\x82\x00\x80" + (0x80 * b"a"),
+ # Indefinite-length encoding is not valid DER.
+ b"\x30\x80\x00\x00"
+ # Tag number (the bottom 5 bits) 31 indicates long form tags, which we
+ # do not support.
+ b"\x1f\x00",
+ b"\x9f\x00",
+ b"\xbf\x00",
+ b"\xff\x00",
+ ]
+)
+def test_der_reader_bad_input(bad_input):
+ reader = DERReader(bad_input)
+ with pytest.raises(ValueError):
+ reader.read_any_element()
+
+
+def test_der_reader_wrong_tag():
+ reader = DERReader(b"\x04\x00")
+ with pytest.raises(ValueError):
+ reader.read_element(SEQUENCE)
+
+
+@pytest.mark.parametrize(
+ "value,der",
+ [
+ (0, b'\x00'),
+ (1, b'\x01'),
+ (2, b'\x02'),
+ (3, b'\x03'),
+ (127, b'\x7f'),
+ (128, b'\x00\x80'),
+ (0x112233445566778899aabbccddeeff,
+ b'\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff'),
+ ]
+)
+def test_integer(value, der):
+ assert encode_der_integer(value) == der
+ assert DERReader(der).as_integer() == value
+
+
+@pytest.mark.parametrize(
+ "bad_input",
+ [
+ # Zero is encoded as b"\x00", not the empty string.
+ b"",
+ # Too many leading zeros.
+ b"\x00\x00",
+ b"\x00\x7f",
+ # Too many leading ones.
+ b"\xff\xff",
+ b"\xff\x80",
+ # Negative integers are not supported.
+ b"\x80",
+ b"\x81",
+ b"\x80\x00\x00",
+ b"\xff",
+ ]
+)
+def test_invalid_integer(bad_input):
+ reader = DERReader(bad_input)
+ with pytest.raises(ValueError):
+ reader.as_integer()
+
+
+def test_invalid_integer_encode():
+ with pytest.raises(ValueError):
+ encode_der_integer(-1)
+
+ with pytest.raises(ValueError):
+ encode_der_integer("not an integer")
diff --git a/tests/x509/test_x509.py b/tests/x509/test_x509.py
index 20e23d5f..bb0ad022 100644
--- a/tests/x509/test_x509.py
+++ b/tests/x509/test_x509.py
@@ -6,12 +6,11 @@
from __future__ import absolute_import, division, print_function
import binascii
+import collections
import datetime
import ipaddress
import os
-from asn1crypto.x509 import Certificate
-
import pytest
import pytz
@@ -20,6 +19,10 @@ import six
from cryptography import utils, x509
from cryptography.exceptions import UnsupportedAlgorithm
+from cryptography.hazmat._der import (
+ BIT_STRING, CONSTRUCTED, CONTEXT_SPECIFIC, DERReader, GENERALIZED_TIME,
+ INTEGER, OBJECT_IDENTIFIER, PRINTABLE_STRING, SEQUENCE, SET, UTC_TIME
+)
from cryptography.hazmat.backends.interfaces import (
DSABackend, EllipticCurveBackend, RSABackend, X509Backend
)
@@ -65,6 +68,53 @@ def _load_cert(filename, loader, backend):
return cert
+ParsedCertificate = collections.namedtuple(
+ "ParsedCertificate",
+ ["not_before_tag", "not_after_tag", "issuer", "subject"]
+)
+
+
+def _parse_cert(der):
+ # See the Certificate structured, defined in RFC 5280.
+ cert = DERReader(der).read_single_element(SEQUENCE)
+ tbs_cert = cert.read_element(SEQUENCE)
+ # Skip outer signature algorithm
+ _ = cert.read_element(SEQUENCE)
+ # Skip signature
+ _ = cert.read_element(BIT_STRING)
+ cert.check_empty()
+
+ # Skip version
+ _ = tbs_cert.read_optional_element(CONTEXT_SPECIFIC | CONSTRUCTED | 0)
+ # Skip serialNumber
+ _ = tbs_cert.read_element(INTEGER)
+ # Skip inner signature algorithm
+ _ = tbs_cert.read_element(SEQUENCE)
+ issuer = tbs_cert.read_element(SEQUENCE)
+ validity = tbs_cert.read_element(SEQUENCE)
+ subject = tbs_cert.read_element(SEQUENCE)
+ # Skip subjectPublicKeyInfo
+ _ = tbs_cert.read_element(SEQUENCE)
+ # Skip issuerUniqueID
+ _ = tbs_cert.read_optional_element(CONTEXT_SPECIFIC | CONSTRUCTED | 1)
+ # Skip subjectUniqueID
+ _ = tbs_cert.read_optional_element(CONTEXT_SPECIFIC | CONSTRUCTED | 2)
+ # Skip extensions
+ _ = tbs_cert.read_optional_element(CONTEXT_SPECIFIC | CONSTRUCTED | 3)
+ tbs_cert.check_empty()
+
+ not_before_tag, _ = validity.read_any_element()
+ not_after_tag, _ = validity.read_any_element()
+ validity.check_empty()
+
+ return ParsedCertificate(
+ not_before_tag=not_before_tag,
+ not_after_tag=not_after_tag,
+ issuer=issuer,
+ subject=subject,
+ )
+
+
@pytest.mark.requires_backend_interface(interface=X509Backend)
class TestCertificateRevocationList(object):
def test_load_pem_crl(self, backend):
@@ -1587,12 +1637,24 @@ class TestRSACertificateRequest(object):
cert = builder.sign(issuer_private_key, hashes.SHA256(), backend)
- parsed = Certificate.load(
- cert.public_bytes(serialization.Encoding.DER))
+ parsed = _parse_cert(cert.public_bytes(serialization.Encoding.DER))
+ subject = parsed.subject
+ issuer = parsed.issuer
+
+ def read_next_rdn_value_tag(reader):
+ rdn = reader.read_element(SET)
+ attribute = rdn.read_element(SEQUENCE)
+ # Assume each RDN has a single attribute.
+ rdn.check_empty()
+
+ _ = attribute.read_element(OBJECT_IDENTIFIER)
+ tag, value = attribute.read_any_element()
+ attribute.check_empty()
+ return tag
# Check that each value was encoded as an ASN.1 PRINTABLESTRING.
- assert parsed.subject.chosen[0][0]['value'].chosen.tag == 19
- assert parsed.issuer.chosen[0][0]['value'].chosen.tag == 19
+ assert read_next_rdn_value_tag(subject) == PRINTABLE_STRING
+ assert read_next_rdn_value_tag(issuer) == PRINTABLE_STRING
if (
# This only works correctly in OpenSSL 1.1.0f+ and 1.0.2l+
backend._lib.CRYPTOGRAPHY_OPENSSL_110F_OR_GREATER or (
@@ -1600,8 +1662,8 @@ class TestRSACertificateRequest(object):
not backend._lib.CRYPTOGRAPHY_OPENSSL_110_OR_GREATER
)
):
- assert parsed.subject.chosen[1][0]['value'].chosen.tag == 19
- assert parsed.issuer.chosen[1][0]['value'].chosen.tag == 19
+ assert read_next_rdn_value_tag(subject) == PRINTABLE_STRING
+ assert read_next_rdn_value_tag(issuer) == PRINTABLE_STRING
class TestCertificateBuilder(object):
@@ -1738,13 +1800,9 @@ class TestCertificateBuilder(object):
cert = builder.sign(private_key, hashes.SHA256(), backend)
assert cert.not_valid_before == not_valid_before
assert cert.not_valid_after == not_valid_after
- parsed = Certificate.load(
- cert.public_bytes(serialization.Encoding.DER)
- )
- not_before = parsed['tbs_certificate']['validity']['not_before']
- not_after = parsed['tbs_certificate']['validity']['not_after']
- assert not_before.chosen.tag == 23 # UTCTime
- assert not_after.chosen.tag == 24 # GeneralizedTime
+ parsed = _parse_cert(cert.public_bytes(serialization.Encoding.DER))
+ assert parsed.not_before_tag == UTC_TIME
+ assert parsed.not_after_tag == GENERALIZED_TIME
@pytest.mark.requires_backend_interface(interface=RSABackend)
@pytest.mark.requires_backend_interface(interface=X509Backend)
@@ -2038,13 +2096,9 @@ class TestCertificateBuilder(object):
cert = cert_builder.sign(private_key, hashes.SHA256(), backend)
assert cert.not_valid_before == time
assert cert.not_valid_after == time
- parsed = Certificate.load(
- cert.public_bytes(serialization.Encoding.DER)
- )
- not_before = parsed['tbs_certificate']['validity']['not_before']
- not_after = parsed['tbs_certificate']['validity']['not_after']
- assert not_before.chosen.tag == 23 # UTCTime
- assert not_after.chosen.tag == 23 # UTCTime
+ parsed = _parse_cert(cert.public_bytes(serialization.Encoding.DER))
+ assert parsed.not_before_tag == UTC_TIME
+ assert parsed.not_after_tag == UTC_TIME
def test_invalid_not_valid_after(self):
with pytest.raises(TypeError):