From 202e7d36826bda9ee73092736c26cb38ffa9a7c0 Mon Sep 17 00:00:00 2001 From: Viktor Shevchenko Date: Fri, 5 Aug 2022 14:36:33 +0300 Subject: [PATCH 1/2] Fixed reliability of WorkflowConsumer when persisting workflow --- .../Persistence/IWorkflowRepository.cs | 2 + .../BackgroundTasks/WorkflowConsumer.cs | 6 +- .../MemoryPersistenceProvider.cs | 19 ++++++ .../TransientMemoryPersistenceProvider.cs | 10 +++ .../EntityFrameworkPersistenceProvider.cs | 26 ++++++++ .../Services/MongoPersistenceProvider.cs | 11 ++++ .../Services/RavendbPersistenceProvider.cs | 41 +++++++++--- .../Services/DynamoPersistenceProvider.cs | 37 +++++++++++ .../Services/CosmosDbPersistenceProvider.cs | 10 +++ .../Services/RedisPersistenceProvider.cs | 10 +++ .../BasePersistenceFixture.cs | 66 +++++++++++++++++++ 11 files changed, 223 insertions(+), 15 deletions(-) diff --git a/src/WorkflowCore/Interface/Persistence/IWorkflowRepository.cs b/src/WorkflowCore/Interface/Persistence/IWorkflowRepository.cs index 1da4275cd..09842af7a 100644 --- a/src/WorkflowCore/Interface/Persistence/IWorkflowRepository.cs +++ b/src/WorkflowCore/Interface/Persistence/IWorkflowRepository.cs @@ -12,6 +12,8 @@ public interface IWorkflowRepository Task PersistWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default); + Task PersistWorkflow(WorkflowInstance workflow, List subscriptions, CancellationToken cancellationToken = default); + Task> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default); [Obsolete] diff --git a/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs b/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs index b3dd4aa0b..ce917e41a 100644 --- a/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs +++ b/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs @@ -55,7 +55,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance finally { WorkflowActivity.Enrich(result); - await _persistenceStore.PersistWorkflow(workflow, cancellationToken); + await _persistenceStore.PersistWorkflow(workflow, result.Subscriptions, cancellationToken); await QueueProvider.QueueWork(itemId, QueueType.Index); _greylist.Remove($"wf:{itemId}"); } @@ -100,10 +100,6 @@ await _persistenceStore.ScheduleCommand(new ScheduledCommand() private async Task SubscribeEvent(EventSubscription subscription, IPersistenceProvider persistenceStore, CancellationToken cancellationToken) { - //TODO: move to own class - Logger.LogDebug("Subscribing to event {0} {1} for workflow {2} step {3}", subscription.EventName, subscription.EventKey, subscription.WorkflowId, subscription.StepId); - - await persistenceStore.CreateEventSubscription(subscription, cancellationToken); if (subscription.EventName != Event.EventTypeActivity) { var events = await persistenceStore.GetEvents(subscription.EventName, subscription.EventKey, subscription.SubscribeAsOf, cancellationToken); diff --git a/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs b/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs index e983f8fd5..cb69c8791 100644 --- a/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs +++ b/src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs @@ -46,6 +46,25 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken _ } } + public async Task PersistWorkflow(WorkflowInstance workflow, List subscriptions, CancellationToken cancellationToken = default) + { + lock (_instances) + { + var existing = _instances.First(x => x.Id == workflow.Id); + _instances.Remove(existing); + _instances.Add(workflow); + + lock (_subscriptions) + { + foreach (var subscription in subscriptions) + { + subscription.Id = Guid.NewGuid().ToString(); + _subscriptions.Add(subscription); + } + } + } + } + public async Task> GetRunnableInstances(DateTime asAt, CancellationToken _ = default) { lock (_instances) diff --git a/src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs b/src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs index 0a94c6f8e..31ec6dbe2 100644 --- a/src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs +++ b/src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs @@ -50,6 +50,16 @@ public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService) public Task PersistWorkflow(WorkflowInstance workflow, CancellationToken _ = default) => _innerService.PersistWorkflow(workflow); + public async Task PersistWorkflow(WorkflowInstance workflow, List subscriptions, CancellationToken cancellationToken = default) + { + await PersistWorkflow(workflow, cancellationToken); + + foreach(var subscription in subscriptions) + { + await CreateEventSubscription(subscription, cancellationToken); + } + } + public Task TerminateSubscription(string eventSubscriptionId, CancellationToken _ = default) => _innerService.TerminateSubscription(eventSubscriptionId); public Task GetSubscription(string eventSubscriptionId, CancellationToken _ = default) => _innerService.GetSubscription(eventSubscriptionId); diff --git a/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs b/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs index 44798a0c9..22ba680f5 100644 --- a/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs @@ -151,6 +151,32 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c await db.SaveChangesAsync(cancellationToken); } } + + public async Task PersistWorkflow(WorkflowInstance workflow, List subscriptions, CancellationToken cancellationToken = default) + { + using (var db = ConstructDbContext()) + { + var uid = new Guid(workflow.Id); + var existingEntity = await db.Set() + .Where(x => x.InstanceId == uid) + .Include(wf => wf.ExecutionPointers) + .ThenInclude(ep => ep.ExtensionAttributes) + .Include(wf => wf.ExecutionPointers) + .AsTracking() + .FirstAsync(cancellationToken); + + var workflowPersistable = workflow.ToPersistable(existingEntity); + + foreach (var subscription in subscriptions) + { + subscription.Id = Guid.NewGuid().ToString(); + var subscriptionPersistable = subscription.ToPersistable(); + db.Set().Add(subscriptionPersistable); + } + + await db.SaveChangesAsync(cancellationToken); + } + } public async Task TerminateSubscription(string eventSubscriptionId, CancellationToken cancellationToken = default) { diff --git a/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs b/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs index 06be13355..2824f9674 100644 --- a/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs @@ -149,6 +149,17 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c await WorkflowInstances.ReplaceOneAsync(x => x.Id == workflow.Id, workflow, cancellationToken: cancellationToken); } + public async Task PersistWorkflow(WorkflowInstance workflow, List subscriptions, CancellationToken cancellationToken = default) + { + using (var session = await _database.Client.StartSessionAsync()) + { + session.StartTransaction(); + await PersistWorkflow(workflow, cancellationToken); + await EventSubscriptions.InsertManyAsync(subscriptions, cancellationToken: cancellationToken); + await session.CommitTransactionAsync(); + } + } + public async Task> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default) { var now = asAt.ToUniversalTime().Ticks; diff --git a/src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs b/src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs index 695b8e31f..3e1d5e2ea 100644 --- a/src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs @@ -1,6 +1,7 @@ using Raven.Client.Documents; using Raven.Client.Documents.Linq; using Raven.Client.Documents.Operations; +using Raven.Client.Documents.Session; using System; using System.Collections.Generic; using System.Linq; @@ -56,21 +57,41 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c { using (var session = _database.OpenAsyncSession()) { - session.Advanced.Patch(workflow.Id, x => x.WorkflowDefinitionId, workflow.WorkflowDefinitionId); - session.Advanced.Patch(workflow.Id, x => x.Version, workflow.Version); - session.Advanced.Patch(workflow.Id, x => x.Description, workflow.Description); - session.Advanced.Patch(workflow.Id, x => x.Reference, workflow.Reference); - session.Advanced.Patch(workflow.Id, x => x.ExecutionPointers, workflow.ExecutionPointers); - session.Advanced.Patch(workflow.Id, x => x.NextExecution, workflow.NextExecution); - session.Advanced.Patch(workflow.Id, x => x.Status, workflow.Status); - session.Advanced.Patch(workflow.Id, x => x.Data, workflow.Data); - session.Advanced.Patch(workflow.Id, x => x.CreateTime, workflow.CreateTime); - session.Advanced.Patch(workflow.Id, x => x.CompleteTime, workflow.CompleteTime); + PatchSession(session, workflow); + await session.SaveChangesAsync(cancellationToken); + } + } + + public async Task PersistWorkflow(WorkflowInstance workflow, List subscriptions, CancellationToken cancellationToken = default) + { + using (var session = _database.OpenAsyncSession()) + { + PatchSession(session, workflow); + + foreach (var subscription in subscriptions) + { + await session.StoreAsync(subscription, cancellationToken); + } await session.SaveChangesAsync(cancellationToken); } } + private void PatchSession(IAsyncDocumentSession session, WorkflowInstance workflow) + { + session.Advanced.Patch(workflow.Id, x => x.WorkflowDefinitionId, workflow.WorkflowDefinitionId); + session.Advanced.Patch(workflow.Id, x => x.Version, workflow.Version); + session.Advanced.Patch(workflow.Id, x => x.Description, workflow.Description); + session.Advanced.Patch(workflow.Id, x => x.Reference, workflow.Reference); + session.Advanced.Patch(workflow.Id, x => x.ExecutionPointers, workflow.ExecutionPointers); + session.Advanced.Patch(workflow.Id, x => x.NextExecution, workflow.NextExecution); + session.Advanced.Patch(workflow.Id, x => x.Status, workflow.Status); + session.Advanced.Patch(workflow.Id, x => x.Data, workflow.Data); + session.Advanced.Patch(workflow.Id, x => x.CreateTime, workflow.CreateTime); + session.Advanced.Patch(workflow.Id, x => x.CompleteTime, workflow.CompleteTime); + + } + public async Task> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default) { var now = asAt.ToUniversalTime().Ticks; diff --git a/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs b/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs index 1126b6c20..09f1dbc4c 100644 --- a/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs @@ -61,6 +61,43 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c var response = await _client.PutItemAsync(request, cancellationToken); } + public async Task PersistWorkflow(WorkflowInstance workflow, List subscriptions, CancellationToken cancellationToken = default) + { + var transactionWriteItemsRequest = new TransactWriteItemsRequest() + { + TransactItems = new List() + { + { + new TransactWriteItem() + { + Put = new Put() + { + TableName = $"{_tablePrefix}-{WORKFLOW_TABLE}", + Item = workflow.ToDynamoMap() + } + } + } + } + }; + + foreach(var subscription in subscriptions) + { + subscription.Id = Guid.NewGuid().ToString(); + + transactionWriteItemsRequest.TransactItems.Add(new TransactWriteItem() + { + Put = new Put() + { + TableName = $"{_tablePrefix}-{SUBCRIPTION_TABLE}", + Item = subscription.ToDynamoMap(), + ConditionExpression = "attribute_not_exists(id)" + } + }); + } + + await _client.TransactWriteItemsAsync(transactionWriteItemsRequest, cancellationToken); + } + public async Task> GetRunnableInstances(DateTime asAt, CancellationToken cancellationToken = default) { var result = new List(); diff --git a/src/providers/WorkflowCore.Providers.Azure/Services/CosmosDbPersistenceProvider.cs b/src/providers/WorkflowCore.Providers.Azure/Services/CosmosDbPersistenceProvider.cs index 644009df2..008136494 100644 --- a/src/providers/WorkflowCore.Providers.Azure/Services/CosmosDbPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Providers.Azure/Services/CosmosDbPersistenceProvider.cs @@ -227,6 +227,16 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c await _workflowContainer.Value.UpsertItemAsync(PersistedWorkflow.FromInstance(workflow), cancellationToken: cancellationToken); } + public async Task PersistWorkflow(WorkflowInstance workflow, List subscriptions, CancellationToken cancellationToken = default) + { + await PersistWorkflow(workflow, cancellationToken); + + foreach(var subscription in subscriptions) + { + await CreateEventSubscription(subscription, cancellationToken); + } + } + public Task ProcessCommands(DateTimeOffset asOf, Func action, CancellationToken cancellationToken = default) { throw new NotImplementedException(); diff --git a/src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs b/src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs index cde65e2e5..6bf8df875 100644 --- a/src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs +++ b/src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs @@ -47,6 +47,16 @@ public async Task CreateNewWorkflow(WorkflowInstance workflow, Cancellat return workflow.Id; } + public async Task PersistWorkflow(WorkflowInstance workflow, List subscriptions, CancellationToken cancellationToken = default) + { + await PersistWorkflow(workflow, cancellationToken); + + foreach (var subscription in subscriptions) + { + await CreateEventSubscription(subscription, cancellationToken); + } + } + public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken _ = default) { var str = JsonConvert.SerializeObject(workflow, _serializerSettings); diff --git a/test/WorkflowCore.UnitTests/BasePersistenceFixture.cs b/test/WorkflowCore.UnitTests/BasePersistenceFixture.cs index 0afa9bd7e..9114da7b9 100644 --- a/test/WorkflowCore.UnitTests/BasePersistenceFixture.cs +++ b/test/WorkflowCore.UnitTests/BasePersistenceFixture.cs @@ -185,6 +185,72 @@ public void PersistWorkflow() var current = Subject.GetWorkflowInstance(workflowId).Result; current.ShouldBeEquivalentTo(newWorkflow); } + + [Fact] + public void PersistWorkflow_with_subscriptions() + { + var workflow = new WorkflowInstance + { + Data = new TestData { Value1 = 7 }, + Description = "My Description", + Status = WorkflowStatus.Runnable, + NextExecution = 0, + Version = 1, + WorkflowDefinitionId = "My Workflow", + CreateTime = new DateTime(2000, 1, 1).ToUniversalTime(), + ExecutionPointers = new ExecutionPointerCollection(), + Reference = Guid.NewGuid().ToString() + }; + + workflow.ExecutionPointers.Add(new ExecutionPointer + { + Id = Guid.NewGuid().ToString(), + Active = true, + StepId = 0, + Scope = new List { "1", "2", "3", "4" }, + EventName = "Event1" + }); + + workflow.ExecutionPointers.Add(new ExecutionPointer + { + Id = Guid.NewGuid().ToString(), + Active = true, + StepId = 1, + Scope = new List { "1", "2", "3", "4" }, + EventName = "Event2", + }); + + var workflowId = Subject.CreateNewWorkflow(workflow).Result; + workflow.NextExecution = 0; + + List subscriptions = new List(); + foreach (var pointer in workflow.ExecutionPointers) + { + var subscription = new EventSubscription() + { + WorkflowId = workflowId, + StepId = pointer.StepId, + ExecutionPointerId = pointer.Id, + EventName = pointer.EventName, + EventKey = workflowId, + SubscribeAsOf = DateTime.UtcNow, + SubscriptionData = "data" + }; + + subscriptions.Add(subscription); + } + + Subject.PersistWorkflow(workflow, subscriptions).Wait(); + + var current = Subject.GetWorkflowInstance(workflowId).Result; + current.ShouldBeEquivalentTo(workflow); + + foreach (var pointer in workflow.ExecutionPointers) + { + subscriptions = Subject.GetSubscriptions(pointer.EventName, workflowId, DateTime.UtcNow).Result.ToList(); + subscriptions.Should().HaveCount(1); + } + } [Fact] public void ConcurrentPersistWorkflow() From 4cf59daf2d410f83f4caae668f21ab33943350a5 Mon Sep 17 00:00:00 2001 From: Viktor Shevchenko Date: Sun, 14 Aug 2022 15:00:02 +0300 Subject: [PATCH 2/2] Changed method name --- src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs b/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs index ce917e41a..3a6465902 100644 --- a/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs +++ b/src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs @@ -68,7 +68,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance { foreach (var sub in result.Subscriptions) { - await SubscribeEvent(sub, _persistenceStore, cancellationToken); + await TryProcessSubscription(sub, _persistenceStore, cancellationToken); } await _persistenceStore.PersistErrors(result.Errors, cancellationToken); @@ -98,7 +98,7 @@ await _persistenceStore.ScheduleCommand(new ScheduledCommand() } - private async Task SubscribeEvent(EventSubscription subscription, IPersistenceProvider persistenceStore, CancellationToken cancellationToken) + private async Task TryProcessSubscription(EventSubscription subscription, IPersistenceProvider persistenceStore, CancellationToken cancellationToken) { if (subscription.EventName != Event.EventTypeActivity) {