[PG-172]: transactionally emit/persist multiple events from handling command (#78)
emturner authored Dec 22, 2021
1 parent b380dcf commit 8758384
Showing 8 changed files with 163 additions and 120 deletions.
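In short: `Aggregate::handle_command` now returns a `Vec<Self::Event>`, and the event stores' `persist` takes the whole batch plus a starting sequence number, writing the events inside a single transaction and returning the stored events. A minimal usage sketch, modelled on the updated test at the bottom of the `src/esrs/postgres/mod.rs` diff and assuming the same test fixtures (`Hello`, `Error`, `PgStore::test`):

// Sketch only — mirrors the updated test in src/esrs/postgres/mod.rs.
async fn persist_batch(database_url: &str) {
    let aggregate_id: Uuid = Uuid::new_v4();
    let store: PgStore<String, Error> = PgStore::test::<Hello>(database_url, vec![], vec![]).await.unwrap();

    // Two events persisted atomically, numbered consecutively from the given
    // starting sequence number (here 0 and 1).
    let stored = store
        .persist(aggregate_id, vec!["hello".to_string(), "goodbye".to_string()], 0)
        .await
        .unwrap();

    assert_eq!(stored.len(), 2);
    assert_eq!(stored[0].sequence_number(), 0);
    assert_eq!(stored[1].sequence_number(), 1);
}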
4 changes: 2 additions & 2 deletions examples/postgres_payments/src/bank_account/aggregate.rs
@@ -92,11 +92,11 @@ impl AggregateManager for BankAccountAggregate {
) -> Result<AggregateState<Self::State>, Self::Error> {
match cmd {
BankAccountCommand::Withdraw { amount } => {
self.persist(aggregate_state, BankAccountEvent::Withdrawn { amount })
self.persist(aggregate_state, vec![BankAccountEvent::Withdrawn { amount }])
.await
}
BankAccountCommand::Deposit { amount } => {
self.persist(aggregate_state, BankAccountEvent::Deposited { amount })
self.persist(aggregate_state, vec![BankAccountEvent::Deposited { amount }])
.await
}
}
10 changes: 7 additions & 3 deletions examples/postgres_payments/src/credit_card/aggregate.rs
@@ -82,10 +82,14 @@ impl Aggregate for CreditCardAggregate {
}
}

fn handle_command(&self, _aggregate_state: &AggregateState<CreditCardState>, cmd: Self::Command) -> Self::Event {
fn handle_command(
&self,
_aggregate_state: &AggregateState<CreditCardState>,
cmd: Self::Command,
) -> Vec<Self::Event> {
match cmd {
CreditCardCommand::Pay { amount } => CreditCardEvent::Payed { amount },
CreditCardCommand::Refund { amount } => CreditCardEvent::Refunded { amount },
CreditCardCommand::Pay { amount } => vec![CreditCardEvent::Payed { amount }],
CreditCardCommand::Refund { amount } => vec![CreditCardEvent::Refunded { amount }],
}
}
}
10 changes: 7 additions & 3 deletions examples/sqlite_payments/src/bank_account/aggregate.rs
@@ -84,10 +84,14 @@ impl Aggregate for BankAccountAggregate {
}
}

fn handle_command(&self, _aggregate_state: &AggregateState<BankAccountState>, cmd: Self::Command) -> Self::Event {
fn handle_command(
&self,
_aggregate_state: &AggregateState<BankAccountState>,
cmd: Self::Command,
) -> Vec<Self::Event> {
match cmd {
BankAccountCommand::Withdraw { amount } => BankAccountEvent::Withdrawn { amount },
BankAccountCommand::Deposit { amount } => BankAccountEvent::Deposited { amount },
BankAccountCommand::Withdraw { amount } => vec![BankAccountEvent::Withdrawn { amount }],
BankAccountCommand::Deposit { amount } => vec![BankAccountEvent::Deposited { amount }],
}
}
}
7 changes: 5 additions & 2 deletions examples/sqlite_payments/src/credit_card/aggregate.rs
@@ -91,9 +91,12 @@ impl AggregateManager for CreditCardAggregate {
cmd: Self::Command,
) -> Result<AggregateState<Self::State>, Self::Error> {
match cmd {
CreditCardCommand::Pay { amount } => self.persist(aggregate_state, CreditCardEvent::Payed { amount }).await,
CreditCardCommand::Pay { amount } => {
self.persist(aggregate_state, vec![CreditCardEvent::Payed { amount }])
.await
}
CreditCardCommand::Refund { amount } => {
self.persist(aggregate_state, CreditCardEvent::Refunded { amount })
self.persist(aggregate_state, vec![CreditCardEvent::Refunded { amount }])
.await
}
}
38 changes: 15 additions & 23 deletions src/esrs/aggregate.rs
@@ -73,27 +73,22 @@ pub trait AggregateManager: Identifier {
}

/// Responsible for applying events in order onto the aggregate state, and incrementing the sequence number.
/// You should avoid implenting this method, and be _very_ careful if you decide to do so.
/// You should avoid implementing this method, and be _very_ careful if you decide to do so.
///
/// `events` will be passed in order of ascending sequence number.
fn apply_events(
aggregate_state: AggregateState<Self::State>,
events: Vec<StoreEvent<Self::Event>>,
) -> AggregateState<Self::State> {
let mut max_seq_number: SequenceNumber = 0;

let aggregate_id: &Uuid = &aggregate_state.id;
let inner: Self::State = events.iter().fold(
aggregate_state.inner,
|acc: Self::State, event: &StoreEvent<Self::Event>| {
if event.sequence_number() > max_seq_number {
max_seq_number = event.sequence_number()
}
Self::apply_event(aggregate_id, acc, event)
},
|acc: Self::State, event: &StoreEvent<Self::Event>| Self::apply_event(aggregate_id, acc, event),
);

AggregateState {
inner,
sequence_number: max_seq_number,
sequence_number: events.last().map_or(0, |e| e.sequence_number()),
..aggregate_state
}
}
@@ -120,18 +115,15 @@ pub trait AggregateManager: Identifier {
async fn persist(
&self,
aggregate_state: AggregateState<Self::State>,
event: Self::Event,
events: Vec<Self::Event>,
) -> Result<AggregateState<Self::State>, Self::Error> {
let next_sequence_number: SequenceNumber = aggregate_state.sequence_number + 1;
Ok(self
let events = self
.event_store()
.persist(aggregate_state.id, event, next_sequence_number)
.await
.map(|event| AggregateState {
inner: Self::apply_event(&aggregate_state.id, aggregate_state.inner, &event),
sequence_number: next_sequence_number,
..aggregate_state
})?)
.persist(aggregate_state.id, events, next_sequence_number)
.await?;

Ok(Self::apply_events(aggregate_state, events))
}
}

@@ -163,8 +155,8 @@ pub trait Aggregate {
/// if validation succeeds.
fn validate_command(aggregate_state: &AggregateState<Self::State>, cmd: &Self::Command) -> Result<(), Self::Error>;

/// Handles a validated command, and emits a single event.
fn handle_command(&self, aggregate_state: &AggregateState<Self::State>, cmd: Self::Command) -> Self::Event;
/// Handles a validated command, and emits events.
fn handle_command(&self, aggregate_state: &AggregateState<Self::State>, cmd: Self::Command) -> Vec<Self::Event>;
}

#[async_trait]
@@ -191,8 +183,8 @@ impl<T: Aggregate + Sync + Identifier> AggregateManager for T {
aggregate_state: AggregateState<Self::State>,
cmd: Self::Command,
) -> Result<AggregateState<T::State>, T::Error> {
let event = Aggregate::handle_command(self, &aggregate_state, cmd);
AggregateManager::persist(self, aggregate_state, event).await
let events = Aggregate::handle_command(self, &aggregate_state, cmd);
AggregateManager::persist(self, aggregate_state, events).await
}

async fn handle_command(
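One subtlety in the rewritten `apply_events`: the running `max_seq_number` is gone because the batch arrives in ascending sequence order (as the new doc comment promises), so the last event carries the highest number and an empty batch falls back to 0, which is exactly what `events.last().map_or(0, |e| e.sequence_number())` encodes. A standalone illustration of that last-or-zero logic (the `SequenceNumber` alias below is a stand-in for the crate's own):

// Illustration only: the same last-or-zero logic over bare sequence numbers.
type SequenceNumber = i32; // stand-in for the crate's SequenceNumber alias

fn last_sequence_number(ascending: &[SequenceNumber]) -> SequenceNumber {
    ascending.last().copied().unwrap_or(0)
}

fn main() {
    assert_eq!(last_sequence_number(&[3, 4, 5]), 5); // ordered batch: last == max
    assert_eq!(last_sequence_number(&[]), 0);        // empty batch: sequence stays at 0
}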
99 changes: 56 additions & 43 deletions src/esrs/postgres/mod.rs
@@ -10,7 +10,6 @@ use futures::TryStreamExt;
use serde::de::DeserializeOwned;
use serde::Serialize;
use sqlx::pool::{PoolConnection, PoolOptions};
use sqlx::postgres::PgQueryResult;
use sqlx::{Pool, Postgres};
use uuid::Uuid;

@@ -179,55 +178,66 @@ impl<
async fn persist(
&self,
aggregate_id: Uuid,
event: Evt,
sequence_number: SequenceNumber,
) -> Result<StoreEvent<Evt>, Err> {
events: Vec<Evt>,
starting_sequence_number: SequenceNumber,
) -> Result<Vec<StoreEvent<Evt>>, Err> {
let mut connection: PoolConnection<Postgres> = self.begin().await?;

let event_id: Uuid = Uuid::new_v4();
let occurred_on: DateTime<Utc> = Utc::now();
let store_event_result: Result<PgQueryResult, Err> = sqlx::query(self.queries.insert())
.bind(event_id)
.bind(aggregate_id)
.bind(Json(&event))
.bind(occurred_on)
.bind(sequence_number)
.execute(&mut *connection)
.await
.map_err(|error| error.into());

let rebuild_result: Result<StoreEvent<Evt>, Err> = match store_event_result {
Ok(_) => {
let store_event: StoreEvent<Evt> = StoreEvent {
id: event_id,
aggregate_id,
payload: event,
occurred_on,
sequence_number,
};

self.project_event(&store_event, &mut connection)
.await
.map(|()| store_event)

let events: Vec<_> = events
.into_iter()
.map(|e| (Uuid::new_v4(), e))
.zip(starting_sequence_number..)
.collect();

for ((event_id, event), sequence_number) in events.iter() {
let result = sqlx::query(self.queries.insert())
.bind(event_id)
.bind(aggregate_id)
.bind(Json(event))
.bind(occurred_on)
.bind(sequence_number)
.execute(&mut *connection)
.await;

if let Err(err) = result {
self.rollback(connection).await?;
return Err(err.into());
}
Err(error) => Err(error),
};
}

match rebuild_result {
Ok(event) => {
self.commit(connection).await?;
let store_events: Vec<_> = events
.into_iter()
.map(|((event_id, event), sequence_number)| StoreEvent {
id: event_id,
aggregate_id,
payload: event,
occurred_on,
sequence_number,
})
.collect();

for store_event in store_events.iter() {
let result = self.project_event(store_event, &mut connection).await;

if let Err(err) = result {
self.rollback(connection).await?;
return Err(err);
}
}

for policy in &self.policies {
policy.handle_event(&event, &self.pool).await?
}
self.commit(connection).await?;

Ok(event)
}
Err(err) => {
self.rollback(connection).await?;
Err(err)
// REVIEW: This implies that potentially half of the policies would trigger, then one fails, and the rest wouldn't.
// potentially we should be returning some other kind of error, that includes the errors from any failed policies?
for policy in &self.policies {
for store_event in store_events.iter() {
policy.handle_event(store_event, &self.pool).await?
}
}

Ok(store_events)
}

async fn close(&self) {
@@ -358,8 +368,11 @@ mod tests {
async fn persist(database_url: &str) {
let aggregate_id: Uuid = Uuid::new_v4();
let test_store: PgStore<String, Error> = PgStore::test::<Hello>(database_url, vec![], vec![]).await.unwrap();
let _ = test_store.persist(aggregate_id, "hello".to_string(), 0).await.unwrap();
let _ = test_store
.persist(aggregate_id, vec!["hello".to_string(), "goodbye".to_string()], 0)
.await
.unwrap();
let list = test_store.by_aggregate_id(aggregate_id).await.unwrap();
assert_eq!(list.len(), 1);
assert_eq!(list.len(), 2);
}
}
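The REVIEW comment in the `persist` implementation above flags a real gap: policies run after the transaction commits, so if one fails part-way, earlier policies have already fired, later ones never run, and the `?` surfaces only the first error. One option the comment hints at is to keep going and collect every failure before reporting. A hypothetical sketch of that loop, as a drop-in replacement for the policy loop shown above (this is not code from the crate, and it still returns only one `Err` since no aggregate error type exists yet):

// Hypothetical alternative to the short-circuiting `?`: run every policy for
// every stored event, collect the failures, and only then report them (here
// just the first one, since the store's `Err` has no "many errors" variant).
let mut failures: Vec<Err> = Vec::new();
for policy in &self.policies {
    for store_event in store_events.iter() {
        if let Err(err) = policy.handle_event(store_event, &self.pool).await {
            failures.push(err);
        }
    }
}
if let Some(first) = failures.into_iter().next() {
    return Err(first);
}

Ok(store_events)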
92 changes: 51 additions & 41 deletions src/esrs/sqlite/mod.rs
@@ -10,7 +10,6 @@ use futures::TryStreamExt;
use serde::de::DeserializeOwned;
use serde::Serialize;
use sqlx::pool::{PoolConnection, PoolOptions};
use sqlx::sqlite::SqliteQueryResult;
use sqlx::{Pool, Sqlite};
use uuid::Uuid;

@@ -174,55 +173,66 @@ impl<
async fn persist(
&self,
aggregate_id: Uuid,
event: Evt,
sequence_number: SequenceNumber,
) -> Result<StoreEvent<Evt>, Err> {
events: Vec<Evt>,
starting_sequence_number: SequenceNumber,
) -> Result<Vec<StoreEvent<Evt>>, Err> {
let mut connection: PoolConnection<Sqlite> = self.begin().await?;

let event_id: Uuid = Uuid::new_v4();
let occurred_on: DateTime<Utc> = Utc::now();
let store_event_result: Result<SqliteQueryResult, Err> = sqlx::query(self.queries.insert())
.bind(event_id)
.bind(aggregate_id)
.bind(Json(&event))
.bind(occurred_on)
.bind(sequence_number)
.execute(&mut *connection)
.await
.map_err(|error| error.into());

let rebuild_result: Result<StoreEvent<Evt>, Err> = match store_event_result {
Ok(_) => {
let store_event: StoreEvent<Evt> = StoreEvent {
id: event_id,
aggregate_id,
payload: event,
occurred_on,
sequence_number,
};

self.project_event(&store_event, &mut connection)
.await
.map(|()| store_event)

let events: Vec<_> = events
.into_iter()
.map(|e| (Uuid::new_v4(), e))
.zip(starting_sequence_number..)
.collect();

for ((event_id, event), sequence_number) in events.iter() {
let result = sqlx::query(self.queries.insert())
.bind(event_id)
.bind(aggregate_id)
.bind(Json(event))
.bind(occurred_on)
.bind(sequence_number)
.execute(&mut *connection)
.await;

if let Err(err) = result {
self.rollback(connection).await?;
return Err(err.into());
}
Err(error) => Err(error),
};
}

match rebuild_result {
Ok(event) => {
self.commit(connection).await?;
let store_events: Vec<_> = events
.into_iter()
.map(|((event_id, event), sequence_number)| StoreEvent {
id: event_id,
aggregate_id,
payload: event,
occurred_on,
sequence_number,
})
.collect();

for store_event in store_events.iter() {
let result = self.project_event(store_event, &mut connection).await;

if let Err(err) = result {
self.rollback(connection).await?;
return Err(err);
}
}

for policy in &self.policies {
policy.handle_event(&event, &self.pool).await?
}
self.commit(connection).await?;

Ok(event)
}
Err(err) => {
self.rollback(connection).await?;
Err(err)
// REVIEW: This implies that potentially half of the policies would trigger, then one fails, and the rest wouldn't.
// potentially we should be returning some other kind of error, that includes the errors from any failed policies?
for policy in &self.policies {
for store_event in store_events.iter() {
policy.handle_event(store_event, &self.pool).await?
}
}

Ok(store_events)
}

async fn close(&self) {