Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add top-level ServerVerifier.verify API #9805

Merged
merged 5 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/x509/verification.rst
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ chain building, etc.

The verifier's validation time.

.. attribute:: store

:type: :class:`Store`

The verifier's trust store.

.. class:: PolicyBuilder

.. versionadded:: 42.0.0
Expand All @@ -75,6 +81,14 @@ chain building, etc.

:returns: A new instance of :class:`PolicyBuilder`

.. method:: store(new_store)

Sets the verifier's trust store.

:param new_store: The :class:`Store` to use in the verifier

:returns: A new instance of :class:`PolicyBuilder`

.. method:: build_server_verifier(subject)

Builds a verifier for verifying server certificates.
Expand Down
8 changes: 8 additions & 0 deletions src/cryptography/hazmat/bindings/_rust/x509.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def create_x509_crl(
) -> x509.CertificateRevocationList: ...
def create_server_verifier(
name: x509.verification.Subject,
store: Store,
time: datetime.datetime | None,
) -> x509.verification.ServerVerifier: ...

Expand All @@ -53,6 +54,13 @@ class ServerVerifier:
def subject(self) -> x509.verification.Subject: ...
@property
def validation_time(self) -> datetime.datetime: ...
@property
def store(self) -> Store: ...
def verify(
self,
leaf: x509.Certificate,
intermediates: list[x509.Certificate],
) -> list[x509.Certificate]: ...

class Store:
def __init__(self, certs: list[x509.Certificate]) -> None: ...
25 changes: 21 additions & 4 deletions src/cryptography/x509/verification.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ def __init__(
self,
*,
time: datetime.datetime | None = None,
store: Store | None = None,
):
self._time = time
self._store = store

