Skip to content

Commit

Permalink
Prevent information loss on host shutdown
Browse files Browse the repository at this point in the history
Do not pass global CancellationToken to persistence operations on host
shutdown. This way it is ensured that persistence operations are not
cancelled.
  • Loading branch information
mamidenn committed Aug 16, 2022
1 parent 7de5032 commit 0444177
Showing 1 changed file with 13 additions and 9 deletions.
22 changes: 13 additions & 9 deletions src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
finally
{
WorkflowActivity.Enrich(result);
await _persistenceStore.PersistWorkflow(workflow, result.Subscriptions, cancellationToken);
await _persistenceStore.PersistWorkflow(workflow, result.Subscriptions);
await QueueProvider.QueueWork(itemId, QueueType.Index);
_greylist.Remove($"wf:{itemId}");
}
Expand All @@ -68,10 +68,10 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
{
foreach (var sub in result.Subscriptions)
{
await TryProcessSubscription(sub, _persistenceStore, cancellationToken);
await TryProcessSubscription(sub, _persistenceStore);
}

await _persistenceStore.PersistErrors(result.Errors, cancellationToken);
await _persistenceStore.PersistErrors(result.Errors);

if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue)
{
Expand All @@ -98,24 +98,28 @@ await _persistenceStore.ScheduleCommand(new ScheduledCommand()

}

private async Task TryProcessSubscription(EventSubscription subscription, IPersistenceProvider persistenceStore, CancellationToken cancellationToken)
private async Task TryProcessSubscription(EventSubscription subscription, IPersistenceProvider persistenceStore)
{
//TODO: move to own class
Logger.LogDebug("Subscribing to event {0} {1} for workflow {2} step {3}", subscription.EventName, subscription.EventKey, subscription.WorkflowId, subscription.StepId);

await persistenceStore.CreateEventSubscription(subscription, cancellationToken);
if (subscription.EventName != Event.EventTypeActivity)
{
var events = await persistenceStore.GetEvents(subscription.EventName, subscription.EventKey, subscription.SubscribeAsOf, cancellationToken);
var events = await persistenceStore.GetEvents(subscription.EventName, subscription.EventKey, subscription.SubscribeAsOf);

foreach (var evt in events)
{
var eventKey = $"evt:{evt}";
bool acquiredLock = false;
try
{
acquiredLock = await _lockProvider.AcquireLock(eventKey, cancellationToken);
acquiredLock = await _lockProvider.AcquireLock(eventKey, CancellationToken.None);
int attempt = 0;
while (!acquiredLock && attempt < 10)
{
await Task.Delay(Options.IdleTime, cancellationToken);
acquiredLock = await _lockProvider.AcquireLock(eventKey, cancellationToken);
await Task.Delay(Options.IdleTime);
acquiredLock = await _lockProvider.AcquireLock(eventKey, CancellationToken.None);

attempt++;
}
Expand All @@ -127,7 +131,7 @@ private async Task TryProcessSubscription(EventSubscription subscription, IPersi
else
{
_greylist.Remove(eventKey);
await persistenceStore.MarkEventUnprocessed(evt, cancellationToken);
await persistenceStore.MarkEventUnprocessed(evt);
await QueueProvider.QueueWork(evt, QueueType.Event);
}
}
Expand Down

0 comments on commit 0444177

Please sign in to comment.