Skip to content

Commit

Permalink
Some docs; changelog and remove dead code (#109)
Browse files Browse the repository at this point in the history
* Some docs; changelog and remove dead code

* Remove unused imports

* resolve some clippy::pedantics checks
  • Loading branch information
cottinisimone authored Sep 29, 2022
1 parent 0547676 commit eff556f
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 46 deletions.
1 change: 1 addition & 0 deletions .clippy.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
msrv = "1.58.0"
11 changes: 9 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

Note: this version contains hard breaking changes and may take a lot of time in order to upgrade library version!
Refer to: [#107]
Refer to: [#107], [#108] and [#109]

### Added

- `AggregateManager`
- should implement `name` function that act as `Identifier`. Be sure to not change the name previously set in
`Aggregate::name` function. This would cause the store to create a new table, losing pre-migration events.
`Identifier::name` function. This would cause the store to create a new table, losing pre-migration events.
- depends on `Aggregate`, so user must implement `Aggregate` trait in order to implement `AggregateManager` trait.
- should implement `EventStore` associated type.

- `EventStore::delete` function with which an entire aggregate could be deleted by `aggregate_id`.

- `PgStore`
- `setup` function to create table and indexes if not exists. This function should be used only once at your
application startup. It tries to create the event table and its indexes if they not exist.
Expand Down Expand Up @@ -80,6 +83,7 @@ Refer to: [#107]
- `EventStore`
- `run_policies`. To customize the way policies behave override `Aggregate::store_events` using
`EventStore::persist` function.
- `close` function.

- `PgStore`
- `test` function. Use `#[sqlx::test]` in your tests to test the store.
Expand Down Expand Up @@ -110,4 +114,7 @@ Refer to: [#107]

[Unreleased]: https://github.com/primait/event_sourcing.rs/compare/0.6.2...HEAD
[0.6.2]: https://github.com/primait/event_sourcing.rs/compare/0.6.1...0.6.2

[#107]: https://github.com/primait/event_sourcing.rs/pull/107
[#108]: https://github.com/primait/event_sourcing.rs/pull/108
[#109]: https://github.com/primait/event_sourcing.rs/pull/109
27 changes: 0 additions & 27 deletions examples/rebuilder/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
use std::convert::TryInto;
use std::fmt::Debug;

use chrono::{DateTime, Utc};
use futures_util::stream::StreamExt;
use serde_json::Value;
use sqlx::migrate::MigrateDatabase;
use sqlx::{pool::PoolOptions, Pool, Postgres, Transaction};
use uuid::Uuid;

use esrs::postgres::Projector;
use esrs::types::SequenceNumber;
use esrs::{Aggregate, AggregateManager, AggregateState, EventStore, StoreEvent};
use simple_projection::aggregate::CounterAggregate;
use simple_projection::projector::{Counter, CounterProjector};
Expand Down Expand Up @@ -142,26 +138,3 @@ async fn main() {

assert!(res.counter_id == count_id && res.count == 3);
}

#[derive(sqlx::FromRow, serde::Serialize, serde::Deserialize, Debug)]
pub struct Event {
pub id: Uuid,
pub aggregate_id: Uuid,
pub payload: Value,
pub occurred_on: DateTime<Utc>,
pub sequence_number: SequenceNumber,
}

impl<E: serde::de::DeserializeOwned> TryInto<StoreEvent<E>> for Event {
type Error = serde_json::Error;

fn try_into(self) -> Result<StoreEvent<E>, Self::Error> {
Ok(StoreEvent {
id: self.id,
aggregate_id: self.aggregate_id,
payload: serde_json::from_value::<E>(self.payload)?,
occurred_on: self.occurred_on,
sequence_number: self.sequence_number,
})
}
}
7 changes: 6 additions & 1 deletion src/esrs/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ pub trait Aggregate {
type Error;

/// Handles, validate a command and emits events.
///
/// # Errors
///
/// Will return `Err` if the user of this library set up command validations. Every error here
/// could be just a "domain error". No technical errors.
fn handle_command(state: &Self::State, command: Self::Command) -> Result<Vec<Self::Event>, Self::Error>;

/// Updates the aggregate state using the new event. This assumes that the event can be correctly applied
Expand Down Expand Up @@ -91,8 +96,8 @@ pub trait AggregateManager: Aggregate {
);

AggregateState {
inner,
sequence_number,
inner,
..aggregate_state
}
}
Expand Down
41 changes: 28 additions & 13 deletions src/esrs/postgres/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use uuid::Uuid;

use crate::esrs::policy;
use crate::types::SequenceNumber;
use crate::{AggregateManager, EventStore, StoreEvent};
use crate::{Aggregate, AggregateManager, EventStore, StoreEvent};

use super::{event, projector, statement::Statements};

Expand Down Expand Up @@ -48,6 +48,7 @@ where
Manager::Error: From<sqlx::Error> + From<serde_json::Error> + std::error::Error,
{
/// Creates a new implementation of an aggregate
#[must_use]
pub fn new(pool: Pool<Postgres>) -> Self {
let inner: InnerPgStore<Manager> = InnerPgStore {
pool,
Expand Down Expand Up @@ -78,6 +79,10 @@ where
///
/// This function should be used only once at your application startup. It tries to create the
/// event table and its indexes if they not exist.
///
/// # Errors
///
/// Will return an `Err` if there's an error connecting with database or creating tables/indexes.
pub async fn setup(self) -> Result<Self, Manager::Error> {
let mut transaction: Transaction<Postgres> = self.inner.pool.begin().await?;

Expand All @@ -102,6 +107,10 @@ where
}

/// Save an event in the event store and return a new `StoreEvent` instance.
///
/// # Errors
///
/// Will return an `Err` if the insert of the values into the database fails.
pub async fn save_event(
&self,
aggregate_id: Uuid,
Expand Down Expand Up @@ -161,9 +170,14 @@ where
///
/// An example of how to use this function is in `examples/customize_persistence_flow` example
/// folder.
pub async fn persist<'a, F: Send, T>(&'a self, fun: F) -> Result<Vec<StoreEvent<Manager::Event>>, Manager::Error>
///
/// # Errors
///
/// Will return an `Err` if the given `fun` returns an `Err`. In the `EventStore` implementation
/// for `PgStore` this function return an `Err` if the event insertion or its projection fails.
pub async fn persist<'a, F, T>(&'a self, fun: F) -> Result<Vec<StoreEvent<Manager::Event>>, Manager::Error>
where
F: FnOnce(&'a Pool<Postgres>) -> T,
F: Send + FnOnce(&'a Pool<Postgres>) -> T,
T: Future<Output = Result<Vec<StoreEvent<Manager::Event>>, Manager::Error>> + Send,
{
fun(&self.inner.pool).await
Expand Down Expand Up @@ -203,30 +217,31 @@ where
let occurred_on: DateTime<Utc> = Utc::now();
let mut store_events: Vec<StoreEvent<Manager::Event>> = vec![];

for (index, event) in events.into_iter().enumerate() {
store_events.push(
self.save_event(
for (index, event) in (0..).zip(events.into_iter()) {
let store_event: StoreEvent<<Manager as Aggregate>::Event> = self
.save_event(
aggregate_id,
event,
occurred_on,
starting_sequence_number + index as i32,
starting_sequence_number + index,
&mut *transaction,
)
.await?,
)
.await?;

store_events.push(store_event);
}

for store_event in store_events.iter() {
for store_event in &store_events {
for projector in self.projectors().iter() {
projector.project(store_event, &mut transaction).await?;
}
}

transaction.commit().await?;

for store_event in store_events.iter() {
for store_event in &store_events {
for policy in self.policies().iter() {
let _ = policy.handle_event(store_event).await;
let _policy_result = policy.handle_event(store_event).await;
}
}

Expand All @@ -243,7 +258,7 @@ where
.map(|_| ())?;

for projector in self.projectors().iter() {
projector.delete(aggregate_id, &mut *transaction).await?;
projector.delete(aggregate_id, &mut transaction).await?;
}

transaction.commit().await?;
Expand Down
4 changes: 2 additions & 2 deletions src/esrs/postgres/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ fn delete_store_events_and_projections_test(pool: Pool<Postgres>) {

#[sqlx::test]
fn policy_test(pool: Pool<Postgres>) {
let last_id: Arc<Mutex<Uuid>> = Arc::new(Mutex::new(Default::default()));
let last_id: Arc<Mutex<Uuid>> = Arc::new(Mutex::new(Uuid::default()));
let policy: Box<TestPolicy> = Box::new(TestPolicy {
last_id: last_id.clone(),
});
Expand All @@ -225,7 +225,7 @@ fn policy_test(pool: Pool<Postgres>) {
.unwrap();

let guard: MutexGuard<Uuid> = last_id.lock().unwrap();
assert_eq!(*guard, event_internal_id)
assert_eq!(*guard, event_internal_id);
}

async fn create_test_projection_table(pool: &Pool<Postgres>) {
Expand Down
12 changes: 12 additions & 0 deletions src/esrs/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,29 @@ use uuid::Uuid;

use crate::types::SequenceNumber;

/// The internal state for an Aggregate.
/// It contains and id representing the aggregate id, an incremental sequence number and a state
/// defined by the user of this library.
#[derive(Clone)]
pub struct AggregateState<S> {
pub(crate) id: Uuid,
pub(crate) sequence_number: SequenceNumber,
pub(crate) inner: S,
}

/// Default implementation for [`AggregateState`]
impl<S: Default> Default for AggregateState<S> {
fn default() -> Self {
Self::new(Uuid::new_v4())
}
}

impl<S: Default> AggregateState<S> {
/// Creates a new instance of an [`AggregateState`] with the given aggregate id. The use of this
/// is discouraged being that that aggregate id could be already existing and a clash of ids
/// might happen.
///
/// Prefer [Default] implementation.
#[must_use]
pub fn new(id: Uuid) -> Self {
Self {
Expand All @@ -25,14 +34,17 @@ impl<S: Default> AggregateState<S> {
}
}

/// Returns an Uuid representing the aggregate id
pub const fn id(&self) -> &Uuid {
&self.id
}

/// Returns the internal state
pub const fn inner(&self) -> &S {
&self.inner
}

/// Returns the internal sequence number incremented by 1.
pub const fn next_sequence_number(&self) -> SequenceNumber {
self.sequence_number + 1
}
Expand Down
2 changes: 1 addition & 1 deletion src/esrs/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub trait EventStore {
async fn delete(&self, aggregate_id: Uuid) -> Result<(), <Self::Manager as Aggregate>::Error>;
}

/// A StoreEvent contains the payload (the original event) alongside the event's metadata.
/// A `StoreEvent` contains the payload (the original event) alongside the event's metadata.
pub struct StoreEvent<Event> {
/// Uniquely identifies an event among all events emitted from all aggregates.
pub id: Uuid,
Expand Down

0 comments on commit eff556f

Please sign in to comment.