Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Certificate V2 parser #230

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions middleware/admin/certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,10 @@
from .certificate_v2 import HSMCertificateV2, HSMCertificateV2ElementSGXQuote, \
HSMCertificateV2ElementSGXAttestationKey, \
HSMCertificateV2ElementX509


# Assign version mapping to the parent class
HSMCertificate.VERSION_MAPPING = {
1: HSMCertificate,
2: HSMCertificateV2,
}
28 changes: 16 additions & 12 deletions middleware/admin/certificate_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,18 +125,24 @@ class HSMCertificate:
VERSION = 1 # Only supported version
ROOT_ELEMENT = "root"
ELEMENT_BASE_CLASS = HSMCertificateElement
ELEMENT_FACTORY = HSMCertificateElement

@staticmethod
def from_jsonfile(path):
@classmethod
def from_jsonfile(kls, path):
try:
with open(path, "r") as file:
certificate_map = json.loads(file.read())

if type(certificate_map) != dict:
raise ValueError(
"JSON file must contain an object as a top level element")
"Certificate file must contain an object as a top level element")

return HSMCertificate(certificate_map)
if certificate_map.get("version") not in kls.VERSION_MAPPING:
raise ValueError("Invalid or unsupported HSM certificate "
"version (supported versions are "
amendelzon marked this conversation as resolved.
Show resolved Hide resolved
f"{", ".join(kls.VERSION_MAPPING.keys())})")

return kls.VERSION_MAPPING[certificate_map["version"]](certificate_map)
except (ValueError, json.JSONDecodeError) as e:
raise ValueError('Unable to read HSM certificate from "%s": %s' %
(path, str(e)))
Expand Down Expand Up @@ -190,8 +196,8 @@ def validate_and_get_values(self, raw_root_pubkey_hex):

def add_element(self, element):
if not isinstance(element, self.ELEMENT_BASE_CLASS):
raise ValueError(
f"Expected an HSMCertificateElement but got a {type(element)}")
raise ValueError(f"Expected an {self.ELEMENT_BASE_CLASS.__name__} "
"but got a {type(element)}")
self._elements[element.name] = element

def clear_targets(self):
Expand All @@ -214,11 +220,9 @@ def save_to_jsonfile(self, path):
file.write("%s\n" % json.dumps(self.to_dict(), indent=2))

def _parse(self, certificate_map):
if "version" not in certificate_map or certificate_map["version"] != self.VERSION:
raise ValueError(
"Invalid or unsupported HSM certificate version "
f"(current version is {self.VERSION})"
)
if certificate_map.get("version") != self.VERSION:
raise ValueError("Invalid or unexpected HSM certificate version "
amendelzon marked this conversation as resolved.
Show resolved Hide resolved
f"(expected {self.VERSION})")

if "targets" not in certificate_map or type(certificate_map["targets"]) != list:
raise ValueError("Missing or invalid targets")
Expand All @@ -229,7 +233,7 @@ def _parse(self, certificate_map):
raise ValueError("Missing elements")

for item in certificate_map["elements"]:
element = HSMCertificateElement(item)
element = self.ELEMENT_FACTORY(item)
self._elements[item["name"]] = element

# Sanity: check each target has a path to the root authority
Expand Down
160 changes: 130 additions & 30 deletions middleware/admin/certificate_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,76 +20,176 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import base64
from .certificate_v1 import HSMCertificate
from .utils import is_nonempty_hex_string


class HSMCertificateV2Element:
pass
def __init__(self):
raise RuntimeError("Cannot instantiate an "
"abstract HSMCertificateV2Element")

@classmethod
def from_dict(kls, element_map):
if element_map.get("type") not in kls.TYPE_MAPPING:
raise ValueError("Invalid or missing element type for "
f"element {element_map.get("name")}")

return kls.TYPE_MAPPING[element_map["type"]](element_map)

def _init_with_map(self, element_map):
if "name" not in element_map:
raise ValueError("Missing name for HSM certificate element")

