From 9bbfcea022820e9783e22f5a8f1fe959c9b245eb Mon Sep 17 00:00:00 2001 From: Andre Caron Date: Mon, 18 May 2015 20:55:29 -0400 Subject: Adds certificate builder. --- .../hazmat/backends/openssl/backend.py | 96 ++++++++++++++++++++++ src/cryptography/x509.py | 90 ++++++++++++++++++++ 2 files changed, 186 insertions(+) (limited to 'src') diff --git a/src/cryptography/hazmat/backends/openssl/backend.py b/src/cryptography/hazmat/backends/openssl/backend.py index 7ccb39a4..04f631f9 100644 --- a/src/cryptography/hazmat/backends/openssl/backend.py +++ b/src/cryptography/hazmat/backends/openssl/backend.py @@ -4,6 +4,7 @@ from __future__ import absolute_import, division, print_function +import calendar import collections import itertools from contextlib import contextmanager @@ -94,6 +95,22 @@ def _encode_asn1_str_gc(backend, data, length): return s +def _make_asn1_int(backend, x): + i = backend._lib.ASN1_INTEGER_new() + # i = backend._ffi.gc(i, backend._lib.ASN1_INTEGER_free) + backend._lib.ASN1_INTEGER_set(i, x) + return i + + +def _make_asn1_str(backend, x, n=None): + if n is None: + n = len(x) + s = backend._lib.ASN1_OCTET_STRING_new() + # s = backend._ffi.gc(s, backend._lib.ASN1_OCTET_STRING_free) + backend._lib.ASN1_OCTET_STRING_set(s, x, n) + return s + + def _encode_name(backend, attributes): """ The X509_NAME created will not be gc'd. Use _encode_name_gc if needed. @@ -988,6 +1005,85 @@ class Backend(object): return _CertificateSigningRequest(self, x509_req) + def sign_x509_certificate(self, builder, private_key, algorithm): + # TODO: check type of private key parameter. + if not isinstance(builder, x509.CertificateBuilder): + raise TypeError('Builder type mismatch.') + if not isinstance(algorithm, hashes.HashAlgorithm): + raise TypeError('Algorithm must be a registered hash algorithm.') + + # Resolve the signature algorithm. + evp_md = self._lib.EVP_get_digestbyname( + algorithm.name.encode('ascii') + ) + assert evp_md != self._ffi.NULL + + # Create an empty certificate. + x509_cert = self._lib.X509_new() + x509_cert = self._ffi.gc(x509_cert, backend._lib.X509_free) + + # Set the x509 version. + res = self._lib.X509_set_version(x509_cert, builder._version.value) + assert res == 1 + + # Set the subject's name. + res = self._lib.X509_set_subject_name( + x509_cert, _encode_name(self, list(builder._subject_name)) + ) + assert res == 1 + + # Set the subject's public key. + res = self._lib.X509_set_pubkey( + x509_cert, builder._public_key._evp_pkey + ) + assert res == 1 + + # Set the certificate serial number. + serial_number = _make_asn1_int(self, builder._serial_number) + self._lib.X509_set_serialNumber(x509_cert, serial_number) + + # Set the "not before" time. + res = self._lib.ASN1_TIME_set( + self._lib.X509_get_notBefore(x509_cert), + calendar.timegm(builder._not_valid_before.timetuple()) + ) + assert res != self._ffi.NULL + + # Set the "not after" time. + res = self._lib.ASN1_TIME_set( + self._lib.X509_get_notAfter(x509_cert), + calendar.timegm(builder._not_valid_after.timetuple()) + ) + assert res != self._ffi.NULL + + # Add extensions. + for i, extension in enumerate(builder._extensions): + if isinstance(extension.value, x509.BasicConstraints): + extension = _encode_basic_constraints( + self, + extension.value.ca, + extension.value.path_length, + extension.critical + ) + else: + raise ValueError('Extension not yet supported.') + res = self._lib.X509_add_ext(x509_cert, extension, i) + assert res == 1 + + # Set the issuer name. + res = self._lib.X509_set_issuer_name( + x509_cert, _encode_name(self, list(builder._issuer_name)) + ) + assert res == 1 + + # Sign the certificate with the issuer's private key. + res = self._lib.X509_sign( + x509_cert, private_key._evp_pkey, evp_md + ) + assert res > 0 + + return _Certificate(self, x509_cert) + def load_pem_private_key(self, data, password): return self._load_key( self._lib.PEM_read_bio_PrivateKey, diff --git a/src/cryptography/x509.py b/src/cryptography/x509.py index 58e1a37c..7cb42f57 100644 --- a/src/cryptography/x509.py +++ b/src/cryptography/x509.py @@ -5,6 +5,7 @@ from __future__ import absolute_import, division, print_function import abc +import datetime import ipaddress from email.utils import parseaddr from enum import Enum @@ -1594,3 +1595,92 @@ class CertificateSigningRequestBuilder(object): if self._subject_name is None: raise ValueError("A CertificateSigningRequest must have a subject") return backend.create_x509_csr(self, private_key, algorithm) + + +class CertificateBuilder(object): + def __init__(self): + """ + Creates an empty X.509 certificate (version 1). + """ + self._version = Version.v1 + self._issuer_name = None + self._subject_name = None + self._public_key = None + self._serial_number = None + self._not_valid_before = None + self._not_valid_after = None + self._extensions = [] + + def set_version(self, version): + """ + Sets the X.509 version required by decoders. + """ + if not isinstance(version, Version): + raise TypeError('Expecting x509.Version object.') + self._version = version + + def set_issuer_name(self, name): + """ + Sets the CA's distinguished name. + """ + if not isinstance(name, Name): + raise TypeError('Expecting x509.Name object.') + self._issuer_name = name + + def set_subject_name(self, name): + """ + Sets the requestor's distinguished name. + """ + if not isinstance(name, Name): + raise TypeError('Expecting x509.Name object.') + self._subject_name = name + + def set_public_key(self, public_key): + """ + Sets the requestor's public key (as found in the signing request). + """ + # TODO: check type. + self._public_key = public_key + + def set_serial_number(self, serial_number): + """ + Sets the certificate serial number. + """ + if not isinstance(serial_number, six.integer_types): + raise TypeError('Serial number must be of integral type.') + self._serial_number = serial_number + + def set_not_valid_before(self, time): + """ + Sets the certificate activation time. + """ + # TODO: require UTC datetime? + if not isinstance(time, datetime.datetime): + raise TypeError('Expecting datetime object.') + self._not_valid_before = time + + def set_not_valid_after(self, time): + """ + Sets the certificate expiration time. + """ + # TODO: require UTC datetime? + if not isinstance(time, datetime.datetime): + raise TypeError('Expecting datetime object.') + self._not_valid_after = time + + def add_extension(self, extension): + """ + Adds an X.509 extension to the certificate. + """ + if not isinstance(extension, Extension): + raise TypeError('Expecting x509.Extension object.') + for e in self._extensions: + if e.oid == extension.oid: + raise ValueError('This extension has already been set.') + self._extensions.append(extension) + + def sign(self, backend, private_key, algorithm): + """ + Signs the certificate using the CA's private key. + """ + return backend.sign_x509_certificate(self, private_key, algorithm) -- cgit v1.2.3