From 7bdde73d817b8a352aa5a606358ad25fdb384267 Mon Sep 17 00:00:00 2001 From: Matthias Valvekens Date: Thu, 28 Dec 2023 01:04:40 +0100 Subject: [PATCH] Implement service history parsing --- .../sign/validation/qualified/eutl_parse.py | 102 ++++++++++-- pyhanko/sign/validation/qualified/tsp.py | 12 +- pyhanko_tests/test_trusted_list.py | 150 +++++++++++++++++- 3 files changed, 238 insertions(+), 26 deletions(-) diff --git a/pyhanko/sign/validation/qualified/eutl_parse.py b/pyhanko/sign/validation/qualified/eutl_parse.py index 79e6da29..bb17f9b8 100644 --- a/pyhanko/sign/validation/qualified/eutl_parse.py +++ b/pyhanko/sign/validation/qualified/eutl_parse.py @@ -1,9 +1,11 @@ +import itertools import logging from datetime import datetime from typing import ( FrozenSet, Generator, Iterable, + List, Optional, Set, Tuple, @@ -56,6 +58,16 @@ PREFERRED_LANGUAGE: str = 'en' +def _service_name_from_intl_string( + intl_string: Optional[ts_119612.InternationalNamesType], +) -> str: + return ( + _extract_from_intl_string(intl_string.name) + if intl_string + else "unknown" + ) + + def _extract_from_intl_string( intl_string: Tuple[ Union[ts_119612.MultiLangStringType, ts_119612.MultiLangNormStringType], @@ -228,9 +240,7 @@ def _interpret_historical_service_info_for_ca( ) ) ) - service_name = None - if service_info.service_name: - service_name = _extract_from_intl_string(service_info.service_name.name) + service_name = _service_name_from_intl_string(service_info.service_name) qualifications: FrozenSet[Qualification] = frozenset() expired_revinfo_date = None additional_info = [] @@ -262,7 +272,6 @@ def _interpret_historical_service_info_for_ca( except KeyError: additional_info.append(additional_info_entry) elif ext.critical: - # TODO more informative exception / only ditch the current SDI raise TSPServiceParsingError( f"Cannot process a critical extension " f"in service named '{service_name}'.\n" @@ -271,8 +280,9 @@ def _interpret_historical_service_info_for_ca( valid_from_date = service_info.status_starting_time if valid_from_date is None: raise TSPServiceParsingError( - "The validity start of the current status of the the service named " - f"{service_name} is not known. This is an error." + f"The validity start of the status of " + f"the the service named {service_name} is not known. " + f"This is an error." ) base_service_info = BaseServiceInformation( service_type=_required( @@ -280,7 +290,7 @@ def _interpret_historical_service_info_for_ca( ), valid_from=valid_from_date.to_datetime(), valid_until=next_update_at, - service_name=service_name or "unknown", + service_name=service_name, provider_certs=tuple(certs), additional_info_certificate_type=frozenset(asi_qc_type), other_additional_info=frozenset(additional_info), @@ -293,9 +303,53 @@ def _interpret_historical_service_info_for_ca( ) +def _read_service_history(history_items, validity_start, service_name): + errors_encountered = [] + item_index_sorted_by_date = sorted( + ( + (orig_ix, item.status_starting_time.to_datetime()) + for orig_ix, item in enumerate(history_items) + if item.status_starting_time + ), + key=lambda t: t[1], + reverse=True, + ) + end_of_validity_by_orig_ix = { + orig_ix: next_start + for (orig_ix, cur_start), next_start in zip( + item_index_sorted_by_date, + itertools.chain( + (validity_start.to_datetime(),), + (st for _, st in item_index_sorted_by_date[:-1]), + ), + ) + } + + for orig_ix, validity_end in end_of_validity_by_orig_ix.items(): + history_item = history_items[orig_ix] + if history_item.service_status != STATUS_GRANTED: + continue + try: + validity_end = end_of_validity_by_orig_ix[orig_ix] + yield _interpret_historical_service_info_for_ca( + history_item, + next_update_at=validity_end, + ) + except TSPServiceParsingError as e: + logger.debug( + f"Failed to parse item {orig_ix + 1} in history " + f"of service {service_name}. This history " + f"entry will not be processed further.", + exc_info=e, + ) + errors_encountered.append(e) + return errors_encountered + + def _interpret_service_info_for_cas( services: Iterable[ts_119612.TSPService], ): + errors_encountered = [] for service in services: service_info = service.service_information if ( @@ -304,14 +358,33 @@ def _interpret_service_info_for_cas( ): continue + service_name = _service_name_from_intl_string(service_info.service_name) # TODO allow the user to specify if they also want to include # other statuses (e.g. national level) # TODO evaluate historical definitions too in case of point-in-time # work, store that info on the object - if service_info.service_status != STATUS_GRANTED: - continue - # TODO process errors in individual services - yield _interpret_service_info_for_ca(service) + if service_info.service_status == STATUS_GRANTED: + try: + yield _interpret_service_info_for_ca(service) + except TSPServiceParsingError as e: + logger.warning( + f"Failed to process current status " + f"of service {service_name}. This history " + f"entry will not be processed further.", + exc_info=e, + ) + errors_encountered.append(e) + continue + + validity_start = service_info.status_starting_time + if validity_start and service.service_history: + history_items = service.service_history.service_history_instance + history_errors = yield from _read_service_history( + history_items, validity_start, service_name + ) + errors_encountered.extend(history_errors) + + return errors_encountered def _raw_tl_parse(tl_xml: str) -> ts_119612.TrustServiceStatusList: @@ -329,10 +402,13 @@ def _raw_tl_parse(tl_xml: str) -> ts_119612.TrustServiceStatusList: # TODO introduce a similar method for other types of service (TSAs etc) def read_qualified_certificate_authorities( tl_xml: str, -) -> Generator[CAServiceInformation, None, None]: +) -> Generator[CAServiceInformation, None, List[TSPServiceParsingError]]: parse_result = _raw_tl_parse(tl_xml) tspl = parse_result.trust_service_provider_list + errors_encountered = [] for tsp in _required(tspl, "TSP list").trust_service_provider: - yield from _interpret_service_info_for_cas( + tsp_errors = yield from _interpret_service_info_for_cas( _required(tsp.tspservices, "TSP services").tspservice ) + errors_encountered.extend(tsp_errors) + return errors_encountered diff --git a/pyhanko/sign/validation/qualified/tsp.py b/pyhanko/sign/validation/qualified/tsp.py index f633eb1a..816e9db4 100644 --- a/pyhanko/sign/validation/qualified/tsp.py +++ b/pyhanko/sign/validation/qualified/tsp.py @@ -2,16 +2,7 @@ from collections import defaultdict from dataclasses import dataclass from datetime import datetime -from typing import ( - Dict, - FrozenSet, - Generator, - Iterable, - Optional, - Set, - Tuple, - Union, -) +from typing import Dict, FrozenSet, Generator, Iterable, Optional, Set, Tuple from asn1crypto import x509 from pyhanko_certvalidator.authority import ( @@ -36,7 +27,6 @@ 'TSPTrustManager', 'QcCertType', 'AdditionalServiceInformation', - 'TSPServiceParsingError', 'BaseServiceInformation', 'Qualifier', 'Criterion', diff --git a/pyhanko_tests/test_trusted_list.py b/pyhanko_tests/test_trusted_list.py index 51404cb9..838f276b 100644 --- a/pyhanko_tests/test_trusted_list.py +++ b/pyhanko_tests/test_trusted_list.py @@ -18,6 +18,10 @@ from pyhanko.generated.etsi import ts_119612 from pyhanko.sign.validation.qualified import assess, eutl_parse, q_status, tsp +from pyhanko.sign.validation.qualified.eutl_parse import ( + CA_QC_URI, + STATUS_GRANTED, +) from pyhanko.sign.validation.settings import KeyUsageConstraints from .samples import CERTOMANCER @@ -50,7 +54,10 @@ def _raw_tlservice_parse(xml: str) -> ts_119612.TSPService: def test_parse_cas_from_real_tl_smoke_test(): - assert len(_read_cas_from_file(TEST_REAL_TL)) == 52 + cas_read = _read_cas_from_file(TEST_REAL_TL) + current_cas = [ca for ca in cas_read if not ca.base_info.valid_until] + assert len(current_cas) == 52 + assert len(cas_read) == 73 ETSI_NS = 'http://uri.etsi.org' @@ -89,7 +96,7 @@ def test_parse_ca_with_unsupported_critical_qualifier(): parse_result = _raw_tlservice_parse(xml) with pytest.raises( - tsp.TSPServiceParsingError, + eutl_parse.TSPServiceParsingError, match="critical", ): eutl_parse._interpret_service_info_for_ca(parse_result) @@ -1158,3 +1165,142 @@ async def test_conclude_qualified_convergence(): assessor = assess.QualificationAssessor(tsp_registry=registry) status = assessor.check_entity_cert_qualified(path) assert status.qualified + + +def test_parse_service_history_intervals(): + xml = f""" + + + Test + {CA_QC_URI} + {STATUS_GRANTED} + + + 2020-11-01T00:00:00Z + + + + + Test + {CA_QC_URI} + {STATUS_GRANTED} + + + 2017-11-01T00:00:00Z + + + + Test + {CA_QC_URI} + {STATUS_GRANTED} + + + 2019-11-01T00:00:00Z + + + + + """ + + parse_result = _raw_tlservice_parse(xml) + result = eutl_parse._interpret_service_info_for_cas([parse_result]) + date1 = datetime(2017, 11, 1, tzinfo=timezone.utc) + date2 = datetime(2019, 11, 1, tzinfo=timezone.utc) + date3 = datetime(2020, 11, 1, tzinfo=timezone.utc) + intervals = [ + (r.base_info.valid_from, r.base_info.valid_until) for r in result + ] + assert intervals == [(date3, None), (date2, date3), (date1, date2)] + + +def test_parse_service_history_intervals_skip_not_granted(): + xml = f""" + + + Test + {CA_QC_URI} + {STATUS_GRANTED} + + + 2020-11-01T00:00:00Z + + + + + Test + {CA_QC_URI} + {STATUS_GRANTED} + + + 2017-11-01T00:00:00Z + + + + Test + {CA_QC_URI} + urn:blah + + + 2019-11-01T00:00:00Z + + + + + """ + + parse_result = _raw_tlservice_parse(xml) + result = eutl_parse._interpret_service_info_for_cas([parse_result]) + date1 = datetime(2017, 11, 1, tzinfo=timezone.utc) + date2 = datetime(2019, 11, 1, tzinfo=timezone.utc) + date3 = datetime(2020, 11, 1, tzinfo=timezone.utc) + intervals = [ + (r.base_info.valid_from, r.base_info.valid_until) for r in result + ] + assert intervals == [ + (date3, None), + # gap where status is not granted + (date1, date2), + ] + + +def test_parse_service_history_intervals_skip_invalid_entries(): + xml = f""" + + + Test + {CA_QC_URI} + {STATUS_GRANTED} + + + 2020-11-01T00:00:00Z + + + + + Test + {CA_QC_URI} + {STATUS_GRANTED} + + + 2017-11-01T00:00:00Z + + + + Test + {STATUS_GRANTED} + 2019-11-01T00:00:00Z + + + + """ + + parse_result = _raw_tlservice_parse(xml) + result = eutl_parse._interpret_service_info_for_cas([parse_result]) + date2 = datetime(2020, 11, 1, tzinfo=timezone.utc) + intervals = [ + (r.base_info.valid_from, r.base_info.valid_until) for r in result + ] + assert len(intervals) == 2 + assert intervals[0] == (date2, None) + # don't assert on second interval for now; let's call + # that one undefined behaviour