diff options
author | Paul Kehrer <paul.l.kehrer@gmail.com> | 2018-10-29 05:36:34 +0800 |
---|---|---|
committer | Alex Gaynor <alex.gaynor@gmail.com> | 2018-10-28 17:36:34 -0400 |
commit | e617c5a047b60204ab049a1ffe432310bb406055 (patch) | |
tree | e75b6af74c6c26beae2d96c0e69011cc52dfd179 /src/cryptography/x509/ocsp.py | |
parent | 6e756aec9c91a8350208050b0a3775ea63891cfd (diff) | |
download | cryptography-e617c5a047b60204ab049a1ffe432310bb406055.tar.gz cryptography-e617c5a047b60204ab049a1ffe432310bb406055.tar.bz2 cryptography-e617c5a047b60204ab049a1ffe432310bb406055.zip |
OCSP response builder (#4485)
* ocsp response builder
* better prose
* review changes
Diffstat (limited to 'src/cryptography/x509/ocsp.py')
-rw-r--r-- | src/cryptography/x509/ocsp.py | 181 |
1 files changed, 172 insertions, 9 deletions
diff --git a/src/cryptography/x509/ocsp.py b/src/cryptography/x509/ocsp.py index c89f12ce..2b0b1dc3 100644 --- a/src/cryptography/x509/ocsp.py +++ b/src/cryptography/x509/ocsp.py @@ -5,13 +5,16 @@ from __future__ import absolute_import, division, print_function import abc +import datetime from enum import Enum import six from cryptography import x509 from cryptography.hazmat.primitives import hashes -from cryptography.x509.base import _reject_duplicate_extension +from cryptography.x509.base import ( + _UNIX_EPOCH, _convert_to_naive_utc_time, _reject_duplicate_extension +) _OIDS_TO_HASH = { @@ -23,6 +26,11 @@ _OIDS_TO_HASH = { } +class OCSPResponderEncoding(Enum): + HASH = "By Hash" + NAME = "By Name" + + class OCSPResponseStatus(Enum): SUCCESSFUL = 0 MALFORMED_REQUEST = 1 @@ -33,6 +41,17 @@ class OCSPResponseStatus(Enum): _RESPONSE_STATUS_TO_ENUM = dict((x.value, x) for x in OCSPResponseStatus) +_ALLOWED_HASHES = ( + hashes.SHA1, hashes.SHA224, hashes.SHA256, + hashes.SHA384, hashes.SHA512 +) + + +def _verify_algorithm(algorithm): + if not isinstance(algorithm, _ALLOWED_HASHES): + raise ValueError( + "Algorithm must be SHA1, SHA224, SHA256, SHA384, or SHA512" + ) class OCSPCertStatus(Enum): @@ -63,14 +82,7 @@ class OCSPRequestBuilder(object): if self._request is not None: raise ValueError("Only one certificate can be added to a request") - allowed_hashes = ( - hashes.SHA1, hashes.SHA224, hashes.SHA256, - hashes.SHA384, hashes.SHA512 - ) - if not isinstance(algorithm, allowed_hashes): - raise ValueError( - "Algorithm must be SHA1, SHA224, SHA256, SHA384, or SHA512" - ) + _verify_algorithm(algorithm) if ( not isinstance(cert, x509.Certificate) or not isinstance(issuer, x509.Certificate) @@ -98,6 +110,157 @@ class OCSPRequestBuilder(object): return backend.create_ocsp_request(self) +class _SingleResponse(object): + def __init__(self, cert, issuer, algorithm, cert_status, this_update, + next_update, revocation_time, revocation_reason): + if ( + not isinstance(cert, x509.Certificate) or + not isinstance(issuer, x509.Certificate) + ): + raise TypeError("cert and issuer must be a Certificate") + + _verify_algorithm(algorithm) + if not isinstance(this_update, datetime.datetime): + raise TypeError("this_update must be a datetime object") + if ( + next_update is not None and + not isinstance(next_update, datetime.datetime) + ): + raise TypeError("next_update must be a datetime object or None") + + self._cert = cert + self._issuer = issuer + self._algorithm = algorithm + self._this_update = this_update + self._next_update = next_update + + if not isinstance(cert_status, OCSPCertStatus): + raise TypeError( + "cert_status must be an item from the OCSPCertStatus enum" + ) + if cert_status is not OCSPCertStatus.REVOKED: + if revocation_time is not None: + raise ValueError( + "revocation_time can only be provided if the certificate " + "is revoked" + ) + if revocation_reason is not None: + raise ValueError( + "revocation_reason can only be provided if the certificate" + " is revoked" + ) + else: + if not isinstance(revocation_time, datetime.datetime): + raise TypeError("revocation_time must be a datetime object") + + revocation_time = _convert_to_naive_utc_time(revocation_time) + if revocation_time <= _UNIX_EPOCH: + raise ValueError('The revocation_time must be after the unix' + ' epoch (1970 January 1).') + + if ( + revocation_reason is not None and + not isinstance(revocation_reason, x509.ReasonFlags) + ): + raise TypeError( + "revocation_reason must be an item from the ReasonFlags " + "enum or None" + ) + + self._cert_status = cert_status + self._revocation_time = revocation_time + self._revocation_reason = revocation_reason + + +class OCSPResponseBuilder(object): + def __init__(self, response=None, responder_id=None, certs=None, + extensions=[]): + self._response = response + self._responder_id = responder_id + self._certs = certs + self._extensions = extensions + + def add_response(self, cert, issuer, algorithm, cert_status, this_update, + next_update, revocation_time, revocation_reason): + if self._response is not None: + raise ValueError("Only one response per OCSPResponse.") + + singleresp = _SingleResponse( + cert, issuer, algorithm, cert_status, this_update, next_update, + revocation_time, revocation_reason + ) + return OCSPResponseBuilder( + singleresp, self._responder_id, + self._certs, self._extensions, + ) + + def responder_id(self, encoding, responder_cert): + if self._responder_id is not None: + raise ValueError("responder_id can only be set once") + if not isinstance(responder_cert, x509.Certificate): + raise TypeError("responder_cert must be a Certificate") + if not isinstance(encoding, OCSPResponderEncoding): + raise TypeError( + "encoding must be an element from OCSPResponderEncoding" + ) + + return OCSPResponseBuilder( + self._response, (responder_cert, encoding), + self._certs, self._extensions, + ) + + def certificates(self, certs): + if self._certs is not None: + raise ValueError("certificates may only be set once") + certs = list(certs) + if len(certs) == 0: + raise ValueError("certs must not be an empty list") + if not all(isinstance(x, x509.Certificate) for x in certs): + raise TypeError("certs must be a list of Certificates") + return OCSPResponseBuilder( + self._response, self._responder_id, + certs, self._extensions, + ) + + def add_extension(self, extension, critical): + if not isinstance(extension, x509.ExtensionType): + raise TypeError("extension must be an ExtensionType") + + extension = x509.Extension(extension.oid, critical, extension) + _reject_duplicate_extension(extension, self._extensions) + + return OCSPResponseBuilder( + self._response, self._responder_id, + self._certs, self._extensions + [extension], + ) + + def sign(self, private_key, algorithm): + from cryptography.hazmat.backends.openssl.backend import backend + if self._response is None: + raise ValueError("You must add a response before signing") + if self._responder_id is None: + raise ValueError("You must add a responder_id before signing") + + if not isinstance(algorithm, hashes.HashAlgorithm): + raise TypeError("Algorithm must be a registered hash algorithm.") + + return backend.create_ocsp_response( + OCSPResponseStatus.SUCCESSFUL, self, private_key, algorithm + ) + + @classmethod + def build_unsuccessful(cls, response_status): + from cryptography.hazmat.backends.openssl.backend import backend + if not isinstance(response_status, OCSPResponseStatus): + raise TypeError( + "response_status must be an item from OCSPResponseStatus" + ) + if response_status is OCSPResponseStatus.SUCCESSFUL: + raise ValueError("response_status cannot be SUCCESSFUL") + + return backend.create_ocsp_response(response_status, None, None, None) + + @six.add_metaclass(abc.ABCMeta) class OCSPRequest(object): @abc.abstractproperty |