Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Certificate v2 validator #231

Open
wants to merge 1 commit into
base: feature/sgx-attestation
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion middleware/admin/certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from .certificate_v1 import HSMCertificate, HSMCertificateElement
from .certificate_v1 import HSMCertificate, HSMCertificateRoot, HSMCertificateElement
from .certificate_v2 import HSMCertificateV2, HSMCertificateV2ElementSGXQuote, \
HSMCertificateV2ElementSGXAttestationKey, \
HSMCertificateV2ElementX509
Expand Down
42 changes: 28 additions & 14 deletions middleware/admin/certificate_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@
from .utils import is_nonempty_hex_string


class HSMCertificateRoot:
def __init__(self, raw_pubkey_hex):
# Parse the public key
try:
self.pubkey = ec.PublicKey(bytes.fromhex(raw_pubkey_hex), raw=True)
except Exception:
raise ValueError("Error parsing certificate root public key")

def __repr__(self):
return self.pubkey.serialize(compressed=False).hex()

def get_pubkey(self):
return self.pubkey


class HSMCertificateElement:
VALID_NAMES = ["device", "attestation", "ui", "signer"]
EXTRACTORS = {
Expand Down Expand Up @@ -98,10 +113,11 @@ def to_dict(self):

return result

def is_valid(self, certifier_pubkey):
def is_valid(self, certifier):
try:
message = bytes.fromhex(self.message)

certifier_pubkey = certifier.get_pubkey()
verifier_pubkey = certifier_pubkey
if self.tweak is not None:
tweak = hmac.new(
Expand All @@ -120,6 +136,12 @@ def is_valid(self, certifier_pubkey):
def get_value(self):
return self.EXTRACTORS[self.name](bytes.fromhex(self.message)).hex()

def get_pubkey(self):
return ec.PublicKey(bytes.fromhex(self.get_value()), raw=True)

def get_tweak(self):
return self.tweak


class HSMCertificate:
VERSION = 1 # Only supported version
Expand Down Expand Up @@ -155,14 +177,7 @@ def __init__(self, certificate_map=None):
if certificate_map is not None:
self._parse(certificate_map)

def validate_and_get_values(self, raw_root_pubkey_hex):
# Parse the root public key
try:
root_pubkey = ec.PublicKey(bytes.fromhex(raw_root_pubkey_hex), raw=True)
except Exception:
return dict([(target, (False, self.ROOT_ELEMENT))
for target in self._targets])

def validate_and_get_values(self, root_of_trust):
result = {}
for target in self._targets:
# Build the chain from the target to the root
Expand All @@ -178,19 +193,18 @@ def validate_and_get_values(self, raw_root_pubkey_hex):
# If valid, return True and the value of the leaf
# If not valid, return False and the name of the element that
# failed the validation
current_pubkey = root_pubkey
current_certifier = root_of_trust
while True:
# Validate this element
if not current.is_valid(current_pubkey):
if not current.is_valid(current_certifier):
result[target] = (False, current.name)
break
# Reached the leaf? => valid!
if len(chain) == 0:
result[target] = (True, current.get_value(), current.tweak)
result[target] = (True, current.get_value(), current.get_tweak())
break

current_pubkey = ec.PublicKey(bytes.fromhex(current.get_value()),
raw=True)
current_certifier = current
current = chain.pop()

return result
Expand Down
26 changes: 20 additions & 6 deletions middleware/admin/certificate_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@

class HSMCertificateV2Element:
def __init__(self):
raise RuntimeError("Cannot instantiate an "
"abstract HSMCertificateV2Element")
raise NotImplementedError("Cannot instantiate a HSMCertificateV2Element")

@classmethod
def from_dict(kls, element_map):
Expand Down Expand Up @@ -56,6 +55,22 @@ def name(self):
def signed_by(self):
return self._signed_by

def get_value(self):
raise NotImplementedError(f"{type(self).__name__} can't provide a value")

def get_pubkey(self):
# TODO: this should yield not implemented
# TODO: implementation should be down to each specific subclass
return None

def is_valid(self, certifier):
# TODO: this should yield not implemented
# TODO: implementation should be down to each specific subclass
return True

def get_tweak(self):
return None


class HSMCertificateV2ElementSGXQuote(HSMCertificateV2Element):
def __init__(self, element_map):
Expand Down Expand Up @@ -89,6 +104,9 @@ def custom_data(self):
def signature(self):
return self._signature.hex()

def get_value(self):
return self.custom_data

def to_dict(self):
return {
"name": self.name,
Expand Down Expand Up @@ -189,7 +207,3 @@ class HSMCertificateV2(HSMCertificate):
ROOT_ELEMENT = "sgx_root"
ELEMENT_BASE_CLASS = HSMCertificateV2Element
ELEMENT_FACTORY = HSMCertificateV2Element.from_dict

def validate_and_get_values(self, raw_root_pubkey_hex):
# TODO
pass
6 changes: 5 additions & 1 deletion middleware/admin/verify_ledger_attestation.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import re
from .misc import info, head, AdminError
from .utils import is_nonempty_hex_string
from .certificate import HSMCertificate
from .certificate import HSMCertificate, HSMCertificateRoot


UI_MESSAGE_HEADER_REGEX = re.compile(b"^HSM:UI:(5.[0-9])")
Expand Down Expand Up @@ -69,6 +69,10 @@ def do_verify_attestation(options):
if not is_nonempty_hex_string(options.root_authority):
raise AdminError("Invalid root authority")
root_authority = options.root_authority
try:
root_authority = HSMCertificateRoot(root_authority)
except ValueError:
raise AdminError("Invalid root authority")
info(f"Using {root_authority} as root authority")

# Load the given public keys and compute
Expand Down
45 changes: 5 additions & 40 deletions middleware/tests/admin/test_certificate_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

from unittest import TestCase
from unittest.mock import call, patch, mock_open
from admin.certificate import HSMCertificate, HSMCertificateElement
from admin.certificate import HSMCertificate, HSMCertificateRoot, HSMCertificateElement


class TestHSMCertificate(TestCase):
Expand Down Expand Up @@ -240,6 +240,7 @@ def test_create_certificate_signer_not_in_elements(self):
def test_validate_and_get_values_ok(self):
root_privkey = ec.PrivateKey()
root_pubkey = root_privkey.pubkey.serialize(compressed=False).hex()
root_of_trust = HSMCertificateRoot(root_pubkey)
device_privkey = ec.PrivateKey()
device_pubkey = device_privkey.pubkey.serialize(compressed=False).hex()
att_pubkey = ec.PrivateKey().pubkey.serialize(compressed=False).hex()
Expand Down Expand Up @@ -273,48 +274,12 @@ def test_validate_and_get_values_ok(self):
self.assertEqual({
'attestation': (True, att_pubkey, None),
'device': (True, device_pubkey, None)
}, cert.validate_and_get_values(root_pubkey))

def test_create_and_get_values_invalid_pubkey(self):
root_privkey = ec.PrivateKey()
device_privkey = ec.PrivateKey()
device_pubkey = device_privkey.pubkey.serialize(compressed=False).hex()
att_pubkey = ec.PrivateKey().pubkey.serialize(compressed=False).hex()

att_msg = 'ff' + att_pubkey
att_sig = device_privkey.ecdsa_serialize(
device_privkey.ecdsa_sign(bytes.fromhex(att_msg))).hex()

device_msg = os.urandom(16).hex() + device_pubkey
device_sig = root_privkey.ecdsa_serialize(
root_privkey.ecdsa_sign(bytes.fromhex(device_msg))).hex()

cert = HSMCertificate({
"version": 1,
"targets": ["attestation", "device"],
"elements": [
{
"name": "attestation",
"message": att_msg,
"signature": att_sig,
"signed_by": "device"
},
{
"name": "device",
"message": device_msg,
"signature": device_sig,
"signed_by": "root"
}]
})

self.assertEqual({
'attestation': (False, 'root'),
'device': (False, 'root')
}, cert.validate_and_get_values('invalid-pubkey'))
}, cert.validate_and_get_values(root_of_trust))

def test_validate_and_get_values_invalid_element(self):
root_privkey = ec.PrivateKey()
root_pubkey = root_privkey.pubkey.serialize(compressed=False).hex()
root_of_trust = HSMCertificateRoot(root_pubkey)
device_privkey = ec.PrivateKey()
device_pubkey = device_privkey.pubkey.serialize(compressed=False).hex()
att_pubkey = ec.PrivateKey().pubkey.serialize(compressed=False).hex()
Expand Down Expand Up @@ -347,7 +312,7 @@ def test_validate_and_get_values_invalid_element(self):
self.assertEqual({
'attestation': (False, 'attestation'),
'device': (True, device_pubkey, None)
}, cert.validate_and_get_values(root_pubkey))
}, cert.validate_and_get_values(root_of_trust))

