Skip to content

Commit

Permalink
fix: unit of work policies (#51)
Browse files Browse the repository at this point in the history
* fix: unit of work policies

* fix: Duplicate Dispose

* fix: flaky test
  • Loading branch information
mvarendorff2 authored Oct 7, 2024
1 parent c2313ab commit 6e647c8
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 15 deletions.
9 changes: 6 additions & 3 deletions src/Fluss.PostgreSQL.IntegrationTest/PostgreSQLTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,9 @@ public async Task TestDatabaseNotificationForwarding()
}
};

// Short delay to allow database trigger to register
await Task.Delay(1000);

// Publish an event using the first repository
await repository1.Publish([
new EventEnvelope
Expand All @@ -410,13 +413,13 @@ await repository1.Publish([
]);

// Wait for both event handlers to be triggered or timeout after 5 seconds
var timeoutTask = Task.Delay(TimeSpan.FromSeconds(5));
var allTasks = await Task.WhenAny(
var timeoutTask = Task.Delay(TimeSpan.FromSeconds(10));
var fasterTask = await Task.WhenAny(
Task.WhenAll(eventRaised1.Task, eventRaised2.Task),
timeoutTask
);

Assert.NotEqual(timeoutTask, allTasks);
Assert.True(timeoutTask != fasterTask, "Ran into timeout");
Assert.True(await eventRaised1.Task, "NewEvents event was not raised on the first repository");
Assert.True(await eventRaised2.Task, "NewEvents event was not raised on the second repository");
}
Expand Down
37 changes: 25 additions & 12 deletions src/Fluss/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,8 @@ public static IServiceCollection AddEventSourcing(this IServiceCollection servic
return eventListenerFactory;
})
.AddSingleton<IArbitraryUserUnitOfWorkCache, ArbitraryUserUnitOfWorkCache>()
.AddTransient<UnitOfWork>(sp => UnitOfWork.Create(
sp.GetRequiredService<IEventRepository>(),
sp.GetRequiredService<IEventListenerFactory>(),
sp.GetServices<Policy>(),
sp.GetRequiredService<UserIdProvider>(),
sp.GetRequiredService<IRootValidator>()))
.AddTransient<IUnitOfWork>(sp => sp.GetRequiredService<UnitOfWork>())
.AddTransient<UnitOfWork>(CreateNewUnitOfWork)
.AddTransient<IUnitOfWork>(CreateNewUnitOfWork)
.AddTransient<UnitOfWorkFactory>()
.AddSingleton<IRootValidator, RootValidator>()
.AddHostedService<SideEffectDispatcher>()
Expand All @@ -64,22 +59,38 @@ public static IServiceCollection AddEventSourcing(this IServiceCollection servic
return services;
}

public static IServiceCollection AddEventRepositoryPipeline<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] TEventRepository>(this IServiceCollection services)
private static UnitOfWork CreateNewUnitOfWork(IServiceProvider sp)
{
return UnitOfWork.Create(
sp.GetRequiredService<IEventRepository>(),
sp.GetRequiredService<IEventListenerFactory>(),
sp.GetServices<Policy>(),
sp.GetRequiredService<UserIdProvider>(),
sp.GetRequiredService<IRootValidator>()
);
}

public static IServiceCollection AddEventRepositoryPipeline<
[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] TEventRepository>(
this IServiceCollection services)
where TEventRepository : EventRepositoryPipeline
{
return services
.AddSingleton<TEventRepository>()
.AddSingleton<EventRepositoryPipeline>(sp => sp.GetRequiredService<TEventRepository>());
}

public static IServiceCollection AddBaseEventRepository<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] TBaseEventRepository>(this IServiceCollection services)
public static IServiceCollection AddBaseEventRepository<
[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] TBaseEventRepository>(
this IServiceCollection services)
where TBaseEventRepository : class, IBaseEventRepository
{
ArgumentNullException.ThrowIfNull(services);

return services
.AddSingleton<TBaseEventRepository>()
.AddSingleton<IBaseEventRepository, TBaseEventRepository>(sp => sp.GetRequiredService<TBaseEventRepository>())
.AddSingleton<IBaseEventRepository, TBaseEventRepository>(sp =>
sp.GetRequiredService<TBaseEventRepository>())
.AddSingleton(sp =>
{
var pipeline = sp.GetServices<EventRepositoryPipeline>();
Expand All @@ -94,8 +105,10 @@ public static IServiceCollection AddEventSourcing(this IServiceCollection servic
});
}

public static IServiceCollection AddUpcaster<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] TUpcaster>(this IServiceCollection services) where TUpcaster : class, IUpcaster
public static IServiceCollection AddUpcaster<
[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] TUpcaster>(
this IServiceCollection services) where TUpcaster : class, IUpcaster
{
return services.AddSingleton<IUpcaster, TUpcaster>();
}
}
}

0 comments on commit 6e647c8

Please sign in to comment.