Skip to content

Commit

Permalink
[Reset] Record the new runID when an execution is reset (#6694)
Browse files Browse the repository at this point in the history
## What changed?
During reset we now record the new run ID in the base execution. It
handles the cases where the base and current could either be same or
different.

## Why?
This is useful to point users in UI to the new run when they are viewing
an execution that was reset.

## How did you test it?
Added new functional tests for reset.

## Potential risks
N/A

## Documentation
N/A

## Is hotfix candidate?
No
  • Loading branch information
gow authored Nov 5, 2024
1 parent 705f714 commit 62ffa80
Show file tree
Hide file tree
Showing 15 changed files with 1,362 additions and 921 deletions.
1,777 changes: 894 additions & 883 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ message WorkflowExecutionInfo {

// The workflow has been reset.
bool workflow_was_reset = 96;

// Reset Run ID points to the new nun when this execution is reset. If the execution is reset multiple times, it points to the latest run.
string reset_run_id = 97;
}

message ExecutionStats {
Expand Down Expand Up @@ -325,7 +328,7 @@ message ReplicationTaskInfo {
temporal.server.api.enums.v1.TaskPriority priority = 18;
VersionedTransition versioned_transition = 19;
// A list of event-based replication tasks that, together, are equivalent
// to this state-based task.
// to this state-based task.
// TODO: Remove this field when state-based replication is stable and
// doesn't need to be disabled.
repeated ReplicationTaskInfo task_equivalents = 20;
Expand Down Expand Up @@ -618,7 +621,7 @@ message CallbackInfo {

// NexusOperationInfo contains the state of a nexus operation.
message NexusOperationInfo {
// Endpoint name.
// Endpoint name.
// Resolved the endpoint registry for this workflow's namespace.
string endpoint = 1;

Expand Down
14 changes: 8 additions & 6 deletions service/history/api/reapplyevents/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ func Invoke(
}
baseCurrentBranchToken := baseCurrentVersionHistory.GetBranchToken()
baseNextEventID := mutableState.GetNextEventID()
baseWorkflow := ndc.NewWorkflow(
shard.GetClusterMetadata(),
context,
mutableState,
wcache.NoopReleaseFn,
)

err = workflowResetter.ResetWorkflow(
ctx,
Expand All @@ -150,12 +156,8 @@ func Invoke(
baseNextEventID,
resetRunID.String(),
uuid.New().String(),
ndc.NewWorkflow(
shard.GetClusterMetadata(),
context,
mutableState,
wcache.NoopReleaseFn,
),
baseWorkflow,
baseWorkflow,
ndc.EventsReapplicationResetWorkflowReason,
toReapplyEvents,
nil,
Expand Down
7 changes: 7 additions & 0 deletions service/history/api/resetworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ func Invoke(
}
baseCurrentBranchToken := baseCurrentVersionHistory.GetBranchToken()
baseNextEventID := baseMutableState.GetNextEventID()
baseWorkflow := ndc.NewWorkflow(
shard.GetClusterMetadata(),
baseWorkflowLease.GetContext(),
baseWorkflowLease.GetMutableState(),
baseWorkflowLease.GetReleaseFn(),
)

if err := ndc.NewWorkflowResetter(
shard,
Expand All @@ -153,6 +159,7 @@ func Invoke(
baseNextEventID,
resetRunID,
request.GetRequestId(),
baseWorkflow,
ndc.NewWorkflow(
shard.GetClusterMetadata(),
currentWorkflowLease.GetContext(),
Expand Down
2 changes: 1 addition & 1 deletion service/history/history_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5313,7 +5313,7 @@ func (s *engineSuite) TestReapplyEvents_ResetWorkflow() {
s.mockWorkflowResetter.EXPECT().ResetWorkflow(
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
).Return(nil)

err = s.mockHistoryEngine.ReapplyEvents(
Expand Down
1 change: 1 addition & 0 deletions service/history/ndc/transaction_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ func (r *transactionMgrImpl) backfillWorkflowEventsReapply(
resetRunID,
uuid.New(),
targetWorkflow,
targetWorkflow,
EventsReapplicationResetWorkflowReason,
totalEvents,
nil,
Expand Down
2 changes: 2 additions & 0 deletions service/history/ndc/transaction_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ func (s *transactionMgrSuite) TestBackfillWorkflow_CurrentWorkflow_Active_Closed
gomock.Any(),
gomock.Any(),
targetWorkflow,
targetWorkflow,
EventsReapplicationResetWorkflowReason,
workflowEvents.Events,
nil,
Expand Down Expand Up @@ -324,6 +325,7 @@ func (s *transactionMgrSuite) TestBackfillWorkflow_CurrentWorkflow_Closed_ResetF
gomock.Any(),
gomock.Any(),
targetWorkflow,
targetWorkflow,
EventsReapplicationResetWorkflowReason,
workflowEvents.Events,
nil,
Expand Down
73 changes: 60 additions & 13 deletions service/history/ndc/workflow_resetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type (
baseNextEventID int64,
resetRunID string,
resetRequestID string,
baseWorkflow Workflow,
currentWorkflow Workflow,
resetReason string,
additionalReapplyEvents []*historypb.HistoryEvent,
Expand Down Expand Up @@ -109,6 +110,10 @@ func NewWorkflowResetter(
}
}

// ResetWorkflow resets the given base run and creates a new run that would start after baseNextEventID. It additionally does the following
// - cherry-picks the relevant events from base run and reapplies them to the new run.
// - Terminates the current run if one is running. Also cherry picks and reapplies any relevant events generated due to termination of current run.
// - Performs other bookkeeping like linking base->new run, marking current as terminated due to reset etc.
func (r *workflowResetterImpl) ResetWorkflow(
ctx context.Context,
namespaceID namespace.ID,
Expand All @@ -120,6 +125,7 @@ func (r *workflowResetterImpl) ResetWorkflow(
baseNextEventID int64,
resetRunID string,
resetRequestID string,
baseWorkflow Workflow,
currentWorkflow Workflow,
resetReason string,
additionalReapplyEvents []*historypb.HistoryEvent,
Expand All @@ -130,6 +136,10 @@ func (r *workflowResetterImpl) ResetWorkflow(
if err != nil {
return err
}

// update base workflow to point to new runID after the reset.
baseWorkflow.GetMutableState().UpdateResetRunID(resetRunID)

resetWorkflowVersion := namespaceEntry.FailoverVersion()

var currentWorkflowMutation *persistence.WorkflowMutation
Expand Down Expand Up @@ -229,6 +239,7 @@ func (r *workflowResetterImpl) ResetWorkflow(

if err = r.persistToDB(
ctx,
baseWorkflow,
currentWorkflow,
currentWorkflowMutation,
currentWorkflowEventsSeq,
Expand Down Expand Up @@ -320,20 +331,35 @@ func (r *workflowResetterImpl) prepareResetWorkflow(

func (r *workflowResetterImpl) persistToDB(
ctx context.Context,
baseWorkflow Workflow,
currentWorkflow Workflow,
currentWorkflowMutation *persistence.WorkflowMutation,
currentWorkflowEventsSeq []*persistence.WorkflowEvents,
resetWorkflow Workflow,
) error {

currentRunID := currentWorkflow.GetMutableState().GetExecutionState().GetRunId()
baseRunID := baseWorkflow.GetMutableState().GetExecutionState().GetRunId()
resetWorkflowSnapshot, resetWorkflowEventsSeq, err := resetWorkflow.GetMutableState().CloseTransactionAsSnapshot(
workflow.TransactionPolicyActive,
)
if err != nil {
return err
}

if currentWorkflowMutation != nil {
if currentRunID == baseRunID {
// There are just 2 runs to update - base & new run.
// Additionally since base is the same as current, we should ensure that we prepare only one of them for transaction and do it only once (to avoid DBRecordVersion conflict)
// So check if current was already prepared for transaction. If not prepare the mutation for transaction.
if currentWorkflowMutation == nil {
currentWorkflowMutation, currentWorkflowEventsSeq, err = currentWorkflow.GetMutableState().CloseTransactionAsMutation(
workflow.TransactionPolicyActive,
)
if err != nil {
return err
}

}

if _, _, err := r.transaction.UpdateWorkflowExecution(
ctx,
persistence.UpdateWorkflowModeUpdateCurrent,
Expand All @@ -347,25 +373,46 @@ func (r *workflowResetterImpl) persistToDB(
return err
}
return nil

}

currentMutableState := currentWorkflow.GetMutableState()
currentRunID := currentMutableState.GetExecutionState().GetRunId()
currentCloseVersion, err := currentMutableState.GetCloseVersion()
if currentWorkflowMutation == nil {
// We have 2 different runs to update here - the base run & the new run. There were no changes to current.
// However we are still preparing current for transaction only to be able to use transaction.ConflictResolveWorkflowExecution() method below.
currentWorkflowMutation, currentWorkflowEventsSeq, err = currentWorkflow.GetMutableState().CloseTransactionAsMutation(
workflow.TransactionPolicyActive,
)
if err != nil {
return err
}
}

// We have 3 different runs to update here. However we have to prepare the snapshot of the base for transaction to be used in transaction.ConflictResolveWorkflowExecution() method.
// We use this method since it allows us to commit changes from all 3 different runs in the same DB transaction.
baseSnapshot, baseEventsSeq, err := baseWorkflow.GetMutableState().CloseTransactionAsSnapshot(
workflow.TransactionPolicyActive,
)
if err != nil {
return err
}

return resetWorkflow.GetContext().CreateWorkflowExecution(
resetRunVersion := resetWorkflow.GetMutableState().GetCurrentVersion()
currentRunVerson := currentWorkflow.GetMutableState().GetCurrentVersion()
if _, _, _, err := r.transaction.ConflictResolveWorkflowExecution(
ctx,
r.shardContext,
persistence.CreateWorkflowModeUpdateCurrent,
currentRunID,
currentCloseVersion,
resetWorkflow.GetMutableState(),
persistence.ConflictResolveWorkflowModeUpdateCurrent,
baseWorkflow.GetMutableState().GetCurrentVersion(),
baseSnapshot,
baseEventsSeq,
&resetRunVersion,
resetWorkflowSnapshot,
resetWorkflowEventsSeq,
)
&currentRunVerson,
currentWorkflowMutation,
currentWorkflowEventsSeq,
); err != nil {
return err
}
return nil
}

func (r *workflowResetterImpl) replayResetWorkflow(
Expand Down
8 changes: 4 additions & 4 deletions service/history/ndc/workflow_resetter_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 18 additions & 12 deletions service/history/ndc/workflow_resetter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ func (s *workflowResetterSuite) TestPersistToDB_CurrentTerminated() {
currentWorkflow.EXPECT().GetContext().Return(currentContext).AnyTimes()
currentWorkflow.EXPECT().GetMutableState().Return(currentMutableState).AnyTimes()
currentWorkflow.EXPECT().GetReleaseFn().Return(currentReleaseFn).AnyTimes()
currentMutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{
RunId: s.currentRunID,
}).AnyTimes()

currentMutableState.EXPECT().GetCurrentVersion().Return(int64(0)).AnyTimes()
currentNewEventsSize := int64(3444)
Expand Down Expand Up @@ -211,16 +214,14 @@ func (s *workflowResetterSuite) TestPersistToDB_CurrentTerminated() {
resetEventsSeq,
).Return(currentNewEventsSize, resetNewEventsSize, nil)

err := s.workflowResetter.persistToDB(context.Background(), currentWorkflow, currentMutation, currentEventsSeq, resetWorkflow)
err := s.workflowResetter.persistToDB(context.Background(), currentWorkflow, currentWorkflow, currentMutation, currentEventsSeq, resetWorkflow)
s.NoError(err)
// persistToDB function is not charged of releasing locks
s.False(currentReleaseCalled)
s.False(resetReleaseCalled)
}

func (s *workflowResetterSuite) TestPersistToDB_CurrentNotTerminated() {
currentCloseVersion := int64(1234)

currentWorkflow := NewMockWorkflow(s.controller)
currentReleaseCalled := false
currentContext := workflow.NewMockContext(s.controller)
Expand All @@ -233,12 +234,16 @@ func (s *workflowResetterSuite) TestPersistToDB_CurrentNotTerminated() {
RunId: s.currentRunID,
}).AnyTimes()

currentMutableState.EXPECT().GetCloseVersion().Return(currentCloseVersion, nil).AnyTimes()
currentMutation := &persistence.WorkflowMutation{}
currentEventsSeq := []*persistence.WorkflowEvents{{}}
currentMutableState.EXPECT().GetCurrentVersion().Return(int64(0)).AnyTimes()
currentMutableState.EXPECT().CloseTransactionAsMutation(workflow.TransactionPolicyActive).Return(currentMutation, currentEventsSeq, nil)

resetWorkflow := NewMockWorkflow(s.controller)
resetReleaseCalled := false
resetContext := workflow.NewMockContext(s.controller)
resetMutableState := workflow.NewMockMutableState(s.controller)
resetMutableState.EXPECT().GetCurrentVersion().Return(int64(0)).AnyTimes()
var tarGetReleaseFn wcache.ReleaseCacheFunc = func(error) { resetReleaseCalled = true }
resetWorkflow.EXPECT().GetContext().Return(resetContext).AnyTimes()
resetWorkflow.EXPECT().GetMutableState().Return(resetMutableState).AnyTimes()
Expand All @@ -259,18 +264,19 @@ func (s *workflowResetterSuite) TestPersistToDB_CurrentNotTerminated() {
resetMutableState.EXPECT().CloseTransactionAsSnapshot(
workflow.TransactionPolicyActive,
).Return(resetSnapshot, resetEventsSeq, nil)
resetContext.EXPECT().CreateWorkflowExecution(

s.mockTransaction.EXPECT().UpdateWorkflowExecution(
gomock.Any(),
s.mockShard,
persistence.CreateWorkflowModeUpdateCurrent,
s.currentRunID,
currentCloseVersion,
resetMutableState,
persistence.UpdateWorkflowModeUpdateCurrent,
int64(0),
currentMutation,
currentEventsSeq,
util.Ptr(int64(0)),
resetSnapshot,
resetEventsSeq,
).Return(nil)
).Return(int64(0), int64(0), nil)

err := s.workflowResetter.persistToDB(context.Background(), currentWorkflow, nil, nil, resetWorkflow)
err := s.workflowResetter.persistToDB(context.Background(), currentWorkflow, currentWorkflow, nil, nil, resetWorkflow)
s.NoError(err)
// persistToDB function is not charged of releasing locks
s.False(currentReleaseCalled)
Expand Down
9 changes: 9 additions & 0 deletions service/history/transfer_queue_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,7 @@ func (t *transferQueueActiveTaskExecutor) processResetWorkflow(
task,
reason,
resetPoint,
baseContext,
baseMutableState,
currentContext,
currentMutableState,
Expand Down Expand Up @@ -1396,6 +1397,7 @@ func (t *transferQueueActiveTaskExecutor) resetWorkflow(
task *tasks.ResetWorkflowTask,
reason string,
resetPoint *workflowpb.ResetPointInfo,
baseContext workflow.Context,
baseMutableState workflow.MutableState,
currentContext workflow.Context,
currentMutableState workflow.MutableState,
Expand Down Expand Up @@ -1425,6 +1427,12 @@ func (t *transferQueueActiveTaskExecutor) resetWorkflow(
baseCurrentBranchToken := baseCurrentVersionHistory.GetBranchToken()
baseNextEventID := baseMutableState.GetNextEventID()

baseWorkflow := ndc.NewWorkflow(
t.shardContext.GetClusterMetadata(),
baseContext,
baseMutableState,
wcache.NoopReleaseFn, // this is fine since caller will defer on release
)
err = t.workflowResetter.ResetWorkflow(
resetWorkflowCtx,
namespaceID,
Expand All @@ -1436,6 +1444,7 @@ func (t *transferQueueActiveTaskExecutor) resetWorkflow(
baseNextEventID,
resetRunID,
uuid.New(),
baseWorkflow,
ndc.NewWorkflow(
t.shardContext.GetClusterMetadata(),
currentContext,
Expand Down
3 changes: 3 additions & 0 deletions service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ type (
GetUpdateOutcome(ctx context.Context, updateID string) (*updatepb.Outcome, error)

CheckResettable() error
// UpdateResetRunID saves the runID that resulted when this execution was reset.
UpdateResetRunID(runID string)

CloneToProto() *persistencespb.WorkflowMutableState
RetryActivity(ai *persistencespb.ActivityInfo, failure *failurepb.Failure) (enumspb.RetryState, error)
RecordLastActivityStarted(ai *persistencespb.ActivityInfo)
Expand Down
Loading

0 comments on commit 62ffa80

Please sign in to comment.