Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More versioning-3 tests #6910

Open
wants to merge 25 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
de5c1ee
add specific check for SDK GetWorkflowExecutionOptions path
carlydf Nov 28, 2024
8663f0a
Versioning 3 functional tests
ShahabT Nov 29, 2024
62a625c
improved tests
ShahabT Nov 29, 2024
6d6d3d2
Merge branch 'main' of github.com:temporalio/temporal into cdf/more-v…
carlydf Nov 29, 2024
4164051
transition tests
ShahabT Nov 29, 2024
009217b
add form of test that would work with new sdk
carlydf Nov 29, 2024
e6a5e3a
Merge remote-tracking branch 'origin/main' into shahab/versioning-tests
ShahabT Nov 29, 2024
e2d6fc2
another non-working child override inheritance test
carlydf Nov 29, 2024
0c172f1
fix lint
ShahabT Nov 29, 2024
e6ec3df
convert test to new poller, got 'history is nil' error
carlydf Nov 30, 2024
3bf7cbd
fix user data manage tests
ShahabT Nov 30, 2024
b217ae6
add print and rename tests
carlydf Nov 30, 2024
9d6b573
wrap test variables in tv.String
carlydf Nov 30, 2024
0584967
make RandomizeString shorter
dnr Nov 30, 2024
8cec945
rename dynamic config
dnr Nov 30, 2024
f25176b
simplify code, rename constants
dnr Nov 30, 2024
7ab569b
child workflow no override test works, with override poller is not ge…
carlydf Nov 30, 2024
d501401
Merge branch 'main' of github.com:temporalio/temporal into cdf/more-v…
carlydf Nov 30, 2024
0524e3b
push
carlydf Nov 30, 2024
1ea10d7
Revert "Make ChildWorkflowExecution inherit versioning override of pa…
carlydf Nov 30, 2024
5b92ab5
revert RandomizeStr change
ShahabT Nov 30, 2024
db2b3ea
fix nil pointer
carlydf Nov 30, 2024
203eee9
Merge branch 'shahab/versioning-tests' of github.com:temporalio/tempo…
carlydf Nov 30, 2024
b34617b
Revert "Revert "Make ChildWorkflowExecution inherit versioning overri…
carlydf Nov 30, 2024
d64a51b
test child workflow inherit
carlydf Nov 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions api/historyservice/v1/request_response.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 5 additions & 13 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -852,10 +852,12 @@ of Timeout and if no activity is seen even after that the connection is closed.`
true,
`FrontendEnableSchedules enables schedule-related RPCs in the frontend`,
)
FrontendEnableDeployments = NewNamespaceBoolSetting(
"frontend.enableDeployments",
EnableDeployments = NewNamespaceBoolSetting(
"system.enableDeployments",
false,
`FrontendEnableDeployments enables deployment-related RPCs in the frontend`,
`EnableDeployments enables deployments (versioning v3) in all services,
including deployment-related RPCs in the frontend, deployment entity workflows in the worker,
and deployment interaction in matching and history.`,
)
EnableNexus = NewGlobalBoolSetting(
"system.enableNexus",
Expand Down Expand Up @@ -1220,11 +1222,6 @@ these log lines can be noisy, we want to be able to turn on and sample selective
false,
`MatchingDropNonRetryableTasks states if we should drop matching tasks with Internal/Dataloss errors`,
)
MatchingEnableDeployments = NewNamespaceBoolSetting(
"matching.enableDeployment",
false,
`MatchingEnableDeployments enables deployment-related RPCs in matching`,
)
MatchingMaxTaskQueuesInDeployment = NewNamespaceIntSetting(
"matching.maxTaskQueuesInDeployment",
1000,
Expand Down Expand Up @@ -2560,11 +2557,6 @@ If the service configures with archival feature enabled, update worker.historySc
`How long to sleep within a local activity before pushing to workflow level sleep (don't make this
close to or more than the workflow task timeout)`,
)
WorkerEnableDeployment = NewNamespaceBoolSetting(
"worker.enableDeployment",
false,
`WorkerEnableDeploymentGroup controls whether to start the worker for deployment and deployment-name workflows`,
)
WorkerDeleteNamespaceActivityLimits = NewGlobalTypedSetting(
"worker.deleteNamespaceActivityLimitsConfig",
sdkworker.Options{},
Expand Down
2 changes: 1 addition & 1 deletion common/testing/taskpoller/taskpoller.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (p *workflowTaskPoller) pollTask(
if err != nil {
return nil, err
}
if resp == nil {
if resp == nil || resp.TaskToken == nil {
return nil, NoWorkflowTaskAvailable
}

Expand Down
13 changes: 13 additions & 0 deletions common/testing/testvars/test_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/pborman/uuid"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/deployment/v1"
enumspb "go.temporal.io/api/enums/v1"
namespacepb "go.temporal.io/api/namespace/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
Expand Down Expand Up @@ -157,11 +158,23 @@ func (tv *TestVars) BuildId(key ...string) string {
return tv.getOrCreate("build_id", key).(string)
}

func (tv *TestVars) WithBuildId(buildId string, key ...string) *TestVars {
return tv.cloneSet("build_id", key, buildId)
}

func (tv *TestVars) DeploymentSeries(key ...string) string {
//revive:disable-next-line:unchecked-type-assertion
return tv.getOrCreate("deployment_series", key).(string)
}

func (tv *TestVars) Deployment(key ...string) *deployment.Deployment {
//revive:disable-next-line:unchecked-type-assertion
return &deployment.Deployment{
SeriesName: tv.DeploymentSeries(key...),
BuildId: tv.BuildId(key...),
}
}

func (tv *TestVars) WithWorkflowID(workflowID string, key ...string) *TestVars {
return tv.cloneSet("workflow_id", key, workflowID)
}
Expand Down
6 changes: 5 additions & 1 deletion common/worker_versioning/worker_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,14 @@ func MakeBuildIdDirective(buildId string) *taskqueuespb.TaskVersionDirective {
}

func StampFromCapabilities(cap *commonpb.WorkerVersionCapabilities) *commonpb.WorkerVersionStamp {
if cap.GetUseVersioning() && cap.GetDeploymentSeriesName() != "" {
// Versioning 3, do not return stamp.
return nil
}
// TODO: remove `cap.BuildId != ""` condition after old versioning cleanup. this condition is used to differentiate
// between old and new versioning in Record*TaskStart calls. [cleanup-old-wv]
// we don't want to add stamp for task started events in old versioning
if cap != nil && cap.BuildId != "" {
if cap.GetBuildId() != "" {
return &commonpb.WorkerVersionStamp{UseVersioning: cap.UseVersioning, BuildId: cap.BuildId}
}
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,7 @@ message RecordWorkflowTaskStartedRequest {
temporal.api.workflowservice.v1.PollWorkflowTaskQueueRequest poll_request = 6;
temporal.server.api.clock.v1.VectorClock clock = 7;
temporal.server.api.taskqueue.v1.BuildIdRedirectInfo build_id_redirect_info = 8;
// Presence of this value means matching has redirected the task to a deployment other than
// the deployment that History passed when scheduling the task.
// The deployment passed by History when the task was scheduled.
temporal.api.deployment.v1.Deployment scheduled_deployment = 9;

}
Expand Down Expand Up @@ -286,13 +285,12 @@ message RecordActivityTaskStartedRequest {

// Stamp represents the internal “version” of the activity options and can/will be changed with Activity API.
int32 stamp = 9;
// Presence of this value means matching has redirected the task to a deployment other than
// the deployment that History passed when scheduling the task.
// The deployment passed by History when the task was scheduled.
temporal.api.deployment.v1.Deployment scheduled_deployment = 10;
// Whether the directive deployment contains the activity's task queue. Used by History to
// Whether the scheduled deployment contains the activity's task queue. Used by History to
// determine if the activity redirect should affect the workflow.
// Only set if `directive_deployment` is set (i.e. the task is redirected).
bool directive_deployment_contains_task_queue = 11;
// Only set if `scheduled_deployment` is set (i.e. the task is redirected).
bool scheduled_deployment_contains_task_queue = 11;
}

message RecordActivityTaskStartedResponse {
Expand Down
2 changes: 1 addition & 1 deletion service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func NewConfig(

EnableSchedules: dynamicconfig.FrontendEnableSchedules.Get(dc),

EnableDeployments: dynamicconfig.FrontendEnableDeployments.Get(dc),
EnableDeployments: dynamicconfig.EnableDeployments.Get(dc),

EnableBatcher: dynamicconfig.FrontendEnableBatcher.Get(dc),
MaxConcurrentBatchOperation: dynamicconfig.FrontendMaxConcurrentBatchOperationPerNamespace.Get(dc),
Expand Down
4 changes: 0 additions & 4 deletions service/history/api/recordworkflowtaskstarted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,6 @@ func Invoke(
pollerDeployment := worker_versioning.DeploymentFromCapabilities(req.PollRequest.WorkerVersionCapabilities)
// Effective deployment of the workflow when History scheduled the WFT.
scheduledDeployment := req.GetScheduledDeployment()
if scheduledDeployment == nil {
// Matching does not send the directive deployment when it's the same as poller's.
scheduledDeployment = pollerDeployment
}
if !scheduledDeployment.Equal(wfDeployment) {
// This must be an AT scheduled before the workflow transitions to the current
// deployment. Matching can drop it.
Expand Down
2 changes: 1 addition & 1 deletion service/matching/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func NewConfig(
TestDisableSyncMatch: dynamicconfig.TestMatchingDisableSyncMatch.Get(dc),
LoadUserData: dynamicconfig.MatchingLoadUserData.Get(dc),
HistoryMaxPageSize: dynamicconfig.MatchingHistoryMaxPageSize.Get(dc),
EnableDeployments: dynamicconfig.MatchingEnableDeployments.Get(dc),
EnableDeployments: dynamicconfig.EnableDeployments.Get(dc),
MaxTaskQueuesInDeployment: dynamicconfig.MatchingMaxTaskQueuesInDeployment.Get(dc),
RPS: dynamicconfig.MatchingRPS.Get(dc),
OperatorRPSRatio: dynamicconfig.OperatorRPSRatio.Get(dc),
Expand Down
18 changes: 2 additions & 16 deletions service/matching/matching_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2406,14 +2406,7 @@ func (e *matchingEngineImpl) recordWorkflowTaskStarted(
RequestId: uuid.New(),
PollRequest: pollReq,
BuildIdRedirectInfo: task.redirectInfo,
}

scheduledDeployment := task.event.Data.VersionDirective.GetDeployment()
dispatchDeployment := worker_versioning.DeploymentFromCapabilities(pollReq.GetWorkerVersionCapabilities())
if !scheduledDeployment.Equal(dispatchDeployment) {
// Redirect has happened, set the directive deployment in the request so History can
// validate the task is not stale.
recordStartedRequest.ScheduledDeployment = scheduledDeployment
ScheduledDeployment: task.event.Data.VersionDirective.GetDeployment(),
}

return e.historyClient.RecordWorkflowTaskStarted(ctx, recordStartedRequest)
Expand All @@ -2436,14 +2429,7 @@ func (e *matchingEngineImpl) recordActivityTaskStarted(
PollRequest: pollReq,
BuildIdRedirectInfo: task.redirectInfo,
Stamp: task.event.Data.GetStamp(),
}

scheduledDeployment := task.event.Data.VersionDirective.GetDeployment()
dispatchDeployment := worker_versioning.DeploymentFromCapabilities(pollReq.GetWorkerVersionCapabilities())
if !scheduledDeployment.Equal(dispatchDeployment) {
// Redirect has happened, set the directive deployment in the request so History can
// validate the task is not stale.
recordStartedRequest.ScheduledDeployment = scheduledDeployment
ScheduledDeployment: task.event.Data.VersionDirective.GetDeployment(),
}

return e.historyClient.RecordActivityTaskStarted(ctx, recordStartedRequest)
Expand Down
105 changes: 51 additions & 54 deletions service/matching/task_queue_partition_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,62 +781,61 @@ func (pm *taskQueuePartitionManagerImpl) getPhysicalQueuesForAdd(
forwardInfo *taskqueuespb.TaskForwardInfo,
runId string,
) (pinnedQueue physicalTaskQueueManager, syncMatchQueue physicalTaskQueueManager, userDataChanged <-chan struct{}, err error) {
if deployment := directive.GetDeployment(); deployment != nil {
wfBehavior := directive.GetBehavior()

switch wfBehavior {
case enumspb.VERSIONING_BEHAVIOR_PINNED:
if pm.partition.Kind() == enumspb.TASK_QUEUE_KIND_STICKY {
// TODO (shahab): we can verify the passed deployment matches the last poller's deployment
return pm.defaultQueue, pm.defaultQueue, userDataChanged, nil
}
wfBehavior := directive.GetBehavior()
deployment := directive.GetDeployment()

err = worker_versioning.ValidateDeployment(deployment)
if err != nil {
return nil, nil, nil, err
}
pinnedQueue, err = pm.getVersionedQueue(ctx, "", "", deployment, true)
if err != nil {
return nil, nil, nil, err
}
if forwardInfo == nil {
// Task is not forwarded, so it can be spooled if sync match fails.
// Spool queue and sync match queue is the same for pinned workflows.
return pinnedQueue, pinnedQueue, nil, nil
} else {
// Forwarded from child partition - only do sync match.
return nil, pinnedQueue, nil, nil
}
case enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE:
perTypeUserData, perTypeUserDataChanged, err := pm.getPerTypeUserData()
if err != nil {
return nil, nil, nil, err
}
if wfBehavior == enumspb.VERSIONING_BEHAVIOR_PINNED {
if pm.partition.Kind() == enumspb.TASK_QUEUE_KIND_STICKY {
// TODO (shahab): we can verify the passed deployment matches the last poller's deployment
return pm.defaultQueue, pm.defaultQueue, userDataChanged, nil
}

currentDeployment := findCurrentDeployment(perTypeUserData.GetDeploymentData())
err = worker_versioning.ValidateDeployment(deployment)
if err != nil {
return nil, nil, nil, err
}
pinnedQueue, err = pm.getVersionedQueue(ctx, "", "", deployment, true)
if err != nil {
return nil, nil, nil, err
}
if forwardInfo == nil {
// Task is not forwarded, so it can be spooled if sync match fails.
// Spool queue and sync match queue is the same for pinned workflows.
return pinnedQueue, pinnedQueue, nil, nil
} else {
// Forwarded from child partition - only do sync match.
return nil, pinnedQueue, nil, nil
}
}

if pm.partition.Kind() == enumspb.TASK_QUEUE_KIND_STICKY {
if !deployment.Equal(currentDeployment) {
// Current deployment has changed, so the workflow should move to a normal queue to
// get redirected to the new deployment.
return nil, nil, nil, serviceerrors.NewStickyWorkerUnavailable()
}
perTypeUserData, perTypeUserDataChanged, err := pm.getPerTypeUserData()
if err != nil {
return nil, nil, nil, err
}

// TODO (shahab): we can verify the passed deployment matches the last poller's deployment
return pm.defaultQueue, pm.defaultQueue, perTypeUserDataChanged, nil
currentDeployment := findCurrentDeployment(perTypeUserData.GetDeploymentData())
if currentDeployment != nil &&
// Make sure the wf is not v1-2 versioned
directive.GetAssignedBuildId() == "" {
if pm.partition.Kind() == enumspb.TASK_QUEUE_KIND_STICKY {
if !deployment.Equal(currentDeployment) {
// Current deployment has changed, so the workflow should move to a normal queue to
// get redirected to the new deployment.
return nil, nil, nil, serviceerrors.NewStickyWorkerUnavailable()
}

currentDeploymentQueue, err := pm.getVersionedQueue(ctx, "", "", currentDeployment, true)
if forwardInfo == nil {
// Task is not forwarded, so it can be spooled if sync match fails.
// Unpinned tasks are spooled in default queue
return pm.defaultQueue, currentDeploymentQueue, perTypeUserDataChanged, err
} else {
// Forwarded from child partition - only do sync match.
return nil, currentDeploymentQueue, perTypeUserDataChanged, err
}
case enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED:
return nil, nil, nil, serviceerror.NewInvalidArgument("versioning behavior must be set")
// TODO (shahab): we can verify the passed deployment matches the last poller's deployment
return pm.defaultQueue, pm.defaultQueue, perTypeUserDataChanged, nil
}

currentDeploymentQueue, err := pm.getVersionedQueue(ctx, "", "", currentDeployment, true)
if forwardInfo == nil {
// Task is not forwarded, so it can be spooled if sync match fails.
// Unpinned tasks are spooled in default queue
return pm.defaultQueue, currentDeploymentQueue, perTypeUserDataChanged, err
} else {
// Forwarded from child partition - only do sync match.
return nil, currentDeploymentQueue, perTypeUserDataChanged, err
}
}

Expand Down Expand Up @@ -987,8 +986,6 @@ func (pm *taskQueuePartitionManagerImpl) getPerTypeUserData() (*persistencespb.T
if err != nil {
return nil, nil, err
}
if perType, ok := userData.GetData().GetPerType()[int32(pm.Partition().TaskType())]; ok {
return perType, userDataChanged, nil
}
return nil, userDataChanged, nil
perType := userData.GetData().GetPerType()[int32(pm.Partition().TaskType())]
return perType, userDataChanged, nil
}
7 changes: 5 additions & 2 deletions service/matching/task_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/primitives/timestamp"
serviceerrors "go.temporal.io/server/common/serviceerror"
"go.temporal.io/server/common/util"
"go.temporal.io/server/internal/goro"
)
Expand Down Expand Up @@ -155,10 +156,12 @@ dispatchLoop:
continue dispatchLoop
}

var stickyUnavailable *serviceerrors.StickyWorkerUnavailable
// if task is still valid (truly valid or unable to verify if task is valid)
metrics.BufferThrottlePerTaskQueueCounter.With(tr.taggedMetricsHandler()).Record(1)
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
// Don't log here if encounters missing user data error when dispatch a versioned task.
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) &&
// StickyWorkerUnavailable is expected for versioned sticky queues
!errors.As(err, &stickyUnavailable) {
tr.throttledLogger().Error("taskReader: unexpected error dispatching task", tag.Error(err))
}
util.InterruptibleSleep(ctx, taskReaderOfferThrottleWait)
Expand Down
Loading
Loading