diff --git a/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs b/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs index 3a6465902..33a086cf8 100644 --- a/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs +++ b/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs @@ -55,7 +55,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance finally { WorkflowActivity.Enrich(result); - await _persistenceStore.PersistWorkflow(workflow, result.Subscriptions, cancellationToken); + await _persistenceStore.PersistWorkflow(workflow, result.Subscriptions); await QueueProvider.QueueWork(itemId, QueueType.Index); _greylist.Remove($"wf:{itemId}"); } @@ -68,10 +68,10 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance { foreach (var sub in result.Subscriptions) { - await TryProcessSubscription(sub, _persistenceStore, cancellationToken); + await TryProcessSubscription(sub, _persistenceStore); } - await _persistenceStore.PersistErrors(result.Errors, cancellationToken); + await _persistenceStore.PersistErrors(result.Errors); if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue) { @@ -98,11 +98,15 @@ await _persistenceStore.ScheduleCommand(new ScheduledCommand() } - private async Task TryProcessSubscription(EventSubscription subscription, IPersistenceProvider persistenceStore, CancellationToken cancellationToken) + private async Task TryProcessSubscription(EventSubscription subscription, IPersistenceProvider persistenceStore) { + //TODO: move to own class + Logger.LogDebug("Subscribing to event {0} {1} for workflow {2} step {3}", subscription.EventName, subscription.EventKey, subscription.WorkflowId, subscription.StepId); + + await persistenceStore.CreateEventSubscription(subscription, cancellationToken); if (subscription.EventName != Event.EventTypeActivity) { - var events = await persistenceStore.GetEvents(subscription.EventName, subscription.EventKey, subscription.SubscribeAsOf, cancellationToken); + var events = await persistenceStore.GetEvents(subscription.EventName, subscription.EventKey, subscription.SubscribeAsOf); foreach (var evt in events) { @@ -110,12 +114,12 @@ private async Task TryProcessSubscription(EventSubscription subscription, IPersi bool acquiredLock = false; try { - acquiredLock = await _lockProvider.AcquireLock(eventKey, cancellationToken); + acquiredLock = await _lockProvider.AcquireLock(eventKey, CancellationToken.None); int attempt = 0; while (!acquiredLock && attempt < 10) { - await Task.Delay(Options.IdleTime, cancellationToken); - acquiredLock = await _lockProvider.AcquireLock(eventKey, cancellationToken); + await Task.Delay(Options.IdleTime); + acquiredLock = await _lockProvider.AcquireLock(eventKey, CancellationToken.None); attempt++; } @@ -127,7 +131,7 @@ private async Task TryProcessSubscription(EventSubscription subscription, IPersi else { _greylist.Remove(eventKey); - await persistenceStore.MarkEventUnprocessed(evt, cancellationToken); + await persistenceStore.MarkEventUnprocessed(evt); await QueueProvider.QueueWork(evt, QueueType.Event); } }