def test_validate_and_get_values_invalid_elements(self):
att_privkey = ec.PrivateKey()
Expand Down
22 changes: 19 additions & 3 deletions middleware/tests/admin/test_certificate_v1_element.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,21 @@
import secp256k1 as ec

from unittest import TestCase
from admin.certificate import HSMCertificateElement
from unittest.mock import Mock
from admin.certificate import HSMCertificateRoot, HSMCertificateElement


class TestHSMCertificateRoot(TestCase):
def test_ok(self):
pubkey = ec.PrivateKey().pubkey
root = HSMCertificateRoot(pubkey.serialize(compressed=False).hex())
self.assertEqual(
pubkey.serialize(compressed=True),
root.get_pubkey().serialize(compressed=True))

def test_invalid_pubkey(self):
with self.assertRaises(ValueError):
HSMCertificateRoot("invalid-pubkey")


class TestHSMCertificateElement(TestCase):
Expand Down Expand Up @@ -100,6 +114,7 @@ def test_certificate_element_is_valid_ok(self):
privkey = ec.PrivateKey()
msg = 'aa' * 65
signature = privkey.ecdsa_serialize(privkey.ecdsa_sign(bytes.fromhex(msg))).hex()
mock_certifier = Mock(get_pubkey=lambda: privkey.pubkey)

