Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove CancellationToken from writing persistence operations #1079

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
<PackageLicenseUrl>https://github.com/danielgerlag/workflow-core/blob/master/LICENSE.md</PackageLicenseUrl>
<RepositoryType>git</RepositoryType>
<RepositoryUrl>https://github.com/danielgerlag/workflow-core.git</RepositoryUrl>
<Version>3.6.4</Version>
<AssemblyVersion>3.6.4.0</AssemblyVersion>
<FileVersion>3.6.4.0</FileVersion>
<Version>4.0.0</Version>
<AssemblyVersion>4.0.0.0</AssemblyVersion>
<FileVersion>4.0.0.0</FileVersion>
<PackageIconUrl>https://github.com/danielgerlag/workflow-core/raw/master/src/logo.png</PackageIconUrl>
<PackageVersion>3.6.4</PackageVersion>
<PackageVersion>4.0.0</PackageVersion>
</PropertyGroup>
</Project>
16 changes: 15 additions & 1 deletion src/WorkflowCore.Testing/WorkflowTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public abstract class WorkflowTest<TWorkflow, TData> : IDisposable
protected IWorkflowHost Host;
protected IPersistenceProvider PersistenceProvider;
protected List<StepError> UnhandledStepErrors = new List<StepError>();
private bool isDisposed;

protected virtual void Setup()
{
Expand Down Expand Up @@ -116,9 +117,22 @@ protected TData GetData(string workflowId)
return (TData)instance.Data;
}

protected virtual void Dispose(bool disposing)
{
if (!isDisposed)
{
if (disposing)
{
Host.Stop();
}
isDisposed = true;
}
}

public void Dispose()
{
Host.Stop();
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/WorkflowCore/Interface/Persistence/IEventRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ namespace WorkflowCore.Interface
{
public interface IEventRepository
{
Task<string> CreateEvent(Event newEvent, CancellationToken cancellationToken = default);
Task<string> CreateEvent(Event newEvent);

Task<Event> GetEvent(string id, CancellationToken cancellationToken = default);

Task<IEnumerable<string>> GetRunnableEvents(DateTime asAt, CancellationToken cancellationToken = default);

Task<IEnumerable<string>> GetEvents(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = default);

Task MarkEventProcessed(string id, CancellationToken cancellationToken = default);
Task MarkEventProcessed(string id);

Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = default);
Task MarkEventUnprocessed(string id);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace WorkflowCore.Interface
public interface IPersistenceProvider : IWorkflowRepository, ISubscriptionRepository, IEventRepository, IScheduledCommandRepository
{

Task PersistErrors(IEnumerable<ExecutionError> errors, CancellationToken cancellationToken = default);
Task PersistErrors(IEnumerable<ExecutionError> errors);

void EnsureStoreExists();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@ namespace WorkflowCore.Interface
{
public interface ISubscriptionRepository
{
Task<string> CreateEventSubscription(EventSubscription subscription, CancellationToken cancellationToken = default);
Task<string> CreateEventSubscription(EventSubscription subscription);

Task<IEnumerable<EventSubscription>> GetSubscriptions(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = default);

Task TerminateSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default);
Task TerminateSubscription(string eventSubscriptionId);

Task<EventSubscription> GetSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default);

Task<EventSubscription> GetFirstOpenSubscription(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = default);

Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default);
Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry);

Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken cancellationToken = default);
Task ClearSubscriptionToken(string eventSubscriptionId, string token);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ namespace WorkflowCore.Interface
{
public interface IWorkflowRepository
{
Task<string> CreateNewWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default);
Task<string> CreateNewWorkflow(WorkflowInstance workflow);

Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default);
Task PersistWorkflow(WorkflowInstance workflow);

Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default);

Expand Down
8 changes: 4 additions & 4 deletions src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
if (activity == null)
{
Logger.LogWarning($"Activity already processed - {(evt.EventData as ActivityResult).SubscriptionId}");
await _eventRepository.MarkEventProcessed(itemId, cancellationToken);
await _eventRepository.MarkEventProcessed(itemId);
return;
}
subs = new List<EventSubscription> { activity };
Expand All @@ -77,7 +77,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance

