Skip to content

Commit

Permalink
fix: startup deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
mvarendorff2 committed Oct 2, 2024
1 parent 949242e commit a7c3630
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 9 deletions.
7 changes: 5 additions & 2 deletions src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,17 @@ protected override Task ExecuteAsync(CancellationToken stoppingToken)
var migrationRunner = scope.ServiceProvider.GetRequiredService<IMigrationRunner>();
migrationRunner.MigrateUp();

_didFinish = true;
_didFinishChanged.Release();
}
catch (Exception e)
{
logger.LogError(e, "Error while migrating");
Environment.Exit(-1);
}
finally
{
_didFinish = true;
_didFinishChanged.Release();
}

Check warning on line 78 in src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs

View check run for this annotation

Codecov / codecov/patch

src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs#L74-L78

Added lines #L74 - L78 were not covered by tests
}, stoppingToken);
}
}
Expand Down
9 changes: 7 additions & 2 deletions src/Fluss/SideEffects/SideEffectDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,16 @@ public SideEffectDispatcher(IEnumerable<SideEffect> sideEffects, IServiceProvide
CacheSideEffects(sideEffects);
}

public async Task StartAsync(CancellationToken cancellationToken)
public Task StartAsync(CancellationToken cancellationToken)
{
Init();
return Task.CompletedTask;
}

private async void Init()
{
var upcaster = _serviceProvider.GetRequiredService<EventUpcasterService>();
await upcaster.WaitForCompletionAsync();

_transientEventRepository.NewEvents += HandleNewEvents;
_transientEventRepository.NewTransientEvents += HandleNewTransientEvents;
}
Expand Down
8 changes: 3 additions & 5 deletions src/Fluss/Upcasting/EventUpcasterService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class EventUpcasterService(
{
private readonly List<IUpcaster> _sortedUpcasters = sorter.SortByDependencies(upcasters);

private readonly CancellationTokenSource _onCompletedSource = new();
private readonly TaskCompletionSource _onCompletedSource = new();

public async ValueTask Run()
{
Expand Down Expand Up @@ -47,13 +47,11 @@ public async ValueTask Run()
events = upcastedEvents;
}

_onCompletedSource.Cancel();
_onCompletedSource.SetResult();
}

public async Task WaitForCompletionAsync()
{
var tcs = new TaskCompletionSource<bool>();
_onCompletedSource.Token.Register(s => ((TaskCompletionSource<bool>)s!).SetResult(true), tcs);
await tcs.Task;
await _onCompletedSource.Task;
}
}

0 comments on commit a7c3630

Please sign in to comment.