From b97f8af9cc91674f1aef88239ed4bdbbb347f9a0 Mon Sep 17 00:00:00 2001 From: angelo-rendina-prima <95231944+angelo-rendina-prima@users.noreply.github.com> Date: Wed, 30 Nov 2022 16:39:51 +0000 Subject: [PATCH] lock inside state (#133) * lock inside state * expose lock guard types * fix unit test * dont arc swap multiple times * lets rewrite everything * not returning local state after handle --- .../src/aggregate.rs | 15 ++- examples/locking_strategies/src/lib.rs | 16 +-- examples/simple_saga/src/lib.rs | 4 +- src/esrs/aggregate.rs | 99 +++++++---------- src/esrs/postgres/store.rs | 23 ++-- src/esrs/postgres/tests/mod.rs | 35 +++--- src/esrs/state.rs | 100 +++++++++++++++--- src/esrs/store.rs | 18 ++-- src/esrs/tests/mod.rs | 49 ++++----- src/lib.rs | 2 +- 10 files changed, 207 insertions(+), 154 deletions(-) diff --git a/examples/customize_persistence_flow/src/aggregate.rs b/examples/customize_persistence_flow/src/aggregate.rs index 41682e8e..dcaf84b6 100644 --- a/examples/customize_persistence_flow/src/aggregate.rs +++ b/examples/customize_persistence_flow/src/aggregate.rs @@ -59,7 +59,7 @@ impl AggregateManager for CounterAggregate { async fn store_events( &self, - aggregate_state: &AggregateState, + aggregate_state: &mut AggregateState, events: Vec, ) -> Result>, Self::Error> { // Here is the persistence flow customization. @@ -84,14 +84,23 @@ impl AggregateManager for CounterAggregate { ) } + // Acquiring the list of projectors early, as it is an expensive operation. + let projectors = self.event_store().projectors(); for store_event in store_events.iter() { - for projector in self.event_store.projectors().iter() { + for projector in projectors.iter() { projector.project(store_event, &mut connection).await?; } } + // We need to drop the lock on the aggregate state here as: + // 1. the events have already been persisted, hence the DB has the latest aggregate; + // 2. the policies below might need to access this aggregate atomically (causing a deadlock!). + drop(aggregate_state.take_lock()); + + // Acquiring the list of policies early, as it is an expensive operation. + let policies = self.event_store().policies(); for store_event in store_events.iter() { - for policy in self.event_store.policies().iter() { + for policy in policies.iter() { // We want to just log errors instead of return them. This is the customization // we wanted. match policy.handle_event(store_event).await { diff --git a/examples/locking_strategies/src/lib.rs b/examples/locking_strategies/src/lib.rs index c35e53b8..6f0d259f 100644 --- a/examples/locking_strategies/src/lib.rs +++ b/examples/locking_strategies/src/lib.rs @@ -8,10 +8,7 @@ use delete_aggregate::structs::CounterError; /// Increment the value behind this `aggregate_id` as soon as atomical access can be obtained. /// The lock can be obtained even if there are current optimistic accesses! Avoid mixing the two strategies when writing. pub async fn increment_atomically(aggregate: CounterAggregate, aggregate_id: Uuid) -> Result<(), CounterError> { - // Obtain a lock for this aggregate_id, or wait for an existing one to be released. - let _guard = aggregate.lock(aggregate_id).await?; - // Only one atomical access at a time can proceed further. - let aggregate_state = aggregate.load(aggregate_id).await.unwrap_or_default(); + let aggregate_state = aggregate.lock_and_load(aggregate_id).await?.unwrap_or_default(); aggregate .handle_command(aggregate_state, CounterCommand::Increment) .await?; @@ -22,7 +19,7 @@ pub async fn increment_atomically(aggregate: CounterAggregate, aggregate_id: Uui /// Optimistic access ignores any current active lock! Avoid mixing the two strategies when writing. pub async fn increment_optimistically(aggregate: CounterAggregate, aggregate_id: Uuid) -> Result<(), CounterError> { // Every optimistic access can take place concurrently... - let aggregate_state = aggregate.load(aggregate_id).await.unwrap_or_default(); + let aggregate_state = aggregate.load(aggregate_id).await?.unwrap_or_default(); // ...and events are persisted in non-deterministic order. // This could raise optimistic locking errors, that must be handled downstream. aggregate @@ -35,10 +32,7 @@ pub async fn increment_optimistically(aggregate: CounterAggregate, aggregate_id: /// Avoid using atomic reads if writes are optimistic, as the state would be modified anyway! /// If writes are atomic, it is perfectly fine to use a mixture of atomic and optimistic reads. pub async fn with_atomic_read(aggregate: CounterAggregate, aggregate_id: Uuid) -> Result<(), CounterError> { - // Obtain a lock for this aggregate_id, or wait for an existing one to be released. - let _guard = aggregate.lock(aggregate_id).await?; - // Only one atomical access at a time can proceed further. - let aggregate_state = aggregate.load(aggregate_id).await.unwrap_or_default(); + let aggregate_state = aggregate.lock_and_load(aggregate_id).await?.unwrap_or_default(); // No one else (employing locking!) can read or modify the state just loaded here, // ensuring this really is the *latest* aggregate state. println!( @@ -50,10 +44,10 @@ pub async fn with_atomic_read(aggregate: CounterAggregate, aggregate_id: Uuid) - /// Load the aggregate state for read-only purposes, optimistically assuming nothing is modifying it. /// If writes are atomic, it is perfectly fine to use a mixture of atomic and optimistic reads. -/// Otherwise, optimistic reads are fine: beware there are no guarantees the state loaded is actually the latest. +/// Otherwise, optimistic reads are allowed: beware there are no guarantees the state loaded is actually the latest. pub async fn with_optimistic_read(aggregate: CounterAggregate, aggregate_id: Uuid) -> Result<(), CounterError> { // Read the state now, ignoring any explicit locking... - let aggregate_state = aggregate.load(aggregate_id).await.unwrap_or_default(); + let aggregate_state = aggregate.load(aggregate_id).await?.unwrap_or_default(); // ...but nothing prevents something else from updating the data in the store in the meanwhile, // so the `aggregate_state` here might be already outdated at this point. println!( diff --git a/examples/simple_saga/src/lib.rs b/examples/simple_saga/src/lib.rs index 96f41aaa..672f266d 100644 --- a/examples/simple_saga/src/lib.rs +++ b/examples/simple_saga/src/lib.rs @@ -95,8 +95,8 @@ impl Policy for LoggingPolicy { let aggregate_state: AggregateState = self .aggregate .load(aggregate_id) - .await - .unwrap_or_else(|| AggregateState::new(aggregate_id)); // This should never happen + .await? + .unwrap_or_else(|| AggregateState::with_id(aggregate_id)); // This should never happen if let LoggingEvent::Received(msg) = event.payload() { if msg.contains("fail_policy") { diff --git a/src/esrs/aggregate.rs b/src/esrs/aggregate.rs index 56e96580..38fd04ba 100644 --- a/src/esrs/aggregate.rs +++ b/src/esrs/aggregate.rs @@ -1,8 +1,6 @@ use async_trait::async_trait; use uuid::Uuid; -use crate::esrs::store::EventStoreLockGuard; -use crate::types::SequenceNumber; use crate::{AggregateState, EventStore, StoreEvent}; /// The Aggregate trait is responsible for validating commands, mapping commands to events, and applying @@ -44,14 +42,6 @@ pub trait Aggregate { /// /// If this is not the case, this function is allowed to panic. fn apply_event(state: Self::State, payload: Self::Event) -> Self::State; - - /// Updates the aggregate state using the list of new events. Take a look to - /// [`Aggregate::apply_event`] for further information. - fn apply_events(state: Self::State, events: Vec) -> Self::State { - events.into_iter().fold(state, |acc: Self::State, event: Self::Event| { - Self::apply_event(acc, event) - }) - } } /// The AggregateManager is responsible for coupling the Aggregate with a Store, so that the events @@ -80,72 +70,57 @@ pub trait AggregateManager: Aggregate { /// Validates and handles the command onto the given state, and then passes the events to the store. async fn handle_command( &self, - aggregate_state: AggregateState, + mut aggregate_state: AggregateState, command: Self::Command, - ) -> Result, Self::Error> { + ) -> Result<(), Self::Error> { let events: Vec = ::handle_command(aggregate_state.inner(), command)?; - let stored_events: Vec> = self.store_events(&aggregate_state, events).await?; - - Ok(::apply_events(aggregate_state, stored_events)) - } - - /// Acquires a lock for the given aggregate, or waits for outstanding guards to be released. - /// - /// Used to prevent concurrent access to the aggregate state. - /// Note that any process which does *not* `lock` will get immediate (possibly shared!) access. - /// ALL accesses (regardless of this guard) are subject to the usual optimistic locking strategy on write. - async fn lock(&self, aggregate_id: impl Into + Send) -> Result { - self.event_store().lock(aggregate_id.into()).await - } - - /// Responsible for applying events in order onto the aggregate state, and incrementing the sequence number. - /// - /// `events` will be passed in order of ascending sequence number. - /// - /// You should _avoid_ implementing this function, and be _very_ careful if you decide to do so. - fn apply_events( - aggregate_state: AggregateState, - store_events: Vec>, - ) -> AggregateState { - let sequence_number: SequenceNumber = store_events.last().map_or(0, StoreEvent::sequence_number); - - let events: Vec = store_events - .into_iter() - .map(|store_event| store_event.payload) - .collect(); - - let inner: Self::State = ::apply_events(aggregate_state.inner, events); - - AggregateState { - sequence_number, - inner, - ..aggregate_state - } + self.store_events(&mut aggregate_state, events).await?; + Ok(()) } /// Loads an aggregate instance from the event store, by applying previously persisted events onto /// the aggregate state by order of their sequence number /// /// You should _avoid_ implementing this function, and be _very_ careful if you decide to do so. - async fn load(&self, aggregate_id: impl Into + Send) -> Option> { + async fn load( + &self, + aggregate_id: impl Into + Send, + ) -> Result>, Self::Error> { let aggregate_id: Uuid = aggregate_id.into(); - let events: Vec> = self + let store_events: Vec> = self .event_store() .by_aggregate_id(aggregate_id) - .await - .ok()? + .await? .into_iter() .collect(); - if events.is_empty() { + Ok(if store_events.is_empty() { None } else { - Some(::apply_events( - AggregateState::new(aggregate_id), - events, - )) - } + let aggregate_state = AggregateState::with_id(aggregate_id); + Some(aggregate_state.apply_store_events(store_events, Self::apply_event)) + }) + } + + /// Acquires a lock on this aggregate instance, and only then loads it from the event store, + /// by applying previously persisted events onto the aggregate state by order of their sequence number. + /// + /// The lock is contained in the returned `AggregateState`, and released when this is dropped. + /// It can also be extracted with the `take_lock` method for more advanced uses. + /// + /// You should _avoid_ implementing this function, and be _very_ careful if you decide to do so. + async fn lock_and_load( + &self, + aggregate_id: impl Into + Send, + ) -> Result>, Self::Error> { + let id = aggregate_id.into(); + let guard = self.event_store().lock(id).await?; + + Ok(self.load(id).await?.map(|mut state| { + state.set_lock(guard); + state + })) } /// Transactional persists events in store - recording it in the aggregate instance's history. @@ -160,12 +135,10 @@ pub trait AggregateManager: Aggregate { /// behaviour of policies, e.g. if you want to log something on error. async fn store_events( &self, - aggregate_state: &AggregateState, + aggregate_state: &mut AggregateState, events: Vec, ) -> Result>, Self::Error> { - self.event_store() - .persist(aggregate_state.id, events, aggregate_state.next_sequence_number()) - .await + self.event_store().persist(aggregate_state, events).await } /// `delete` should either complete the aggregate instance, along with all its associated events diff --git a/src/esrs/postgres/store.rs b/src/esrs/postgres/store.rs index 3cd6d011..ae50b178 100644 --- a/src/esrs/postgres/store.rs +++ b/src/esrs/postgres/store.rs @@ -18,7 +18,7 @@ use crate::esrs::policy; use crate::esrs::postgres::projector::Consistency; use crate::esrs::store::{EventStoreLockGuard, UnlockOnDrop}; use crate::types::SequenceNumber; -use crate::{Aggregate, AggregateManager, EventStore, StoreEvent}; +use crate::{Aggregate, AggregateManager, AggregateState, EventStore, StoreEvent}; use super::{event, projector, statement::Statements}; @@ -245,14 +245,16 @@ where async fn persist( &self, - aggregate_id: Uuid, - events: Vec, - starting_sequence_number: SequenceNumber, + aggregate_state: &mut AggregateState<::State>, + events: Vec<::Event>, ) -> Result>, Manager::Error> { let mut transaction: Transaction = self.inner.pool.begin().await?; let occurred_on: DateTime = Utc::now(); let mut store_events: Vec> = vec![]; + let starting_sequence_number = aggregate_state.next_sequence_number(); + let aggregate_id = *aggregate_state.id(); + for (index, event) in (0..).zip(events.into_iter()) { let store_event: StoreEvent<::Event> = self .save_event( @@ -267,8 +269,10 @@ where store_events.push(store_event); } + // Acquiring the list of projectors early, as it is an expensive operation. + let projectors = self.projectors(); for store_event in &store_events { - for projector in self.projectors().iter() { + for projector in projectors.iter() { let span = tracing::trace_span!( "esrs_project_event", event_id = %store_event.id, @@ -292,8 +296,15 @@ where transaction.commit().await?; + // We need to drop the lock on the aggregate state here as: + // 1. the events have already been persisted, hence the DB has the latest aggregate; + // 2. the policies below might need to access this aggregate atomically (causing a deadlock!). + drop(aggregate_state.take_lock()); + + // Acquiring the list of policies early, as it is an expensive operation. + let policies = self.policies(); for store_event in &store_events { - for policy in self.policies().iter() { + for policy in policies.iter() { let span = tracing::info_span!("esrs_apply_policy" , event_id = %store_event.id, aggregate_id = %store_event.aggregate_id, policy = policy.name()); let _policy_result = policy.handle_event(store_event).instrument(span).await; } diff --git a/src/esrs/postgres/tests/mod.rs b/src/esrs/postgres/tests/mod.rs index 5250e90b..7cd1c6e4 100644 --- a/src/esrs/postgres/tests/mod.rs +++ b/src/esrs/postgres/tests/mod.rs @@ -6,7 +6,7 @@ use sqlx::{PgConnection, Pool, Postgres}; use uuid::Uuid; use crate::postgres::{PgStore, Projector}; -use crate::{Aggregate, AggregateManager, EventStore, Policy, StoreEvent}; +use crate::{Aggregate, AggregateManager, AggregateState, EventStore, Policy, StoreEvent}; #[sqlx::test] fn setup_database_test(pool: Pool) { @@ -87,17 +87,18 @@ fn by_aggregate_id_insert_and_delete_by_aggregate_id_test(pool: Pool) fn persist_single_event_test(pool: Pool) { let store: PgStore = PgStore::new(pool.clone()).setup().await.unwrap(); - let event_internal_id: Uuid = Uuid::new_v4(); - let aggregate_id: Uuid = Uuid::new_v4(); + let mut aggregate_state = AggregateState::new(); + let aggregate_id = *aggregate_state.id(); + let event_internal_id: Uuid = Uuid::new_v4(); let store_event: Vec> = - EventStore::persist(&store, aggregate_id, vec![TestEvent { id: event_internal_id }], 0) + EventStore::persist(&store, &mut aggregate_state, vec![TestEvent { id: event_internal_id }]) .await .unwrap(); assert_eq!(store_event[0].aggregate_id, aggregate_id); assert_eq!(store_event[0].payload.id, event_internal_id); - assert_eq!(store_event[0].sequence_number, 0); + assert_eq!(store_event[0].sequence_number, 1); let store_events: Vec> = store.by_aggregate_id(aggregate_id).await.unwrap(); assert_eq!(store_events.len(), 1); @@ -109,13 +110,13 @@ fn persist_multiple_events_test(pool: Pool) { let test_event_1: TestEvent = TestEvent { id: Uuid::new_v4() }; let test_event_2: TestEvent = TestEvent { id: Uuid::new_v4() }; - let aggregate_id: Uuid = Uuid::new_v4(); + let mut aggregate_state = AggregateState::new(); + let aggregate_id = *aggregate_state.id(); let store_event: Vec> = EventStore::persist( &store, - aggregate_id, + &mut aggregate_state, vec![test_event_1.clone(), test_event_2.clone()], - 0, ) .await .unwrap(); @@ -123,10 +124,10 @@ fn persist_multiple_events_test(pool: Pool) { assert_eq!(store_event.len(), 2); assert_eq!(store_event[0].aggregate_id, aggregate_id); assert_eq!(store_event[0].payload.id, test_event_1.id); - assert_eq!(store_event[0].sequence_number, 0); + assert_eq!(store_event[0].sequence_number, 1); assert_eq!(store_event[1].aggregate_id, aggregate_id); assert_eq!(store_event[1].payload.id, test_event_2.id); - assert_eq!(store_event[1].sequence_number, 1); + assert_eq!(store_event[1].sequence_number, 2); let store_events: Vec> = store.by_aggregate_id(aggregate_id).await.unwrap(); assert_eq!(store_events.len(), 2); @@ -143,10 +144,11 @@ fn event_projection_test(pool: Pool) { create_test_projection_table(&pool).await; let event_internal_id: Uuid = Uuid::new_v4(); - let aggregate_id: Uuid = Uuid::new_v4(); + let mut aggregate_state = AggregateState::new(); + let aggregate_id = *aggregate_state.id(); let _store_event: Vec> = - EventStore::persist(&store, aggregate_id, vec![TestEvent { id: event_internal_id }], 0) + EventStore::persist(&store, &mut aggregate_state, vec![TestEvent { id: event_internal_id }]) .await .unwrap(); @@ -171,10 +173,11 @@ fn delete_store_events_and_projections_test(pool: Pool) { create_test_projection_table(&pool).await; let event_internal_id: Uuid = Uuid::new_v4(); - let aggregate_id: Uuid = Uuid::new_v4(); + let mut aggregate_state = AggregateState::new(); + let aggregate_id = *aggregate_state.id(); let _store_event: Vec> = - EventStore::persist(&store, aggregate_id, vec![TestEvent { id: event_internal_id }], 0) + EventStore::persist(&store, &mut aggregate_state, vec![TestEvent { id: event_internal_id }]) .await .unwrap(); @@ -217,10 +220,10 @@ fn policy_test(pool: Pool) { .unwrap(); let event_internal_id: Uuid = Uuid::new_v4(); - let aggregate_id: Uuid = Uuid::new_v4(); + let mut aggregate_state = AggregateState::new(); let _store_event: Vec> = - EventStore::persist(&store, aggregate_id, vec![TestEvent { id: event_internal_id }], 0) + EventStore::persist(&store, &mut aggregate_state, vec![TestEvent { id: event_internal_id }]) .await .unwrap(); diff --git a/src/esrs/state.rs b/src/esrs/state.rs index 5ca9bf1f..15415496 100644 --- a/src/esrs/state.rs +++ b/src/esrs/state.rs @@ -1,51 +1,117 @@ use uuid::Uuid; +use crate::esrs::store::EventStoreLockGuard; +use crate::esrs::store::StoreEvent; use crate::types::SequenceNumber; /// The internal state for an Aggregate. -/// It contains and id representing the aggregate id, an incremental sequence number and a state -/// defined by the user of this library. -#[derive(Clone, Debug)] +/// It contains: +/// - an id uniquely representing the aggregate, +/// - an incremental sequence number, +/// - a lock representing the atomicity of the access to the aggregate, +/// - a state defined by the user of this library. pub struct AggregateState { - pub(crate) id: Uuid, - pub(crate) sequence_number: SequenceNumber, - pub(crate) inner: S, + id: Uuid, + sequence_number: SequenceNumber, + lock: Option, + inner: S, +} + +impl std::fmt::Debug for AggregateState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AggregateState") + .field("id", &self.id) + .field("sequence_number", &self.sequence_number) + .field("lock", &self.lock.is_some()) + .field("inner", &self.inner) + .finish() + } } /// Default implementation for [`AggregateState`] impl Default for AggregateState { fn default() -> Self { - Self::new(Uuid::new_v4()) + Self::new() } } impl AggregateState { - /// Creates a new instance of an [`AggregateState`] with the given aggregate id. The use of this - /// is discouraged being that that aggregate id could be already existing and a clash of ids - /// might happen. + /// Creates a new instance of an [`AggregateState`] with a new unique id. + pub fn new() -> Self { + Self { + id: Uuid::new_v4(), + inner: Default::default(), + sequence_number: 0, + lock: None, + } + } + + /// Creates a new instance of an [`AggregateState`] with the given aggregate id. /// - /// Prefer [Default] implementation. - #[must_use] - pub fn new(id: impl Into) -> Self { + /// This should be used almost exclusively when loading by aggregate id yields nothing, + /// and this becomes the brand new aggregate state for that id. + /// + /// Other uses are strongly discouraged, since the id could be already existing + /// and a clash might happen when persisting events. + pub fn with_id(id: impl Into) -> Self { Self { id: id.into(), inner: Default::default(), sequence_number: 0, + lock: None, } } - /// Returns an Uuid representing the aggregate id + /// Consumes the aggregate state and generates a new one with the events applied to it, + /// as dictaded by `apply_event`. + pub fn apply_store_events(self, store_events: Vec>, apply_event: F) -> Self + where + F: Fn(S, T) -> S, + { + store_events.into_iter().fold(self, |state, store_event| { + let sequence_number = *store_event.sequence_number(); + let inner = apply_event(state.inner, store_event.payload); + + Self { + sequence_number, + inner, + ..state + } + }) + } + + /// Returns an Uuid representing the aggregate id. pub const fn id(&self) -> &Uuid { &self.id } - /// Returns the internal state + /// Returns the internal state. pub const fn inner(&self) -> &S { &self.inner } - /// Returns the internal sequence number incremented by 1. - pub const fn next_sequence_number(&self) -> SequenceNumber { + /// Consumes self and extracts the internal state. + pub fn into_inner(self) -> S { + self.inner + } + + /// Returns the internal sequence number. + pub const fn sequence_number(&self) -> &SequenceNumber { + &self.sequence_number + } + + /// Computes the internal sequence number incremented by 1. + pub fn next_sequence_number(&self) -> SequenceNumber { self.sequence_number + 1 } + + /// Inserts the lock guard into self, replacing any current one. + pub fn set_lock(&mut self, guard: EventStoreLockGuard) { + self.lock = Some(guard); + } + + /// Extracts the lock from self, leaving nothing behind. + pub fn take_lock(&mut self) -> Option { + self.lock.take() + } } diff --git a/src/esrs/store.rs b/src/esrs/store.rs index 93f9ca9d..3748b8f3 100644 --- a/src/esrs/store.rs +++ b/src/esrs/store.rs @@ -5,12 +5,12 @@ use chrono::{DateTime, Utc}; use uuid::Uuid; use crate::types::SequenceNumber; -use crate::{Aggregate, AggregateManager}; +use crate::{Aggregate, AggregateManager, AggregateState}; /// Marker trait for every EventStoreLockGuard. /// /// Implementors should unlock concurrent access to the guarded resource, when dropped. -pub trait UnlockOnDrop: Send + 'static {} +pub trait UnlockOnDrop: Send + Sync + 'static {} /// Lock guard preventing concurrent access to a resource. /// @@ -50,9 +50,8 @@ pub trait EventStore { /// Persisting events may additionally trigger configured Projectors. async fn persist( &self, - aggregate_id: Uuid, + aggregate_state: &mut AggregateState<::State>, events: Vec<::Event>, - starting_sequence_number: SequenceNumber, ) -> Result::Event>>, ::Error>; /// Delete all events from events store related to given `aggregate_id`. @@ -108,13 +107,10 @@ where async fn persist( &self, - aggregate_id: Uuid, + aggregate_state: &mut AggregateState<::State>, events: Vec<::Event>, - starting_sequence_number: SequenceNumber, ) -> Result::Event>>, ::Error> { - self.deref() - .persist(aggregate_id, events, starting_sequence_number) - .await + self.deref().persist(aggregate_state, events).await } async fn delete(&self, aggregate_id: Uuid) -> Result<(), ::Error> { @@ -138,8 +134,8 @@ pub struct StoreEvent { } impl StoreEvent { - pub const fn sequence_number(&self) -> SequenceNumber { - self.sequence_number + pub const fn sequence_number(&self) -> &SequenceNumber { + &self.sequence_number } pub const fn payload(&self) -> &Event { diff --git a/src/esrs/tests/mod.rs b/src/esrs/tests/mod.rs index 4abad095..152fe1d3 100644 --- a/src/esrs/tests/mod.rs +++ b/src/esrs/tests/mod.rs @@ -1,7 +1,6 @@ use std::fmt::{Display, Formatter}; use sqlx::{Pool, Postgres}; -use uuid::Uuid; use crate::postgres::PgStore; use crate::{Aggregate, AggregateManager, AggregateState}; @@ -9,53 +8,55 @@ use crate::{Aggregate, AggregateManager, AggregateState}; #[sqlx::test] fn handle_command_test(pool: Pool) { let aggregate: TestAggregate = TestAggregate::new(&pool).await; - let aggregate_state: AggregateState = AggregateState::new(Uuid::new_v4()); + let aggregate_state: AggregateState = AggregateState::new(); + let aggregate_id = *aggregate_state.id(); - let aggregate_state: AggregateState = aggregate + aggregate .handle_command(aggregate_state, TestCommand::Single) .await .unwrap(); + + let aggregate_state = aggregate.lock_and_load(aggregate_id).await.unwrap().unwrap(); assert_eq!(aggregate_state.inner().count, 2); - assert_eq!(aggregate_state.sequence_number, 1); + assert_eq!(aggregate_state.sequence_number(), &1); - let aggregate_state: AggregateState = aggregate + aggregate .handle_command(aggregate_state, TestCommand::Single) .await .unwrap(); + + let aggregate_state = aggregate.lock_and_load(aggregate_id).await.unwrap().unwrap(); assert_eq!(aggregate_state.inner().count, 3); - assert_eq!(aggregate_state.sequence_number, 2); + assert_eq!(aggregate_state.sequence_number(), &2); - let aggregate_state: AggregateState = aggregate + aggregate .handle_command(aggregate_state, TestCommand::Multi) .await .unwrap(); + + let aggregate_state = aggregate.lock_and_load(aggregate_id).await.unwrap().unwrap(); assert_eq!(aggregate_state.inner().count, 5); - assert_eq!(aggregate_state.sequence_number, 4); + assert_eq!(aggregate_state.sequence_number(), &4); } #[sqlx::test] fn load_aggregate_state_test(pool: Pool) { let aggregate: TestAggregate = TestAggregate::new(&pool).await; - let initial_aggregate_state: AggregateState = AggregateState::new(Uuid::new_v4()); + let initial_aggregate_state: AggregateState = AggregateState::new(); - let aggregate_state: AggregateState = aggregate - .handle_command(initial_aggregate_state.clone(), TestCommand::Multi) + let initial_id = *initial_aggregate_state.id(); + let initial_sequence_number = *initial_aggregate_state.sequence_number(); + let initial_count = initial_aggregate_state.inner().count; + + aggregate + .handle_command(initial_aggregate_state, TestCommand::Multi) .await .unwrap(); - assert_eq!(initial_aggregate_state.id(), aggregate_state.id()); - assert_eq!( - initial_aggregate_state.sequence_number + 2, - aggregate_state.sequence_number - ); - assert_eq!(initial_aggregate_state.inner.count + 2, aggregate_state.inner.count); - - let loaded_aggregate_state: AggregateState = - aggregate.load(*initial_aggregate_state.id()).await.unwrap(); - - assert_eq!(aggregate_state.id, loaded_aggregate_state.id); - assert_eq!(aggregate_state.sequence_number, loaded_aggregate_state.sequence_number); - assert_eq!(aggregate_state.inner.count, loaded_aggregate_state.inner.count); + let aggregate_state = aggregate.lock_and_load(initial_id).await.unwrap().unwrap(); + assert_eq!(&initial_id, aggregate_state.id()); + assert_eq!(initial_sequence_number + 2, *aggregate_state.sequence_number()); + assert_eq!(initial_count + 2, aggregate_state.inner().count); } struct TestAggregate { diff --git a/src/lib.rs b/src/lib.rs index d7d64aa3..9a3c82bd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,7 +12,7 @@ pub use crate::esrs::aggregate::{Aggregate, AggregateManager}; pub use crate::esrs::policy::Policy; pub use crate::esrs::state::AggregateState; -pub use crate::esrs::store::{EventStore, StoreEvent}; +pub use crate::esrs::store::{EventStore, EventStoreLockGuard, StoreEvent, UnlockOnDrop}; mod esrs;