Skip to content

Commit

Permalink
Qualification support WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
MatthiasValvekens committed Dec 1, 2023
1 parent 6eaa5cd commit cafdec5
Showing 1 changed file with 213 additions and 0 deletions.
213 changes: 213 additions & 0 deletions pyhanko/sign/validation/eutl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
import enum
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional, Set, Tuple, Union

from asn1crypto import x509
from pyhanko_certvalidator import InvalidCertificateError

from pyhanko.generated.etsi import ts_119612, ts_119612_sie
from pyhanko.sign.validation import KeyUsageConstraints

CA_QC_URI = 'http://uri.etsi.org/TrstSvc/Svctype/CA/QC'
QTST_URI = 'http://uri.etsi.org/TrstSvc/Svctype/TSA/QTST'
STATUS_GRANTED = 'http://uri.etsi.org/TrstSvc/TrustedList/Svcstatus/granted'


@dataclass(frozen=True)
class BaseServiceInformation:
service_type: str
service_name: str
provider_certs: List[x509.Certificate]


class Qualifier(enum.Enum):
WITH_SSCD = 'QCWithSSCD'
NO_SSCD = 'QCNoSSCD'
SSCD_AS_IN_CERT = 'QCSSCDStatusAsInCert'
WITH_QSCD = 'QCWithQSCD'
NO_QSCD = 'QCNoQSCD'
QSCD_AS_IN_CERT = 'QCQSCDStatusAsInCert'
QSCD_MANAGED = 'QCQSCDManagedOnBehalf'
LEGAL_PERSON = 'QCForLegalPerson'
FOR_ESIG = 'QCForEsig'
FOR_ESEAL = 'QCForEseal'
FOR_WSA = 'QCForWSA'
NOT_QUALIFIED = 'NotQualified'
QC_STATEMENT = 'QCStatement'

@property
def uri(self):
return (
f"http://uri.etsi.org/TrstSvc/TrustedList/SvcInfoExt/{self.value}"
)


class CriteriaAssertionType(enum.Enum):
NONE = 0
AT_LEAST_ONE = 1
ALL = 2


class Criterion:
def matches(self, cert: x509.Certificate):
raise NotImplementedError


@dataclass(frozen=True)
class KeyUsageCriterion(Criterion):
settings: KeyUsageConstraints

def matches(self, cert: x509.Certificate):
try:
self.settings.validate(cert)
return True
except InvalidCertificateError:
return False


@dataclass(frozen=True)
class PolicySetCriterion(Criterion):
required_policy_oids: Set[str]

def matches(self, cert: x509.Certificate):
policy_ext = cert.certificate_policies_value or ()
found_policies = {pol['policy_identifier'].dotted for pol in policy_ext}
return self.required_policy_oids.issubset(found_policies)


@dataclass(frozen=True)
class CertSubjectDNCriterion(Criterion):
required_rdn_part_oids: Set[str]

def matches(self, cert: x509.Certificate):
subject_dn: x509.Name = cert.subject
found_rdn_part_oids = {
pair['type'].dotted for rdn in subject_dn.chosen for pair in rdn
}
return self.required_rdn_part_oids.issubset(found_rdn_part_oids)


@dataclass(frozen=True)
class Qualification:
qualifiers: List[Qualifier]
criteria_assertion_type: CriteriaAssertionType
criteria_list: List[Criterion]


@dataclass(frozen=True)
class CAServiceInformation:
base_info: BaseServiceInformation
qualifications: List[Qualification]
expired_certs_revocation_info: Optional[datetime]


# TODO make this somehow customisable
PREFERRED_LANGUAGE: str = 'en'


def _extract_from_intl_string(
intl_string: Tuple[
Union[ts_119612.MultiLangStringType, ts_119612.MultiLangNormStringType],
...,
]
):
first_value = intl_string[0].value
for part in intl_string:
if part.lang == PREFERRED_LANGUAGE:
return part.value
return first_value


def _as_certs(sdi: ts_119612.ServiceDigitalIdentity):
for digital_id in sdi.digital_id:
cert_bytes = digital_id.x509_certificate
if cert_bytes:
yield x509.Certificate.load(cert_bytes)


def _get_qualifications(qualifications: ts_119612_sie.Qualifications):
for qual in qualifications.qualification_element:
criteria = []
if qual.criteria_list.policy_set:

Check failure on line 132 in pyhanko/sign/validation/eutl.py

View workflow job for this annotation

GitHub Actions / mypy

