From a66f596665037e200ee928addd909f71a2c66ff5 Mon Sep 17 00:00:00 2001 From: Matthias Valvekens Date: Sun, 10 Dec 2023 23:32:46 +0100 Subject: [PATCH] Qualification algorithm: beginnings --- pyhanko/sign/ades/qualified_asn1.py | 12 +- pyhanko/sign/validation/eutl.py | 304 ++++++++++++++- pyhanko_tests/data/crypto/certomancer.yml | 94 +++++ pyhanko_tests/test_trusted_list.py | 431 +++++++++++++++++++++- 4 files changed, 816 insertions(+), 25 deletions(-) diff --git a/pyhanko/sign/ades/qualified_asn1.py b/pyhanko/sign/ades/qualified_asn1.py index 190bfdf6..473600d3 100644 --- a/pyhanko/sign/ades/qualified_asn1.py +++ b/pyhanko/sign/ades/qualified_asn1.py @@ -1,4 +1,4 @@ -from asn1crypto import core +from asn1crypto import core, x509 from pyhanko.sign.ades.asn1_util import register_x509_extension @@ -93,4 +93,14 @@ class QcStatements(core.SequenceOf): _child_spec = QcStatement +def get_qc_statements(cert: x509.Certificate) -> QcStatements: + for ext in cert['tbs_certificate']['extensions']: + if ext['extn_id'].native != 'qc_statements': + continue + qc_statements: QcStatements = ext['extn_value'].parsed + return qc_statements + else: + return QcStatements() + + register_x509_extension('1.3.6.1.5.5.7.1.3', 'qc_statements', QcStatements) diff --git a/pyhanko/sign/validation/eutl.py b/pyhanko/sign/validation/eutl.py index 2c5ab24a..17097578 100644 --- a/pyhanko/sign/validation/eutl.py +++ b/pyhanko/sign/validation/eutl.py @@ -1,7 +1,8 @@ import enum import logging +import zoneinfo from collections import defaultdict -from dataclasses import dataclass +from dataclasses import dataclass, replace from datetime import datetime from typing import ( Dict, @@ -17,6 +18,7 @@ ) from asn1crypto import x509 +from pyhanko_certvalidator import ValidationPath from pyhanko_certvalidator.authority import Authority, AuthorityWithCert from pyhanko_certvalidator.errors import InvalidCertificateError from xsdata.formats.dataclass.parsers import XmlParser @@ -28,13 +30,16 @@ ts_119612_sie, xades, ) +from pyhanko.sign.ades import qualified_asn1 from pyhanko.sign.validation.settings import KeyUsageConstraints logger = logging.getLogger(__name__) -CA_QC_URI = 'http://uri.etsi.org/TrstSvc/Svctype/CA/QC' -QTST_URI = 'http://uri.etsi.org/TrstSvc/Svctype/TSA/QTST' -STATUS_GRANTED = 'http://uri.etsi.org/TrstSvc/TrustedList/Svcstatus/granted' +_TRSTSVC_URI_BASE = 'http://uri.etsi.org/TrstSvc' +_TRUSTEDLIST_URI_BASE = f'{_TRSTSVC_URI_BASE}/TrustedList' +CA_QC_URI = f'{_TRSTSVC_URI_BASE}/Svctype/CA/QC' +QTST_URI = f'{_TRSTSVC_URI_BASE}/Svctype/TSA/QTST' +STATUS_GRANTED = f'{_TRUSTEDLIST_URI_BASE}/Svcstatus/granted' class TSPServiceParsingError(ValueError): @@ -48,12 +53,28 @@ class AdditionalServiceInformation: textual_info: Optional[str] +class QcCertType(enum.Enum): + QC_ESIGN = 'qct_esign' + QC_ESEAL = 'qct_eseal' + QC_WEB = 'qct_web' + + +_SVCINFOEXT_URI_BASE = f'{_TRUSTEDLIST_URI_BASE}/SvcInfoExt' + +_CERTIFICATE_TYPE_BY_URI = { + f'{_SVCINFOEXT_URI_BASE}/ForeSignatures': QcCertType.QC_ESIGN, + f'{_SVCINFOEXT_URI_BASE}/ForeSeals': QcCertType.QC_ESEAL, + f'{_SVCINFOEXT_URI_BASE}/ForWebSiteAuthentication': QcCertType.QC_WEB, +} + + @dataclass(frozen=True) class BaseServiceInformation: service_type: str service_name: str provider_certs: Tuple[x509.Certificate, ...] - additional_info: FrozenSet[AdditionalServiceInformation] + additional_info_certificate_type: FrozenSet[QcCertType] + other_additional_info: FrozenSet[AdditionalServiceInformation] class Qualifier(enum.Enum): @@ -63,7 +84,7 @@ class Qualifier(enum.Enum): WITH_QSCD = 'QCWithQSCD' NO_QSCD = 'QCNoQSCD' QSCD_AS_IN_CERT = 'QCQSCDStatusAsInCert' - QSCD_MANAGED = 'QCQSCDManagedOnBehalf' + QSCD_MANAGED_ON_BEHALF = 'QCQSCDManagedOnBehalf' LEGAL_PERSON = 'QCForLegalPerson' FOR_ESIG = 'QCForEsig' FOR_ESEAL = 'QCForEseal' @@ -78,7 +99,7 @@ def uri(self): ) -_BY_URI = {q.uri: q for q in Qualifier} +_QUALIFIER_BY_URI = {q.uri: q for q in Qualifier} class Criterion: @@ -272,7 +293,7 @@ def _process_criteria_list_entries( def _process_qualifiers(qualifiers: Iterable[ts_119612_sie.QualifierType]): for qual in qualifiers: try: - yield _BY_URI[qual.uri] + yield _QUALIFIER_BY_URI[qual.uri] except KeyError: logger.info(f"Qualifier {qual.uri} in SDI ignored...") @@ -327,6 +348,7 @@ def _interpret_service_info_for_ca( if service_info.service_information_extensions else () ) + asi_qc_type: Set[QcCertType] = set() for ext in extensions_xml: for ext_content in ext.content: if isinstance(ext_content, ts_119612_sie.Qualifications): @@ -339,9 +361,15 @@ def _interpret_service_info_for_ca( elif isinstance( ext_content, ts_119612.AdditionalServiceInformation ): - additional_info.append( - _process_additional_info(ext_content, ext.critical or False) + additional_info_entry = _process_additional_info( + ext_content, ext.critical or False ) + try: + asi_qc_type.add( + _CERTIFICATE_TYPE_BY_URI[additional_info_entry.uri] + ) + except KeyError: + additional_info.append(additional_info_entry) elif ext.critical: # TODO more informative exception / only ditch the current SDI raise TSPServiceParsingError( @@ -355,7 +383,8 @@ def _interpret_service_info_for_ca( ), service_name=service_name or "unknown", provider_certs=tuple(certs), - additional_info=frozenset(additional_info), + additional_info_certificate_type=frozenset(asi_qc_type), + other_additional_info=frozenset(additional_info), ) return CAServiceInformation( @@ -382,6 +411,7 @@ def _interpret_service_info_for_cas( # work, store that info on the object if service_info.service_status != STATUS_GRANTED: continue + # TODO process errors in individual services yield _interpret_service_info_for_ca(service) @@ -425,3 +455,255 @@ def applicable_service_definitions( self, ca: Authority ) -> Iterable[CAServiceInformation]: return tuple(self._cert_to_si[ca]) + + # TODO take date into account (and properly track it + # for service definitions) + def applicable_tsps_on_path( + self, + path: ValidationPath, + ) -> Generator[CAServiceInformation, None, None]: + for ca in path.iter_authorities(): + yield from self.applicable_service_definitions(ca) + + +class QcPrivateKeyManagementType(enum.Enum): + UNKNOWN = 0 + QCSD = 1 + QCSD_DELEGATED = 2 + QCSD_BY_POLICY = 3 + + @property + def is_qcsd(self) -> bool: + return self != QcPrivateKeyManagementType.UNKNOWN + + +EIDAS_START_DATE = datetime( + 2016, 7, 1, 0, 0, 0, tzinfo=zoneinfo.ZoneInfo('CET') +) + +PRE_EIDAS_QCP_POLICY = '0.4.0.1456.1.1' +PRE_EIDAS_QCP_PLUS_POLICY = '0.4.0.1456.1.2' + + +@dataclass(frozen=True) +class QualifiedStatus: + """ + Represents the qualified status of a certificate. + """ + + qualified: bool + """ + Indicates whether the certificate is to be considered qualified. + """ + + qc_type: QcCertType + """ + Type of qualified certificate. + """ + + qc_key_security: QcPrivateKeyManagementType + """ + Indicates whether the CA declares that the private key + corresponding to this certificate resides in a qualified + signature creation device (QSCD) or secure signature creation device (SSCD). + It also indicates whether the QCSD is managed on behalf of the signer, + if applicable. + + .. warning:: + These terms are functionally interchangeable, the only difference is + that "SSCD" is pre-eIDAS terminology. + """ + + +UNQUALIFIED = QualifiedStatus( + qualified=False, + qc_type=QcCertType.QC_ESIGN, + qc_key_security=QcPrivateKeyManagementType.UNKNOWN, +) + + +class QualificationAssessor: + def __init__(self, tsp_registry: TSPRegistry): + self._registry = tsp_registry + + @staticmethod + def _process_qc_statements(cert: x509.Certificate) -> QualifiedStatus: + qcs = qualified_asn1.get_qc_statements(cert) + qualified = False + key_secure = False + qc_type = QcCertType.QC_ESIGN + for statement in qcs: + st_type = statement['statement_id'].native + if st_type == 'qc_compliance': + qualified = True + elif st_type == 'qc_sscd': + # management delegation is not encoded by the QcStatements + key_secure = True + elif st_type == 'qc_type': + qc_types: qualified_asn1.QcCertificateType = statement[ + 'statement_info' + ] + if len(qc_types) != 1: + # In theory this is not limited to one value, we have to + # let the TL override in a case like this. + # Nonetheless there's really no good reason to do this, + # and some ETSI specs are more strict than others, + # so I'll deal with this case when it presents itself + raise NotImplementedError("only support exactly 1 qc_type") + qc_type = QcCertType(qc_types[0].native) + return QualifiedStatus( + qualified=qualified, + qc_type=qc_type, + qc_key_security=( + QcPrivateKeyManagementType.QCSD + if key_secure and qualified + else QcPrivateKeyManagementType.UNKNOWN + ), + ) + + @staticmethod + def _check_cd_applicable( + sd: CAServiceInformation, putative_status: QualifiedStatus + ): + sd_declared_type = sd.base_info.additional_info_certificate_type + if sd_declared_type and putative_status.qc_type not in sd_declared_type: + logger.info( + f"Found matching SDI {sd.base_info.service_name} on path; " + f"skipping because QC type does not match" + ) + return False + return True + + @staticmethod + def _apply_sd_qualifications( + cert: x509.Certificate, + prelim_status: QualifiedStatus, + sd: CAServiceInformation, + ): + applicable_qualifiers: Set[Qualifier] = set() + for qualification in sd.qualifications: + if not qualification.criteria_list.matches(cert): + continue + applicable_qualifiers.update(qualification.qualifiers) + return QualificationAssessor._final_status( + prelim_status, frozenset(applicable_qualifiers) + ) + + @staticmethod + def _final_status( + prelim_status: QualifiedStatus, + applicable_qualifiers: FrozenSet[Qualifier], + ): + # TODO explicitly check consistency / contradictory qualifiers + # (for now we just use conservative defaults) + is_qualified: bool + if ( + Qualifier.NOT_QUALIFIED in applicable_qualifiers + or Qualifier.LEGAL_PERSON in applicable_qualifiers + ): + is_qualified = False + elif Qualifier.QC_STATEMENT in applicable_qualifiers: + is_qualified = True + else: + is_qualified = prelim_status.qualified + + qc_type: QcCertType + if Qualifier.FOR_WSA in applicable_qualifiers: + qc_type = QcCertType.QC_WEB + elif Qualifier.FOR_ESIG in applicable_qualifiers: + qc_type = QcCertType.QC_ESIGN + elif Qualifier.FOR_ESEAL in applicable_qualifiers: + qc_type = QcCertType.QC_ESEAL + else: + qc_type = prelim_status.qc_type + + key_mgmt: QcPrivateKeyManagementType + if not is_qualified: + key_mgmt = QcPrivateKeyManagementType.UNKNOWN + elif ( + Qualifier.NO_SSCD in applicable_qualifiers + or Qualifier.NO_QSCD in applicable_qualifiers + ): + key_mgmt = QcPrivateKeyManagementType.UNKNOWN + elif Qualifier.QSCD_MANAGED_ON_BEHALF in applicable_qualifiers: + key_mgmt = QcPrivateKeyManagementType.QCSD_DELEGATED + elif ( + Qualifier.WITH_SSCD in applicable_qualifiers + or Qualifier.WITH_QSCD in applicable_qualifiers + ): + key_mgmt = QcPrivateKeyManagementType.QCSD + else: + key_mgmt = prelim_status.qc_key_security + return QualifiedStatus( + qualified=is_qualified, + qc_type=qc_type, + qc_key_security=key_mgmt, + ) + + def check_entity_cert_qualified( + self, path: ValidationPath, moment: Optional[datetime] = None + ) -> QualifiedStatus: + cert = path.leaf + if not isinstance(cert, x509.Certificate): + raise NotImplementedError( + "Only public-key certs are in scope for qualification" + ) + prelim_status = QualificationAssessor._process_qc_statements(cert) + path_policies = path.qualified_policies() + reference_time = moment or datetime.now(tz=zoneinfo.ZoneInfo('CET')) + if reference_time < EIDAS_START_DATE and path_policies: + # check QCP / QCP+ policy + policy_oids = {q.user_domain_policy_id for q in path_policies} + if PRE_EIDAS_QCP_PLUS_POLICY in policy_oids: + prelim_status = replace( + prelim_status, + qualified=True, + qc_key_security=( + QcPrivateKeyManagementType.QCSD_BY_POLICY + if not prelim_status.qc_key_security.is_qcsd + else prelim_status.qc_key_security + ), + ) + elif PRE_EIDAS_QCP_POLICY in policy_oids: + prelim_status = replace(prelim_status, qualified=True) + + statuses_found: List[Tuple[CAServiceInformation, QualifiedStatus]] = [] + for sd in self._registry.applicable_tsps_on_path(path): + # For this subtlety, see the hanging para in the beginning of + # section 4 in the CEF eSignature DSS validation algorithm doc + putative_status = QualificationAssessor._apply_sd_qualifications( + cert, prelim_status, sd + ) + if QualificationAssessor._check_cd_applicable(sd, putative_status): + statuses_found.append((sd, putative_status)) + + uniq_statuses = set(st for _, st in statuses_found) + if len(statuses_found) == 1: + # happy path + return statuses_found[0][1] + elif len(uniq_statuses) == 1: + # TODO gather these warnings somewhere so they can be added + # to the validation report + service_info = ', '.join( + sd.base_info.service_name for sd, _ in statuses_found + ) + logger.warning( + f"Qualification algorithm for {cert.subject.human_friendly} " + f"reached a consistent conclusion, but through several " + f"different service definitions: {service_info}" + ) + return statuses_found[0][1] + elif not uniq_statuses: + return UNQUALIFIED + else: + service_info = ', '.join( + sd.base_info.service_name for sd, _ in statuses_found + ) + logger.warning( + f"Qualification algorithm for {cert.subject.human_friendly} " + f"reached contradictory conclusions: {uniq_statuses}. " + f"Several service definitions were found applicable: " + f"{service_info}. This certificate will not be considered " + f"qualified." + ) + return UNQUALIFIED diff --git a/pyhanko_tests/data/crypto/certomancer.yml b/pyhanko_tests/data/crypto/certomancer.yml index 6792381d..4042c3e4 100644 --- a/pyhanko_tests/data/crypto/certomancer.yml +++ b/pyhanko_tests/data/crypto/certomancer.yml @@ -1,4 +1,6 @@ external-url-prefix: "http://pyhanko.tests" +plugin-modules: + - pyhanko.sign.ades.qualified_asn1 keysets: testing-ca: path-prefix: keys-rsa @@ -607,3 +609,95 @@ pki-architectures: tsa: signing-key: tsa signing-cert: tsa + testing-ca-qualified: + template: testing-ca + keyset: testing-ca-ecdsa + certs: + interm-qualified: + template: interm + subject: interm + extensions: + - id: certificate_policies + value: + - policy_identifier: '2.999.31337.0' + - policy_identifier: '2.999.31337.1' + - policy_identifier: '0.4.0.1456.1.1' + - policy_identifier: '0.4.0.1456.1.2' + esig-qualified: + template: signer1 + subject: signer1 + issuer-cert: interm-qualified + extensions: + - id: certificate_policies + value: + - policy_identifier: '2.999.31337.0' + - id: qc_statements + value: + - statement_id: 'qc_compliance' + - statement_id: 'qc_sscd' + eseal-qualified: + template: signer1 + subject: signer1 + issuer-cert: interm-qualified + extensions: + - id: certificate_policies + value: + - policy_identifier: '2.999.31337.1' + - id: qc_statements + value: + - statement_id: 'qc_compliance' + - statement_id: 'qc_sscd' + - statement_id: 'qc_type' + statement_info: ['qct_eseal'] + esig-qualified-no-qscd: + template: signer1 + subject: signer1 + issuer-cert: interm-qualified + extensions: + - id: certificate_policies + value: + - policy_identifier: '2.999.31337.0' + - id: qc_statements + value: + - statement_id: 'qc_compliance' + not-qualified: + template: signer1 + subject: signer1 + issuer-cert: interm-qualified + extensions: + - id: certificate_policies + value: + - policy_identifier: '2.999.31337.0' + not-qualified-nonsense-qcsd: + template: signer1 + subject: signer1 + issuer-cert: interm-qualified + extensions: + - id: certificate_policies + value: + - policy_identifier: '2.999.31337.0' + - id: qc_statements + value: + - statement_id: 'qc_sscd' + esig-qualified-legacy-policy: + template: signer1 + subject: signer1 + issuer-cert: interm-qualified + validity: + valid-from: "2012-01-01T00:00:00+0000" + valid-to: "2022-01-01T00:00:00+0000" + extensions: + - id: certificate_policies + value: + - policy_identifier: '0.4.0.1456.1.1' + esig-qualified-legacy-policy-qscd: + template: signer1 + subject: signer1 + issuer-cert: interm-qualified + validity: + valid-from: "2012-01-01T00:00:00+0000" + valid-to: "2022-01-01T00:00:00+0000" + extensions: + - id: certificate_policies + value: + - policy_identifier: '0.4.0.1456.1.2' diff --git a/pyhanko_tests/test_trusted_list.py b/pyhanko_tests/test_trusted_list.py index 9370acfc..dd33647c 100644 --- a/pyhanko_tests/test_trusted_list.py +++ b/pyhanko_tests/test_trusted_list.py @@ -2,14 +2,28 @@ from pathlib import Path import pytest +from certomancer.integrations.illusionist import Illusionist +from certomancer.registry import ArchLabel +from freezegun import freeze_time +from pyhanko_certvalidator import CertificateValidator, ValidationContext from pyhanko_certvalidator.authority import AuthorityWithCert, NamedKeyAuthority -from signing_commons import ECC_INTERM_CERT, FROM_CA, FROM_ECC_CA, INTERM_CERT +from pyhanko_certvalidator.policy_decl import ( + CertRevTrustPolicy, + RevocationCheckingPolicy, + RevocationCheckingRule, +) +from samples import CERTOMANCER +from signing_commons import ECC_INTERM_CERT, FROM_CA, INTERM_CERT from xsdata.formats.dataclass.parsers import XmlParser from xsdata.formats.dataclass.parsers.config import ParserConfig from pyhanko.generated.etsi import ts_119612 -from pyhanko.sign.validation import KeyUsageConstraints, eutl -from pyhanko.sign.validation.eutl import CriteriaCombination +from pyhanko.sign.validation import eutl +from pyhanko.sign.validation.settings import KeyUsageConstraints + +TESTING_CA_QUALIFIED = CERTOMANCER.get_pki_arch( + ArchLabel('testing-ca-qualified') +) def _read_cas_from_file(path: Path): @@ -337,10 +351,10 @@ def test_parse_service_no_type(): @pytest.mark.parametrize( 'criterion', [ - eutl.CriteriaList(CriteriaCombination.ALL, frozenset()), - eutl.CriteriaList(CriteriaCombination.NONE, frozenset()), + eutl.CriteriaList(eutl.CriteriaCombination.ALL, frozenset()), + eutl.CriteriaList(eutl.CriteriaCombination.NONE, frozenset()), eutl.CriteriaList( - CriteriaCombination.ALL, + eutl.CriteriaCombination.ALL, frozenset( [ eutl.CertSubjectDNCriterion( @@ -358,7 +372,7 @@ def test_parse_service_no_type(): ), ), eutl.CriteriaList( - CriteriaCombination.AT_LEAST_ONE, + eutl.CriteriaCombination.AT_LEAST_ONE, frozenset( [ eutl.CertSubjectDNCriterion( @@ -369,7 +383,7 @@ def test_parse_service_no_type(): ), ), eutl.CriteriaList( - CriteriaCombination.NONE, + eutl.CriteriaCombination.NONE, frozenset( [ eutl.KeyUsageCriterion( @@ -390,7 +404,7 @@ def test_criteria_accept(criterion): 'criterion', [ eutl.CriteriaList( - CriteriaCombination.ALL, + eutl.CriteriaCombination.ALL, frozenset( [ eutl.CertSubjectDNCriterion( @@ -401,7 +415,7 @@ def test_criteria_accept(criterion): ), ), eutl.CriteriaList( - CriteriaCombination.NONE, + eutl.CriteriaCombination.NONE, frozenset( [ eutl.CertSubjectDNCriterion( @@ -423,7 +437,8 @@ def _dummy_service_definition(*extra_certs) -> eutl.CAServiceInformation: '', 'test1', provider_certs=(INTERM_CERT, *extra_certs), - additional_info=frozenset(), + additional_info_certificate_type=frozenset(), + other_additional_info=frozenset(), ), qualifications=frozenset(), expired_certs_revocation_info=None, @@ -481,7 +496,8 @@ def test_tsp_registry_multiple_sds(): '', 'test1', # quals are too much work to mock provider_certs=(INTERM_CERT,), - additional_info=frozenset(), + other_additional_info=frozenset(), + additional_info_certificate_type=frozenset(), ), qualifications=frozenset(), expired_certs_revocation_info=None, @@ -494,7 +510,8 @@ def test_tsp_registry_multiple_sds(): '', 'test2', provider_certs=(INTERM_CERT,), - additional_info=frozenset(), + other_additional_info=frozenset(), + additional_info_certificate_type=frozenset(), ), qualifications=frozenset(), expired_certs_revocation_info=None, @@ -506,3 +523,391 @@ def test_tsp_registry_multiple_sds(): ) assert len(result) == 2 assert set(r.base_info.service_name for r in result) == {'test1', 'test2'} + + +@pytest.mark.parametrize( + 'cert_id,expected_prelim_status', + [ + ( + 'esig-qualified', + eutl.QualifiedStatus( + qualified=True, + qc_type=eutl.QcCertType.QC_ESIGN, + qc_key_security=eutl.QcPrivateKeyManagementType.QCSD, + ), + ), + ( + 'eseal-qualified', + eutl.QualifiedStatus( + qualified=True, + qc_type=eutl.QcCertType.QC_ESEAL, + qc_key_security=eutl.QcPrivateKeyManagementType.QCSD, + ), + ), + ( + 'esig-qualified-no-qscd', + eutl.QualifiedStatus( + qualified=True, + qc_type=eutl.QcCertType.QC_ESIGN, + qc_key_security=eutl.QcPrivateKeyManagementType.UNKNOWN, + ), + ), + ('not-qualified', eutl.UNQUALIFIED), + ('not-qualified-nonsense-qcsd', eutl.UNQUALIFIED), + ], +) +def test_qcstatements_processing(cert_id, expected_prelim_status): + cert = TESTING_CA_QUALIFIED.get_cert(cert_id) + result = eutl.QualificationAssessor._process_qc_statements(cert) + assert result == expected_prelim_status + + +@pytest.mark.parametrize( + 'prelim_status,applicable_qualifiers,expected_status_fields', + [ + ( + eutl.QualifiedStatus( + qualified=True, + qc_type=eutl.QcCertType.QC_ESIGN, + qc_key_security=eutl.QcPrivateKeyManagementType.QCSD, + ), + [eutl.Qualifier.NOT_QUALIFIED], + { + 'qualified': False, + 'qc_key_security': eutl.QcPrivateKeyManagementType.UNKNOWN, + }, + ), + ( + eutl.QualifiedStatus( + qualified=False, + qc_type=eutl.QcCertType.QC_ESIGN, + qc_key_security=eutl.QcPrivateKeyManagementType.QCSD, + ), + [eutl.Qualifier.QC_STATEMENT], + { + 'qualified': True, + 'qc_key_security': eutl.QcPrivateKeyManagementType.QCSD, + }, + ), + ( + eutl.QualifiedStatus( + qualified=False, + qc_type=eutl.QcCertType.QC_ESIGN, + qc_key_security=eutl.QcPrivateKeyManagementType.QCSD, + ), + [eutl.Qualifier.WITH_QSCD], + { + 'qualified': False, + 'qc_key_security': eutl.QcPrivateKeyManagementType.UNKNOWN, + }, + ), + ( + eutl.QualifiedStatus( + qualified=True, + qc_type=eutl.QcCertType.QC_ESIGN, + qc_key_security=eutl.QcPrivateKeyManagementType.UNKNOWN, + ), + [eutl.Qualifier.WITH_QSCD], + { + 'qualified': True, + 'qc_key_security': eutl.QcPrivateKeyManagementType.QCSD, + }, + ), + ( + eutl.QualifiedStatus( + qualified=True, + qc_type=eutl.QcCertType.QC_ESIGN, + qc_key_security=eutl.QcPrivateKeyManagementType.UNKNOWN, + ), + [eutl.Qualifier.WITH_SSCD], + { + 'qualified': True, + 'qc_key_security': eutl.QcPrivateKeyManagementType.QCSD, + }, + ), + ( + eutl.QualifiedStatus( + qualified=True, + qc_type=eutl.QcCertType.QC_ESIGN, + qc_key_security=eutl.QcPrivateKeyManagementType.UNKNOWN, + ), + [eutl.Qualifier.FOR_ESEAL], + { + 'qualified': True, + 'qc_type': eutl.QcCertType.QC_ESEAL, + }, + ), + ( + eutl.QualifiedStatus( + qualified=True, + qc_type=eutl.QcCertType.QC_ESEAL, + qc_key_security=eutl.QcPrivateKeyManagementType.UNKNOWN, + ), + [eutl.Qualifier.FOR_ESIG], + { + 'qualified': True, + 'qc_type': eutl.QcCertType.QC_ESIGN, + }, + ), + ( + eutl.QualifiedStatus( + qualified=True, + qc_type=eutl.QcCertType.QC_ESIGN, + qc_key_security=eutl.QcPrivateKeyManagementType.UNKNOWN, + ), + [eutl.Qualifier.FOR_WSA], + { + 'qualified': True, + 'qc_type': eutl.QcCertType.QC_WEB, + }, + ), + ( + eutl.QualifiedStatus( + qualified=True, + qc_type=eutl.QcCertType.QC_ESEAL, + qc_key_security=eutl.QcPrivateKeyManagementType.QCSD, + ), + [eutl.Qualifier.QSCD_MANAGED_ON_BEHALF], + { + 'qc_key_security': ( + eutl.QcPrivateKeyManagementType.QCSD_DELEGATED + ) + }, + ), + ( + eutl.QualifiedStatus( + qualified=True, + qc_type=eutl.QcCertType.QC_ESIGN, + qc_key_security=eutl.QcPrivateKeyManagementType.QCSD, + ), + [eutl.Qualifier.NO_QSCD], + { + 'qualified': True, + 'qc_key_security': eutl.QcPrivateKeyManagementType.UNKNOWN, + }, + ), + ( + eutl.QualifiedStatus( + qualified=True, + qc_type=eutl.QcCertType.QC_ESIGN, + qc_key_security=eutl.QcPrivateKeyManagementType.QCSD, + ), + [eutl.Qualifier.NO_SSCD], + { + 'qualified': True, + 'qc_key_security': eutl.QcPrivateKeyManagementType.UNKNOWN, + }, + ), + ( + eutl.QualifiedStatus( + qualified=True, + qc_type=eutl.QcCertType.QC_ESIGN, + qc_key_security=eutl.QcPrivateKeyManagementType.QCSD, + ), + [eutl.Qualifier.QSCD_AS_IN_CERT], + { + 'qualified': True, + 'qc_key_security': eutl.QcPrivateKeyManagementType.QCSD, + }, + ), + ], +) +def test_tl_override_processing( + prelim_status, applicable_qualifiers, expected_status_fields +): + result = eutl.QualificationAssessor._final_status( + prelim_status, frozenset(applicable_qualifiers) + ) + result_fields = { + key: getattr(result, key) for key in expected_status_fields + } + assert result_fields == expected_status_fields + + +DUMMY_BASE_INFO = eutl.BaseServiceInformation( + service_type=eutl.CA_QC_URI, + service_name='Dummy', + provider_certs=(TESTING_CA_QUALIFIED.get_cert('root'),), + additional_info_certificate_type=frozenset([eutl.QcCertType.QC_ESIGN]), + other_additional_info=frozenset(), +) + +_SKIP_REVOCATION = CertRevTrustPolicy( + RevocationCheckingPolicy( + ee_certificate_rule=RevocationCheckingRule.NO_CHECK, + intermediate_ca_cert_rule=RevocationCheckingRule.NO_CHECK, + ) +) + +MUST_HAVE_POLICY0 = eutl.CriteriaList( + combine_as=eutl.CriteriaCombination.ALL, + criteria=frozenset([eutl.PolicySetCriterion(frozenset(['2.999.31337.0']))]), +) + +MUST_HAVE_POLICY1 = eutl.CriteriaList( + combine_as=eutl.CriteriaCombination.ALL, + criteria=frozenset([eutl.PolicySetCriterion(frozenset(['2.999.31337.1']))]), +) + +MUST_HAVE_NONREPUD = eutl.CriteriaList( + combine_as=eutl.CriteriaCombination.ALL, + criteria=frozenset( + [ + eutl.KeyUsageCriterion( + KeyUsageConstraints(key_usage=('non_repudiation',)) + ) + ] + ), +) + + +@pytest.mark.asyncio +@freeze_time('2020-11-01') +@pytest.mark.parametrize( + 'cert_name,sd', + [ + ( + 'esig-qualified', + eutl.CAServiceInformation( + base_info=DUMMY_BASE_INFO, + qualifications=frozenset(), + expired_certs_revocation_info=None, + ), + ), + ( + 'esig-qualified-no-qscd', + eutl.CAServiceInformation( + base_info=DUMMY_BASE_INFO, + qualifications=frozenset( + [ + eutl.Qualification( + qualifiers=frozenset([eutl.Qualifier.WITH_QSCD]), + criteria_list=MUST_HAVE_POLICY0, + ) + ] + ), + expired_certs_revocation_info=None, + ), + ), + ( + 'not-qualified', + eutl.CAServiceInformation( + base_info=DUMMY_BASE_INFO, + qualifications=frozenset( + [ + eutl.Qualification( + criteria_list=MUST_HAVE_POLICY0, + qualifiers=frozenset( + [ + eutl.Qualifier.QC_STATEMENT, + eutl.Qualifier.WITH_QSCD, + ] + ), + ) + ] + ), + expired_certs_revocation_info=None, + ), + ), + ( + 'not-qualified', + eutl.CAServiceInformation( + base_info=DUMMY_BASE_INFO, + qualifications=frozenset( + [ + eutl.Qualification( + criteria_list=MUST_HAVE_POLICY0, + qualifiers=frozenset( + [ + eutl.Qualifier.QC_STATEMENT, + ] + ), + ), + eutl.Qualification( + criteria_list=MUST_HAVE_NONREPUD, + qualifiers=frozenset( + [ + eutl.Qualifier.WITH_QSCD, + ] + ), + ), + ] + ), + expired_certs_revocation_info=None, + ), + ), + ( + 'esig-qualified', + eutl.CAServiceInformation( + base_info=DUMMY_BASE_INFO, + qualifications=frozenset( + [ + eutl.Qualification( + criteria_list=MUST_HAVE_POLICY1, + qualifiers=frozenset( + [ + eutl.Qualifier.NO_QSCD, + ] + ), + ), + ] + ), + expired_certs_revocation_info=None, + ), + ), + ], +) +async def test_conclude_qualified_qcsd(cert_name, sd): + ee_cert = TESTING_CA_QUALIFIED.get_cert(cert_name) + vc = ValidationContext( + trust_roots=[TESTING_CA_QUALIFIED.get_cert('root')], + allow_fetching=False, + revinfo_policy=_SKIP_REVOCATION, + other_certs=[TESTING_CA_QUALIFIED.get_cert('interm-qualified')], + ) + cv = CertificateValidator(end_entity_cert=ee_cert, validation_context=vc) + path = await cv.async_validate_path() + registry = eutl.TSPRegistry() + registry.register_ca(sd) + assessor = eutl.QualificationAssessor(tsp_registry=registry) + status = assessor.check_entity_cert_qualified(path) + assert status.qualified + assert status.qc_key_security == eutl.QcPrivateKeyManagementType.QCSD + + +@pytest.mark.asyncio +@freeze_time('2015-11-01') +@pytest.mark.parametrize( + 'cert_name,expect_qscd', + [ + ('esig-qualified-legacy-policy', False), + ('esig-qualified-legacy-policy-qscd', True), + ], +) +async def test_conclude_qualified_pre_eidas(cert_name, expect_qscd): + sd = eutl.CAServiceInformation( + base_info=DUMMY_BASE_INFO, + qualifications=frozenset(), + expired_certs_revocation_info=None, + ) + ee_cert = TESTING_CA_QUALIFIED.get_cert(cert_name) + vc = ValidationContext( + trust_roots=[TESTING_CA_QUALIFIED.get_cert('root')], + allow_fetching=False, + revinfo_policy=_SKIP_REVOCATION, + other_certs=[TESTING_CA_QUALIFIED.get_cert('interm-qualified')], + ) + cv = CertificateValidator(end_entity_cert=ee_cert, validation_context=vc) + path = await cv.async_validate_path() + registry = eutl.TSPRegistry() + registry.register_ca(sd) + assessor = eutl.QualificationAssessor(tsp_registry=registry) + status = assessor.check_entity_cert_qualified(path) + assert status.qualified + if expect_qscd: + assert ( + status.qc_key_security + == eutl.QcPrivateKeyManagementType.QCSD_BY_POLICY + ) + else: + assert status.qc_key_security == eutl.QcPrivateKeyManagementType.UNKNOWN