Skip to content

Commit

Permalink
Remove CancellationToken from persistence ops
Browse files Browse the repository at this point in the history
Writing persistence operations can no longer be cancelled to prevent
data loss when cancelling a workflow.

Fixes #953, #1032
  • Loading branch information
mamidenn committed Aug 8, 2022
1 parent 088bc1a commit 7b58646
Show file tree
Hide file tree
Showing 15 changed files with 163 additions and 163 deletions.
6 changes: 3 additions & 3 deletions src/WorkflowCore/Interface/Persistence/IEventRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ namespace WorkflowCore.Interface
{
public interface IEventRepository
{
Task<string> CreateEvent(Event newEvent, CancellationToken cancellationToken = default);
Task<string> CreateEvent(Event newEvent);

Task<Event> GetEvent(string id, CancellationToken cancellationToken = default);

Task<IEnumerable<string>> GetRunnableEvents(DateTime asAt, CancellationToken cancellationToken = default);

Task<IEnumerable<string>> GetEvents(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = default);

Task MarkEventProcessed(string id, CancellationToken cancellationToken = default);
Task MarkEventProcessed(string id);

Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = default);
Task MarkEventUnprocessed(string id);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace WorkflowCore.Interface
public interface IPersistenceProvider : IWorkflowRepository, ISubscriptionRepository, IEventRepository, IScheduledCommandRepository
{

Task PersistErrors(IEnumerable<ExecutionError> errors, CancellationToken cancellationToken = default);
Task PersistErrors(IEnumerable<ExecutionError> errors);

void EnsureStoreExists();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@ namespace WorkflowCore.Interface
{
public interface ISubscriptionRepository
{
Task<string> CreateEventSubscription(EventSubscription subscription, CancellationToken cancellationToken = default);
Task<string> CreateEventSubscription(EventSubscription subscription);

Task<IEnumerable<EventSubscription>> GetSubscriptions(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = default);

Task TerminateSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default);
Task TerminateSubscription(string eventSubscriptionId);

Task<EventSubscription> GetSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default);

Task<EventSubscription> GetFirstOpenSubscription(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = default);

Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default);
Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry);

Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken cancellationToken = default);
Task ClearSubscriptionToken(string eventSubscriptionId, string token);

}
}
4 changes: 2 additions & 2 deletions src/WorkflowCore/Interface/Persistence/IWorkflowRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ namespace WorkflowCore.Interface
{
public interface IWorkflowRepository
{
Task<string> CreateNewWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default);
Task<string> CreateNewWorkflow(WorkflowInstance workflow);

Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default);
Task PersistWorkflow(WorkflowInstance workflow);

Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default);

Expand Down
8 changes: 4 additions & 4 deletions src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
if (activity == null)
{
Logger.LogWarning($"Activity already processed - {(evt.EventData as ActivityResult).SubscriptionId}");
await _eventRepository.MarkEventProcessed(itemId, cancellationToken);
await _eventRepository.MarkEventProcessed(itemId);
return;
}
subs = new List<EventSubscription> { activity };
Expand All @@ -77,7 +77,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance

if (complete)
{
await _eventRepository.MarkEventProcessed(itemId, cancellationToken);
await _eventRepository.MarkEventProcessed(itemId);
}
else
{
Expand Down Expand Up @@ -135,8 +135,8 @@ private async Task<bool> SeedSubscription(Event evt, EventSubscription sub, Hash
p.Active = true;
}
workflow.NextExecution = 0;
await _workflowRepository.PersistWorkflow(workflow, cancellationToken);
await _subscriptionRepository.TerminateSubscription(sub.Id, cancellationToken);
await _workflowRepository.PersistWorkflow(workflow);
await _subscriptionRepository.TerminateSubscription(sub.Id);
return true;
}
catch (Exception ex)
Expand Down
8 changes: 4 additions & 4 deletions src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
finally
{
WorkflowActivity.Enrich(result);
await _persistenceStore.PersistWorkflow(workflow, cancellationToken);
await _persistenceStore.PersistWorkflow(workflow);
await QueueProvider.QueueWork(itemId, QueueType.Index);
_greylist.Remove($"wf:{itemId}");
}
Expand All @@ -71,7 +71,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
await SubscribeEvent(sub, _persistenceStore, cancellationToken);
}

