Merge remote-tracking branch 'origin/main' into shahab/versioning-tests-act

# Conflicts:
#	service/matching/config.go
#	service/worker/deployment/fx.go
#	tests/deployment_test.go
#	tests/testcore/functional_test_base.go
#	tests/versioning_3_test.go
#	tests/versioning_test.go
ShahabT committed Dec 2, 2024
2 parents e7fa120 + 6be0492 commit 99636b0
Showing 13 changed files with 492 additions and 326 deletions.
8 changes: 5 additions & 3 deletions common/dynamicconfig/constants.go
@@ -852,10 +852,12 @@ of Timeout and if no activity is seen even after that the connection is closed.`
true,
`FrontendEnableSchedules enables schedule-related RPCs in the frontend`,
)
FrontendEnableDeployments = NewNamespaceBoolSetting(
"frontend.enableDeployments",
EnableDeployments = NewNamespaceBoolSetting(
"system.enableDeployments",
false,
`FrontendEnableDeployments enables deployment-related RPCs in the frontend`,
`EnableDeployments enables deployments (versioning v3) in all services,
including deployment-related RPCs in the frontend, deployment entity workflows in the worker,
and deployment interaction in matching and history.`,
)
EnableNexus = NewGlobalBoolSetting(
"system.enableNexus",
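As context for this rename: a setting created with NewNamespaceBoolSetting is evaluated per namespace, so each service captures the getter once and queries it with a namespace name (as the frontend, matching, and worker diffs below do with dynamicconfig.EnableDeployments.Get(dc)). A self-contained sketch of that pattern, using simplified stand-in types rather than the real dynamicconfig package:

package main

import "fmt"

// Stand-in for the namespace-scoped getter shape produced by a
// NewNamespaceBoolSetting-style definition; the real types live in
// common/dynamicconfig and may differ in detail.
type boolPropertyFnWithNamespaceFilter func(namespace string) bool

// newNamespaceBoolSetting mimics a default value plus per-namespace overrides.
func newNamespaceBoolSetting(defaultValue bool, overrides map[string]bool) boolPropertyFnWithNamespaceFilter {
	return func(namespace string) bool {
		if v, ok := overrides[namespace]; ok {
			return v
		}
		return defaultValue
	}
}

func main() {
	// Default false (as in the diff), with one hypothetical namespace opted in.
	enabledForNs := newNamespaceBoolSetting(false, map[string]bool{"deployments-test-ns": true})

	fmt.Println(enabledForNs("deployments-test-ns")) // true
	fmt.Println(enabledForNs("some-other-ns"))       // false
}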
2 changes: 1 addition & 1 deletion go.mod
@@ -56,7 +56,7 @@ require (
go.opentelemetry.io/otel/sdk v1.31.0
go.opentelemetry.io/otel/sdk/metric v1.31.0
go.opentelemetry.io/otel/trace v1.31.0
go.temporal.io/api v1.42.1-0.20241127002502-4bef51a251c3
go.temporal.io/api v1.43.0
go.temporal.io/sdk v1.30.1
go.temporal.io/version v0.3.0
go.uber.org/automaxprocs v1.5.3
4 changes: 2 additions & 2 deletions go.sum
@@ -330,8 +330,8 @@ go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HY
go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A=
go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0=
go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8=
go.temporal.io/api v1.42.1-0.20241127002502-4bef51a251c3 h1:mEFYZlGzw0i4sEKdf5mrFl4QpsO+UGlKkqIcF/Tu5lY=
go.temporal.io/api v1.42.1-0.20241127002502-4bef51a251c3/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/api v1.43.0 h1:lBhq+u5qFJqGMXwWsmg/i8qn1UA/3LCwVc88l2xUMHg=
go.temporal.io/api v1.43.0/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/sdk v1.30.1 h1:4wgfSjwuaayQl9Q0mUzpNV6w55TPAESSroR6Z5lE49o=
go.temporal.io/sdk v1.30.1/go.mod h1:hNCZzd6dt7bxD9B4AECQgjHTd2NrzjdmGDbbv4xHuFU=
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=
2 changes: 1 addition & 1 deletion service/frontend/service.go
@@ -309,7 +309,7 @@ func NewConfig(

EnableSchedules: dynamicconfig.FrontendEnableSchedules.Get(dc),

EnableDeployments: dynamicconfig.FrontendEnableDeployments.Get(dc),
EnableDeployments: dynamicconfig.EnableDeployments.Get(dc),

EnableBatcher: dynamicconfig.FrontendEnableBatcher.Get(dc),
MaxConcurrentBatchOperation: dynamicconfig.FrontendMaxConcurrentBatchOperationPerNamespace.Get(dc),
82 changes: 43 additions & 39 deletions service/history/api/multioperation/api.go
@@ -50,7 +50,10 @@ var (
)

type (
// updateError is a wrapper to distinguish an update error from a start error.
updateError struct{ error }
// noStartError is a sentinel error that indicates no workflow start occurred.
noStartError struct{ startworkflow.StartOutcome }
)

func Invoke(
@@ -131,15 +134,18 @@

// For workflow id conflict policy terminate-existing, always attempt a start
// since that works when the workflow is already running *and* when it's not running.
if startReq.StartRequest.WorkflowIdConflictPolicy == enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING {
if resp, outcome, err := startAndUpdateWorkflow(
ctx, shardContext, workflowConsistencyChecker, starter, updater,
); err != nil {
conflictPolicy := startReq.StartRequest.WorkflowIdConflictPolicy
if conflictPolicy == enumspb.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING {
resp, err := startAndUpdateWorkflow(ctx, starter, updater)
var noStartErr *noStartError
switch {
case errors.As(err, &noStartErr):
// The start request was deduped. Continue below and only send the update.
case err != nil:
return nil, err
} else if outcome != startworkflow.StartDeduped {
default:
return resp, nil
}
// if the start was deduped, we fall through to the update
}

currentWorkflowLease, err := workflowConsistencyChecker.GetWorkflowLease(
@@ -157,7 +163,7 @@

// workflow was already started, ...
if currentWorkflowLease != nil {
switch startReq.StartRequest.WorkflowIdConflictPolicy {
switch conflictPolicy {
case enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING:
// ... skip the start and only send the update
// NOTE: currentWorkflowLease will be released by the function
@@ -191,13 +197,20 @@
}

// workflow hasn't been started yet: start and then apply update
resp, _, err := startAndUpdateWorkflow(
ctx,
shardContext,
workflowConsistencyChecker,
starter,
updater,
)
resp, err := startAndUpdateWorkflow(ctx, starter, updater)
var noStartErr *noStartError
if errors.As(err, &noStartErr) {
// The workflow was meant to be started - but was actually *not* started.
// The problem is that the update has not been applied.
//
// This can happen when there's a race: another workflow start occurred right after the check for a
// running workflow above - but before the new workflow could be created (and locked).
// TODO: Consider a refactoring of the startworkflow.Starter to make this case impossible.
//
// The best way forward is to exit and retry from the top.
// By returning an Unavailable service error, the entire MultiOperation will be retried.
return nil, newMultiOpError(serviceerror.NewUnavailable(err.Error()), multiOpAbortedErr)
}
return resp, err
}

@@ -252,44 +265,31 @@ func updateWorkflow(
}, nil
}

// NOTE: Returns a `noStartError` if the start was unexpectedly reused/deduped.
func startAndUpdateWorkflow(
ctx context.Context,
shardContext shard.Context,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
starter *startworkflow.Starter,
updater *updateworkflow.Updater,
) (*historyservice.ExecuteMultiOperationResponse, startworkflow.StartOutcome, error) {
) (*historyservice.ExecuteMultiOperationResponse, error) {
startResp, startOutcome, err := starter.Invoke(ctx)
if err != nil {
// an update error occurred
// An update error occurred.
if errors.As(err, &updateError{}) {
return nil, startOutcome, newMultiOpError(multiOpAbortedErr, err)
return nil, newMultiOpError(multiOpAbortedErr, err)
}

// a start error occurred
return nil, startOutcome, newMultiOpError(err, multiOpAbortedErr)
// A start error occurred.
return nil, newMultiOpError(err, multiOpAbortedErr)
}

switch startOutcome {
case startworkflow.NoStart:
panic("unreachable")
case startworkflow.StartNew:
case startworkflow.StartReused:
// The workflow was meant to be *started* - but was actually *not* started since it's already running.
// The best way forward is to exit and retry from the top.
// By returning an Unavailable service error, the entire MultiOperation will be retried.
return nil, startOutcome, newMultiOpError(
serviceerror.NewUnavailable("Workflow could not be started as it is already running"), multiOpAbortedErr)
case startworkflow.StartDeduped:
// Since the start request was deduped, the update was not applied to the *current* workflow execution.
// Returning here to allow the caller to apply the update to the current workflow execution.
return nil, startOutcome, nil
if startOutcome != startworkflow.StartNew {
// The workflow was not started.
// Aborting since the update has not been applied.
return nil, &noStartError{startOutcome}
}

// wait for the update to complete
updateResp, err := updater.OnSuccess(ctx)
if err != nil {
return nil, startOutcome, newMultiOpError(nil, err) // `nil` for start since it succeeded
return nil, newMultiOpError(nil, err) // `nil` for start since it succeeded
}

return &historyservice.ExecuteMultiOperationResponse{
Expand All @@ -305,7 +305,7 @@ func startAndUpdateWorkflow(
},
},
},
}, startOutcome, nil
}, nil
}

func newMultiOpError(startErr, updateErr error) error {
Expand All @@ -326,3 +326,7 @@ func newMultiOpError(startErr, updateErr error) error {
func dedup(startReq *historyservice.StartWorkflowExecutionRequest, currentWorkflowLease api.WorkflowLease) bool {
return startReq.StartRequest.RequestId == currentWorkflowLease.GetMutableState().GetExecutionState().GetCreateRequestId()
}

func (e *noStartError) Error() string {
return fmt.Sprintf("Workflow was not started: %v", e.StartOutcome)
}
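The rewritten control flow above keys off errors.As matching a typed sentinel error, so the caller can distinguish "the start did not happen" from a hard failure and fall through to an update-only path. A self-contained sketch of that pattern with illustrative names (not the production types):

package main

import (
	"errors"
	"fmt"
)

// noStartError mirrors the sentinel used above: it carries the outcome so the
// caller can detect that no start happened and apply the update elsewhere.
type noStartError struct{ outcome string }

func (e *noStartError) Error() string {
	return fmt.Sprintf("workflow was not started: %v", e.outcome)
}

// startAndUpdate stands in for startAndUpdateWorkflow.
func startAndUpdate(alreadyRunning bool) (string, error) {
	if alreadyRunning {
		return "", &noStartError{outcome: "StartDeduped"}
	}
	return "started and updated", nil
}

func main() {
	_, err := startAndUpdate(true)

	var noStartErr *noStartError
	switch {
	case errors.As(err, &noStartErr):
		fmt.Println("no start happened; apply the update to the current run instead")
	case err != nil:
		fmt.Println("hard failure:", err)
	default:
		fmt.Println("start + update succeeded")
	}
}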
41 changes: 28 additions & 13 deletions service/history/api/startworkflow/api.go
@@ -66,7 +66,7 @@ const (
)

const (
NoStart StartOutcome = iota
StartErr StartOutcome = iota
StartNew
StartReused
StartDeduped
@@ -192,20 +192,20 @@ func (s *Starter) Invoke(
) (resp *historyservice.StartWorkflowExecutionResponse, startOutcome StartOutcome, retError error) {
request := s.request.StartRequest
if err := s.prepare(ctx); err != nil {
return nil, NoStart, err
return nil, StartErr, err
}

creationParams, err := s.prepareNewWorkflow(request.GetWorkflowId())
if err != nil {
return nil, NoStart, err
return nil, StartErr, err
}
defer func() {
creationParams.workflowLease.GetReleaseFn()(retError)
}()

currentExecutionLock, err := s.lockCurrentWorkflowExecution(ctx)
if err != nil {
return nil, NoStart, err
return nil, StartErr, err
}
defer func() {
currentExecutionLock(retError)
@@ -219,7 +219,7 @@
return s.handleConflict(ctx, creationParams, currentWorkflowConditionFailedError)
}

return nil, NoStart, err
return nil, StartErr, err
}

resp, err = s.generateResponse(
@@ -320,18 +320,18 @@ func (s *Starter) handleConflict(
}

if err := s.verifyNamespaceActive(creationParams, currentWorkflowConditionFailed); err != nil {
return nil, NoStart, err
return nil, StartErr, err
}

response, startOutcome, err := s.resolveDuplicateWorkflowID(ctx, currentWorkflowConditionFailed)
if err != nil {
return nil, NoStart, err
return nil, StartErr, err
} else if response != nil {
return response, startOutcome, nil
}

if err := s.createAsCurrent(ctx, creationParams, currentWorkflowConditionFailed); err != nil {
return nil, NoStart, err
return nil, StartErr, err
}

resp, err := s.generateResponse(
@@ -422,9 +422,9 @@ func (s *Starter) resolveDuplicateWorkflowID(
}
return resp, StartReused, nil
case err != nil:
return nil, NoStart, err
return nil, StartErr, err
case currentExecutionUpdateAction == nil:
return nil, NoStart, nil
return nil, StartErr, nil
}

var workflowLease api.WorkflowLease
@@ -484,16 +484,16 @@
}
events, err := s.getWorkflowHistory(ctx, mutableStateInfo)
if err != nil {
return nil, NoStart, err
return nil, StartErr, err
}
resp, err := s.generateResponse(newRunID, mutableStateInfo.workflowTask, events)
return resp, StartNew, err
case consts.ErrWorkflowCompleted:
// current workflow already closed
// fall through to the logic for only creating the new workflow below
return nil, NoStart, nil
return nil, StartErr, nil
default:
return nil, NoStart, err
return nil, StartErr, err
}
}

@@ -686,3 +686,18 @@ func (s *Starter) generateResponse(
},
}, nil
}

func (s StartOutcome) String() string {
switch s {
case StartErr:
return "StartErr"
case StartNew:
return "StartNew"
case StartReused:
return "StartReused"
case StartDeduped:
return "StartDeduped"
default:
return "Unknown"
}
}
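The String method added above makes the iota-based StartOutcome satisfy fmt.Stringer, so %v in errors and logs (for example in noStartError.Error) prints a symbolic name instead of a bare integer. A standalone sketch that duplicates the enum locally so it runs on its own:

package main

import "fmt"

type StartOutcome int

const (
	StartErr StartOutcome = iota
	StartNew
	StartReused
	StartDeduped
)

// String makes %v render the symbolic name rather than the underlying int.
func (s StartOutcome) String() string {
	switch s {
	case StartErr:
		return "StartErr"
	case StartNew:
		return "StartNew"
	case StartReused:
		return "StartReused"
	case StartDeduped:
		return "StartDeduped"
	default:
		return "Unknown"
	}
}

func main() {
	fmt.Printf("start outcome: %v\n", StartDeduped) // start outcome: StartDeduped
}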
2 changes: 1 addition & 1 deletion service/matching/config.go
@@ -214,7 +214,7 @@ func NewConfig(
TestDisableSyncMatch: dynamicconfig.TestMatchingDisableSyncMatch.Get(dc),
LoadUserData: dynamicconfig.MatchingLoadUserData.Get(dc),
HistoryMaxPageSize: dynamicconfig.MatchingHistoryMaxPageSize.Get(dc),
EnableDeployments: dynamicconfig.FrontendEnableDeployments.Get(dc),
EnableDeployments: dynamicconfig.EnableDeployments.Get(dc),
MaxTaskQueuesInDeployment: dynamicconfig.MatchingMaxTaskQueuesInDeployment.Get(dc),
RPS: dynamicconfig.MatchingRPS.Get(dc),
OperatorRPSRatio: dynamicconfig.OperatorRPSRatio.Get(dc),
6 changes: 2 additions & 4 deletions service/matching/task_queue_partition_manager.go
@@ -986,8 +986,6 @@ func (pm *taskQueuePartitionManagerImpl) getPerTypeUserData() (*persistencespb.T
if err != nil {
return nil, nil, err
}
if perType, ok := userData.GetData().GetPerType()[int32(pm.Partition().TaskType())]; ok {
return perType, userDataChanged, nil
}
return nil, userDataChanged, nil
perType := userData.GetData().GetPerType()[int32(pm.Partition().TaskType())]
return perType, userDataChanged, nil
}
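The simplification above works because indexing a Go map with a missing key yields the zero value of the element type — nil for a pointer — so the removed comma-ok branch returned the same nil that the plain index expression returns. A tiny self-contained illustration (stand-in type names):

package main

import "fmt"

type taskQueueTypeUserData struct{ note string }

func main() {
	perTypeMap := map[int32]*taskQueueTypeUserData{
		1: {note: "workflow task queue type"},
	}

	// Missing key: the index expression returns the zero value (a nil pointer),
	// so `perType := m[key]` behaves the same as the old comma-ok form here.
	perType := perTypeMap[2]
	fmt.Println(perType == nil) // true
}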
2 changes: 1 addition & 1 deletion service/worker/deployment/fx.go
@@ -93,7 +93,7 @@ func NewResult(
return fxResult{
Component: &workerComponent{
activityDeps: params,
enabledForNs: dynamicconfig.FrontendEnableDeployments.Get(dc),
enabledForNs: dynamicconfig.EnableDeployments.Get(dc),
},
}
}
6 changes: 2 additions & 4 deletions tests/deployment_test.go
@@ -84,7 +84,7 @@ func TestDeploymentSuite(t *testing.T) {
func (s *DeploymentSuite) SetupSuite() {
s.setAssertions()
dynamicConfigOverrides := map[dynamicconfig.Key]any{
dynamicconfig.FrontendEnableDeployments.Key(): true,
dynamicconfig.EnableDeployments.Key(): true,
dynamicconfig.FrontendEnableWorkerVersioningDataAPIs.Key(): true,
dynamicconfig.FrontendEnableWorkerVersioningWorkflowAPIs.Key(): true,
dynamicconfig.FrontendEnableWorkerVersioningRuleAPIs.Key(): true,
@@ -207,9 +207,7 @@ func (s *DeploymentSuite) TestDescribeDeployment_RegisterTaskQueue_ConcurrentPol
for p := 0; p < 4; p++ {
for i := 0; i < 3; i++ {
tq := &taskqueuepb.TaskQueue{Name: root.TaskQueue().NormalPartition(p).RpcName(), Kind: enumspb.TASK_QUEUE_KIND_NORMAL}
go func() {
s.pollFromDeployment(ctx, tq, workerDeployment)
}()
s.pollFromDeployment(ctx, tq, workerDeployment)
}
}

6 changes: 3 additions & 3 deletions tests/testcore/functional_test_base.go
@@ -184,14 +184,14 @@ func (s *FunctionalTestBase) SetupSuite(defaultClusterConfigFile string, options
s.operatorClient = s.testCluster.OperatorClient()
s.httpAPIAddress = cluster.Host().FrontendHTTPAddress()

s.namespace = "functional-test-namespace"
s.namespace = RandomizeStr("namespace")
s.Require().NoError(s.registerNamespaceWithDefaults(s.namespace))

s.foreignNamespace = RandomizeStr("functional-foreign-test-namespace")
s.foreignNamespace = RandomizeStr("foreign-namespace")
s.Require().NoError(s.registerNamespaceWithDefaults(s.foreignNamespace))

if clusterConfig.EnableArchival {
s.archivalNamespace = RandomizeStr("functional-archival-enabled-namespace")
s.archivalNamespace = RandomizeStr("archival-enabled-namespace")
s.Require().NoError(s.registerArchivalNamespace(s.archivalNamespace))
}
}
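Switching the primary namespace from a fixed name to RandomizeStr keeps suites from colliding on cluster state that persists across runs or across suites sharing a cluster. A sketch of what a RandomizeStr-style helper typically does (illustrative only — the real helper in tests/testcore may differ):

package main

import (
	"fmt"

	"github.com/google/uuid"
)

// randomizeStr (illustrative) appends a random suffix so each suite registers a
// unique namespace instead of reusing a fixed one like "functional-test-namespace".
func randomizeStr(prefix string) string {
	return prefix + "-" + uuid.NewString()
}

func main() {
	fmt.Println(randomizeStr("namespace")) // e.g. namespace-9f4c2b57-...
}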
(Remaining file diffs not loaded.)
