lock inside state (#133)
* lock inside state

* expose lock guard types

* fix unit test

* don't arc swap multiple times

* let's rewrite everything

* not returning local state after handle
angelo-rendina-prima authored Nov 30, 2022
1 parent 6c79c7d commit b97f8af
Showing 10 changed files with 207 additions and 154 deletions.
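
The changes below move the per-aggregate lock guard inside `AggregateState` and stop `handle_command` from returning the updated state. A minimal sketch of the calling pattern this implies, using the `CounterAggregate` example types from this repository (imports and error handling elided; the helper name `increment` is illustrative):

async fn increment(aggregate: &CounterAggregate, id: Uuid) -> Result<(), CounterError> {
    // Lock and load in one call: the guard now lives inside the returned state,
    // so no separate `lock()` call is needed before loading.
    let state = aggregate.lock_and_load(id).await?.unwrap_or_default();
    // `handle_command` now consumes the state and returns `Result<(), Self::Error>`;
    // the lock is released when the state (and the guard inside it) is dropped.
    aggregate.handle_command(state, CounterCommand::Increment).await
}
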
15 changes: 12 additions & 3 deletions examples/customize_persistence_flow/src/aggregate.rs
@@ -59,7 +59,7 @@ impl AggregateManager for CounterAggregate {

async fn store_events(
&self,
aggregate_state: &AggregateState<Self::State>,
aggregate_state: &mut AggregateState<Self::State>,
events: Vec<Self::Event>,
) -> Result<Vec<StoreEvent<Self::Event>>, Self::Error> {
// Here is the persistence flow customization.
@@ -84,14 +84,23 @@ impl AggregateManager for CounterAggregate {
)
}

// Acquiring the list of projectors early, as it is an expensive operation.
let projectors = self.event_store().projectors();
for store_event in store_events.iter() {
for projector in self.event_store.projectors().iter() {
for projector in projectors.iter() {
projector.project(store_event, &mut connection).await?;
}
}

// We need to drop the lock on the aggregate state here as:
// 1. the events have already been persisted, hence the DB has the latest aggregate;
// 2. the policies below might need to access this aggregate atomically (causing a deadlock!).
drop(aggregate_state.take_lock());

// Acquiring the list of policies early, as it is an expensive operation.
let policies = self.event_store().policies();
for store_event in store_events.iter() {
for policy in self.event_store.policies().iter() {
for policy in policies.iter() {
// We want to just log errors instead of return them. This is the customization
// we wanted.
match policy.handle_event(store_event).await {
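
The hunk above is truncated at the `match` on the policy result, so the actual arms are not shown here. Purely as an illustration of the customization the comment describes (logging policy failures instead of returning them), such a match might look roughly like the following; the logging call and the assumption that the policy error type implements `Debug` are illustrative, not taken from the repository:

match policy.handle_event(store_event).await {
    Ok(_) => {}
    // Swallow the failure: log it and carry on with the remaining policies,
    // instead of aborting the whole `store_events` flow.
    Err(policy_error) => eprintln!("policy failed: {:?}", policy_error),
}
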
16 changes: 5 additions & 11 deletions examples/locking_strategies/src/lib.rs
@@ -8,10 +8,7 @@ use delete_aggregate::structs::CounterError;
/// Increment the value behind this `aggregate_id` as soon as atomic access can be obtained.
/// The lock can be obtained even if there are current optimistic accesses! Avoid mixing the two strategies when writing.
pub async fn increment_atomically(aggregate: CounterAggregate, aggregate_id: Uuid) -> Result<(), CounterError> {
// Obtain a lock for this aggregate_id, or wait for an existing one to be released.
let _guard = aggregate.lock(aggregate_id).await?;
// Only one atomical access at a time can proceed further.
let aggregate_state = aggregate.load(aggregate_id).await.unwrap_or_default();
let aggregate_state = aggregate.lock_and_load(aggregate_id).await?.unwrap_or_default();
aggregate
.handle_command(aggregate_state, CounterCommand::Increment)
.await?;
@@ -22,7 +19,7 @@ pub async fn increment_atomically(aggregate: CounterAggregate, aggregate_id: Uui
/// Optimistic access ignores any current active lock! Avoid mixing the two strategies when writing.
pub async fn increment_optimistically(aggregate: CounterAggregate, aggregate_id: Uuid) -> Result<(), CounterError> {
// Every optimistic access can take place concurrently...
let aggregate_state = aggregate.load(aggregate_id).await.unwrap_or_default();
let aggregate_state = aggregate.load(aggregate_id).await?.unwrap_or_default();
// ...and events are persisted in non-deterministic order.
// This could raise optimistic locking errors, that must be handled downstream.
aggregate
@@ -35,10 +32,7 @@ pub async fn increment_optimistically(aggregate: CounterAggregate, aggregate_id:
/// Avoid using atomic reads if writes are optimistic, as the state would be modified anyway!
/// If writes are atomic, it is perfectly fine to use a mixture of atomic and optimistic reads.
pub async fn with_atomic_read(aggregate: CounterAggregate, aggregate_id: Uuid) -> Result<(), CounterError> {
// Obtain a lock for this aggregate_id, or wait for an existing one to be released.
let _guard = aggregate.lock(aggregate_id).await?;
// Only one atomical access at a time can proceed further.
let aggregate_state = aggregate.load(aggregate_id).await.unwrap_or_default();
let aggregate_state = aggregate.lock_and_load(aggregate_id).await?.unwrap_or_default();
// No one else (employing locking!) can read or modify the state just loaded here,
// ensuring this really is the *latest* aggregate state.
println!(
@@ -50,10 +44,10 @@ pub async fn with_atomic_read(aggregate: CounterAggregate, aggregate_id: Uuid) -

/// Load the aggregate state for read-only purposes, optimistically assuming nothing is modifying it.
/// If writes are atomic, it is perfectly fine to use a mixture of atomic and optimistic reads.
/// Otherwise, optimistic reads are fine: beware there are no guarantees the state loaded is actually the latest.
/// Otherwise, optimistic reads are allowed: beware there are no guarantees the state loaded is actually the latest.
pub async fn with_optimistic_read(aggregate: CounterAggregate, aggregate_id: Uuid) -> Result<(), CounterError> {
// Read the state now, ignoring any explicit locking...
let aggregate_state = aggregate.load(aggregate_id).await.unwrap_or_default();
let aggregate_state = aggregate.load(aggregate_id).await?.unwrap_or_default();
// ...but nothing prevents something else from updating the data in the store in the meanwhile,
// so the `aggregate_state` here might be already outdated at this point.
println!(
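
The four helpers above contrast atomic and optimistic access to the same counter. A small usage sketch, assuming a Tokio runtime and that `CounterAggregate` is cheaply cloneable (both assumptions, not guarantees from this diff):

async fn exercise_strategies(aggregate: CounterAggregate, id: Uuid) -> Result<(), CounterError> {
    // Two atomic increments in flight at once: they serialise on the per-aggregate lock,
    // so neither overwrites a half-finished update from the other.
    let (first, second) = tokio::join!(
        increment_atomically(aggregate.clone(), id),
        increment_atomically(aggregate.clone(), id),
    );
    first?;
    second?;
    // An atomic read waits for lock-holding writers, so it observes the latest state.
    with_atomic_read(aggregate.clone(), id).await?;
    // An optimistic read ignores the lock; the state it prints may already be stale.
    with_optimistic_read(aggregate, id).await
}
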
4 changes: 2 additions & 2 deletions examples/simple_saga/src/lib.rs
@@ -95,8 +95,8 @@ impl Policy<LoggingAggregate> for LoggingPolicy {
let aggregate_state: AggregateState<u64> = self
.aggregate
.load(aggregate_id)
.await
.unwrap_or_else(|| AggregateState::new(aggregate_id)); // This should never happen
.await?
.unwrap_or_else(|| AggregateState::with_id(aggregate_id)); // This should never happen

if let LoggingEvent::Received(msg) = event.payload() {
if msg.contains("fail_policy") {
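
Elsewhere in this commit `AggregateState::new` loses its id parameter (the tests further down call it with no arguments), while `AggregateState::with_id` covers the case above, where the aggregate id is already known from the incoming event. A short sketch of the distinction, assuming the state type implements `Default` and that `new` generates a fresh identifier internally:

// Brand-new aggregate: let the state pick its own fresh id.
let new_state: AggregateState<u64> = AggregateState::new();
let generated_id: Uuid = *new_state.id();
println!("generated a fresh aggregate id: {generated_id}");

// Aggregate already identified elsewhere (e.g. by an incoming event): bind the state to that id.
let known_id: Uuid = Uuid::new_v4();
let bound_state: AggregateState<u64> = AggregateState::with_id(known_id);
assert_eq!(*bound_state.id(), known_id);
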
99 changes: 36 additions & 63 deletions src/esrs/aggregate.rs
@@ -1,8 +1,6 @@
use async_trait::async_trait;
use uuid::Uuid;

use crate::esrs::store::EventStoreLockGuard;
use crate::types::SequenceNumber;
use crate::{AggregateState, EventStore, StoreEvent};

/// The Aggregate trait is responsible for validating commands, mapping commands to events, and applying
@@ -44,14 +42,6 @@ pub trait Aggregate {
///
/// If this is not the case, this function is allowed to panic.
fn apply_event(state: Self::State, payload: Self::Event) -> Self::State;

/// Updates the aggregate state using the list of new events. Take a look to
/// [`Aggregate::apply_event`] for further information.
fn apply_events(state: Self::State, events: Vec<Self::Event>) -> Self::State {
events.into_iter().fold(state, |acc: Self::State, event: Self::Event| {
Self::apply_event(acc, event)
})
}
}

/// The AggregateManager is responsible for coupling the Aggregate with a Store, so that the events
@@ -80,72 +70,57 @@ pub trait AggregateManager: Aggregate {
/// Validates and handles the command onto the given state, and then passes the events to the store.
async fn handle_command(
&self,
aggregate_state: AggregateState<Self::State>,
mut aggregate_state: AggregateState<Self::State>,
command: Self::Command,
) -> Result<AggregateState<Self::State>, Self::Error> {
) -> Result<(), Self::Error> {
let events: Vec<Self::Event> = <Self as Aggregate>::handle_command(aggregate_state.inner(), command)?;
let stored_events: Vec<StoreEvent<Self::Event>> = self.store_events(&aggregate_state, events).await?;

Ok(<Self as AggregateManager>::apply_events(aggregate_state, stored_events))
}

/// Acquires a lock for the given aggregate, or waits for outstanding guards to be released.
///
/// Used to prevent concurrent access to the aggregate state.
/// Note that any process which does *not* `lock` will get immediate (possibly shared!) access.
/// ALL accesses (regardless of this guard) are subject to the usual optimistic locking strategy on write.
async fn lock(&self, aggregate_id: impl Into<Uuid> + Send) -> Result<EventStoreLockGuard, Self::Error> {
self.event_store().lock(aggregate_id.into()).await
}

/// Responsible for applying events in order onto the aggregate state, and incrementing the sequence number.
///
/// `events` will be passed in order of ascending sequence number.
///
/// You should _avoid_ implementing this function, and be _very_ careful if you decide to do so.
fn apply_events(
aggregate_state: AggregateState<Self::State>,
store_events: Vec<StoreEvent<Self::Event>>,
) -> AggregateState<Self::State> {
let sequence_number: SequenceNumber = store_events.last().map_or(0, StoreEvent::sequence_number);

let events: Vec<Self::Event> = store_events
.into_iter()
.map(|store_event| store_event.payload)
.collect();

let inner: Self::State = <Self as Aggregate>::apply_events(aggregate_state.inner, events);

AggregateState {
sequence_number,
inner,
..aggregate_state
}
self.store_events(&mut aggregate_state, events).await?;
Ok(())
}

/// Loads an aggregate instance from the event store, by applying previously persisted events onto
/// the aggregate state by order of their sequence number
///
/// You should _avoid_ implementing this function, and be _very_ careful if you decide to do so.
async fn load(&self, aggregate_id: impl Into<Uuid> + Send) -> Option<AggregateState<Self::State>> {
async fn load(
&self,
aggregate_id: impl Into<Uuid> + Send,
) -> Result<Option<AggregateState<Self::State>>, Self::Error> {
let aggregate_id: Uuid = aggregate_id.into();

let events: Vec<StoreEvent<Self::Event>> = self
let store_events: Vec<StoreEvent<Self::Event>> = self
.event_store()
.by_aggregate_id(aggregate_id)
.await
.ok()?
.await?
.into_iter()
.collect();

if events.is_empty() {
Ok(if store_events.is_empty() {
None
} else {
Some(<Self as AggregateManager>::apply_events(
AggregateState::new(aggregate_id),
events,
))
}
let aggregate_state = AggregateState::with_id(aggregate_id);
Some(aggregate_state.apply_store_events(store_events, Self::apply_event))
})
}

/// Acquires a lock on this aggregate instance, and only then loads it from the event store,
/// by applying previously persisted events onto the aggregate state by order of their sequence number.
///
/// The lock is contained in the returned `AggregateState`, and released when this is dropped.
/// It can also be extracted with the `take_lock` method for more advanced uses.
///
/// You should _avoid_ implementing this function, and be _very_ careful if you decide to do so.
async fn lock_and_load(
&self,
aggregate_id: impl Into<Uuid> + Send,
) -> Result<Option<AggregateState<Self::State>>, Self::Error> {
let id = aggregate_id.into();
let guard = self.event_store().lock(id).await?;

Ok(self.load(id).await?.map(|mut state| {
state.set_lock(guard);
state
}))
}

/// Transactional persists events in store - recording it in the aggregate instance's history.
@@ -160,12 +135,10 @@ pub trait AggregateManager: Aggregate {
/// behaviour of policies, e.g. if you want to log something on error.
async fn store_events(
&self,
aggregate_state: &AggregateState<Self::State>,
aggregate_state: &mut AggregateState<Self::State>,
events: Vec<Self::Event>,
) -> Result<Vec<StoreEvent<Self::Event>>, Self::Error> {
self.event_store()
.persist(aggregate_state.id, events, aggregate_state.next_sequence_number())
.await
self.event_store().persist(aggregate_state, events).await
}

/// `delete` should either complete the aggregate instance, along with all its associated events
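
The per-event fold that `AggregateManager::apply_events` used to perform now happens inside `AggregateState::apply_store_events`, whose body is introduced elsewhere in this commit and is not part of this hunk. Conceptually, and only as a sketch of crate-internal code that may differ from the real signature, it should amount to the same fold the removed code did, plus tracking the last sequence number:

impl<S> AggregateState<S> {
    // Sketch only: fold persisted events onto the state in ascending sequence order,
    // recording the last sequence number seen, much as the removed `apply_events`
    // functions above used to do together.
    pub fn apply_store_events<E>(
        self,
        store_events: Vec<StoreEvent<E>>,
        apply: fn(S, E) -> S,
    ) -> Self {
        store_events.into_iter().fold(self, |state, store_event| {
            let sequence_number = store_event.sequence_number;
            let inner = apply(state.inner, store_event.payload);
            Self { sequence_number, inner, ..state }
        })
    }
}
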
23 changes: 17 additions & 6 deletions src/esrs/postgres/store.rs
@@ -18,7 +18,7 @@ use crate::esrs::policy;
use crate::esrs::postgres::projector::Consistency;
use crate::esrs::store::{EventStoreLockGuard, UnlockOnDrop};
use crate::types::SequenceNumber;
use crate::{Aggregate, AggregateManager, EventStore, StoreEvent};
use crate::{Aggregate, AggregateManager, AggregateState, EventStore, StoreEvent};

use super::{event, projector, statement::Statements};

@@ -245,14 +245,16 @@

async fn persist(
&self,
aggregate_id: Uuid,
events: Vec<Manager::Event>,
starting_sequence_number: SequenceNumber,
aggregate_state: &mut AggregateState<<Self::Manager as Aggregate>::State>,
events: Vec<<Self::Manager as Aggregate>::Event>,
) -> Result<Vec<StoreEvent<Manager::Event>>, Manager::Error> {
let mut transaction: Transaction<Postgres> = self.inner.pool.begin().await?;
let occurred_on: DateTime<Utc> = Utc::now();
let mut store_events: Vec<StoreEvent<Manager::Event>> = vec![];

let starting_sequence_number = aggregate_state.next_sequence_number();
let aggregate_id = *aggregate_state.id();

for (index, event) in (0..).zip(events.into_iter()) {
let store_event: StoreEvent<<Manager as Aggregate>::Event> = self
.save_event(
@@ -267,8 +269,10 @@
store_events.push(store_event);
}

// Acquiring the list of projectors early, as it is an expensive operation.
let projectors = self.projectors();
for store_event in &store_events {
for projector in self.projectors().iter() {
for projector in projectors.iter() {
let span = tracing::trace_span!(
"esrs_project_event",
event_id = %store_event.id,
@@ -292,8 +296,15 @@

transaction.commit().await?;

// We need to drop the lock on the aggregate state here as:
// 1. the events have already been persisted, hence the DB has the latest aggregate;
// 2. the policies below might need to access this aggregate atomically (causing a deadlock!).
drop(aggregate_state.take_lock());

// Acquiring the list of policies early, as it is an expensive operation.
let policies = self.policies();
for store_event in &store_events {
for policy in self.policies().iter() {
for policy in policies.iter() {
let span = tracing::info_span!("esrs_apply_policy" , event_id = %store_event.id, aggregate_id = %store_event.aggregate_id, policy = policy.name());
let _policy_result = policy.handle_event(store_event).instrument(span).await;
}
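
The `drop(aggregate_state.take_lock())` above is what lets a policy take its own lock on the same aggregate without deadlocking against `persist`. A hedged sketch of the failure mode being avoided; the helper function and its wiring are hypothetical, but the calls it makes are the ones introduced in this commit (here reusing the `CounterAggregate` example types, with the id taken from the stored event by the caller):

// Hypothetical helper standing in for a policy body: it reacts to a just-stored
// event by atomically updating the *same* aggregate that produced it.
async fn react_by_incrementing(aggregate: &CounterAggregate, aggregate_id: Uuid) -> Result<(), CounterError> {
    // If `persist` were still holding the per-aggregate lock while policies run,
    // this `lock_and_load` would wait on that lock forever. Releasing the lock
    // right after the transaction commits (the `drop` above) avoids the deadlock.
    let state = aggregate.lock_and_load(aggregate_id).await?.unwrap_or_default();
    aggregate.handle_command(state, CounterCommand::Increment).await
}
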
35 changes: 19 additions & 16 deletions src/esrs/postgres/tests/mod.rs
@@ -6,7 +6,7 @@ use sqlx::{PgConnection, Pool, Postgres};
use uuid::Uuid;

use crate::postgres::{PgStore, Projector};
use crate::{Aggregate, AggregateManager, EventStore, Policy, StoreEvent};
use crate::{Aggregate, AggregateManager, AggregateState, EventStore, Policy, StoreEvent};

#[sqlx::test]
fn setup_database_test(pool: Pool<Postgres>) {
@@ -87,17 +87,18 @@ fn by_aggregate_id_insert_and_delete_by_aggregate_id_test(pool: Pool<Postgres>)
fn persist_single_event_test(pool: Pool<Postgres>) {
let store: PgStore<TestAggregate> = PgStore::new(pool.clone()).setup().await.unwrap();

let event_internal_id: Uuid = Uuid::new_v4();
let aggregate_id: Uuid = Uuid::new_v4();
let mut aggregate_state = AggregateState::new();
let aggregate_id = *aggregate_state.id();

let event_internal_id: Uuid = Uuid::new_v4();
let store_event: Vec<StoreEvent<TestEvent>> =
EventStore::persist(&store, aggregate_id, vec![TestEvent { id: event_internal_id }], 0)
EventStore::persist(&store, &mut aggregate_state, vec![TestEvent { id: event_internal_id }])
.await
.unwrap();

assert_eq!(store_event[0].aggregate_id, aggregate_id);
assert_eq!(store_event[0].payload.id, event_internal_id);
assert_eq!(store_event[0].sequence_number, 0);
assert_eq!(store_event[0].sequence_number, 1);

let store_events: Vec<StoreEvent<TestEvent>> = store.by_aggregate_id(aggregate_id).await.unwrap();
assert_eq!(store_events.len(), 1);
@@ -109,24 +110,24 @@ fn persist_multiple_events_test(pool: Pool<Postgres>) {

let test_event_1: TestEvent = TestEvent { id: Uuid::new_v4() };
let test_event_2: TestEvent = TestEvent { id: Uuid::new_v4() };
let aggregate_id: Uuid = Uuid::new_v4();
let mut aggregate_state = AggregateState::new();
let aggregate_id = *aggregate_state.id();

let store_event: Vec<StoreEvent<TestEvent>> = EventStore::persist(
&store,
aggregate_id,
&mut aggregate_state,
vec![test_event_1.clone(), test_event_2.clone()],
0,
)
.await
.unwrap();

assert_eq!(store_event.len(), 2);
assert_eq!(store_event[0].aggregate_id, aggregate_id);
assert_eq!(store_event[0].payload.id, test_event_1.id);
assert_eq!(store_event[0].sequence_number, 0);
assert_eq!(store_event[0].sequence_number, 1);
assert_eq!(store_event[1].aggregate_id, aggregate_id);
assert_eq!(store_event[1].payload.id, test_event_2.id);
assert_eq!(store_event[1].sequence_number, 1);
assert_eq!(store_event[1].sequence_number, 2);

let store_events: Vec<StoreEvent<TestEvent>> = store.by_aggregate_id(aggregate_id).await.unwrap();
assert_eq!(store_events.len(), 2);
@@ -143,10 +144,11 @@ fn event_projection_test(pool: Pool<Postgres>) {
create_test_projection_table(&pool).await;

let event_internal_id: Uuid = Uuid::new_v4();
let aggregate_id: Uuid = Uuid::new_v4();
let mut aggregate_state = AggregateState::new();
let aggregate_id = *aggregate_state.id();

let _store_event: Vec<StoreEvent<TestEvent>> =
EventStore::persist(&store, aggregate_id, vec![TestEvent { id: event_internal_id }], 0)
EventStore::persist(&store, &mut aggregate_state, vec![TestEvent { id: event_internal_id }])
.await
.unwrap();

@@ -171,10 +173,11 @@ fn delete_store_events_and_projections_test(pool: Pool<Postgres>) {
create_test_projection_table(&pool).await;

let event_internal_id: Uuid = Uuid::new_v4();
let aggregate_id: Uuid = Uuid::new_v4();
let mut aggregate_state = AggregateState::new();
let aggregate_id = *aggregate_state.id();

let _store_event: Vec<StoreEvent<TestEvent>> =
EventStore::persist(&store, aggregate_id, vec![TestEvent { id: event_internal_id }], 0)
EventStore::persist(&store, &mut aggregate_state, vec![TestEvent { id: event_internal_id }])
.await
.unwrap();

@@ -217,10 +220,10 @@ fn policy_test(pool: Pool<Postgres>) {
.unwrap();

let event_internal_id: Uuid = Uuid::new_v4();
let aggregate_id: Uuid = Uuid::new_v4();
let mut aggregate_state = AggregateState::new();

let _store_event: Vec<StoreEvent<TestEvent>> =
EventStore::persist(&store, aggregate_id, vec![TestEvent { id: event_internal_id }], 0)
EventStore::persist(&store, &mut aggregate_state, vec![TestEvent { id: event_internal_id }])
.await
.unwrap();

(Diffs for the remaining changed files are not shown in this view.)