await _persistenceStore.PersistErrors(result.Errors, cancellationToken);
await _persistenceStore.PersistErrors(result.Errors);

if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue)
{
Expand Down Expand Up @@ -103,7 +103,7 @@ private async Task SubscribeEvent(EventSubscription subscription, IPersistencePr
//TODO: move to own class
Logger.LogDebug("Subscribing to event {0} {1} for workflow {2} step {3}", subscription.EventName, subscription.EventKey, subscription.WorkflowId, subscription.StepId);

await persistenceStore.CreateEventSubscription(subscription, cancellationToken);
await persistenceStore.CreateEventSubscription(subscription);
if (subscription.EventName != Event.EventTypeActivity)
{
var events = await persistenceStore.GetEvents(subscription.EventName, subscription.EventKey, subscription.SubscribeAsOf, cancellationToken);
Expand Down Expand Up @@ -131,7 +131,7 @@ private async Task SubscribeEvent(EventSubscription subscription, IPersistencePr
else
{
_greylist.Remove(eventKey);
await persistenceStore.MarkEventUnprocessed(evt, cancellationToken);
await persistenceStore.MarkEventUnprocessed(evt);
await QueueProvider.QueueWork(evt, QueueType.Event);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class MemoryPersistenceProvider : ISingletonMemoryProvider

public bool SupportsScheduledCommands => false;

public async Task<string> CreateNewWorkflow(WorkflowInstance workflow, CancellationToken _ = default)
public async Task<string> CreateNewWorkflow(WorkflowInstance workflow)
{
lock (_instances)
{
Expand All @@ -36,7 +36,7 @@ public async Task<string> CreateNewWorkflow(WorkflowInstance workflow, Cancellat
}
}

public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken _ = default)
public async Task PersistWorkflow(WorkflowInstance workflow)
{
lock (_instances)
{
Expand Down Expand Up @@ -107,7 +107,7 @@ public async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowSt
}


public async Task<string> CreateEventSubscription(EventSubscription subscription, CancellationToken _ = default)
public async Task<string> CreateEventSubscription(EventSubscription subscription)
{
lock (_subscriptions)
{
Expand All @@ -126,7 +126,7 @@ public async Task<IEnumerable<EventSubscription>> GetSubscriptions(string eventN
}
}

public async Task TerminateSubscription(string eventSubscriptionId, CancellationToken _ = default)
public async Task TerminateSubscription(string eventSubscriptionId)
{
lock (_subscriptions)
{
Expand Down Expand Up @@ -154,7 +154,7 @@ public Task<EventSubscription> GetFirstOpenSubscription(string eventName, string
}
}

public Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken _ = default)
public Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry)
{
lock (_subscriptions)
{
Expand All @@ -167,7 +167,7 @@ public Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token,
}
}

public Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken _ = default)
public Task ClearSubscriptionToken(string eventSubscriptionId, string token)
{
lock (_subscriptions)
{
Expand All @@ -186,7 +186,7 @@ public void EnsureStoreExists()
{
}

public async Task<string> CreateEvent(Event newEvent, CancellationToken _ = default)
public async Task<string> CreateEvent(Event newEvent)
{
lock (_events)
{
Expand All @@ -196,7 +196,7 @@ public async Task<string> CreateEvent(Event newEvent, CancellationToken _ = defa
}
}

public async Task MarkEventProcessed(string id, CancellationToken _ = default)
public async Task MarkEventProcessed(string id)
{
lock (_events)
{
Expand Down Expand Up @@ -238,7 +238,7 @@ public async Task<IEnumerable<string>> GetEvents(string eventName, string eventK
}
}

public async Task MarkEventUnprocessed(string id, CancellationToken _ = default)
public async Task MarkEventUnprocessed(string id)
{
lock (_events)
{
Expand All @@ -250,7 +250,7 @@ public async Task MarkEventUnprocessed(string id, CancellationToken _ = default)
}
}

public async Task PersistErrors(IEnumerable<ExecutionError> errors, CancellationToken _ = default)
public async Task PersistErrors(IEnumerable<ExecutionError> errors)
{
lock (errors)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService)
_innerService = innerService;
}

public Task<string> CreateEvent(Event newEvent, CancellationToken _ = default) => _innerService.CreateEvent(newEvent);
public Task<string> CreateEvent(Event newEvent) => _innerService.CreateEvent(newEvent);

public Task<string> CreateEventSubscription(EventSubscription subscription, CancellationToken _ = default) => _innerService.CreateEventSubscription(subscription);
public Task<string> CreateEventSubscription(EventSubscription subscription) => _innerService.CreateEventSubscription(subscription);

public Task<string> CreateNewWorkflow(WorkflowInstance workflow, CancellationToken _ = default) => _innerService.CreateNewWorkflow(workflow);
public Task<string> CreateNewWorkflow(WorkflowInstance workflow) => _innerService.CreateNewWorkflow(workflow);

public void EnsureStoreExists() => _innerService.EnsureStoreExists();

Expand All @@ -42,22 +42,22 @@ public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService)

public Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take) => _innerService.GetWorkflowInstances(status, type, createdFrom, createdTo, skip, take);

public Task MarkEventProcessed(string id, CancellationToken _ = default) => _innerService.MarkEventProcessed(id);
public Task MarkEventProcessed(string id) => _innerService.MarkEventProcessed(id);

public Task MarkEventUnprocessed(string id, CancellationToken _ = default) => _innerService.MarkEventUnprocessed(id);
public Task MarkEventUnprocessed(string id) => _innerService.MarkEventUnprocessed(id);

public Task PersistErrors(IEnumerable<ExecutionError> errors, CancellationToken _ = default) => _innerService.PersistErrors(errors);
public Task PersistErrors(IEnumerable<ExecutionError> errors) => _innerService.PersistErrors(errors);

public Task PersistWorkflow(WorkflowInstance workflow, CancellationToken _ = default) => _innerService.PersistWorkflow(workflow);
public Task PersistWorkflow(WorkflowInstance workflow) => _innerService.PersistWorkflow(workflow);

public Task TerminateSubscription(string eventSubscriptionId, CancellationToken _ = default) => _innerService.TerminateSubscription(eventSubscriptionId);
public Task TerminateSubscription(string eventSubscriptionId) => _innerService.TerminateSubscription(eventSubscriptionId);
public Task<EventSubscription> GetSubscription(string eventSubscriptionId, CancellationToken _ = default) => _innerService.GetSubscription(eventSubscriptionId);

public Task<EventSubscription> GetFirstOpenSubscription(string eventName, string eventKey, DateTime asOf, CancellationToken _ = default) => _innerService.GetFirstOpenSubscription(eventName, eventKey, asOf);

public Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken _ = default) => _innerService.SetSubscriptionToken(eventSubscriptionId, token, workerId, expiry);
public Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry) => _innerService.SetSubscriptionToken(eventSubscriptionId, token, workerId, expiry);

public Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken _ = default) => _innerService.ClearSubscriptionToken(eventSubscriptionId, token);
public Task ClearSubscriptionToken(string eventSubscriptionId, string token) => _innerService.ClearSubscriptionToken(eventSubscriptionId, token);

public Task ScheduleCommand(ScheduledCommand command)
{
Expand Down
6 changes: 3 additions & 3 deletions src/WorkflowCore/Services/SyncWorkflowRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public async Task<WorkflowInstance> RunWorkflowSync<TData>(string workflowId, in
var id = Guid.NewGuid().ToString();

if (persistSate)
id = await _persistenceStore.CreateNewWorkflow(wf, token);
id = await _persistenceStore.CreateNewWorkflow(wf);
else
wf.Id = id;

Expand All @@ -89,7 +89,7 @@ public async Task<WorkflowInstance> RunWorkflowSync<TData>(string workflowId, in
{
await _executor.Execute(wf, token);
if (persistSate)
await _persistenceStore.PersistWorkflow(wf, token);
await _persistenceStore.PersistWorkflow(wf);
}
}
finally
Expand All @@ -103,4 +103,4 @@ public async Task<WorkflowInstance> RunWorkflowSync<TData>(string workflowId, in
return wf;
}
}
}
}
Loading

0 comments on commit 7b58646

Please sign in to comment.