From 64e0603bbe36316842208b943814e9e9a48d7fe2 Mon Sep 17 00:00:00 2001 From: Thore Sommer Date: Mon, 2 Aug 2021 16:26:02 +0200 Subject: [PATCH] failure: add infrastructure to tag and collect revocation events in Keylime Currently Keylime operates in a binary state when it comes failures that cause revocations and does not collect information where and why that revocation happened. This commit introduces the tagging and collection infrastructure and configuration for revocation events and adding context to them. SeverityLabel Has a name that can be set in the keylime.conf and a severity that is dynamically calculated based on the order in the configuration. Components Is a enumeration that contains the main components of Keylime that can cause events. Must be specified when creating a Failure object. Event Holds a revocation event. An event can be identified by their event_id which has the format: "component.[sub_component].event" The severity is automatically assigned based on the event_id. The event contains a context which is string encoded JSON object. An event must be marked irrecoverable if other validation steps are skipped by early returning. Failure Holds a collection of events and provides add_event(...) for adding new events to it and merge(...) to merge it with another Failure object. Is False if no events are currently in the Failure object and is otherwise True. Future changes that cause a revocation must extend and return a Failure object instead of returning a boolean value. Part of enhancement proposal https://github.com/keylime/enhancements/issues/48 Signed-off-by: Thore Sommer --- keylime.conf | 11 +++ keylime/failure.py | 198 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 209 insertions(+) create mode 100644 keylime/failure.py diff --git a/keylime.conf b/keylime.conf index f20a4d9bc..0b24adcea 100644 --- a/keylime.conf +++ b/keylime.conf @@ -278,6 +278,17 @@ measured_boot_policy_name = accept-all # The default value for this config item is the empty string. # measured_boot_imports = keylime.elchecking +# Severity labels for revocation events strictly ordered from least severity to +# highest severtiy. +severity_labels = ["info", "notice", "warning", "error", "critical", "alert", "emergency"] + +# Severity policy that matches different event_ids to the severity label. +# The rules are evaluated from the beginning of the list and the first match is +# used. The event_id can also be a regex. Default policy assigns the highest +# severity to all events. +severity_policy = [{"event_id": ".*", "severity_label": "emerg"}] + + #============================================================================= [tenant] #============================================================================= diff --git a/keylime/failure.py b/keylime/failure.py new file mode 100644 index 000000000..bbba4fbbd --- /dev/null +++ b/keylime/failure.py @@ -0,0 +1,198 @@ +''' +SPDX-License-Identifier: Apache-2.0 +Copyright 2021 Thore Sommer + +Tagging of failure events that might cause revocation in Keylime. +''' +import ast +import dataclasses +import enum +import functools +import json +import re +from typing import List, Optional, Tuple, Callable, Union + +from keylime import config +from keylime import keylime_logging + +logger = keylime_logging.init_logging("failure") + + +@functools.total_ordering +@dataclasses.dataclass(frozen=True) +class SeverityLabel: + """ + Severity label that can be attached to an event. + + The severity level is assigned dynamically based on the configuration, + so only use the name for use outside use of the tagging module. + """ + name: str + severity: int + + def __lt__(self, other): + return self.severity < other.severity + + def __eq__(self, other): + return self.severity == self.severity + + +class Component(enum.Enum): + """ + Main components of Keylime that can generate revocations. + """ + QUOTE_VALIDATION = "qoute_validation" + PCR_VALIDATION = "pcr_validation" + MEASURED_BOOT = "measured_boot" + IMA = "ima" + INTERNAL = "internal" + DEFAULT = "default" + + +@dataclasses.dataclass +class Event: + """ + Event that might be the reason for revocation. + + The context is string + """ + event_id: str + severity_label: SeverityLabel + context: str + recoverable: bool + + def __init__(self, component: Component, + sub_components: Optional[List[str]], + event_id: str, + context: Union[str, dict], + recoverable: bool): + + # Build full event id with the format "component.[sub_component].event_id" + self.event_id = component.value + if sub_components is not None: + self.event_id += "." + ".".join(sub_components) + self.event_id += f".{event_id}" + + # Convert message + if isinstance(context, str): + context = {"message": context} + self.context = json.dumps(context) + + self.severity_label = _severity_match(self.event_id) + self.recoverable = recoverable + + +class Failure: + """ + Failure Object that collects all events that might cause a revocation. + + If recoverable is set to False the validation process returned early and might skipped other validation steps. + """ + events: List[Event] + recoverable: bool + highest_severity: Optional[SeverityLabel] + _component: Optional[Component] + _sub_components: Optional[List[str]] + + def __init__(self, component, sub_components=None): + self._component = component + self._sub_components = sub_components + self.events = [] + self.recoverable = True + self.highest_severity_event: Optional[Event] = None # This only holds the first event that has the highest severity + self.highest_severity: Optional[SeverityLabel] = None + + def _add(self, event: Event): + if not event.recoverable: + self.recoverable = False + if event.severity_label != MAX_SEVERITY_LABEL: + logger.warning( + f"Irrecoverable Event with id: {event.event_id} has not the highest severity level.\n " + f"Setting it the the highest severity level.") + event.severity_label = MAX_SEVERITY_LABEL + + if self.highest_severity is None or event.severity_label > self.highest_severity: + self.highest_severity = event.severity_label + self.highest_severity_event = event + + self.events.append(event) + + def add_event(self, event_id: str, context: Union[str, dict], recoverable: bool, sub_components=None): + """ + Add event to Failure object. Uses the component and subcomponents specified in the Failure object. + + As context specify either a string that contains a message or a dict that contains useful information about that + event. + Set recoverable to False if the code skips other not directly related checks. Those events should always have + the highest severity label assigned and if not we manually do that. + + Example usage: + failure.add_event("ima_hash", + {"message": "IMA hash does not match the calculated hash.", + "expected": self.template_hash, "got": self.mode.hash()}, True) + """ + if sub_components is not None and self._sub_components is not None: + sub_components = self._sub_components + sub_components + elif self._sub_components is not None: + sub_components = self._sub_components + event = Event(self._component, sub_components, event_id, context, recoverable) + self._add(event) + + def merge(self, other): + if self.recoverable: + self.recoverable = other.recoverable + if self.highest_severity is None: + self.highest_severity = other.highest_severity + elif other.highest_severity is not None: + self.highest_severity = max(self.highest_severity, other.highest_severity) + self.events.extend(other.events) + + def __bool__(self): + return not self.recoverable or len(self.events) > 0 + + +def _eval_severity_config() -> Tuple[List[Callable[[str], Optional[SeverityLabel]]], SeverityLabel]: + """ + Generates the list of rules to match a event_id against. + """ + + labels_list = ast.literal_eval(config.get("cloud_verifier", "severity_labels")) + labels = {} + for label, level in zip(labels_list, range(0, len(labels_list))): + labels[label] = SeverityLabel(label, level) + + label_max = labels[labels_list[-1]] + + policies = ast.literal_eval(config.get("cloud_verifier", "severity_policy")) + rules = [] + for policy in policies: + # TODO validate regex + regex = re.compile(policy["event_id"]) + + def rule(policy_regex, label_str: str, event_id: str) -> Optional[SeverityLabel]: + if policy_regex.fullmatch(event_id): + policy_label = labels.get(label_str) + if policy_label is None: + logger.error(f"Label {label_str} is not a valid label. Defaulting to maximal severity label!") + return label_max + return policy_label + return None + + rules.append(functools.partial(rule, regex, policy["severity_label"])) + return rules, label_max + + +# Only evaluate the policy once on module load +SEVERITY_RULES, MAX_SEVERITY_LABEL = _eval_severity_config() + + +def _severity_match(event_id: str) -> SeverityLabel: + """ + Match the event_id to a severity label. + """ + for rule in SEVERITY_RULES: + match = rule(event_id) + if match is not None: + return match + logger.warning(f"No rule matched for event_id: {event_id}. Defaulting to max severity label") + return MAX_SEVERITY_LABEL