Skip to content

Commit

Permalink
verification: move Store into PolicyBuilder/ServerVerifier
Browse files Browse the repository at this point in the history
Signed-off-by: William Woodruff <[email protected]>
  • Loading branch information
woodruffw committed Oct 31, 2023
1 parent d6e8514 commit c563378
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 25 deletions.
4 changes: 3 additions & 1 deletion src/cryptography/hazmat/bindings/_rust/x509.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def create_x509_crl(
) -> x509.CertificateRevocationList: ...
def create_server_verifier(
name: x509.verification.Subject,
store: Store,
time: datetime.datetime | None,
) -> x509.verification.ServerVerifier: ...

Expand All @@ -53,11 +54,12 @@ class ServerVerifier:
def subject(self) -> x509.verification.Subject: ...
@property
def validation_time(self) -> datetime.datetime: ...
@property
def store(self) -> Store: ...
def verify(
self,
leaf: x509.Certificate,
intermediates: list[x509.Certificate],
store: Store,
) -> list[x509.Certificate]: ...

class Store:
Expand Down
25 changes: 21 additions & 4 deletions src/cryptography/x509/verification.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ def __init__(
self,
*,
time: datetime.datetime | None = None,
store: Store | None = None,
):
self._time = time
self._store = store

def time(self, new_time: datetime.datetime) -> PolicyBuilder:
"""
Expand All @@ -34,13 +36,28 @@ def time(self, new_time: datetime.datetime) -> PolicyBuilder:
if self._time is not None:
raise ValueError("The validation time may only be set once.")

return PolicyBuilder(
time=new_time,
)
return PolicyBuilder(time=new_time, store=self._store)

def store(self, new_store: Store) -> PolicyBuilder:
"""
Sets the trust store.
"""

if self._store is not None:
raise ValueError("The trust store may only be set once.")

return PolicyBuilder(time=self._time, store=new_store)

def build_server_verifier(self, subject: Subject) -> ServerVerifier:
"""
Builds a verifier for verifying server certificates.
"""

return rust_x509.create_server_verifier(subject, self._time)
if self._store is None:
raise ValueError("A server verifier must have a trust store")

return rust_x509.create_server_verifier(
subject,
self._store,
self._time,
)
2 changes: 1 addition & 1 deletion src/rust/cryptography-x509/src/extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ impl KeyUsage<'_> {
mod tests {
use crate::oid::{AUTHORITY_KEY_IDENTIFIER_OID, BASIC_CONSTRAINTS_OID};

use super::{BasicConstraints, DuplicateExtensionsError, Extension, Extensions, KeyUsage};
use super::{BasicConstraints, Extension, Extensions, KeyUsage};

#[test]
fn test_get_extension() {
Expand Down
5 changes: 4 additions & 1 deletion src/rust/src/x509/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ struct PyServerVerifier {
#[pyo3(get, name = "subject")]
py_subject: pyo3::Py<pyo3::PyAny>,
policy: OwnedPolicy,
#[pyo3(get)]
store: pyo3::Py<PyStore>,
}

impl PyServerVerifier {
Expand All @@ -96,7 +98,6 @@ impl PyServerVerifier {
_py: pyo3::Python<'p>,
_leaf: &PyCertificate,
_intermediates: &'p pyo3::types::PyList,
_store: &'p PyStore,
) -> CryptographyResult<Vec<PyCertificate>> {
Err(pyo3::exceptions::PyNotImplementedError::new_err("unimplemented").into())
}
Expand Down Expand Up @@ -152,6 +153,7 @@ fn build_subject<'a>(
fn create_server_verifier(
py: pyo3::Python<'_>,
subject: pyo3::Py<pyo3::PyAny>,
store: pyo3::Py<PyStore>,
time: Option<&pyo3::PyAny>,
) -> pyo3::PyResult<PyServerVerifier> {
let time = match time {
Expand All @@ -172,6 +174,7 @@ fn create_server_verifier(
Ok(PyServerVerifier {
py_subject: subject,
policy,
store,
})
}

Expand Down
60 changes: 42 additions & 18 deletions tests/x509/test_verification.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import datetime
import os
from functools import lru_cache
from ipaddress import IPv4Address

import pytest
Expand All @@ -14,49 +15,77 @@
from tests.x509.test_x509 import _load_cert


@lru_cache(maxsize=1)
def dummy_store() -> Store:
cert = _load_cert(
os.path.join("x509", "cryptography.io.pem"),
x509.load_pem_x509_certificate,
)
return Store([cert])


class TestPolicyBuilder:
def test_time_already_set(self):
with pytest.raises(ValueError):
PolicyBuilder().time(datetime.datetime.now()).time(
datetime.datetime.now()
)

def test_store_already_set(self):
with pytest.raises(ValueError):
PolicyBuilder().store(dummy_store()).store(dummy_store())

def test_ipaddress_subject(self):
policy = PolicyBuilder().build_server_verifier(
IPAddress(IPv4Address("0.0.0.0"))
policy = (
PolicyBuilder()
.store(dummy_store())
.build_server_verifier(IPAddress(IPv4Address("0.0.0.0")))
)
assert policy.subject == IPAddress(IPv4Address("0.0.0.0"))

def test_dnsname_subject(self):
policy = PolicyBuilder().build_server_verifier(
DNSName("cryptography.io")
policy = (
PolicyBuilder()
.store(dummy_store())
.build_server_verifier(DNSName("cryptography.io"))
)
assert policy.subject == DNSName("cryptography.io")

def test_subject_bad_types(self):
# Subject must be a supported GeneralName type
with pytest.raises(TypeError):
PolicyBuilder().build_server_verifier(
PolicyBuilder().store(dummy_store()).build_server_verifier(
"cryptography.io" # type: ignore[arg-type]
)
with pytest.raises(TypeError):
PolicyBuilder().build_server_verifier("0.0.0.0") # type: ignore[arg-type]
PolicyBuilder().store(dummy_store()).build_server_verifier(
"0.0.0.0"
) # type: ignore[arg-type]
with pytest.raises(TypeError):
PolicyBuilder().build_server_verifier(
PolicyBuilder().store(dummy_store()).build_server_verifier(
IPv4Address("0.0.0.0") # type: ignore[arg-type]
)
with pytest.raises(TypeError):
PolicyBuilder().build_server_verifier(None) # type: ignore[arg-type]
PolicyBuilder().store(dummy_store()).build_server_verifier(None) # type: ignore[arg-type]

def test_builder_pattern(self):
now = datetime.datetime.now().replace(microsecond=0)
store = dummy_store()

builder = PolicyBuilder()
builder = builder.time(now)
builder = builder.store(store)

verifier = builder.build_server_verifier(DNSName("cryptography.io"))
assert verifier.subject == DNSName("cryptography.io")
assert verifier.validation_time == now
assert verifier.store == store

def test_build_server_verifier_missing_store(self):
with pytest.raises(
ValueError, match="A server verifier must have a trust store"
):
PolicyBuilder().build_server_verifier(DNSName("cryptography.io"))


class TestStore:
Expand All @@ -68,22 +97,17 @@ def test_store_rejects_non_certificates(self):
with pytest.raises(TypeError):
Store(["not a cert"]) # type: ignore[list-item]

def test_store_initializes(self):
cert = _load_cert(
os.path.join("x509", "cryptography.io.pem"),
x509.load_pem_x509_certificate,
)
assert Store([cert]) is not None


class TestServerVerifier:
def test_not_implemented(self):
verifier = PolicyBuilder().build_server_verifier(
DNSName("cryptography.io")
verifier = (
PolicyBuilder()
.store(dummy_store())
.build_server_verifier(DNSName("cryptography.io"))
)
cert = _load_cert(
os.path.join("x509", "cryptography.io.pem"),
x509.load_pem_x509_certificate,
)
with pytest.raises(NotImplementedError):
verifier.verify(cert, [], Store([cert]))
verifier.verify(cert, [])

0 comments on commit c563378

Please sign in to comment.