From d1a1c0a7ea9658be51f8f1676eb8ece6e853dab0 Mon Sep 17 00:00:00 2001 From: "Michel v. Varendorff" <80046268+mvarendorff2@users.noreply.github.com> Date: Wed, 2 Oct 2024 10:12:15 +0200 Subject: [PATCH] fix: startup deadlock (#50) --- src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs | 7 +++++-- src/Fluss/SideEffects/SideEffectDispatcher.cs | 9 +++++++-- src/Fluss/Upcasting/EventUpcasterService.cs | 8 +++----- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs b/src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs index 7bd688f..f7ff5d4 100644 --- a/src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs +++ b/src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs @@ -65,14 +65,17 @@ protected override Task ExecuteAsync(CancellationToken stoppingToken) var migrationRunner = scope.ServiceProvider.GetRequiredService(); migrationRunner.MigrateUp(); - _didFinish = true; - _didFinishChanged.Release(); } catch (Exception e) { logger.LogError(e, "Error while migrating"); Environment.Exit(-1); } + finally + { + _didFinish = true; + _didFinishChanged.Release(); + } }, stoppingToken); } } diff --git a/src/Fluss/SideEffects/SideEffectDispatcher.cs b/src/Fluss/SideEffects/SideEffectDispatcher.cs index 905b4e1..50b90b6 100644 --- a/src/Fluss/SideEffects/SideEffectDispatcher.cs +++ b/src/Fluss/SideEffects/SideEffectDispatcher.cs @@ -32,11 +32,16 @@ public SideEffectDispatcher(IEnumerable sideEffects, IServiceProvide CacheSideEffects(sideEffects); } - public async Task StartAsync(CancellationToken cancellationToken) + public Task StartAsync(CancellationToken cancellationToken) + { + Init(); + return Task.CompletedTask; + } + + private async void Init() { var upcaster = _serviceProvider.GetRequiredService(); await upcaster.WaitForCompletionAsync(); - _transientEventRepository.NewEvents += HandleNewEvents; _transientEventRepository.NewTransientEvents += HandleNewTransientEvents; } diff --git a/src/Fluss/Upcasting/EventUpcasterService.cs b/src/Fluss/Upcasting/EventUpcasterService.cs index 484c598..414804b 100644 --- a/src/Fluss/Upcasting/EventUpcasterService.cs +++ b/src/Fluss/Upcasting/EventUpcasterService.cs @@ -17,7 +17,7 @@ public class EventUpcasterService( { private readonly List _sortedUpcasters = sorter.SortByDependencies(upcasters); - private readonly CancellationTokenSource _onCompletedSource = new(); + private readonly TaskCompletionSource _onCompletedSource = new(); public async ValueTask Run() { @@ -47,13 +47,11 @@ public async ValueTask Run() events = upcastedEvents; } - _onCompletedSource.Cancel(); + _onCompletedSource.SetResult(); } public async Task WaitForCompletionAsync() { - var tcs = new TaskCompletionSource(); - _onCompletedSource.Token.Register(s => ((TaskCompletionSource)s!).SetResult(true), tcs); - await tcs.Task; + await _onCompletedSource.Task; } }