diff --git a/src/Fluss.PostgreSQL.IntegrationTest/PostgreSQLTest.cs b/src/Fluss.PostgreSQL.IntegrationTest/PostgreSQLTest.cs index 502e4d0..f232a83 100644 --- a/src/Fluss.PostgreSQL.IntegrationTest/PostgreSQLTest.cs +++ b/src/Fluss.PostgreSQL.IntegrationTest/PostgreSQLTest.cs @@ -398,6 +398,9 @@ public async Task TestDatabaseNotificationForwarding() } }; + // Short delay to allow database trigger to register + await Task.Delay(1000); + // Publish an event using the first repository await repository1.Publish([ new EventEnvelope @@ -410,13 +413,13 @@ await repository1.Publish([ ]); // Wait for both event handlers to be triggered or timeout after 5 seconds - var timeoutTask = Task.Delay(TimeSpan.FromSeconds(5)); - var allTasks = await Task.WhenAny( + var timeoutTask = Task.Delay(TimeSpan.FromSeconds(10)); + var fasterTask = await Task.WhenAny( Task.WhenAll(eventRaised1.Task, eventRaised2.Task), timeoutTask ); - Assert.NotEqual(timeoutTask, allTasks); + Assert.True(timeoutTask != fasterTask, "Ran into timeout"); Assert.True(await eventRaised1.Task, "NewEvents event was not raised on the first repository"); Assert.True(await eventRaised2.Task, "NewEvents event was not raised on the second repository"); } diff --git a/src/Fluss/ServiceCollectionExtensions.cs b/src/Fluss/ServiceCollectionExtensions.cs index c7197be..e417710 100644 --- a/src/Fluss/ServiceCollectionExtensions.cs +++ b/src/Fluss/ServiceCollectionExtensions.cs @@ -39,13 +39,8 @@ public static IServiceCollection AddEventSourcing(this IServiceCollection servic return eventListenerFactory; }) .AddSingleton() - .AddTransient(sp => UnitOfWork.Create( - sp.GetRequiredService(), - sp.GetRequiredService(), - sp.GetServices(), - sp.GetRequiredService(), - sp.GetRequiredService())) - .AddTransient(sp => sp.GetRequiredService()) + .AddTransient(CreateNewUnitOfWork) + .AddTransient(CreateNewUnitOfWork) .AddTransient() .AddSingleton() .AddHostedService() @@ -64,7 +59,20 @@ public static IServiceCollection AddEventSourcing(this IServiceCollection servic return services; } - public static IServiceCollection AddEventRepositoryPipeline<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] TEventRepository>(this IServiceCollection services) + private static UnitOfWork CreateNewUnitOfWork(IServiceProvider sp) + { + return UnitOfWork.Create( + sp.GetRequiredService(), + sp.GetRequiredService(), + sp.GetServices(), + sp.GetRequiredService(), + sp.GetRequiredService() + ); + } + + public static IServiceCollection AddEventRepositoryPipeline< + [DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] TEventRepository>( + this IServiceCollection services) where TEventRepository : EventRepositoryPipeline { return services @@ -72,14 +80,17 @@ public static IServiceCollection AddEventSourcing(this IServiceCollection servic .AddSingleton(sp => sp.GetRequiredService()); } - public static IServiceCollection AddBaseEventRepository<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] TBaseEventRepository>(this IServiceCollection services) + public static IServiceCollection AddBaseEventRepository< + [DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] TBaseEventRepository>( + this IServiceCollection services) where TBaseEventRepository : class, IBaseEventRepository { ArgumentNullException.ThrowIfNull(services); return services .AddSingleton() - .AddSingleton(sp => sp.GetRequiredService()) + .AddSingleton(sp => + sp.GetRequiredService()) .AddSingleton(sp => { var pipeline = sp.GetServices(); @@ -94,8 +105,10 @@ public static IServiceCollection AddEventSourcing(this IServiceCollection servic }); } - public static IServiceCollection AddUpcaster<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] TUpcaster>(this IServiceCollection services) where TUpcaster : class, IUpcaster + public static IServiceCollection AddUpcaster< + [DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] TUpcaster>( + this IServiceCollection services) where TUpcaster : class, IUpcaster { return services.AddSingleton(); } -} +} \ No newline at end of file