Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abstract methods added to Key for signing scheme dissection #837

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 7 additions & 24 deletions securesystemslib/signer/_azure_signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import securesystemslib.hash as sslib_hash
from securesystemslib.exceptions import UnsupportedLibraryError
from securesystemslib.signer._key import Key, SSlibKey
from securesystemslib.signer._key import SSlibKey
from securesystemslib.signer._signer import SecretsHandler, Signature, Signer
from securesystemslib.signer._utils import compute_default_keyid

Expand Down Expand Up @@ -65,7 +65,7 @@ class AzureSigner(Signer):

SCHEME = "azurekms"

def __init__(self, az_key_uri: str, public_key: Key):
def __init__(self, az_key_uri: str, public_key: SSlibKey):
if AZURE_IMPORT_ERROR:
raise UnsupportedLibraryError(AZURE_IMPORT_ERROR)

Expand All @@ -78,14 +78,14 @@ def __init__(self, az_key_uri: str, public_key: Key):
self.signature_algorithm = self._get_signature_algorithm(
public_key,
)
self.hash_algorithm = self._get_hash_algorithm(public_key)
self.hash_algorithm = public_key.get_hash_algorithm_str()
except UnsupportedKeyType as e:
logger.info("Key %s has unsupported key type or unsupported elliptic curve")
raise e
self._public_key = public_key

@property
def public_key(self) -> Key:
def public_key(self) -> SSlibKey:
return self._public_key