Item "None" of "Optional[CriteriaListType]" has no attribute "policy_set" [union-attr]
criteria.append(
# TODO also take policy qualifiers into account
PolicySetCriterion(
required_policy_oids={
oid.identifier.value

Check failure on line 137 in pyhanko/sign/validation/eutl.py

View workflow job for this annotation

GitHub Actions / mypy

Item "None" of "Union[IdentifierType, None, Any]" has no attribute "value" [union-attr]
for policy in qual.criteria_list.policy_set

Check failure on line 138 in pyhanko/sign/validation/eutl.py

View workflow job for this annotation

GitHub Actions / mypy

Item "None" of "Optional[CriteriaListType]" has no attribute "policy_set" [union-attr]
for oid in policy.policy_identifier
}
)
)
if qual.criteria_list.key_usage:

Check failure on line 143 in pyhanko/sign/validation/eutl.py

View workflow job for this annotation

GitHub Actions / mypy

Item "None" of "Optional[CriteriaListType]" has no attribute "key_usage" [union-attr]
key_usage_must_have = {
bit.name.name.lower()

Check failure on line 145 in pyhanko/sign/validation/eutl.py

View workflow job for this annotation

GitHub Actions / mypy

Item "None" of "Union[KeyUsageBitTypename, None, Any]" has no attribute "name" [union-attr]
for ku in qual.criteria_list.key_usage

Check failure on line 146 in pyhanko/sign/validation/eutl.py

View workflow job for this annotation

GitHub Actions / mypy

Item "None" of "Optional[CriteriaListType]" has no attribute "key_usage" [union-attr]
for bit in ku.key_usage_bit
if bit.value == True
}
key_usage_forbidden = {
bit.name.name.lower()

Check failure on line 151 in pyhanko/sign/validation/eutl.py

View workflow job for this annotation

GitHub Actions / mypy

Item "None" of "Union[KeyUsageBitTypename, None, Any]" has no attribute "name" [union-attr]
for ku in qual.criteria_list.key_usage

Check failure on line 152 in pyhanko/sign/validation/eutl.py

View workflow job for this annotation

GitHub Actions / mypy

Item "None" of "Optional[CriteriaListType]" has no attribute "key_usage" [union-attr]
for bit in ku.key_usage_bit
if bit.value == False
}
# TODO check EKUs
criteria.append(
KeyUsageCriterion(

Check failure on line 158 in pyhanko/sign/validation/eutl.py

View workflow job for this annotation

GitHub Actions / mypy

Argument 1 to "append" of "list" has incompatible type "KeyUsageCriterion"; expected "PolicySetCriterion" [arg-type]
settings=KeyUsageConstraints(
key_usage=key_usage_must_have,
key_usage_forbidden=key_usage_forbidden,
)
)
)


def _interpret_service_info_for_cas(
services: List[ts_119612.TSPService],
):
for service in services:
service_info = service.service_information
if (
not service_info
or service_info.service_type_identifier != CA_QC_URI
):
continue
certs = list(_as_certs(service_info.service_digital_identity))

Check failure on line 177 in pyhanko/sign/validation/eutl.py

View workflow job for this annotation

GitHub Actions / mypy

Argument 1 to "_as_certs" has incompatible type "Optional[ServiceDigitalIdentity]"; expected "ServiceDigitalIdentity" [arg-type]

# TODO allow the user to specify if they also want to include
# other statuses (e.g. national level)
# TODO evaluate historical definitions too in case of point-in-time
# work, store that info on the object
if service_info.service_status != STATUS_GRANTED:
continue
service_name = None
if service_info.service_name:
service_name = _extract_from_intl_string(
service_info.service_name.name
)
base_service_info = BaseServiceInformation(
service_type=service_info.service_type_identifier,
service_name=service_name or "unknown",
provider_certs=certs,
)
qualifications = []
expired_revinfo_date = None
for ext in service_info.service_information_extensions.extension:
ext_content = ext.content
if isinstance(ext_content, ts_119612_sie.Qualifications):
qualifications = list(_get_qualifications(ext_content))
elif isinstance(ext_content, ts_119612.ExpiredCertsRevocationInfo):
expired_revinfo_date = ext_content.value.to_datetime()
elif ext.critical:
# TODO more informative exception / only ditch the current SDI
raise ValueError(
f"Cannot process a critical extension "
f"in {base_service_info}"
)
return CAServiceInformation(
base_info=base_service_info,
qualifications=qualifications,
expired_certs_revocation_info=expired_revinfo_date,
)

0 comments on commit cafdec5

Please sign in to comment.