diff --git a/src/Directory.Build.props b/src/Directory.Build.props
index 30816a3b6..d1c9923b7 100644
--- a/src/Directory.Build.props
+++ b/src/Directory.Build.props
@@ -4,10 +4,10 @@
https://github.com/danielgerlag/workflow-core/blob/master/LICENSE.md
git
https://github.com/danielgerlag/workflow-core.git
- 3.6.4
- 3.6.4.0
- 3.6.4.0
+ 4.0.0
+ 4.0.0.0
+ 4.0.0.0
https://github.com/danielgerlag/workflow-core/raw/master/src/logo.png
- 3.6.4
+ 4.0.0
diff --git a/src/WorkflowCore.Testing/WorkflowTest.cs b/src/WorkflowCore.Testing/WorkflowTest.cs
index bf0eb97ab..1bda5ef1e 100644
--- a/src/WorkflowCore.Testing/WorkflowTest.cs
+++ b/src/WorkflowCore.Testing/WorkflowTest.cs
@@ -17,6 +17,7 @@ public abstract class WorkflowTest : IDisposable
protected IWorkflowHost Host;
protected IPersistenceProvider PersistenceProvider;
protected List UnhandledStepErrors = new List();
+ private bool isDisposed;
protected virtual void Setup()
{
@@ -116,9 +117,22 @@ protected TData GetData(string workflowId)
return (TData)instance.Data;
}
+ protected virtual void Dispose(bool disposing)
+ {
+ if (!isDisposed)
+ {
+ if (disposing)
+ {
+ Host.Stop();
+ }
+ isDisposed = true;
+ }
+ }
+
public void Dispose()
{
- Host.Stop();
+ Dispose(disposing: true);
+ GC.SuppressFinalize(this);
}
}
diff --git a/src/WorkflowCore/Interface/Persistence/IEventRepository.cs b/src/WorkflowCore/Interface/Persistence/IEventRepository.cs
index 75651c24b..3de29cdbd 100644
--- a/src/WorkflowCore/Interface/Persistence/IEventRepository.cs
+++ b/src/WorkflowCore/Interface/Persistence/IEventRepository.cs
@@ -8,7 +8,7 @@ namespace WorkflowCore.Interface
{
public interface IEventRepository
{
- Task CreateEvent(Event newEvent, CancellationToken cancellationToken = default);
+ Task CreateEvent(Event newEvent);
Task GetEvent(string id, CancellationToken cancellationToken = default);
@@ -16,9 +16,9 @@ public interface IEventRepository
Task> GetEvents(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = default);
- Task MarkEventProcessed(string id, CancellationToken cancellationToken = default);
+ Task MarkEventProcessed(string id);
- Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = default);
+ Task MarkEventUnprocessed(string id);
}
}
diff --git a/src/WorkflowCore/Interface/Persistence/IPersistenceProvider.cs b/src/WorkflowCore/Interface/Persistence/IPersistenceProvider.cs
index 4b83f5920..57fd630a1 100644
--- a/src/WorkflowCore/Interface/Persistence/IPersistenceProvider.cs
+++ b/src/WorkflowCore/Interface/Persistence/IPersistenceProvider.cs
@@ -9,7 +9,7 @@ namespace WorkflowCore.Interface
public interface IPersistenceProvider : IWorkflowRepository, ISubscriptionRepository, IEventRepository, IScheduledCommandRepository
{
- Task PersistErrors(IEnumerable errors, CancellationToken cancellationToken = default);
+ Task PersistErrors(IEnumerable errors);
void EnsureStoreExists();
diff --git a/src/WorkflowCore/Interface/Persistence/ISubscriptionRepository.cs b/src/WorkflowCore/Interface/Persistence/ISubscriptionRepository.cs
index 2f22a45f4..4206fc43a 100644
--- a/src/WorkflowCore/Interface/Persistence/ISubscriptionRepository.cs
+++ b/src/WorkflowCore/Interface/Persistence/ISubscriptionRepository.cs
@@ -8,19 +8,19 @@ namespace WorkflowCore.Interface
{
public interface ISubscriptionRepository
{
- Task CreateEventSubscription(EventSubscription subscription, CancellationToken cancellationToken = default);
+ Task CreateEventSubscription(EventSubscription subscription);
Task> GetSubscriptions(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = default);
- Task TerminateSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default);
+ Task TerminateSubscription(string eventSubscriptionId);
Task GetSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default);
Task GetFirstOpenSubscription(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = default);
- Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default);
+ Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry);
- Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken cancellationToken = default);
+ Task ClearSubscriptionToken(string eventSubscriptionId, string token);
}
}
diff --git a/src/WorkflowCore/Interface/Persistence/IWorkflowRepository.cs b/src/WorkflowCore/Interface/Persistence/IWorkflowRepository.cs
index 1da4275cd..b4612b430 100644
--- a/src/WorkflowCore/Interface/Persistence/IWorkflowRepository.cs
+++ b/src/WorkflowCore/Interface/Persistence/IWorkflowRepository.cs
@@ -8,9 +8,9 @@ namespace WorkflowCore.Interface
{
public interface IWorkflowRepository
{
- Task CreateNewWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default);
+ Task CreateNewWorkflow(WorkflowInstance workflow);
- Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default);
+ Task PersistWorkflow(WorkflowInstance workflow);
Task> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default);
diff --git a/src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs b/src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs
index dd7323b01..37cb2fce5 100644
--- a/src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs
+++ b/src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs
@@ -59,7 +59,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
if (activity == null)
{
Logger.LogWarning($"Activity already processed - {(evt.EventData as ActivityResult).SubscriptionId}");
- await _eventRepository.MarkEventProcessed(itemId, cancellationToken);
+ await _eventRepository.MarkEventProcessed(itemId);
return;
}
subs = new List { activity };
@@ -77,7 +77,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
if (complete)
{
- await _eventRepository.MarkEventProcessed(itemId, cancellationToken);
+ await _eventRepository.MarkEventProcessed(itemId);
}
else
{
@@ -135,8 +135,8 @@ private async Task SeedSubscription(Event evt, EventSubscription sub, Hash
p.Active = true;
}
workflow.NextExecution = 0;
- await _workflowRepository.PersistWorkflow(workflow, cancellationToken);
- await _subscriptionRepository.TerminateSubscription(sub.Id, cancellationToken);
+ await _workflowRepository.PersistWorkflow(workflow);
+ await _subscriptionRepository.TerminateSubscription(sub.Id);
return true;
}
catch (Exception ex)
diff --git a/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs b/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs
index b3dd4aa0b..92b4b881d 100644
--- a/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs
+++ b/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs
@@ -55,7 +55,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
finally
{
WorkflowActivity.Enrich(result);
- await _persistenceStore.PersistWorkflow(workflow, cancellationToken);
+ await _persistenceStore.PersistWorkflow(workflow);
await QueueProvider.QueueWork(itemId, QueueType.Index);
_greylist.Remove($"wf:{itemId}");
}
@@ -71,7 +71,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
await SubscribeEvent(sub, _persistenceStore, cancellationToken);
}
- await _persistenceStore.PersistErrors(result.Errors, cancellationToken);
+ await _persistenceStore.PersistErrors(result.Errors);
if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue)
{
@@ -103,7 +103,7 @@ private async Task SubscribeEvent(EventSubscription subscription, IPersistencePr
//TODO: move to own class
Logger.LogDebug("Subscribing to event {0} {1} for workflow {2} step {3}", subscription.EventName, subscription.EventKey, subscription.WorkflowId, subscription.StepId);
- await persistenceStore.CreateEventSubscription(subscription, cancellationToken);
+ await persistenceStore.CreateEventSubscription(subscription);
if (subscription.EventName != Event.EventTypeActivity)
{
var events = await persistenceStore.GetEvents(subscription.EventName, subscription.EventKey, subscription.SubscribeAsOf, cancellationToken);
@@ -131,7 +131,7 @@ private async Task SubscribeEvent(EventSubscription subscription, IPersistencePr
else
{
_greylist.Remove(eventKey);
- await persistenceStore.MarkEventUnprocessed(evt, cancellationToken);
+ await persistenceStore.MarkEventUnprocessed(evt);
await QueueProvider.QueueWork(evt, QueueType.Event);
}
}
diff --git a/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs b/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs
index e983f8fd5..e4d52953f 100644
--- a/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs
+++ b/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs
@@ -26,7 +26,7 @@ public class MemoryPersistenceProvider : ISingletonMemoryProvider
public bool SupportsScheduledCommands => false;
- public async Task CreateNewWorkflow(WorkflowInstance workflow, CancellationToken _ = default)
+ public async Task CreateNewWorkflow(WorkflowInstance workflow)
{
lock (_instances)
{
@@ -36,7 +36,7 @@ public async Task CreateNewWorkflow(WorkflowInstance workflow, Cancellat
}
}
- public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken _ = default)
+ public async Task PersistWorkflow(WorkflowInstance workflow)
{
lock (_instances)
{
@@ -107,7 +107,7 @@ public async Task> GetWorkflowInstances(WorkflowSt
}
- public async Task CreateEventSubscription(EventSubscription subscription, CancellationToken _ = default)
+ public async Task CreateEventSubscription(EventSubscription subscription)
{
lock (_subscriptions)
{
@@ -126,7 +126,7 @@ public async Task> GetSubscriptions(string eventN
}
}
- public async Task TerminateSubscription(string eventSubscriptionId, CancellationToken _ = default)
+ public async Task TerminateSubscription(string eventSubscriptionId)
{
lock (_subscriptions)
{
@@ -154,7 +154,7 @@ public Task GetFirstOpenSubscription(string eventName, string
}
}
- public Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken _ = default)
+ public Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry)
{
lock (_subscriptions)
{
@@ -167,7 +167,7 @@ public Task SetSubscriptionToken(string eventSubscriptionId, string token,
}
}
- public Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken _ = default)
+ public Task ClearSubscriptionToken(string eventSubscriptionId, string token)
{
lock (_subscriptions)
{
@@ -186,7 +186,7 @@ public void EnsureStoreExists()
{
}
- public async Task CreateEvent(Event newEvent, CancellationToken _ = default)
+ public async Task CreateEvent(Event newEvent)
{
lock (_events)
{
@@ -196,7 +196,7 @@ public async Task CreateEvent(Event newEvent, CancellationToken _ = defa
}
}
- public async Task MarkEventProcessed(string id, CancellationToken _ = default)
+ public async Task MarkEventProcessed(string id)
{
lock (_events)
{
@@ -238,7 +238,7 @@ public async Task> GetEvents(string eventName, string eventK
}
}
- public async Task MarkEventUnprocessed(string id, CancellationToken _ = default)
+ public async Task MarkEventUnprocessed(string id)
{
lock (_events)
{
@@ -250,7 +250,7 @@ public async Task MarkEventUnprocessed(string id, CancellationToken _ = default)
}
}
- public async Task PersistErrors(IEnumerable errors, CancellationToken _ = default)
+ public async Task PersistErrors(IEnumerable errors)
{
lock (errors)
{
diff --git a/src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs b/src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs
index 0a94c6f8e..f5d458ef7 100644
--- a/src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs
+++ b/src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs
@@ -18,11 +18,11 @@ public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService)
_innerService = innerService;
}
- public Task CreateEvent(Event newEvent, CancellationToken _ = default) => _innerService.CreateEvent(newEvent);
+ public Task CreateEvent(Event newEvent) => _innerService.CreateEvent(newEvent);
- public Task CreateEventSubscription(EventSubscription subscription, CancellationToken _ = default) => _innerService.CreateEventSubscription(subscription);
+ public Task CreateEventSubscription(EventSubscription subscription) => _innerService.CreateEventSubscription(subscription);
- public Task CreateNewWorkflow(WorkflowInstance workflow, CancellationToken _ = default) => _innerService.CreateNewWorkflow(workflow);
+ public Task CreateNewWorkflow(WorkflowInstance workflow) => _innerService.CreateNewWorkflow(workflow);
public void EnsureStoreExists() => _innerService.EnsureStoreExists();
@@ -42,22 +42,22 @@ public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService)
public Task> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take) => _innerService.GetWorkflowInstances(status, type, createdFrom, createdTo, skip, take);
- public Task MarkEventProcessed(string id, CancellationToken _ = default) => _innerService.MarkEventProcessed(id);
+ public Task MarkEventProcessed(string id) => _innerService.MarkEventProcessed(id);
- public Task MarkEventUnprocessed(string id, CancellationToken _ = default) => _innerService.MarkEventUnprocessed(id);
+ public Task MarkEventUnprocessed(string id) => _innerService.MarkEventUnprocessed(id);
- public Task PersistErrors(IEnumerable errors, CancellationToken _ = default) => _innerService.PersistErrors(errors);
+ public Task PersistErrors(IEnumerable errors) => _innerService.PersistErrors(errors);
- public Task PersistWorkflow(WorkflowInstance workflow, CancellationToken _ = default) => _innerService.PersistWorkflow(workflow);
+ public Task PersistWorkflow(WorkflowInstance workflow) => _innerService.PersistWorkflow(workflow);
- public Task TerminateSubscription(string eventSubscriptionId, CancellationToken _ = default) => _innerService.TerminateSubscription(eventSubscriptionId);
+ public Task TerminateSubscription(string eventSubscriptionId) => _innerService.TerminateSubscription(eventSubscriptionId);
public Task GetSubscription(string eventSubscriptionId, CancellationToken _ = default) => _innerService.GetSubscription(eventSubscriptionId);
public Task GetFirstOpenSubscription(string eventName, string eventKey, DateTime asOf, CancellationToken _ = default) => _innerService.GetFirstOpenSubscription(eventName, eventKey, asOf);
- public Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken _ = default) => _innerService.SetSubscriptionToken(eventSubscriptionId, token, workerId, expiry);
+ public Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry) => _innerService.SetSubscriptionToken(eventSubscriptionId, token, workerId, expiry);
- public Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken _ = default) => _innerService.ClearSubscriptionToken(eventSubscriptionId, token);
+ public Task ClearSubscriptionToken(string eventSubscriptionId, string token) => _innerService.ClearSubscriptionToken(eventSubscriptionId, token);
public Task ScheduleCommand(ScheduledCommand command)
{
diff --git a/src/WorkflowCore/Services/SyncWorkflowRunner.cs b/src/WorkflowCore/Services/SyncWorkflowRunner.cs
index 5317a8966..4a4521829 100644
--- a/src/WorkflowCore/Services/SyncWorkflowRunner.cs
+++ b/src/WorkflowCore/Services/SyncWorkflowRunner.cs
@@ -72,7 +72,7 @@ public async Task RunWorkflowSync(string workflowId, in
var id = Guid.NewGuid().ToString();
if (persistSate)
- id = await _persistenceStore.CreateNewWorkflow(wf, token);
+ id = await _persistenceStore.CreateNewWorkflow(wf);
else
wf.Id = id;
@@ -89,7 +89,7 @@ public async Task RunWorkflowSync(string workflowId, in
{
await _executor.Execute(wf, token);
if (persistSate)
- await _persistenceStore.PersistWorkflow(wf, token);
+ await _persistenceStore.PersistWorkflow(wf);
}
}
finally
@@ -103,4 +103,4 @@ public async Task RunWorkflowSync(string workflowId, in
return wf;
}
}
-}
\ No newline at end of file
+}
diff --git a/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs b/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs
index 44798a0c9..8d3a9957c 100644
--- a/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs
+++ b/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs
@@ -26,26 +26,26 @@ public EntityFrameworkPersistenceProvider(IWorkflowDbContextFactory contextFacto
_canMigrateDB = canMigrateDB;
}
- public async Task CreateEventSubscription(EventSubscription subscription, CancellationToken cancellationToken = default)
+ public async Task CreateEventSubscription(EventSubscription subscription)
{
using (var db = ConstructDbContext())
{
subscription.Id = Guid.NewGuid().ToString();
var persistable = subscription.ToPersistable();
var result = db.Set().Add(persistable);
- await db.SaveChangesAsync(cancellationToken);
+ await db.SaveChangesAsync();
return subscription.Id;
}
}
- public async Task CreateNewWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default)
+ public async Task CreateNewWorkflow(WorkflowInstance workflow)
{
using (var db = ConstructDbContext())
{
workflow.Id = Guid.NewGuid().ToString();
var persistable = workflow.ToPersistable();
var result = db.Set().Add(persistable);
- await db.SaveChangesAsync(cancellationToken);
+ await db.SaveChangesAsync();
return workflow.Id;
}
}
@@ -134,7 +134,7 @@ public async Task> GetWorkflowInstances(IEnumerabl
}
}
- public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default)
+ public async Task PersistWorkflow(WorkflowInstance workflow)
{
using (var db = ConstructDbContext())
{
@@ -145,21 +145,21 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c
.ThenInclude(ep => ep.ExtensionAttributes)
.Include(wf => wf.ExecutionPointers)
.AsTracking()
- .FirstAsync(cancellationToken);
+ .FirstAsync();
var persistable = workflow.ToPersistable(existingEntity);
- await db.SaveChangesAsync(cancellationToken);
+ await db.SaveChangesAsync();
}
}
- public async Task TerminateSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default)
+ public async Task TerminateSubscription(string eventSubscriptionId)
{
using (var db = ConstructDbContext())
{
var uid = new Guid(eventSubscriptionId);
- var existing = await db.Set().FirstAsync(x => x.SubscriptionId == uid, cancellationToken);
+ var existing = await db.Set().FirstAsync(x => x.SubscriptionId == uid);
db.Set().Remove(existing);
- await db.SaveChangesAsync(cancellationToken);
+ await db.SaveChangesAsync();
}
}
@@ -194,14 +194,14 @@ public async Task> GetSubscriptions(string eventN
}
}
- public async Task CreateEvent(Event newEvent, CancellationToken cancellationToken = default)
+ public async Task CreateEvent(Event newEvent)
{
using (var db = ConstructDbContext())
{
newEvent.Id = Guid.NewGuid().ToString();
var persistable = newEvent.ToPersistable();
var result = db.Set().Add(persistable);
- await db.SaveChangesAsync(cancellationToken);
+ await db.SaveChangesAsync();
return newEvent.Id;
}
}
@@ -237,7 +237,7 @@ public async Task> GetRunnableEvents(DateTime asAt, Cancella
}
}
- public async Task MarkEventProcessed(string id, CancellationToken cancellationToken = default)
+ public async Task MarkEventProcessed(string id)
{
using (var db = ConstructDbContext())
{
@@ -245,10 +245,10 @@ public async Task MarkEventProcessed(string id, CancellationToken cancellationTo
var existingEntity = await db.Set()
.Where(x => x.EventId == uid)
.AsTracking()
- .FirstAsync(cancellationToken);
+ .FirstAsync();
existingEntity.IsProcessed = true;
- await db.SaveChangesAsync(cancellationToken);
+ await db.SaveChangesAsync();
}
}
@@ -271,7 +271,7 @@ public async Task> GetEvents(string eventName, string eventK
}
}
- public async Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = default)
+ public async Task MarkEventUnprocessed(string id)
{
using (var db = ConstructDbContext())
{
@@ -279,14 +279,14 @@ public async Task MarkEventUnprocessed(string id, CancellationToken cancellation
var existingEntity = await db.Set()
.Where(x => x.EventId == uid)
.AsTracking()
- .FirstAsync(cancellationToken);
+ .FirstAsync();
existingEntity.IsProcessed = false;
- await db.SaveChangesAsync(cancellationToken);
+ await db.SaveChangesAsync();
}
}
- public async Task PersistErrors(IEnumerable errors, CancellationToken cancellationToken = default)
+ public async Task PersistErrors(IEnumerable errors)
{
using (var db = ConstructDbContext())
{
@@ -297,7 +297,7 @@ public async Task PersistErrors(IEnumerable errors, Cancellation
{
db.Set().Add(error.ToPersistable());
}
- await db.SaveChangesAsync(cancellationToken);
+ await db.SaveChangesAsync();
}
}
@@ -329,7 +329,7 @@ public async Task GetFirstOpenSubscription(string eventName,
}
}
- public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default)
+ public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry)
{
using (var db = ConstructDbContext())
{
@@ -337,18 +337,18 @@ public async Task SetSubscriptionToken(string eventSubscriptionId, string
var existingEntity = await db.Set()
.Where(x => x.SubscriptionId == uid)
.AsTracking()
- .FirstAsync(cancellationToken);
+ .FirstAsync();
existingEntity.ExternalToken = token;
existingEntity.ExternalWorkerId = workerId;
existingEntity.ExternalTokenExpiry = expiry;
- await db.SaveChangesAsync(cancellationToken);
+ await db.SaveChangesAsync();
return true;
}
}
- public async Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken cancellationToken = default)
+ public async Task ClearSubscriptionToken(string eventSubscriptionId, string token)
{
using (var db = ConstructDbContext())
{
@@ -356,7 +356,7 @@ public async Task ClearSubscriptionToken(string eventSubscriptionId, string toke
var existingEntity = await db.Set()
.Where(x => x.SubscriptionId == uid)
.AsTracking()
- .FirstAsync(cancellationToken);
+ .FirstAsync();
if (existingEntity.ExternalToken != token)
throw new InvalidOperationException();
@@ -364,7 +364,7 @@ public async Task ClearSubscriptionToken(string eventSubscriptionId, string toke
existingEntity.ExternalToken = null;
existingEntity.ExternalWorkerId = null;
existingEntity.ExternalTokenExpiry = null;
- await db.SaveChangesAsync(cancellationToken);
+ await db.SaveChangesAsync();
}
}
diff --git a/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs b/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs
index 06be13355..8c163cd19 100644
--- a/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs
+++ b/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs
@@ -138,15 +138,15 @@ static void CreateIndexes(MongoPersistenceProvider instance)
private IMongoCollection ScheduledCommands => _database.GetCollection("wfc.scheduled_commands");
- public async Task CreateNewWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default)
+ public async Task CreateNewWorkflow(WorkflowInstance workflow)
{
- await WorkflowInstances.InsertOneAsync(workflow, cancellationToken: cancellationToken);
+ await WorkflowInstances.InsertOneAsync(workflow);
return workflow.Id;
}
- public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default)
+ public async Task PersistWorkflow(WorkflowInstance workflow)
{
- await WorkflowInstances.ReplaceOneAsync(x => x.Id == workflow.Id, workflow, cancellationToken: cancellationToken);
+ await WorkflowInstances.ReplaceOneAsync(x => x.Id == workflow.Id, workflow);
}
public async Task> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default)
@@ -195,15 +195,15 @@ public async Task> GetWorkflowInstances(WorkflowSt
return await result.Skip(skip).Take(take).ToListAsync();
}
- public async Task CreateEventSubscription(EventSubscription subscription, CancellationToken cancellationToken = default)
+ public async Task CreateEventSubscription(EventSubscription subscription)
{
- await EventSubscriptions.InsertOneAsync(subscription, cancellationToken: cancellationToken);
+ await EventSubscriptions.InsertOneAsync(subscription);
return subscription.Id;
}
- public async Task TerminateSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default)
+ public async Task TerminateSubscription(string eventSubscriptionId)
{
- await EventSubscriptions.DeleteOneAsync(x => x.Id == eventSubscriptionId, cancellationToken);
+ await EventSubscriptions.DeleteOneAsync(x => x.Id == eventSubscriptionId);
}
public async Task GetSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default)
@@ -220,25 +220,25 @@ public async Task GetFirstOpenSubscription(string eventName,
return await query.FirstOrDefaultAsync(cancellationToken);
}
- public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default)
+ public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry)
{
var update = Builders.Update
.Set(x => x.ExternalToken, token)
.Set(x => x.ExternalTokenExpiry, expiry)
.Set(x => x.ExternalWorkerId, workerId);
- var result = await EventSubscriptions.UpdateOneAsync(x => x.Id == eventSubscriptionId && x.ExternalToken == null, update, cancellationToken: cancellationToken);
+ var result = await EventSubscriptions.UpdateOneAsync(x => x.Id == eventSubscriptionId && x.ExternalToken == null, update);
return (result.ModifiedCount > 0);
}
- public async Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken cancellationToken = default)
+ public async Task ClearSubscriptionToken(string eventSubscriptionId, string token)
{
var update = Builders.Update
.Set(x => x.ExternalToken, null)
.Set(x => x.ExternalTokenExpiry, null)
.Set(x => x.ExternalWorkerId, null);
- await EventSubscriptions.UpdateOneAsync(x => x.Id == eventSubscriptionId && x.ExternalToken == token, update, cancellationToken: cancellationToken);
+ await EventSubscriptions.UpdateOneAsync(x => x.Id == eventSubscriptionId && x.ExternalToken == token, update);
}
public void EnsureStoreExists()
@@ -254,9 +254,9 @@ public async Task> GetSubscriptions(string eventN
return await query.ToListAsync(cancellationToken);
}
- public async Task CreateEvent(Event newEvent, CancellationToken cancellationToken = default)
+ public async Task CreateEvent(Event newEvent)
{
- await Events.InsertOneAsync(newEvent, cancellationToken: cancellationToken);
+ await Events.InsertOneAsync(newEvent);
return newEvent.Id;
}
@@ -276,12 +276,12 @@ public async Task> GetRunnableEvents(DateTime asAt, Cancella
return await query.ToListAsync(cancellationToken);
}
- public async Task MarkEventProcessed(string id, CancellationToken cancellationToken = default)
+ public async Task MarkEventProcessed(string id)
{
var update = Builders.Update
.Set(x => x.IsProcessed, true);
- await Events.UpdateOneAsync(x => x.Id == id, update, cancellationToken: cancellationToken);
+ await Events.UpdateOneAsync(x => x.Id == id, update);
}
public async Task> GetEvents(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken)
@@ -293,18 +293,18 @@ public async Task> GetEvents(string eventName, string eventK
return await query.ToListAsync(cancellationToken);
}
- public async Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = default)
+ public async Task MarkEventUnprocessed(string id)
{
var update = Builders.Update
.Set(x => x.IsProcessed, false);
- await Events.UpdateOneAsync(x => x.Id == id, update, cancellationToken: cancellationToken);
+ await Events.UpdateOneAsync(x => x.Id == id, update);
}
- public async Task PersistErrors(IEnumerable errors, CancellationToken cancellationToken = default)
+ public async Task PersistErrors(IEnumerable errors)
{
if (errors.Any())
- await ExecutionErrors.InsertManyAsync(errors, cancellationToken: cancellationToken);
+ await ExecutionErrors.InsertManyAsync(errors);
}
public bool SupportsScheduledCommands => true;
diff --git a/src/providers/WorkflowCore.Persistence.MySQL/MysqlContext.cs b/src/providers/WorkflowCore.Persistence.MySQL/MysqlContext.cs
index dc49617fd..9e9967dce 100644
--- a/src/providers/WorkflowCore.Persistence.MySQL/MysqlContext.cs
+++ b/src/providers/WorkflowCore.Persistence.MySQL/MysqlContext.cs
@@ -23,7 +23,7 @@ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
base.OnConfiguring(optionsBuilder);
#if NETSTANDARD2_0
optionsBuilder.UseMySql(_connectionString, _mysqlOptionsAction);
-#elif NETSTANDARD2_1_OR_GREATER
+#else
optionsBuilder.UseMySql(_connectionString, ServerVersion.AutoDetect(_connectionString), _mysqlOptionsAction);
#endif
}
diff --git a/src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs b/src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs
index 695b8e31f..19ad37fad 100644
--- a/src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs
+++ b/src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs
@@ -41,18 +41,18 @@ static void CreateIndexes(RavendbPersistenceProvider instance)
}
}
- public async Task CreateNewWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default)
+ public async Task CreateNewWorkflow(WorkflowInstance workflow)
{
using (var session = _database.OpenAsyncSession())
{
- await session.StoreAsync(workflow, cancellationToken);
+ await session.StoreAsync(workflow);
var id = workflow.Id;
- await session.SaveChangesAsync(cancellationToken);
+ await session.SaveChangesAsync();
return id;
}
}
- public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default)
+ public async Task PersistWorkflow(WorkflowInstance workflow)
{
using (var session = _database.OpenAsyncSession())
{
@@ -67,7 +67,7 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c
session.Advanced.Patch(workflow.Id, x => x.CreateTime, workflow.CreateTime);
session.Advanced.Patch(workflow.Id, x => x.CompleteTime, workflow.CompleteTime);
- await session.SaveChangesAsync(cancellationToken);
+ await session.SaveChangesAsync();
}
}
@@ -130,18 +130,18 @@ public async Task> GetWorkflowInstances(WorkflowSt
}
}
- public async Task CreateEventSubscription(EventSubscription subscription, CancellationToken cancellationToken = default)
+ public async Task CreateEventSubscription(EventSubscription subscription)
{
using (var session = _database.OpenAsyncSession())
{
- await session.StoreAsync(subscription, cancellationToken);
+ await session.StoreAsync(subscription);
var id = subscription.Id;
- await session.SaveChangesAsync(cancellationToken);
+ await session.SaveChangesAsync();
return id;
}
}
- public async Task TerminateSubscription(string eventSubscriptionId, CancellationToken _ = default)
+ public async Task TerminateSubscription(string eventSubscriptionId)
{
using (var session = _database.OpenAsyncSession())
{
@@ -174,7 +174,7 @@ public async Task GetFirstOpenSubscription(string eventName,
}
}
- public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default)
+ public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry)
{
try
{
@@ -189,7 +189,7 @@ public async Task SetSubscriptionToken(string eventSubscriptionId, string
strbuilder.Append($"e.ExternalWorkerId = 'workerId'");
strbuilder.Append("}");
- var operation = await _database.Operations.SendAsync(new PatchByQueryOperation(strbuilder.ToString()), token: cancellationToken);
+ var operation = await _database.Operations.SendAsync(new PatchByQueryOperation(strbuilder.ToString()));
operation.WaitForCompletion();
return true;
}
@@ -199,7 +199,7 @@ public async Task SetSubscriptionToken(string eventSubscriptionId, string
}
}
- public async Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken cancellationToken = default)
+ public async Task ClearSubscriptionToken(string eventSubscriptionId, string token)
{
try
{
@@ -214,7 +214,7 @@ public async Task ClearSubscriptionToken(string eventSubscriptionId, string toke
strbuilder.Append($"e.ExternalWorkerId = null");
strbuilder.Append("}");
- var operation = await _database.Operations.SendAsync(new PatchByQueryOperation(strbuilder.ToString()), token: cancellationToken);
+ var operation = await _database.Operations.SendAsync(new PatchByQueryOperation(strbuilder.ToString()));
operation.WaitForCompletion();
}
catch (Exception e)
@@ -239,13 +239,13 @@ public async Task> GetSubscriptions(string eventN
}
}
- public async Task CreateEvent(Event newEvent, CancellationToken cancellationToken = default)
+ public async Task CreateEvent(Event newEvent)
{
using (var session = _database.OpenAsyncSession())
{
- await session.StoreAsync(newEvent, cancellationToken);
+ await session.StoreAsync(newEvent);
var id = newEvent.Id;
- await session.SaveChangesAsync(cancellationToken);
+ await session.SaveChangesAsync();
return id;
}
}
@@ -272,13 +272,13 @@ public async Task> GetRunnableEvents(DateTime asAt, Cancella
}
}
- public async Task MarkEventProcessed(string id, CancellationToken cancellationToken = default)
+ public async Task MarkEventProcessed(string id)
{
using (var session = _database.OpenAsyncSession())
{
session.Advanced.Patch(id, x => x.IsProcessed, true);
- await session.SaveChangesAsync(cancellationToken);
+ await session.SaveChangesAsync();
}
}
@@ -297,21 +297,21 @@ public async Task> GetEvents(string eventName, string eventK
}
}
- public async Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = default)
+ public async Task MarkEventUnprocessed(string id)
{
using (var session = _database.OpenAsyncSession())
{
session.Advanced.Patch(id, x => x.IsProcessed, false);
- await session.SaveChangesAsync(cancellationToken);
+ await session.SaveChangesAsync();
}
}
- public async Task PersistErrors(IEnumerable errors, CancellationToken cancellationToken = default)
+ public async Task PersistErrors(IEnumerable errors)
{
if (errors.Any())
{
- var blk = _database.BulkInsert(token: cancellationToken);
+ var blk = _database.BulkInsert();
foreach (var error in errors)
await blk.StoreAsync(error);
diff --git a/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs b/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs
index 1126b6c20..9a10f9a8a 100644
--- a/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs
+++ b/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs
@@ -34,7 +34,7 @@ public DynamoPersistenceProvider(AWSCredentials credentials, AmazonDynamoDBConfi
_provisioner = provisioner;
}
- public async Task CreateNewWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default)
+ public async Task CreateNewWorkflow(WorkflowInstance workflow)
{
workflow.Id = Guid.NewGuid().ToString();
@@ -45,12 +45,12 @@ public async Task CreateNewWorkflow(WorkflowInstance workflow, Cancellat
ConditionExpression = "attribute_not_exists(id)"
};
- var _ = await _client.PutItemAsync(req, cancellationToken);
+ var _ = await _client.PutItemAsync(req);
return workflow.Id;
}
- public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default)
+ public async Task PersistWorkflow(WorkflowInstance workflow)
{
var request = new PutItemRequest
{
@@ -58,7 +58,7 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c
Item = workflow.ToDynamoMap()
};
- var response = await _client.PutItemAsync(request, cancellationToken);
+ var response = await _client.PutItemAsync(request);
}
public async Task> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default)
@@ -163,7 +163,7 @@ public async Task> GetWorkflowInstances(IEnumerabl
return result.Select(i => i.ToWorkflowInstance());
}
- public async Task CreateEventSubscription(EventSubscription subscription, CancellationToken cancellationToken = default)
+ public async Task CreateEventSubscription(EventSubscription subscription)
{
subscription.Id = Guid.NewGuid().ToString();
@@ -174,7 +174,7 @@ public async Task CreateEventSubscription(EventSubscription subscription
ConditionExpression = "attribute_not_exists(id)"
};
- var response = await _client.PutItemAsync(req, cancellationToken);
+ var response = await _client.PutItemAsync(req);
return subscription.Id;
}
@@ -215,7 +215,7 @@ public async Task> GetSubscriptions(string eventN
return result;
}
- public async Task TerminateSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default)
+ public async Task TerminateSubscription(string eventSubscriptionId)
{
var request = new DeleteItemRequest
{
@@ -225,10 +225,10 @@ public async Task TerminateSubscription(string eventSubscriptionId, Cancellation
{ "id", new AttributeValue(eventSubscriptionId) }
}
};
- await _client.DeleteItemAsync(request, cancellationToken);
+ await _client.DeleteItemAsync(request);
}
- public async Task CreateEvent(Event newEvent, CancellationToken cancellationToken = default)
+ public async Task CreateEvent(Event newEvent)
{
newEvent.Id = Guid.NewGuid().ToString();
@@ -239,7 +239,7 @@ public async Task CreateEvent(Event newEvent, CancellationToken cancella
ConditionExpression = "attribute_not_exists(id)"
};
- var _ = await _client.PutItemAsync(req, cancellationToken);
+ var _ = await _client.PutItemAsync(req);
return newEvent.Id;
}
@@ -329,7 +329,7 @@ public async Task> GetEvents(string eventName, string eventK
return result;
}
- public async Task MarkEventProcessed(string id, CancellationToken cancellationToken = default)
+ public async Task MarkEventProcessed(string id)
{
var request = new UpdateItemRequest
{
@@ -340,10 +340,10 @@ public async Task MarkEventProcessed(string id, CancellationToken cancellationTo
},
UpdateExpression = "REMOVE not_processed"
};
- await _client.UpdateItemAsync(request, cancellationToken);
+ await _client.UpdateItemAsync(request);
}
- public async Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = default)
+ public async Task MarkEventUnprocessed(string id)
{
var request = new UpdateItemRequest
{
@@ -358,10 +358,10 @@ public async Task MarkEventUnprocessed(string id, CancellationToken cancellation
{ ":n" , new AttributeValue { N = 1.ToString() } }
}
};
- await _client.UpdateItemAsync(request, cancellationToken);
+ await _client.UpdateItemAsync(request);
}
- public Task PersistErrors(IEnumerable errors, CancellationToken _ = default)
+ public Task PersistErrors(IEnumerable errors)
{
//TODO
return Task.CompletedTask;
@@ -423,7 +423,7 @@ public async Task GetFirstOpenSubscription(string eventName,
return result.FirstOrDefault();
}
- public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default)
+ public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry)
{
var request = new UpdateItemRequest
{
@@ -443,7 +443,7 @@ public async Task SetSubscriptionToken(string eventSubscriptionId, string
};
try
{
- await _client.UpdateItemAsync(request, cancellationToken);
+ await _client.UpdateItemAsync(request);
return true;
}
catch (ConditionalCheckFailedException)
@@ -452,7 +452,7 @@ public async Task SetSubscriptionToken(string eventSubscriptionId, string
}
}
- public async Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken cancellationToken = default)
+ public async Task ClearSubscriptionToken(string eventSubscriptionId, string token)
{
var request = new UpdateItemRequest
{
@@ -469,7 +469,7 @@ public async Task ClearSubscriptionToken(string eventSubscriptionId, string toke
}
};
- await _client.UpdateItemAsync(request, cancellationToken);
+ await _client.UpdateItemAsync(request);
}
public Task ScheduleCommand(ScheduledCommand command)
@@ -482,4 +482,4 @@ public Task ProcessCommands(DateTimeOffset asOf, Func ac
throw new NotImplementedException();
}
}
-}
\ No newline at end of file
+}
diff --git a/src/providers/WorkflowCore.Providers.Azure/Services/CosmosDbPersistenceProvider.cs b/src/providers/WorkflowCore.Providers.Azure/Services/CosmosDbPersistenceProvider.cs
index 644009df2..20c9fa850 100644
--- a/src/providers/WorkflowCore.Providers.Azure/Services/CosmosDbPersistenceProvider.cs
+++ b/src/providers/WorkflowCore.Providers.Azure/Services/CosmosDbPersistenceProvider.cs
@@ -37,7 +37,7 @@ public CosmosDbPersistenceProvider(
public bool SupportsScheduledCommands => false;
- public async Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken cancellationToken = default)
+ public async Task ClearSubscriptionToken(string eventSubscriptionId, string token)
{
var existing = await _subscriptionContainer.Value.ReadItemAsync(eventSubscriptionId, new PartitionKey(eventSubscriptionId));
@@ -47,27 +47,27 @@ public async Task ClearSubscriptionToken(string eventSubscriptionId, string toke
existing.Resource.ExternalWorkerId = null;
existing.Resource.ExternalTokenExpiry = null;
- await _subscriptionContainer.Value.ReplaceItemAsync(existing.Resource, eventSubscriptionId, cancellationToken: cancellationToken);
+ await _subscriptionContainer.Value.ReplaceItemAsync(existing.Resource, eventSubscriptionId);
}
- public async Task CreateEvent(Event newEvent, CancellationToken cancellationToken)
+ public async Task CreateEvent(Event newEvent)
{
newEvent.Id = Guid.NewGuid().ToString();
- var result = await _eventContainer.Value.CreateItemAsync(PersistedEvent.FromInstance(newEvent), cancellationToken: cancellationToken);
+ var result = await _eventContainer.Value.CreateItemAsync(PersistedEvent.FromInstance(newEvent));
return result.Resource.id;
}
- public async Task CreateEventSubscription(EventSubscription subscription, CancellationToken cancellationToken)
+ public async Task CreateEventSubscription(EventSubscription subscription)
{
subscription.Id = Guid.NewGuid().ToString();
- var result = await _subscriptionContainer.Value.CreateItemAsync(PersistedSubscription.FromInstance(subscription), cancellationToken: cancellationToken);
+ var result = await _subscriptionContainer.Value.CreateItemAsync(PersistedSubscription.FromInstance(subscription));
return result.Resource.id;
}
- public async Task CreateNewWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken)
+ public async Task CreateNewWorkflow(WorkflowInstance workflow)
{
workflow.Id = Guid.NewGuid().ToString();
- var result = await _workflowContainer.Value.CreateItemAsync(PersistedWorkflow.FromInstance(workflow), cancellationToken: cancellationToken);
+ var result = await _workflowContainer.Value.CreateItemAsync(PersistedWorkflow.FromInstance(workflow));
return result.Resource.id;
}
@@ -203,28 +203,28 @@ public Task> GetWorkflowInstances(IEnumerable(id, new PartitionKey(id), cancellationToken: cancellationToken);
+ var evt = await _eventContainer.Value.ReadItemAsync(id, new PartitionKey(id));
evt.Resource.IsProcessed = true;
- await _eventContainer.Value.ReplaceItemAsync(evt.Resource, id, cancellationToken: cancellationToken);
+ await _eventContainer.Value.ReplaceItemAsync(evt.Resource, id);
}
- public async Task MarkEventUnprocessed(string id, CancellationToken cancellationToken)
+ public async Task MarkEventUnprocessed(string id)
{
- var evt = await _eventContainer.Value.ReadItemAsync(id, new PartitionKey(id), cancellationToken: cancellationToken);
+ var evt = await _eventContainer.Value.ReadItemAsync(id, new PartitionKey(id));
evt.Resource.IsProcessed = false;
- await _eventContainer.Value.ReplaceItemAsync(evt.Resource, id, cancellationToken: cancellationToken);
+ await _eventContainer.Value.ReplaceItemAsync(evt.Resource, id);
}
- public Task PersistErrors(IEnumerable errors, CancellationToken _ = default)
+ public Task PersistErrors(IEnumerable errors)
{
return Task.CompletedTask;
}
- public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken)
+ public async Task PersistWorkflow(WorkflowInstance workflow)
{
- await _workflowContainer.Value.UpsertItemAsync(PersistedWorkflow.FromInstance(workflow), cancellationToken: cancellationToken);
+ await _workflowContainer.Value.UpsertItemAsync(PersistedWorkflow.FromInstance(workflow));
}
public Task ProcessCommands(DateTimeOffset asOf, Func action, CancellationToken cancellationToken = default)
@@ -237,22 +237,22 @@ public Task ScheduleCommand(ScheduledCommand command)
throw new NotImplementedException();
}
- public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken)
+ public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry)
{
- var sub = await _subscriptionContainer.Value.ReadItemAsync(eventSubscriptionId, new PartitionKey(eventSubscriptionId), cancellationToken: cancellationToken);
+ var sub = await _subscriptionContainer.Value.ReadItemAsync(eventSubscriptionId, new PartitionKey(eventSubscriptionId));
var existingEntity = sub.Resource;
existingEntity.ExternalToken = token;
existingEntity.ExternalWorkerId = workerId;
existingEntity.ExternalTokenExpiry = expiry;
- await _subscriptionContainer.Value.ReplaceItemAsync(existingEntity, eventSubscriptionId, cancellationToken: cancellationToken);
+ await _subscriptionContainer.Value.ReplaceItemAsync(existingEntity, eventSubscriptionId);
return true;
}
- public async Task TerminateSubscription(string eventSubscriptionId, CancellationToken cancellationToken)
+ public async Task TerminateSubscription(string eventSubscriptionId)
{
- await _subscriptionContainer.Value.DeleteItemAsync(eventSubscriptionId, new PartitionKey(eventSubscriptionId), cancellationToken: cancellationToken);
+ await _subscriptionContainer.Value.DeleteItemAsync(eventSubscriptionId, new PartitionKey(eventSubscriptionId));
}
}
}
diff --git a/src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs b/src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs
index cde65e2e5..c0c5db9bf 100644
--- a/src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs
+++ b/src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs
@@ -40,14 +40,14 @@ public RedisPersistenceProvider(string connectionString, string prefix, bool rem
_removeComplete = removeComplete;
}
- public async Task CreateNewWorkflow(WorkflowInstance workflow, CancellationToken _ = default)
+ public async Task CreateNewWorkflow(WorkflowInstance workflow)
{
workflow.Id = Guid.NewGuid().ToString();
await PersistWorkflow(workflow);
return workflow.Id;
}
- public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken _ = default)
+ public async Task PersistWorkflow(WorkflowInstance workflow)
{
var str = JsonConvert.SerializeObject(workflow, _serializerSettings);
await _redis.HashSetAsync($"{_prefix}.{WORKFLOW_SET}", workflow.Id, str);
@@ -96,7 +96,7 @@ public async Task> GetWorkflowInstances(IEnumerabl
return raw.Select(r => JsonConvert.DeserializeObject(r, _serializerSettings));
}
- public async Task CreateEventSubscription(EventSubscription subscription, CancellationToken _ = default)
+ public async Task CreateEventSubscription(EventSubscription subscription)
{
subscription.Id = Guid.NewGuid().ToString();
var str = JsonConvert.SerializeObject(subscription, _serializerSettings);
@@ -121,7 +121,7 @@ public async Task> GetSubscriptions(string eventN
return result;
}
- public async Task TerminateSubscription(string eventSubscriptionId, CancellationToken _ = default)
+ public async Task TerminateSubscription(string eventSubscriptionId)
{
var existingRaw = await _redis.HashGetAsync($"{_prefix}.{SUBSCRIPTION_SET}", eventSubscriptionId);
var existing = JsonConvert.DeserializeObject(existingRaw, _serializerSettings);
@@ -140,7 +140,7 @@ public async Task GetFirstOpenSubscription(string eventName,
return (await GetSubscriptions(eventName, eventKey, asOf, cancellationToken)).FirstOrDefault(sub => string.IsNullOrEmpty(sub.ExternalToken));
}
- public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken _ = default)
+ public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry)
{
var item = JsonConvert.DeserializeObject(await _redis.HashGetAsync($"{_prefix}.{SUBSCRIPTION_SET}", eventSubscriptionId), _serializerSettings);
if (item.ExternalToken != null)
@@ -153,7 +153,7 @@ public async Task SetSubscriptionToken(string eventSubscriptionId, string
return true;
}
- public async Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken _ = default)
+ public async Task ClearSubscriptionToken(string eventSubscriptionId, string token)
{
var item = JsonConvert.DeserializeObject(await _redis.HashGetAsync($"{_prefix}.{SUBSCRIPTION_SET}", eventSubscriptionId), _serializerSettings);
if (item.ExternalToken != token)
@@ -165,7 +165,7 @@ public async Task ClearSubscriptionToken(string eventSubscriptionId, string toke
await _redis.HashSetAsync($"{_prefix}.{SUBSCRIPTION_SET}", eventSubscriptionId, str);
}
- public async Task CreateEvent(Event newEvent, CancellationToken _ = default)
+ public async Task CreateEvent(Event newEvent)
{
newEvent.Id = Guid.NewGuid().ToString();
var str = JsonConvert.SerializeObject(newEvent, _serializerSettings);
@@ -208,25 +208,25 @@ public async Task> GetEvents(string eventName, string eventK
return result;
}
- public async Task MarkEventProcessed(string id, CancellationToken cancellationToken = default)
+ public async Task MarkEventProcessed(string id)
{
- var evt = await GetEvent(id, cancellationToken);
+ var evt = await GetEvent(id);
evt.IsProcessed = true;
var str = JsonConvert.SerializeObject(evt, _serializerSettings);
await _redis.HashSetAsync($"{_prefix}.{EVENT_SET}", evt.Id, str);
await _redis.SortedSetRemoveAsync($"{_prefix}.{EVENT_SET}.{RUNNABLE_INDEX}", id);
}
- public async Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = default)
+ public async Task MarkEventUnprocessed(string id)
{
- var evt = await GetEvent(id, cancellationToken);
+ var evt = await GetEvent(id);
evt.IsProcessed = false;
var str = JsonConvert.SerializeObject(evt, _serializerSettings);
await _redis.HashSetAsync($"{_prefix}.{EVENT_SET}", evt.Id, str);
await _redis.SortedSetAddAsync($"{_prefix}.{EVENT_SET}.{RUNNABLE_INDEX}", evt.Id, evt.EventTime.Ticks);
}
- public Task PersistErrors(IEnumerable errors, CancellationToken _ = default)
+ public Task PersistErrors(IEnumerable errors)
{
return Task.CompletedTask;
}
diff --git a/test/Docker.Testify/Docker.Testify.csproj b/test/Docker.Testify/Docker.Testify.csproj
index a83648df4..cff2d13e8 100644
--- a/test/Docker.Testify/Docker.Testify.csproj
+++ b/test/Docker.Testify/Docker.Testify.csproj
@@ -1,7 +1,7 @@
-
+
diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/StopScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/StopScenario.cs
new file mode 100644
index 000000000..e487a4053
--- /dev/null
+++ b/test/WorkflowCore.IntegrationTests/Scenarios/StopScenario.cs
@@ -0,0 +1,47 @@
+using System.Threading;
+using System.Threading.Tasks;
+using FluentAssertions;
+using WorkflowCore.Interface;
+using WorkflowCore.Models;
+using WorkflowCore.Models.LifeCycleEvents;
+using WorkflowCore.Testing;
+using Xunit;
+
+namespace WorkflowCore.IntegrationTests.Scenarios
+{
+ public class StopScenario : WorkflowTest
+ {
+ public class StopWorkflow : IWorkflow
+ {
+ public string Id => "StopWorkflow";
+ public int Version => 1;
+ public void Build(IWorkflowBuilder