def time(self, new_time: datetime.datetime) -> PolicyBuilder:
"""
Expand All @@ -34,13 +36,28 @@ def time(self, new_time: datetime.datetime) -> PolicyBuilder:
if self._time is not None:
raise ValueError("The validation time may only be set once.")

return PolicyBuilder(
time=new_time,
)
return PolicyBuilder(time=new_time, store=self._store)

def store(self, new_store: Store) -> PolicyBuilder:
"""
Sets the trust store.
"""

if self._store is not None:
raise ValueError("The trust store may only be set once.")

return PolicyBuilder(time=self._time, store=new_store)

def build_server_verifier(self, subject: Subject) -> ServerVerifier:
"""
Builds a verifier for verifying server certificates.
"""

return rust_x509.create_server_verifier(subject, self._time)
if self._store is None:
raise ValueError("A server verifier must have a trust store")

return rust_x509.create_server_verifier(
subject,
self._store,
self._time,
)
2 changes: 1 addition & 1 deletion src/rust/cryptography-x509/src/extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ impl KeyUsage<'_> {
mod tests {
use crate::oid::{AUTHORITY_KEY_IDENTIFIER_OID, BASIC_CONSTRAINTS_OID};

use super::{BasicConstraints, DuplicateExtensionsError, Extension, Extensions, KeyUsage};
use super::{BasicConstraints, Extension, Extensions, KeyUsage};

#[test]
fn test_get_extension() {
Expand Down
13 changes: 13 additions & 0 deletions src/rust/src/x509/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ struct PyServerVerifier {
#[pyo3(get, name = "subject")]
py_subject: pyo3::Py<pyo3::PyAny>,
policy: OwnedPolicy,
#[pyo3(get)]
store: pyo3::Py<PyStore>,
}

impl PyServerVerifier {
Expand All @@ -90,6 +92,15 @@ impl PyServerVerifier {
fn validation_time<'p>(&self, py: pyo3::Python<'p>) -> pyo3::PyResult<&'p pyo3::PyAny> {
datetime_to_py(py, &self.as_policy().validation_time)
}

fn verify<'p>(
&self,
_py: pyo3::Python<'p>,
_leaf: &PyCertificate,
_intermediates: &'p pyo3::types::PyList,
) -> CryptographyResult<Vec<PyCertificate>> {
Err(pyo3::exceptions::PyNotImplementedError::new_err("unimplemented").into())
}
}

fn build_subject_owner(
Expand Down Expand Up @@ -142,6 +153,7 @@ fn build_subject<'a>(
fn create_server_verifier(
py: pyo3::Python<'_>,
subject: pyo3::Py<pyo3::PyAny>,
store: pyo3::Py<PyStore>,
time: Option<&pyo3::PyAny>,
) -> pyo3::PyResult<PyServerVerifier> {
let time = match time {
Expand All @@ -162,6 +174,7 @@ fn create_server_verifier(
Ok(PyServerVerifier {
py_subject: subject,
policy,
store,
})
}

Expand Down
57 changes: 45 additions & 12 deletions tests/x509/test_verification.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import datetime
import os
from functools import lru_cache
from ipaddress import IPv4Address

import pytest
Expand All @@ -14,53 +15,77 @@
from tests.x509.test_x509 import _load_cert


@lru_cache(maxsize=1)
def dummy_store() -> Store:
cert = _load_cert(
os.path.join("x509", "cryptography.io.pem"),
x509.load_pem_x509_certificate,
)
return Store([cert])


class TestPolicyBuilder:
def test_time_already_set(self):
with pytest.raises(ValueError):
PolicyBuilder().time(datetime.datetime.now()).time(
datetime.datetime.now()
)

def test_store_already_set(self):
with pytest.raises(ValueError):
PolicyBuilder().store(dummy_store()).store(dummy_store())

def test_ipaddress_subject(self):
policy = PolicyBuilder().build_server_verifier(
IPAddress(IPv4Address("0.0.0.0"))
policy = (
PolicyBuilder()
.store(dummy_store())
.build_server_verifier(IPAddress(IPv4Address("0.0.0.0")))
)
assert policy.subject == IPAddress(IPv4Address("0.0.0.0"))

def test_dnsname_subject(self):
policy = PolicyBuilder().build_server_verifier(
DNSName("cryptography.io")
policy = (
PolicyBuilder()
.store(dummy_store())
.build_server_verifier(DNSName("cryptography.io"))
)
assert policy.subject == DNSName("cryptography.io")

def test_subject_bad_types(self):
# Subject must be a supported GeneralName type
with pytest.raises(TypeError):
PolicyBuilder().build_server_verifier(
PolicyBuilder().store(dummy_store()).build_server_verifier(
"cryptography.io" # type: ignore[arg-type]
)
with pytest.raises(TypeError):
PolicyBuilder().build_server_verifier(
PolicyBuilder().store(dummy_store()).build_server_verifier(
"0.0.0.0" # type: ignore[arg-type]
)
with pytest.raises(TypeError):
PolicyBuilder().build_server_verifier(
PolicyBuilder().store(dummy_store()).build_server_verifier(
IPv4Address("0.0.0.0") # type: ignore[arg-type]
)
with pytest.raises(TypeError):
PolicyBuilder().build_server_verifier(
None # type: ignore[arg-type]
)
PolicyBuilder().store(dummy_store()).build_server_verifier(None) # type: ignore[arg-type]

def test_builder_pattern(self):
now = datetime.datetime.now().replace(microsecond=0)
store = dummy_store()

builder = PolicyBuilder()
builder = builder.time(now)
builder = builder.store(store)

verifier = builder.build_server_verifier(DNSName("cryptography.io"))
assert verifier.subject == DNSName("cryptography.io")
assert verifier.validation_time == now
assert verifier.store == store

def test_build_server_verifier_missing_store(self):
with pytest.raises(
ValueError, match="A server verifier must have a trust store"
):
PolicyBuilder().build_server_verifier(DNSName("cryptography.io"))


class TestStore:
Expand All @@ -72,9 +97,17 @@ def test_store_rejects_non_certificates(self):
with pytest.raises(TypeError):
Store(["not a cert"]) # type: ignore[list-item]

def test_store_initializes(self):

class TestServerVerifier:
def test_not_implemented(self):
verifier = (
PolicyBuilder()
.store(dummy_store())
.build_server_verifier(DNSName("cryptography.io"))
)
cert = _load_cert(
os.path.join("x509", "cryptography.io.pem"),
x509.load_pem_x509_certificate,
)
assert Store([cert]) is not None
with pytest.raises(NotImplementedError):
verifier.verify(cert, [])
Loading