From 9ea5612c89cbb6f1461929f99919a889ffaaa0e1 Mon Sep 17 00:00:00 2001 From: Matthias Valvekens Date: Thu, 5 Oct 2023 23:26:52 +0200 Subject: [PATCH 1/4] Fix non-discoverable OCSP tests --- tests/test_validate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_validate.py b/tests/test_validate.py index ea7bb2c..18a10fb 100644 --- a/tests/test_validate.py +++ b/tests/test_validate.py @@ -422,7 +422,7 @@ def read_openssl_ocsp_test_params(): @pytest.mark.parametrize( "test_case", read_openssl_ocsp_test_params(), ids=lambda case: case.name ) -def openssl_ocsp(test_case: OCSPTestCase): +def test_openssl_ocsp(test_case: OCSPTestCase): context = ValidationContext( trust_roots=test_case.roots, other_certs=test_case.other_certs, From 2aeea5b85e1a6bfe6f9a4a0048f0e86a5af2c07d Mon Sep 17 00:00:00 2001 From: Matthias Valvekens Date: Thu, 5 Oct 2023 23:08:52 +0200 Subject: [PATCH 2/4] Bump minor version --- pyhanko_certvalidator/version.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyhanko_certvalidator/version.py b/pyhanko_certvalidator/version.py index 8081eac..d89b6f0 100644 --- a/pyhanko_certvalidator/version.py +++ b/pyhanko_certvalidator/version.py @@ -1,5 +1,5 @@ # coding: utf-8 -__version__ = '0.24.2-dev1' -__version_info__ = (0, 24, 2, 'dev1') +__version__ = '0.25.0-dev1' +__version_info__ = (0, 25, 0, 'dev1') From aca268c130a7194d37c509ecfcf9ccbda90a201c Mon Sep 17 00:00:00 2001 From: Matthias Valvekens Date: Thu, 5 Oct 2023 23:08:16 +0200 Subject: [PATCH 3/4] Throw more precise error on stale revinfo --- pyhanko_certvalidator/errors.py | 45 +++++- pyhanko_certvalidator/revinfo/_err_gather.py | 22 +++ pyhanko_certvalidator/revinfo/validate_crl.py | 149 ++++++++---------- .../revinfo/validate_ocsp.py | 68 ++++---- pyhanko_certvalidator/validate.py | 29 +++- tests/fixtures/nist_pkits/pkits.json | 4 +- tests/fixtures/openssl-ocsp/openssl-ocsp.json | 14 ++ tests/test_validate.py | 8 +- 8 files changed, 206 insertions(+), 133 deletions(-) create mode 100644 pyhanko_certvalidator/revinfo/_err_gather.py diff --git a/pyhanko_certvalidator/errors.py b/pyhanko_certvalidator/errors.py index 22adb9c..8873679 100644 --- a/pyhanko_certvalidator/errors.py +++ b/pyhanko_certvalidator/errors.py @@ -1,6 +1,6 @@ # coding: utf-8 from datetime import datetime -from typing import Optional, Type, TypeVar +from typing import List, Optional, Type, TypeVar from asn1crypto.crl import CRLReason from cryptography.exceptions import InvalidSignature @@ -34,9 +34,16 @@ class CRLFetchError(CRLValidationError): class CRLValidationIndeterminateError(CRLValidationError): - @property - def failures(self): - return self.args[1] + def __init__( + self, + msg: str, + failures: List[str], + suspect_stale: Optional[datetime] = None, + ): + self.msg = msg + self.failures = failures + self.suspect_stale = suspect_stale + super().__init__(msg, failures) class OCSPValidationError(Exception): @@ -48,9 +55,16 @@ class OCSPNoMatchesError(OCSPValidationError): class OCSPValidationIndeterminateError(OCSPValidationError): - @property - def failures(self): - return self.args[1] + def __init__( + self, + msg: str, + failures: List[str], + suspect_stale: Optional[datetime] = None, + ): + self.msg = msg + self.failures = failures + self.suspect_stale = suspect_stale + super().__init__(msg, failures) class OCSPFetchError(OCSPValidationError): @@ -118,6 +132,23 @@ class InsufficientRevinfoError(PathValidationError): pass +class StaleRevinfoError(InsufficientRevinfoError): + @classmethod + def format( + cls, + msg: str, + time_cutoff: datetime, + proc_state: ValProcState, + ): + return StaleRevinfoError(msg, time_cutoff, proc_state) + + def __init__( + self, msg: str, time_cutoff: datetime, proc_state: ValProcState + ): + self.time_cutoff = time_cutoff + super().__init__(msg, proc_state=proc_state) + + class InsufficientPOEError(PathValidationError): pass diff --git a/pyhanko_certvalidator/revinfo/_err_gather.py b/pyhanko_certvalidator/revinfo/_err_gather.py new file mode 100644 index 0000000..86647c0 --- /dev/null +++ b/pyhanko_certvalidator/revinfo/_err_gather.py @@ -0,0 +1,22 @@ +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any, Optional + + +@dataclass +class Errors: + failures: list = field(default_factory=list) + freshness_failures_only: bool = True + stale_last_usable_at: Optional[datetime] = None + + def append(self, msg: str, revinfo: Any, is_freshness_failure=False): + self.failures.append((msg, revinfo)) + self.freshness_failures_only &= is_freshness_failure + + def update_stale(self, dt: Optional[datetime]): + if dt is not None: + self.stale_last_usable_at = ( + dt + if self.stale_last_usable_at is None + else max(self.stale_last_usable_at, dt) + ) diff --git a/pyhanko_certvalidator/revinfo/validate_crl.py b/pyhanko_certvalidator/revinfo/validate_crl.py index da8df00..02a8174 100644 --- a/pyhanko_certvalidator/revinfo/validate_crl.py +++ b/pyhanko_certvalidator/revinfo/validate_crl.py @@ -24,6 +24,7 @@ from pyhanko_certvalidator.path import ValidationPath from pyhanko_certvalidator.policy_decl import CertRevTrustPolicy from pyhanko_certvalidator.registry import CertificateRegistry +from pyhanko_certvalidator.revinfo._err_gather import Errors from pyhanko_certvalidator.revinfo.archival import ( CRLContainer, RevinfoUsabilityRating, @@ -317,8 +318,7 @@ async def _find_crl_issuer( @dataclass -class _CRLErrs: - failures: list = field(default_factory=list) +class _CRLErrs(Errors): issuer_failures: int = 0 @@ -436,13 +436,11 @@ def _handle_crl_idp_ext_constraints( crl_authority_name=crl_authority_name, ) if not match: - errs.failures.append( - ( - "The CRL issuing distribution point extension does not " - "share any names with the certificate CRL distribution " - "point extension", - certificate_list, - ) + errs.append( + "The CRL issuing distribution point extension does not " + "share any names with the certificate CRL distribution " + "point extension", + certificate_list, ) errs.issuer_failures += 1 return False @@ -453,12 +451,10 @@ def _handle_crl_idp_ext_constraints( cert.basic_constraints_value and cert.basic_constraints_value['ca'].native ): - errs.failures.append( - ( - "CRL only contains end-entity certificates and " - "certificate is a CA certificate", - certificate_list, - ) + errs.append( + "CRL only contains end-entity certificates and " + "certificate is a CA certificate", + certificate_list, ) return False @@ -468,19 +464,17 @@ def _handle_crl_idp_ext_constraints( not cert.basic_constraints_value or cert.basic_constraints_value['ca'].native is False ): - errs.failures.append( - ( - "CRL only contains CA certificates and certificate " - "is an end-entity certificate", - certificate_list, - ) + errs.append( + "CRL only contains CA certificates and certificate " + "is an end-entity certificate", + certificate_list, ) return False # Step b 2 iv if crl_idp['only_contains_attribute_certs'].native: - errs.failures.append( - ('CRL only contains attribute certificates', certificate_list) + errs.append( + 'CRL only contains attribute certificates', certificate_list ) return False @@ -502,13 +496,11 @@ def _handle_attr_cert_crl_idp_ext_constraints( crl_authority_name=crl_authority_name, ) if not match: - errs.failures.append( - ( - "The CRL issuing distribution point extension does not " - "share any names with the attribute certificate's " - "CRL distribution point extension", - certificate_list, - ) + errs.append( + "The CRL issuing distribution point extension does not " + "share any names with the attribute certificate's " + "CRL distribution point extension", + certificate_list, ) errs.issuer_failures += 1 return False @@ -519,12 +511,10 @@ def _handle_attr_cert_crl_idp_ext_constraints( or crl_idp['only_contains_ca_certs'].native ) if pkc_only: - errs.failures.append( - ( - "CRL only contains public-key certificates, but " - "certificate is an attribute certificate", - certificate_list, - ) + errs.append( + "CRL only contains public-key certificates, but " + "certificate is an attribute certificate", + certificate_list, ) return False @@ -578,7 +568,7 @@ async def _handle_single_crl( errs.issuer_failures += 1 return None except (CertificateFetchError, CRLValidationError) as e: - errs.failures.append((e.args[0], certificate_list)) + errs.append(e.args[0], certificate_list) return None interim_reasons = _get_crl_scope_assuming_authority( @@ -600,11 +590,12 @@ async def _handle_single_crl( if rating != RevinfoUsabilityRating.OK: if rating == RevinfoUsabilityRating.STALE: msg = 'CRL is not recent enough' + errs.update_stale(freshness_result.last_usable_at) elif rating == RevinfoUsabilityRating.TOO_NEW: msg = 'CRL is too recent' else: msg = 'CRL freshness could not be established' - errs.failures.append((msg, certificate_list_cont)) + errs.append(msg, certificate_list_cont, is_freshness_failure=True) return None # Step c @@ -679,12 +670,10 @@ def _get_crl_authority_name( ) crl_authority_name = tmp_crl_issuer.subject else: - errs.failures.append( - ( - 'CRL is marked as an indirect CRL, but provides no ' - 'mechanism for locating the CRL issuer certificate', - certificate_list_cont, - ) + errs.append( + 'CRL is marked as an indirect CRL, but provides no ' + 'mechanism for locating the CRL issuer certificate', + certificate_list_cont, ) raise LookupError return is_indirect, crl_authority_name @@ -725,12 +714,10 @@ def _maybe_get_delta_crl( delta_certificate_list = delta_certificate_list_cont.crl_data if delta_certificate_list.critical_extensions - KNOWN_CRL_EXTENSIONS: - errs.failures.append( - ( - 'One or more unrecognized critical extensions are present in ' - 'the delta CRL', - delta_certificate_list_cont, - ) + errs.append( + 'One or more unrecognized critical extensions are present in ' + 'the delta CRL', + delta_certificate_list_cont, ) return None @@ -738,11 +725,9 @@ def _maybe_get_delta_crl( try: _verify_crl_signature(delta_certificate_list, crl_issuer.public_key) except CRLValidationError: - errs.failures.append( - ( - 'Delta CRL signature could not be verified', - delta_certificate_list_cont, - ) + errs.append( + 'Delta CRL signature could not be verified', + delta_certificate_list_cont, ) return None @@ -754,11 +739,14 @@ def _maybe_get_delta_crl( if rating != RevinfoUsabilityRating.OK: if rating == RevinfoUsabilityRating.STALE: msg = 'Delta CRL is stale' + errs.update_stale(freshness_result.last_usable_at) elif rating == RevinfoUsabilityRating.TOO_NEW: msg = 'Delta CRL is too recent' else: msg = 'Delta CRL freshness could not be established' - errs.failures.append((msg, delta_certificate_list_cont)) + errs.append( + msg, delta_certificate_list_cont, is_freshness_failure=True + ) return None return delta_certificate_list_cont return None @@ -853,12 +841,10 @@ def _get_crl_scope_assuming_authority( # a certificate issuer can self-issue a new cert that is used for CRLs if certificate_list.critical_extensions - KNOWN_CRL_EXTENSIONS: - errs.failures.append( - ( - 'One or more unrecognized critical extensions are present in ' - 'the CRL', - certificate_list_cont, - ) + errs.append( + 'One or more unrecognized critical extensions are present in ' + 'the CRL', + certificate_list_cont, ) return None @@ -889,12 +875,10 @@ def _check_cert_on_crl_and_delta( crl_issuer.subject, ) except NotImplementedError: - errs.failures.append( - ( - 'One or more unrecognized critical extensions are present in ' - 'the CRL entry for the certificate', - delta_certificate_list_cont, - ) + errs.append( + 'One or more unrecognized critical extensions are present in ' + 'the CRL entry for the certificate', + delta_certificate_list_cont, ) raise @@ -905,12 +889,10 @@ def _check_cert_on_crl_and_delta( cert, cert_issuer_name, certificate_list, crl_issuer.subject ) except NotImplementedError: - errs.failures.append( - ( - 'One or more unrecognized critical extensions are present in ' - 'the CRL entry for the certificate', - certificate_list_cont, - ) + errs.append( + 'One or more unrecognized critical extensions are present in ' + 'the CRL entry for the certificate', + certificate_list_cont, ) raise @@ -961,7 +943,7 @@ async def _classify_relevant_crls( except ValueError as e: msg = "Generic processing error while classifying CRL." logging.debug(msg, exc_info=e) - errs.failures.append((msg, certificate_list)) + errs.append(msg, certificate_list) return complete_lists_by_issuer, delta_lists_by_issuer @@ -984,14 +966,19 @@ def _process_crl_completeness( ) if not errs.failures: - errs.failures.append( - ('The available CRLs do not cover all revocation reasons',) + errs.append( + 'The available CRLs do not cover all revocation reasons', None ) return CRLValidationIndeterminateError( f"Unable to determine if {proc_state.describe_cert()} " f"is revoked due to insufficient information from known CRLs", - errs.failures, + failures=errs.failures, + suspect_stale=( + errs.stale_last_usable_at + if errs.freshness_failures_only + else None + ), ) @@ -1081,7 +1068,7 @@ async def verify_crl( except ValueError as e: msg = "Generic processing error while validating CRL." logging.debug(msg, exc_info=e) - errs.failures.append((msg, certificate_list_cont)) + errs.append(msg, certificate_list_cont) exc = _process_crl_completeness( checked_reasons, total_crls, errs, proc_state @@ -1195,7 +1182,7 @@ async def _assess_crl_relevance( errs.issuer_failures += 1 return None except (CertificateFetchError, CRLValidationError) as e: - errs.failures.append((e.args[0], certificate_list)) + errs.append(e.args[0], certificate_list) return None provisional_results = [] @@ -1304,7 +1291,7 @@ async def collect_relevant_crls_with_paths( except ValueError as e: msg = "Generic processing error while validating CRL." logging.debug(msg, exc_info=e) - errs.failures.append((msg, certificate_list_cont)) + errs.append(msg, certificate_list_cont) return CRLCollectionResult( crls=relevant_crls, diff --git a/pyhanko_certvalidator/revinfo/validate_ocsp.py b/pyhanko_certvalidator/revinfo/validate_ocsp.py index bf80fb5..f54103f 100644 --- a/pyhanko_certvalidator/revinfo/validate_ocsp.py +++ b/pyhanko_certvalidator/revinfo/validate_ocsp.py @@ -34,6 +34,7 @@ LayeredCertificateStore, SimpleCertificateStore, ) +from pyhanko_certvalidator.revinfo._err_gather import Errors from pyhanko_certvalidator.revinfo.archival import ( OCSPContainer, RevinfoUsabilityRating, @@ -162,8 +163,7 @@ def _ocsp_allowed(responder_cert: x509.Certificate): @dataclass -class _OCSPErrs: - failures: list = field(default_factory=list) +class _OCSPErrs(Errors): mismatch_failures: int = 0 @@ -208,23 +208,21 @@ def _match_ocsp_certid( return False if name_mismatch: - errs.failures.append( - ('OCSP response issuer name hash does not match', ocsp_response) + errs.append( + 'OCSP response issuer name hash does not match', ocsp_response ) return False if serial_mismatch: - errs.failures.append( - ( - 'OCSP response certificate serial number does not match', - ocsp_response, - ) + errs.append( + 'OCSP response certificate serial number does not match', + ocsp_response, ) return False if key_hash_mismatch: - errs.failures.append( - ('OCSP response issuer key hash does not match', ocsp_response) + errs.append( + 'OCSP response issuer key hash does not match', ocsp_response ) return False return True @@ -259,12 +257,10 @@ def _identify_responder_cert( candidate_responder_certs[0] if candidate_responder_certs else None ) if not responder_cert: - errs.failures.append( - ( - "Unable to verify OCSP response since response signing " - "certificate could not be located", - ocsp_response, - ) + errs.append( + "Unable to verify OCSP response since response signing " + "certificate could not be located", + ocsp_response, ) return responder_cert @@ -331,15 +327,13 @@ async def _check_ocsp_authorisation( ) auth_ok = True except OCSPValidationError as e: - errs.failures.append((e.args[0], ocsp_response)) + errs.append(e.args[0], ocsp_response) auth_ok = False if not auth_ok: - errs.failures.append( - ( - 'Unable to verify OCSP response since response was ' - 'signed by an unauthorized certificate', - ocsp_response, - ) + errs.append( + 'Unable to verify OCSP response since response was ' + 'signed by an unauthorized certificate', + ocsp_response, ) return auth_ok @@ -399,17 +393,13 @@ def _verify_ocsp_signature( ) return True except PSSParameterMismatch: - errs.failures.append( - ( - 'The signature parameters on the OCSP response do not match ' - 'the constraints on the public key', - ocsp_response, - ) + errs.append( + 'The signature parameters on the OCSP response do not match ' + 'the constraints on the public key', + ocsp_response, ) except InvalidSignature: - errs.failures.append( - ('Unable to verify OCSP response signature', ocsp_response) - ) + errs.append('Unable to verify OCSP response signature', ocsp_response) return False @@ -469,11 +459,12 @@ async def _handle_single_ocsp_resp( if rating != RevinfoUsabilityRating.OK: if rating == RevinfoUsabilityRating.STALE: msg = 'OCSP response is not recent enough' + errs.update_stale(freshness_result.last_usable_at) elif rating == RevinfoUsabilityRating.TOO_NEW: msg = 'OCSP response is too recent' else: msg = 'OCSP response freshness could not be established' - errs.failures.append((msg, ocsp_response)) + errs.append(msg, ocsp_response, is_freshness_failure=True) return False # check whether the responder cert is authorised @@ -565,7 +556,7 @@ async def verify_ocsp_response( except ValueError as e: msg = "Generic processing error while validating OCSP response." logging.debug(msg, exc_info=e) - errs.failures.append((msg, ocsp_response)) + errs.append(msg, ocsp_response) if errs.mismatch_failures == len(ocsp_responses): raise OCSPNoMatchesError( @@ -575,7 +566,10 @@ async def verify_ocsp_response( raise OCSPValidationIndeterminateError( f"Unable to determine if {cert_description} " f"is revoked due to insufficient information from OCSP responses.", - errs.failures, + failures=errs.failures, + suspect_stale=( + errs.stale_last_usable_at if errs.freshness_failures_only else None + ), ) @@ -676,7 +670,7 @@ async def collect_relevant_responses_with_paths( except ValueError as e: msg = "Generic processing error while validating OCSP response." logging.debug(msg, exc_info=e) - errs.failures.append((msg, ocsp_response_cont)) + errs.append(msg, ocsp_response_cont) return OCSPCollectionResult( responses=relevant, failure_msgs=[f[0] for f in errs.failures], diff --git a/pyhanko_certvalidator/validate.py b/pyhanko_certvalidator/validate.py index a1a8f5d..aae0252 100644 --- a/pyhanko_certvalidator/validate.py +++ b/pyhanko_certvalidator/validate.py @@ -30,6 +30,7 @@ PathBuildingError, PathValidationError, PSSParameterMismatch, + StaleRevinfoError, ValidationError, ) from .name_trees import ( @@ -1292,6 +1293,7 @@ async def _check_revocation( else rev_check_policy.intermediate_ca_cert_rule ) + ocsp_suspect_stale_since = None # for OCSP, we don't bother if there's nothing in the certificate's AIA if rev_rule.ocsp_relevant and cert_has_ocsp: try: @@ -1304,6 +1306,7 @@ async def _check_revocation( failures.extend([failure[0] for failure in e.failures]) revocation_check_failed = True ocsp_matched = True + ocsp_suspect_stale_since = e.suspect_stale except OCSPNoMatchesError: pass except OCSPFetchError as e: @@ -1311,7 +1314,7 @@ async def _check_revocation( soft_fail = True validation_context._report_soft_fail(e) else: - failures.append(e) + failures.append(e.args[0]) revocation_check_failed = True if not ocsp_status_good and rev_rule.ocsp_mandatory: if failures: @@ -1329,6 +1332,7 @@ async def _check_revocation( ) crl_status_good = False + crl_suspect_stale_since = None # do not attempt to check CRLs (even cached ones) if there are no # distribution points, unless we have to crl_required_by_policy = rev_rule.crl_mandatory or ( @@ -1348,6 +1352,7 @@ async def _check_revocation( failures.extend([failure[0] for failure in e.failures]) revocation_check_failed = True crl_matched = True + crl_suspect_stale_since = e.suspect_stale except CRLNoMatchesError: pass except CRLFetchError as e: @@ -1355,7 +1360,7 @@ async def _check_revocation( soft_fail = True validation_context._report_soft_fail(e) else: - failures.append(e) + failures.append(e.args[0]) revocation_check_failed = True if not crl_status_good and rev_rule.crl_mandatory: @@ -1383,12 +1388,26 @@ async def _check_revocation( expected_revinfo_not_found = not matched and expected_revinfo if not soft_fail: if not status_good and matched and revocation_check_failed: - raise InsufficientRevinfoError.from_state( + msg = ( f"The path could not be validated because " f"{proc_state.describe_cert(def_interm=True)} revocation " - f"checks failed: {'; '.join(failures)}", - proc_state, + f"checks failed: {'; '.join(failures)}" + ) + maybe_stale_cutoff = ( + ocsp_suspect_stale_since or crl_suspect_stale_since ) + if maybe_stale_cutoff: + stale_cutoff = ( + max(ocsp_suspect_stale_since, crl_suspect_stale_since) + if ocsp_suspect_stale_since and crl_suspect_stale_since + else maybe_stale_cutoff + ) + raise StaleRevinfoError.format(msg, stale_cutoff, proc_state) + else: + raise InsufficientRevinfoError.from_state( + msg, + proc_state, + ) if expected_revinfo_not_found: raise InsufficientRevinfoError.from_state( f"The path could not be validated because no revocation " diff --git a/tests/fixtures/nist_pkits/pkits.json b/tests/fixtures/nist_pkits/pkits.json index aacf9c2..af0bc0f 100644 --- a/tests/fixtures/nist_pkits/pkits.json +++ b/tests/fixtures/nist_pkits/pkits.json @@ -540,7 +540,7 @@ ], "path_len": 3, "error": { - "class": "InsufficientRevinfoError", + "class": "StaleRevinfoError", "msg_regex": "The path could not be validated because the end-entity certificate revocation checks failed: CRL is not recent enough" } }, @@ -556,7 +556,7 @@ ], "path_len": 3, "error": { - "class": "InsufficientRevinfoError", + "class": "StaleRevinfoError", "msg_regex": "The path could not be validated because the end-entity certificate revocation checks failed: CRL is not recent enough" } }, diff --git a/tests/fixtures/openssl-ocsp/openssl-ocsp.json b/tests/fixtures/openssl-ocsp/openssl-ocsp.json index 1ecbb71..4671676 100644 --- a/tests/fixtures/openssl-ocsp/openssl-ocsp.json +++ b/tests/fixtures/openssl-ocsp/openssl-ocsp.json @@ -431,5 +431,19 @@ "class": "InsufficientRevinfoError", "msg_regex": "The path could not be validated because the end-entity certificate revocation checks failed: Unable to verify OCSP response since response signing certificate could not be validated" } + }, + { + "name": "direct_stale_otherwise_ok", + "root": "ND3_Issuer_Root.pem", + "cert": "ND3_Cert_EE.pem", + "ocsps": [ + "ND3.ors" + ], + "path_len": 2, + "moment": "2013-10-12T00:00:00+00:00", + "error": { + "class": "StaleRevinfoError", + "msg_regex": "The path could not be validated because the end-entity certificate revocation checks failed: OCSP response is not recent enough" + } } ] diff --git a/tests/test_validate.py b/tests/test_validate.py index 18a10fb..64f53c4 100644 --- a/tests/test_validate.py +++ b/tests/test_validate.py @@ -20,6 +20,7 @@ OCSPFetchError, PathValidationError, RevokedError, + StaleRevinfoError, ) from pyhanko_certvalidator.fetchers import ( CertificateFetcher, @@ -95,7 +96,12 @@ def get_fetchers(self) -> Fetchers: ERR_CLASSES = { cls.__name__: cls - for cls in (PathValidationError, RevokedError, InsufficientRevinfoError) + for cls in ( + PathValidationError, + RevokedError, + InsufficientRevinfoError, + StaleRevinfoError, + ) } From 7cdfb067faaa5973baab09df6a57d21c0ab2ccdc Mon Sep 17 00:00:00 2001 From: Matthias Valvekens Date: Fri, 6 Oct 2023 00:12:29 +0200 Subject: [PATCH 4/4] Unify some code from CRLs and deltas --- pyhanko_certvalidator/revinfo/validate_crl.py | 104 ++++++++++-------- 1 file changed, 61 insertions(+), 43 deletions(-) diff --git a/pyhanko_certvalidator/revinfo/validate_crl.py b/pyhanko_certvalidator/revinfo/validate_crl.py index 02a8174..73ea10b 100644 --- a/pyhanko_certvalidator/revinfo/validate_crl.py +++ b/pyhanko_certvalidator/revinfo/validate_crl.py @@ -521,6 +521,32 @@ def _handle_attr_cert_crl_idp_ext_constraints( return True +def _check_crl_freshness( + certificate_list_cont: CRLContainer, + revinfo_policy: CertRevTrustPolicy, + timing_params: ValidationTimingParams, + errs: _CRLErrs, + is_delta: bool, +): + freshness_result = certificate_list_cont.usable_at( + policy=revinfo_policy, + timing_params=timing_params, + ) + prefix = "Delta CRL" if is_delta else "CRL" + rating = freshness_result.rating + if rating != RevinfoUsabilityRating.OK: + if rating == RevinfoUsabilityRating.STALE: + msg = f'{prefix} is not recent enough' + errs.update_stale(freshness_result.last_usable_at) + elif rating == RevinfoUsabilityRating.TOO_NEW: + msg = f'{prefix} is too recent' + else: + msg = f'{prefix} freshness could not be established' + errs.append(msg, certificate_list_cont, is_freshness_failure=True) + return False + return True + + async def _handle_single_crl( cert: Union[x509.Certificate, cms.AttributeCertificateV2], cert_issuer_auth: Authority, @@ -582,20 +608,13 @@ async def _handle_single_crl( if interim_reasons is None: return None - freshness_result = certificate_list_cont.usable_at( - policy=validation_context.revinfo_policy, - timing_params=validation_context.timing_params, - ) - rating = freshness_result.rating - if rating != RevinfoUsabilityRating.OK: - if rating == RevinfoUsabilityRating.STALE: - msg = 'CRL is not recent enough' - errs.update_stale(freshness_result.last_usable_at) - elif rating == RevinfoUsabilityRating.TOO_NEW: - msg = 'CRL is too recent' - else: - msg = 'CRL freshness could not be established' - errs.append(msg, certificate_list_cont, is_freshness_failure=True) + if not _check_crl_freshness( + certificate_list_cont, + validation_context.revinfo_policy, + validation_context.timing_params, + errs, + is_delta=False, + ): return None # Step c @@ -713,12 +732,9 @@ def _maybe_get_delta_crl( delta_certificate_list = delta_certificate_list_cont.crl_data - if delta_certificate_list.critical_extensions - KNOWN_CRL_EXTENSIONS: - errs.append( - 'One or more unrecognized critical extensions are present in ' - 'the delta CRL', - delta_certificate_list_cont, - ) + if not _verify_no_unknown_critical_extensions( + delta_certificate_list_cont, errs, is_delta=True + ): return None # Step h @@ -732,26 +748,31 @@ def _maybe_get_delta_crl( return None if policy and timing_params: - freshness_result = delta_certificate_list_cont.usable_at( - policy=policy, timing_params=timing_params - ) - rating = freshness_result.rating - if rating != RevinfoUsabilityRating.OK: - if rating == RevinfoUsabilityRating.STALE: - msg = 'Delta CRL is stale' - errs.update_stale(freshness_result.last_usable_at) - elif rating == RevinfoUsabilityRating.TOO_NEW: - msg = 'Delta CRL is too recent' - else: - msg = 'Delta CRL freshness could not be established' - errs.append( - msg, delta_certificate_list_cont, is_freshness_failure=True - ) - return None - return delta_certificate_list_cont + if _check_crl_freshness( + delta_certificate_list_cont, + policy, + timing_params, + errs, + is_delta=True, + ): + return delta_certificate_list_cont return None +def _verify_no_unknown_critical_extensions( + certificate_list_cont: CRLContainer, errs: _CRLErrs, is_delta: bool +): + extensions = certificate_list_cont.crl_data.critical_extensions + if extensions - KNOWN_CRL_EXTENSIONS: + errs.append( + f'One or more unrecognized critical extensions are present in ' + f'the {"delta CRL" if is_delta else "CRL"}', + certificate_list_cont, + ) + return False + return True + + def _get_crl_scope_assuming_authority( crl_issuer: x509.Certificate, cert: Union[x509.Certificate, cms.AttributeCertificateV2], @@ -840,12 +861,9 @@ def _get_crl_scope_assuming_authority( # We don't skip a CRL if it only contains reasons already checked since # a certificate issuer can self-issue a new cert that is used for CRLs - if certificate_list.critical_extensions - KNOWN_CRL_EXTENSIONS: - errs.append( - 'One or more unrecognized critical extensions are present in ' - 'the CRL', - certificate_list_cont, - ) + if not _verify_no_unknown_critical_extensions( + certificate_list_cont, errs, is_delta=False + ): return None return interim_reasons