self._name = element_map["name"]

if "signed_by" not in element_map:
raise ValueError("Missing certifier for HSM certificate element")
self._signed_by = element_map["signed_by"]

@property
def name(self):
return self._name

@property
def signed_by(self):
return self._signed_by


class HSMCertificateV2ElementSGXQuote(HSMCertificateV2Element):
def __init__(self, name, message, custom_data, signature, signed_by):
self.name = name
self.message = message
self.custom_data = custom_data
self.signature = signature
self.signed_by = signed_by
def __init__(self, element_map):
self._init_with_map(element_map)

def _init_with_map(self, element_map):
super()._init_with_map(element_map)

if not is_nonempty_hex_string(element_map.get("message")):
raise ValueError(f"Invalid message for HSM certificate element {self.name}")
self._message = bytes.fromhex(element_map["message"])

if not is_nonempty_hex_string(element_map.get("custom_data")):
raise ValueError("Invalid custom data for HSM certificate "
f"element {self.name}")
self._custom_data = bytes.fromhex(element_map["custom_data"])

if not is_nonempty_hex_string(element_map.get("signature")):
raise ValueError("Invalid signature for HSM certificate element {self.name}")
self._signature = bytes.fromhex(element_map["signature"])

@property
def message(self):
return self._message.hex()

@property
def custom_data(self):
return self._custom_data.hex()

@property
def signature(self):
return self._signature.hex()

def to_dict(self):
return {
"name": self.name,
"type": "sgx_quote",
"message": self.message.hex(),
"custom_data": self.custom_data.hex(),
"signature": self.signature.hex(),
"message": self.message,
"custom_data": self.custom_data,
"signature": self.signature,
"signed_by": self.signed_by,
}


class HSMCertificateV2ElementSGXAttestationKey(HSMCertificateV2Element):
def __init__(self, name, message, key, auth_data, signature, signed_by):
self.name = name
self.message = message
self.key = key
self.auth_data = auth_data
self.signature = signature
self.signed_by = signed_by
def __init__(self, element_map):
self._init_with_map(element_map)

def _init_with_map(self, element_map):
super()._init_with_map(element_map)

if not is_nonempty_hex_string(element_map.get("message")):
raise ValueError(f"Invalid message for HSM certificate element {self.name}")
self._message = bytes.fromhex(element_map["message"])

if not is_nonempty_hex_string(element_map.get("key")):
raise ValueError(f"Invalid key for HSM certificate element {self.name}")
self._key = bytes.fromhex(element_map["key"])

if not is_nonempty_hex_string(element_map.get("auth_data")):
raise ValueError(f"Invalid auth data for HSM certificate element {self.name}")
self._auth_data = bytes.fromhex(element_map["auth_data"])

if not is_nonempty_hex_string(element_map.get("signature")):
raise ValueError(f"Invalid signature for HSM certificate element {self.name}")
self._signature = bytes.fromhex(element_map["signature"])

@property
def message(self):
return self._message.hex()

@property
def key(self):
return self._key.hex()

@property
def auth_data(self):
return self._auth_data.hex()

@property
def signature(self):
return self._signature.hex()

def to_dict(self):
return {
"name": self.name,
"type": "sgx_attestation_key",
"message": self.message.hex(),
"key": self.key.hex(),
"auth_data": self.auth_data.hex(),
"signature": self.signature.hex(),
"message": self.message,
"key": self.key,
"auth_data": self.auth_data,
"signature": self.signature,
"signed_by": self.signed_by,
}


class HSMCertificateV2ElementX509(HSMCertificateV2Element):
def __init__(self, name, message, signed_by):
self.name = name
self.message = message
self.signed_by = signed_by
def __init__(self, element_map):
self._init_with_map(element_map)

def _init_with_map(self, element_map):
super()._init_with_map(element_map)

try:
self._message = base64.b64decode(element_map.get("message"))
except Exception:
raise ValueError(f"Invalid message for HSM certificate element {self.name}")

