Skip to content

Commit

Permalink
Certificate v2 validator
Browse files Browse the repository at this point in the history
- Replacing plain public key in validate_and_get_values method for a root of trust object with get_pubkey method
- Using inherited validate_and_get_values in HSMCertificateV2
- Added HSMCertificateRoot to certificate version 1 to act as root of trust in existing validations
- Updated verify ledger attestation accordingly
- Added and updated unit tests
  • Loading branch information
amendelzon committed Dec 20, 2024
1 parent 76b4683 commit e2fea3a
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 65 deletions.
2 changes: 1 addition & 1 deletion middleware/admin/certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from .certificate_v1 import HSMCertificate, HSMCertificateElement
from .certificate_v1 import HSMCertificate, HSMCertificateRoot, HSMCertificateElement
from .certificate_v2 import HSMCertificateV2, HSMCertificateV2ElementSGXQuote, \
HSMCertificateV2ElementSGXAttestationKey, \
HSMCertificateV2ElementX509
Expand Down
42 changes: 28 additions & 14 deletions middleware/admin/certificate_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@
from .utils import is_nonempty_hex_string


class HSMCertificateRoot:
def __init__(self, raw_pubkey_hex):
# Parse the public key
try:
self.pubkey = ec.PublicKey(bytes.fromhex(raw_pubkey_hex), raw=True)
except Exception:
raise ValueError("Error parsing certificate root public key")

def __repr__(self):
return self.pubkey.serialize(compressed=False).hex()

def get_pubkey(self):
return self.pubkey


