Skip to content

Commit

Permalink
fix: startup deadlock (#50)
Browse files Browse the repository at this point in the history
  • Loading branch information
mvarendorff2 authored Oct 2, 2024
1 parent f877aca commit d1a1c0a
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 9 deletions.
7 changes: 5 additions & 2 deletions src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,17 @@ protected override Task ExecuteAsync(CancellationToken stoppingToken)
var migrationRunner = scope.ServiceProvider.GetRequiredService<IMigrationRunner>();
migrationRunner.MigrateUp();

_didFinish = true;
_didFinishChanged.Release();
}
catch (Exception e)
{
logger.LogError(e, "Error while migrating");
Environment.Exit(-1);
}
finally
{
_didFinish = true;
_didFinishChanged.Release();
}
}, stoppingToken);
}
}
Expand Down
9 changes: 7 additions & 2 deletions src/Fluss/SideEffects/SideEffectDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,16 @@ public SideEffectDispatcher(IEnumerable<SideEffect> sideEffects, IServiceProvide
CacheSideEffects(sideEffects);
}

public async Task StartAsync(CancellationToken cancellationToken)
public Task StartAsync(CancellationToken cancellationToken)
{
Init();
return Task.CompletedTask;
}

private async void Init()
{
var upcaster = _serviceProvider.GetRequiredService<EventUpcasterService>();
await upcaster.WaitForCompletionAsync();

_transientEventRepository.NewEvents += HandleNewEvents;
_transientEventRepository.NewTransientEvents += HandleNewTransientEvents;
}
Expand Down
8 changes: 3 additions & 5 deletions src/Fluss/Upcasting/EventUpcasterService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class EventUpcasterService(
{
private readonly List<IUpcaster> _sortedUpcasters = sorter.SortByDependencies(upcasters);

private readonly CancellationTokenSource _onCompletedSource = new();
private readonly TaskCompletionSource _onCompletedSource = new();

public async ValueTask Run()
{
Expand Down Expand Up @@ -47,13 +47,11 @@ public async ValueTask Run()
events = upcastedEvents;
}

_onCompletedSource.Cancel();
_onCompletedSource.SetResult();
}

public async Task WaitForCompletionAsync()
{
var tcs = new TaskCompletionSource<bool>();
_onCompletedSource.Token.Register(s => ((TaskCompletionSource<bool>)s!).SetResult(true), tcs);
await tcs.Task;
await _onCompletedSource.Task;
}
}

0 comments on commit d1a1c0a

Please sign in to comment.