@staticmethod
Expand Down Expand Up @@ -128,7 +128,7 @@ def _create_crypto_client(
raise e

@staticmethod
def _get_signature_algorithm(public_key: Key) -> "SignatureAlgorithm":
def _get_signature_algorithm(public_key: SSlibKey) -> "SignatureAlgorithm":
"""Return SignatureAlgorithm after parsing the public key"""
if public_key.keytype != "ecdsa":
logger.info("only EC keys are supported for now")
Expand All @@ -147,23 +147,6 @@ def _get_signature_algorithm(public_key: Key) -> "SignatureAlgorithm":

raise UnsupportedKeyType("Unsupported curve supplied by key")

@staticmethod
def _get_hash_algorithm(public_key: "Key") -> str:
"""Return the hash algorithm used by the public key"""
# Format is "ecdsa-sha2-nistp256"
comps = public_key.scheme.split("-")
if len(comps) != 3: # noqa: PLR2004
raise UnsupportedKeyType("Invalid scheme found")

if comps[2] == "nistp256":
return "sha256"
if comps[2] == "nistp384":
return "sha384"
if comps[2] == "nistp521":
return "sha512"

raise UnsupportedKeyType("Unsupported curve supplied by key")

@staticmethod
def _get_keytype_and_scheme(crv: str) -> Tuple[str, str]:
if crv == KeyCurveName.p_256:
Expand All @@ -179,7 +162,7 @@ def _get_keytype_and_scheme(crv: str) -> Tuple[str, str]:
def from_priv_key_uri(
cls,
priv_key_uri: str,
public_key: Key,
public_key: SSlibKey,
secrets_handler: Optional[SecretsHandler] = None,
) -> "AzureSigner":
uri = parse.urlparse(priv_key_uri)
Expand All @@ -191,7 +174,7 @@ def from_priv_key_uri(
return cls(az_key_uri, public_key)

@classmethod
def import_(cls, az_vault_name: str, az_key_name: str) -> Tuple[str, Key]:
def import_(cls, az_vault_name: str, az_key_name: str) -> Tuple[str, SSlibKey]:
"""Load key and signer details from KMS

Returns the private key uri and the public key. This method should only
Expand Down
33 changes: 3 additions & 30 deletions securesystemslib/signer/_crypto_signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from securesystemslib.signer._signature import Signature
from securesystemslib.signer._signer import SecretsHandler, Signer

# ruff: noqa: F401
CRYPTO_IMPORT_ERROR = None
try:
from cryptography.hazmat.primitives.asymmetric.ec import (
Expand Down Expand Up @@ -77,33 +78,6 @@ class _NoSignArgs:
_ECDSA_KEYTYPES = ["ecdsa", "ecdsa-sha2-nistp256"]


def _get_hash_algorithm(name: str) -> "HashAlgorithm":
"""Helper to return hash algorithm for name."""
algorithm: HashAlgorithm
if name == "sha224":
algorithm = SHA224()
if name == "sha256":
algorithm = SHA256()
if name == "sha384":
algorithm = SHA384()
if name == "sha512":
algorithm = SHA512()

return algorithm


def _get_rsa_padding(name: str, hash_algorithm: "HashAlgorithm") -> "AsymmetricPadding":
"""Helper to return rsa signature padding for name."""
padding: AsymmetricPadding
if name == "pss":
padding = PSS(mgf=MGF1(hash_algorithm), salt_length=PSS.DIGEST_LENGTH)

if name == "pkcs1v15":
padding = PKCS1v15()

return padding


class CryptoSigner(Signer):
"""PYCA/cryptography Signer implementations.

Expand Down Expand Up @@ -155,9 +129,8 @@ def __init__(
if not isinstance(private_key, RSAPrivateKey):
raise ValueError(f"invalid rsa key: {type(private_key)}")

padding_name, hash_name = public_key.scheme.split("-")[1:]
hash_algo = _get_hash_algorithm(hash_name)
padding = _get_rsa_padding(padding_name, hash_algo)
hash_algo = public_key.get_hash_algorithm()
padding = public_key.get_padding_name(hash_algo, PSS.DIGEST_LENGTH)
self._sign_args = _RSASignArgs(padding, hash_algo)
self._private_key = private_key

Expand Down
39 changes: 6 additions & 33 deletions securesystemslib/signer/_gcp_signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import securesystemslib.hash as sslib_hash
from securesystemslib import exceptions
from securesystemslib.signer._key import Key, SSlibKey
from securesystemslib.signer._key import SSlibKey
from securesystemslib.signer._signer import SecretsHandler, Signature, Signer
from securesystemslib.signer._utils import compute_default_keyid

Expand Down Expand Up @@ -55,24 +55,24 @@ class GCPSigner(Signer):

SCHEME = "gcpkms"

def __init__(self, gcp_keyid: str, public_key: Key):
def __init__(self, gcp_keyid: str, public_key: SSlibKey):
if GCP_IMPORT_ERROR:
raise exceptions.UnsupportedLibraryError(GCP_IMPORT_ERROR)

self.hash_algorithm = self._get_hash_algorithm(public_key)
self.hash_algorithm = public_key.get_hash_algorithm_str()
self.gcp_keyid = gcp_keyid
self._public_key = public_key
self.client = kms.KeyManagementServiceClient()

@property
def public_key(self) -> Key:
def public_key(self) -> SSlibKey:
return self._public_key

@classmethod
def from_priv_key_uri(
cls,
priv_key_uri: str,
public_key: Key,
public_key: SSlibKey,
secrets_handler: Optional[SecretsHandler] = None,
) -> "GCPSigner":
uri = parse.urlparse(priv_key_uri)
Expand All @@ -83,7 +83,7 @@ def from_priv_key_uri(
return cls(uri.path, public_key)

@classmethod
def import_(cls, gcp_keyid: str) -> Tuple[str, Key]:
def import_(cls, gcp_keyid: str) -> Tuple[str, SSlibKey]:
"""Load key and signer details from KMS

Returns the private key uri and the public key. This method should only
Expand Down Expand Up @@ -155,33 +155,6 @@ def _get_keytype_and_scheme(algorithm: int) -> Tuple[str, str]:
}
return keytypes_and_schemes[algorithm]

@staticmethod
def _get_hash_algorithm(public_key: Key) -> str:
"""Helper function to return payload hash algorithm used for this key"""

# TODO: This could be a public abstract method on Key so that GCPSigner
# would not be tied to a specific Key implementation -- not all keys
# have a pre hash algorithm though.
if public_key.keytype == "rsa":
# hash algorithm is encoded as last scheme portion
algo = public_key.scheme.split("-")[-1]
elif public_key.keytype in [
"ecdsa",
"ecdsa-sha2-nistp256",
"ecdsa-sha2-nistp384",
]:
# nistp256 uses sha-256, nistp384 uses sha-384
bits = public_key.scheme.split("-nistp")[-1]
algo = f"sha{bits}"
else:
raise exceptions.UnsupportedAlgorithmError(
f"Unsupported key type {public_key.keytype} in key {public_key.keyid}"
)

# trigger UnsupportedAlgorithm if appropriate
_ = sslib_hash.digest(algo)
return algo

def sign(self, payload: bytes) -> Signature:
"""Signs payload with Google Cloud KMS.

Expand Down
12 changes: 12 additions & 0 deletions securesystemslib/signer/_gpg_signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,18 @@ def verify_signature(self, signature: Signature, data: bytes) -> None:
f"Unknown failure to verify signature by {self.keyid}"
) from e

def get_hash_algorithm_str(self) -> None:
raise NotImplementedError

def get_hash_algorithm(self) -> None:
raise NotImplementedError

def get_padding_name_str(self) -> None:
raise NotImplementedError

def get_padding_name(self, hash_algorithm: None, salt_length: None) -> None:
raise NotImplementedError
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These stubs are no longer needed, right?



class GPGSigner(Signer):
"""OpenPGP Signer
Expand Down
113 changes: 81 additions & 32 deletions securesystemslib/signer/_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
from abc import ABCMeta, abstractmethod
from typing import Any, Dict, Optional, Tuple, Type, cast

import securesystemslib.hash as sslib_hash
from securesystemslib._vendor.ed25519.ed25519 import (
SignatureMismatch,
checkvalid,
)
from securesystemslib.exceptions import (
UnsupportedAlgorithmError,
UnsupportedLibraryError,
UnverifiedSignatureError,
VerificationError,
Expand Down Expand Up @@ -56,6 +58,11 @@

logger = logging.getLogger(__name__)


class UnsupportedKeyType(Exception): # noqa: N818
pass
Comment on lines +62 to +63
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could be marked private (_UnsupportedKeyType) since I don't think we intend to leak it outside... but that's a nit



# NOTE Key dispatch table is defined here so it's usable by Key,
# but is populated in __init__.py (and can be appended by users).
KEY_FOR_TYPE_AND_SCHEME: Dict[Tuple[str, str], Type] = {}
Expand Down Expand Up @@ -301,35 +308,6 @@ def from_crypto(

return SSlibKey(keyid, keytype, scheme, keyval)

@staticmethod
def _get_hash_algorithm(name: str) -> "HashAlgorithm":
"""Helper to return hash algorithm for name."""
algorithm: HashAlgorithm
if name == "sha224":
algorithm = SHA224()
if name == "sha256":
algorithm = SHA256()
if name == "sha384":
algorithm = SHA384()
if name == "sha512":
algorithm = SHA512()

return algorithm

@staticmethod
def _get_rsa_padding(
name: str, hash_algorithm: "HashAlgorithm"
) -> "AsymmetricPadding":
"""Helper to return rsa signature padding for name."""
padding: AsymmetricPadding
if name == "pss":
padding = PSS(mgf=MGF1(hash_algorithm), salt_length=PSS.AUTO)

if name == "pkcs1v15":
padding = PKCS1v15()

return padding

def _verify_ed25519_fallback(self, signature: bytes, data: bytes) -> None:
"""Helper to verify ed25519 sig if pyca/cryptography is unavailable."""
try:
Expand Down Expand Up @@ -364,9 +342,8 @@ def _validate_curve(key, curve):
]:
key = cast(RSAPublicKey, self._crypto_key())
_validate_type(key, RSAPublicKey)
padding_name, hash_name = self.scheme.split("-")[1:]
hash_algorithm = self._get_hash_algorithm(hash_name)
padding = self._get_rsa_padding(padding_name, hash_algorithm)
hash_algorithm = self.get_hash_algorithm()
padding = self.get_padding_name(hash_algorithm, PSS.AUTO)
key.verify(signature, data, padding, hash_algorithm)

elif (
Expand Down Expand Up @@ -426,3 +403,75 @@ def verify_signature(self, signature: Signature, data: bytes) -> None:
raise VerificationError(
f"Unknown failure to verify signature by {self.keyid}"
) from e

def get_hash_algorithm_str(self) -> str:
"""Returns the hash algorithm from the key scheme as a string."""
# key scheme should always be of format xxx-xxx-xxx
comps = self.scheme.split("-")
if len(comps) != 3: # noqa: PLR2004
raise UnsupportedKeyType("Invalid scheme found")

if self.keytype == "rsa":
# hash algorithm is encoded as last scheme portion
hash_algo = self.scheme.split("-")[-1]
elif self.keytype in [
"ecdsa",
"ecdsa-sha2-nistp256",
"ecdsa-sha2-nistp384",
]:
# nistp256 uses sha-256, nistp384 uses sha-384
bits = self.scheme.split("-nistp")[-1]
hash_algo = f"sha{bits}"
else:
raise UnsupportedAlgorithmError(
f"Unsupported key type {self.keytype} in key {self.keyid}"
)

# trigger UnsupportedAlgorithm if appropriate
_ = sslib_hash.digest(hash_algo)

return hash_algo

def get_hash_algorithm(self) -> "HashAlgorithm":
"""Returns the hash algorithm from the key scheme as a HashAlgorithm"""
name = self.get_hash_algorithm_str()
algorithm: HashAlgorithm
if name == "sha224":
algorithm = SHA224()
if name == "sha256":
algorithm = SHA256()
if name == "sha384":
algorithm = SHA384()
if name == "sha512":
algorithm = SHA512()

return algorithm

def get_padding_name_str(self) -> str:
"""Returns the padding name from the key scheme as a string"""
padding_name = self.scheme.split("-")[1]
return padding_name

def get_padding_name(
self, hash_algorithm: "HashAlgorithm", salt_length: Any
) -> "AsymmetricPadding":
"""Returns the padding name from the key scheme as a AsymmetricPadding

Args:
hash_algorithm: the hash algorithm used as a HashAlgorithm
object, only for PSS.
selt_length: the salt length to use for PSS.
PSS.AUTO or PSS.DIGEST_LENGTH

Returns:
AsymmetricPadding

"""
name = self.get_padding_name_str()
padding: AsymmetricPadding
if name == "pss":
padding = PSS(mgf=MGF1(hash_algorithm), salt_length=salt_length)
if name == "pkcs1v15":
padding = PKCS1v15()

return padding
Loading