diff --git a/middleware/adm_sgx.py b/middleware/adm_sgx.py index d6a8f289..7090c5c8 100644 --- a/middleware/adm_sgx.py +++ b/middleware/adm_sgx.py @@ -31,6 +31,7 @@ from admin.pubkeys import do_get_pubkeys from admin.changepin import do_changepin from admin.sgx_attestation import do_attestation +from admin.verify_sgx_attestation import do_verify_attestation def main(): @@ -42,12 +43,13 @@ def main(): "pubkeys": do_get_pubkeys, "changepin": do_changepin, "attestation": do_attestation, + "verify_attestation": do_verify_attestation, } parser = ArgumentParser(description="SGX powHSM Administrative tool") parser.add_argument("operation", choices=list(actions.keys())) parser.add_argument( - "-r", + "-p", "--port", dest="sgx_port", help="SGX powHSM listening port (default 7777)", @@ -61,7 +63,7 @@ def main(): help="SGX powHSM host. (default 'localhost')", default="localhost", ) - parser.add_argument("-p", "--pin", dest="pin", help="PIN.") + parser.add_argument("-P", "--pin", dest="pin", help="PIN.") parser.add_argument( "-n", "--newpin", @@ -103,6 +105,26 @@ def main(): f"{DEFAULT_ATT_UD_SOURCE}). Can also specify a 32-byte hex string to use as" " the value.", ) + parser.add_argument( + "-t", + "--attcert", + dest="attestation_certificate_file_path", + help="Attestation key certificate file (only valid for " + "'verify_attestation' operation).", + ) + parser.add_argument( + "-r", + "--root", + dest="root_authority", + help="Root attestation authority (only valid for 'verify_attestation' " + "operation). Defaults to Intel SGX's root authority.", + ) + parser.add_argument( + "-b", + "--pubkeys", + dest="pubkeys_file_path", + help="Public keys file (only valid for 'verify_attestation' operation).", + ) parser.add_argument( "-v", "--verbose", diff --git a/middleware/admin/attestation_utils.py b/middleware/admin/attestation_utils.py new file mode 100644 index 00000000..d9d31387 --- /dev/null +++ b/middleware/admin/attestation_utils.py @@ -0,0 +1,143 @@ +# The MIT License (MIT) +# +# Copyright (c) 2021 RSK Labs Ltd +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +# of the Software, and to permit persons to whom the Software is furnished to do +# so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import hashlib +import json +import re +import secp256k1 as ec +import requests +from pathlib import Path +from comm.cstruct import CStruct +from .misc import AdminError +from .certificate_v2 import HSMCertificateV2, HSMCertificateV2ElementX509 + + +class PowHsmAttestationMessage(CStruct): + """ + pow_hsm_message_header + + uint8_t platform 3 + uint8_t ud_value 32 + uint8_t public_keys_hash 32 + uint8_t best_block 32 + uint8_t last_signed_tx 8 + uint8_t timestamp 8 + """ + + HEADER_REGEX = re.compile(b"^POWHSM:(5.[0-9])::") + + @classmethod + def is_header(kls, value): + return kls.HEADER_REGEX.match(value) is not None + + def __init__(self, value, offset=0, little=True, name="powHSM"): + self.name = name + # Parse header + match = self.HEADER_REGEX.match(value) + if match is None: + raise ValueError( + f"Invalid {self.name} attestation message header: {value.hex()}") + + # Validate total length + header_length = len(match.group(0)) + expected_length = header_length + self.get_bytelength() + if len(value[offset:]) != expected_length: + raise ValueError(f"{self.name} attestation message length " + f"mismatch: {value[offset:].hex()}") + + # Grab version + self.version = match.group(1).decode("ASCII") + + # Parse the rest + super().__init__(value, offset+header_length, little) + + # Conversions + self.platform = self.platform.decode("ASCII") + self.timestamp = int.from_bytes(self.timestamp, byteorder="big", signed=False) + + +def load_pubkeys(pubkeys_file_path): + # Load the given public keys file into a map + try: + with open(pubkeys_file_path, "r") as file: + pubkeys_map = json.loads(file.read()) + + if type(pubkeys_map) != dict: + raise AdminError( + "Public keys file must contain an object as a top level element") + + result = {} + for path in pubkeys_map.keys(): + pubkey = pubkeys_map[path] + try: + pubkey = ec.PublicKey(bytes.fromhex(pubkey), raw=True) + except Exception: + raise AdminError(f"Invalid public key for path {path}: {pubkey}") + result[path] = pubkey + return result + except (FileNotFoundError, ValueError, json.JSONDecodeError) as e: + raise AdminError('Unable to read public keys from "%s": %s' % + (pubkeys_file_path, str(e))) + + +def compute_pubkeys_hash(pubkeys_map): + # Compute the given public keys hash + # (sha256sum of the uncompressed public keys in + # lexicographical path order) + if len(pubkeys_map) == 0: + raise AdminError("Can't compute the hash of an empty public keys map") + + pubkeys_hash = hashlib.sha256() + for path in sorted(pubkeys_map.keys()): + pubkey = pubkeys_map[path] + pubkeys_hash.update(pubkey.serialize(compressed=False)) + return pubkeys_hash.digest() + + +def compute_pubkeys_output(pubkeys_map): + pubkeys_output = [] + path_name_padding = max(map(len, pubkeys_map.keys())) + for path in sorted(pubkeys_map.keys()): + pubkey = pubkeys_map[path] + pubkeys_output.append( + f"{(path + ':').ljust(path_name_padding+1)} " + f"{pubkey.serialize(compressed=True).hex()}" + ) + return pubkeys_output + + +def get_root_of_trust(path): + # From file + if Path(path).is_file(): + return HSMCertificateV2ElementX509.from_pemfile( + path, + HSMCertificateV2.ROOT_ELEMENT, + HSMCertificateV2.ROOT_ELEMENT) + + # Assume URL and try to grab it + ra_res = requests.get(path) + if ra_res.status_code != 200: + raise RuntimeError(f"Error fetching root of trust from {path}") + return HSMCertificateV2ElementX509.from_pem( + ra_res.content.decode(), + HSMCertificateV2.ROOT_ELEMENT, + HSMCertificateV2.ROOT_ELEMENT) diff --git a/middleware/admin/certificate_v2.py b/middleware/admin/certificate_v2.py index ca343025..e84591d2 100644 --- a/middleware/admin/certificate_v2.py +++ b/middleware/admin/certificate_v2.py @@ -20,9 +20,12 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +import re +from pathlib import Path import base64 from .certificate_v1 import HSMCertificate from .utils import is_nonempty_hex_string +from sgx.envelope import SgxQuote class HSMCertificateV2Element: @@ -105,7 +108,10 @@ def signature(self): return self._signature.hex() def get_value(self): - return self.custom_data + return { + "sgx_quote": SgxQuote(self._message), + "message": self.custom_data, + } def to_dict(self): return { @@ -170,6 +176,21 @@ def to_dict(self): class HSMCertificateV2ElementX509(HSMCertificateV2Element): + @classmethod + def from_pemfile(kls, pem_path, name, signed_by): + return kls.from_pem(Path(pem_path).read_text(), name, signed_by) + + @classmethod + def from_pem(kls, pem_str, name, signed_by): + return kls({ + "name": name, + "message": re.sub(r"[\s\n\r]+", " ", pem_str) + .replace("-----END CERTIFICATE-----", "") + .replace("-----BEGIN CERTIFICATE-----", "") + .strip().encode(), + "signed_by": signed_by, + }) + def __init__(self, element_map): self._init_with_map(element_map) diff --git a/middleware/admin/verify_ledger_attestation.py b/middleware/admin/verify_ledger_attestation.py index d9f7ffd0..475a1ef5 100644 --- a/middleware/admin/verify_ledger_attestation.py +++ b/middleware/admin/verify_ledger_attestation.py @@ -20,32 +20,23 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. -import json -import hashlib -import secp256k1 as ec import re from .misc import info, head, AdminError +from .attestation_utils import PowHsmAttestationMessage, load_pubkeys, \ + compute_pubkeys_hash, compute_pubkeys_output from .utils import is_nonempty_hex_string from .certificate import HSMCertificate, HSMCertificateRoot -UI_MESSAGE_HEADER_REGEX = re.compile(b"^HSM:UI:(5.[0-9])") -SIGNER_LEGACY_MESSAGE_HEADER_REGEX = re.compile(b"^HSM:SIGNER:(5.[0-9])") +UI_MESSAGE_HEADER_REGEX = re.compile(b"^HSM:UI:([2,3,4,5].[0-9])") +SIGNER_LEGACY_MESSAGE_HEADER_REGEX = re.compile(b"^HSM:SIGNER:([2,3,4,5].[0-9])") UI_DERIVATION_PATH = "m/44'/0'/0'/0/0" UD_VALUE_LENGTH = 32 +PUBLIC_KEYS_HASH_LENGTH = 32 PUBKEY_COMPRESSED_LENGTH = 33 SIGNER_HASH_LENGTH = 32 SIGNER_ITERATION_LENGTH = 2 -# New signer message header with fields -SIGNER_MESSAGE_HEADER_REGEX = re.compile(b"^POWHSM:(5.[0-9])::") -SM_PLATFORM_LEN = 3 -SM_UD_LEN = 32 -SM_PKH_LEN = 32 -SM_BB_LEN = 32 -SM_TXN_LEN = 8 -SM_TMSTMP_LEN = 8 - # Ledger's root authority # (according to # https://github.com/LedgerHQ/blue-loader-python/blob/master/ledgerblue/ @@ -75,45 +66,18 @@ def do_verify_attestation(options): raise AdminError("Invalid root authority") info(f"Using {root_authority} as root authority") - # Load the given public keys and compute - # their hash (sha256sum of the uncompressed - # public keys in lexicographical path order) - # Also find and save the public key corresponding - # to the expected derivation path for the UI - # attestation - expected_ui_public_key = None - try: - with open(options.pubkeys_file_path, "r") as file: - pubkeys_map = json.loads(file.read()) - - if type(pubkeys_map) != dict: - raise ValueError( - "Public keys file must contain an object as a top level element") - - pubkeys_hash = hashlib.sha256() - pubkeys_output = [] - path_name_padding = max(map(len, pubkeys_map.keys())) - for path in sorted(pubkeys_map.keys()): - pubkey = pubkeys_map[path] - if not is_nonempty_hex_string(pubkey): - raise AdminError(f"Invalid public key for path {path}: {pubkey}") - pubkey = ec.PublicKey(bytes.fromhex(pubkey), raw=True) - pubkeys_hash.update(pubkey.serialize(compressed=False)) - pubkeys_output.append( - f"{(path + ':').ljust(path_name_padding+1)} " - f"{pubkey.serialize(compressed=True).hex()}" - ) - if path == UI_DERIVATION_PATH: - expected_ui_public_key = pubkey.serialize(compressed=True).hex() - pubkeys_hash = pubkeys_hash.digest() - - except (ValueError, json.JSONDecodeError) as e: - raise ValueError('Unable to read public keys from "%s": %s' % - (options.pubkeys_file_path, str(e))) + # Load public keys, compute their hash and format them for output + pubkeys_map = load_pubkeys(options.pubkeys_file_path) + pubkeys_hash = compute_pubkeys_hash(pubkeys_map) + pubkeys_output = compute_pubkeys_output(pubkeys_map) + # Find the expected UI public key + expected_ui_public_key = next(filter( + lambda pair: pair[0] == UI_DERIVATION_PATH, pubkeys_map.items()), (None, None))[1] if expected_ui_public_key is None: raise AdminError( f"Public key with path {UI_DERIVATION_PATH} not present in public key file") + expected_ui_public_key = expected_ui_public_key.serialize(compressed=True).hex() # Load the given attestation key certificate try: @@ -155,6 +119,10 @@ def do_verify_attestation(options): SIGNER_HASH_LENGTH + SIGNER_ITERATION_LENGTH] signer_iteration = int.from_bytes(signer_iteration, byteorder='big', signed=False) + if ui_public_key != expected_ui_public_key: + raise AdminError("Invalid UI attestation: unexpected public key reported. " + f"Expected {expected_ui_public_key} but got {ui_public_key}") + head( [ "UI verified with:", @@ -180,40 +148,31 @@ def do_verify_attestation(options): signer_message = bytes.fromhex(signer_result[1]) signer_hash = bytes.fromhex(signer_result[2]) lmh_match = SIGNER_LEGACY_MESSAGE_HEADER_REGEX.match(signer_message) - mh_match = SIGNER_MESSAGE_HEADER_REGEX.match(signer_message) - if lmh_match is None and mh_match is None: + if lmh_match is None and not PowHsmAttestationMessage.is_header(signer_message): raise AdminError( f"Invalid Signer attestation message header: {signer_message.hex()}") if lmh_match is not None: # Legacy header + powhsm_message = None hlen = len(lmh_match.group(0)) - signer_version = lmh_match.group(1) + signer_version = lmh_match.group(1).decode() offset = hlen reported_pubkeys_hash = signer_message[offset:] - offset += SM_PKH_LEN + offset += PUBLIC_KEYS_HASH_LENGTH + if signer_message[offset:] != b'': + raise AdminError(f"Signer attestation message longer " + f"than expected: {signer_message.hex()}") else: # New header - hlen = len(mh_match.group(0)) - signer_version = mh_match.group(1) - offset = hlen - reported_platform = signer_message[offset:offset+SM_PLATFORM_LEN] - offset += SM_PLATFORM_LEN - reported_ud_value = signer_message[offset:offset+SM_UD_LEN] - offset += SM_UD_LEN - reported_pubkeys_hash = signer_message[offset:offset+SM_PKH_LEN] - offset += SM_PKH_LEN - reported_best_block = signer_message[offset:offset+SM_BB_LEN] - offset += SM_BB_LEN - reported_txn_head = signer_message[offset:offset+SM_TXN_LEN] - offset += SM_TXN_LEN - reported_timestamp = signer_message[offset:offset+SM_TMSTMP_LEN] - offset += SM_TMSTMP_LEN - - if signer_message[offset:] != b'': - raise AdminError(f"Signer attestation message longer " - f"than expected: {signer_message.hex()}") - + try: + powhsm_message = PowHsmAttestationMessage(signer_message, name="Signer") + except ValueError as e: + raise AdminError(str(e)) + signer_version = powhsm_message.version + reported_pubkeys_hash = powhsm_message.public_keys_hash + + # Validations on extracted values if reported_pubkeys_hash != pubkeys_hash: raise AdminError( f"Signer attestation public keys hash mismatch: expected {pubkeys_hash.hex()}" @@ -224,16 +183,16 @@ def do_verify_attestation(options): f"Hash: {pubkeys_hash.hex()}", "", f"Installed Signer hash: {signer_hash.hex()}", - f"Installed Signer version: {signer_version.decode()}", + f"Installed Signer version: {signer_version}", ] - if mh_match is not None: + if powhsm_message is not None: signer_info += [ - f"Platform: {reported_platform.decode("ASCII")}", - f"UD value: {reported_ud_value.hex()}", - f"Best block: {reported_best_block.hex()}", - f"Last transaction signed: {reported_txn_head.hex()}", - f"Timestamp: {reported_timestamp.hex()}", + f"Platform: {powhsm_message.platform}", + f"UD value: {powhsm_message.ud_value.hex()}", + f"Best block: {powhsm_message.best_block.hex()}", + f"Last transaction signed: {powhsm_message.last_signed_tx.hex()}", + f"Timestamp: {powhsm_message.timestamp}", ] head( diff --git a/middleware/admin/verify_sgx_attestation.py b/middleware/admin/verify_sgx_attestation.py new file mode 100644 index 00000000..1e8672fb --- /dev/null +++ b/middleware/admin/verify_sgx_attestation.py @@ -0,0 +1,125 @@ +# The MIT License (MIT) +# +# Copyright (c) 2021 RSK Labs Ltd +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +# of the Software, and to permit persons to whom the Software is furnished to do +# so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +from .misc import info, head, AdminError +from .attestation_utils import PowHsmAttestationMessage, load_pubkeys, \ + compute_pubkeys_hash, compute_pubkeys_output, \ + get_root_of_trust +from .certificate import HSMCertificate + + +# ################################################################################### +# As default root authority, we use the Provisioning Certification Root CA from Intel +# The Provisioning Certification Root CA is available for download +# from Intel, as described here: +# https://api.portal.trustedservices.intel.com/content/documentation.html + +DEFAULT_ROOT_AUTHORITY = "https://certificates.trustedservices.intel.com/"\ + "Intel_SGX_Provisioning_Certification_RootCA.pem" + +# ################################################################################### + + +def do_verify_attestation(options): + head("### -> Verify powHSM attestation", fill="#") + + if options.attestation_certificate_file_path is None: + raise AdminError("No attestation certificate file given") + + if options.pubkeys_file_path is None: + raise AdminError("No public keys file given") + + # Load root authority + root_authority = options.root_authority or DEFAULT_ROOT_AUTHORITY + info(f"Attempting to gather root authority from {root_authority}...") + try: + root_of_trust = get_root_of_trust(root_authority) + except Exception as e: + raise AdminError(f"Invalid root authority {root_authority}: {e}") + info(f"Using {root_authority} as root authority") + + # Load public keys, compute their hash and format them for output + try: + pubkeys_map = load_pubkeys(options.pubkeys_file_path) + pubkeys_hash = compute_pubkeys_hash(pubkeys_map) + pubkeys_output = compute_pubkeys_output(pubkeys_map) + except Exception as e: + raise AdminError(str(e)) + + # Load the given attestation key certificate + try: + att_cert = HSMCertificate.from_jsonfile(options.attestation_certificate_file_path) + except Exception as e: + raise AdminError(f"While loading the attestation certificate file: {str(e)}") + + # Validate the certificate using the given root authority + # (this should be *one of* Ledger's public keys) + result = att_cert.validate_and_get_values(root_of_trust) + + # powHSM specific validations + if "quote" not in result: + raise AdminError("Certificate does not contain a powHSM attestation") + + powhsm_result = result["quote"] + if not powhsm_result[0]: + raise AdminError( + f"Invalid powHSM attestation: error validating '{powhsm_result[1]}'") + powhsm_result = powhsm_result[1] + + sgx_quote = powhsm_result["sgx_quote"] + powhsm_message = bytes.fromhex(powhsm_result["message"]) + if not PowHsmAttestationMessage.is_header(powhsm_message): + raise AdminError( + f"Invalid powHSM attestation message header: {powhsm_message.hex()}") + + try: + powhsm_message = PowHsmAttestationMessage(powhsm_message) + except Exception as e: + raise AdminError(f"Error parsing powHSM attestation message: {str(e)}") + reported_pubkeys_hash = powhsm_message.public_keys_hash + + if reported_pubkeys_hash != pubkeys_hash: + raise AdminError( + f"powHSM attestation public keys hash mismatch: expected {pubkeys_hash.hex()}" + f" but attestation reports {reported_pubkeys_hash.hex()}" + ) + + signer_info = [ + f"Hash: {pubkeys_hash.hex()}", + "", + f"Installed powHSM MRENCLAVE: {sgx_quote.report_body.mrenclave.hex()}", + f"Installed powHSM MRSIGNER: {sgx_quote.report_body.mrsigner.hex()}", + f"Installed powHSM version: {powhsm_message.version}", + ] + + signer_info += [ + f"Platform: {powhsm_message.platform}", + f"UD value: {powhsm_message.ud_value.hex()}", + f"Best block: {powhsm_message.best_block.hex()}", + f"Last transaction signed: {powhsm_message.last_signed_tx.hex()}", + f"Timestamp: {powhsm_message.timestamp}", + ] + + head( + ["powHSM verified with public keys:"] + pubkeys_output + signer_info, + fill="-", + ) diff --git a/middleware/tests/admin/test_adm_sgx.py b/middleware/tests/admin/test_adm_sgx.py index 53c139cd..ca8427b6 100644 --- a/middleware/tests/admin/test_adm_sgx.py +++ b/middleware/tests/admin/test_adm_sgx.py @@ -40,6 +40,9 @@ def setUp(self): "new_pin": None, "no_unlock": False, "attestation_ud_source": "https://public-node.rsk.co", + "attestation_certificate_file_path": None, + "root_authority": None, + "pubkeys_file_path": None, "operation": None, "output_file_path": None, "pin": None, @@ -61,7 +64,7 @@ def test_unlock(self, do_unlock): call(Namespace(**expected_options)) ] - with patch('sys.argv', ['adm_sgx.py', '-p', 'a-pin', 'unlock']): + with patch('sys.argv', ['adm_sgx.py', '-P', 'a-pin', 'unlock']): with self.assertRaises(SystemExit) as e: main() self.assertEqual(e.exception.code, 0) @@ -89,7 +92,7 @@ def test_onboard(self, do_onboard): ] with patch('sys.argv', - ['adm_sgx.py', '-p', 'a-pin', 'onboard']): + ['adm_sgx.py', '-P', 'a-pin', 'onboard']): with self.assertRaises(SystemExit) as e: main() self.assertEqual(e.exception.code, 0) @@ -119,7 +122,7 @@ def test_pubkeys(self, do_get_pubkeys): call(Namespace(**expected_options)) ] - with patch('sys.argv', ['adm_sgx.py', '-p', 'a-pin', '-o', 'a-path', '-u', + with patch('sys.argv', ['adm_sgx.py', '-P', 'a-pin', '-o', 'a-path', '-u', '-s', '1.2.3.4', 'pubkeys']): with self.assertRaises(SystemExit) as e: main() @@ -154,8 +157,8 @@ def test_changepin(self, do_changepin): call(Namespace(**expected_options)) ] - with patch('sys.argv', ['adm_sgx.py', '-p', 'old-pin', '-n', 'new-pin', - '-r', '4567', '-a', 'changepin']): + with patch('sys.argv', ['adm_sgx.py', '-P', 'old-pin', '-n', 'new-pin', + '-p', '4567', '-a', 'changepin']): with self.assertRaises(SystemExit) as e: main() self.assertEqual(e.exception.code, 0) @@ -186,7 +189,7 @@ def test_attestation(self, do_attestation): ] with patch('sys.argv', ['adm_sgx.py', - '-p', 'a-pin', + '-P', 'a-pin', '-o', 'out-path', '--attudsource', 'user-defined-source', 'attestation']): diff --git a/middleware/tests/admin/test_attestation_utils.py b/middleware/tests/admin/test_attestation_utils.py new file mode 100644 index 00000000..790fa505 --- /dev/null +++ b/middleware/tests/admin/test_attestation_utils.py @@ -0,0 +1,269 @@ +# The MIT License (MIT) +# +# Copyright (c) 2021 RSK Labs Ltd +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +# of the Software, and to permit persons to whom the Software is furnished to do +# so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +from types import SimpleNamespace +import secp256k1 as ec +from unittest import TestCase +from unittest.mock import patch, mock_open +from parameterized import parameterized +from admin.attestation_utils import AdminError, PowHsmAttestationMessage, load_pubkeys, \ + compute_pubkeys_hash, compute_pubkeys_output, \ + get_root_of_trust +from .test_attestation_utils_resources import TEST_PUBKEYS_JSON, \ + TEST_PUBKEYS_JSON_INVALID +import logging + +logging.disable(logging.CRITICAL) + + +class TestPowHsmAttestationMessage(TestCase): + @parameterized.expand([ + ("ok_exact", True, b"POWHSM:5.6::"), + ("ok_longer", True, b"POWHSM:5.3::whatcomesafterwards"), + ("version_mismatch", False, b"POWHSM:4.3::"), + ("shorter", False, b"POWHSM:5.3:"), + ("invalid", False, b"something invalid"), + ]) + def test_is_header(self, _, expected, header): + self.assertEqual(expected, PowHsmAttestationMessage.is_header(header)) + + def test_parse_ok(self): + msg = PowHsmAttestationMessage( + b"POWHSM:5.7::" + + b"abc" + + bytes.fromhex("aa"*32) + + bytes.fromhex("bb"*32) + + bytes.fromhex("cc"*32) + + bytes.fromhex("dd"*8) + + bytes.fromhex("00"*7 + "83") + ) + + self.assertEqual("abc", msg.platform) + self.assertEqual(bytes.fromhex("aa"*32), msg.ud_value) + self.assertEqual(bytes.fromhex("bb"*32), msg.public_keys_hash) + self.assertEqual(bytes.fromhex("cc"*32), msg.best_block) + self.assertEqual(bytes.fromhex("dd"*8), msg.last_signed_tx) + self.assertEqual(0x83, msg.timestamp) + + def test_parse_header_mismatch(self): + with self.assertRaises(ValueError) as e: + PowHsmAttestationMessage( + b"POWHSM:3.0::" + + b"abc" + + bytes.fromhex("aa"*32) + + bytes.fromhex("bb"*32) + + bytes.fromhex("cc"*32) + + bytes.fromhex("dd"*8) + + bytes.fromhex("00"*7 + "83") + + b"0" + ) + self.assertIn("header", str(e.exception)) + + def test_parse_shorter(self): + with self.assertRaises(ValueError) as e: + PowHsmAttestationMessage( + b"POWHSM:5.7::" + + b"abc" + + bytes.fromhex("aa"*32) + + bytes.fromhex("bb"*32) + + bytes.fromhex("cc"*32) + + bytes.fromhex("dd"*8) + + bytes.fromhex("00"*6 + "83") + ) + self.assertIn("length mismatch", str(e.exception)) + + def test_parse_longer(self): + with self.assertRaises(ValueError) as e: + PowHsmAttestationMessage( + b"POWHSM:5.7::" + + b"abc" + + bytes.fromhex("aa"*32) + + bytes.fromhex("bb"*32) + + bytes.fromhex("cc"*32) + + bytes.fromhex("dd"*8) + + bytes.fromhex("00"*7 + "83") + + b"0" + ) + self.assertIn("length mismatch", str(e.exception)) + + +class TestLoadPubKeys(TestCase): + def test_load_pubkeys_ok(self): + with patch("builtins.open", mock_open()) as file_mock: + file_mock.return_value.read.return_value = TEST_PUBKEYS_JSON + pubkeys = load_pubkeys("a-path") + + file_mock.assert_called_with("a-path", "r") + self.assertEqual([ + "m/44'/1'/0'/0/0", + "m/44'/1'/1'/0/0", + "m/44'/1'/2'/0/0", + ], list(pubkeys.keys())) + self.assertEqual(bytes.fromhex( + "03abe31ee7c91976f7a56d8e196d82d5ce75a0fcc2935723bf25610d22bd81e50f"), + pubkeys["m/44'/1'/0'/0/0"].serialize(compressed=True)) + self.assertEqual(bytes.fromhex( + "03d44eac557a58be6cd4a40cbdaa9ed22cf4f0322e8c7bb84f6421d5bdda3b99ff"), + pubkeys["m/44'/1'/1'/0/0"].serialize(compressed=True)) + self.assertEqual(bytes.fromhex( + "02877a756d2b82ddff342fa327b065326001b204b2f86a24ac36638b5162330141"), + pubkeys["m/44'/1'/2'/0/0"].serialize(compressed=True)) + + def test_load_pubkeys_file_doesnotexist(self): + with patch("builtins.open", mock_open()) as file_mock: + file_mock.side_effect = FileNotFoundError("another error") + with self.assertRaises(AdminError) as e: + load_pubkeys("a-path") + file_mock.assert_called_with("a-path", "r") + self.assertIn("another error", str(e.exception)) + + def test_load_pubkeys_invalid_json(self): + with patch("builtins.open", mock_open()) as file_mock: + file_mock.return_value.read.return_value = "not json" + with self.assertRaises(AdminError) as e: + load_pubkeys("a-path") + file_mock.assert_called_with("a-path", "r") + self.assertIn("Unable to read", str(e.exception)) + + def test_load_pubkeys_notamap(self): + with patch("builtins.open", mock_open()) as file_mock: + file_mock.return_value.read.return_value = "[1,2,3]" + with self.assertRaises(AdminError) as e: + load_pubkeys("a-path") + file_mock.assert_called_with("a-path", "r") + self.assertIn("top level", str(e.exception)) + + def test_load_pubkeys_invalid_pubkey(self): + with patch("builtins.open", mock_open()) as file_mock: + file_mock.return_value.read.return_value = TEST_PUBKEYS_JSON_INVALID + with self.assertRaises(AdminError) as e: + load_pubkeys("a-path") + file_mock.assert_called_with("a-path", "r") + self.assertIn("public key", str(e.exception)) + + +class TestComputePubkeysHash(TestCase): + def test_ok(self): + expected_hash = bytes.fromhex( + "ad33c8be1af2520e2c533d883a2021654102917969816cd1b9dacfcccf4e139e") + + def to_pub(h): + return ec.PrivateKey(bytes.fromhex(h), raw=True).pubkey + + keys = { + "1first": to_pub("11"*32), + "3third": to_pub("33"*32), + "2second": to_pub("22"*32), + } + + self.assertEqual(expected_hash, compute_pubkeys_hash(keys)) + + def test_empty_errors(self): + with self.assertRaises(AdminError) as e: + compute_pubkeys_hash({}) + self.assertIn("empty", str(e.exception)) + + +class TestComputePubkeysOutput(TestCase): + def test_sample_output(self): + class PubKey: + def __init__(self, h): + self.h = h + + def serialize(self, compressed): + return bytes.fromhex(self.h) if compressed else "" + + keys = { + "name": PubKey("11223344"), + "longer_name": PubKey("aabbcc"), + "very_very_long_name": PubKey("6677889900"), + } + + self.assertEqual([ + "longer_name: aabbcc", + "name: 11223344", + "very_very_long_name: 6677889900", + ], compute_pubkeys_output(keys)) + + +class TestGetRootOfTrust(TestCase): + @patch("admin.attestation_utils.HSMCertificateV2ElementX509") + @patch("admin.attestation_utils.Path") + def test_file_ok(self, path, HSMCertificateV2ElementX509): + path.return_value.is_file.return_value = True + HSMCertificateV2ElementX509.from_pemfile.return_value = "the-result" + + self.assertEqual("the-result", get_root_of_trust("a-file-path")) + + path.assert_called_with("a-file-path") + HSMCertificateV2ElementX509.from_pemfile.assert_called_with( + "a-file-path", "sgx_root", "sgx_root") + + @patch("admin.attestation_utils.HSMCertificateV2ElementX509") + @patch("admin.attestation_utils.Path") + def test_file_invalid(self, path, HSMCertificateV2ElementX509): + path.return_value.is_file.return_value = True + err = ValueError("something wrong") + HSMCertificateV2ElementX509.from_pemfile.side_effect = err + + with self.assertRaises(ValueError) as e: + get_root_of_trust("a-file-path") + self.assertEqual(err, e.exception) + + path.assert_called_with("a-file-path") + HSMCertificateV2ElementX509.from_pemfile.assert_called_with( + "a-file-path", "sgx_root", "sgx_root") + + @patch("admin.attestation_utils.requests") + @patch("admin.attestation_utils.HSMCertificateV2ElementX509") + @patch("admin.attestation_utils.Path") + def test_url_ok(self, path, HSMCertificateV2ElementX509, requests): + path.return_value.is_file.return_value = False + requests.get.return_value = SimpleNamespace(**{ + "status_code": 200, + "content": b"some-pem", + }) + HSMCertificateV2ElementX509.from_pem.return_value = "the-result" + + self.assertEqual("the-result", get_root_of_trust("a-url")) + + path.assert_called_with("a-url") + requests.get.assert_called_with("a-url") + HSMCertificateV2ElementX509.from_pem.assert_called_with( + "some-pem", "sgx_root", "sgx_root") + + @patch("admin.attestation_utils.requests") + @patch("admin.attestation_utils.HSMCertificateV2ElementX509") + @patch("admin.attestation_utils.Path") + def test_url_error_get(self, path, HSMCertificateV2ElementX509, requests): + path.return_value.is_file.return_value = False + requests.get.return_value = SimpleNamespace(**{ + "status_code": 123, + }) + + with self.assertRaises(RuntimeError) as e: + get_root_of_trust("a-url") + self.assertIn("fetching root of trust", str(e.exception)) + + path.assert_called_with("a-url") + requests.get.assert_called_with("a-url") + HSMCertificateV2ElementX509.from_pem.assert_not_called() diff --git a/middleware/tests/admin/test_attestation_utils_resources.py b/middleware/tests/admin/test_attestation_utils_resources.py new file mode 100644 index 00000000..6da5092d --- /dev/null +++ b/middleware/tests/admin/test_attestation_utils_resources.py @@ -0,0 +1,36 @@ +# The MIT License (MIT) +# +# Copyright (c) 2021 RSK Labs Ltd +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +# of the Software, and to permit persons to whom the Software is furnished to do +# so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +TEST_PUBKEYS_JSON = """ +{ + "m/44'/1'/0'/0/0": "04abe31ee7c91976f7a56d8e196d82d5ce75a0fcc2935723bf25610d22bd81e50fb4def0b3f99ae2054868ea2133e5b88145220ac492f86b942bd40f574d9117e1", + "m/44'/1'/1'/0/0": "04d44eac557a58be6cd4a40cbdaa9ed22cf4f0322e8c7bb84f6421d5bdda3b99ff73982e67c4550faad3f67de7615a0a32cfcf3322f5eca5cbaa6792131600ca17", + "m/44'/1'/2'/0/0": "04877a756d2b82ddff342fa327b065326001b204b2f86a24ac36638b51623301416076d2eb1a048c2efa3934d5673bdf3db8d0f1e8ade406c6a478f0910cdb8c4c" +} +""" + +TEST_PUBKEYS_JSON_INVALID = """ +{ + "path_1": "02877a756d2b82ddff342fa327b065326001b204b2f86a24ac36638b5162330141", + "path_2": "11223344" +} +""" diff --git a/middleware/tests/admin/test_certificate_v2.py b/middleware/tests/admin/test_certificate_v2.py index ed7c0c8d..99a82099 100644 --- a/middleware/tests/admin/test_certificate_v2.py +++ b/middleware/tests/admin/test_certificate_v2.py @@ -21,6 +21,7 @@ # SOFTWARE. from unittest import TestCase +from unittest.mock import patch from admin.certificate_v1 import HSMCertificate from admin.certificate_v2 import HSMCertificateV2, HSMCertificateV2Element, \ HSMCertificateV2ElementSGXQuote, \ @@ -41,17 +42,34 @@ def test_parse_identity(self): cert = HSMCertificateV2(TEST_CERTIFICATE) self.assertEqual(TEST_CERTIFICATE, cert.to_dict()) - def test_validate_and_get_values_value(self): + @patch("admin.certificate_v2.SgxQuote") + def test_validate_and_get_values_value(self, SgxQuoteMock): + SgxQuoteMock.return_value = "an-sgx-quote" cert = HSMCertificateV2(TEST_CERTIFICATE) self.assertEqual({ "quote": ( - True, - "504f5748534d3a352e343a3a736778f36f7bc09aab50c0886a442b2d04b18186720bd" - "a7a753643066cd0bc0a4191800c4d091913d39750dc8975adbdd261bd10c1c2e110fa" - "a47cfbe30e740895552bbdcb3c17c7aee714cec8ad900341bfd987b452280220dcbd6" - "e7191f67ea4209b00000000000000000000000000000000", - None) + True, { + "sgx_quote": "an-sgx-quote", + "message": "504f5748534d3a352e343a3a736778f36f7bc09aab50c0886a442b2" + "d04b18186720bda7a753643066cd0bc0a4191800c4d091913d39750" + "dc8975adbdd261bd10c1c2e110faa47cfbe30e740895552bbdcb3c1" + "7c7aee714cec8ad900341bfd987b452280220dcbd6e7191f67ea420" + "9b00000000000000000000000000000000", + }, None) }, cert.validate_and_get_values('a-root-of-trust')) + SgxQuoteMock.assert_called_with(bytes.fromhex( + "03000200000000000a000f00939a7233f79c4ca9940a0db3957f0607ceae3549bc7273eb34" + "d562f4564fc182000000000e0e100fffff0100000000000000000001000000000000000000" + "00000000000000000000000000000000000000000000050000000000000007000000000000" + "00d32688d3c1f3dfcc8b0b36eac7c89d49af331800bd56248044166fa6699442c100000000" + "00000000000000000000000000000000000000000000000000000000718c2f1a0efbd513e0" + "16fafd6cf62a624442f2d83708d4b33ab5a8d8c1cd4dd00000000000000000000000000000" + "00000000000000000000000000000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000000000000000000000000000" + "00000000000000006400010000000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000000000000000000000009e95" + "bb875c1a728071f70ad8c9d03f1744c19acb0580921e611ac9104f7701d000000000000000" + "00000000000000000000000000000000000000000000000000")) class TestHSMCertificateV2Element(TestCase): @@ -281,3 +299,29 @@ def test_from_dict_invalid_message(self): "signed_by": "platform_ca" }) self.assertIn("Invalid message", str(e.exception)) + + def test_from_pem(self): + self.assertEqual({ + "name": "thename", + "type": "x509_pem", + "message": "dGhpcyBpcyBhbiBhc2NpaSBtZXNzYWdl", + "signed_by": "whosigned", + }, HSMCertificateV2ElementX509.from_pem(""" + -----BEGIN CERTIFICATE----- + dGhpcyBpcyBhbiBhc2NpaSBtZXNzYWdl + -----END CERTIFICATE----- + """, "thename", "whosigned").to_dict()) + + @patch("admin.certificate_v2.Path") + @patch("admin.certificate_v2.HSMCertificateV2ElementX509.from_pem") + def test_from_pemfile(self, from_pem, Path): + Path.return_value.read_text.return_value = "the pem contents" + from_pem.return_value = "the instance" + self.assertEqual("the instance", + HSMCertificateV2ElementX509.from_pemfile("a-file.pem", + "the name", + "who signed")) + Path.assert_called_with("a-file.pem") + from_pem.assert_called_with("the pem contents", + "the name", + "who signed") diff --git a/middleware/tests/admin/test_verify_ledger_attestation.py b/middleware/tests/admin/test_verify_ledger_attestation.py index 40f510e2..ba5638cc 100644 --- a/middleware/tests/admin/test_verify_ledger_attestation.py +++ b/middleware/tests/admin/test_verify_ledger_attestation.py @@ -22,11 +22,12 @@ from types import SimpleNamespace from unittest import TestCase -from unittest.mock import Mock, call, patch, mock_open +from unittest.mock import Mock, call, patch from admin.misc import AdminError from admin.pubkeys import PATHS from admin.verify_ledger_attestation import do_verify_attestation import ecdsa +import secp256k1 as ec import hashlib import logging @@ -39,7 +40,7 @@ @patch("sys.stdout.write") -class TestVerifyAttestation(TestCase): +class TestVerifyLedgerAttestation(TestCase): def setUp(self): self.certification_path = 'certification-path' self.pubkeys_path = 'pubkeys-path' @@ -60,17 +61,20 @@ def setUp(self): path_name_padding = max(map(len, paths)) for path in sorted(paths): pubkey = ecdsa.SigningKey.generate(curve=ecdsa.SECP256k1).get_verifying_key() - self.public_keys[path] = pubkey.to_string('compressed').hex() + self.public_keys[path] = ec.PublicKey( + pubkey.to_string('compressed'), raw=True) pubkeys_hash.update(pubkey.to_string('uncompressed')) self.expected_pubkeys_output.append( f"{(path + ':').ljust(path_name_padding+1)} " f"{pubkey.to_string('compressed').hex()}" ) self.pubkeys_hash = pubkeys_hash.digest() + self.expected_ui_pubkey = self.public_keys[EXPECTED_UI_DERIVATION_PATH]\ + .serialize(compressed=True).hex() self.ui_msg = UI_HEADER + \ bytes.fromhex("aa"*32) + \ - bytes.fromhex("bb"*33) + \ + bytes.fromhex(self.expected_ui_pubkey) + \ bytes.fromhex("cc"*32) + \ bytes.fromhex("0123") self.ui_hash = bytes.fromhex("ee" * 32) @@ -81,7 +85,7 @@ def setUp(self): bytes.fromhex(self.pubkeys_hash.hex()) + \ bytes.fromhex('bb'*32) + \ bytes.fromhex('cc'*8) + \ - bytes.fromhex('dd'*8) + bytes.fromhex('00'*7 + 'ab') self.signer_hash = bytes.fromhex("ff" * 32) @@ -91,9 +95,9 @@ def setUp(self): @patch("admin.verify_ledger_attestation.head") @patch("admin.verify_ledger_attestation.HSMCertificate") - @patch("json.loads") + @patch("admin.verify_ledger_attestation.load_pubkeys") def test_verify_attestation_legacy(self, - loads_mock, + load_pubkeys_mock, certificate_mock, head_mock, _): self.signer_msg = LEGACY_SIGNER_HEADER + \ @@ -101,15 +105,14 @@ def test_verify_attestation_legacy(self, self.signer_hash = bytes.fromhex("ff" * 32) self.result['signer'] = (True, self.signer_msg.hex(), self.signer_hash.hex()) - loads_mock.return_value = self.public_keys + load_pubkeys_mock.return_value = self.public_keys att_cert = Mock() att_cert.validate_and_get_values = Mock(return_value=self.result) certificate_mock.from_jsonfile = Mock(return_value=att_cert) - with patch('builtins.open', mock_open(read_data='')) as file_mock: - do_verify_attestation(self.default_options) + do_verify_attestation(self.default_options) - self.assertEqual([call(self.pubkeys_path, 'r')], file_mock.call_args_list) + load_pubkeys_mock.assert_called_with(self.pubkeys_path) self.assertEqual([call(self.certification_path)], certificate_mock.from_jsonfile.call_args_list) @@ -117,7 +120,8 @@ def test_verify_attestation_legacy(self, [ "UI verified with:", f"UD value: {'aa'*32}", - f"Derived public key ({EXPECTED_UI_DERIVATION_PATH}): {'bb'*33}", + f"Derived public key ({EXPECTED_UI_DERIVATION_PATH}): " + f"{self.expected_ui_pubkey}", f"Authorized signer hash: {'cc'*32}", "Authorized signer iteration: 291", f"Installed UI hash: {'ee'*32}", @@ -140,21 +144,19 @@ def test_verify_attestation_legacy(self, @patch("admin.verify_ledger_attestation.head") @patch("admin.verify_ledger_attestation.HSMCertificate") - @patch("json.loads") + @patch("admin.verify_ledger_attestation.load_pubkeys") def test_verify_attestation(self, - loads_mock, + load_pubkeys_mock, certificate_mock, - head_mock, - _): - loads_mock.return_value = self.public_keys + head_mock, _): + load_pubkeys_mock.return_value = self.public_keys att_cert = Mock() att_cert.validate_and_get_values = Mock(return_value=self.result) certificate_mock.from_jsonfile = Mock(return_value=att_cert) - with patch('builtins.open', mock_open(read_data='')) as file_mock: - do_verify_attestation(self.default_options) + do_verify_attestation(self.default_options) - self.assertEqual([call(self.pubkeys_path, 'r')], file_mock.call_args_list) + load_pubkeys_mock.assert_called_with(self.pubkeys_path) self.assertEqual([call(self.certification_path)], certificate_mock.from_jsonfile.call_args_list) @@ -162,7 +164,8 @@ def test_verify_attestation(self, [ "UI verified with:", f"UD value: {'aa'*32}", - f"Derived public key ({EXPECTED_UI_DERIVATION_PATH}): {'bb'*33}", + f"Derived public key ({EXPECTED_UI_DERIVATION_PATH}): " + f"{self.expected_ui_pubkey}", f"Authorized signer hash: {'cc'*32}", "Authorized signer iteration: 291", f"Installed UI hash: {'ee'*32}", @@ -185,7 +188,7 @@ def test_verify_attestation(self, "Best block: bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" "bbbbb", "Last transaction signed: cccccccccccccccc", - "Timestamp: dddddddddddddddd", + "Timestamp: 171", ], fill="-", ) @@ -206,66 +209,43 @@ def test_verify_attestation_no_pubkey(self, _): do_verify_attestation(options) self.assertEqual('No public keys file given', str(e.exception)) - @patch("json.loads") - def test_verify_attestation_invalid_pubkeys_map(self, loads_mock, _): - loads_mock.return_value = 'invalid-json' - with patch('builtins.open', mock_open(read_data='')): - with self.assertRaises(ValueError) as e: - do_verify_attestation(self.default_options) - - self.assertEqual(('Unable to read public keys from "pubkeys-path": Public keys ' - 'file must contain an object as a top level element'), - str(e.exception)) - - @patch("json.loads") - def test_verify_attestation_invalid_pubkey(self, loads_mock, _): - loads_mock.return_value = {'invalid-path': 'invalid-key'} - with patch('builtins.open', mock_open(read_data='')): - with self.assertRaises(AdminError) as e: - do_verify_attestation(self.default_options) - - self.assertEqual('Invalid public key for path invalid-path: invalid-key', - str(e.exception)) - - @patch("json.loads") - def test_verify_attestation_no_ui_derivation_key(self, loads_mock, _): + @patch("admin.verify_ledger_attestation.load_pubkeys") + def test_verify_attestation_no_ui_derivation_key(self, load_pubkeys_mock, _): incomplete_pubkeys = self.public_keys incomplete_pubkeys.pop(EXPECTED_UI_DERIVATION_PATH, None) - loads_mock.return_value = incomplete_pubkeys + load_pubkeys_mock.return_value = incomplete_pubkeys - with patch('builtins.open', mock_open(read_data='')) as file_mock: - with self.assertRaises(AdminError) as e: - do_verify_attestation(self.default_options) + with self.assertRaises(AdminError) as e: + do_verify_attestation(self.default_options) - self.assertEqual([call(self.pubkeys_path, 'r')], file_mock.call_args_list) + load_pubkeys_mock.assert_called_with(self.pubkeys_path) self.assertEqual((f'Public key with path {EXPECTED_UI_DERIVATION_PATH} ' 'not present in public key file'), str(e.exception)) @patch("admin.verify_ledger_attestation.HSMCertificate") - @patch("json.loads") + @patch("admin.verify_ledger_attestation.load_pubkeys") def test_verify_attestation_invalid_certificate(self, - loads_mock, + load_pubkeys_mock, certificate_mock, _): - loads_mock.return_value = self.public_keys + load_pubkeys_mock.return_value = self.public_keys certificate_mock.from_jsonfile = Mock(side_effect=Exception('error-msg')) - with patch('builtins.open', mock_open(read_data='')) as file_mock: - with self.assertRaises(AdminError) as e: - do_verify_attestation(self.default_options) + with self.assertRaises(AdminError) as e: + do_verify_attestation(self.default_options) - self.assertEqual([call(self.pubkeys_path, 'r')], file_mock.call_args_list) + load_pubkeys_mock.assert_called_with(self.pubkeys_path) self.assertEqual('While loading the attestation certificate file: error-msg', str(e.exception)) @patch("admin.verify_ledger_attestation.HSMCertificate") - @patch("json.loads") + @patch("admin.verify_ledger_attestation.load_pubkeys") def test_verify_attestation_no_ui_att(self, - loads_mock, + load_pubkeys_mock, certificate_mock, _): - loads_mock.return_value = self.public_keys + load_pubkeys_mock.return_value = self.public_keys result = self.result result.pop('ui', None) @@ -273,115 +253,106 @@ def test_verify_attestation_no_ui_att(self, att_cert.validate_and_get_values = Mock(return_value=self.result) certificate_mock.from_jsonfile = Mock(return_value=att_cert) - with patch('builtins.open', mock_open(read_data='')) as file_mock: - with self.assertRaises(AdminError) as e: - do_verify_attestation(self.default_options) + with self.assertRaises(AdminError) as e: + do_verify_attestation(self.default_options) - self.assertEqual([call(self.pubkeys_path, 'r')], file_mock.call_args_list) + load_pubkeys_mock.assert_called_with(self.pubkeys_path) self.assertEqual('Certificate does not contain a UI attestation', str(e.exception)) @patch("admin.verify_ledger_attestation.HSMCertificate") - @patch("json.loads") + @patch("admin.verify_ledger_attestation.load_pubkeys") def test_verify_attestation_invalid_ui_att(self, - loads_mock, + load_pubkeys_mock, certificate_mock, _): - loads_mock.return_value = self.public_keys + load_pubkeys_mock.return_value = self.public_keys result = self.result result['ui'] = (False, 'ui') att_cert = Mock() att_cert.validate_and_get_values = Mock(return_value=result) certificate_mock.from_jsonfile = Mock(return_value=att_cert) - with patch('builtins.open', mock_open(read_data='')) as file_mock: - with self.assertRaises(AdminError) as e: - do_verify_attestation(self.default_options) + with self.assertRaises(AdminError) as e: + do_verify_attestation(self.default_options) - self.assertEqual([call(self.pubkeys_path, 'r')], file_mock.call_args_list) + load_pubkeys_mock.assert_called_with(self.pubkeys_path) self.assertEqual("Invalid UI attestation: error validating 'ui'", str(e.exception)) @patch("admin.verify_ledger_attestation.HSMCertificate") - @patch("json.loads") + @patch("admin.verify_ledger_attestation.load_pubkeys") def test_verify_attestation_no_signer_att(self, - loads_mock, + load_pubkeys_mock, certificate_mock, _): - loads_mock.return_value = self.public_keys - + load_pubkeys_mock.return_value = self.public_keys result = self.result result.pop('signer', None) att_cert = Mock() att_cert.validate_and_get_values = Mock(return_value=self.result) certificate_mock.from_jsonfile = Mock(return_value=att_cert) - with patch('builtins.open', mock_open(read_data='')) as file_mock: - with self.assertRaises(AdminError) as e: - do_verify_attestation(self.default_options) + with self.assertRaises(AdminError) as e: + do_verify_attestation(self.default_options) - self.assertEqual([call(self.pubkeys_path, 'r')], file_mock.call_args_list) + load_pubkeys_mock.assert_called_with(self.pubkeys_path) self.assertEqual('Certificate does not contain a Signer attestation', str(e.exception)) @patch("admin.verify_ledger_attestation.HSMCertificate") - @patch("json.loads") + @patch("admin.verify_ledger_attestation.load_pubkeys") def test_verify_attestation_invalid_signer_att(self, - loads_mock, + load_pubkeys_mock, certificate_mock, _): - loads_mock.return_value = self.public_keys + load_pubkeys_mock.return_value = self.public_keys result = self.result result['signer'] = (False, 'signer') att_cert = Mock() att_cert.validate_and_get_values = Mock(return_value=result) certificate_mock.from_jsonfile = Mock(return_value=att_cert) - with patch('builtins.open', mock_open(read_data='')) as file_mock: - with self.assertRaises(AdminError) as e: - do_verify_attestation(self.default_options) + with self.assertRaises(AdminError) as e: + do_verify_attestation(self.default_options) - self.assertEqual([call(self.pubkeys_path, 'r')], file_mock.call_args_list) + load_pubkeys_mock.assert_called_with(self.pubkeys_path) self.assertEqual(("Invalid Signer attestation: error validating 'signer'"), str(e.exception)) @patch("admin.verify_ledger_attestation.HSMCertificate") - @patch("json.loads") + @patch("admin.verify_ledger_attestation.load_pubkeys") def test_verify_attestation_invalid_signer_att_header(self, - loads_mock, + load_pubkeys_mock, certificate_mock, _): - loads_mock.return_value = self.public_keys + load_pubkeys_mock.return_value = self.public_keys signer_header = b"POWHSM:AAA::somerandomstuff".hex() self.result["signer"] = (True, signer_header, self.signer_hash.hex()) att_cert = Mock() att_cert.validate_and_get_values = Mock(return_value=self.result) certificate_mock.from_jsonfile = Mock(return_value=att_cert) - with patch('builtins.open', mock_open(read_data='')) as file_mock: - with self.assertRaises(AdminError) as e: - do_verify_attestation(self.default_options) + with self.assertRaises(AdminError) as e: + do_verify_attestation(self.default_options) - self.assertEqual([call(self.pubkeys_path, 'r')], file_mock.call_args_list) + load_pubkeys_mock.assert_called_with(self.pubkeys_path) self.assertEqual((f"Invalid Signer attestation message header: {signer_header}"), str(e.exception)) @patch("admin.verify_ledger_attestation.HSMCertificate") - @patch("json.loads") + @patch("admin.verify_ledger_attestation.load_pubkeys") def test_verify_attestation_invalid_signer_att_msg_too_long(self, - loads_mock, + load_pubkeys_mock, certificate_mock, _): - loads_mock.return_value = self.public_keys + load_pubkeys_mock.return_value = self.public_keys signer_header = (b"POWHSM:5.9::" + b"aa"*300).hex() self.result["signer"] = (True, signer_header, self.signer_hash.hex()) att_cert = Mock() att_cert.validate_and_get_values = Mock(return_value=self.result) certificate_mock.from_jsonfile = Mock(return_value=att_cert) - with patch('builtins.open', mock_open(read_data='')) as file_mock: - with self.assertRaises(AdminError) as e: - do_verify_attestation(self.default_options) + with self.assertRaises(AdminError) as e: + do_verify_attestation(self.default_options) - self.assertEqual([call(self.pubkeys_path, 'r')], file_mock.call_args_list) - self.assertEqual(("Signer attestation message longer " - f"than expected: {signer_header}"), - str(e.exception)) + load_pubkeys_mock.assert_called_with(self.pubkeys_path) + self.assertIn("Signer attestation message length mismatch", str(e.exception)) diff --git a/middleware/tests/admin/test_verify_sgx_attestation.py b/middleware/tests/admin/test_verify_sgx_attestation.py new file mode 100644 index 00000000..5bacadf5 --- /dev/null +++ b/middleware/tests/admin/test_verify_sgx_attestation.py @@ -0,0 +1,241 @@ +# The MIT License (MIT) +# +# Copyright (c) 2021 RSK Labs Ltd +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +# of the Software, and to permit persons to whom the Software is furnished to do +# so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +from types import SimpleNamespace +from unittest import TestCase +from unittest.mock import Mock, patch +from parameterized import parameterized +from admin.misc import AdminError +from admin.pubkeys import PATHS +from admin.verify_sgx_attestation import do_verify_attestation, DEFAULT_ROOT_AUTHORITY +import ecdsa +import secp256k1 as ec +import hashlib +import logging + +logging.disable(logging.CRITICAL) + + +@patch("sys.stdout.write") +@patch("admin.verify_sgx_attestation.head") +@patch("admin.verify_sgx_attestation.HSMCertificate") +@patch("admin.verify_sgx_attestation.load_pubkeys") +@patch("admin.verify_sgx_attestation.get_root_of_trust") +class TestVerifySgxAttestation(TestCase): + def setUp(self): + self.certification_path = 'certification-path' + self.pubkeys_path = 'pubkeys-path' + self.options = SimpleNamespace(**{ + 'attestation_certificate_file_path': self.certification_path, + 'pubkeys_file_path': self.pubkeys_path, + 'root_authority': None + }) + + paths = [] + for path in PATHS.values(): + paths.append(str(path)) + + self.public_keys = {} + self.expected_pubkeys_output = [] + pubkeys_hash = hashlib.sha256() + path_name_padding = max(map(len, paths)) + for path in sorted(paths): + pubkey = ecdsa.SigningKey.generate(curve=ecdsa.SECP256k1).get_verifying_key() + self.public_keys[path] = ec.PublicKey( + pubkey.to_string('compressed'), raw=True) + pubkeys_hash.update(pubkey.to_string('uncompressed')) + self.expected_pubkeys_output.append( + f"{(path + ':').ljust(path_name_padding+1)} " + f"{pubkey.to_string('compressed').hex()}" + ) + self.expected_pubkeys_hash = pubkeys_hash.digest().hex() + + self.powhsm_msg = \ + b"POWHSM:5.4::" + \ + b'plf' + \ + bytes.fromhex('aa'*32) + \ + bytes.fromhex(self.expected_pubkeys_hash) + \ + bytes.fromhex('bb'*32) + \ + bytes.fromhex('cc'*8) + \ + bytes.fromhex('00'*7 + 'cd') + + self.mock_sgx_quote = SimpleNamespace(**{ + "report_body": SimpleNamespace(**{ + "mrenclave": bytes.fromhex("aabbccdd"), + "mrsigner": bytes.fromhex("1122334455"), + }) + }) + + self.validate_result = {"quote": ( + True, { + "sgx_quote": self.mock_sgx_quote, + "message": self.powhsm_msg.hex() + }, None) + } + + def configure_mocks(self, get_root_of_trust, load_pubkeys, + HSMCertificate, head): + get_root_of_trust.return_value = "the-root-of-trust" + load_pubkeys.return_value = self.public_keys + self.mock_certificate = Mock() + self.mock_certificate.validate_and_get_values.return_value = self.validate_result + HSMCertificate.from_jsonfile.return_value = self.mock_certificate + + @parameterized.expand([ + ("default_root", None), + ("custom_root", "a-custom-root") + ]) + def test_verify_attestation(self, get_root_of_trust, load_pubkeys, + HSMCertificate, head, _, __, custom_root): + self.configure_mocks(get_root_of_trust, load_pubkeys, HSMCertificate, head) + if custom_root: + self.options.root_authority = custom_root + + do_verify_attestation(self.options) + + if custom_root: + get_root_of_trust.assert_called_with(custom_root) + else: + get_root_of_trust.assert_called_with(DEFAULT_ROOT_AUTHORITY) + load_pubkeys.assert_called_with(self.pubkeys_path) + HSMCertificate.from_jsonfile.assert_called_with(self.certification_path) + self.mock_certificate.validate_and_get_values \ + .assert_called_with("the-root-of-trust") + head.assert_called_with([ + "powHSM verified with public keys:" + ] + self.expected_pubkeys_output + [ + f"Hash: {self.expected_pubkeys_hash}", + "", + "Installed powHSM MRENCLAVE: aabbccdd", + "Installed powHSM MRSIGNER: 1122334455", + "Installed powHSM version: 5.4", + "Platform: plf", + f"UD value: {"aa"*32}", + f"Best block: {"bb"*32}", + f"Last transaction signed: {"cc"*8}", + "Timestamp: 205", + ], fill="-") + + def test_verify_attestation_err_load_pubkeys(self, get_root_of_trust, load_pubkeys, + HSMCertificate, head, _): + self.configure_mocks(get_root_of_trust, load_pubkeys, HSMCertificate, head) + load_pubkeys.side_effect = ValueError("pubkeys error") + + with self.assertRaises(AdminError) as e: + do_verify_attestation(self.options) + self.assertIn("pubkeys error", str(e.exception)) + + get_root_of_trust.assert_called_with(DEFAULT_ROOT_AUTHORITY) + load_pubkeys.assert_called_with(self.pubkeys_path) + HSMCertificate.from_jsonfile.assert_not_called() + self.mock_certificate.validate_and_get_values.assert_not_called() + + def test_verify_attestation_err_load_cert(self, get_root_of_trust, load_pubkeys, + HSMCertificate, head, _): + self.configure_mocks(get_root_of_trust, load_pubkeys, HSMCertificate, head) + HSMCertificate.from_jsonfile.side_effect = ValueError("load cert error") + + with self.assertRaises(AdminError) as e: + do_verify_attestation(self.options) + self.assertIn("load cert error", str(e.exception)) + + get_root_of_trust.assert_called_with(DEFAULT_ROOT_AUTHORITY) + load_pubkeys.assert_called_with(self.pubkeys_path) + HSMCertificate.from_jsonfile.assert_called_with(self.certification_path) + self.mock_certificate.validate_and_get_values.assert_not_called() + + def test_verify_attestation_validation_noquote(self, get_root_of_trust, load_pubkeys, + HSMCertificate, head, _): + self.configure_mocks(get_root_of_trust, load_pubkeys, HSMCertificate, head) + self.mock_certificate.validate_and_get_values.return_value = {"something": "else"} + + with self.assertRaises(AdminError) as e: + do_verify_attestation(self.options) + self.assertIn("does not contain", str(e.exception)) + + get_root_of_trust.assert_called_with(DEFAULT_ROOT_AUTHORITY) + load_pubkeys.assert_called_with(self.pubkeys_path) + HSMCertificate.from_jsonfile.assert_called_with(self.certification_path) + self.mock_certificate.validate_and_get_values \ + .assert_called_with("the-root-of-trust") + + def test_verify_attestation_validation_failed(self, get_root_of_trust, load_pubkeys, + HSMCertificate, head, _): + self.configure_mocks(get_root_of_trust, load_pubkeys, HSMCertificate, head) + self.mock_certificate.validate_and_get_values.return_value = { + "quote": (False, "a validation error") + } + + with self.assertRaises(AdminError) as e: + do_verify_attestation(self.options) + self.assertIn("validation error", str(e.exception)) + + get_root_of_trust.assert_called_with(DEFAULT_ROOT_AUTHORITY) + load_pubkeys.assert_called_with(self.pubkeys_path) + HSMCertificate.from_jsonfile.assert_called_with(self.certification_path) + self.mock_certificate.validate_and_get_values \ + .assert_called_with("the-root-of-trust") + + def test_verify_attestation_invalid_header(self, get_root_of_trust, load_pubkeys, + HSMCertificate, head, _): + self.configure_mocks(get_root_of_trust, load_pubkeys, HSMCertificate, head) + self.validate_result["quote"][1]["message"] = "aabbccdd" + + with self.assertRaises(AdminError) as e: + do_verify_attestation(self.options) + self.assertIn("message header", str(e.exception)) + + get_root_of_trust.assert_called_with(DEFAULT_ROOT_AUTHORITY) + load_pubkeys.assert_called_with(self.pubkeys_path) + HSMCertificate.from_jsonfile.assert_called_with(self.certification_path) + self.mock_certificate.validate_and_get_values \ + .assert_called_with("the-root-of-trust") + + def test_verify_attestation_invalid_message(self, get_root_of_trust, load_pubkeys, + HSMCertificate, head, _): + self.configure_mocks(get_root_of_trust, load_pubkeys, HSMCertificate, head) + self.validate_result["quote"][1]["message"] = b"POWHSM:5.4::plf".hex() + + with self.assertRaises(AdminError) as e: + do_verify_attestation(self.options) + self.assertIn("parsing", str(e.exception)) + + get_root_of_trust.assert_called_with(DEFAULT_ROOT_AUTHORITY) + load_pubkeys.assert_called_with(self.pubkeys_path) + HSMCertificate.from_jsonfile.assert_called_with(self.certification_path) + self.mock_certificate.validate_and_get_values \ + .assert_called_with("the-root-of-trust") + + def test_verify_attestation_pkh_mismatch(self, get_root_of_trust, load_pubkeys, + HSMCertificate, head, _): + self.configure_mocks(get_root_of_trust, load_pubkeys, HSMCertificate, head) + self.public_keys.popitem() + + with self.assertRaises(AdminError) as e: + do_verify_attestation(self.options) + self.assertIn("hash mismatch", str(e.exception)) + + get_root_of_trust.assert_called_with(DEFAULT_ROOT_AUTHORITY) + load_pubkeys.assert_called_with(self.pubkeys_path) + HSMCertificate.from_jsonfile.assert_called_with(self.certification_path) + self.mock_certificate.validate_and_get_values \ + .assert_called_with("the-root-of-trust") diff --git a/setup.cfg b/setup.cfg index 36a420ae..1dbd51f0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -27,6 +27,7 @@ per-file-ignores = middleware/admin/certificate.py:F401, middleware/tests/sgx/test_envelope.py:E122, middleware/tests/admin/test_certificate_v2_resources.py:E501, + middleware/tests/admin/test_attestation_utils_resources.py:E501, show-source = False statistics = True