Skip to content

Commit

Permalink
Added examples of process managers in HotelManagement example
Browse files Browse the repository at this point in the history
  • Loading branch information
oskardudycz committed Feb 21, 2023
1 parent 6e8d737 commit 16ddcd6
Show file tree
Hide file tree
Showing 32 changed files with 1,363 additions and 43 deletions.
48 changes: 48 additions & 0 deletions Core.Marten/Aggregates/DocumentSessionExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using Core.Aggregates;
using Core.Exceptions;
using Marten;

namespace Core.Marten.Aggregates;

/// <summary>
/// This code assumes that Aggregate:
/// - is event-driven but not fully event-sourced
/// - streams have string identifiers
/// - aggregate is versioned, so optimistic concurrency is applied
/// </summary>
public static class DocumentSessionExtensions
{
public static Task Add<T>(
this IDocumentSession documentSession,
string id,
T aggregate,
CancellationToken ct
) where T : IAggregate
{
documentSession.Insert(aggregate);
documentSession.Events.Append($"events-{id}", aggregate.DequeueUncommittedEvents());

return documentSession.SaveChangesAsync(token: ct);
}

public static async Task GetAndUpdate<T>(
this IDocumentSession documentSession,
string id,
Action<T> handle,
CancellationToken ct
) where T : IAggregate
{
var aggregate = await documentSession.LoadAsync<T>(id, ct).ConfigureAwait(false);

if (aggregate is null)
throw AggregateNotFoundException.For<T>(id);

handle(aggregate);

documentSession.Update(aggregate);

documentSession.Events.Append($"events-{id}", aggregate.DequeueUncommittedEvents());

await documentSession.SaveChangesAsync(token: ct).ConfigureAwait(false);
}
}
58 changes: 58 additions & 0 deletions Core.Marten/ProcessManagers/DocumentSessionExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
using Core.Exceptions;
using Core.ProcessManagers;
using Marten;

namespace Core.Marten.ProcessManagers;

/// <summary>
/// This code assumes that Process Manager:
/// - is event-driven but not fully event-sourced
/// - streams have string identifiers
/// - process manager is versioned, so optimistic concurrency is applied
/// </summary>
public static class DocumentSessionExtensions
{
public static Task Add<T>(
this IDocumentSession documentSession,
string id,
T processManager,
CancellationToken ct
) where T : IProcessManager
{
documentSession.Insert(processManager);
EnqueueMessages(documentSession, id, processManager);

return documentSession.SaveChangesAsync(token: ct);
}

public static async Task GetAndUpdate<T>(
this IDocumentSession documentSession,
string id,
Action<T> handle,
CancellationToken ct
) where T : IProcessManager
{
var processManager = await documentSession.LoadAsync<T>(id, ct).ConfigureAwait(false);

if (processManager is null)
throw AggregateNotFoundException.For<T>(id);

handle(processManager);

documentSession.Update(processManager);

EnqueueMessages(documentSession, id, processManager);
await documentSession.SaveChangesAsync(token: ct).ConfigureAwait(false);
}

private static void EnqueueMessages<T>(IDocumentSession documentSession, string id, T processManager) where T : IProcessManager
{
foreach (var message in processManager.DequeuePendingMessages())
{
message.Switch(
@event => documentSession.Events.Append($"events-{id}", @event),
command => documentSession.Events.Append($"commands-{id}", command)
);
}
}
}
30 changes: 30 additions & 0 deletions Core/ProcessManagers/IProcessManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using Core.Projections;
using Core.Structures;

namespace Core.ProcessManagers;

public interface IProcessManager: IProcessManager<Guid>
{
}

public interface IProcessManager<out T>: IProjection
{
T Id { get; }
int Version { get; }

EventOrCommand[] DequeuePendingMessages();
}

public class EventOrCommand: Either<object, object>
{
public static EventOrCommand Event(object @event) =>
new(Maybe<object>.Of(@event), Maybe<object>.Empty);


public static EventOrCommand Command(object @event) =>
new(Maybe<object>.Empty, Maybe<object>.Of(@event));

private EventOrCommand(Maybe<object> left, Maybe<object> right): base(left, right)
{
}
}
33 changes: 33 additions & 0 deletions Core/ProcessManagers/ProcessManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
namespace Core.ProcessManagers;

