forked from keylime/keylime
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add infrastructure to tag and collect revocation events in Keylime
Currently Keylime operates in a binary state when it comes failures that cause revocations and does not collect information where and why that revocation happened. This commit introduces the tagging and collection infrastructure and configuration for revocation events and adding context to them. SeverityLabel Has a name that can be set in the keylime.conf and a severity that is dynamically calculated based on the order in the configuration. Components Is a enumeration that contains the main components of Keylime that can cause events. Must be specified when creating a Failure object. Event Holds a revocation event. An event can be identified by their event_id which has the format: "component.[sub_component].event" The severity is automatically assigned based on the event_id. The event contains a context which is string encoded JSON object. An event must be marked irrecoverable if other validation steps are skipped by early returning. Failure Holds a collection of events and provides add_event(...) for adding new events to it and merge(...) to merge it with another Failure object. Is False if no events are currently in the Failure object and is otherwise True. Future changes that cause a revocation must extend and return a Failure object instead of returning a boolean value. Part of enhancement proposal keylime/enhancements#48 Signed-off-by: Thore Sommer <[email protected]>
- Loading branch information
Showing
2 changed files
with
206 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,196 @@ | ||
''' | ||
SPDX-License-Identifier: Apache-2.0 | ||
Copyright 2021 Thore Sommer | ||
Tagging of events that might cause revocation in Keylime. | ||
''' | ||
import ast | ||
import dataclasses | ||
import enum | ||
import functools | ||
import json | ||
import re | ||
from typing import List, Optional, Tuple, Callable, Union | ||
|
||
from keylime import config | ||
from keylime import keylime_logging | ||
|
||
logger = keylime_logging.init_logging("tagging") | ||
|
||
|
||
@functools.total_ordering | ||
@dataclasses.dataclass(frozen=True) | ||
class SeverityLabel: | ||
""" | ||
Severity label that can be attached to an event. | ||
The severity level is assigned dynamically based on the configuration, | ||
so only use the name for use outside use of the tagging module. | ||
""" | ||
name: str | ||
severity: int | ||
|
||
def __lt__(self, other): | ||
return self.severity < other.severity | ||
|
||
def __eq__(self, other): | ||
return self.severity == self.severity | ||
|
||
|
||
class Component(enum.Enum): | ||
""" | ||
Main components of Keylime that can generate revocations. | ||
""" | ||
QUOTE_VALIDATION = "qoute_validation" | ||
PCR_VALIDATION = "pcr_validation" | ||
MEASURED_BOOT = "measured_boot" | ||
IMA = "ima" | ||
INTERNAL = "internal" | ||
DEFAULT = "default" | ||
|
||
|
||
@dataclasses.dataclass | ||
class Event: | ||
""" | ||
Event that might be the reason for revocation. | ||
The context is string | ||
""" | ||
event_id: str | ||
severity_label: SeverityLabel | ||
context: str | ||
recoverable: bool | ||
|
||
def __init__(self, component: Component, | ||
sub_components: Optional[List[str]], | ||
event_id: str, | ||
context: Union[str, dict], | ||
recoverable: bool): | ||
|
||
# Build full event id with the format "component.[sub_component].event_id" | ||
self.event_id = component.value | ||
if sub_components is not None: | ||
self.event_id += "." + ".".join(sub_components) | ||
self.event_id += f".{event_id}" | ||
|
||
# Convert message | ||
if isinstance(context, str): | ||
context = {"message": context} | ||
self.context = json.dumps(context) | ||
|
||
self.severity_label = _severity_match(self.event_id) | ||
self.recoverable = recoverable | ||
|
||
|
||
class Failure: | ||
""" | ||
Failure Object that collects all events that might cause a revocation. | ||
If recoverable is set to False the validation process returned early and might skipped other validation steps. | ||
""" | ||
events: List[Event] | ||
recoverable: bool | ||
highest_severity: Optional[SeverityLabel] | ||
_component: Optional[Component] | ||
_sub_components: Optional[List[str]] | ||
|
||
def __init__(self, component, sub_components=None): | ||
self._component = component | ||
self._sub_components = sub_components | ||
self.events = [] | ||
self.recoverable = True | ||
self.highest_severity = None | ||
|
||
def _add(self, event: Event): | ||
if not event.recoverable: | ||
self.recoverable = False | ||
if event.severity_label != MAX_SEVERITY_LABEL: | ||
logger.warning( | ||
f"Irrecoverable Event with id: {event.event_id} has not the highest severity level.\n " | ||
f"Setting it the the highest severity level.") | ||
event.severity_label = MAX_SEVERITY_LABEL | ||
|
||
if self.highest_severity is None or event.severity_label > self.highest_severity: | ||
self.highest_severity = event.severity_label | ||
|
||
self.events.append(event) | ||
|
||
def add_event(self, event_id: str, context: Union[str, dict], recoverable: bool, sub_components=None): | ||
""" | ||
Add event to Failure object. Uses the component and subcomponents specified in the Failure object. | ||
As context specify either a string that contains a message or a dict that contains useful information about that | ||
event. | ||
Set recoverable to False if the code skips other not directly related checks. Those events should always have | ||
the highest severity label assigned and if not we manually do that. | ||
Example usage: | ||
failure.add_event("ima_hash", | ||
{"message": "IMA hash does not match the calculated hash.", | ||
"expected": self.template_hash, "got": self.mode.hash()}, True) | ||
""" | ||
if sub_components is not None and self._sub_components is not None: | ||
sub_components = self._sub_components + sub_components | ||
elif self._sub_components is not None: | ||
sub_components = self._sub_components | ||
event = Event(self._component, sub_components, event_id, context, recoverable) | ||
self._add(event) | ||
|
||
def merge(self, other): | ||
if self.recoverable: | ||
self.recoverable = other.recoverable | ||
if self.highest_severity is None: | ||
self.highest_severity = other.highest_severity | ||
elif other.highest_severity is not None: | ||
self.highest_severity = max(self.highest_severity, other.highest_severity) | ||
self.events.extend(other.events) | ||
|
||
def __bool__(self): | ||
return not self.recoverable or len(self.events) > 0 | ||
|
||
|
||
def _eval_severity_config() -> Tuple[List[Callable[[str], Optional[SeverityLabel]]], SeverityLabel]: | ||
""" | ||
Generates the list of rules to match a event_id against. | ||
""" | ||
|
||
labels_list = ast.literal_eval(config.get("cloud_verifier", "severity_labels")) | ||
labels = {} | ||
for label, level in zip(labels_list, range(0, len(labels_list))): | ||
labels[label] = SeverityLabel(label, level) | ||
|
||
label_max = labels[labels_list[-1]] | ||
|
||
policies = ast.literal_eval(config.get("cloud_verifier", "severity_policy")) | ||
rules = [] | ||
for policy in policies: | ||
# TODO validate regex | ||
regex = re.compile(policy["event_id"]) | ||
|
||
def rule(policy_regex, label_str: str, event_id: str) -> Optional[SeverityLabel]: | ||
if policy_regex.fullmatch(event_id): | ||
policy_label = labels.get(label_str) | ||
if policy_label is None: | ||
logger.error(f"Label {label_str} is not a valid label. Defaulting to maximal severity label!") | ||
return label_max | ||
return policy_label | ||
return None | ||
|
||
rules.append(functools.partial(rule, regex, policy["severity_label"])) | ||
return rules, label_max | ||
|
||
|
||
# Only evaluate the policy once on module load | ||
SEVERITY_RULES, MAX_SEVERITY_LABEL = _eval_severity_config() | ||
|
||
|
||
def _severity_match(event_id: str) -> SeverityLabel: | ||
""" | ||
Match the event_id to a severity label. | ||
""" | ||
for rule in SEVERITY_RULES: | ||
match = rule(event_id) | ||
if match is not None: | ||
return match | ||
logger.warning(f"No rule matched for event_id: {event_id}. Defaulting to max severity label") | ||
return MAX_SEVERITY_LABEL |