Skip to content

Commit

Permalink
Implement TSP registry
Browse files Browse the repository at this point in the history
  • Loading branch information
MatthiasValvekens committed Dec 12, 2023
1 parent b174866 commit 83b2515
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 1 deletion.
21 changes: 21 additions & 0 deletions pyhanko/sign/validation/eutl.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
import enum
import logging
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from typing import (
Dict,
FrozenSet,
Generator,
Iterable,
List,
Optional,
Set,
Tuple,
TypeVar,
Union,
)

from asn1crypto import x509
from pyhanko_certvalidator.authority import Authority, AuthorityWithCert
from pyhanko_certvalidator.errors import InvalidCertificateError
from xsdata.formats.dataclass.parsers import XmlParser
from xsdata.formats.dataclass.parsers.config import ParserConfig
Expand Down Expand Up @@ -404,3 +409,19 @@ def read_qualified_certificate_authorities(
yield from _interpret_service_info_for_cas(
_required(tsp.tspservices, "TSP services").tspservice
)


class TSPRegistry:
def __init__(self: 'TSPRegistry'):
self._cert_to_si: Dict[
Authority, Set[CAServiceInformation]
] = defaultdict(set)

def register_ca(self, ca_service_info: CAServiceInformation):
for cert in ca_service_info.base_info.provider_certs:
self._cert_to_si[AuthorityWithCert(cert)].add(ca_service_info)

def applicable_service_definitions(
self, ca: Authority
) -> Iterable[CAServiceInformation]:
return tuple(self._cert_to_si[ca])
94 changes: 93 additions & 1 deletion pyhanko_tests/test_trusted_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
from pathlib import Path

import pytest
from signing_commons import FROM_CA
from pyhanko_certvalidator.authority import AuthorityWithCert, NamedKeyAuthority
from signing_commons import ECC_INTERM_CERT, FROM_CA, FROM_ECC_CA, INTERM_CERT
from xsdata.formats.dataclass.parsers import XmlParser
from xsdata.formats.dataclass.parsers.config import ParserConfig

Expand Down Expand Up @@ -414,3 +415,94 @@ def test_criteria_accept(criterion):
)
def test_criteria_reject(criterion):
assert not criterion.matches(FROM_CA.signing_cert)


def _dummy_service_definition(*extra_certs) -> eutl.CAServiceInformation:
return eutl.CAServiceInformation(
eutl.BaseServiceInformation(
'',
'test1',
provider_certs=(INTERM_CERT, *extra_certs),
additional_info=frozenset(),
),
qualifications=frozenset(),
expired_certs_revocation_info=None,
)


def test_tsp_registry():
registry = eutl.TSPRegistry()
registry.register_ca(_dummy_service_definition())

result = list(
registry.applicable_service_definitions(AuthorityWithCert(INTERM_CERT))
)
assert len(result) == 1
assert result[0].base_info.service_name == 'test1'


def test_tsp_registry_by_name():
registry = eutl.TSPRegistry()
registry.register_ca(_dummy_service_definition())

result = list(
registry.applicable_service_definitions(
NamedKeyAuthority(INTERM_CERT.subject, INTERM_CERT.public_key)
)
)
assert len(result) == 1
assert result[0].base_info.service_name == 'test1'


def test_tsp_registry_alternative_cert():
registry = eutl.TSPRegistry()
registry.register_ca(_dummy_service_definition(ECC_INTERM_CERT))

result = list(
registry.applicable_service_definitions(AuthorityWithCert(INTERM_CERT))
)

result2 = list(
registry.applicable_service_definitions(
AuthorityWithCert(ECC_INTERM_CERT)
)
)
assert result == result2
assert len(result) == 1
assert result[0].base_info.service_name == 'test1'


def test_tsp_registry_multiple_sds():
registry = eutl.TSPRegistry()
# multiple SDs with the same issuer
registry.register_ca(
eutl.CAServiceInformation(
eutl.BaseServiceInformation(
'',
'test1', # quals are too much work to mock
provider_certs=(INTERM_CERT,),
additional_info=frozenset(),
),
qualifications=frozenset(),
expired_certs_revocation_info=None,
)
)

registry.register_ca(
eutl.CAServiceInformation(
eutl.BaseServiceInformation(
'',
'test2',
provider_certs=(INTERM_CERT,),
additional_info=frozenset(),
),
qualifications=frozenset(),
expired_certs_revocation_info=None,
)
)

result = list(
registry.applicable_service_definitions(AuthorityWithCert(INTERM_CERT))
)
assert len(result) == 2
assert set(r.base_info.service_name for r in result) == {'test1', 'test2'}

0 comments on commit 83b2515

Please sign in to comment.