class HSMCertificateElement:
VALID_NAMES = ["device", "attestation", "ui", "signer"]
EXTRACTORS = {
Expand Down Expand Up @@ -98,10 +113,11 @@ def to_dict(self):

return result

def is_valid(self, certifier_pubkey):
def is_valid(self, certifier):
try:
message = bytes.fromhex(self.message)

certifier_pubkey = certifier.get_pubkey()
verifier_pubkey = certifier_pubkey
if self.tweak is not None:
tweak = hmac.new(
Expand All @@ -120,6 +136,12 @@ def is_valid(self, certifier_pubkey):
def get_value(self):
return self.EXTRACTORS[self.name](bytes.fromhex(self.message)).hex()

def get_pubkey(self):
return ec.PublicKey(bytes.fromhex(self.get_value()), raw=True)

def get_tweak(self):
return self.tweak


class HSMCertificate:
VERSION = 1 # Only supported version
Expand Down Expand Up @@ -155,14 +177,7 @@ def __init__(self, certificate_map=None):
if certificate_map is not None:
self._parse(certificate_map)

def validate_and_get_values(self, raw_root_pubkey_hex):
# Parse the root public key
try:
root_pubkey = ec.PublicKey(bytes.fromhex(raw_root_pubkey_hex), raw=True)
except Exception:
return dict([(target, (False, self.ROOT_ELEMENT))
for target in self._targets])

def validate_and_get_values(self, root_of_trust):
result = {}
for target in self._targets:
# Build the chain from the target to the root
Expand All @@ -178,19 +193,18 @@ def validate_and_get_values(self, raw_root_pubkey_hex):
# If valid, return True and the value of the leaf
# If not valid, return False and the name of the element that
# failed the validation
current_pubkey = root_pubkey
current_certifier = root_of_trust
while True:
# Validate this element
if not current.is_valid(current_pubkey):
if not current.is_valid(current_certifier):
result[target] = (False, current.name)
break
# Reached the leaf? => valid!
if len(chain) == 0:
result[target] = (True, current.get_value(), current.tweak)
result[target] = (True, current.get_value(), current.get_tweak())
break

current_pubkey = ec.PublicKey(bytes.fromhex(current.get_value()),
raw=True)
current_certifier = current
current = chain.pop()

return result
Expand Down
26 changes: 20 additions & 6 deletions middleware/admin/certificate_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@

class HSMCertificateV2Element:
def __init__(self):
raise RuntimeError("Cannot instantiate an "
"abstract HSMCertificateV2Element")
raise NotImplementedError("Cannot instantiate a HSMCertificateV2Element")

@classmethod
def from_dict(kls, element_map):
Expand Down Expand Up @@ -56,6 +55,22 @@ def name(self):
def signed_by(self):
return self._signed_by

def get_value(self):
raise NotImplementedError(f"{type(self).__name__} can't provide a value")

def get_pubkey(self):
# TODO: this should yield not implemented
# TODO: implementation should be down to each specific subclass
return None

def is_valid(self, certifier):
# TODO: this should yield not implemented
# TODO: implementation should be down to each specific subclass
return True

def get_tweak(self):
return None


class HSMCertificateV2ElementSGXQuote(HSMCertificateV2Element):
def __init__(self, element_map):
Expand Down Expand Up @@ -89,6 +104,9 @@ def custom_data(self):
def signature(self):
return self._signature.hex()

def get_value(self):
return self.custom_data

def to_dict(self):
return {
"name": self.name,
Expand Down Expand Up @@ -189,7 +207,3 @@ class HSMCertificateV2(HSMCertificate):
ROOT_ELEMENT = "sgx_root"
ELEMENT_BASE_CLASS = HSMCertificateV2Element
ELEMENT_FACTORY = HSMCertificateV2Element.from_dict

def validate_and_get_values(self, raw_root_pubkey_hex):
# TODO
pass
6 changes: 5 additions & 1 deletion middleware/admin/verify_ledger_attestation.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import re
from .misc import info, head, AdminError
from .utils import is_nonempty_hex_string
from .certificate import HSMCertificate
from .certificate import HSMCertificate, HSMCertificateRoot


UI_MESSAGE_HEADER_REGEX = re.compile(b"^HSM:UI:(5.[0-9])")
Expand Down Expand Up @@ -69,6 +69,10 @@ def do_verify_attestation(options):
if not is_nonempty_hex_string(options.root_authority):
raise AdminError("Invalid root authority")
root_authority = options.root_authority
try:
root_authority = HSMCertificateRoot(root_authority)
except ValueError:
raise AdminError("Invalid root authority")
info(f"Using {root_authority} as root authority")

# Load the given public keys and compute
Expand Down
45 changes: 5 additions & 40 deletions middleware/tests/admin/test_certificate_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

from unittest import TestCase
from unittest.mock import call, patch, mock_open
from admin.certificate import HSMCertificate, HSMCertificateElement
from admin.certificate import HSMCertificate, HSMCertificateRoot, HSMCertificateElement


class TestHSMCertificate(TestCase):
Expand Down Expand Up @@ -240,6 +240,7 @@ def test_create_certificate_signer_not_in_elements(self):
def test_validate_and_get_values_ok(self):
root_privkey = ec.PrivateKey()
root_pubkey = root_privkey.pubkey.serialize(compressed=False).hex()
root_of_trust = HSMCertificateRoot(root_pubkey)
device_privkey = ec.PrivateKey()
device_pubkey = device_privkey.pubkey.serialize(compressed=False).hex()
att_pubkey = ec.PrivateKey().pubkey.serialize(compressed=False).hex()
Expand Down Expand Up @@ -273,48 +274,12 @@ def test_validate_and_get_values_ok(self):
self.assertEqual({
'attestation': (True, att_pubkey, None),
'device': (True, device_pubkey, None)
}, cert.validate_and_get_values(root_pubkey))

def test_create_and_get_values_invalid_pubkey(self):
root_privkey = ec.PrivateKey()
device_privkey = ec.PrivateKey()
device_pubkey = device_privkey.pubkey.serialize(compressed=False).hex()
att_pubkey = ec.PrivateKey().pubkey.serialize(compressed=False).hex()

att_msg = 'ff' + att_pubkey
att_sig = device_privkey.ecdsa_serialize(
device_privkey.ecdsa_sign(bytes.fromhex(att_msg))).hex()

device_msg = os.urandom(16).hex() + device_pubkey
device_sig = root_privkey.ecdsa_serialize(
root_privkey.ecdsa_sign(bytes.fromhex(device_msg))).hex()

cert = HSMCertificate({
"version": 1,
"targets": ["attestation", "device"],
"elements": [
{
"name": "attestation",
"message": att_msg,
"signature": att_sig,
"signed_by": "device"
},
{
"name": "device",
"message": device_msg,
"signature": device_sig,
"signed_by": "root"
}]
})

self.assertEqual({
'attestation': (False, 'root'),
'device': (False, 'root')
}, cert.validate_and_get_values('invalid-pubkey'))
}, cert.validate_and_get_values(root_of_trust))

def test_validate_and_get_values_invalid_element(self):
root_privkey = ec.PrivateKey()
root_pubkey = root_privkey.pubkey.serialize(compressed=False).hex()
root_of_trust = HSMCertificateRoot(root_pubkey)
device_privkey = ec.PrivateKey()
device_pubkey = device_privkey.pubkey.serialize(compressed=False).hex()
att_pubkey = ec.PrivateKey().pubkey.serialize(compressed=False).hex()
Expand Down Expand Up @@ -347,7 +312,7 @@ def test_validate_and_get_values_invalid_element(self):
self.assertEqual({
'attestation': (False, 'attestation'),
'device': (True, device_pubkey, None)
}, cert.validate_and_get_values(root_pubkey))
}, cert.validate_and_get_values(root_of_trust))

