diff --git a/pyhanko/sign/validation/eutl.py b/pyhanko/sign/validation/eutl.py new file mode 100644 index 00000000..428b63b3 --- /dev/null +++ b/pyhanko/sign/validation/eutl.py @@ -0,0 +1,213 @@ +import enum +from dataclasses import dataclass +from datetime import datetime +from typing import List, Optional, Set, Tuple, Union + +from asn1crypto import x509 +from pyhanko_certvalidator import InvalidCertificateError + +from pyhanko.generated.etsi import ts_119612, ts_119612_sie +from pyhanko.sign.validation import KeyUsageConstraints + +CA_QC_URI = 'http://uri.etsi.org/TrstSvc/Svctype/CA/QC' +QTST_URI = 'http://uri.etsi.org/TrstSvc/Svctype/TSA/QTST' +STATUS_GRANTED = 'http://uri.etsi.org/TrstSvc/TrustedList/Svcstatus/granted' + + +@dataclass(frozen=True) +class BaseServiceInformation: + service_type: str + service_name: str + provider_certs: List[x509.Certificate] + + +class Qualifier(enum.Enum): + WITH_SSCD = 'QCWithSSCD' + NO_SSCD = 'QCNoSSCD' + SSCD_AS_IN_CERT = 'QCSSCDStatusAsInCert' + WITH_QSCD = 'QCWithQSCD' + NO_QSCD = 'QCNoQSCD' + QSCD_AS_IN_CERT = 'QCQSCDStatusAsInCert' + QSCD_MANAGED = 'QCQSCDManagedOnBehalf' + LEGAL_PERSON = 'QCForLegalPerson' + FOR_ESIG = 'QCForEsig' + FOR_ESEAL = 'QCForEseal' + FOR_WSA = 'QCForWSA' + NOT_QUALIFIED = 'NotQualified' + QC_STATEMENT = 'QCStatement' + + @property + def uri(self): + return ( + f"http://uri.etsi.org/TrstSvc/TrustedList/SvcInfoExt/{self.value}" + ) + + +class CriteriaAssertionType(enum.Enum): + NONE = 0 + AT_LEAST_ONE = 1 + ALL = 2 + + +class Criterion: + def matches(self, cert: x509.Certificate): + raise NotImplementedError + + +@dataclass(frozen=True) +class KeyUsageCriterion(Criterion): + settings: KeyUsageConstraints + + def matches(self, cert: x509.Certificate): + try: + self.settings.validate(cert) + return True + except InvalidCertificateError: + return False + + +@dataclass(frozen=True) +class PolicySetCriterion(Criterion): + required_policy_oids: Set[str] + + def matches(self, cert: x509.Certificate): + policy_ext = cert.certificate_policies_value or () + found_policies = {pol['policy_identifier'].dotted for pol in policy_ext} + return self.required_policy_oids.issubset(found_policies) + + +@dataclass(frozen=True) +class CertSubjectDNCriterion(Criterion): + required_rdn_part_oids: Set[str] + + def matches(self, cert: x509.Certificate): + subject_dn: x509.Name = cert.subject + found_rdn_part_oids = { + pair['type'].dotted for rdn in subject_dn.chosen for pair in rdn + } + return self.required_rdn_part_oids.issubset(found_rdn_part_oids) + + +@dataclass(frozen=True) +class Qualification: + qualifiers: List[Qualifier] + criteria_assertion_type: CriteriaAssertionType + criteria_list: List[Criterion] + + +@dataclass(frozen=True) +class CAServiceInformation: + base_info: BaseServiceInformation + qualifications: List[Qualification] + expired_certs_revocation_info: Optional[datetime] + + +# TODO make this somehow customisable +PREFERRED_LANGUAGE: str = 'en' + + +def _extract_from_intl_string( + intl_string: Tuple[ + Union[ts_119612.MultiLangStringType, ts_119612.MultiLangNormStringType], + ..., + ] +): + first_value = intl_string[0].value + for part in intl_string: + if part.lang == PREFERRED_LANGUAGE: + return part.value + return first_value + + +def _as_certs(sdi: ts_119612.ServiceDigitalIdentity): + for digital_id in sdi.digital_id: + cert_bytes = digital_id.x509_certificate + if cert_bytes: + yield x509.Certificate.load(cert_bytes) + + +def _get_qualifications(qualifications: ts_119612_sie.Qualifications): + for qual in qualifications.qualification_element: + criteria = [] + if qual.criteria_list.policy_set: + criteria.append( + # TODO also take policy qualifiers into account + PolicySetCriterion( + required_policy_oids={ + oid.identifier.value + for policy in qual.criteria_list.policy_set + for oid in policy.policy_identifier + } + ) + ) + if qual.criteria_list.key_usage: + key_usage_must_have = { + bit.name.name.lower() + for ku in qual.criteria_list.key_usage + for bit in ku.key_usage_bit + if bit.value == True + } + key_usage_forbidden = { + bit.name.name.lower() + for ku in qual.criteria_list.key_usage + for bit in ku.key_usage_bit + if bit.value == False + } + # TODO check EKUs + criteria.append( + KeyUsageCriterion( + settings=KeyUsageConstraints( + key_usage=key_usage_must_have, + key_usage_forbidden=key_usage_forbidden, + ) + ) + ) + + +def _interpret_service_info_for_cas( + services: List[ts_119612.TSPService], +): + for service in services: + service_info = service.service_information + if ( + not service_info + or service_info.service_type_identifier != CA_QC_URI + ): + continue + certs = list(_as_certs(service_info.service_digital_identity)) + + # TODO allow the user to specify if they also want to include + # other statuses (e.g. national level) + # TODO evaluate historical definitions too in case of point-in-time + # work, store that info on the object + if service_info.service_status != STATUS_GRANTED: + continue + service_name = None + if service_info.service_name: + service_name = _extract_from_intl_string( + service_info.service_name.name + ) + base_service_info = BaseServiceInformation( + service_type=service_info.service_type_identifier, + service_name=service_name or "unknown", + provider_certs=certs, + ) + qualifications = [] + expired_revinfo_date = None + for ext in service_info.service_information_extensions.extension: + ext_content = ext.content + if isinstance(ext_content, ts_119612_sie.Qualifications): + qualifications = list(_get_qualifications(ext_content)) + elif isinstance(ext_content, ts_119612.ExpiredCertsRevocationInfo): + expired_revinfo_date = ext_content.value.to_datetime() + elif ext.critical: + # TODO more informative exception / only ditch the current SDI + raise ValueError( + f"Cannot process a critical extension " + f"in {base_service_info}" + ) + return CAServiceInformation( + base_info=base_service_info, + qualifications=qualifications, + expired_certs_revocation_info=expired_revinfo_date, + )