diff --git a/Core.Marten/Extensions/DocumentSessionExtensions.cs b/Core.Marten/Extensions/DocumentSessionExtensions.cs index 5e7de1632..ad7154699 100644 --- a/Core.Marten/Extensions/DocumentSessionExtensions.cs +++ b/Core.Marten/Extensions/DocumentSessionExtensions.cs @@ -1,3 +1,4 @@ +using Core.ProcessManagers; using Core.Structures; using Marten; @@ -16,6 +17,17 @@ CancellationToken ct return documentSession.SaveChangesAsync(token: ct); } + public static Task Add( + this IDocumentSession documentSession, + string id, + object @event, + CancellationToken ct + ) where T : class + { + documentSession.Events.StartStream(id, @event); + return documentSession.SaveChangesAsync(token: ct); + } + public static Task GetAndUpdate( this IDocumentSession documentSession, Guid id, int version, @@ -25,6 +37,55 @@ CancellationToken ct documentSession.Events.WriteToAggregate(id, version, stream => stream.AppendOne(handle(stream.Aggregate)), ct); + public static Task GetAndUpdate( + this IDocumentSession documentSession, + string id, + int version, + Func> handle, + CancellationToken ct + ) where T : class => + documentSession.Events.WriteToAggregate(id, version, stream => + { + var messages = handle(stream.Aggregate); + + foreach (var message in messages) + { + message.Switch( + stream.AppendOne, + command => documentSession.Events.Append($"commands-{id}", command) + ); + } + }, ct); + + + public static Task GetAndUpdate( + this IDocumentSession documentSession, + string id, + Func> handle, + CancellationToken ct + ) where T : class => + documentSession.Events.WriteToAggregate(id, stream => + { + var messages = handle(stream.Aggregate); + + foreach (var message in messages) + { + message.Switch( + stream.AppendOne, + command => documentSession.Events.Append($"commands-{id}", command) + ); + } + }, ct); + + public static Task GetAndUpdate( + this IDocumentSession documentSession, + string id, + Func> handle, + CancellationToken ct + ) where T : class => + documentSession.Events.WriteToAggregate(id, + stream => stream.AppendMany(handle(stream.Aggregate)), ct); + public static Task GetAndUpdate( this IDocumentSession documentSession, Guid id, diff --git a/Core/ProcessManagers/IProcessManager.cs b/Core/ProcessManagers/IProcessManager.cs index 6bac30c6c..a7f78e401 100644 --- a/Core/ProcessManagers/IProcessManager.cs +++ b/Core/ProcessManagers/IProcessManager.cs @@ -14,17 +14,3 @@ public interface IProcessManager: IProjection EventOrCommand[] DequeuePendingMessages(); } - -public class EventOrCommand: Either -{ - public static EventOrCommand Event(object @event) => - new(Maybe.Of(@event), Maybe.Empty); - - - public static EventOrCommand Command(object @event) => - new(Maybe.Empty, Maybe.Of(@event)); - - private EventOrCommand(Maybe left, Maybe right): base(left, right) - { - } -} diff --git a/Core/ProcessManagers/ProcessManager.cs b/Core/ProcessManagers/ProcessManager.cs index 091fcc378..a344d32f8 100644 --- a/Core/ProcessManagers/ProcessManager.cs +++ b/Core/ProcessManagers/ProcessManager.cs @@ -1,3 +1,5 @@ +using Core.Structures; + namespace Core.ProcessManagers; public abstract class ProcessManager: ProcessManager, IProcessManager diff --git a/Core/Structures/EventOrCommand.cs b/Core/Structures/EventOrCommand.cs new file mode 100644 index 000000000..fc67fbc6c --- /dev/null +++ b/Core/Structures/EventOrCommand.cs @@ -0,0 +1,20 @@ +namespace Core.Structures; + +public class EventOrCommand: Either +{ + public static EventOrCommand Event(object @event) => + new(Maybe.Of(@event), Maybe.Empty); + + public static IEnumerable Events(params object[] events) => + events.Select(Event); + + public static IEnumerable Events(IEnumerable events) => + events.Select(Event); + + public static EventOrCommand Command(object @event) => + new(Maybe.Empty, Maybe.Of(@event)); + + private EventOrCommand(Maybe left, Maybe right): base(left, right) + { + } +} diff --git a/Sample/HotelManagement/HotelManagement/Choreography/GroupCheckouts/GroupCheckout.cs b/Sample/HotelManagement/HotelManagement/Choreography/GroupCheckouts/GroupCheckout.cs index e380b7ca6..8a92206e0 100644 --- a/Sample/HotelManagement/HotelManagement/Choreography/GroupCheckouts/GroupCheckout.cs +++ b/Sample/HotelManagement/HotelManagement/Choreography/GroupCheckouts/GroupCheckout.cs @@ -4,7 +4,7 @@ namespace HotelManagement.Choreography.GroupCheckouts; public record GroupCheckoutInitiated( - Guid GroupCheckoutId, + Guid GroupCheckOutId, Guid ClerkId, Guid[] GuestStayIds, DateTimeOffset InitiatedAt @@ -55,48 +55,50 @@ DateTimeOffset initiatedAt ) => new GroupCheckoutInitiated(groupCheckoutId, clerkId, guestStayIds, initiatedAt); - public Maybe RecordGuestCheckoutsInitiation( + public GuestCheckoutsInitiated? RecordGuestCheckoutsInitiation( Guid[] initiatedGuestStayIds, DateTimeOffset now - ) => - Maybe.If( - Status == CheckoutStatus.Initiated, - () => new GuestCheckoutsInitiated(Id, initiatedGuestStayIds, now) - ); + ) + { + if (Status == CheckoutStatus.Initiated) + return null; - public Maybe RecordGuestCheckoutCompletion( + return new GuestCheckoutsInitiated(Id, initiatedGuestStayIds, now); + } + + public object[] RecordGuestCheckoutCompletion( Guid guestStayId, DateTimeOffset now - ) => - Maybe.If( - Status == CheckoutStatus.Initiated && GuestStayCheckouts[guestStayId] != CheckoutStatus.Completed, - () => - { - var guestCheckoutCompleted = new GuestCheckoutCompleted(Id, guestStayId, now); + ) + { + if (Status == CheckoutStatus.Initiated && GuestStayCheckouts[guestStayId] != CheckoutStatus.Completed) + return Array.Empty(); + + var guestCheckoutCompleted = new GuestCheckoutCompleted(Id, guestStayId, now); - var guestStayCheckouts = GuestStayCheckouts.With(guestStayId, CheckoutStatus.Completed); + var guestStayCheckouts = GuestStayCheckouts.With(guestStayId, CheckoutStatus.Completed); - return AreAnyOngoingCheckouts(guestStayCheckouts) - ? new object[] { guestCheckoutCompleted } - : new[] { guestCheckoutCompleted, Finalize(guestStayCheckouts, now) }; - }); + return AreAnyOngoingCheckouts(guestStayCheckouts) + ? new object[] { guestCheckoutCompleted } + : new[] { guestCheckoutCompleted, Finalize(guestStayCheckouts, now) }; + } - public Maybe RecordGuestCheckoutFailure( + public object[] RecordGuestCheckoutFailure( Guid guestStayId, DateTimeOffset now - ) => - Maybe.If( - Status == CheckoutStatus.Initiated && GuestStayCheckouts[guestStayId] != CheckoutStatus.Failed, - () => - { - var guestCheckoutFailed = new GuestCheckoutFailed(Id, guestStayId, now); + ) + { + if(Status == CheckoutStatus.Initiated && GuestStayCheckouts[guestStayId] != CheckoutStatus.Failed) + return Array.Empty(); + + var guestCheckoutFailed = new GuestCheckoutFailed(Id, guestStayId, now); - var guestStayCheckouts = GuestStayCheckouts.With(guestStayId, CheckoutStatus.Failed); + var guestStayCheckouts = GuestStayCheckouts.With(guestStayId, CheckoutStatus.Failed); - return AreAnyOngoingCheckouts(guestStayCheckouts) - ? new object[] { guestCheckoutFailed } - : new[] { guestCheckoutFailed, Finalize(guestStayCheckouts, now) }; - }); + return AreAnyOngoingCheckouts(guestStayCheckouts) + ? new object[] { guestCheckoutFailed } + : new[] { guestCheckoutFailed, Finalize(guestStayCheckouts, now) }; + } private object Finalize(Dictionary guestStayCheckouts, DateTimeOffset now) => !AreAnyFailedCheckouts(guestStayCheckouts) @@ -128,7 +130,7 @@ private static Guid[] CheckoutsWith(Dictionary guestStayCh public static GroupCheckout Create(GroupCheckoutInitiated @event) => new GroupCheckout( - @event.GroupCheckoutId, + @event.GroupCheckOutId, @event.GuestStayIds.ToDictionary(id => id, _ => CheckoutStatus.Pending) ); diff --git a/Sample/HotelManagement/HotelManagement/Choreography/GroupCheckouts/GroupCheckoutDomainService.cs b/Sample/HotelManagement/HotelManagement/Choreography/GroupCheckouts/GroupCheckoutDomainService.cs index 320e481f8..33608d5f6 100644 --- a/Sample/HotelManagement/HotelManagement/Choreography/GroupCheckouts/GroupCheckoutDomainService.cs +++ b/Sample/HotelManagement/HotelManagement/Choreography/GroupCheckouts/GroupCheckoutDomainService.cs @@ -1,6 +1,10 @@ using Core.Commands; +using Core.Events; using Core.Marten.Extensions; +using Core.Structures; +using HotelManagement.Choreography.GuestStayAccounts; using Marten; +using static Core.Structures.EventOrCommand; namespace HotelManagement.Choreography.GroupCheckouts; @@ -29,18 +33,22 @@ DateTimeOffset FailedAt public class GuestStayDomainService: ICommandHandler, - ICommandHandler, - ICommandHandler, - ICommandHandler + IEventHandler, + IEventHandler, + IEventHandler { private readonly IDocumentSession documentSession; + private readonly IAsyncCommandBus commandBus; - public GuestStayDomainService(IDocumentSession documentSession) => + public GuestStayDomainService(IDocumentSession documentSession, IAsyncCommandBus commandBus) + { this.documentSession = documentSession; + this.commandBus = commandBus; + } public Task Handle(InitiateGroupCheckout command, CancellationToken ct) => documentSession.Add( - command.GroupCheckoutId, + command.GroupCheckoutId.ToString(), GroupCheckout.Initiate( command.GroupCheckoutId, command.ClerkId, @@ -50,24 +58,46 @@ public Task Handle(InitiateGroupCheckout command, CancellationToken ct) => ct ); - public Task Handle(RecordGuestCheckoutsInitiation command, CancellationToken ct) => - documentSession.GetAndUpdate( - command.GroupCheckoutId, - (GroupCheckout state) => state.RecordGuestCheckoutsInitiation(command.InitiatedGuestStayIds, DateTimeOffset.UtcNow), - ct - ); + public Task Handle(GroupCheckoutInitiated @event, CancellationToken ct) + { + IEnumerable OnInitiated(GroupCheckout groupCheckout) + { + var result = groupCheckout.RecordGuestCheckoutsInitiation(@event.GuestStayIds, @event.InitiatedAt); + + if (result is not null) + { + foreach (var guestAccountId in @event.GuestStayIds) + { + yield return Command(new CheckOutGuest(guestAccountId, @event.GroupCheckOutId)); + } + + yield return Event(result); + } + } - public Task Handle(RecordGuestCheckoutCompletion command, CancellationToken ct) => - documentSession.GetAndUpdate( - command.GuestStayId, - (GroupCheckout state) => state.RecordGuestCheckoutCompletion(command.GuestStayId, command.CompletedAt), + return documentSession.GetAndUpdate(@event.GroupCheckOutId.ToString(), OnInitiated, ct); + } + + public Task Handle(GuestStayAccounts.GuestCheckedOut @event, CancellationToken ct) + { + if (!@event.GroupCheckOutId.HasValue) + return Task.CompletedTask; + + return documentSession.GetAndUpdate(@event.GroupCheckOutId.Value.ToString(), + groupCheckout => groupCheckout.RecordGuestCheckoutCompletion(@event.GuestStayId, @event.CheckedOutAt), ct ); + } + + public Task Handle(Choreography.GuestStayAccounts.GuestCheckoutFailed @event, CancellationToken ct) + { + if (!@event.GroupCheckOutId.HasValue) + return Task.CompletedTask; - public Task Handle(RecordGuestCheckoutFailure command, CancellationToken ct) => - documentSession.GetAndUpdate( - command.GuestStayId, - (GroupCheckout state) => state.RecordGuestCheckoutFailure(command.GuestStayId, command.FailedAt), + return documentSession.GetAndUpdate(@event.GroupCheckOutId.Value.ToString(), + groupCheckout => + groupCheckout.RecordGuestCheckoutFailure(@event.GuestStayId, @event.FailedAt), ct ); + } } diff --git a/Sample/HotelManagement/HotelManagement/Choreography/GroupCheckouts/GroupCheckoutSaga.cs b/Sample/HotelManagement/HotelManagement/Choreography/GroupCheckouts/GroupCheckoutSaga.cs index 4aa0f0cab..e43a8921b 100644 --- a/Sample/HotelManagement/HotelManagement/Choreography/GroupCheckouts/GroupCheckoutSaga.cs +++ b/Sample/HotelManagement/HotelManagement/Choreography/GroupCheckouts/GroupCheckoutSaga.cs @@ -19,14 +19,14 @@ public async Task Handle(GroupCheckoutInitiated @event, CancellationToken ct) foreach (var guestAccountId in @event.GuestStayIds) { await commandBus.Schedule( - new CheckOutGuest(guestAccountId, @event.GroupCheckoutId), + new CheckOutGuest(guestAccountId, @event.GroupCheckOutId), ct ); } await commandBus.Schedule( new RecordGuestCheckoutsInitiation( - @event.GroupCheckoutId, + @event.GroupCheckOutId, @event.GuestStayIds ), ct diff --git a/Sample/HotelManagement/README.md b/Sample/HotelManagement/README.md index f29e720f6..0366879ee 100644 --- a/Sample/HotelManagement/README.md +++ b/Sample/HotelManagement/README.md @@ -9,7 +9,7 @@ It was modelled and explained in detail in the [Implementing Distributed Process How to deal with privacy and GDPR in Event-Sourced systems It shows how to: -- orchestrate and coordinate a business workflow spanning across multiple aggregates using the [Saga pattern](https://event-driven.io/en/saga_process_manager_distributed_transactions/), +- orchestrate and coordinate a business workflow spanning across multiple aggregates using the [Saga pattern](./HotelManagement/Sagas), [Choreography](./HotelManagement/Choreography) and [Process Managers](./HotelManagement/ProcessManagers), - handle distributed processing both for asynchronous command scheduling and event publishing, - getting at-least-once delivery guarantee, - implementing command store and outbox pattern on top of Marten and EventStoreDB,