if (complete)
{
await _eventRepository.MarkEventProcessed(itemId, cancellationToken);
await _eventRepository.MarkEventProcessed(itemId);
}
else
{
Expand Down Expand Up @@ -135,8 +135,8 @@ private async Task<bool> SeedSubscription(Event evt, EventSubscription sub, Hash
p.Active = true;
}
workflow.NextExecution = 0;
await _workflowRepository.PersistWorkflow(workflow, cancellationToken);
await _subscriptionRepository.TerminateSubscription(sub.Id, cancellationToken);
await _workflowRepository.PersistWorkflow(workflow);
await _subscriptionRepository.TerminateSubscription(sub.Id);
return true;
}
catch (Exception ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
finally
{
WorkflowActivity.Enrich(result);
await _persistenceStore.PersistWorkflow(workflow, cancellationToken);
await _persistenceStore.PersistWorkflow(workflow);
await QueueProvider.QueueWork(itemId, QueueType.Index);
_greylist.Remove($"wf:{itemId}");
}
Expand All @@ -71,7 +71,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
await SubscribeEvent(sub, _persistenceStore, cancellationToken);
}

await _persistenceStore.PersistErrors(result.Errors, cancellationToken);
await _persistenceStore.PersistErrors(result.Errors);

if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue)
{
Expand Down Expand Up @@ -103,7 +103,7 @@ private async Task SubscribeEvent(EventSubscription subscription, IPersistencePr
//TODO: move to own class
Logger.LogDebug("Subscribing to event {0} {1} for workflow {2} step {3}", subscription.EventName, subscription.EventKey, subscription.WorkflowId, subscription.StepId);

await persistenceStore.CreateEventSubscription(subscription, cancellationToken);
await persistenceStore.CreateEventSubscription(subscription);
if (subscription.EventName != Event.EventTypeActivity)
{
var events = await persistenceStore.GetEvents(subscription.EventName, subscription.EventKey, subscription.SubscribeAsOf, cancellationToken);
Expand Down Expand Up @@ -131,7 +131,7 @@ private async Task SubscribeEvent(EventSubscription subscription, IPersistencePr
else
{
_greylist.Remove(eventKey);
await persistenceStore.MarkEventUnprocessed(evt, cancellationToken);
await persistenceStore.MarkEventUnprocessed(evt);
await QueueProvider.QueueWork(evt, QueueType.Event);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class MemoryPersistenceProvider : ISingletonMemoryProvider

public bool SupportsScheduledCommands => false;

public async Task<string> CreateNewWorkflow(WorkflowInstance workflow, CancellationToken _ = default)
public async Task<string> CreateNewWorkflow(WorkflowInstance workflow)
{
lock (_instances)
{
Expand All @@ -36,7 +36,7 @@ public async Task<string> CreateNewWorkflow(WorkflowInstance workflow, Cancellat
}
}

public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken _ = default)
public async Task PersistWorkflow(WorkflowInstance workflow)
{
lock (_instances)
{
Expand Down Expand Up @@ -107,7 +107,7 @@ public async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowSt
}


public async Task<string> CreateEventSubscription(EventSubscription subscription, CancellationToken _ = default)
public async Task<string> CreateEventSubscription(EventSubscription subscription)
{
lock (_subscriptions)
{
Expand All @@ -126,7 +126,7 @@ public async Task<IEnumerable<EventSubscription>> GetSubscriptions(string eventN
}
}

public async Task TerminateSubscription(string eventSubscriptionId, CancellationToken _ = default)
public async Task TerminateSubscription(string eventSubscriptionId)
{
lock (_subscriptions)
{
Expand Down Expand Up @@ -154,7 +154,7 @@ public Task<EventSubscription> GetFirstOpenSubscription(string eventName, string
}
}

public Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken _ = default)
public Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry)
{
lock (_subscriptions)
{
Expand All @@ -167,7 +167,7 @@ public Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token,
}
}

public Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken _ = default)
public Task ClearSubscriptionToken(string eventSubscriptionId, string token)
{
lock (_subscriptions)
{
Expand All @@ -186,7 +186,7 @@ public void EnsureStoreExists()
{
}

public async Task<string> CreateEvent(Event newEvent, CancellationToken _ = default)
public async Task<string> CreateEvent(Event newEvent)
{
lock (_events)
{
Expand All @@ -196,7 +196,7 @@ public async Task<string> CreateEvent(Event newEvent, CancellationToken _ = defa
}
}

public async Task MarkEventProcessed(string id, CancellationToken _ = default)
public async Task MarkEventProcessed(string id)
{
lock (_events)
{
Expand Down Expand Up @@ -238,7 +238,7 @@ public async Task<IEnumerable<string>> GetEvents(string eventName, string eventK
}
}

public async Task MarkEventUnprocessed(string id, CancellationToken _ = default)
public async Task MarkEventUnprocessed(string id)
{
lock (_events)
{
Expand All @@ -250,7 +250,7 @@ public async Task MarkEventUnprocessed(string id, CancellationToken _ = default)
}
}

public async Task PersistErrors(IEnumerable<ExecutionError> errors, CancellationToken _ = default)
public async Task PersistErrors(IEnumerable<ExecutionError> errors)
{
lock (errors)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService)
_innerService = innerService;
}

public Task<string> CreateEvent(Event newEvent, CancellationToken _ = default) => _innerService.CreateEvent(newEvent);
public Task<string> CreateEvent(Event newEvent) => _innerService.CreateEvent(newEvent);

public Task<string> CreateEventSubscription(EventSubscription subscription, CancellationToken _ = default) => _innerService.CreateEventSubscription(subscription);
public Task<string> CreateEventSubscription(EventSubscription subscription) => _innerService.CreateEventSubscription(subscription);

public Task<string> CreateNewWorkflow(WorkflowInstance workflow, CancellationToken _ = default) => _innerService.CreateNewWorkflow(workflow);
public Task<string> CreateNewWorkflow(WorkflowInstance workflow) => _innerService.CreateNewWorkflow(workflow);

public void EnsureStoreExists() => _innerService.EnsureStoreExists();

Expand All @@ -42,22 +42,22 @@ public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService)

public Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take) => _innerService.GetWorkflowInstances(status, type, createdFrom, createdTo, skip, take);

public Task MarkEventProcessed(string id, CancellationToken _ = default) => _innerService.MarkEventProcessed(id);
public Task MarkEventProcessed(string id) => _innerService.MarkEventProcessed(id);

public Task MarkEventUnprocessed(string id, CancellationToken _ = default) => _innerService.MarkEventUnprocessed(id);
public Task MarkEventUnprocessed(string id) => _innerService.MarkEventUnprocessed(id);

public Task PersistErrors(IEnumerable<ExecutionError> errors, CancellationToken _ = default) => _innerService.PersistErrors(errors);
public Task PersistErrors(IEnumerable<ExecutionError> errors) => _innerService.PersistErrors(errors);

public Task PersistWorkflow(WorkflowInstance workflow, CancellationToken _ = default) => _innerService.PersistWorkflow(workflow);
public Task PersistWorkflow(WorkflowInstance workflow) => _innerService.PersistWorkflow(workflow);

public Task TerminateSubscription(string eventSubscriptionId, CancellationToken _ = default) => _innerService.TerminateSubscription(eventSubscriptionId);
public Task TerminateSubscription(string eventSubscriptionId) => _innerService.TerminateSubscription(eventSubscriptionId);
public Task<EventSubscription> GetSubscription(string eventSubscriptionId, CancellationToken _ = default) => _innerService.GetSubscription(eventSubscriptionId);

public Task<EventSubscription> GetFirstOpenSubscription(string eventName, string eventKey, DateTime asOf, CancellationToken _ = default) => _innerService.GetFirstOpenSubscription(eventName, eventKey, asOf);

public Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken _ = default) => _innerService.SetSubscriptionToken(eventSubscriptionId, token, workerId, expiry);
public Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry) => _innerService.SetSubscriptionToken(eventSubscriptionId, token, workerId, expiry);

public Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken _ = default) => _innerService.ClearSubscriptionToken(eventSubscriptionId, token);
public Task ClearSubscriptionToken(string eventSubscriptionId, string token) => _innerService.ClearSubscriptionToken(eventSubscriptionId, token);

public Task ScheduleCommand(ScheduledCommand command)
{
Expand Down
6 changes: 3 additions & 3 deletions src/WorkflowCore/Services/SyncWorkflowRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public async Task<WorkflowInstance> RunWorkflowSync<TData>(string workflowId, in
var id = Guid.NewGuid().ToString();

if (persistSate)
id = await _persistenceStore.CreateNewWorkflow(wf, token);
id = await _persistenceStore.CreateNewWorkflow(wf);
else
wf.Id = id;

Expand All @@ -89,7 +89,7 @@ public async Task<WorkflowInstance> RunWorkflowSync<TData>(string workflowId, in
{
await _executor.Execute(wf, token);
if (persistSate)
await _persistenceStore.PersistWorkflow(wf, token);
await _persistenceStore.PersistWorkflow(wf);
}
}
finally
Expand All @@ -103,4 +103,4 @@ public async Task<WorkflowInstance> RunWorkflowSync<TData>(string workflowId, in
return wf;
}
}
}
}
Loading