From 992baa69b84e9bc3dd14dbf5bbaebc72a28dffa8 Mon Sep 17 00:00:00 2001 From: Chris Wesseling Date: Thu, 13 Oct 2022 17:05:13 +0200 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8[#4]=20Add=20utils.check=5Fpem?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Uses pyOpenSSL primitives to approximately check the validity of a chain. And instead of using pem's in the repo that will expire, it creates temporary fixture certificates. Someday there might be a correct abstraction in the cryptography library. See: https://github.com/pyca/cryptography/issues/6229 https://github.com/pyca/cryptography/issues/2381 --- setup.cfg | 2 + simple_certmanager/utils.py | 54 +++++++++++ tests/conftest.py | 181 ++++++++++++++++++++++++++++++++++++ tests/test_utils.py | 17 ++++ 4 files changed, 254 insertions(+) create mode 100644 tests/test_utils.py diff --git a/setup.cfg b/setup.cfg index 19f547b..3c0d3d1 100644 --- a/setup.cfg +++ b/setup.cfg @@ -38,6 +38,8 @@ install_requires = django-choices django-privates pyopenssl + cryptograpy + certifi tests_require = pytest pytest-django diff --git a/simple_certmanager/utils.py b/simple_certmanager/utils.py index 64f1f01..2147296 100644 --- a/simple_certmanager/utils.py +++ b/simple_certmanager/utils.py @@ -1,6 +1,60 @@ +from os import PathLike +from typing import Generator, Optional, Union + +import certifi +from cryptography import x509 +from OpenSSL import crypto + + def pretty_print_certificate_components(x509name) -> str: components = [ (label.decode("utf-8"), value.decode("utf-8")) for (label, value) in x509name.get_components() ] return ", ".join([f"{label}: {value}" for (label, value) in components]) + + +def split_pem(pem: bytes) -> Generator[bytes, None, None]: + "Split a concatenated pem into its constituent parts" + mark = b"-----END CERTIFICATE-----" + if mark not in pem: + return + end = pem.find(mark) + len(mark) + yield pem[:end] + yield from split_pem(pem[end:]) + + +def load_pem_chain(pem: bytes) -> Generator[x509.Certificate, None, None]: + for data in split_pem(pem): + yield x509.load_pem_x509_certificate(data) + + +def check_pem( + pem: bytes, + ca: Union[bytes, str, PathLike] = certifi.where(), + ca_path: Optional[Union[str, PathLike]] = None, +) -> bool: + """Simple (possibly incomplete) sanity check on pem chain. + + If the pam passes this check it MAY be valid for use. This is only intended + to catch blatant misconfigurations early. This gives NO guarantees on + security nor authenticity. + """ + # We need still need to use pyOpenSSL primitives for this: + # https://github.com/pyca/cryptography/issues/6229 + # https://github.com/pyca/cryptography/issues/2381 + + # Establish roots + store = crypto.X509Store() + store.load_locations(ca, ca_path) + + leaf, *chain = map(crypto.X509.from_cryptography, load_pem_chain(pem)) + + # Create a context + ctx = crypto.X509StoreContext(store, leaf, chain) + try: + ctx.verify_certificate() + except crypto.X509StoreContextError: + return False + else: + return True diff --git a/tests/conftest.py b/tests/conftest.py index e69de29..e3d5559 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -0,0 +1,181 @@ +import datetime +from pathlib import Path + +import pytest +from cryptography import x509 +from cryptography.hazmat.primitives import asymmetric, hashes, serialization + + +@pytest.fixture(scope="session") +def root_key() -> asymmetric.rsa.RSAPrivateKey: + "RSA key for the RootCA" + key = gen_key() + # with (Path(__file__).parent / "data" / "test.key").open("rb") as f: + # return serialization.load_pem_private_key(f.read(), password=None) + return key + + +@pytest.fixture(scope="session") +def root_cert(root_key) -> x509.Certificate: + "Certificate for the RootCA" + return mkcert( + x509.Name( + [ + x509.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, "NL"), + x509.NameAttribute(x509.oid.NameOID.STATE_OR_PROVINCE_NAME, "NH"), + x509.NameAttribute(x509.oid.NameOID.LOCALITY_NAME, "Amsterdam"), + x509.NameAttribute(x509.oid.NameOID.ORGANIZATION_NAME, "Root CA"), + x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "rootca.example.org"), + ] + ), + root_key, + ) + + +@pytest.fixture +def leaf_pem(root_cert: x509.Certificate, root_key) -> bytes: + "A valid pem encoded certificate directly issued by the Root CA" + leaf_cert = mkcert( + subject=x509.Name( + [ + x509.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, "NL"), + x509.NameAttribute( + x509.oid.NameOID.STATE_OR_PROVINCE_NAME, "Some-State" + ), + x509.NameAttribute( + x509.oid.NameOID.ORGANIZATION_NAME, "Internet Widgits Pty Ltd" + ), + x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "widgits.example.org"), + ] + ), + subject_key=gen_key(), + issuer=root_cert.subject, + issuer_key=root_key, + can_issue=False, + ) + return to_pem(leaf_cert) + + +@pytest.fixture +def chain_pem(root_cert: x509.Certificate, root_key) -> bytes: + "A valid pem encoded full certificate chain" + inter_key = gen_key() + intermediate_cert = mkcert( + subject=x509.Name( + [ + x509.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, "NL"), + x509.NameAttribute( + x509.oid.NameOID.ORGANIZATION_NAME, "Men in the Middle Ltd" + ), + x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "mitm.example.org"), + ] + ), + subject_key=inter_key, + issuer=root_cert.subject, + issuer_key=root_key, + can_issue=True, + ) + leaf_cert = mkcert( + subject=x509.Name( + [ + x509.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, "NL"), + x509.NameAttribute( + x509.oid.NameOID.STATE_OR_PROVINCE_NAME, "Some-State" + ), + x509.NameAttribute( + x509.oid.NameOID.ORGANIZATION_NAME, "Internet Widgits Pty Ltd" + ), + x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "widgits.example.org"), + ] + ), + subject_key=gen_key(), + issuer=intermediate_cert.subject, + issuer_key=inter_key, + can_issue=False, + ) + return b"".join(map(to_pem, [leaf_cert, intermediate_cert])) + + +@pytest.fixture +def broken_chain_pem(root_cert: x509.Certificate, root_key): + """An invalid pem encoded full certificate chain. + + The intermediate is no a valid issuer. + """ + inter_key = gen_key() + intermediate_cert = mkcert( + subject=x509.Name( + [ + x509.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, "NL"), + x509.NameAttribute( + x509.oid.NameOID.ORGANIZATION_NAME, "Men in the Middle Ltd" + ), + x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "mitm.example.org"), + ] + ), + subject_key=inter_key, + issuer=root_cert.subject, + issuer_key=root_key, + can_issue=False, # Middle isn't allowed to issue certs. + ) + leaf_cert = mkcert( + subject=x509.Name( + [ + x509.NameAttribute(x509.oid.NameOID.COUNTRY_NAME, "NL"), + x509.NameAttribute( + x509.oid.NameOID.STATE_OR_PROVINCE_NAME, "Some-State" + ), + x509.NameAttribute( + x509.oid.NameOID.ORGANIZATION_NAME, "Internet Widgits Pty Ltd" + ), + x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "widgits.example.org"), + ] + ), + subject_key=gen_key(), + issuer=intermediate_cert.subject, + issuer_key=inter_key, + can_issue=False, + ) + return b"".join(map(to_pem, [leaf_cert, intermediate_cert])) + + +@pytest.fixture(scope="session") +def root_ca_path(root_cert, tmp_path_factory) -> Path: + "A path to a temporary .pem for the Root CA" + cert_path = tmp_path_factory.mktemp("fake_pki") / "fake_ca_cert.pem" + with cert_path.open("wb") as f: + f.write(to_pem(root_cert)) + return cert_path + + +def mkcert(subject, subject_key, issuer=None, issuer_key=None, can_issue=True): + cert = ( + x509.CertificateBuilder() + .subject_name(subject) + .issuer_name(issuer if issuer else subject) + .public_key(subject_key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.datetime.utcnow()) + .not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=1)) + .add_extension( + x509.SubjectAlternativeName([x509.DNSName("localhost")]), + critical=False, + ) + ) + + if can_issue: + cert = cert.add_extension( + x509.BasicConstraints(ca=True, path_length=None), + critical=True, + ) + + cert = cert.sign(issuer_key if issuer_key else subject_key, hashes.SHA256()) + return cert + + +def to_pem(cert: x509.Certificate) -> bytes: + return cert.public_bytes(serialization.Encoding.PEM) + + +def gen_key(): + return asymmetric.rsa.generate_private_key(public_exponent=0x10001, key_size=2048) diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..b05023d --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,17 @@ +from simple_certmanager.utils import check_pem + + +def test_check_pem_checks_directly_issued(leaf_pem, root_ca_path): + assert check_pem(leaf_pem, ca=root_ca_path) + + +def test_check_pem_fails_unrooted_pem(leaf_pem): + assert not check_pem(leaf_pem) + + +def test_check_pem_checks_chain(chain_pem, root_ca_path): + assert check_pem(chain_pem, ca=root_ca_path) + + +def test_check_pem_fails_bad_chain(broken_chain_pem, root_ca_path): + assert not check_pem(broken_chain_pem, ca=root_ca_path)