From f33c93d3e7ec99d7712a9e775ec608a763d89ef9 Mon Sep 17 00:00:00 2001 From: Konstantin Shemyak Date: Fri, 23 Apr 2021 16:54:36 +0300 Subject: [PATCH] DRAFT: add method Certificate.is_issued_by(ca) (#2381) Tests missing, documentation incomplete. --- src/cryptography/x509/base.py | 135 +++++++++++++++++++++++++++++++++- 1 file changed, 134 insertions(+), 1 deletion(-) diff --git a/src/cryptography/x509/base.py b/src/cryptography/x509/base.py index 5380f6cff44dd..85920bae48be2 100644 --- a/src/cryptography/x509/base.py +++ b/src/cryptography/x509/base.py @@ -19,12 +19,20 @@ ed25519, ed448, rsa, + padding, ) from cryptography.hazmat.primitives.asymmetric.types import ( PRIVATE_KEY_TYPES, PUBLIC_KEY_TYPES, ) -from cryptography.x509.extensions import Extension, ExtensionType, Extensions +from cryptography.x509.extensions import ( + BasicConstraints, + Extension, + ExtensionType, + ExtensionNotFound, + ExtensionOID, + Extensions, +) from cryptography.x509.name import Name from cryptography.x509.oid import ObjectIdentifier @@ -38,6 +46,13 @@ def __init__(self, msg: str, oid: ObjectIdentifier) -> None: self.oid = oid +class InvalidIssuer(Exception): + def __init__(self, own_issuer: Name, ca_subject: Name) -> None: + super(InvalidIssuer, self).__init__() + self.own_issuer = own_issuer + self.ca_subject = ca_subject + + def _reject_duplicate_extension( extension: Extension[ExtensionType], extensions: typing.List[Extension[ExtensionType]], @@ -255,6 +270,124 @@ def _has_signature_of(self, signer_candidate: "Certificate") -> bool: return True + def valid_at_time(self, date: datetime.datetime) -> bool: + """Return True if the `date` is within certificate's validity time. + Raise appropriate exception otherwise.""" + if self.not_valid_after < date: + raise NotValidAnymore(self, date) + if self.not_valid_before > date: + raise NotValidYet(self, date) + return True + + def ca_bit_set(self, allow_missing=True) -> bool: + try: + basic_constraints = self.extensions.get_extension_for_class( + BasicConstraints + ) + except ExtensionNotFound: + if not allow_missing: + raise CABitNotSet(self) + else: + if not basic_constraints.value.ca: + raise CABitNotSet(self) + return True + + def alt_subject_name_matches_issuer( + self, ca: "Certificate", allow_missing=True + ) -> bool: + try: + subj_alt_name = ca.extensions.get_extension_for_oid( + ExtensionOID.SUBJECT_ALTERNATIVE_NAME + ) + issuer_alt_name = self.extensions.get_extension_for_oid( + ExtensionOID.SUBJECT_ALTERNATIVE_NAME + ) + except ExtensionNotFound: + if allow_missing: + return True + else: + raise + if subj_alt_name != issuer_alt_name: + raise IssuerAltSubjectNameMismatch(self, ca) + return True + + @staticmethod + def is_issued_by_default_cb( + leaf: "Certificate", ca: "Certificate" + ) -> bool: + """ + Default checks which must hold true to claim that `leaf` is issued by `ca`. + This callback can be overwritten. It must return True or raise an exception otherwise. + """ + + # Remains: KeySign EKU + + today = datetime.datetime.today() + leaf.valid_at_time(today) + ca.valid_at_time(today) + ca.ca_bit_set() + leaf.alt_subject_name_matches_issuer(ca) + return True + + def is_issued_by( + self, + issuer_candidate: "Certificate", + is_issued_by_cb=is_issued_by_default_cb, + extra_checks_cb=None, + ) -> bool: + """ + Returns True if the certificate is issued by `issuer_candidate`. + """ + if self.issuer != issuer_candidate.subject: + raise InvalidIssuer( + own_issuer=self.issuer, ca_subject=issuer_candidate.subject + ) + self._has_signature_of(issuer_candidate) + is_issued_by_cb(self, issuer_candidate) + if extra_checks_cb is not None: + extra_checks_cb(self, issuer_candidate) + return True + + def is_issuer_of( + self, + issued_candidate: "Certificate", + is_issued_by_cb=is_issued_by_default_cb, + extra_checks_cb=None, + ) -> bool: + """ + Returns True if the `issued_candidate` is issued by the certificate. + """ + return issued_candidate.is_issued_by( + self, is_issued_by_cb, extra_checks_cb + ) + + +class CertificateNotSuitable(Exception): + def __init__(self, certificate: Certificate): + self.certificate = certificate + + +class NotValidAnymore(CertificateNotSuitable): + def __init__(self, certificate: Certificate, at_time: datetime.datetime): + super(NotValidAnymore, self).__init__(certificate) + self.at_time = at_time + + +class NotValidYet(CertificateNotSuitable): + def __init__(self, certificate: Certificate, at_time: datetime.datetime): + super(NotValidYet, self).__init__(certificate) + self.at_time = at_time + + +class CABitNotSet(CertificateNotSuitable): + pass + + +class IssuerAltSubjectNameMismatch(CertificateNotSuitable): + def __init__(self, certificate: Certificate, ca: Certificate): + super(IssuerAltSubjectNameMismatch, self).__init__(certificate) + self.ca = ca + class RevokedCertificate(metaclass=abc.ABCMeta): @abc.abstractproperty