Skip to content

Commit

Permalink
Use __future.annotations module
Browse files Browse the repository at this point in the history
This allows us to use even python 3.10 annotation features
already.
* start using "X | None" instead of Optional[X]
* Drop the quotes from factory function return values

This also keeps us compatible with python 3.8 for now.

Signed-off-by: Jussi Kukkonen <[email protected]>
  • Loading branch information
jku committed Nov 29, 2024
1 parent d84c1fc commit c6fe086
Show file tree
Hide file tree
Showing 15 changed files with 99 additions and 77 deletions.
5 changes: 3 additions & 2 deletions securesystemslib/_gpg/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,21 @@
handling
"""

from __future__ import annotations

import functools
import logging
import os
import shlex
import subprocess
from typing import Optional

log = logging.getLogger(__name__)

GPG_TIMEOUT = 10


@functools.lru_cache(maxsize=3)
def is_available_gnupg(gnupg: str, timeout: Optional[int] = None) -> bool:
def is_available_gnupg(gnupg: str, timeout: int | None = None) -> bool:
"""Returns whether gnupg points to a gpg binary."""
if timeout is None:
timeout = GPG_TIMEOUT
Expand Down
4 changes: 3 additions & 1 deletion securesystemslib/dsse.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Dead Simple Signing Envelope"""

from __future__ import annotations

import logging
from typing import Any

Expand Down Expand Up @@ -41,7 +43,7 @@ def __eq__(self, other: Any) -> bool:
)

@classmethod
def from_dict(cls, data: dict) -> "Envelope":
def from_dict(cls, data: dict) -> Envelope:
"""Creates a DSSE Envelope from its JSON/dict representation.
Arguments:
Expand Down
11 changes: 6 additions & 5 deletions securesystemslib/signer/_aws_signer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""Signer implementation for AWS Key Management Service"""

from __future__ import annotations

import logging
from typing import Optional
from urllib import parse

from securesystemslib.exceptions import (
Expand Down Expand Up @@ -91,8 +92,8 @@ def from_priv_key_uri(
cls,
priv_key_uri: str,
public_key: Key,
secrets_handler: Optional[SecretsHandler] = None,
) -> "AWSSigner":
secrets_handler: SecretsHandler | None = None,
) -> AWSSigner:
uri = parse.urlparse(priv_key_uri)

if uri.scheme != cls.SCHEME:
Expand All @@ -101,7 +102,7 @@ def from_priv_key_uri(
return cls(uri.path, public_key)

@classmethod
def _get_default_scheme(cls, supported_by_key: list[str]) -> Optional[str]:
def _get_default_scheme(cls, supported_by_key: list[str]) -> str | None:
# Iterate over supported AWS algorithms, pick the **first** that is also
# supported by the key, and return the related securesystemslib scheme.
for scheme, algo in cls.aws_algos.items():
Expand All @@ -119,7 +120,7 @@ def _get_keytype_for_scheme(scheme: str) -> str:

@classmethod
def import_(
cls, aws_key_id: str, local_scheme: Optional[str] = None
cls, aws_key_id: str, local_scheme: str | None = None
) -> tuple[str, Key]:
"""Loads a key and signer details from AWS KMS.
Expand Down
21 changes: 11 additions & 10 deletions securesystemslib/signer/_azure_signer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""Signer implementation for Azure Key Vault"""

from __future__ import annotations

import logging
from typing import Optional
from urllib import parse

import securesystemslib.hash as sslib_hash
Expand Down Expand Up @@ -90,10 +91,10 @@ def public_key(self) -> Key:

@staticmethod
def _get_key_vault_key(
cred: "DefaultAzureCredential",
cred: DefaultAzureCredential,
vault_name: str,
key_name: str,
) -> "KeyVaultKey":
) -> KeyVaultKey:
"""Return KeyVaultKey created from the Vault name and key name"""
vault_url = f"https://{vault_name}.vault.azure.net/"

Expand All @@ -112,9 +113,9 @@ def _get_key_vault_key(

@staticmethod
def _create_crypto_client(
cred: "DefaultAzureCredential",
kv_key: "KeyVaultKey",
) -> "CryptographyClient":
cred: DefaultAzureCredential,
kv_key: KeyVaultKey,
) -> CryptographyClient:
"""Return CryptographyClient created Azure credentials and a KeyVaultKey"""
try:
return CryptographyClient(kv_key, credential=cred)
Expand All @@ -128,7 +129,7 @@ def _create_crypto_client(
raise e

@staticmethod
def _get_signature_algorithm(public_key: Key) -> "SignatureAlgorithm":
def _get_signature_algorithm(public_key: Key) -> SignatureAlgorithm:
"""Return SignatureAlgorithm after parsing the public key"""
if public_key.keytype != "ecdsa":
logger.info("only EC keys are supported for now")
Expand All @@ -148,7 +149,7 @@ def _get_signature_algorithm(public_key: Key) -> "SignatureAlgorithm":
raise UnsupportedKeyType("Unsupported curve supplied by key")

@staticmethod
def _get_hash_algorithm(public_key: "Key") -> str:
def _get_hash_algorithm(public_key: Key) -> str:
"""Return the hash algorithm used by the public key"""
# Format is "ecdsa-sha2-nistp256"
comps = public_key.scheme.split("-")
Expand Down Expand Up @@ -180,8 +181,8 @@ def from_priv_key_uri(
cls,
priv_key_uri: str,
public_key: Key,
secrets_handler: Optional[SecretsHandler] = None,
) -> "AzureSigner":
secrets_handler: SecretsHandler | None = None,
) -> AzureSigner:
uri = parse.urlparse(priv_key_uri)

