diff --git a/keylime/tenant.py b/keylime/tenant.py index 40c3e257d..d01b3a085 100755 --- a/keylime/tenant.py +++ b/keylime/tenant.py @@ -507,7 +507,8 @@ def validate_tpm_quote(self, public_key, quote, hash_alg): logger.warning("AIK not found in registrar, quote not validated") return False - if not self.tpm_instance.check_quote(AgentAttestState(self.agent_uuid), self.nonce, public_key, quote, reg_data['aik_tpm'], hash_alg=hash_alg): + failure = self.tpm_instance.check_quote(AgentAttestState(self.agent_uuid), self.nonce, public_key, quote, reg_data['aik_tpm'], hash_alg=hash_alg) + if failure: if reg_data['regcount'] > 1: logger.error("WARNING: This UUID had more than one ek-ekcert registered to it! This might indicate that your system is misconfigured or a malicious host is present. Run 'regdelete' for this agent and restart") sys.exit() diff --git a/keylime/tpm/tpm_abstract.py b/keylime/tpm/tpm_abstract.py index f87a3abc8..7a8b78d14 100644 --- a/keylime/tpm/tpm_abstract.py +++ b/keylime/tpm/tpm_abstract.py @@ -22,6 +22,7 @@ from keylime import ima from keylime import measured_boot from keylime.common import algorithms +from keylime.failure import Failure, Component logger = keylime_logging.init_logging('tpm') @@ -204,15 +205,18 @@ def _get_tpm_rand_block(self, size=4096): pass def __check_ima(self, agentAttestState, pcrval, ima_measurement_list, allowlist, ima_keyring, boot_aggregates): + failure = Failure(Component.IMA) logger.info("Checking IMA measurement list on agent: %s", agentAttestState.get_agent_id()) if config.STUB_IMA: pcrval = None - ex_value = ima.process_measurement_list(agentAttestState, ima_measurement_list.split('\n'), allowlist, pcrval=pcrval, ima_keyring=ima_keyring, boot_aggregates=boot_aggregates) - if ex_value is None: - return False - logger.debug("IMA measurement list of agent %s validated", agentAttestState.get_agent_id()) - return True + _, ima_failure = ima.process_measurement_list(agentAttestState, ima_measurement_list.split('\n'), allowlist, + pcrval=pcrval, ima_keyring=ima_keyring, + boot_aggregates=boot_aggregates) + failure.merge(ima_failure) + if not failure: + logger.debug("IMA measurement list of agent %s validated", agentAttestState.get_agent_id()) + return failure def __parse_pcrs(self, pcrs, virtual) -> typing.Dict[int, str]: """Parses and validates the format of a list of PCR data""" @@ -231,8 +235,9 @@ def __parse_pcrs(self, pcrs, virtual) -> typing.Dict[int, str]: return output - def check_pcrs(self, agentAttestState, tpm_policy, pcrs, data, virtual, ima_measurement_list, allowlist, ima_keyring, mb_measurement_list, mb_refstate_str): - + def check_pcrs(self, agentAttestState, tpm_policy, pcrs, data, virtual, ima_measurement_list, + allowlist, ima_keyring, mb_measurement_list, mb_refstate_str) -> Failure: + failure = Failure(Component.PCR_VALIDATION) if isinstance(tpm_policy, str): tpm_policy = json.loads(tpm_policy) @@ -244,9 +249,8 @@ def check_pcrs(self, agentAttestState, tpm_policy, pcrs, data, virtual, ima_meas pcr_allowlist = {int(k): v for k, v in list(pcr_allowlist.items())} mb_policy, mb_refstate_data = measured_boot.get_policy(mb_refstate_str) - mb_pcrs_sha256, boot_aggregates, mb_measurement_data, success = self.parse_mb_bootlog(mb_measurement_list) - if not success: - return False + mb_pcrs_sha256, boot_aggregates, mb_measurement_data, mb_failure = self.parse_mb_bootlog(mb_measurement_list) + failure.merge(mb_failure) pcrs_in_quote = set() # PCRs in quote that were already used for some kind of validation @@ -255,7 +259,7 @@ def check_pcrs(self, agentAttestState, tpm_policy, pcrs, data, virtual, ima_meas # Skip validation if TPM is stubbed. if config.STUB_TPM: - return True + return failure # Validate data PCR if config.TPM_DATA_PCR in pcr_nums and data is not None: @@ -264,47 +268,54 @@ def check_pcrs(self, agentAttestState, tpm_policy, pcrs, data, virtual, ima_meas logger.error( "%sPCR #%s: invalid bind data %s from quote does not match expected value %s", ("", "v")[virtual], config.TPM_DATA_PCR, pcrs[config.TPM_DATA_PCR], expectedval) - return False + failure.add_event(f"invalid_pcr_{config.TPM_DATA_PCR}", {"got": pcrs[config.TPM_DATA_PCR], "expected": expectedval}, True) pcrs_in_quote.add(config.TPM_DATA_PCR) else: logger.error("Binding %sPCR #%s was not included in the quote, but is required", ("", "v")[virtual], config.TPM_DATA_PCR) - return False - + failure.add_event(f"missing_pcr_{config.TPM_DATA_PCR}", f"Data PCR {config.TPM_DATA_PCR} is missing in quote, but is required", True) # Check for ima PCR if config.IMA_PCR in pcr_nums: if ima_measurement_list is None: logger.error("IMA PCR in policy, but no measurement list provided") - return False - - if not self.__check_ima(agentAttestState, pcrs[config.IMA_PCR], ima_measurement_list, allowlist, ima_keyring, - boot_aggregates): - return False + failure.add_event(f"unused_pcr_{config.IMA_PCR}", "IMA PCR in policy, but no measurement list provided", True) + else: + ima_failure = self.__check_ima(agentAttestState, pcrs[config.IMA_PCR], ima_measurement_list, allowlist, ima_keyring, + boot_aggregates) + failure.merge(ima_failure) pcrs_in_quote.add(config.IMA_PCR) - # Handle measured boot PCRs - for pcr_num in set(config.MEASUREDBOOT_PCRS) & pcr_nums: - if mb_refstate_data: - if not mb_measurement_list: - logger.error("Measured Boot PCR %d in policy, but no measurement list provided", pcr_num) - return False - - val_from_log_int = mb_pcrs_sha256.get(str(pcr_num), 0) - val_from_log_hex = hex(val_from_log_int)[2:] - val_from_log_hex_stripped = val_from_log_hex.lstrip('0') - pcrval_stripped = pcrs[pcr_num].lstrip('0') - if val_from_log_hex_stripped != pcrval_stripped: - logger.error( - "For PCR %d and hash SHA256 the boot event log has value %r but the agent returned %r", - pcr_num, val_from_log_hex, pcrs[pcr_num]) - return False - if pcr_num in pcr_allowlist and pcrs[pcr_num] not in pcr_allowlist[pcr_num]: - logger.error( - "%sPCR #%s: %s from quote does not match expected value %s", - ("", "v")[virtual], pcr_num, pcrs[pcr_num], pcr_allowlist[pcr_num]) - return False - pcrs_in_quote.add(pcr_num) + # Handle measured boot PCRs only if the parsing worked + if not mb_failure: + for pcr_num in set(config.MEASUREDBOOT_PCRS) & pcr_nums: + if mb_refstate_data: + if not mb_measurement_list: + logger.error("Measured Boot PCR %d in policy, but no measurement list provided", pcr_num) + failure.add_event(f"unused_pcr_{pcr_num}", + f"Measured Boot PCR {pcr_num} in policy, but no measurement list provided", True) + continue + + val_from_log_int = mb_pcrs_sha256.get(str(pcr_num), 0) + val_from_log_hex = hex(val_from_log_int)[2:] + val_from_log_hex_stripped = val_from_log_hex.lstrip('0') + pcrval_stripped = pcrs[pcr_num].lstrip('0') + if val_from_log_hex_stripped != pcrval_stripped: + logger.error( + "For PCR %d and hash SHA256 the boot event log has value %r but the agent returned %r", + pcr_num, val_from_log_hex, pcrs[pcr_num]) + failure.add_event(f"invalid_pcr_{pcr_num}", + {"context": "SHA256 boot event log PCR value does not match", + "got": pcrs[pcr_num], "expected": val_from_log_hex}, True) + + if pcr_num in pcr_allowlist and pcrs[pcr_num] not in pcr_allowlist[pcr_num]: + logger.error( + "%sPCR #%s: %s from quote does not match expected value %s", + ("", "v")[virtual], pcr_num, pcrs[pcr_num], pcr_allowlist[pcr_num]) + failure.add_event(f"invalid_pcr_{pcr_num}", + {"context": "PCR value is not in allowlist", + "got": pcrs[pcr_num], "expected": pcr_allowlist[pcr_num]}, True) + pcrs_in_quote.add(pcr_num) # Check the remaining non validated PCRs for pcr_num in pcr_nums - pcrs_in_quote: @@ -315,22 +326,23 @@ def check_pcrs(self, agentAttestState, tpm_policy, pcrs, data, virtual, ima_meas if pcrs[pcr_num] not in pcr_allowlist[pcr_num]: logger.error("%sPCR #%s: %s from quote does not match expected value %s", ("", "v")[virtual], pcr_num, pcrs[pcr_num], pcr_allowlist[pcr_num]) - return False + failure.add_event(f"invalid_pcr_{pcr_num}", + {"context": "PCR value is not in allowlist", + "got": pcrs[pcr_num], "expected": pcr_allowlist[pcr_num]}, True) pcrs_in_quote.add(pcr_num) missing = set(pcr_allowlist.keys()) - pcrs_in_quote if len(missing) > 0: logger.error("%sPCRs specified in policy not in quote: %s", ("", "v")[virtual], missing) - return False + failure.add_event("missing_pcrs", {"context": "PCRs are missing in quote", "data": missing}, True) - if mb_refstate_data: - success = measured_boot.evaluate_policy(mb_policy, mb_refstate_data, mb_measurement_data, + if not mb_failure and mb_refstate_data: + mb_policy_failure = measured_boot.evaluate_policy(mb_policy, mb_refstate_data, mb_measurement_data, pcrs_in_quote, ("", "v")[virtual], agentAttestState.get_agent_id()) - if not success: - return False + failure.merge(mb_policy_failure) - return True + return failure # tpm_nvram @abstractmethod diff --git a/keylime/tpm/tpm_main.py b/keylime/tpm/tpm_main.py index fdc0df1b1..ee443656a 100644 --- a/keylime/tpm/tpm_main.py +++ b/keylime/tpm/tpm_main.py @@ -11,6 +11,7 @@ import tempfile import threading import time +import typing import zlib import codecs from distutils.version import StrictVersion @@ -27,6 +28,7 @@ from keylime import tpm_ek_ca from keylime.common import algorithms from keylime.tpm import tpm2_objects +from keylime.failure import Failure, Component logger = keylime_logging.init_logging('tpm') @@ -1071,13 +1073,18 @@ def _tpm2_checkquote(self, aikTpmFromRegistrar, quote, nonce, hash_alg): return retout, True - def check_quote(self, agentAttestState, nonce, data, quote, aikTpmFromRegistrar, tpm_policy={}, ima_measurement_list=None, allowlist={}, hash_alg=None, ima_keyring=None, mb_measurement_list=None, mb_refstate=None): + def check_quote(self, agentAttestState, nonce, data, quote, aikTpmFromRegistrar, tpm_policy={}, + ima_measurement_list=None, allowlist={}, hash_alg=None, ima_keyring=None, + mb_measurement_list=None, mb_refstate=None) -> Failure: + failure = Failure(Component.QUOTE_VALIDATION) if hash_alg is None: hash_alg = self.defaults['hash'] retout, success = self._tpm2_checkquote(aikTpmFromRegistrar, quote, nonce, hash_alg) if not success: - return success + # If the quote validation fails we will skip all other steps therefore this failure is irrecoverable. + failure.add_event("quote_validation", {"message": "Quote validation using tpm2-tools", "data": retout}, False) + return failure pcrs = [] jsonout = config.yaml_to_dict(retout) @@ -1321,30 +1328,35 @@ def _parse_mb_bootlog(self, log_b64:str) -> dict: log_bin = base64.b64decode(log_b64, validate=True) return self.parse_binary_bootlog(log_bin) - def parse_mb_bootlog(self, mb_measurement_list:str) -> dict: + def parse_mb_bootlog(self, mb_measurement_list: str) -> typing.Tuple[dict, typing.Optional[dict], dict, Failure]: """ Parse the measured boot log and return its object and the state of the SHA256 PCRs :param mb_measurement_list: The measured boot measurement list :returns: Returns a map of the state of the SHA256 PCRs, measured boot data object and True for success and False in case an error occurred """ + failure = Failure(Component.MEASURED_BOOT, ["parser"]) if mb_measurement_list: + #TODO add tagging for _parse_mb_bootlog mb_measurement_data = self._parse_mb_bootlog(mb_measurement_list) if not mb_measurement_data: logger.error("Unable to parse measured boot event log. Check previous messages for a reason for error.") - return {}, None, {}, False + return {}, None, {}, failure log_pcrs = mb_measurement_data.get('pcrs') if not isinstance(log_pcrs, dict): logger.error("Parse of measured boot event log has unexpected value for .pcrs: %r", log_pcrs) - return {}, None, {}, False + failure.add_event("invalid_pcrs", {"got": log_pcrs}, True) + return {}, None, {}, failure pcrs_sha256 = log_pcrs.get('sha256') if (not isinstance(pcrs_sha256, dict)) or not pcrs_sha256: logger.error("Parse of measured boot event log has unexpected value for .pcrs.sha256: %r", pcrs_sha256) - return {}, None, {}, False + failure.add_event("invalid_pcrs_sha256", {"got": pcrs_sha256}, True) + return {}, None, {}, failure boot_aggregates = mb_measurement_data.get('boot_aggregates') if (not isinstance(boot_aggregates, dict)) or not boot_aggregates: logger.error("Parse of measured boot event log has unexpected value for .boot_aggragtes: %r", boot_aggregates) - return {}, None, {}, False + failure.add_event("invalid_boot_aggregates", {"got": boot_aggregates}, True) + return {}, None, {}, failure - return pcrs_sha256, boot_aggregates, mb_measurement_data, True + return pcrs_sha256, boot_aggregates, mb_measurement_data, failure - return {}, None, {}, True + return {}, None, {}, failure diff --git a/test/test_restful.py b/test/test_restful.py index ca1ad4257..6594db9b1 100644 --- a/test/test_restful.py +++ b/test/test_restful.py @@ -557,13 +557,13 @@ def test_022_agent_quotes_identity_get(self): self.assertIn("pubkey", json_response["results"], "Malformed response body!") # Check the quote identity - self.assertTrue(tpm_instance.check_quote(tenant_templ.agent_uuid, + failure = tpm_instance.check_quote(tenant_templ.agent_uuid, nonce, json_response["results"]["pubkey"], json_response["results"]["quote"], aik_tpm, - hash_alg=json_response["results"]["hash_alg"]), - "Invalid quote!") + hash_alg=json_response["results"]["hash_alg"]) + self.assertTrue(not failure, "Invalid quote!") @unittest.skip("Testing of agent's POST /keys/vkey disabled! (spawned CV should do this already)") def test_023_agent_keys_vkey_post(self): @@ -846,14 +846,14 @@ def test_040_agent_quotes_integrity_get(self): quote = json_response["results"]["quote"] hash_alg = json_response["results"]["hash_alg"] - validQuote = tpm_instance.check_quote(tenant_templ.agent_uuid, + failure = tpm_instance.check_quote(tenant_templ.agent_uuid, nonce, public_key, quote, aik_tpm, self.tpm_policy, hash_alg=hash_alg) - self.assertTrue(validQuote) + self.assertTrue(not failure) async def test_041_agent_keys_verify_get(self): """Test agent's GET /keys/verify Interface