From 30430c3f20087b886f06400d41f28688231af03d Mon Sep 17 00:00:00 2001 From: ShahabT Date: Sun, 1 Dec 2024 21:58:09 -0800 Subject: [PATCH] Add independent activity test and fix --- common/testing/taskpoller/taskpoller.go | 8 +- common/testing/testvars/test_vars.go | 4 + .../api/recordactivitytaskstarted/api.go | 72 +- service/history/history_engine.go | 2 +- .../matching/task_queue_partition_manager.go | 1 - tests/testcore/functional.go | 2 +- tests/versioning_3_test.go | 705 ++++++++---------- 7 files changed, 370 insertions(+), 424 deletions(-) diff --git a/common/testing/taskpoller/taskpoller.go b/common/testing/taskpoller/taskpoller.go index 7a8e579cf41..cc873f125c9 100644 --- a/common/testing/taskpoller/taskpoller.go +++ b/common/testing/taskpoller/taskpoller.go @@ -89,8 +89,8 @@ func New( t *testing.T, client workflowservice.WorkflowServiceClient, namespace string, -) TaskPoller { - return TaskPoller{ +) *TaskPoller { + return &TaskPoller{ t: t, client: client, namespace: namespace, @@ -314,7 +314,9 @@ func (p *workflowTaskPoller) respondTaskCompleted( if reply.Identity == "" { reply.Identity = opts.tv.WorkerIdentity() } - reply.ReturnNewWorkflowTask = true + if reply.ForceCreateNewWorkflowTask { + reply.ReturnNewWorkflowTask = true + } return p.client.RespondWorkflowTaskCompleted(ctx, reply) } diff --git a/common/testing/testvars/test_vars.go b/common/testing/testvars/test_vars.go index 1e4d6728a2d..975c7d2e024 100644 --- a/common/testing/testvars/test_vars.go +++ b/common/testing/testvars/test_vars.go @@ -167,6 +167,10 @@ func (tv *TestVars) DeploymentSeries(key ...string) string { return tv.getOrCreate("deployment_series", key).(string) } +func (tv *TestVars) WithDeploymentSeries(series string, key ...string) *TestVars { + return tv.cloneSet("deployment_series", key, series) +} + func (tv *TestVars) Deployment(key ...string) *deployment.Deployment { //revive:disable-next-line:unchecked-type-assertion return &deployment.Deployment{ diff --git a/service/history/api/recordactivitytaskstarted/api.go b/service/history/api/recordactivitytaskstarted/api.go index 0d92db5a021..de9f4eb0e0e 100644 --- a/service/history/api/recordactivitytaskstarted/api.go +++ b/service/history/api/recordactivitytaskstarted/api.go @@ -29,9 +29,11 @@ import ( "errors" "fmt" + deploymentpb "go.temporal.io/api/deployment/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/api/matchingservice/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/definition" "go.temporal.io/server/common/metrics" @@ -50,6 +52,7 @@ func Invoke( request *historyservice.RecordActivityTaskStartedRequest, shardContext shard.Context, workflowConsistencyChecker api.WorkflowConsistencyChecker, + matchingClient matchingservice.MatchingServiceClient, ) (resp *historyservice.RecordActivityTaskStartedResponse, retError error) { var err error @@ -70,7 +73,7 @@ func Invoke( return nil, consts.ErrWorkflowCompleted } - response, startedTransition, err = recordActivityTaskStarted(ctx, shardContext, mutableState, request) + response, startedTransition, err = recordActivityTaskStarted(ctx, shardContext, mutableState, request, matchingClient) if err != nil { return nil, err } @@ -105,6 +108,7 @@ func recordActivityTaskStarted( shardContext shard.Context, mutableState workflow.MutableState, request *historyservice.RecordActivityTaskStartedRequest, + matchingClient matchingservice.MatchingServiceClient, ) (*historyservice.RecordActivityTaskStartedResponse, bool, error) { namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(request.GetNamespaceId())) if err != nil { @@ -189,18 +193,33 @@ func recordActivityTaskStarted( } if !pollerDeployment.Equal(wfDeployment) { - // Task is redirected, see if a transition can start. - if err := mutableState.StartDeploymentTransition(pollerDeployment); err != nil { - if errors.Is(err, workflow.ErrPinnedWorkflowCannotTransition) { - // This must be a task from a time that the workflow was unpinned, but it's - // now pinned so can't transition. Matching can drop the task safely. - return nil, false, serviceerrors.NewObsoleteMatchingTask(err.Error()) - } + // Task is redirected, see if this activity should start a transition on the workflow. The + // workflow transition happens only if the workflow TQ also is present in the poller + // deployment. Otherwise, it means the activity is independently versioned, we allow it to + // start without affecting the workflow. + wfTaskQueueIsPresent, err := deploymentContainTaskQueue(ctx, + request.NamespaceId, + pollerDeployment, + mutableState.GetExecutionInfo().GetTaskQueue(), + enumspb.TASK_QUEUE_TYPE_WORKFLOW, + matchingClient) + if err != nil { + // Let matching retry return nil, false, err } - // This activity started a transition, make sure the MS changes are written but - // reject the activity task. - return nil, true, nil + if wfTaskQueueIsPresent { + if err := mutableState.StartDeploymentTransition(pollerDeployment); err != nil { + if errors.Is(err, workflow.ErrPinnedWorkflowCannotTransition) { + // This must be a task from a time that the workflow was unpinned, but it's + // now pinned so can't transition. Matching can drop the task safely. + return nil, false, serviceerrors.NewObsoleteMatchingTask(err.Error()) + } + return nil, false, err + } + // This activity started a transition, make sure the MS changes are written but + // reject the activity task. + return nil, true, nil + } } versioningStamp := worker_versioning.StampFromCapabilities(request.PollRequest.WorkerVersionCapabilities) @@ -232,3 +251,34 @@ func recordActivityTaskStarted( return response, false, nil } + +// TODO: cache this result (especially if the answer is true) +func deploymentContainTaskQueue( + ctx context.Context, + namespaceID string, + deployment *deploymentpb.Deployment, + taskQueueName string, + taskQueueType enumspb.TaskQueueType, + matchingClient matchingservice.MatchingServiceClient, +) (bool, error) { + resp, err := matchingClient.GetTaskQueueUserData(ctx, + &matchingservice.GetTaskQueueUserDataRequest{ + NamespaceId: namespaceID, + TaskQueue: taskQueueName, + TaskQueueType: taskQueueType, + }) + if err != nil { + return false, err + } + tqData, ok := resp.GetUserData().GetData().GetPerType()[int32(taskQueueType)] + if !ok { + // The TQ is unversioned, return true if the passed deployment is also unversioned (nil) + return deployment == nil, err + } + for _, d := range tqData.GetDeploymentData().GetDeployments() { + if d.GetDeployment().Equal(deployment) { + return true, nil + } + } + return false, nil +} diff --git a/service/history/history_engine.go b/service/history/history_engine.go index 90c83beb062..24fd4c240c0 100644 --- a/service/history/history_engine.go +++ b/service/history/history_engine.go @@ -525,7 +525,7 @@ func (e *historyEngineImpl) RecordActivityTaskStarted( ctx context.Context, request *historyservice.RecordActivityTaskStartedRequest, ) (*historyservice.RecordActivityTaskStartedResponse, error) { - return recordactivitytaskstarted.Invoke(ctx, request, e.shardContext, e.workflowConsistencyChecker) + return recordactivitytaskstarted.Invoke(ctx, request, e.shardContext, e.workflowConsistencyChecker, e.matchingClient) } // ScheduleWorkflowTask schedules a workflow task if no outstanding workflow task found diff --git a/service/matching/task_queue_partition_manager.go b/service/matching/task_queue_partition_manager.go index 7c48e64b347..a06a59f9a0b 100644 --- a/service/matching/task_queue_partition_manager.go +++ b/service/matching/task_queue_partition_manager.go @@ -336,7 +336,6 @@ func (pm *taskQueuePartitionManagerImpl) ProcessSpooledTask( ) error { taskInfo := task.event.GetData() // This task came from taskReader so task.event is always set here. - // TODO: in WV2 we should not look at a spooled task directive anymore [cleanup-old-wv] directive := taskInfo.GetVersionDirective() if assignedBuildId != "" { // construct directive based on the build ID of the spool queue diff --git a/tests/testcore/functional.go b/tests/testcore/functional.go index 27ba81491cf..a43c4ab7d87 100644 --- a/tests/testcore/functional.go +++ b/tests/testcore/functional.go @@ -47,7 +47,7 @@ type ( updateutils.UpdateUtils FunctionalTestBase - TaskPoller taskpoller.TaskPoller + TaskPoller *taskpoller.TaskPoller } ) diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index b78371fcc53..cb201ec1df8 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -26,6 +26,7 @@ package tests import ( "context" + "errors" "fmt" "sync/atomic" "testing" @@ -148,465 +149,365 @@ func (s *Versioning3Suite) TestUnpinnedTask_OldDeployment() { ) } -type Versioning3Suite struct { - // override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test, - // not merely log an error - testcore.FunctionalTestBase +func (s *Versioning3Suite) TestWorkflowWithPinnedOverride_Sticky() { + s.RunTestWithMatchingBehavior( + func() { + s.testWorkflowWithPinnedOverride(true) + }, + ) } -func TestVersioning3FunctionalSuite(t *testing.T) { - t.Parallel() - suite.Run(t, new(Versioning3Suite)) +func (s *Versioning3Suite) TestWorkflowWithPinnedOverride_NoSticky() { + s.RunTestWithMatchingBehavior( + func() { + s.testWorkflowWithPinnedOverride(false) + }, + ) } -func (s *Versioning3Suite) SetupSuite() { - dynamicConfigOverrides := map[dynamicconfig.Key]any{ - dynamicconfig.FrontendEnableDeployments.Key(): true, - dynamicconfig.FrontendEnableWorkerVersioningWorkflowAPIs.Key(): true, - dynamicconfig.MatchingForwarderMaxChildrenPerNode.Key(): partitionTreeDegree, - - // Make sure we don't hit the rate limiter in tests - dynamicconfig.FrontendGlobalNamespaceNamespaceReplicationInducingAPIsRPS.Key(): 1000, - dynamicconfig.FrontendMaxNamespaceNamespaceReplicationInducingAPIsBurstRatioPerInstance.Key(): 1, - dynamicconfig.FrontendNamespaceReplicationInducingAPIsRPS.Key(): 1000, +func (s *Versioning3Suite) testWorkflowWithPinnedOverride(sticky bool) { + tv := testvars.New(s) - // this is overridden for tests using RunTestWithMatchingBehavior - dynamicconfig.MatchingNumTaskqueueReadPartitions.Key(): 1, - dynamicconfig.MatchingNumTaskqueueWritePartitions.Key(): 1, + if sticky { + s.warmUpSticky(tv) } - s.SetDynamicConfigOverrides(dynamicConfigOverrides) - s.FunctionalTestBase.SetupSuite("testdata/es_cluster.yaml") -} -func (s *Versioning3Suite) TearDownSuite() { - s.FunctionalTestBase.TearDownSuite() -} - -func (s *Versioning3Suite) SetupTest() { - s.FunctionalTestBase.SetupTest() -} - -func (s *Versioning3Suite) TestPinnedTask_NoProperPoller() { - s.RunTestWithMatchingBehavior( - func() { - tv := testvars.New(s) - - other := tv.WithBuildId("other") - go func() { - s.idlePollWorkflow(other, true, minPollTime, "other deployment should not receive pinned task") - }() - s.waitForDeploymentDataPropagation(other, tqTypeWf) - - s.startWorkflow(tv, pinnedOverride(tv.Deployment())) - s.idlePollWorkflow(tv, false, minPollTime, "unversioned worker should not receive pinned task") + wftCompleted := make(chan interface{}) + s.pollWftAndHandle(tv, false, wftCompleted, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + return respondWftWithActivities(tv, tv, sticky, vbUnpinned, "5"), nil }) -} + s.waitForDeploymentDataPropagation(tv, tqTypeWf) -func (s *Versioning3Suite) TestUnpinnedTask_NonCurrentDeployment() { - s.RunTestWithMatchingBehavior( - func() { - tv := testvars.New(s) - go func() { - s.idlePollWorkflow(tv, true, minPollTime, "non-current versioned poller should not receive unpinned task") - }() - s.waitForDeploymentDataPropagation(tv, tqTypeWf) - - s.startWorkflow(tv, nil) + actCompleted := make(chan interface{}) + s.pollActivityAndHandle(tv, actCompleted, + func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error) { + s.NotNil(task) + return respondActivity(), nil }) -} + s.waitForDeploymentDataPropagation(tv, tqTypeAct) -func (s *Versioning3Suite) TestUnpinnedTask_OldDeployment() { - s.RunTestWithMatchingBehavior( - func() { - tv := testvars.New(s) - // previous current deployment - s.updateTaskQueueDeploymentData(tv.WithBuildId("older"), time.Minute, tqTypeWf) - // current deployment - s.updateTaskQueueDeploymentData(tv, 0, tqTypeWf) + override := makePinnedOverride(tv.Deployment()) + we := s.startWorkflow(tv, override) - s.startWorkflow(tv, nil) + <-wftCompleted + s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), override, nil) + if sticky { + s.verifyWorkflowStickyQueue(we, tv.StickyTaskQueue()) + } - s.idlePollWorkflow( - tv.WithBuildId("older"), - true, - minPollTime, - "old deployment should not receive unpinned task", - ) - }, - ) + <-actCompleted + s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), override, nil) + + s.pollWftAndHandle(tv, sticky, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + return respondCompleteWorkflow(tv, vbUnpinned), nil + }) + s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), override, nil) } -func (s *Versioning3Suite) TestWorkflowWithPinnedOverride_Sticky() { +func (s *Versioning3Suite) TestUnpinnedWorkflow_Sticky() { s.RunTestWithMatchingBehavior( func() { - s.testWorkflowWithPinnedOverride(true) + s.testUnpinnedWorkflow(true) }, ) } -func (s *Versioning3Suite) TestWorkflowWithPinnedOverride_NoSticky() { +func (s *Versioning3Suite) TestUnpinnedWorkflow_NoSticky() { s.RunTestWithMatchingBehavior( func() { - s.testWorkflowWithPinnedOverride(false) + s.testUnpinnedWorkflow(false) }, ) } -func (s *Versioning3Suite) testWorkflowWithPinnedOverride(sticky bool) { +func (s *Versioning3Suite) testUnpinnedWorkflow(sticky bool) { tv := testvars.New(s) + d := tv.Deployment() if sticky { s.warmUpSticky(tv) } - func(s *Versioning3Suite) TestWorkflowWithPinnedOverride_Sticky() - { - s.RunTestWithMatchingBehavior( - func() { - s.testWorkflowWithPinnedOverride(true) - }, - ) - } + wftCompleted := make(chan interface{}) + s.pollWftAndHandle(tv, false, wftCompleted, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + s.verifyWorkflowVersioning(tv, vbUnspecified, nil, nil, transitionTo(d)) + return respondWftWithActivities(tv, tv, sticky, vbUnpinned, "5"), nil + }) + s.waitForDeploymentDataPropagation(tv, tqTypeWf) - func(s *Versioning3Suite) TestWorkflowWithPinnedOverride_NoSticky() - { - s.RunTestWithMatchingBehavior( - func() { - s.testWorkflowWithPinnedOverride(false) - }, - ) + actCompleted := make(chan interface{}) + s.pollActivityAndHandle(tv, actCompleted, + func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error) { + s.NotNil(task) + return respondActivity(), nil + }) + s.waitForDeploymentDataPropagation(tv, tqTypeAct) + + s.updateTaskQueueDeploymentData(tv, 0, tqTypeWf, tqTypeAct) + + we := s.startWorkflow(tv, nil) + + <-wftCompleted + s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), nil, nil) + if sticky { + s.verifyWorkflowStickyQueue(we, tv.StickyTaskQueue()) } - func(s *Versioning3Suite) testWorkflowWithPinnedOverride(sticky - bool) { - tv := testvars.New(s) + <-actCompleted + s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), nil, nil) - if sticky { - s.warmUpSticky(tv) - } + s.pollWftAndHandle(tv, sticky, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + return respondCompleteWorkflow(tv, vbUnpinned), nil + }) + s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), nil, nil) +} - wftCompleted := make(chan interface{}) - s.pollWftAndHandle( - tv, false, wftCompleted, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { - s.NotNil(task) - return respondWftWithActivities(tv, sticky, vbUnpinned, "5"), nil - } - ) - s.waitForDeploymentDataPropagation(tv, tqTypeWf) - - actCompleted := make(chan interface{}) - s.pollActivityAndHandle( - tv, actCompleted, - func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error) { - s.NotNil(task) - return respondActivity(), nil - } - ) - s.waitForDeploymentDataPropagation(tv, tqTypeAct) +func (s *Versioning3Suite) TestTransitionFromWft_Sticky() { + s.testTransitionFromWft(true) +} - override := makePinnedOverride(tv.Deployment()) - we := s.startWorkflow(tv, override) +func (s *Versioning3Suite) TestTransitionFromWft_NoSticky() { + s.testTransitionFromWft(false) +} - <-wftCompleted - s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), override, nil) - if sticky { - s.verifyWorkflowStickyQueue(we, tv.StickyTaskQueue()) - } +func (s *Versioning3Suite) testTransitionFromWft(sticky bool) { + // Wf runs one TWF and one AC on dA, then the second WFT is redirected to dB and + // transitions the wf with it. - <-actCompleted - s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), override, nil) + tvA := testvars.New(s).WithBuildId("A") + tvB := tvA.WithBuildId("B") + dA := tvA.Deployment() + dB := tvB.Deployment() - s.pollWftAndHandle( - tv, sticky, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { - s.NotNil(task) - return respondCompleteWorkflow(tv, vbUnpinned), nil - } - ) - s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), override, nil) + if sticky { + s.warmUpSticky(tvA) } - func(s *Versioning3Suite) TestUnpinnedWorkflow_Sticky() - { - s.RunTestWithMatchingBehavior( - func() { - s.testUnpinnedWorkflow(true) - }, - ) - } + s.updateTaskQueueDeploymentData(tvA, 0, tqTypeWf, tqTypeAct) + we := s.startWorkflow(tvA, nil) - func(s *Versioning3Suite) TestUnpinnedWorkflow_NoSticky() - { - s.RunTestWithMatchingBehavior( - func() { - s.testUnpinnedWorkflow(false) - }, - ) + s.pollWftAndHandle(tvA, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + s.verifyWorkflowVersioning(tvA, vbUnspecified, nil, nil, transitionTo(dA)) + return respondWftWithActivities(tvA, tvA, sticky, vbUnpinned, "5"), nil + }) + s.verifyWorkflowVersioning(tvA, vbUnpinned, dA, nil, nil) + if sticky { + s.verifyWorkflowStickyQueue(we, tvA.StickyTaskQueue()) } - func(s *Versioning3Suite) testUnpinnedWorkflow(sticky - bool) { - tv := testvars.New(s) - d := tv.Deployment() + s.pollActivityAndHandle(tvA, nil, + func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error) { + s.NotNil(task) + return respondActivity(), nil + }) + s.verifyWorkflowVersioning(tvA, vbUnpinned, dA, nil, nil) - if sticky { - s.warmUpSticky(tv) - } + // Set B as the current deployment + s.updateTaskQueueDeploymentData(tvB, 0, tqTypeWf, tqTypeAct) - wftCompleted := make(chan interface{}) - s.pollWftAndHandle( - tv, false, wftCompleted, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { - s.NotNil(task) - s.verifyWorkflowVersioning(tv, vbUnspecified, nil, nil, transitionTo(d)) - return respondWftWithActivities(tv, sticky, vbUnpinned, "5"), nil - } - ) - s.waitForDeploymentDataPropagation(tv, tqTypeWf) - - actCompleted := make(chan interface{}) - s.pollActivityAndHandle( - tv, actCompleted, - func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error) { - s.NotNil(task) - return respondActivity(), nil - } - ) - s.waitForDeploymentDataPropagation(tv, tqTypeAct) + s.pollWftAndHandle(tvB, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + s.verifyWorkflowVersioning(tvA, vbUnpinned, dA, nil, transitionTo(dB)) + return respondCompleteWorkflow(tvB, vbUnpinned), nil + }) + s.verifyWorkflowVersioning(tvB, vbUnpinned, dB, nil, nil) +} - s.updateTaskQueueDeploymentData(tv, 0, tqTypeWf, tqTypeAct) +func (s *Versioning3Suite) TestTransitionFromActivity_Sticky() { + s.testTransitionFromActivity(true) +} - we := s.startWorkflow(tv, nil) +func (s *Versioning3Suite) TestTransitionFromActivity_NoSticky() { + s.testTransitionFromActivity(false) +} - <-wftCompleted - s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), nil, nil) - if sticky { - s.verifyWorkflowStickyQueue(we, tv.StickyTaskQueue()) - } +func (s *Versioning3Suite) testTransitionFromActivity(sticky bool) { + // Wf runs one TWF on dA and schedules four activities, then: + // 1. The first and second activities starts on dA + // 2. Current deployment becomes dB + // 3. The third activity is redirected to dB and starts a transition in the wf, without being + // dispatched. + // 4. The 4th activity also does not start on any of the builds although there are pending + // pollers on both. + // 5. The transition generates a WFT, and it is started in dB. + // 6. The 1st act is completed here while the transition is going on. + // 7. The 2nd act fails and makes another attempt. But it is not dispatched. + // 8. WFT completes and the transition completes. + // 9. All the 3 remaining activities are now dispatched and completed. - <-actCompleted - s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), nil, nil) + tvA := testvars.New(s).WithBuildId("A") + tvB := tvA.WithBuildId("B") + dA := tvA.Deployment() + dB := tvB.Deployment() - s.pollWftAndHandle( - tv, sticky, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { - s.NotNil(task) - return respondCompleteWorkflow(tv, vbUnpinned), nil - } - ) - s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), nil, nil) + if sticky { + s.warmUpSticky(tvA) } - func(s *Versioning3Suite) TestTransitionFromWft_Sticky() - { - s.testTransitionFromWft(true) - } + s.updateTaskQueueDeploymentData(tvA, 0, tqTypeWf, tqTypeAct) + we := s.startWorkflow(tvA, nil) - func(s *Versioning3Suite) TestTransitionFromWft_NoSticky() - { - s.testTransitionFromWft(false) + s.pollWftAndHandle(tvA, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + s.verifyWorkflowVersioning(tvA, vbUnspecified, nil, nil, transitionTo(dA)) + return respondWftWithActivities(tvA, tvA, sticky, vbUnpinned, "5", "6", "7", "8"), nil + }) + s.verifyWorkflowVersioning(tvA, vbUnpinned, dA, nil, nil) + if sticky { + s.verifyWorkflowStickyQueue(we, tvA.StickyTaskQueue()) } - func(s *Versioning3Suite) testTransitionFromWft(sticky - bool) { - // Wf runs one TWF and one AC on dA, then the second WFT is redirected to dB and - // transitions the wf with it. - - tvA := testvars.New(s).WithBuildId("A") - tvB := tvA.WithBuildId("B") - dA := tvA.Deployment() - dB := tvB.Deployment() + transitionCompleted := atomic.Bool{} + transitionStarted := make(chan interface{}) + act1Started := make(chan interface{}) + act1Completed := make(chan interface{}) + act2Started := make(chan interface{}) + act2Failed := make(chan interface{}) + act2To4Completed := make(chan interface{}) - if sticky { - s.warmUpSticky(tvA) - } + // 1. Start 1st and 2nd activities + s.pollActivityAndHandle(tvA, act1Completed, + func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error) { + s.NotNil(task) + s.Logger.Info(fmt.Sprintf("Activity 1 started ID: %s", task.ActivityId)) + close(act1Started) + // block until the transition WFT starts + <-transitionStarted + // 6. the 1st act completes during transition + s.Logger.Info(fmt.Sprintf("Activity 1 completed ID: %s", task.ActivityId)) + return respondActivity(), nil + }) - s.updateTaskQueueDeploymentData(tvA, 0, tqTypeWf, tqTypeAct) - we := s.startWorkflow(tvA, nil) + <-act1Started + s.pollActivityAndHandle(tvA, act2Failed, + func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error) { + s.NotNil(task) + s.Logger.Info(fmt.Sprintf("Activity 2 started ID: %s", task.ActivityId)) + close(act2Started) + // block until the transition WFT starts + <-transitionStarted + // 7. 2nd activity fails. Respond with error so it is retried. + s.Logger.Info(fmt.Sprintf("Activity 2 failed ID: %s", task.ActivityId)) + return nil, errors.New("intentional activity failure") + }) - s.pollWftAndHandle( - tvA, false, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { - s.NotNil(task) - s.verifyWorkflowVersioning(tvA, vbUnspecified, nil, nil, transitionTo(dA)) - return respondWftWithActivities(tvA, sticky, vbUnpinned, "5"), nil - } - ) - s.verifyWorkflowVersioning(tvA, vbUnpinned, dA, nil, nil) - if sticky { - s.verifyWorkflowStickyQueue(we, tvA.StickyTaskQueue()) + <-act2Started + s.verifyWorkflowVersioning(tvA, vbUnpinned, dA, nil, nil) + + // 2. Set dB as the current deployment + s.updateTaskQueueDeploymentData(tvB, 0, tqTypeWf, tqTypeAct) + // Although updateTaskQueueDeploymentData waits for deployment data to reach the TQs, backlogged + // tasks might still be waiting behind the old deployment's poll channel. Partition manage should + // immediately react to the deployment data changes, but there still is a race possible and the + // only way to safeguard against it is to wait a little while before proceeding. + time.Sleep(time.Millisecond * 100) //nolint:forbidigo + + // Pollers of dA are there, but should not get any task + go s.idlePollActivity(tvA, true, ver3MinPollTime, "activities should not go to the old deployment") + + go func() { + for i := 2; i <= 4; i++ { + // 3-4. The new dB poller should trigger the third activity to be redirected, but the activity should + // not start until transition completes in the next wft. + // Repeating the handler so it processes all the three activities + s.pollActivityAndHandle(tvB, nil, + func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error) { + // Activity should not start until the transition is completed + s.True(transitionCompleted.Load()) + s.NotNil(task) + s.Logger.Info(fmt.Sprintf("Remaining activity completed ID: %s", task.ActivityId)) + return respondActivity(), nil + }) } + close(act2To4Completed) + }() - s.pollActivityAndHandle( - tvA, nil, - func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error) { - s.NotNil(task) - return respondActivity(), nil - } - ) - s.verifyWorkflowVersioning(tvA, vbUnpinned, dA, nil, nil) - - // Set B as the current deployment - s.updateTaskQueueDeploymentData(tvB, 0, tqTypeWf, tqTypeAct) - - s.pollWftAndHandle( - tvB, false, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { - s.NotNil(task) - s.verifyWorkflowVersioning(tvA, vbUnpinned, dA, nil, transitionTo(dB)) - return respondCompleteWorkflow(tvB, vbUnpinned), nil - } - ) - s.verifyWorkflowVersioning(tvB, vbUnpinned, dB, nil, nil) - } - - func(s *Versioning3Suite) TestTransitionFromActivity_Sticky() - { - s.testTransitionFromActivity(true) - } - - func(s *Versioning3Suite) TestTransitionFromActivity_NoSticky() - { - s.testTransitionFromActivity(false) + // 5. The transition should create a new WFT to be sent to dB. + s.pollWftAndHandle(tvB, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + s.verifyWorkflowVersioning(tvA, vbUnpinned, dA, nil, transitionTo(dB)) + close(transitionStarted) + s.Logger.Info("Transition wft started") + // 8. Complete the transition after act1 completes and act2's first attempt fails. + <-act1Completed + <-act2Failed + transitionCompleted.Store(true) + s.Logger.Info("Transition wft completed") + return respondEmptyWft(tvB, sticky, vbUnpinned), nil + }) + s.verifyWorkflowVersioning(tvB, vbUnpinned, dB, nil, nil) + if sticky { + s.verifyWorkflowStickyQueue(we, tvB.StickyTaskQueue()) } - func(s *Versioning3Suite) testTransitionFromActivity(sticky - bool) { - // Wf runs one TWF on dA and schedules four activities, then: - // 1. The first and second activities starts on dA - // 2. Current deployment becomes dB - // 3. The third activity is redirected to dB and starts a transition in the wf, without being - // dispatched. - // 4. The 4th activity also does not start on any of the builds although there are pending - // pollers on both. - // 5. The transition generates a WFT, and it is started in dB. - // 6. The 1st act is completed here while the transition is going on. - // 7. The 2nd act fails and makes another attempt. But it is not dispatched. - // 8. WFT completes and the transition completes. - // 9. All the 3 remaining activities are now dispatched and completed. - - tvA := testvars.New(s).WithBuildId("A") - tvB := tvA.WithBuildId("B") - dA := tvA.Deployment() - dB := tvB.Deployment() - - if sticky { - s.warmUpSticky(tvA) - } - - s.updateTaskQueueDeploymentData(tvA, 0, tqTypeWf, tqTypeAct) - we := s.startWorkflow(tvA, nil) - - s.pollWftAndHandle( - tvA, false, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { - s.NotNil(task) - s.verifyWorkflowVersioning(tvA, unspecified, nil, nil, transitionTo(dA)) - return respondWftWithActivities(tvA, sticky, unpinned, "5", "6", "7", "8"), nil - } - ) - s.verifyWorkflowVersioning(tvA, unpinned, dA, nil, nil) - if sticky { - s.verifyWorkflowStickyQueue(we, tvA.StickyTaskQueue()) - } + // 9. Now all activities should complete. + <-act2To4Completed + s.pollWftAndHandle(tvB, sticky, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + s.Logger.Info("Final wft completed") + return respondCompleteWorkflow(tvB, vbUnpinned), nil + }) + s.verifyWorkflowVersioning(tvB, vbUnpinned, dB, nil, nil) +} - transitionCompleted := atomic.Bool{} - transitionStarted := make(chan interface{}) - act1Started := make(chan interface{}) - act1Completed := make(chan interface{}) - act2Started := make(chan interface{}) - act2Failed := make(chan interface{}) - act2To4Completed := make(chan interface{}) - - // 1. Start 1st and 2nd activities - s.pollActivityAndHandle( - tvA, act1Completed, - func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error) { - s.NotNil(task) - close(act1Started) - // block until the transition WFT starts - <-transitionStarted - // 6. the 1st act completes during transition - return respondActivity(), nil - } - ) +func (s *Versioning3Suite) TestIndependentActivity_Pinned() { + // TODO (shahab): need to handle pinned activities properly so the wf does not get stuck. + s.T().Skip() + s.testIndependentActivity(vbPinned) +} - <-act1Started - s.pollActivityAndHandle( - tvA, act2Failed, - func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error) { - s.NotNil(task) - close(act2Started) - // block until the transition WFT starts - <-transitionStarted - // 7. 2nd activity fails. Respond with error so it is retried. - return nil, errors.New("intentional activity failure") - } - ) +func (s *Versioning3Suite) TestIndependentActivity_Unpinned() { + s.testIndependentActivity(vbUnpinned) +} - <-act2Started - s.verifyWorkflowVersioning(tvA, unpinned, dA, nil, nil) +func (s *Versioning3Suite) testIndependentActivity(behavior enumspb.VersioningBehavior) { + tvWf := testvars.New(s).WithDeploymentSeries("wf-series") + tvAct := testvars.New(s).WithDeploymentSeries("act-series").WithTaskQueue("act-tq") + dWf := tvWf.Deployment() - // 2. Set dB as the current deployment - s.updateTaskQueueDeploymentData(tvB, 0, tqTypeWf, tqTypeAct) + // Set current deployment for each TQ + s.updateTaskQueueDeploymentData(tvWf, 0, tqTypeWf) + s.updateTaskQueueDeploymentData(tvAct, 0, tqTypeAct) - // Pollers of dA are there, but should not get any task - go s.idlePollActivity( - tvA, - true, - time.Second*10, - "activities should not go to the old deployment" - ) + s.startWorkflow(tvWf, nil) - go func() { - for i := 2; i <= 4; i++ { - // 3-4. The new dB poller should trigger the third activity to be redirected, but the activity should - // not start until transition completes in the next wft. - // Repeating the handler so it processes all the three activities - s.pollActivityAndHandle( - tvB, nil, - func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error) { - // Activity should not start until the transition is completed - s.True(transitionCompleted.Load()) - s.NotNil(task) - return respondActivity(), nil - } - ) - } - close(act2To4Completed) - }() + s.pollWftAndHandle(tvWf, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + s.verifyWorkflowVersioning(tvWf, vbUnspecified, nil, nil, transitionTo(dWf)) + s.Logger.Info("First wf task completed") + return respondWftWithActivities(tvWf, tvAct, false, behavior, "5"), nil + }) + s.verifyWorkflowVersioning(tvWf, behavior, dWf, nil, nil) - // 5. The transition should create a new WFT to be sent to dB. - s.pollWftAndHandle( - tvB, false, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { - s.NotNil(task) - s.verifyWorkflowVersioning(tvA, unpinned, dA, nil, transitionTo(dB)) - close(transitionStarted) - // 8. Complete the transition after act1 completes and act2's first attempt fails. - <-act1Completed - <-act2Failed - transitionCompleted.Store(true) - return respondEmptyWft(tvB, sticky, unpinned), nil - } - ) - s.verifyWorkflowVersioning(tvB, unpinned, dB, nil, nil) - if sticky { - s.verifyWorkflowStickyQueue(we, tvB.StickyTaskQueue()) - } + s.pollActivityAndHandle(tvAct, nil, + func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error) { + s.NotNil(task) + s.Logger.Info("Activity completed") + return respondActivity(), nil + }) + s.verifyWorkflowVersioning(tvWf, behavior, dWf, nil, nil) - // 9. Now all activities should complete. - <-act2To4Completed - s.pollWftAndHandle( - tvB, false, nil, - func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { - s.NotNil(task) - return respondCompleteWorkflow(tvB, unpinned), nil - } - ) - s.verifyWorkflowVersioning(tvB, unpinned, dB, nil, nil) - } + s.pollWftAndHandle(tvWf, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + return respondCompleteWorkflow(tvWf, behavior), nil + }) + s.verifyWorkflowVersioning(tvWf, behavior, dWf, nil, nil) } func transitionTo(d *deploymentpb.Deployment) *workflow.DeploymentTransition { @@ -677,7 +578,7 @@ func (s *Versioning3Suite) verifyWorkflowVersioning( } if !versioningInfo.GetDeploymentTransition().Equal(transition) { - s.Fail(fmt.Sprintf("deployment override mismatch. expected: {%s}, actual: {%s}", + s.Fail(fmt.Sprintf("deployment transition mismatch. expected: {%s}, actual: {%s}", transition, versioningInfo.GetDeploymentTransition(), )) @@ -689,7 +590,8 @@ func respondActivity() *workflowservice.RespondActivityTaskCompletedRequest { } func respondWftWithActivities( - tv *testvars.TestVars, + tvWf *testvars.TestVars, + tvAct *testvars.TestVars, sticky bool, behavior enumspb.VersioningBehavior, activityIds ...string, @@ -697,7 +599,7 @@ func respondWftWithActivities( var stickyAttr *taskqueuepb.StickyExecutionAttributes if sticky { stickyAttr = &taskqueuepb.StickyExecutionAttributes{ - WorkerTaskQueue: tv.StickyTaskQueue(), + WorkerTaskQueue: tvWf.StickyTaskQueue(), ScheduleToStartTimeout: durationpb.New(5 * time.Second), } } @@ -709,7 +611,7 @@ func respondWftWithActivities( ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{ ActivityId: a, ActivityType: &commonpb.ActivityType{Name: "act"}, - TaskQueue: tv.TaskQueue(), + TaskQueue: tvAct.TaskQueue(), Input: payloads.EncodeString("input"), // TODO (shahab): tests with forced task forward take multiple seconds. Need to know why? ScheduleToCloseTimeout: durationpb.New(10 * time.Second), @@ -724,9 +626,8 @@ func respondWftWithActivities( return &workflowservice.RespondWorkflowTaskCompletedRequest{ Commands: commands, StickyAttributes: stickyAttr, - ReturnNewWorkflowTask: false, ForceCreateNewWorkflowTask: false, - Deployment: tv.Deployment(), + Deployment: tvWf.Deployment(), VersioningBehavior: behavior, } } @@ -736,20 +637,7 @@ func respondEmptyWft( sticky bool, behavior enumspb.VersioningBehavior, ) *workflowservice.RespondWorkflowTaskCompletedRequest { - var stickyAttr *taskqueuepb.StickyExecutionAttributes - if sticky { - stickyAttr = &taskqueuepb.StickyExecutionAttributes{ - WorkerTaskQueue: tv.StickyTaskQueue(), - ScheduleToStartTimeout: durationpb.New(5 * time.Second), - } - } - return &workflowservice.RespondWorkflowTaskCompletedRequest{ - StickyAttributes: stickyAttr, - ReturnNewWorkflowTask: false, - ForceCreateNewWorkflowTask: false, - Deployment: tv.Deployment(), - VersioningBehavior: behavior, - } + return respondWftWithActivities(tv, tv, sticky, behavior) } func respondCompleteWorkflow( @@ -767,7 +655,6 @@ func respondCompleteWorkflow( }, }, }, - ReturnNewWorkflowTask: false, ForceCreateNewWorkflowTask: false, Deployment: tv.Deployment(), VersioningBehavior: behavior, @@ -818,6 +705,7 @@ func (s *Versioning3Suite) Name() string { // pollWftAndHandle can be used in sync and async mode. For async mode pass the async channel. It // will be closed when the task is handled. +// Returns the poller and poll response only in sync mode (can be used to process new wft in the response) func (s *Versioning3Suite) pollWftAndHandle( tv *testvars.TestVars, sticky bool, @@ -927,7 +815,10 @@ func (s *Versioning3Suite) idlePollActivity( ).HandleTask( tv, func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error) { - s.Fail(unexpectedTaskMessage) + if task != nil { + s.Logger.Error(fmt.Sprintf("Unexpected activity task received, ID: %s", task.ActivityId)) + s.Fail(unexpectedTaskMessage) + } return nil, nil }, taskpoller.WithTimeout(timeout),