if uri.scheme != cls.SCHEME:
Expand Down
7 changes: 4 additions & 3 deletions securesystemslib/signer/_gcp_signer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""Signer implementation for Google Cloud KMS"""

from __future__ import annotations

import logging
from typing import Optional
from urllib import parse

import securesystemslib.hash as sslib_hash
Expand Down Expand Up @@ -73,8 +74,8 @@ def from_priv_key_uri(
cls,
priv_key_uri: str,
public_key: Key,
secrets_handler: Optional[SecretsHandler] = None,
) -> "GCPSigner":
secrets_handler: SecretsHandler | None = None,
) -> GCPSigner:
uri = parse.urlparse(priv_key_uri)

if uri.scheme != cls.SCHEME:
Expand Down
14 changes: 8 additions & 6 deletions securesystemslib/signer/_gpg_signer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""Signer implementation for OpenPGP"""

from __future__ import annotations

import logging
from typing import Any, Optional
from typing import Any
from urllib import parse

from securesystemslib import exceptions
Expand Down Expand Up @@ -32,7 +34,7 @@ class GPGKey(Key):
"""

@classmethod
def from_dict(cls, keyid: str, key_dict: dict[str, Any]) -> "GPGKey":
def from_dict(cls, keyid: str, key_dict: dict[str, Any]) -> GPGKey:
keytype, scheme, keyval = cls._from_dict(key_dict)
return cls(keyid, keytype, scheme, keyval, key_dict)

Expand Down Expand Up @@ -84,7 +86,7 @@ class GPGSigner(Signer):
def __init__(
self,
public_key: Key,
homedir: Optional[str] = None,
homedir: str | None = None,
):
self.homedir = homedir
self._public_key = public_key
Expand All @@ -98,8 +100,8 @@ def from_priv_key_uri(
cls,
priv_key_uri: str,
public_key: Key,
secrets_handler: Optional[SecretsHandler] = None,
) -> "GPGSigner":
secrets_handler: SecretsHandler | None = None,
) -> GPGSigner:
if not isinstance(public_key, GPGKey):
raise ValueError(f"expected GPGKey for {priv_key_uri}")

Expand Down Expand Up @@ -147,7 +149,7 @@ def _key_from_legacy_dict(key_dict: dict[str, Any]) -> GPGKey:
return GPGKey(keyid, keytype, scheme, keyval)

@classmethod
def import_(cls, keyid: str, homedir: Optional[str] = None) -> tuple[str, Key]:
def import_(cls, keyid: str, homedir: str | None = None) -> tuple[str, Key]:
"""Load key and signer details from GnuPG keyring.
NOTE: Information about the key validity (expiration, revocation, etc.)
Expand Down
21 changes: 11 additions & 10 deletions securesystemslib/signer/_hsm_signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
"""

from __future__ import annotations

import binascii
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Optional
from urllib import parse

from securesystemslib.exceptions import UnsupportedLibraryError
Expand Down Expand Up @@ -184,7 +185,7 @@ def _find_pkcs_slot(filters: dict[str, str]) -> int:

@staticmethod
@contextmanager
def _get_session(filters: dict[str, str]) -> Iterator["PyKCS11.Session"]:
def _get_session(filters: dict[str, str]) -> Iterator[PyKCS11.Session]:
"""Context manager to handle a HSM session.
The cryptographic token is selected by filtering by token info fields.
Expand All @@ -201,9 +202,9 @@ def _get_session(filters: dict[str, str]) -> Iterator["PyKCS11.Session"]:
@classmethod
def _find_key(
cls,
session: "PyKCS11.Session",
session: PyKCS11.Session,
keyid: int,
key_type: Optional[int] = None,
key_type: int | None = None,
) -> int:
"""Find ecdsa key on HSM."""
if key_type is None:
Expand All @@ -226,8 +227,8 @@ def _find_key(

@classmethod
def _find_key_values(
cls, session: "PyKCS11.Session", keyid: int
) -> tuple["ECDomainParameters", bytes]:
cls, session: PyKCS11.Session, keyid: int
) -> tuple[ECDomainParameters, bytes]:
"""Find ecdsa public key values on HSM."""
key = cls._find_key(session, keyid)
params, point = session.getAttributeValue(
Expand Down Expand Up @@ -261,8 +262,8 @@ def _build_token_filter(cls) -> dict[str, str]:
@classmethod
def import_(
cls,
hsm_keyid: Optional[int] = None,
token_filter: Optional[dict[str, str]] = None,
hsm_keyid: int | None = None,
token_filter: dict[str, str] | None = None,
) -> tuple[str, SSlibKey]:
"""Import public key and signer details from HSM.
Expand Down Expand Up @@ -337,8 +338,8 @@ def from_priv_key_uri(
cls,
priv_key_uri: str,
public_key: Key,
secrets_handler: Optional[SecretsHandler] = None,
) -> "HSMSigner":
secrets_handler: SecretsHandler | None = None,
) -> HSMSigner:
if not isinstance(public_key, SSlibKey):
raise ValueError(f"expected SSlibKey for {priv_key_uri}")

Expand Down
30 changes: 15 additions & 15 deletions securesystemslib/signer/_key.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
"""Key interface and the default implementations"""

from __future__ import annotations

import logging
from abc import ABCMeta, abstractmethod
from typing import Any, Optional, cast
from typing import Any, cast

from securesystemslib._vendor.ed25519.ed25519 import (
SignatureMismatch,
Expand Down Expand Up @@ -94,7 +96,7 @@ def __init__(
keytype: str,
scheme: str,
keyval: dict[str, Any],
unrecognized_fields: Optional[dict[str, Any]] = None,
unrecognized_fields: dict[str, Any] | None = None,
):
if not all(
isinstance(at, str) for at in [keyid, keytype, scheme]
Expand Down Expand Up @@ -124,7 +126,7 @@ def __eq__(self, other: Any) -> bool:

@classmethod
@abstractmethod
def from_dict(cls, keyid: str, key_dict: dict[str, Any]) -> "Key":
def from_dict(cls, keyid: str, key_dict: dict[str, Any]) -> Key:
"""Creates ``Key`` object from a serialization dict
Key implementations must override this factory constructor that is used
Expand Down Expand Up @@ -207,14 +209,14 @@ def __init__(
keytype: str,
scheme: str,
keyval: dict[str, Any],
unrecognized_fields: Optional[dict[str, Any]] = None,
unrecognized_fields: dict[str, Any] | None = None,
):
if "public" not in keyval or not isinstance(keyval["public"], str):
raise ValueError(f"public key string required for scheme {scheme}")
super().__init__(keyid, keytype, scheme, keyval, unrecognized_fields)

@classmethod
def from_dict(cls, keyid: str, key_dict: dict[str, Any]) -> "SSlibKey":
def from_dict(cls, keyid: str, key_dict: dict[str, Any]) -> SSlibKey:
keytype, scheme, keyval = cls._from_dict(key_dict)

# All fields left in the key_dict are unrecognized.
Expand All @@ -223,13 +225,13 @@ def from_dict(cls, keyid: str, key_dict: dict[str, Any]) -> "SSlibKey":
def to_dict(self) -> dict[str, Any]:
return self._to_dict()

def _crypto_key(self) -> "PublicKeyTypes":
def _crypto_key(self) -> PublicKeyTypes:
"""Helper to get a `cryptography` public key for this SSlibKey."""
public_bytes = self.keyval["public"].encode("utf-8")
return load_pem_public_key(public_bytes)

@staticmethod
def _from_crypto(public_key: "PublicKeyTypes") -> tuple[str, str, str]:
def _from_crypto(public_key: PublicKeyTypes) -> tuple[str, str, str]:
"""Return tuple of keytype, default scheme and serialized public key
value for the passed public key.
Expand Down Expand Up @@ -269,10 +271,10 @@ def _pem() -> str:
@classmethod
def from_crypto(
cls,
public_key: "PublicKeyTypes",
keyid: Optional[str] = None,
scheme: Optional[str] = None,
) -> "SSlibKey":
public_key: PublicKeyTypes,
keyid: str | None = None,
scheme: str | None = None,
) -> SSlibKey:
"""Create SSlibKey from pyca/cryptography public key.
Args:
Expand Down Expand Up @@ -306,7 +308,7 @@ def from_crypto(
return SSlibKey(keyid, keytype, scheme, keyval)

@staticmethod
def _get_hash_algorithm(name: str) -> "HashAlgorithm":
def _get_hash_algorithm(name: str) -> HashAlgorithm:
"""Helper to return hash algorithm for name."""
algorithm: HashAlgorithm
if name == "sha224":
Expand All @@ -321,9 +323,7 @@ def _get_hash_algorithm(name: str) -> "HashAlgorithm":
return algorithm

@staticmethod
def _get_rsa_padding(
name: str, hash_algorithm: "HashAlgorithm"
) -> "AsymmetricPadding":
def _get_rsa_padding(name: str, hash_algorithm: HashAlgorithm) -> AsymmetricPadding:
"""Helper to return rsa signature padding for name."""
padding: AsymmetricPadding
if name == "pss":
Expand Down
Loading

0 comments on commit c6fe086

Please sign in to comment.