diff --git a/src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs b/src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs index 7bd688f..f7ff5d4 100644 --- a/src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs +++ b/src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs @@ -65,14 +65,17 @@ protected override Task ExecuteAsync(CancellationToken stoppingToken) var migrationRunner = scope.ServiceProvider.GetRequiredService(); migrationRunner.MigrateUp(); - _didFinish = true; - _didFinishChanged.Release(); } catch (Exception e) { logger.LogError(e, "Error while migrating"); Environment.Exit(-1); } + finally + { + _didFinish = true; + _didFinishChanged.Release(); + } }, stoppingToken); } } diff --git a/src/Fluss/SideEffects/SideEffectDispatcher.cs b/src/Fluss/SideEffects/SideEffectDispatcher.cs index 905b4e1..50b90b6 100644 --- a/src/Fluss/SideEffects/SideEffectDispatcher.cs +++ b/src/Fluss/SideEffects/SideEffectDispatcher.cs @@ -32,11 +32,16 @@ public SideEffectDispatcher(IEnumerable sideEffects, IServiceProvide CacheSideEffects(sideEffects); } - public async Task StartAsync(CancellationToken cancellationToken) + public Task StartAsync(CancellationToken cancellationToken) + { + Init(); + return Task.CompletedTask; + } + + private async void Init() { var upcaster = _serviceProvider.GetRequiredService(); await upcaster.WaitForCompletionAsync(); - _transientEventRepository.NewEvents += HandleNewEvents; _transientEventRepository.NewTransientEvents += HandleNewTransientEvents; } diff --git a/src/Fluss/Upcasting/EventUpcasterService.cs b/src/Fluss/Upcasting/EventUpcasterService.cs index 484c598..414804b 100644 --- a/src/Fluss/Upcasting/EventUpcasterService.cs +++ b/src/Fluss/Upcasting/EventUpcasterService.cs @@ -17,7 +17,7 @@ public class EventUpcasterService( { private readonly List _sortedUpcasters = sorter.SortByDependencies(upcasters); - private readonly CancellationTokenSource _onCompletedSource = new(); + private readonly TaskCompletionSource _onCompletedSource = new(); public async ValueTask Run() { @@ -47,13 +47,11 @@ public async ValueTask Run() events = upcastedEvents; } - _onCompletedSource.Cancel(); + _onCompletedSource.SetResult(); } public async Task WaitForCompletionAsync() { - var tcs = new TaskCompletionSource(); - _onCompletedSource.Token.Register(s => ((TaskCompletionSource)s!).SetResult(true), tcs); - await tcs.Task; + await _onCompletedSource.Task; } }