Skip to content

Commit

Permalink
Add independent activity test and fix
Browse files Browse the repository at this point in the history
  • Loading branch information
ShahabT committed Dec 2, 2024
1 parent 99636b0 commit 30430c3
Show file tree
Hide file tree
Showing 7 changed files with 370 additions and 424 deletions.
8 changes: 5 additions & 3 deletions common/testing/taskpoller/taskpoller.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ func New(
t *testing.T,
client workflowservice.WorkflowServiceClient,
namespace string,
) TaskPoller {
return TaskPoller{
) *TaskPoller {
return &TaskPoller{
t: t,
client: client,
namespace: namespace,
Expand Down Expand Up @@ -314,7 +314,9 @@ func (p *workflowTaskPoller) respondTaskCompleted(
if reply.Identity == "" {
reply.Identity = opts.tv.WorkerIdentity()
}
reply.ReturnNewWorkflowTask = true
if reply.ForceCreateNewWorkflowTask {
reply.ReturnNewWorkflowTask = true
}

return p.client.RespondWorkflowTaskCompleted(ctx, reply)
}
Expand Down
4 changes: 4 additions & 0 deletions common/testing/testvars/test_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ func (tv *TestVars) DeploymentSeries(key ...string) string {
return tv.getOrCreate("deployment_series", key).(string)
}

func (tv *TestVars) WithDeploymentSeries(series string, key ...string) *TestVars {
return tv.cloneSet("deployment_series", key, series)
}

func (tv *TestVars) Deployment(key ...string) *deployment.Deployment {
//revive:disable-next-line:unchecked-type-assertion
return &deployment.Deployment{
Expand Down
72 changes: 61 additions & 11 deletions service/history/api/recordactivitytaskstarted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ import (
"errors"
"fmt"

deploymentpb "go.temporal.io/api/deployment/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/api/matchingservice/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/metrics"
Expand All @@ -50,6 +52,7 @@ func Invoke(
request *historyservice.RecordActivityTaskStartedRequest,
shardContext shard.Context,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
matchingClient matchingservice.MatchingServiceClient,
) (resp *historyservice.RecordActivityTaskStartedResponse, retError error) {

var err error
Expand All @@ -70,7 +73,7 @@ func Invoke(
return nil, consts.ErrWorkflowCompleted
}

response, startedTransition, err = recordActivityTaskStarted(ctx, shardContext, mutableState, request)
response, startedTransition, err = recordActivityTaskStarted(ctx, shardContext, mutableState, request, matchingClient)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -105,6 +108,7 @@ func recordActivityTaskStarted(
shardContext shard.Context,
mutableState workflow.MutableState,
request *historyservice.RecordActivityTaskStartedRequest,
matchingClient matchingservice.MatchingServiceClient,
) (*historyservice.RecordActivityTaskStartedResponse, bool, error) {
namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(request.GetNamespaceId()))
if err != nil {
Expand Down Expand Up @@ -189,18 +193,33 @@ func recordActivityTaskStarted(
}

if !pollerDeployment.Equal(wfDeployment) {
// Task is redirected, see if a transition can start.
if err := mutableState.StartDeploymentTransition(pollerDeployment); err != nil {
if errors.Is(err, workflow.ErrPinnedWorkflowCannotTransition) {
// This must be a task from a time that the workflow was unpinned, but it's
// now pinned so can't transition. Matching can drop the task safely.
return nil, false, serviceerrors.NewObsoleteMatchingTask(err.Error())
}
// Task is redirected, see if this activity should start a transition on the workflow. The
// workflow transition happens only if the workflow TQ also is present in the poller
// deployment. Otherwise, it means the activity is independently versioned, we allow it to
// start without affecting the workflow.
wfTaskQueueIsPresent, err := deploymentContainTaskQueue(ctx,
request.NamespaceId,
pollerDeployment,
mutableState.GetExecutionInfo().GetTaskQueue(),
enumspb.TASK_QUEUE_TYPE_WORKFLOW,
matchingClient)
if err != nil {
// Let matching retry
return nil, false, err
}
// This activity started a transition, make sure the MS changes are written but
// reject the activity task.
return nil, true, nil
if wfTaskQueueIsPresent {
if err := mutableState.StartDeploymentTransition(pollerDeployment); err != nil {
if errors.Is(err, workflow.ErrPinnedWorkflowCannotTransition) {
// This must be a task from a time that the workflow was unpinned, but it's
// now pinned so can't transition. Matching can drop the task safely.
return nil, false, serviceerrors.NewObsoleteMatchingTask(err.Error())
}
return nil, false, err
}
// This activity started a transition, make sure the MS changes are written but
// reject the activity task.
return nil, true, nil
}
}

versioningStamp := worker_versioning.StampFromCapabilities(request.PollRequest.WorkerVersionCapabilities)
Expand Down Expand Up @@ -232,3 +251,34 @@ func recordActivityTaskStarted(

return response, false, nil
}

// TODO: cache this result (especially if the answer is true)
func deploymentContainTaskQueue(
ctx context.Context,
namespaceID string,
deployment *deploymentpb.Deployment,
taskQueueName string,
taskQueueType enumspb.TaskQueueType,
matchingClient matchingservice.MatchingServiceClient,
) (bool, error) {
resp, err := matchingClient.GetTaskQueueUserData(ctx,
&matchingservice.GetTaskQueueUserDataRequest{
NamespaceId: namespaceID,
TaskQueue: taskQueueName,
TaskQueueType: taskQueueType,
})
if err != nil {
return false, err
}
tqData, ok := resp.GetUserData().GetData().GetPerType()[int32(taskQueueType)]
if !ok {
// The TQ is unversioned, return true if the passed deployment is also unversioned (nil)
return deployment == nil, err
}
for _, d := range tqData.GetDeploymentData().GetDeployments() {
if d.GetDeployment().Equal(deployment) {
return true, nil
}
}
return false, nil
}
2 changes: 1 addition & 1 deletion service/history/history_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ func (e *historyEngineImpl) RecordActivityTaskStarted(
ctx context.Context,
request *historyservice.RecordActivityTaskStartedRequest,
) (*historyservice.RecordActivityTaskStartedResponse, error) {
return recordactivitytaskstarted.Invoke(ctx, request, e.shardContext, e.workflowConsistencyChecker)
return recordactivitytaskstarted.Invoke(ctx, request, e.shardContext, e.workflowConsistencyChecker, e.matchingClient)
}

// ScheduleWorkflowTask schedules a workflow task if no outstanding workflow task found
Expand Down
1 change: 0 additions & 1 deletion service/matching/task_queue_partition_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,6 @@ func (pm *taskQueuePartitionManagerImpl) ProcessSpooledTask(
) error {
taskInfo := task.event.GetData()
// This task came from taskReader so task.event is always set here.
// TODO: in WV2 we should not look at a spooled task directive anymore [cleanup-old-wv]
directive := taskInfo.GetVersionDirective()
if assignedBuildId != "" {
// construct directive based on the build ID of the spool queue
Expand Down
2 changes: 1 addition & 1 deletion tests/testcore/functional.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type (
updateutils.UpdateUtils
FunctionalTestBase

TaskPoller taskpoller.TaskPoller
TaskPoller *taskpoller.TaskPoller
}
)

Expand Down
Loading

0 comments on commit 30430c3

Please sign in to comment.