public abstract class ProcessManager: ProcessManager<Guid>, IProcessManager
{
}

public abstract class ProcessManager<T>: IProcessManager<T> where T : notnull
{
public T Id { get; protected set; } = default!;

public int Version { get; protected set; }

[NonSerialized] private readonly Queue<EventOrCommand> scheduledCommands = new();

public EventOrCommand[] DequeuePendingMessages()
{
var dequeuedEvents = scheduledCommands.ToArray();

scheduledCommands.Clear();

return dequeuedEvents;
}

protected void EnqueueEvent(object @event) =>
scheduledCommands.Enqueue(EventOrCommand.Event(@event));

protected void ScheduleCommand(object @event) =>
scheduledCommands.Enqueue(EventOrCommand.Command(@event));

public virtual void When(object @event)
{
}
}
132 changes: 132 additions & 0 deletions Core/Structures/Either.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
namespace Core.Structures;

public class Either<TLeft, TRight>
{
public Maybe<TLeft> Left { get; }
public Maybe<TRight> Right { get; }

public Either(TLeft value)
{
Left = Maybe<TLeft>.Of(value);
Right = Maybe<TRight>.Empty;
}

public Either(TRight value)
{
Left = Maybe<TLeft>.Empty;
Right = Maybe<TRight>.Of(value);
}

public Either(Maybe<TLeft> left, Maybe<TRight> right)
{
if (!left.IsPresent && !right.IsPresent)
throw new ArgumentOutOfRangeException(nameof(right));

Left = left;
Right = right;
}

public TMapped Map<TMapped>(
Func<TLeft, TMapped> mapLeft,
Func<TRight, TMapped> mapRight
)
{
if (Left.IsPresent)
return mapLeft(Left.GetOrThrow());

if (Right.IsPresent)
return mapRight(Right.GetOrThrow());

throw new Exception("That should never happen!");
}

public void Switch(
Action<TLeft> onLeft,
Action<TRight> onRight
)
{
if (Left.IsPresent)
{
onLeft(Left.GetOrThrow());
return;
}

if (Right.IsPresent)
{
onRight(Right.GetOrThrow());
return;
}

throw new Exception("That should never happen!");
}
}

public static class EitherExtensions
{
public static (TLeft? Left, TRight? Right) AssertAnyDefined<TLeft, TRight>(
this (TLeft? Left, TRight? Right) value
)
{
if (value.Left == null && value.Right == null)
throw new ArgumentOutOfRangeException(nameof(value), "One of values needs to be set");

return value;
}

public static TMapped Map<TLeft, TRight, TMapped>(
this (TLeft? Left, TRight? Right) value,
Func<TLeft, TMapped> mapLeft,
Func<TRight, TMapped> mapRight
)
where TLeft: struct
where TRight: struct
{
var (left, right) = value.AssertAnyDefined();

if (left.HasValue)
return mapLeft(left.Value);

if (right.HasValue)
return mapRight(right.Value);

throw new Exception("That should never happen!");
}

public static TMapped Map<TLeft, TRight, TMapped>(
this (TLeft? Left, TRight? Right) value,
Func<TLeft, TMapped> mapT1,
Func<TRight, TMapped> mapT2
)
{
value.AssertAnyDefined();

var either = value.Left != null
? new Either<TLeft, TRight>(value.Left!)
: new Either<TLeft, TRight>(value.Right!);

return either.Map(mapT1, mapT2);
}

public static void Switch<TLeft, TRight>(
this (TLeft? Left, TRight? Right) value,
Action<TLeft> onT1,
Action<TRight> onT2
)
{
value.AssertAnyDefined();

var either = value.Left != null
? new Either<TLeft, TRight>(value.Left!)
: new Either<TLeft, TRight>(value.Right!);

either.Switch(onT1, onT2);
}

public static (TLeft?, TRight?) Either<TLeft, TRight>(
TLeft? left = default
) => (left, default);

public static (TLeft?, TRight?) Either<TLeft, TRight>(
TRight? right = default
) => (default, right);
}
Loading

0 comments on commit 16ddcd6

Please sign in to comment.