def test_validate_and_get_values_invalid_elements(self):
att_privkey = ec.PrivateKey()
Expand Down
22 changes: 19 additions & 3 deletions middleware/tests/admin/test_certificate_v1_element.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,21 @@
import secp256k1 as ec

from unittest import TestCase
from admin.certificate import HSMCertificateElement
from unittest.mock import Mock
from admin.certificate import HSMCertificateRoot, HSMCertificateElement


class TestHSMCertificateRoot(TestCase):
def test_ok(self):
pubkey = ec.PrivateKey().pubkey
root = HSMCertificateRoot(pubkey.serialize(compressed=False).hex())
self.assertEqual(
pubkey.serialize(compressed=True),
root.get_pubkey().serialize(compressed=True))

def test_invalid_pubkey(self):
with self.assertRaises(ValueError):
HSMCertificateRoot("invalid-pubkey")


class TestHSMCertificateElement(TestCase):
Expand Down Expand Up @@ -100,6 +114,7 @@ def test_certificate_element_is_valid_ok(self):
privkey = ec.PrivateKey()
msg = 'aa' * 65
signature = privkey.ecdsa_serialize(privkey.ecdsa_sign(bytes.fromhex(msg))).hex()
mock_certifier = Mock(get_pubkey=lambda: privkey.pubkey)

element = HSMCertificateElement({
"name": "device",
Expand All @@ -113,7 +128,7 @@ def test_certificate_element_is_valid_ok(self):
"signature": signature,
"signed_by": "root"
}, element.to_dict())
self.assertTrue(element.is_valid(privkey.pubkey))
self.assertTrue(element.is_valid(mock_certifier))

def test_certificate_element_is_valid_with_tweak_ok(self):
privkey = ec.PrivateKey()
Expand All @@ -124,6 +139,7 @@ def test_certificate_element_is_valid_with_tweak_ok(self):
pubkey.serialize(compressed=False),
hashlib.sha256,
).digest()
mock_certifier = Mock(get_pubkey=lambda: pubkey)

tweak_privkey = ec.PrivateKey(privkey.tweak_add(tweak), raw=True)
msg = os.urandom(66).hex()
Expand All @@ -144,7 +160,7 @@ def test_certificate_element_is_valid_with_tweak_ok(self):
"signed_by": "root",
"tweak": raw_tweak
}, element.to_dict())
self.assertTrue(element.is_valid(pubkey))
self.assertTrue(element.is_valid(mock_certifier))

def test_certificate_element_is_valid_wrong_signature(self):
privkey = ec.PrivateKey()
Expand Down
12 changes: 12 additions & 0 deletions middleware/tests/admin/test_certificate_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ def test_parse_identity(self):
cert = HSMCertificateV2(TEST_CERTIFICATE)
self.assertEqual(TEST_CERTIFICATE, cert.to_dict())

def test_validate_and_get_values_value(self):
cert = HSMCertificateV2(TEST_CERTIFICATE)
self.assertEqual({
"quote": (
True,
"504f5748534d3a352e343a3a736778f36f7bc09aab50c0886a442b2d04b18186720bd"
"a7a753643066cd0bc0a4191800c4d091913d39750dc8975adbdd261bd10c1c2e110fa"
"a47cfbe30e740895552bbdcb3c17c7aee714cec8ad900341bfd987b452280220dcbd6"
"e7191f67ea4209b00000000000000000000000000000000",
None)
}, cert.validate_and_get_values('a-root-of-trust'))


class TestHSMCertificateV2Element(TestCase):
def test_from_dict_unknown_type(self):
Expand Down

0 comments on commit e2fea3a

Please sign in to comment.