Skip to content

Commit

Permalink
sync all activities when disabling state-based replication (#6656)
Browse files Browse the repository at this point in the history
## What changed?
sync all activities when disabling state-based replication

## Why?
We may dropped some tasks before disabling state-based replication. So
when disabling always needs to sync all activities.

## How did you test it?
unit tests.

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
  • Loading branch information
hai719 authored Oct 17, 2024
1 parent 92450c8 commit 6ccadac
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 1 deletion.
26 changes: 25 additions & 1 deletion service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ type (
// TODO deprecate nextEventIDInDB in favor of dbRecordVersion.
// Indicates the next event ID in DB, for conditional update.
nextEventIDInDB int64
// Indicates the versionedTransition in DB, can be used to
// calculate if the state-based replication is disabling.
versionedTransitionInDB *persistencespb.VersionedTransition
// Indicates the DB record version, for conditional update.
dbRecordVersion int64
// Namespace entry contains a snapshot of namespace.
Expand Down Expand Up @@ -455,6 +458,9 @@ func NewMutableStateFromDB(
mutableState.nextEventIDInDB = dbRecord.NextEventId
mutableState.dbRecordVersion = dbRecordVersion
mutableState.checksum = dbRecord.Checksum
if len(mutableState.executionInfo.TransitionHistory) != 0 {
mutableState.versionedTransitionInDB = mutableState.executionInfo.TransitionHistory[len(mutableState.executionInfo.TransitionHistory)-1]
}

if len(dbRecord.Checksum.GetValue()) > 0 {
switch {
Expand Down Expand Up @@ -5619,6 +5625,11 @@ func (ms *MutableStateImpl) cleanupTransaction() error {

ms.stateInDB = ms.executionState.State
ms.nextEventIDInDB = ms.GetNextEventID()
if len(ms.executionInfo.TransitionHistory) != 0 {
ms.versionedTransitionInDB = ms.executionInfo.TransitionHistory[len(ms.executionInfo.TransitionHistory)-1]
} else {
ms.versionedTransitionInDB = nil
}
// ms.dbRecordVersion remains the same

ms.hBuilder = historybuilder.New(
Expand Down Expand Up @@ -5713,6 +5724,15 @@ func (ms *MutableStateImpl) syncActivityToReplicationTask(
switch transactionPolicy {
case TransactionPolicyActive:
if ms.generateReplicationTask() {
var activityIDs map[int64]struct{}
if ms.disablingTransitionHistory() {
activityIDs = make(map[int64]struct{}, len(ms.GetPendingActivityInfos()))
for activityID := range ms.GetPendingActivityInfos() {
activityIDs[activityID] = struct{}{}
}
} else {
activityIDs = ms.syncActivityTasks
}
return convertSyncActivityInfos(
now,
definition.NewWorkflowKey(
Expand All @@ -5721,7 +5741,7 @@ func (ms *MutableStateImpl) syncActivityToReplicationTask(
ms.executionState.RunId,
),
ms.pendingActivityInfoIDs,
ms.syncActivityTasks,
activityIDs,
)
}
return nil
Expand Down Expand Up @@ -6702,3 +6722,7 @@ func (ms *MutableStateImpl) applyTombstones(tombstoneBatches []*persistencespb.S
ms.capTombstoneCount()
return nil
}

func (ms *MutableStateImpl) disablingTransitionHistory() bool {
return ms.versionedTransitionInDB != nil && len(ms.executionInfo.TransitionHistory) == 0
}
86 changes: 86 additions & 0 deletions service/history/workflow/mutable_state_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2813,6 +2813,92 @@ func (s *mutableStateSuite) TestCloseTransactionPrepareReplicationTasks_SyncHSMT
}
}

func (s *mutableStateSuite) setDisablingTransitionHistory(ms *MutableStateImpl) {
ms.versionedTransitionInDB = &persistencespb.VersionedTransition{TransitionCount: 1025}
ms.executionInfo.TransitionHistory = nil
}

func (s *mutableStateSuite) TestCloseTransactionPrepareReplicationTasks_SyncActivityTask() {
testCases := []struct {
name string
disablingTransitionHistory bool
expectedReplicationTask []tasks.SyncActivityTask
}{
{
name: "NoDisablingTransitionHistory",
disablingTransitionHistory: false,
expectedReplicationTask: []tasks.SyncActivityTask{
{
ScheduledEventID: 100,
},
},
},
{
name: "DisablingTransitionHistory",
disablingTransitionHistory: true,
expectedReplicationTask: []tasks.SyncActivityTask{
{
ScheduledEventID: 90,
},
{
ScheduledEventID: 100,
},
},
},
}

for _, tc := range testCases {
s.T().Run(tc.name, func(t *testing.T) {
dbState := s.buildWorkflowMutableState()
dbState.ActivityInfos[100] = &persistencespb.ActivityInfo{
ScheduledEventId: 100,
}
ms, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 123)
s.NoError(err)

if tc.disablingTransitionHistory {
s.setDisablingTransitionHistory(ms)
}

ms.UpdateActivityProgress(ms.pendingActivityInfoIDs[100], &workflowservice.RecordActivityTaskHeartbeatRequest{})

repicationTasks := ms.syncActivityToReplicationTask(TransactionPolicyActive)
s.Len(repicationTasks, len(tc.expectedReplicationTask))
for i, task := range tc.expectedReplicationTask {
s.Equal(task.ScheduledEventID, repicationTasks[i].(*tasks.SyncActivityTask).ScheduledEventID)
}
})
}
}

func (s *mutableStateSuite) TestVersionedTransitionInDB() {
// case 1: versionedTransitionInDB is not nil
dbState := s.buildWorkflowMutableState()
ms, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 123)
s.NoError(err)

s.True(proto.Equal(ms.executionInfo.TransitionHistory[len(ms.executionInfo.TransitionHistory)-1], ms.versionedTransitionInDB))

s.NoError(ms.cleanupTransaction())
s.True(proto.Equal(ms.executionInfo.TransitionHistory[len(ms.executionInfo.TransitionHistory)-1], ms.versionedTransitionInDB))

ms.executionInfo.TransitionHistory = nil
s.NoError(ms.cleanupTransaction())
s.Nil(ms.versionedTransitionInDB)

// case 2: versionedTransitionInDB is nil
dbState = s.buildWorkflowMutableState()
dbState.ExecutionInfo.TransitionHistory = nil
ms, err = NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 123)
s.NoError(err)

s.Nil(ms.versionedTransitionInDB)

ms.executionInfo.TransitionHistory = UpdatedTransitionHistory(ms.executionInfo.TransitionHistory, s.namespaceEntry.FailoverVersion())
s.NoError(ms.cleanupTransaction())
s.True(proto.Equal(ms.executionInfo.TransitionHistory[len(ms.executionInfo.TransitionHistory)-1], ms.versionedTransitionInDB))
}

func (s *mutableStateSuite) TestCloseTransactionTrackTombstones() {
testCases := []struct {
name string
Expand Down

0 comments on commit 6ccadac

Please sign in to comment.