@property
def message(self):
return base64.b64encode(self._message).decode("ASCII")

def to_dict(self):
return {
"name": self.name,
"type": "x509_pem",
"message": self.message.decode('ASCII'),
"message": self.message,
"signed_by": self.signed_by,
}


# Element type mappings
HSMCertificateV2Element.TYPE_MAPPING = {
"sgx_quote": HSMCertificateV2ElementSGXQuote,
"sgx_attestation_key": HSMCertificateV2ElementSGXAttestationKey,
"x509_pem": HSMCertificateV2ElementX509,
}


class HSMCertificateV2(HSMCertificate):
VERSION = 2
ROOT_ELEMENT = "sgx_root"
ELEMENT_BASE_CLASS = HSMCertificateV2Element
ELEMENT_FACTORY = HSMCertificateV2Element.from_dict

def validate_and_get_values(self, raw_root_pubkey_hex):
# TODO
pass

def _parse(self, certificate_map):
# TODO
pass
50 changes: 25 additions & 25 deletions middleware/admin/sgx_attestation.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,34 +98,34 @@ def do_attestation(options):
att_cert = HSMCertificateV2()

att_cert.add_element(
HSMCertificateV2ElementSGXQuote(
name="quote",
message=envelope.quote.get_raw_data(),
custom_data=envelope.custom_message,
signature=quote_signature,
signed_by="attestation",
))
HSMCertificateV2ElementSGXQuote({
"name": "quote",
"message": envelope.quote.get_raw_data().hex(),
"custom_data": envelope.custom_message.hex(),
"signature": quote_signature.hex(),
"signed_by": "attestation",
}))
att_cert.add_element(
HSMCertificateV2ElementSGXAttestationKey(
name="attestation",
message=envelope.quote_auth_data.qe_report_body.get_raw_data(),
key=att_key.to_string("uncompressed"),
auth_data=envelope.qe_auth_data.data,
signature=qe_rb_signature,
signed_by="quoting_enclave",
))
HSMCertificateV2ElementSGXAttestationKey({
"name": "attestation",
"message": envelope.quote_auth_data.qe_report_body.get_raw_data().hex(),
"key": att_key.to_string("uncompressed").hex(),
"auth_data": envelope.qe_auth_data.data.hex(),
"signature": qe_rb_signature.hex(),
"signed_by": "quoting_enclave",
}))
att_cert.add_element(
HSMCertificateV2ElementX509(
name="quoting_enclave",
message=envelope.qe_cert_data.certs[0],
signed_by="platform_ca",
))
HSMCertificateV2ElementX509({
"name": "quoting_enclave",
"message": envelope.qe_cert_data.certs[0],
"signed_by": "platform_ca",
}))
att_cert.add_element(
HSMCertificateV2ElementX509(
name="platform_ca",
message=envelope.qe_cert_data.certs[1],
signed_by="sgx_root",
))
HSMCertificateV2ElementX509({
"name": "platform_ca",
"message": envelope.qe_cert_data.certs[1],
"signed_by": "sgx_root",
}))

att_cert.add_target("quote")
att_cert.save_to_jsonfile(options.output_file_path)
Expand Down
4 changes: 2 additions & 2 deletions middleware/tests/admin/test_certificate_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

from unittest import TestCase
from unittest.mock import call, patch, mock_open
from admin.certificate_v1 import HSMCertificate, HSMCertificateElement
from admin.certificate import HSMCertificate, HSMCertificateElement


class TestHSMCertificate(TestCase):
Expand Down Expand Up @@ -155,7 +155,7 @@ def test_create_certificate_missing_elements(self):
"targets": ["attestation", "device"]
})

@patch('admin.certificate_v1.HSMCertificateElement')
@patch('admin.certificate_v1.HSMCertificate.ELEMENT_FACTORY')
def test_create_certificate_invalid_element(self, certElementMock):
certElementMock.side_effect = ValueError()
with self.assertRaises(ValueError):
Expand Down
Loading