From 20843ce0cf6ea330c62abef0cbb47f531262b0bc Mon Sep 17 00:00:00 2001 From: Martin Dennhardt Date: Mon, 8 Aug 2022 12:38:41 +0200 Subject: [PATCH 1/6] Fix MysqlContext for .Net6 --- src/providers/WorkflowCore.Persistence.MySQL/MysqlContext.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/providers/WorkflowCore.Persistence.MySQL/MysqlContext.cs b/src/providers/WorkflowCore.Persistence.MySQL/MysqlContext.cs index dc49617fd..9e9967dce 100644 --- a/src/providers/WorkflowCore.Persistence.MySQL/MysqlContext.cs +++ b/src/providers/WorkflowCore.Persistence.MySQL/MysqlContext.cs @@ -23,7 +23,7 @@ protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) base.OnConfiguring(optionsBuilder); #if NETSTANDARD2_0 optionsBuilder.UseMySql(_connectionString, _mysqlOptionsAction); -#elif NETSTANDARD2_1_OR_GREATER +#else optionsBuilder.UseMySql(_connectionString, ServerVersion.AutoDetect(_connectionString), _mysqlOptionsAction); #endif } From 4b2ea94ba19650ad4472d3c6e90f2042f43f74ef Mon Sep 17 00:00:00 2001 From: Martin Dennhardt Date: Mon, 8 Aug 2022 12:38:46 +0200 Subject: [PATCH 2/6] Add test to reproduce race condition If the host is stopped as soon as WorkflowCompleted LifeCycleEvent is raised, some persistence providers cannot persist the 'Completed' state in time. --- src/WorkflowCore.Testing/WorkflowTest.cs | 16 +++++- .../Scenarios/StopScenario.cs | 52 +++++++++++++++++++ .../Scenarios/DynamoStopScenario.cs | 18 +++++++ .../Scenarios/MongoStopScenario.cs | 16 ++++++ .../Scenarios/MysqlStopScenario.cs | 16 ++++++ .../Scenarios/PostgresStopScenario.cs | 16 ++++++ .../Scenarios/RedisStopScenario.cs | 16 ++++++ .../Scenarios/SqlServerStopScenario.cs | 16 ++++++ .../Scenarios/SqliteStopScenario.cs | 17 ++++++ .../SqliteCollection.cs | 2 +- .../SqlitePersistenceProviderFixture.cs | 11 +--- 11 files changed, 185 insertions(+), 11 deletions(-) create mode 100644 test/WorkflowCore.IntegrationTests/Scenarios/StopScenario.cs create mode 100644 test/WorkflowCore.Tests.DynamoDB/Scenarios/DynamoStopScenario.cs create mode 100644 test/WorkflowCore.Tests.MongoDB/Scenarios/MongoStopScenario.cs create mode 100644 test/WorkflowCore.Tests.MySQL/Scenarios/MysqlStopScenario.cs create mode 100644 test/WorkflowCore.Tests.PostgreSQL/Scenarios/PostgresStopScenario.cs create mode 100644 test/WorkflowCore.Tests.Redis/Scenarios/RedisStopScenario.cs create mode 100644 test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerStopScenario.cs create mode 100644 test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteStopScenario.cs diff --git a/src/WorkflowCore.Testing/WorkflowTest.cs b/src/WorkflowCore.Testing/WorkflowTest.cs index bf0eb97ab..1bda5ef1e 100644 --- a/src/WorkflowCore.Testing/WorkflowTest.cs +++ b/src/WorkflowCore.Testing/WorkflowTest.cs @@ -17,6 +17,7 @@ public abstract class WorkflowTest : IDisposable protected IWorkflowHost Host; protected IPersistenceProvider PersistenceProvider; protected List UnhandledStepErrors = new List(); + private bool isDisposed; protected virtual void Setup() { @@ -116,9 +117,22 @@ protected TData GetData(string workflowId) return (TData)instance.Data; } + protected virtual void Dispose(bool disposing) + { + if (!isDisposed) + { + if (disposing) + { + Host.Stop(); + } + isDisposed = true; + } + } + public void Dispose() { - Host.Stop(); + Dispose(disposing: true); + GC.SuppressFinalize(this); } } diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/StopScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/StopScenario.cs new file mode 100644 index 000000000..f58ddebea --- /dev/null +++ b/test/WorkflowCore.IntegrationTests/Scenarios/StopScenario.cs @@ -0,0 +1,52 @@ +using WorkflowCore.Interface; +using WorkflowCore.Models; +using Xunit; +using FluentAssertions; +using WorkflowCore.Testing; +using WorkflowCore.Models.LifeCycleEvents; +using System.Threading.Tasks; +using System.Threading; +using Moq; + +namespace WorkflowCore.IntegrationTests.Scenarios +{ + public class StopScenario : WorkflowTest + { + public class StopWorkflow : IWorkflow + { + public string Id => "StopWorkflow"; + public int Version => 1; + public void Build(IWorkflowBuilder builder) + { + builder.StartWith(context => ExecutionResult.Next()); + } + } + + public StopScenario() + { + Setup(); + } + + [Fact] + public async Task Scenario() + { + var tcs = new TaskCompletionSource(); + Host.OnLifeCycleEvent += (evt) => OnLifeCycleEvent(evt, tcs); + var workflowId = StartWorkflow(null); + + await tcs.Task; + GetStatus(workflowId).Should().Be(WorkflowStatus.Complete); + } + + private async void OnLifeCycleEvent(LifeCycleEvent evt, TaskCompletionSource tcs) + { + if (evt is WorkflowCompleted) + { + await Host.StopAsync(CancellationToken.None); + tcs.SetResult(new()); + } + } + + protected override void Dispose(bool disposing) { } + } +} diff --git a/test/WorkflowCore.Tests.DynamoDB/Scenarios/DynamoStopScenario.cs b/test/WorkflowCore.Tests.DynamoDB/Scenarios/DynamoStopScenario.cs new file mode 100644 index 000000000..6900460d3 --- /dev/null +++ b/test/WorkflowCore.Tests.DynamoDB/Scenarios/DynamoStopScenario.cs @@ -0,0 +1,18 @@ +using System; +using Amazon.DynamoDBv2; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; + +namespace WorkflowCore.Tests.DynamoDB.Scenarios +{ + [Collection("DynamoDb collection")] + public class DynamoStopScenario : StopScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + var cfg = new AmazonDynamoDBConfig {ServiceURL = DynamoDbDockerSetup.ConnectionString}; + services.AddWorkflow(x => x.UseAwsDynamoPersistence(DynamoDbDockerSetup.Credentials, cfg, "tests-")); + } + } +} diff --git a/test/WorkflowCore.Tests.MongoDB/Scenarios/MongoStopScenario.cs b/test/WorkflowCore.Tests.MongoDB/Scenarios/MongoStopScenario.cs new file mode 100644 index 000000000..050c203c0 --- /dev/null +++ b/test/WorkflowCore.Tests.MongoDB/Scenarios/MongoStopScenario.cs @@ -0,0 +1,16 @@ +using System; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; + +namespace WorkflowCore.Tests.MongoDB.Scenarios +{ + [Collection("Mongo collection")] + public class MongoStopScenario : StopScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(x => x.UseMongoDB(MongoDockerSetup.ConnectionString, "integration-tests")); + } + } +} diff --git a/test/WorkflowCore.Tests.MySQL/Scenarios/MysqlStopScenario.cs b/test/WorkflowCore.Tests.MySQL/Scenarios/MysqlStopScenario.cs new file mode 100644 index 000000000..e00b738d1 --- /dev/null +++ b/test/WorkflowCore.Tests.MySQL/Scenarios/MysqlStopScenario.cs @@ -0,0 +1,16 @@ +using System; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; + +namespace WorkflowCore.Tests.MySQL.Scenarios +{ + [Collection("Mysql collection")] + public class MysqlStopScenario : StopScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(x => x.UseMySQL(MysqlDockerSetup.ScenarioConnectionString, true, true)); + } + } +} diff --git a/test/WorkflowCore.Tests.PostgreSQL/Scenarios/PostgresStopScenario.cs b/test/WorkflowCore.Tests.PostgreSQL/Scenarios/PostgresStopScenario.cs new file mode 100644 index 000000000..995681160 --- /dev/null +++ b/test/WorkflowCore.Tests.PostgreSQL/Scenarios/PostgresStopScenario.cs @@ -0,0 +1,16 @@ +using System; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; + +namespace WorkflowCore.Tests.PostgreSQL.Scenarios +{ + [Collection("Postgres collection")] + public class PostgresStopScenario : StopScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(x => x.UsePostgreSQL(PostgresDockerSetup.ScenarioConnectionString, true, true)); + } + } +} diff --git a/test/WorkflowCore.Tests.Redis/Scenarios/RedisStopScenario.cs b/test/WorkflowCore.Tests.Redis/Scenarios/RedisStopScenario.cs new file mode 100644 index 000000000..fad60c3fc --- /dev/null +++ b/test/WorkflowCore.Tests.Redis/Scenarios/RedisStopScenario.cs @@ -0,0 +1,16 @@ +using System; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; + +namespace WorkflowCore.Tests.Redis.Scenarios +{ + [Collection("Redis collection")] + public class RedisStopScenario : StopScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(x => x.UseRedisPersistence(RedisDockerSetup.ConnectionString, "scenario-")); + } + } +} diff --git a/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerStopScenario.cs b/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerStopScenario.cs new file mode 100644 index 000000000..de4e06ec1 --- /dev/null +++ b/test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerStopScenario.cs @@ -0,0 +1,16 @@ +using System; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using Xunit; + +namespace WorkflowCore.Tests.SqlServer.Scenarios +{ + [Collection("SqlServer collection")] + public class SqlServerStopScenario : StopScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(x => x.UseSqlServer(SqlDockerSetup.ScenarioConnectionString, true, true)); + } + } +} diff --git a/test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteStopScenario.cs b/test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteStopScenario.cs new file mode 100644 index 000000000..0b3d7e188 --- /dev/null +++ b/test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteStopScenario.cs @@ -0,0 +1,17 @@ +using System; +using Microsoft.Extensions.DependencyInjection; +using WorkflowCore.IntegrationTests.Scenarios; +using WorkflowCore.Tests.Sqlite; +using Xunit; + +namespace WorkflowCore.Tests.Sqlite.Scenarios +{ + [Collection("Sqlite collection")] + public class SqliteStopScenario : StopScenario + { + protected override void ConfigureServices(IServiceCollection services) + { + services.AddWorkflow(x => x.UseSqlite(SqliteSetup.ConnectionString, true)); + } + } +} diff --git a/test/WorkflowCore.Tests.Sqlite/SqliteCollection.cs b/test/WorkflowCore.Tests.Sqlite/SqliteCollection.cs index 69ed8c41a..41906a519 100644 --- a/test/WorkflowCore.Tests.Sqlite/SqliteCollection.cs +++ b/test/WorkflowCore.Tests.Sqlite/SqliteCollection.cs @@ -10,7 +10,7 @@ public class SqliteCollection : ICollectionFixture public class SqliteSetup : IDisposable { - public string ConnectionString { get; set; } + public static string ConnectionString { get; set; } public SqliteSetup() { diff --git a/test/WorkflowCore.Tests.Sqlite/SqlitePersistenceProviderFixture.cs b/test/WorkflowCore.Tests.Sqlite/SqlitePersistenceProviderFixture.cs index f8318ad9b..edfea0077 100644 --- a/test/WorkflowCore.Tests.Sqlite/SqlitePersistenceProviderFixture.cs +++ b/test/WorkflowCore.Tests.Sqlite/SqlitePersistenceProviderFixture.cs @@ -10,18 +10,11 @@ namespace WorkflowCore.Tests.Sqlite [Collection("Sqlite collection")] public class SqlitePersistenceProviderFixture : BasePersistenceFixture { - string _connectionString; - - public SqlitePersistenceProviderFixture(SqliteSetup setup) - { - _connectionString = setup.ConnectionString; - } - protected override IPersistenceProvider Subject { get - { - var db = new EntityFrameworkPersistenceProvider(new SqliteContextFactory(_connectionString), true, false); + { + var db = new EntityFrameworkPersistenceProvider(new SqliteContextFactory(SqliteSetup.ConnectionString), true, false); db.EnsureStoreExists(); return db; } From 440d198380cb619c87a0609116e5492b0fe62d3f Mon Sep 17 00:00:00 2001 From: Martin Dennhardt Date: Mon, 8 Aug 2022 12:38:56 +0200 Subject: [PATCH 3/6] Refactor StopScenario for better readability --- .../Scenarios/StopScenario.cs | 41 ++++++++----------- 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/test/WorkflowCore.IntegrationTests/Scenarios/StopScenario.cs b/test/WorkflowCore.IntegrationTests/Scenarios/StopScenario.cs index f58ddebea..e487a4053 100644 --- a/test/WorkflowCore.IntegrationTests/Scenarios/StopScenario.cs +++ b/test/WorkflowCore.IntegrationTests/Scenarios/StopScenario.cs @@ -1,12 +1,11 @@ -using WorkflowCore.Interface; -using WorkflowCore.Models; -using Xunit; +using System.Threading; +using System.Threading.Tasks; using FluentAssertions; -using WorkflowCore.Testing; +using WorkflowCore.Interface; +using WorkflowCore.Models; using WorkflowCore.Models.LifeCycleEvents; -using System.Threading.Tasks; -using System.Threading; -using Moq; +using WorkflowCore.Testing; +using Xunit; namespace WorkflowCore.IntegrationTests.Scenarios { @@ -18,33 +17,29 @@ public class StopWorkflow : IWorkflow public int Version => 1; public void Build(IWorkflowBuilder builder) { - builder.StartWith(context => ExecutionResult.Next()); + builder.StartWith(context => ExecutionResult.Next()); } } - public StopScenario() - { - Setup(); - } + public StopScenario() => Setup(); [Fact] public async Task Scenario() { var tcs = new TaskCompletionSource(); - Host.OnLifeCycleEvent += (evt) => OnLifeCycleEvent(evt, tcs); - var workflowId = StartWorkflow(null); + Host.OnLifeCycleEvent += async (evt) => + { + if (evt is WorkflowCompleted) + { + await Host.StopAsync(CancellationToken.None); + tcs.SetResult(default); + } + }; + var workflowId = StartWorkflow(default); await tcs.Task; - GetStatus(workflowId).Should().Be(WorkflowStatus.Complete); - } - private async void OnLifeCycleEvent(LifeCycleEvent evt, TaskCompletionSource tcs) - { - if (evt is WorkflowCompleted) - { - await Host.StopAsync(CancellationToken.None); - tcs.SetResult(new()); - } + GetStatus(workflowId).Should().Be(WorkflowStatus.Complete); } protected override void Dispose(bool disposing) { } From d3d3c8595f5c3e36faf69a98535d4ab211b357a2 Mon Sep 17 00:00:00 2001 From: Martin Dennhardt Date: Mon, 8 Aug 2022 12:39:02 +0200 Subject: [PATCH 4/6] Bump Docker.DotNet to 3.125.10 --- test/Docker.Testify/Docker.Testify.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/Docker.Testify/Docker.Testify.csproj b/test/Docker.Testify/Docker.Testify.csproj index a83648df4..cff2d13e8 100644 --- a/test/Docker.Testify/Docker.Testify.csproj +++ b/test/Docker.Testify/Docker.Testify.csproj @@ -1,7 +1,7 @@  - + From 86a041a75365e18c750d3a5c1eaf2db7a8ea5de3 Mon Sep 17 00:00:00 2001 From: Martin Dennhardt Date: Mon, 8 Aug 2022 12:39:07 +0200 Subject: [PATCH 5/6] Remove CancellationToken from persistence ops Writing persistence operations can no longer be cancelled to prevent data loss when cancelling a workflow. Fixes #953, #1032 --- .../Interface/Persistence/IEventRepository.cs | 6 +-- .../Persistence/IPersistenceProvider.cs | 2 +- .../Persistence/ISubscriptionRepository.cs | 8 +-- .../Persistence/IWorkflowRepository.cs | 4 +- .../Services/BackgroundTasks/EventConsumer.cs | 8 +-- .../BackgroundTasks/WorkflowConsumer.cs | 8 +-- .../MemoryPersistenceProvider.cs | 20 +++---- .../TransientMemoryPersistenceProvider.cs | 20 +++---- .../Services/SyncWorkflowRunner.cs | 6 +-- .../EntityFrameworkPersistenceProvider.cs | 52 +++++++++---------- .../Services/MongoPersistenceProvider.cs | 40 +++++++------- .../Services/RavendbPersistenceProvider.cs | 44 ++++++++-------- .../Services/DynamoPersistenceProvider.cs | 40 +++++++------- .../Services/CosmosDbPersistenceProvider.cs | 44 ++++++++-------- .../Services/RedisPersistenceProvider.cs | 24 ++++----- 15 files changed, 163 insertions(+), 163 deletions(-) diff --git a/src/WorkflowCore/Interface/Persistence/IEventRepository.cs b/src/WorkflowCore/Interface/Persistence/IEventRepository.cs index 75651c24b..3de29cdbd 100644 --- a/src/WorkflowCore/Interface/Persistence/IEventRepository.cs +++ b/src/WorkflowCore/Interface/Persistence/IEventRepository.cs @@ -8,7 +8,7 @@ namespace WorkflowCore.Interface { public interface IEventRepository { - Task CreateEvent(Event newEvent, CancellationToken cancellationToken = default); + Task CreateEvent(Event newEvent); Task GetEvent(string id, CancellationToken cancellationToken = default); @@ -16,9 +16,9 @@ public interface IEventRepository Task> GetEvents(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = default); - Task MarkEventProcessed(string id, CancellationToken cancellationToken = default); + Task MarkEventProcessed(string id); - Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = default); + Task MarkEventUnprocessed(string id); } } diff --git a/src/WorkflowCore/Interface/Persistence/IPersistenceProvider.cs b/src/WorkflowCore/Interface/Persistence/IPersistenceProvider.cs index 4b83f5920..57fd630a1 100644 --- a/src/WorkflowCore/Interface/Persistence/IPersistenceProvider.cs +++ b/src/WorkflowCore/Interface/Persistence/IPersistenceProvider.cs @@ -9,7 +9,7 @@ namespace WorkflowCore.Interface public interface IPersistenceProvider : IWorkflowRepository, ISubscriptionRepository, IEventRepository, IScheduledCommandRepository { - Task PersistErrors(IEnumerable errors, CancellationToken cancellationToken = default); + Task PersistErrors(IEnumerable errors); void EnsureStoreExists(); diff --git a/src/WorkflowCore/Interface/Persistence/ISubscriptionRepository.cs b/src/WorkflowCore/Interface/Persistence/ISubscriptionRepository.cs index 2f22a45f4..4206fc43a 100644 --- a/src/WorkflowCore/Interface/Persistence/ISubscriptionRepository.cs +++ b/src/WorkflowCore/Interface/Persistence/ISubscriptionRepository.cs @@ -8,19 +8,19 @@ namespace WorkflowCore.Interface { public interface ISubscriptionRepository { - Task CreateEventSubscription(EventSubscription subscription, CancellationToken cancellationToken = default); + Task CreateEventSubscription(EventSubscription subscription); Task> GetSubscriptions(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = default); - Task TerminateSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default); + Task TerminateSubscription(string eventSubscriptionId); Task GetSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default); Task GetFirstOpenSubscription(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken = default); - Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default); + Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry); - Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken cancellationToken = default); + Task ClearSubscriptionToken(string eventSubscriptionId, string token); } } diff --git a/src/WorkflowCore/Interface/Persistence/IWorkflowRepository.cs b/src/WorkflowCore/Interface/Persistence/IWorkflowRepository.cs index 1da4275cd..b4612b430 100644 --- a/src/WorkflowCore/Interface/Persistence/IWorkflowRepository.cs +++ b/src/WorkflowCore/Interface/Persistence/IWorkflowRepository.cs @@ -8,9 +8,9 @@ namespace WorkflowCore.Interface { public interface IWorkflowRepository { - Task CreateNewWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default); + Task CreateNewWorkflow(WorkflowInstance workflow); - Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default); + Task PersistWorkflow(WorkflowInstance workflow); Task> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default); diff --git a/src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs b/src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs index dd7323b01..37cb2fce5 100644 --- a/src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs +++ b/src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs @@ -59,7 +59,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance if (activity == null) { Logger.LogWarning($"Activity already processed - {(evt.EventData as ActivityResult).SubscriptionId}"); - await _eventRepository.MarkEventProcessed(itemId, cancellationToken); + await _eventRepository.MarkEventProcessed(itemId); return; } subs = new List { activity }; @@ -77,7 +77,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance if (complete) { - await _eventRepository.MarkEventProcessed(itemId, cancellationToken); + await _eventRepository.MarkEventProcessed(itemId); } else { @@ -135,8 +135,8 @@ private async Task SeedSubscription(Event evt, EventSubscription sub, Hash p.Active = true; } workflow.NextExecution = 0; - await _workflowRepository.PersistWorkflow(workflow, cancellationToken); - await _subscriptionRepository.TerminateSubscription(sub.Id, cancellationToken); + await _workflowRepository.PersistWorkflow(workflow); + await _subscriptionRepository.TerminateSubscription(sub.Id); return true; } catch (Exception ex) diff --git a/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs b/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs index b3dd4aa0b..92b4b881d 100644 --- a/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs +++ b/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs @@ -55,7 +55,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance finally { WorkflowActivity.Enrich(result); - await _persistenceStore.PersistWorkflow(workflow, cancellationToken); + await _persistenceStore.PersistWorkflow(workflow); await QueueProvider.QueueWork(itemId, QueueType.Index); _greylist.Remove($"wf:{itemId}"); } @@ -71,7 +71,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance await SubscribeEvent(sub, _persistenceStore, cancellationToken); } - await _persistenceStore.PersistErrors(result.Errors, cancellationToken); + await _persistenceStore.PersistErrors(result.Errors); if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue) { @@ -103,7 +103,7 @@ private async Task SubscribeEvent(EventSubscription subscription, IPersistencePr //TODO: move to own class Logger.LogDebug("Subscribing to event {0} {1} for workflow {2} step {3}", subscription.EventName, subscription.EventKey, subscription.WorkflowId, subscription.StepId); - await persistenceStore.CreateEventSubscription(subscription, cancellationToken); + await persistenceStore.CreateEventSubscription(subscription); if (subscription.EventName != Event.EventTypeActivity) { var events = await persistenceStore.GetEvents(subscription.EventName, subscription.EventKey, subscription.SubscribeAsOf, cancellationToken); @@ -131,7 +131,7 @@ private async Task SubscribeEvent(EventSubscription subscription, IPersistencePr else { _greylist.Remove(eventKey); - await persistenceStore.MarkEventUnprocessed(evt, cancellationToken); + await persistenceStore.MarkEventUnprocessed(evt); await QueueProvider.QueueWork(evt, QueueType.Event); } } diff --git a/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs b/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs index e983f8fd5..e4d52953f 100644 --- a/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs +++ b/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs @@ -26,7 +26,7 @@ public class MemoryPersistenceProvider : ISingletonMemoryProvider public bool SupportsScheduledCommands => false; - public async Task CreateNewWorkflow(WorkflowInstance workflow, CancellationToken _ = default) + public async Task CreateNewWorkflow(WorkflowInstance workflow) { lock (_instances) { @@ -36,7 +36,7 @@ public async Task CreateNewWorkflow(WorkflowInstance workflow, Cancellat } } - public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken _ = default) + public async Task PersistWorkflow(WorkflowInstance workflow) { lock (_instances) { @@ -107,7 +107,7 @@ public async Task> GetWorkflowInstances(WorkflowSt } - public async Task CreateEventSubscription(EventSubscription subscription, CancellationToken _ = default) + public async Task CreateEventSubscription(EventSubscription subscription) { lock (_subscriptions) { @@ -126,7 +126,7 @@ public async Task> GetSubscriptions(string eventN } } - public async Task TerminateSubscription(string eventSubscriptionId, CancellationToken _ = default) + public async Task TerminateSubscription(string eventSubscriptionId) { lock (_subscriptions) { @@ -154,7 +154,7 @@ public Task GetFirstOpenSubscription(string eventName, string } } - public Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken _ = default) + public Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry) { lock (_subscriptions) { @@ -167,7 +167,7 @@ public Task SetSubscriptionToken(string eventSubscriptionId, string token, } } - public Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken _ = default) + public Task ClearSubscriptionToken(string eventSubscriptionId, string token) { lock (_subscriptions) { @@ -186,7 +186,7 @@ public void EnsureStoreExists() { } - public async Task CreateEvent(Event newEvent, CancellationToken _ = default) + public async Task CreateEvent(Event newEvent) { lock (_events) { @@ -196,7 +196,7 @@ public async Task CreateEvent(Event newEvent, CancellationToken _ = defa } } - public async Task MarkEventProcessed(string id, CancellationToken _ = default) + public async Task MarkEventProcessed(string id) { lock (_events) { @@ -238,7 +238,7 @@ public async Task> GetEvents(string eventName, string eventK } } - public async Task MarkEventUnprocessed(string id, CancellationToken _ = default) + public async Task MarkEventUnprocessed(string id) { lock (_events) { @@ -250,7 +250,7 @@ public async Task MarkEventUnprocessed(string id, CancellationToken _ = default) } } - public async Task PersistErrors(IEnumerable errors, CancellationToken _ = default) + public async Task PersistErrors(IEnumerable errors) { lock (errors) { diff --git a/src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs b/src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs index 0a94c6f8e..f5d458ef7 100644 --- a/src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs +++ b/src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs @@ -18,11 +18,11 @@ public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService) _innerService = innerService; } - public Task CreateEvent(Event newEvent, CancellationToken _ = default) => _innerService.CreateEvent(newEvent); + public Task CreateEvent(Event newEvent) => _innerService.CreateEvent(newEvent); - public Task CreateEventSubscription(EventSubscription subscription, CancellationToken _ = default) => _innerService.CreateEventSubscription(subscription); + public Task CreateEventSubscription(EventSubscription subscription) => _innerService.CreateEventSubscription(subscription); - public Task CreateNewWorkflow(WorkflowInstance workflow, CancellationToken _ = default) => _innerService.CreateNewWorkflow(workflow); + public Task CreateNewWorkflow(WorkflowInstance workflow) => _innerService.CreateNewWorkflow(workflow); public void EnsureStoreExists() => _innerService.EnsureStoreExists(); @@ -42,22 +42,22 @@ public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService) public Task> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take) => _innerService.GetWorkflowInstances(status, type, createdFrom, createdTo, skip, take); - public Task MarkEventProcessed(string id, CancellationToken _ = default) => _innerService.MarkEventProcessed(id); + public Task MarkEventProcessed(string id) => _innerService.MarkEventProcessed(id); - public Task MarkEventUnprocessed(string id, CancellationToken _ = default) => _innerService.MarkEventUnprocessed(id); + public Task MarkEventUnprocessed(string id) => _innerService.MarkEventUnprocessed(id); - public Task PersistErrors(IEnumerable errors, CancellationToken _ = default) => _innerService.PersistErrors(errors); + public Task PersistErrors(IEnumerable errors) => _innerService.PersistErrors(errors); - public Task PersistWorkflow(WorkflowInstance workflow, CancellationToken _ = default) => _innerService.PersistWorkflow(workflow); + public Task PersistWorkflow(WorkflowInstance workflow) => _innerService.PersistWorkflow(workflow); - public Task TerminateSubscription(string eventSubscriptionId, CancellationToken _ = default) => _innerService.TerminateSubscription(eventSubscriptionId); + public Task TerminateSubscription(string eventSubscriptionId) => _innerService.TerminateSubscription(eventSubscriptionId); public Task GetSubscription(string eventSubscriptionId, CancellationToken _ = default) => _innerService.GetSubscription(eventSubscriptionId); public Task GetFirstOpenSubscription(string eventName, string eventKey, DateTime asOf, CancellationToken _ = default) => _innerService.GetFirstOpenSubscription(eventName, eventKey, asOf); - public Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken _ = default) => _innerService.SetSubscriptionToken(eventSubscriptionId, token, workerId, expiry); + public Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry) => _innerService.SetSubscriptionToken(eventSubscriptionId, token, workerId, expiry); - public Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken _ = default) => _innerService.ClearSubscriptionToken(eventSubscriptionId, token); + public Task ClearSubscriptionToken(string eventSubscriptionId, string token) => _innerService.ClearSubscriptionToken(eventSubscriptionId, token); public Task ScheduleCommand(ScheduledCommand command) { diff --git a/src/WorkflowCore/Services/SyncWorkflowRunner.cs b/src/WorkflowCore/Services/SyncWorkflowRunner.cs index 5317a8966..4a4521829 100644 --- a/src/WorkflowCore/Services/SyncWorkflowRunner.cs +++ b/src/WorkflowCore/Services/SyncWorkflowRunner.cs @@ -72,7 +72,7 @@ public async Task RunWorkflowSync(string workflowId, in var id = Guid.NewGuid().ToString(); if (persistSate) - id = await _persistenceStore.CreateNewWorkflow(wf, token); + id = await _persistenceStore.CreateNewWorkflow(wf); else wf.Id = id; @@ -89,7 +89,7 @@ public async Task RunWorkflowSync(string workflowId, in { await _executor.Execute(wf, token); if (persistSate) - await _persistenceStore.PersistWorkflow(wf, token); + await _persistenceStore.PersistWorkflow(wf); } } finally @@ -103,4 +103,4 @@ public async Task RunWorkflowSync(string workflowId, in return wf; } } -} \ No newline at end of file +} diff --git a/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs b/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs index 44798a0c9..8d3a9957c 100644 --- a/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs @@ -26,26 +26,26 @@ public EntityFrameworkPersistenceProvider(IWorkflowDbContextFactory contextFacto _canMigrateDB = canMigrateDB; } - public async Task CreateEventSubscription(EventSubscription subscription, CancellationToken cancellationToken = default) + public async Task CreateEventSubscription(EventSubscription subscription) { using (var db = ConstructDbContext()) { subscription.Id = Guid.NewGuid().ToString(); var persistable = subscription.ToPersistable(); var result = db.Set().Add(persistable); - await db.SaveChangesAsync(cancellationToken); + await db.SaveChangesAsync(); return subscription.Id; } } - public async Task CreateNewWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default) + public async Task CreateNewWorkflow(WorkflowInstance workflow) { using (var db = ConstructDbContext()) { workflow.Id = Guid.NewGuid().ToString(); var persistable = workflow.ToPersistable(); var result = db.Set().Add(persistable); - await db.SaveChangesAsync(cancellationToken); + await db.SaveChangesAsync(); return workflow.Id; } } @@ -134,7 +134,7 @@ public async Task> GetWorkflowInstances(IEnumerabl } } - public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default) + public async Task PersistWorkflow(WorkflowInstance workflow) { using (var db = ConstructDbContext()) { @@ -145,21 +145,21 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c .ThenInclude(ep => ep.ExtensionAttributes) .Include(wf => wf.ExecutionPointers) .AsTracking() - .FirstAsync(cancellationToken); + .FirstAsync(); var persistable = workflow.ToPersistable(existingEntity); - await db.SaveChangesAsync(cancellationToken); + await db.SaveChangesAsync(); } } - public async Task TerminateSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default) + public async Task TerminateSubscription(string eventSubscriptionId) { using (var db = ConstructDbContext()) { var uid = new Guid(eventSubscriptionId); - var existing = await db.Set().FirstAsync(x => x.SubscriptionId == uid, cancellationToken); + var existing = await db.Set().FirstAsync(x => x.SubscriptionId == uid); db.Set().Remove(existing); - await db.SaveChangesAsync(cancellationToken); + await db.SaveChangesAsync(); } } @@ -194,14 +194,14 @@ public async Task> GetSubscriptions(string eventN } } - public async Task CreateEvent(Event newEvent, CancellationToken cancellationToken = default) + public async Task CreateEvent(Event newEvent) { using (var db = ConstructDbContext()) { newEvent.Id = Guid.NewGuid().ToString(); var persistable = newEvent.ToPersistable(); var result = db.Set().Add(persistable); - await db.SaveChangesAsync(cancellationToken); + await db.SaveChangesAsync(); return newEvent.Id; } } @@ -237,7 +237,7 @@ public async Task> GetRunnableEvents(DateTime asAt, Cancella } } - public async Task MarkEventProcessed(string id, CancellationToken cancellationToken = default) + public async Task MarkEventProcessed(string id) { using (var db = ConstructDbContext()) { @@ -245,10 +245,10 @@ public async Task MarkEventProcessed(string id, CancellationToken cancellationTo var existingEntity = await db.Set() .Where(x => x.EventId == uid) .AsTracking() - .FirstAsync(cancellationToken); + .FirstAsync(); existingEntity.IsProcessed = true; - await db.SaveChangesAsync(cancellationToken); + await db.SaveChangesAsync(); } } @@ -271,7 +271,7 @@ public async Task> GetEvents(string eventName, string eventK } } - public async Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = default) + public async Task MarkEventUnprocessed(string id) { using (var db = ConstructDbContext()) { @@ -279,14 +279,14 @@ public async Task MarkEventUnprocessed(string id, CancellationToken cancellation var existingEntity = await db.Set() .Where(x => x.EventId == uid) .AsTracking() - .FirstAsync(cancellationToken); + .FirstAsync(); existingEntity.IsProcessed = false; - await db.SaveChangesAsync(cancellationToken); + await db.SaveChangesAsync(); } } - public async Task PersistErrors(IEnumerable errors, CancellationToken cancellationToken = default) + public async Task PersistErrors(IEnumerable errors) { using (var db = ConstructDbContext()) { @@ -297,7 +297,7 @@ public async Task PersistErrors(IEnumerable errors, Cancellation { db.Set().Add(error.ToPersistable()); } - await db.SaveChangesAsync(cancellationToken); + await db.SaveChangesAsync(); } } @@ -329,7 +329,7 @@ public async Task GetFirstOpenSubscription(string eventName, } } - public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default) + public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry) { using (var db = ConstructDbContext()) { @@ -337,18 +337,18 @@ public async Task SetSubscriptionToken(string eventSubscriptionId, string var existingEntity = await db.Set() .Where(x => x.SubscriptionId == uid) .AsTracking() - .FirstAsync(cancellationToken); + .FirstAsync(); existingEntity.ExternalToken = token; existingEntity.ExternalWorkerId = workerId; existingEntity.ExternalTokenExpiry = expiry; - await db.SaveChangesAsync(cancellationToken); + await db.SaveChangesAsync(); return true; } } - public async Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken cancellationToken = default) + public async Task ClearSubscriptionToken(string eventSubscriptionId, string token) { using (var db = ConstructDbContext()) { @@ -356,7 +356,7 @@ public async Task ClearSubscriptionToken(string eventSubscriptionId, string toke var existingEntity = await db.Set() .Where(x => x.SubscriptionId == uid) .AsTracking() - .FirstAsync(cancellationToken); + .FirstAsync(); if (existingEntity.ExternalToken != token) throw new InvalidOperationException(); @@ -364,7 +364,7 @@ public async Task ClearSubscriptionToken(string eventSubscriptionId, string toke existingEntity.ExternalToken = null; existingEntity.ExternalWorkerId = null; existingEntity.ExternalTokenExpiry = null; - await db.SaveChangesAsync(cancellationToken); + await db.SaveChangesAsync(); } } diff --git a/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs b/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs index 06be13355..8c163cd19 100644 --- a/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs @@ -138,15 +138,15 @@ static void CreateIndexes(MongoPersistenceProvider instance) private IMongoCollection ScheduledCommands => _database.GetCollection("wfc.scheduled_commands"); - public async Task CreateNewWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default) + public async Task CreateNewWorkflow(WorkflowInstance workflow) { - await WorkflowInstances.InsertOneAsync(workflow, cancellationToken: cancellationToken); + await WorkflowInstances.InsertOneAsync(workflow); return workflow.Id; } - public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default) + public async Task PersistWorkflow(WorkflowInstance workflow) { - await WorkflowInstances.ReplaceOneAsync(x => x.Id == workflow.Id, workflow, cancellationToken: cancellationToken); + await WorkflowInstances.ReplaceOneAsync(x => x.Id == workflow.Id, workflow); } public async Task> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default) @@ -195,15 +195,15 @@ public async Task> GetWorkflowInstances(WorkflowSt return await result.Skip(skip).Take(take).ToListAsync(); } - public async Task CreateEventSubscription(EventSubscription subscription, CancellationToken cancellationToken = default) + public async Task CreateEventSubscription(EventSubscription subscription) { - await EventSubscriptions.InsertOneAsync(subscription, cancellationToken: cancellationToken); + await EventSubscriptions.InsertOneAsync(subscription); return subscription.Id; } - public async Task TerminateSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default) + public async Task TerminateSubscription(string eventSubscriptionId) { - await EventSubscriptions.DeleteOneAsync(x => x.Id == eventSubscriptionId, cancellationToken); + await EventSubscriptions.DeleteOneAsync(x => x.Id == eventSubscriptionId); } public async Task GetSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default) @@ -220,25 +220,25 @@ public async Task GetFirstOpenSubscription(string eventName, return await query.FirstOrDefaultAsync(cancellationToken); } - public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default) + public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry) { var update = Builders.Update .Set(x => x.ExternalToken, token) .Set(x => x.ExternalTokenExpiry, expiry) .Set(x => x.ExternalWorkerId, workerId); - var result = await EventSubscriptions.UpdateOneAsync(x => x.Id == eventSubscriptionId && x.ExternalToken == null, update, cancellationToken: cancellationToken); + var result = await EventSubscriptions.UpdateOneAsync(x => x.Id == eventSubscriptionId && x.ExternalToken == null, update); return (result.ModifiedCount > 0); } - public async Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken cancellationToken = default) + public async Task ClearSubscriptionToken(string eventSubscriptionId, string token) { var update = Builders.Update .Set(x => x.ExternalToken, null) .Set(x => x.ExternalTokenExpiry, null) .Set(x => x.ExternalWorkerId, null); - await EventSubscriptions.UpdateOneAsync(x => x.Id == eventSubscriptionId && x.ExternalToken == token, update, cancellationToken: cancellationToken); + await EventSubscriptions.UpdateOneAsync(x => x.Id == eventSubscriptionId && x.ExternalToken == token, update); } public void EnsureStoreExists() @@ -254,9 +254,9 @@ public async Task> GetSubscriptions(string eventN return await query.ToListAsync(cancellationToken); } - public async Task CreateEvent(Event newEvent, CancellationToken cancellationToken = default) + public async Task CreateEvent(Event newEvent) { - await Events.InsertOneAsync(newEvent, cancellationToken: cancellationToken); + await Events.InsertOneAsync(newEvent); return newEvent.Id; } @@ -276,12 +276,12 @@ public async Task> GetRunnableEvents(DateTime asAt, Cancella return await query.ToListAsync(cancellationToken); } - public async Task MarkEventProcessed(string id, CancellationToken cancellationToken = default) + public async Task MarkEventProcessed(string id) { var update = Builders.Update .Set(x => x.IsProcessed, true); - await Events.UpdateOneAsync(x => x.Id == id, update, cancellationToken: cancellationToken); + await Events.UpdateOneAsync(x => x.Id == id, update); } public async Task> GetEvents(string eventName, string eventKey, DateTime asOf, CancellationToken cancellationToken) @@ -293,18 +293,18 @@ public async Task> GetEvents(string eventName, string eventK return await query.ToListAsync(cancellationToken); } - public async Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = default) + public async Task MarkEventUnprocessed(string id) { var update = Builders.Update .Set(x => x.IsProcessed, false); - await Events.UpdateOneAsync(x => x.Id == id, update, cancellationToken: cancellationToken); + await Events.UpdateOneAsync(x => x.Id == id, update); } - public async Task PersistErrors(IEnumerable errors, CancellationToken cancellationToken = default) + public async Task PersistErrors(IEnumerable errors) { if (errors.Any()) - await ExecutionErrors.InsertManyAsync(errors, cancellationToken: cancellationToken); + await ExecutionErrors.InsertManyAsync(errors); } public bool SupportsScheduledCommands => true; diff --git a/src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs b/src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs index 695b8e31f..19ad37fad 100644 --- a/src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs @@ -41,18 +41,18 @@ static void CreateIndexes(RavendbPersistenceProvider instance) } } - public async Task CreateNewWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default) + public async Task CreateNewWorkflow(WorkflowInstance workflow) { using (var session = _database.OpenAsyncSession()) { - await session.StoreAsync(workflow, cancellationToken); + await session.StoreAsync(workflow); var id = workflow.Id; - await session.SaveChangesAsync(cancellationToken); + await session.SaveChangesAsync(); return id; } } - public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default) + public async Task PersistWorkflow(WorkflowInstance workflow) { using (var session = _database.OpenAsyncSession()) { @@ -67,7 +67,7 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c session.Advanced.Patch(workflow.Id, x => x.CreateTime, workflow.CreateTime); session.Advanced.Patch(workflow.Id, x => x.CompleteTime, workflow.CompleteTime); - await session.SaveChangesAsync(cancellationToken); + await session.SaveChangesAsync(); } } @@ -130,18 +130,18 @@ public async Task> GetWorkflowInstances(WorkflowSt } } - public async Task CreateEventSubscription(EventSubscription subscription, CancellationToken cancellationToken = default) + public async Task CreateEventSubscription(EventSubscription subscription) { using (var session = _database.OpenAsyncSession()) { - await session.StoreAsync(subscription, cancellationToken); + await session.StoreAsync(subscription); var id = subscription.Id; - await session.SaveChangesAsync(cancellationToken); + await session.SaveChangesAsync(); return id; } } - public async Task TerminateSubscription(string eventSubscriptionId, CancellationToken _ = default) + public async Task TerminateSubscription(string eventSubscriptionId) { using (var session = _database.OpenAsyncSession()) { @@ -174,7 +174,7 @@ public async Task GetFirstOpenSubscription(string eventName, } } - public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default) + public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry) { try { @@ -189,7 +189,7 @@ public async Task SetSubscriptionToken(string eventSubscriptionId, string strbuilder.Append($"e.ExternalWorkerId = 'workerId'"); strbuilder.Append("}"); - var operation = await _database.Operations.SendAsync(new PatchByQueryOperation(strbuilder.ToString()), token: cancellationToken); + var operation = await _database.Operations.SendAsync(new PatchByQueryOperation(strbuilder.ToString())); operation.WaitForCompletion(); return true; } @@ -199,7 +199,7 @@ public async Task SetSubscriptionToken(string eventSubscriptionId, string } } - public async Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken cancellationToken = default) + public async Task ClearSubscriptionToken(string eventSubscriptionId, string token) { try { @@ -214,7 +214,7 @@ public async Task ClearSubscriptionToken(string eventSubscriptionId, string toke strbuilder.Append($"e.ExternalWorkerId = null"); strbuilder.Append("}"); - var operation = await _database.Operations.SendAsync(new PatchByQueryOperation(strbuilder.ToString()), token: cancellationToken); + var operation = await _database.Operations.SendAsync(new PatchByQueryOperation(strbuilder.ToString())); operation.WaitForCompletion(); } catch (Exception e) @@ -239,13 +239,13 @@ public async Task> GetSubscriptions(string eventN } } - public async Task CreateEvent(Event newEvent, CancellationToken cancellationToken = default) + public async Task CreateEvent(Event newEvent) { using (var session = _database.OpenAsyncSession()) { - await session.StoreAsync(newEvent, cancellationToken); + await session.StoreAsync(newEvent); var id = newEvent.Id; - await session.SaveChangesAsync(cancellationToken); + await session.SaveChangesAsync(); return id; } } @@ -272,13 +272,13 @@ public async Task> GetRunnableEvents(DateTime asAt, Cancella } } - public async Task MarkEventProcessed(string id, CancellationToken cancellationToken = default) + public async Task MarkEventProcessed(string id) { using (var session = _database.OpenAsyncSession()) { session.Advanced.Patch(id, x => x.IsProcessed, true); - await session.SaveChangesAsync(cancellationToken); + await session.SaveChangesAsync(); } } @@ -297,21 +297,21 @@ public async Task> GetEvents(string eventName, string eventK } } - public async Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = default) + public async Task MarkEventUnprocessed(string id) { using (var session = _database.OpenAsyncSession()) { session.Advanced.Patch(id, x => x.IsProcessed, false); - await session.SaveChangesAsync(cancellationToken); + await session.SaveChangesAsync(); } } - public async Task PersistErrors(IEnumerable errors, CancellationToken cancellationToken = default) + public async Task PersistErrors(IEnumerable errors) { if (errors.Any()) { - var blk = _database.BulkInsert(token: cancellationToken); + var blk = _database.BulkInsert(); foreach (var error in errors) await blk.StoreAsync(error); diff --git a/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs b/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs index 1126b6c20..9a10f9a8a 100644 --- a/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs @@ -34,7 +34,7 @@ public DynamoPersistenceProvider(AWSCredentials credentials, AmazonDynamoDBConfi _provisioner = provisioner; } - public async Task CreateNewWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default) + public async Task CreateNewWorkflow(WorkflowInstance workflow) { workflow.Id = Guid.NewGuid().ToString(); @@ -45,12 +45,12 @@ public async Task CreateNewWorkflow(WorkflowInstance workflow, Cancellat ConditionExpression = "attribute_not_exists(id)" }; - var _ = await _client.PutItemAsync(req, cancellationToken); + var _ = await _client.PutItemAsync(req); return workflow.Id; } - public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default) + public async Task PersistWorkflow(WorkflowInstance workflow) { var request = new PutItemRequest { @@ -58,7 +58,7 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c Item = workflow.ToDynamoMap() }; - var response = await _client.PutItemAsync(request, cancellationToken); + var response = await _client.PutItemAsync(request); } public async Task> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default) @@ -163,7 +163,7 @@ public async Task> GetWorkflowInstances(IEnumerabl return result.Select(i => i.ToWorkflowInstance()); } - public async Task CreateEventSubscription(EventSubscription subscription, CancellationToken cancellationToken = default) + public async Task CreateEventSubscription(EventSubscription subscription) { subscription.Id = Guid.NewGuid().ToString(); @@ -174,7 +174,7 @@ public async Task CreateEventSubscription(EventSubscription subscription ConditionExpression = "attribute_not_exists(id)" }; - var response = await _client.PutItemAsync(req, cancellationToken); + var response = await _client.PutItemAsync(req); return subscription.Id; } @@ -215,7 +215,7 @@ public async Task> GetSubscriptions(string eventN return result; } - public async Task TerminateSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default) + public async Task TerminateSubscription(string eventSubscriptionId) { var request = new DeleteItemRequest { @@ -225,10 +225,10 @@ public async Task TerminateSubscription(string eventSubscriptionId, Cancellation { "id", new AttributeValue(eventSubscriptionId) } } }; - await _client.DeleteItemAsync(request, cancellationToken); + await _client.DeleteItemAsync(request); } - public async Task CreateEvent(Event newEvent, CancellationToken cancellationToken = default) + public async Task CreateEvent(Event newEvent) { newEvent.Id = Guid.NewGuid().ToString(); @@ -239,7 +239,7 @@ public async Task CreateEvent(Event newEvent, CancellationToken cancella ConditionExpression = "attribute_not_exists(id)" }; - var _ = await _client.PutItemAsync(req, cancellationToken); + var _ = await _client.PutItemAsync(req); return newEvent.Id; } @@ -329,7 +329,7 @@ public async Task> GetEvents(string eventName, string eventK return result; } - public async Task MarkEventProcessed(string id, CancellationToken cancellationToken = default) + public async Task MarkEventProcessed(string id) { var request = new UpdateItemRequest { @@ -340,10 +340,10 @@ public async Task MarkEventProcessed(string id, CancellationToken cancellationTo }, UpdateExpression = "REMOVE not_processed" }; - await _client.UpdateItemAsync(request, cancellationToken); + await _client.UpdateItemAsync(request); } - public async Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = default) + public async Task MarkEventUnprocessed(string id) { var request = new UpdateItemRequest { @@ -358,10 +358,10 @@ public async Task MarkEventUnprocessed(string id, CancellationToken cancellation { ":n" , new AttributeValue { N = 1.ToString() } } } }; - await _client.UpdateItemAsync(request, cancellationToken); + await _client.UpdateItemAsync(request); } - public Task PersistErrors(IEnumerable errors, CancellationToken _ = default) + public Task PersistErrors(IEnumerable errors) { //TODO return Task.CompletedTask; @@ -423,7 +423,7 @@ public async Task GetFirstOpenSubscription(string eventName, return result.FirstOrDefault(); } - public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken = default) + public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry) { var request = new UpdateItemRequest { @@ -443,7 +443,7 @@ public async Task SetSubscriptionToken(string eventSubscriptionId, string }; try { - await _client.UpdateItemAsync(request, cancellationToken); + await _client.UpdateItemAsync(request); return true; } catch (ConditionalCheckFailedException) @@ -452,7 +452,7 @@ public async Task SetSubscriptionToken(string eventSubscriptionId, string } } - public async Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken cancellationToken = default) + public async Task ClearSubscriptionToken(string eventSubscriptionId, string token) { var request = new UpdateItemRequest { @@ -469,7 +469,7 @@ public async Task ClearSubscriptionToken(string eventSubscriptionId, string toke } }; - await _client.UpdateItemAsync(request, cancellationToken); + await _client.UpdateItemAsync(request); } public Task ScheduleCommand(ScheduledCommand command) @@ -482,4 +482,4 @@ public Task ProcessCommands(DateTimeOffset asOf, Func ac throw new NotImplementedException(); } } -} \ No newline at end of file +} diff --git a/src/providers/WorkflowCore.Providers.Azure/Services/CosmosDbPersistenceProvider.cs b/src/providers/WorkflowCore.Providers.Azure/Services/CosmosDbPersistenceProvider.cs index 644009df2..20c9fa850 100644 --- a/src/providers/WorkflowCore.Providers.Azure/Services/CosmosDbPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Providers.Azure/Services/CosmosDbPersistenceProvider.cs @@ -37,7 +37,7 @@ public CosmosDbPersistenceProvider( public bool SupportsScheduledCommands => false; - public async Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken cancellationToken = default) + public async Task ClearSubscriptionToken(string eventSubscriptionId, string token) { var existing = await _subscriptionContainer.Value.ReadItemAsync(eventSubscriptionId, new PartitionKey(eventSubscriptionId)); @@ -47,27 +47,27 @@ public async Task ClearSubscriptionToken(string eventSubscriptionId, string toke existing.Resource.ExternalWorkerId = null; existing.Resource.ExternalTokenExpiry = null; - await _subscriptionContainer.Value.ReplaceItemAsync(existing.Resource, eventSubscriptionId, cancellationToken: cancellationToken); + await _subscriptionContainer.Value.ReplaceItemAsync(existing.Resource, eventSubscriptionId); } - public async Task CreateEvent(Event newEvent, CancellationToken cancellationToken) + public async Task CreateEvent(Event newEvent) { newEvent.Id = Guid.NewGuid().ToString(); - var result = await _eventContainer.Value.CreateItemAsync(PersistedEvent.FromInstance(newEvent), cancellationToken: cancellationToken); + var result = await _eventContainer.Value.CreateItemAsync(PersistedEvent.FromInstance(newEvent)); return result.Resource.id; } - public async Task CreateEventSubscription(EventSubscription subscription, CancellationToken cancellationToken) + public async Task CreateEventSubscription(EventSubscription subscription) { subscription.Id = Guid.NewGuid().ToString(); - var result = await _subscriptionContainer.Value.CreateItemAsync(PersistedSubscription.FromInstance(subscription), cancellationToken: cancellationToken); + var result = await _subscriptionContainer.Value.CreateItemAsync(PersistedSubscription.FromInstance(subscription)); return result.Resource.id; } - public async Task CreateNewWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken) + public async Task CreateNewWorkflow(WorkflowInstance workflow) { workflow.Id = Guid.NewGuid().ToString(); - var result = await _workflowContainer.Value.CreateItemAsync(PersistedWorkflow.FromInstance(workflow), cancellationToken: cancellationToken); + var result = await _workflowContainer.Value.CreateItemAsync(PersistedWorkflow.FromInstance(workflow)); return result.Resource.id; } @@ -203,28 +203,28 @@ public Task> GetWorkflowInstances(IEnumerable(id, new PartitionKey(id), cancellationToken: cancellationToken); + var evt = await _eventContainer.Value.ReadItemAsync(id, new PartitionKey(id)); evt.Resource.IsProcessed = true; - await _eventContainer.Value.ReplaceItemAsync(evt.Resource, id, cancellationToken: cancellationToken); + await _eventContainer.Value.ReplaceItemAsync(evt.Resource, id); } - public async Task MarkEventUnprocessed(string id, CancellationToken cancellationToken) + public async Task MarkEventUnprocessed(string id) { - var evt = await _eventContainer.Value.ReadItemAsync(id, new PartitionKey(id), cancellationToken: cancellationToken); + var evt = await _eventContainer.Value.ReadItemAsync(id, new PartitionKey(id)); evt.Resource.IsProcessed = false; - await _eventContainer.Value.ReplaceItemAsync(evt.Resource, id, cancellationToken: cancellationToken); + await _eventContainer.Value.ReplaceItemAsync(evt.Resource, id); } - public Task PersistErrors(IEnumerable errors, CancellationToken _ = default) + public Task PersistErrors(IEnumerable errors) { return Task.CompletedTask; } - public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken) + public async Task PersistWorkflow(WorkflowInstance workflow) { - await _workflowContainer.Value.UpsertItemAsync(PersistedWorkflow.FromInstance(workflow), cancellationToken: cancellationToken); + await _workflowContainer.Value.UpsertItemAsync(PersistedWorkflow.FromInstance(workflow)); } public Task ProcessCommands(DateTimeOffset asOf, Func action, CancellationToken cancellationToken = default) @@ -237,22 +237,22 @@ public Task ScheduleCommand(ScheduledCommand command) throw new NotImplementedException(); } - public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken cancellationToken) + public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry) { - var sub = await _subscriptionContainer.Value.ReadItemAsync(eventSubscriptionId, new PartitionKey(eventSubscriptionId), cancellationToken: cancellationToken); + var sub = await _subscriptionContainer.Value.ReadItemAsync(eventSubscriptionId, new PartitionKey(eventSubscriptionId)); var existingEntity = sub.Resource; existingEntity.ExternalToken = token; existingEntity.ExternalWorkerId = workerId; existingEntity.ExternalTokenExpiry = expiry; - await _subscriptionContainer.Value.ReplaceItemAsync(existingEntity, eventSubscriptionId, cancellationToken: cancellationToken); + await _subscriptionContainer.Value.ReplaceItemAsync(existingEntity, eventSubscriptionId); return true; } - public async Task TerminateSubscription(string eventSubscriptionId, CancellationToken cancellationToken) + public async Task TerminateSubscription(string eventSubscriptionId) { - await _subscriptionContainer.Value.DeleteItemAsync(eventSubscriptionId, new PartitionKey(eventSubscriptionId), cancellationToken: cancellationToken); + await _subscriptionContainer.Value.DeleteItemAsync(eventSubscriptionId, new PartitionKey(eventSubscriptionId)); } } } diff --git a/src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs b/src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs index cde65e2e5..c0c5db9bf 100644 --- a/src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs @@ -40,14 +40,14 @@ public RedisPersistenceProvider(string connectionString, string prefix, bool rem _removeComplete = removeComplete; } - public async Task CreateNewWorkflow(WorkflowInstance workflow, CancellationToken _ = default) + public async Task CreateNewWorkflow(WorkflowInstance workflow) { workflow.Id = Guid.NewGuid().ToString(); await PersistWorkflow(workflow); return workflow.Id; } - public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken _ = default) + public async Task PersistWorkflow(WorkflowInstance workflow) { var str = JsonConvert.SerializeObject(workflow, _serializerSettings); await _redis.HashSetAsync($"{_prefix}.{WORKFLOW_SET}", workflow.Id, str); @@ -96,7 +96,7 @@ public async Task> GetWorkflowInstances(IEnumerabl return raw.Select(r => JsonConvert.DeserializeObject(r, _serializerSettings)); } - public async Task CreateEventSubscription(EventSubscription subscription, CancellationToken _ = default) + public async Task CreateEventSubscription(EventSubscription subscription) { subscription.Id = Guid.NewGuid().ToString(); var str = JsonConvert.SerializeObject(subscription, _serializerSettings); @@ -121,7 +121,7 @@ public async Task> GetSubscriptions(string eventN return result; } - public async Task TerminateSubscription(string eventSubscriptionId, CancellationToken _ = default) + public async Task TerminateSubscription(string eventSubscriptionId) { var existingRaw = await _redis.HashGetAsync($"{_prefix}.{SUBSCRIPTION_SET}", eventSubscriptionId); var existing = JsonConvert.DeserializeObject(existingRaw, _serializerSettings); @@ -140,7 +140,7 @@ public async Task GetFirstOpenSubscription(string eventName, return (await GetSubscriptions(eventName, eventKey, asOf, cancellationToken)).FirstOrDefault(sub => string.IsNullOrEmpty(sub.ExternalToken)); } - public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken _ = default) + public async Task SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry) { var item = JsonConvert.DeserializeObject(await _redis.HashGetAsync($"{_prefix}.{SUBSCRIPTION_SET}", eventSubscriptionId), _serializerSettings); if (item.ExternalToken != null) @@ -153,7 +153,7 @@ public async Task SetSubscriptionToken(string eventSubscriptionId, string return true; } - public async Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken _ = default) + public async Task ClearSubscriptionToken(string eventSubscriptionId, string token) { var item = JsonConvert.DeserializeObject(await _redis.HashGetAsync($"{_prefix}.{SUBSCRIPTION_SET}", eventSubscriptionId), _serializerSettings); if (item.ExternalToken != token) @@ -165,7 +165,7 @@ public async Task ClearSubscriptionToken(string eventSubscriptionId, string toke await _redis.HashSetAsync($"{_prefix}.{SUBSCRIPTION_SET}", eventSubscriptionId, str); } - public async Task CreateEvent(Event newEvent, CancellationToken _ = default) + public async Task CreateEvent(Event newEvent) { newEvent.Id = Guid.NewGuid().ToString(); var str = JsonConvert.SerializeObject(newEvent, _serializerSettings); @@ -208,25 +208,25 @@ public async Task> GetEvents(string eventName, string eventK return result; } - public async Task MarkEventProcessed(string id, CancellationToken cancellationToken = default) + public async Task MarkEventProcessed(string id) { - var evt = await GetEvent(id, cancellationToken); + var evt = await GetEvent(id); evt.IsProcessed = true; var str = JsonConvert.SerializeObject(evt, _serializerSettings); await _redis.HashSetAsync($"{_prefix}.{EVENT_SET}", evt.Id, str); await _redis.SortedSetRemoveAsync($"{_prefix}.{EVENT_SET}.{RUNNABLE_INDEX}", id); } - public async Task MarkEventUnprocessed(string id, CancellationToken cancellationToken = default) + public async Task MarkEventUnprocessed(string id) { - var evt = await GetEvent(id, cancellationToken); + var evt = await GetEvent(id); evt.IsProcessed = false; var str = JsonConvert.SerializeObject(evt, _serializerSettings); await _redis.HashSetAsync($"{_prefix}.{EVENT_SET}", evt.Id, str); await _redis.SortedSetAddAsync($"{_prefix}.{EVENT_SET}.{RUNNABLE_INDEX}", evt.Id, evt.EventTime.Ticks); } - public Task PersistErrors(IEnumerable errors, CancellationToken _ = default) + public Task PersistErrors(IEnumerable errors) { return Task.CompletedTask; } From c92d06ba2c6b7d0dc28649f223ce95c2546e2302 Mon Sep 17 00:00:00 2001 From: Martin Dennhardt Date: Mon, 8 Aug 2022 12:39:14 +0200 Subject: [PATCH 6/6] Bump major version due to breaking API change --- src/Directory.Build.props | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Directory.Build.props b/src/Directory.Build.props index 30816a3b6..d1c9923b7 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -4,10 +4,10 @@ https://github.com/danielgerlag/workflow-core/blob/master/LICENSE.md git https://github.com/danielgerlag/workflow-core.git - 3.6.4 - 3.6.4.0 - 3.6.4.0 + 4.0.0 + 4.0.0.0 + 4.0.0.0 https://github.com/danielgerlag/workflow-core/raw/master/src/logo.png - 3.6.4 + 4.0.0