element = HSMCertificateElement({
"name": "device",
Expand All @@ -113,7 +128,7 @@ def test_certificate_element_is_valid_ok(self):
"signature": signature,
"signed_by": "root"
}, element.to_dict())
self.assertTrue(element.is_valid(privkey.pubkey))
self.assertTrue(element.is_valid(mock_certifier))

def test_certificate_element_is_valid_with_tweak_ok(self):
privkey = ec.PrivateKey()
Expand All @@ -124,6 +139,7 @@ def test_certificate_element_is_valid_with_tweak_ok(self):
pubkey.serialize(compressed=False),
hashlib.sha256,
).digest()
mock_certifier = Mock(get_pubkey=lambda: pubkey)

tweak_privkey = ec.PrivateKey(privkey.tweak_add(tweak), raw=True)
msg = os.urandom(66).hex()
Expand All @@ -144,7 +160,7 @@ def test_certificate_element_is_valid_with_tweak_ok(self):
"signed_by": "root",
"tweak": raw_tweak
}, element.to_dict())
self.assertTrue(element.is_valid(pubkey))
self.assertTrue(element.is_valid(mock_certifier))

def test_certificate_element_is_valid_wrong_signature(self):
privkey = ec.PrivateKey()
Expand Down
12 changes: 12 additions & 0 deletions middleware/tests/admin/test_certificate_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ def test_parse_identity(self):
cert = HSMCertificateV2(TEST_CERTIFICATE)
self.assertEqual(TEST_CERTIFICATE, cert.to_dict())

def test_validate_and_get_values_value(self):
cert = HSMCertificateV2(TEST_CERTIFICATE)
self.assertEqual({
"quote": (
True,
"504f5748534d3a352e343a3a736778f36f7bc09aab50c0886a442b2d04b18186720bd"
"a7a753643066cd0bc0a4191800c4d091913d39750dc8975adbdd261bd10c1c2e110fa"
"a47cfbe30e740895552bbdcb3c17c7aee714cec8ad900341bfd987b452280220dcbd6"
"e7191f67ea4209b00000000000000000000000000000000",
None)
}, cert.validate_and_get_values('a-root-of-trust'))


class TestHSMCertificateV2Element(TestCase):
def test_from_dict_unknown_type(self):
Expand Down