diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index f7f82208b69..bb80e038c21 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -32,6 +32,7 @@ import ( "github.com/prometheus/client_golang/prometheus" pd "github.com/tikv/pd/client" "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/timerutil" atomicutil "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/exp/slices" @@ -289,7 +290,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix(), pd.WithPrevKV()) if err != nil { log.Warn("watch resource group meta failed", zap.Error(err)) - watchRetryTimer.Reset(watchRetryInterval) + timerutil.SafeResetTimer(watchRetryTimer, watchRetryInterval) failpoint.Inject("watchStreamError", func() { watchRetryTimer.Reset(20 * time.Millisecond) }) @@ -299,7 +300,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix()) if err != nil { log.Warn("watch resource group config failed", zap.Error(err)) - watchRetryTimer.Reset(watchRetryInterval) + timerutil.SafeResetTimer(watchRetryTimer, watchRetryInterval) } } case <-emergencyTokenAcquisitionTicker.C: @@ -333,7 +334,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { }) if !ok { watchMetaChannel = nil - watchRetryTimer.Reset(watchRetryInterval) + timerutil.SafeResetTimer(watchRetryTimer, watchRetryInterval) failpoint.Inject("watchStreamError", func() { watchRetryTimer.Reset(20 * time.Millisecond) }) @@ -369,7 +370,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { case resp, ok := <-watchConfigChannel: if !ok { watchConfigChannel = nil - watchRetryTimer.Reset(watchRetryInterval) + timerutil.SafeResetTimer(watchRetryTimer, watchRetryInterval) failpoint.Inject("watchStreamError", func() { watchRetryTimer.Reset(20 * time.Millisecond) }) diff --git a/client/timerpool/pool.go b/client/timerutil/pool.go similarity index 98% rename from client/timerpool/pool.go rename to client/timerutil/pool.go index 28ffacfc629..2d608b09053 100644 --- a/client/timerpool/pool.go +++ b/client/timerutil/pool.go @@ -4,7 +4,7 @@ // Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133 -package timerpool +package timerutil import ( "sync" diff --git a/client/timerpool/pool_test.go b/client/timerutil/pool_test.go similarity index 98% rename from client/timerpool/pool_test.go rename to client/timerutil/pool_test.go index d6dffc723a9..f90a305d99f 100644 --- a/client/timerpool/pool_test.go +++ b/client/timerutil/pool_test.go @@ -4,7 +4,7 @@ // Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133 -package timerpool +package timerutil import ( "testing" diff --git a/client/timerutil/util.go b/client/timerutil/util.go new file mode 100644 index 00000000000..7e24671a09e --- /dev/null +++ b/client/timerutil/util.go @@ -0,0 +1,32 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package timerutil
+
+import "time"
+
+// SafeResetTimer is used to reset a timer safely.
+// Before Go 1.23, the only safe way to use Reset was to call Timer.Stop and explicitly drain the timer first.
+// We need to be careful here, see more details in the comments of Timer.Reset.
+// https://pkg.go.dev/time@master#Timer.Reset
+func SafeResetTimer(t *time.Timer, d time.Duration) {
+	// Stop the timer if it's not stopped.
+	if !t.Stop() {
+		select {
+		case <-t.C: // try to drain from the channel
+		default:
+		}
+	}
+	t.Reset(d)
+}
diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go
index 6b2c33ca58d..80b9dc87dca 100644
--- a/client/tso_dispatcher.go
+++ b/client/tso_dispatcher.go
@@ -29,7 +29,7 @@ import (
 	"github.com/tikv/pd/client/errs"
 	"github.com/tikv/pd/client/grpcutil"
 	"github.com/tikv/pd/client/retry"
-	"github.com/tikv/pd/client/timerpool"
+	"github.com/tikv/pd/client/timerutil"
 	"github.com/tikv/pd/client/tsoutil"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
@@ -155,7 +155,7 @@ func newTSDeadline(
 	done chan struct{},
 	cancel context.CancelFunc,
 ) *deadline {
-	timer := timerpool.GlobalTimerPool.Get(timeout)
+	timer := timerutil.GlobalTimerPool.Get(timeout)
 	return &deadline{
 		timer:  timer,
 		done:   done,
@@ -201,11 +201,11 @@ func (c *tsoClient) watchTSDeadline(ctx context.Context, dcLocation string) {
 			case <-d.timer.C:
 				log.Error("[tso] tso request is canceled due to timeout", zap.String("dc-location", dc), errs.ZapError(errs.ErrClientGetTSOTimeout))
 				d.cancel()
-				timerpool.GlobalTimerPool.Put(d.timer)
+				timerutil.GlobalTimerPool.Put(d.timer)
 			case <-d.done:
-				timerpool.GlobalTimerPool.Put(d.timer)
+				timerutil.GlobalTimerPool.Put(d.timer)
 			case <-ctx.Done():
-				timerpool.GlobalTimerPool.Put(d.timer)
+				timerutil.GlobalTimerPool.Put(d.timer)
 				return
 			}
 		case <-ctx.Done():
@@ -419,16 +419,7 @@ tsoBatchLoop:
 		if maxBatchWaitInterval >= 0 {
 			tbc.adjustBestBatchSize()
 		}
-		// Stop the timer if it's not stopped.
-		if !streamLoopTimer.Stop() {
-			select {
-			case <-streamLoopTimer.C: // try to drain from the channel
-			default:
-			}
-		}
-		// We need be careful here, see more details in the comments of Timer.Reset.
-		// https://pkg.go.dev/time@master#Timer.Reset
-		streamLoopTimer.Reset(c.option.timeout)
+		timerutil.SafeResetTimer(streamLoopTimer, c.option.timeout)
 		// Choose a stream to send the TSO gRPC request.
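		// [Editor's note, not part of the patch] SafeResetTimer above replaces the
		// deleted stop-and-drain idiom. A minimal usage sketch under the same
		// assumptions (a timer that may already have fired by the time we reset it);
		// requestCh and d are illustrative names, not from this diff:
		//
		//	t := time.NewTimer(d)
		//	defer t.Stop()
		//	for {
		//		select {
		//		case <-requestCh:
		//			// Stop/drain/Reset in one call; avoids receiving a stale
		//			// tick left over from the timer's previous expiration.
		//			timerutil.SafeResetTimer(t, d)
		//		case <-t.C:
		//			return // idle timeout elapsed
		//		}
		//	}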
streamChoosingLoop: for { diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 916200bfa3e..b3873cf8cee 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -49,7 +49,7 @@ func HandleOverlaps(c Cluster, overlaps []*core.RegionInfo) { if c.GetRegionStats() != nil { c.GetRegionStats().ClearDefunctRegion(item.GetID()) } - c.GetLabelStats().ClearDefunctRegion(item.GetID()) + c.GetLabelStats().MarkDefunctRegion(item.GetID()) c.GetRuleManager().InvalidCache(item.GetID()) } } diff --git a/pkg/election/lease.go b/pkg/election/lease.go index eada4f8786d..28bc39a1752 100644 --- a/pkg/election/lease.go +++ b/pkg/election/lease.go @@ -23,6 +23,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/timerutil" "github.com/tikv/pd/pkg/utils/typeutil" "go.etcd.io/etcd/clientv3" "go.uber.org/zap" @@ -124,16 +125,7 @@ func (l *lease) KeepAlive(ctx context.Context) { l.expireTime.Store(t) } } - // Stop the timer if it's not stopped. - if !timer.Stop() { - select { - case <-timer.C: // try to drain from the channel - default: - } - } - // We need be careful here, see more details in the comments of Timer.Reset. - // https://pkg.go.dev/time@master#Timer.Reset - timer.Reset(l.leaseTimeout) + timerutil.SafeResetTimer(timer, l.leaseTimeout) case <-timer.C: log.Info("lease timeout", zap.Time("expire", l.expireTime.Load().(time.Time)), zap.String("purpose", l.Purpose)) return diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index d106578b5fe..cd41fc9ca36 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -68,7 +68,7 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, ruleManager: ruleManager, labelerManager: labelerManager, persistConfig: persistConfig, - hotStat: statistics.NewHotStat(ctx), + hotStat: statistics.NewHotStat(ctx, basicCluster), labelStats: statistics.NewLabelStatistics(), regionStats: statistics.NewRegionStatistics(basicCluster, persistConfig, ruleManager), storage: storage, @@ -329,13 +329,12 @@ func (c *Cluster) waitSchedulersInitialized() { } } -// TODO: implement the following methods - // UpdateRegionsLabelLevelStats updates the status of the region label level by types. 
func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) { for _, region := range regions { c.labelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.persistConfig.GetLocationLabels()) } + c.labelStats.ClearDefunctRegions() } func (c *Cluster) getStoresWithoutLabelLocked(region *core.RegionInfo, key, value string) []*core.StoreInfo { diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 01282b40534..0596ffc9ff2 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -63,11 +63,12 @@ type Cluster struct { // NewCluster creates a new Cluster func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster { + bc := core.NewBasicCluster() c := &Cluster{ ctx: ctx, - BasicCluster: core.NewBasicCluster(), + BasicCluster: bc, IDAllocator: mockid.NewIDAllocator(), - HotStat: statistics.NewHotStat(ctx), + HotStat: statistics.NewHotStat(ctx, bc), HotBucketCache: buckets.NewBucketsCache(ctx), PersistOptions: opts, suspectRegions: map[uint64]struct{}{}, diff --git a/pkg/mock/mockserver/mockserver.go b/pkg/mock/mockserver/mockserver.go new file mode 100644 index 00000000000..d79d79ffa03 --- /dev/null +++ b/pkg/mock/mockserver/mockserver.go @@ -0,0 +1,88 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mockserver + +import ( + "context" + + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/storage" + "github.com/tikv/pd/pkg/utils/grpcutil" +) + +// MockServer is used to mock Server for test use. +type MockServer struct { + ctx context.Context + member, leader *pdpb.Member + storage storage.Storage + bc *core.BasicCluster +} + +// NewMockServer creates a new MockServer. +func NewMockServer(ctx context.Context, member, leader *pdpb.Member, storage storage.Storage, bc *core.BasicCluster) *MockServer { + return &MockServer{ + ctx: ctx, + member: member, + leader: leader, + storage: storage, + bc: bc, + } +} + +// LoopContext returns the context of the server. +func (s *MockServer) LoopContext() context.Context { + return s.ctx +} + +// ClusterID returns the cluster ID of the server. +func (*MockServer) ClusterID() uint64 { + return 1 +} + +// GetMemberInfo returns the member info of the server. +func (s *MockServer) GetMemberInfo() *pdpb.Member { + return s.member +} + +// GetLeader returns the leader of the server. +func (s *MockServer) GetLeader() *pdpb.Member { + return s.leader +} + +// GetStorage returns the storage of the server. +func (s *MockServer) GetStorage() storage.Storage { + return s.storage +} + +// Name returns the name of the server. +func (*MockServer) Name() string { + return "mock-server" +} + +// GetRegions returns the regions of the server. +func (s *MockServer) GetRegions() []*core.RegionInfo { + return s.bc.GetRegions() +} + +// GetTLSConfig returns the TLS config of the server. 
+func (*MockServer) GetTLSConfig() *grpcutil.TLSConfig { + return &grpcutil.TLSConfig{} +} + +// GetBasicCluster returns the basic cluster of the server. +func (s *MockServer) GetBasicCluster() *core.BasicCluster { + return s.bc +} diff --git a/pkg/replication/replication_mode.go b/pkg/replication/replication_mode.go index 30b34e4596a..9093f911901 100644 --- a/pkg/replication/replication_mode.go +++ b/pkg/replication/replication_mode.go @@ -212,6 +212,7 @@ const ( type drAutoSyncStatus struct { State string `json:"state,omitempty"` StateID uint64 `json:"state_id,omitempty"` + AsyncStartTime *time.Time `json:"async_start,omitempty"` RecoverStartTime *time.Time `json:"recover_start,omitempty"` TotalRegions int `json:"total_regions,omitempty"` SyncedRegions int `json:"synced_regions,omitempty"` @@ -262,7 +263,8 @@ func (m *ModeManager) drSwitchToAsyncWithLock(availableStores []uint64) error { log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err } - dr := drAutoSyncStatus{State: drStateAsync, StateID: id, AvailableStores: availableStores} + now := time.Now() + dr := drAutoSyncStatus{State: drStateAsync, StateID: id, AvailableStores: availableStores, AsyncStartTime: &now} if err := m.storage.SaveReplicationStatus(modeDRAutoSync, dr); err != nil { log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err)) return err @@ -272,6 +274,15 @@ func (m *ModeManager) drSwitchToAsyncWithLock(availableStores []uint64) error { return nil } +func (m *ModeManager) drDurationSinceAsyncStart() time.Duration { + m.RLock() + defer m.RUnlock() + if m.drAutoSync.AsyncStartTime == nil { + return 0 + } + return time.Since(*m.drAutoSync.AsyncStartTime) +} + func (m *ModeManager) drSwitchToSyncRecover() error { m.Lock() defer m.Unlock() @@ -477,7 +488,7 @@ func (m *ModeManager) tickUpdateState() { m.drSwitchToAsync(storeIDs[primaryUp]) } case drStateAsync: - if canSync { + if canSync && m.drDurationSinceAsyncStart() > m.config.DRAutoSync.WaitRecoverTimeout.Duration { m.drSwitchToSyncRecover() break } diff --git a/pkg/replication/replication_mode_test.go b/pkg/replication/replication_mode_test.go index e01fb7a0b9a..5cf9f1a1450 100644 --- a/pkg/replication/replication_mode_test.go +++ b/pkg/replication/replication_mode_test.go @@ -16,6 +16,7 @@ package replication import ( "context" + "encoding/json" "errors" "fmt" "testing" @@ -159,6 +160,20 @@ func newMockReplicator(ids []uint64) *mockFileReplicator { } } +func assertLastData(t *testing.T, data string, state string, stateID uint64, availableStores []uint64) { + type status struct { + State string `json:"state"` + StateID uint64 `json:"state_id"` + AvailableStores []uint64 `json:"available_stores"` + } + var s status + err := json.Unmarshal([]byte(data), &s) + require.NoError(t, err) + require.Equal(t, state, s.State) + require.Equal(t, stateID, s.StateID) + require.Equal(t, availableStores, s.AvailableStores) +} + func TestStateSwitch(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) @@ -190,7 +205,7 @@ func TestStateSwitch(t *testing.T) { stateID := rep.drAutoSync.StateID re.NotEqual(uint64(0), stateID) rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "sync", stateID, nil) assertStateIDUpdate := func() { re.NotEqual(stateID, rep.drAutoSync.StateID) stateID = rep.drAutoSync.StateID @@ -207,7 +222,7 
@@ func TestStateSwitch(t *testing.T) { re.Equal(drStateAsyncWait, rep.drGetState()) assertStateIDUpdate() rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{1, 2, 3, 4}) re.False(rep.GetReplicationStatus().GetDrAutoSync().GetPauseRegionSplit()) conf.DRAutoSync.PauseRegionSplit = true @@ -218,7 +233,7 @@ func TestStateSwitch(t *testing.T) { rep.tickUpdateState() assertStateIDUpdate() rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 2, 3, 4}) // add new store in dr zone. cluster.AddLabelsStore(5, 1, map[string]string{"zone": "zone2"}) @@ -268,18 +283,19 @@ func TestStateSwitch(t *testing.T) { rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) assertStateIDUpdate() + rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{1, 2, 3, 4}) setStoreState(cluster, "down", "up", "up", "up", "down", "down") rep.tickUpdateState() assertStateIDUpdate() rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[2,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{2, 3, 4}) setStoreState(cluster, "up", "down", "up", "up", "down", "down") rep.tickUpdateState() assertStateIDUpdate() rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async_wait", stateID, []uint64{1, 3, 4}) // async_wait -> async rep.tickUpdateState() @@ -291,26 +307,32 @@ func TestStateSwitch(t *testing.T) { rep.tickUpdateState() assertStateIDUpdate() rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 3, 4}) // async -> async setStoreState(cluster, "up", "up", "up", "up", "down", "down") rep.tickUpdateState() // store 2 won't be available before it syncs status. 
rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 3, 4}) syncStoreStatus(1, 2, 3, 4) rep.tickUpdateState() assertStateIDUpdate() rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async","state_id":%d,"available_stores":[1,2,3,4]}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async", stateID, []uint64{1, 2, 3, 4}) // async -> sync_recover setStoreState(cluster, "up", "up", "up", "up", "up", "up") rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) assertStateIDUpdate() + rep.drSwitchToAsync([]uint64{1, 2, 3, 4, 5}) + rep.config.DRAutoSync.WaitRecoverTimeout = typeutil.NewDuration(time.Hour) + rep.tickUpdateState() + re.Equal(drStateAsync, rep.drGetState()) // wait recover timeout + + rep.config.DRAutoSync.WaitRecoverTimeout = typeutil.NewDuration(0) setStoreState(cluster, "down", "up", "up", "up", "up", "up") rep.tickUpdateState() re.Equal(drStateSyncRecover, rep.drGetState()) @@ -387,27 +409,27 @@ func TestReplicateState(t *testing.T) { stateID := rep.drAutoSync.StateID // replicate after initialized rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "sync", stateID, nil) // repliate state to new member replicator.memberIDs = append(replicator.memberIDs, 2, 3) rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[2]) - re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[3]) + assertLastData(t, replicator.lastData[2], "sync", stateID, nil) + assertLastData(t, replicator.lastData[3], "sync", stateID, nil) // inject error replicator.errors[2] = errors.New("failed to persist") rep.tickUpdateState() // switch async_wait since there is only one zone newStateID := rep.drAutoSync.StateID rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2]}`, newStateID), replicator.lastData[1]) - re.Equal(fmt.Sprintf(`{"state":"sync","state_id":%d}`, stateID), replicator.lastData[2]) - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2]}`, newStateID), replicator.lastData[3]) + assertLastData(t, replicator.lastData[1], "async_wait", newStateID, []uint64{1, 2}) + assertLastData(t, replicator.lastData[2], "sync", stateID, nil) + assertLastData(t, replicator.lastData[3], "async_wait", newStateID, []uint64{1, 2}) // clear error, replicate to node 2 next time delete(replicator.errors, 2) rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2]}`, newStateID), replicator.lastData[2]) + assertLastData(t, replicator.lastData[2], "async_wait", newStateID, []uint64{1, 2}) } func TestAsynctimeout(t *testing.T) { @@ -637,7 +659,7 @@ func TestComplexPlacementRules(t *testing.T) { rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4,5,6]}`, rep.drAutoSync.StateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async_wait", rep.drAutoSync.StateID, []uint64{1, 2, 3, 4, 5, 6}) // reset to sync setStoreState(cluster, "up", "up", "up", "up", "up", "up", "up", "up", "up", "up") @@ -698,7 +720,7 @@ func 
TestComplexPlacementRules2(t *testing.T) { rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, rep.drAutoSync.StateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async_wait", rep.drAutoSync.StateID, []uint64{1, 2, 3, 4}) } func TestComplexPlacementRules3(t *testing.T) { @@ -737,7 +759,7 @@ func TestComplexPlacementRules3(t *testing.T) { rep.tickUpdateState() re.Equal(drStateAsyncWait, rep.drGetState()) rep.tickReplicateStatus() - re.Equal(fmt.Sprintf(`{"state":"async_wait","state_id":%d,"available_stores":[1,2,3,4]}`, rep.drAutoSync.StateID), replicator.lastData[1]) + assertLastData(t, replicator.lastData[1], "async_wait", rep.drAutoSync.StateID, []uint64{1, 2, 3, 4}) } func genRegions(cluster *mockcluster.Cluster, stateID uint64, n int) []*core.RegionInfo { diff --git a/pkg/schedule/schedulers/evict_leader.go b/pkg/schedule/schedulers/evict_leader.go index d6c8c1910ce..4434504f0d0 100644 --- a/pkg/schedule/schedulers/evict_leader.go +++ b/pkg/schedule/schedulers/evict_leader.go @@ -78,7 +78,7 @@ func (conf *evictLeaderSchedulerConfig) BuildWithArgs(args []string) error { failpoint.Inject("buildWithArgsErr", func() { failpoint.Return(errors.New("fail to build with args")) }) - if len(args) != 1 { + if len(args) < 1 { return errs.ErrSchedulerConfig.FastGenByArgs("id") } diff --git a/pkg/schedule/schedulers/grant_leader.go b/pkg/schedule/schedulers/grant_leader.go index 4562534fbd0..913805c2671 100644 --- a/pkg/schedule/schedulers/grant_leader.go +++ b/pkg/schedule/schedulers/grant_leader.go @@ -57,7 +57,7 @@ type grantLeaderSchedulerConfig struct { } func (conf *grantLeaderSchedulerConfig) BuildWithArgs(args []string) error { - if len(args) != 1 { + if len(args) < 1 { return errs.ErrSchedulerConfig.FastGenByArgs("id") } diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index 5a72cbddf41..47def26b2e7 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -1645,6 +1645,9 @@ func TestHotCacheUpdateCache(t *testing.T) { cancel, _, tc, _ := prepareSchedulersTest() defer cancel() tc.SetHotRegionCacheHitsThreshold(0) + for i := 0; i < 3; i++ { + tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) + } // For read flow addRegionInfo(tc, utils.Read, []testRegionInfo{ @@ -1712,6 +1715,9 @@ func TestHotCacheKeyThresholds(t *testing.T) { cancel, _, tc, _ := prepareSchedulersTest() defer cancel() tc.SetHotRegionCacheHitsThreshold(0) + for i := 0; i < 6; i++ { + tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) + } addRegionInfo(tc, utils.Read, []testRegionInfo{ {1, []uint64{1, 2, 3}, 0, 1, 0}, {2, []uint64{1, 2, 3}, 0, 1 * units.KiB, 0}, @@ -1730,6 +1736,9 @@ func TestHotCacheKeyThresholds(t *testing.T) { { // many regions cancel, _, tc, _ := prepareSchedulersTest() defer cancel() + for i := 0; i < 3; i++ { + tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) + } regions := []testRegionInfo{} for i := 1; i <= 1000; i += 2 { regions = append(regions, @@ -1784,6 +1793,9 @@ func TestHotCacheByteAndKey(t *testing.T) { cancel, _, tc, _ := prepareSchedulersTest() defer cancel() tc.SetHotRegionCacheHitsThreshold(0) + for i := 0; i < 3; i++ { + tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) + } statistics.ThresholdsUpdateInterval = 0 defer func() { statistics.ThresholdsUpdateInterval = 8 * 
time.Second @@ -1910,6 +1922,9 @@ func TestHotCacheCheckRegionFlow(t *testing.T) { func checkHotCacheCheckRegionFlow(re *require.Assertions, testCase testHotCacheCheckRegionFlowCase, enablePlacementRules bool) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() + for i := 0; i < 3; i++ { + tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) + } tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.SetEnablePlacementRules(enablePlacementRules) labels := []string{"zone", "host"} @@ -1985,6 +2000,9 @@ func TestHotCacheCheckRegionFlowWithDifferentThreshold(t *testing.T) { func checkHotCacheCheckRegionFlowWithDifferentThreshold(re *require.Assertions, enablePlacementRules bool) { cancel, _, tc, _ := prepareSchedulersTest() defer cancel() + for i := 0; i < 3; i++ { + tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) + } tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.SetEnablePlacementRules(enablePlacementRules) labels := []string{"zone", "host"} diff --git a/pkg/schedule/schedulers/init.go b/pkg/schedule/schedulers/init.go index 58d044f79c9..4594ee6b903 100644 --- a/pkg/schedule/schedulers/init.go +++ b/pkg/schedule/schedulers/init.go @@ -119,7 +119,7 @@ func schedulersRegister() { // evict leader RegisterSliceDecoderBuilder(EvictLeaderType, func(args []string) ConfigDecoder { return func(v interface{}) error { - if len(args) != 1 { + if len(args) < 1 { return errs.ErrSchedulerConfig.FastGenByArgs("id") } conf, ok := v.(*evictLeaderSchedulerConfig) @@ -236,7 +236,7 @@ func schedulersRegister() { // grant leader RegisterSliceDecoderBuilder(GrantLeaderType, func(args []string) ConfigDecoder { return func(v interface{}) error { - if len(args) != 1 { + if len(args) < 1 { return errs.ErrSchedulerConfig.FastGenByArgs("id") } diff --git a/pkg/statistics/hot_cache.go b/pkg/statistics/hot_cache.go index de7189a1332..f17f58e7e46 100644 --- a/pkg/statistics/hot_cache.go +++ b/pkg/statistics/hot_cache.go @@ -38,11 +38,11 @@ type HotCache struct { } // NewHotCache creates a new hot spot cache. -func NewHotCache(ctx context.Context) *HotCache { +func NewHotCache(ctx context.Context, cluster *core.BasicCluster) *HotCache { w := &HotCache{ ctx: ctx, - writeCache: NewHotPeerCache(ctx, utils.Write), - readCache: NewHotPeerCache(ctx, utils.Read), + writeCache: NewHotPeerCache(ctx, cluster, utils.Write), + readCache: NewHotPeerCache(ctx, cluster, utils.Read), } go w.updateItems(w.readCache.taskQueue, w.runReadTask) go w.updateItems(w.writeCache.taskQueue, w.runWriteTask) diff --git a/pkg/statistics/hot_peer_cache.go b/pkg/statistics/hot_peer_cache.go index 1ac07289a3c..55b951bcd93 100644 --- a/pkg/statistics/hot_peer_cache.go +++ b/pkg/statistics/hot_peer_cache.go @@ -60,6 +60,7 @@ type thresholds struct { // hotPeerCache saves the hot peer's statistics. type hotPeerCache struct { kind utils.RWType + cluster *core.BasicCluster peersOfStore map[uint64]*utils.TopN // storeID -> hot peers storesOfRegion map[uint64]map[uint64]struct{} // regionID -> storeIDs regionsOfStore map[uint64]map[uint64]struct{} // storeID -> regionIDs @@ -67,13 +68,14 @@ type hotPeerCache struct { taskQueue *chanx.UnboundedChan[FlowItemTask] thresholdsOfStore map[uint64]*thresholds // storeID -> thresholds metrics map[uint64][utils.ActionTypeLen]prometheus.Gauge // storeID -> metrics - // TODO: consider to remove store info when store is offline. 
+ lastGCTime time.Time } // NewHotPeerCache creates a hotPeerCache -func NewHotPeerCache(ctx context.Context, kind utils.RWType) *hotPeerCache { +func NewHotPeerCache(ctx context.Context, cluster *core.BasicCluster, kind utils.RWType) *hotPeerCache { return &hotPeerCache{ kind: kind, + cluster: cluster, peersOfStore: make(map[uint64]*utils.TopN), storesOfRegion: make(map[uint64]map[uint64]struct{}), regionsOfStore: make(map[uint64]map[uint64]struct{}), @@ -114,6 +116,7 @@ func (f *hotPeerCache) updateStat(item *HotPeerStat) { return } f.incMetrics(item.actionType, item.StoreID) + f.gc() } func (f *hotPeerCache) incMetrics(action utils.ActionType, storeID uint64) { @@ -544,6 +547,36 @@ func (f *hotPeerCache) removeItem(item *HotPeerStat) { } } +func (f *hotPeerCache) gc() { + if time.Since(f.lastGCTime) < f.topNTTL { + return + } + f.lastGCTime = time.Now() + // remove tombstone stores + stores := make(map[uint64]struct{}) + for _, storeID := range f.cluster.GetStores() { + stores[storeID.GetID()] = struct{}{} + } + for storeID := range f.peersOfStore { + if _, ok := stores[storeID]; !ok { + delete(f.peersOfStore, storeID) + delete(f.regionsOfStore, storeID) + delete(f.thresholdsOfStore, storeID) + delete(f.metrics, storeID) + } + } + // remove expired items + for _, peers := range f.peersOfStore { + regions := peers.RemoveExpired() + for _, regionID := range regions { + delete(f.storesOfRegion, regionID) + for storeID := range f.regionsOfStore { + delete(f.regionsOfStore[storeID], regionID) + } + } + } +} + func (f *hotPeerCache) coldItem(newItem, oldItem *HotPeerStat) { newItem.HotDegree = oldItem.HotDegree - 1 newItem.AntiCount = oldItem.AntiCount - 1 diff --git a/pkg/statistics/hot_peer_cache_test.go b/pkg/statistics/hot_peer_cache_test.go index 36f922d3830..8e69e4e4141 100644 --- a/pkg/statistics/hot_peer_cache_test.go +++ b/pkg/statistics/hot_peer_cache_test.go @@ -18,6 +18,7 @@ import ( "context" "math/rand" "sort" + "sync" "testing" "time" @@ -26,28 +27,12 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/mock/mockid" "github.com/tikv/pd/pkg/movingaverage" "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/utils/typeutil" ) -func TestStoreTimeUnsync(t *testing.T) { - re := require.New(t) - cache := NewHotPeerCache(context.Background(), utils.Write) - intervals := []uint64{120, 60} - for _, interval := range intervals { - region := buildRegion(utils.Write, 3, interval) - checkAndUpdate(re, cache, region, 3) - { - stats := cache.RegionStats(0) - re.Len(stats, 3) - for _, s := range stats { - re.Len(s, 1) - } - } - } -} - type operator int const ( @@ -79,8 +64,9 @@ func TestCache(t *testing.T) { utils.Read: 3, // all peers utils.Write: 3, // all peers } - cache := NewHotPeerCache(context.Background(), test.kind) - region := buildRegion(test.kind, 3, 60) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, test.kind) + region := buildRegion(cluster, test.kind, 3, 60) checkAndUpdate(re, cache, region, defaultSize[test.kind]) checkHit(re, cache, region, test.kind, utils.Add) // all peers are new @@ -252,12 +238,31 @@ func pickFollower(region *core.RegionInfo) (index int, peer *metapb.Peer) { return dst, meta.Peers[dst] } -func buildRegion(kind utils.RWType, peerCount int, interval uint64) *core.RegionInfo { - peers := newPeers(peerCount, - func(i int) uint64 { return uint64(10000 + i) }, - func(i int) uint64 { return uint64(i) }) +var ( + 
idAllocator *mockid.IDAllocator + once sync.Once +) + +func getIDAllocator() *mockid.IDAllocator { + once.Do(func() { + idAllocator = mockid.NewIDAllocator() + }) + return idAllocator +} + +func buildRegion(cluster *core.BasicCluster, kind utils.RWType, peerCount int, interval uint64) (region *core.RegionInfo) { + peers := make([]*metapb.Peer, 0, peerCount) + for i := 0; i < peerCount; i++ { + id, _ := getIDAllocator().Alloc() + storeID, _ := getIDAllocator().Alloc() + peers = append(peers, &metapb.Peer{ + Id: id, + StoreId: storeID, + }) + } + id, _ := getIDAllocator().Alloc() meta := &metapb.Region{ - Id: 1000, + Id: id, Peers: peers, StartKey: []byte(""), EndKey: []byte(""), @@ -268,7 +273,7 @@ func buildRegion(kind utils.RWType, peerCount int, interval uint64) *core.Region switch kind { case utils.Read: - return core.NewRegionInfo( + region = core.NewRegionInfo( meta, leader, core.SetReportInterval(0, interval), @@ -277,7 +282,7 @@ func buildRegion(kind utils.RWType, peerCount int, interval uint64) *core.Region core.SetReadQuery(1024*interval), ) case utils.Write: - return core.NewRegionInfo( + region = core.NewRegionInfo( meta, leader, core.SetReportInterval(0, interval), @@ -285,31 +290,21 @@ func buildRegion(kind utils.RWType, peerCount int, interval uint64) *core.Region core.SetWrittenKeys(10*units.MiB*interval), core.SetWrittenQuery(1024*interval), ) - default: - return nil } -} - -type genID func(i int) uint64 - -func newPeers(n int, pid genID, sid genID) []*metapb.Peer { - peers := make([]*metapb.Peer, 0, n) - for i := 1; i <= n; i++ { - peer := &metapb.Peer{ - Id: pid(i), - } - peer.StoreId = sid(i) - peers = append(peers, peer) + for _, peer := range region.GetPeers() { + cluster.PutStore(core.NewStoreInfo(&metapb.Store{Id: peer.GetStoreId()}, core.SetLastHeartbeatTS(time.Now()))) } - return peers + return region } func TestUpdateHotPeerStat(t *testing.T) { re := require.New(t) - cache := NewHotPeerCache(context.Background(), utils.Read) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Read) storeID, regionID := uint64(1), uint64(2) peer := &metapb.Peer{StoreId: storeID} region := core.NewRegionInfo(&metapb.Region{Id: regionID, Peers: []*metapb.Peer{peer}}, peer) + cluster.PutStore(core.NewStoreInfo(&metapb.Store{Id: storeID}, core.SetLastHeartbeatTS(time.Now()))) // we statistic read peer info from store heartbeat rather than region heartbeat m := utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval ThresholdsUpdateInterval = 0 @@ -400,8 +395,10 @@ func TestThresholdWithUpdateHotPeerStat(t *testing.T) { } func testMetrics(re *require.Assertions, interval, byteRate, expectThreshold float64) { - cache := NewHotPeerCache(context.Background(), utils.Read) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Read) storeID := uint64(1) + cluster.PutStore(core.NewStoreInfo(&metapb.Store{Id: storeID}, core.SetLastHeartbeatTS(time.Now()))) re.GreaterOrEqual(byteRate, utils.MinHotThresholds[utils.RegionReadBytes]) ThresholdsUpdateInterval = 0 defer func() { @@ -447,8 +444,9 @@ func TestRemoveFromCache(t *testing.T) { interval := uint64(5) checkers := []check{checkAndUpdate, checkAndUpdateWithOrdering} for _, checker := range checkers { - cache := NewHotPeerCache(context.Background(), utils.Write) - region := buildRegion(utils.Write, peerCount, interval) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Write) + region := 
buildRegion(cluster, utils.Write, peerCount, interval) // prepare intervalSums := make(map[uint64]int) for i := 1; i <= 200; i++ { @@ -482,8 +480,9 @@ func TestRemoveFromCacheRandom(t *testing.T) { for _, peerCount := range peerCounts { for _, interval := range intervals { for _, checker := range checkers { - cache := NewHotPeerCache(context.Background(), utils.Write) - region := buildRegion(utils.Write, peerCount, interval) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Write) + region := buildRegion(cluster, utils.Write, peerCount, interval) target := uint64(10) intervalSums := make(map[uint64]int) @@ -536,8 +535,9 @@ func checkCoolDown(re *require.Assertions, cache *hotPeerCache, region *core.Reg func TestCoolDownTransferLeader(t *testing.T) { re := require.New(t) - cache := NewHotPeerCache(context.Background(), utils.Read) - region := buildRegion(utils.Read, 3, 60) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Read) + region := buildRegion(cluster, utils.Read, 3, 60) moveLeader := func() { _, region = schedule(re, movePeer, region, 10) @@ -569,8 +569,9 @@ func TestCoolDownTransferLeader(t *testing.T) { } testCases := []func(){moveLeader, transferLeader, movePeer, addReplica, removeReplica} for _, testCase := range testCases { - cache = NewHotPeerCache(context.Background(), utils.Read) - region = buildRegion(utils.Read, 3, 60) + cluster = core.NewBasicCluster() + cache = NewHotPeerCache(context.Background(), cluster, utils.Read) + region = buildRegion(cluster, utils.Read, 3, 60) for i := 1; i <= 200; i++ { checkAndUpdate(re, cache, region) } @@ -582,8 +583,9 @@ func TestCoolDownTransferLeader(t *testing.T) { // See issue #4510 func TestCacheInherit(t *testing.T) { re := require.New(t) - cache := NewHotPeerCache(context.Background(), utils.Read) - region := buildRegion(utils.Read, 3, 10) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Read) + region := buildRegion(cluster, utils.Read, 3, 10) // prepare for i := 1; i <= 200; i++ { checkAndUpdate(re, cache, region) @@ -673,13 +675,16 @@ func TestHotPeerCacheTopNThreshold(t *testing.T) { re := require.New(t) testWithUpdateInterval := func(interval time.Duration) { ThresholdsUpdateInterval = interval - cache := NewHotPeerCache(context.Background(), utils.Write) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Write) now := time.Now() + storeID := uint64(1) for id := uint64(0); id < 100; id++ { meta := &metapb.Region{ Id: id, - Peers: []*metapb.Peer{{Id: id, StoreId: 1}}, + Peers: []*metapb.Peer{{Id: id, StoreId: storeID}}, } + cluster.PutStore(core.NewStoreInfo(&metapb.Store{Id: storeID}, core.SetLastHeartbeatTS(time.Now()))) region := core.NewRegionInfo(meta, meta.Peers[0], core.SetWrittenBytes(id*6000), core.SetWrittenKeys(id*6000), core.SetWrittenQuery(id*6000)) for i := 0; i < 10; i++ { start := uint64(now.Add(time.Minute * time.Duration(i)).Unix()) @@ -714,9 +719,53 @@ func TestHotPeerCacheTopNThreshold(t *testing.T) { testWithUpdateInterval(0) } +func TestRemoveExpireItems(t *testing.T) { + re := require.New(t) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Write) + cache.topNTTL = 100 * time.Millisecond + // case1: remove expired items + region1 := buildRegion(cluster, utils.Write, 3, 10) + checkAndUpdate(re, cache, region1) + re.NotEmpty(cache.storesOfRegion[region1.GetID()]) 
+	time.Sleep(cache.topNTTL)
+	region2 := buildRegion(cluster, utils.Write, 3, 10)
+	checkAndUpdate(re, cache, region2)
+	re.Empty(cache.storesOfRegion[region1.GetID()])
+	re.NotEmpty(cache.storesOfRegion[region2.GetID()])
+	time.Sleep(cache.topNTTL)
+	// case2: remove items when the store does not exist
+	re.NotNil(cache.peersOfStore[region1.GetLeader().GetStoreId()])
+	re.NotNil(cache.peersOfStore[region2.GetLeader().GetStoreId()])
+	cluster.ResetStores()
+	re.Empty(cluster.GetStores())
+	region3 := buildRegion(cluster, utils.Write, 3, 10)
+	checkAndUpdate(re, cache, region3)
+	re.Nil(cache.peersOfStore[region1.GetLeader().GetStoreId()])
+	re.Nil(cache.peersOfStore[region2.GetLeader().GetStoreId()])
+	re.NotEmpty(cache.regionsOfStore[region3.GetLeader().GetStoreId()])
+}
+
+func TestDifferentReportInterval(t *testing.T) {
+	re := require.New(t)
+	cluster := core.NewBasicCluster()
+	cache := NewHotPeerCache(context.Background(), cluster, utils.Write)
+	region := buildRegion(cluster, utils.Write, 3, 5)
+	for _, interval := range []uint64{120, 60, 30} {
+		region = region.Clone(core.SetReportInterval(0, interval))
+		checkAndUpdate(re, cache, region, 3)
+		stats := cache.RegionStats(0)
+		re.Len(stats, 3)
+		for _, s := range stats {
+			re.Len(s, 1)
+		}
+	}
+}
+
 func BenchmarkCheckRegionFlow(b *testing.B) {
-	cache := NewHotPeerCache(context.Background(), utils.Read)
-	region := buildRegion(utils.Read, 3, 10)
+	cluster := core.NewBasicCluster()
+	cache := NewHotPeerCache(context.Background(), cluster, utils.Read)
+	region := buildRegion(cluster, utils.Read, 3, 10)
 	peerInfos := make([]*core.PeerInfo, 0)
 	for _, peer := range region.GetPeers() {
 		peerInfo := core.NewPeerInfo(peer, region.GetLoads(), 10)
diff --git a/pkg/statistics/hot_stat.go b/pkg/statistics/hot_stat.go
index 3cda52aba89..c9276f11b20 100644
--- a/pkg/statistics/hot_stat.go
+++ b/pkg/statistics/hot_stat.go
@@ -17,6 +17,7 @@ package statistics
 
 import (
 	"context"
 
+	"github.com/tikv/pd/pkg/core"
 	"github.com/tikv/pd/pkg/statistics/buckets"
 )
 
@@ -28,9 +29,9 @@ type HotStat struct {
 }
 
 // NewHotStat creates the container to hold cluster's hotspot statistics.
-func NewHotStat(ctx context.Context) *HotStat {
+func NewHotStat(ctx context.Context, cluster *core.BasicCluster) *HotStat {
 	return &HotStat{
-		HotCache:       NewHotCache(ctx),
+		HotCache:       NewHotCache(ctx, cluster),
 		StoresStats:    NewStoresStats(),
 		HotBucketCache: buckets.NewBucketsCache(ctx),
 	}
diff --git a/pkg/statistics/region_collection.go b/pkg/statistics/region_collection.go
index 5f0752b72c7..ebcb24560dd 100644
--- a/pkg/statistics/region_collection.go
+++ b/pkg/statistics/region_collection.go
@@ -306,6 +306,7 @@ type LabelStatistics struct {
 	syncutil.RWMutex
 	regionLabelStats map[uint64]string
 	labelCounter     map[string]int
+	defunctRegions   map[uint64]struct{}
 }
 
 // NewLabelStatistics creates a new LabelStatistics.
@@ -313,6 +314,7 @@ func NewLabelStatistics() *LabelStatistics {
 	return &LabelStatistics{
 		regionLabelStats: make(map[uint64]string),
 		labelCounter:     make(map[string]int),
+		defunctRegions:   make(map[uint64]struct{}),
 	}
 }
 
@@ -346,14 +348,26 @@ func (l *LabelStatistics) Reset() {
 	regionLabelLevelGauge.Reset()
 }
 
-// ClearDefunctRegion is used to handle the overlap region.
-func (l *LabelStatistics) ClearDefunctRegion(regionID uint64) {
+// MarkDefunctRegion marks a region as defunct when it is overlapped (e.g. by a merge).
+// The region is removed from the label statistics later by ClearDefunctRegions.
+func (l *LabelStatistics) MarkDefunctRegion(regionID uint64) {
 	l.Lock()
 	defer l.Unlock()
-	if label, ok := l.regionLabelStats[regionID]; ok {
-		l.labelCounter[label]--
-		delete(l.regionLabelStats, regionID)
+	l.defunctRegions[regionID] = struct{}{}
+}
+
+// ClearDefunctRegions is the second half of the overlap handling.
+// It removes every region previously marked as defunct from the label statistics.
+func (l *LabelStatistics) ClearDefunctRegions() {
+	l.Lock()
+	defer l.Unlock()
+	for regionID := range l.defunctRegions {
+		if label, ok := l.regionLabelStats[regionID]; ok {
+			l.labelCounter[label]--
+			delete(l.regionLabelStats, regionID)
+		}
 	}
+	l.defunctRegions = make(map[uint64]struct{})
 }
 
 // GetLabelCounter is only used for tests.
diff --git a/pkg/statistics/utils/topn.go b/pkg/statistics/utils/topn.go
index 916bbb82f92..9f9471f7de5 100644
--- a/pkg/statistics/utils/topn.go
+++ b/pkg/statistics/utils/topn.go
@@ -97,15 +97,14 @@ func (tn *TopN) Put(item TopNItem) (isUpdate bool) {
 		isUpdate = stn.Put(item)
 	}
 	tn.ttlLst.Put(item.ID())
-	tn.maintain()
 	return
 }
 
 // RemoveExpired deletes all expired items.
-func (tn *TopN) RemoveExpired() {
+func (tn *TopN) RemoveExpired() []uint64 {
 	tn.rw.Lock()
 	defer tn.rw.Unlock()
-	tn.maintain()
+	return tn.maintain()
 }
 
 // Remove deletes the item by given ID and returns it.
@@ -116,16 +115,18 @@ func (tn *TopN) Remove(id uint64) (item TopNItem) {
 		item = stn.Remove(id)
 	}
 	_ = tn.ttlLst.Remove(id)
-	tn.maintain()
 	return
 }
 
-func (tn *TopN) maintain() {
+func (tn *TopN) maintain() []uint64 {
+	ids := make([]uint64, 0)
 	for _, id := range tn.ttlLst.TakeExpired() {
 		for _, stn := range tn.topns {
 			stn.Remove(id)
+			ids = append(ids, id)
 		}
 	}
+	return ids
 }
 
 type singleTopN struct {
diff --git a/pkg/statistics/utils/topn_test.go b/pkg/statistics/utils/topn_test.go
index f92d5a61f34..4f2bc5e4f3b 100644
--- a/pkg/statistics/utils/topn_test.go
+++ b/pkg/statistics/utils/topn_test.go
@@ -208,6 +208,7 @@ func TestTTL(t *testing.T) {
 	putPerm(re, tn, Total, func(x int) float64 {
 		return float64(-x)
 	}, false /*insert*/)
+	re.Len(tn.GetAll(), Total)
 
 	time.Sleep(900 * time.Millisecond)
 	{
@@ -217,6 +218,8 @@ func TestTTL(t *testing.T) {
 		}
 		re.True(tn.Put(item))
 	}
+	re.Len(tn.RemoveExpired(), (Total-1)*DimLen)
+
 	for i := 3; i < Total; i += 3 {
 		item := &item{id: uint64(i), values: []float64{float64(-i) + 100}}
 		for k := 1; k < DimLen; k++ {
@@ -224,7 +227,6 @@
 		}
 		re.False(tn.Put(item))
 	}
-	tn.RemoveExpired()
 	re.Equal(Total/3+1, tn.Len())
 	items := tn.GetAllTopN(0)
diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go
index 45f4a100c60..68ce35899fc 100644
--- a/pkg/syncer/client.go
+++ b/pkg/syncer/client.go
@@ -39,6 +39,7 @@ const (
 	keepaliveTime    = 10 * time.Second
 	keepaliveTimeout = 3 * time.Second
 	msgSize          = 8 * units.MiB
+	retryInterval    = time.Second
 )
 
 // StopSyncWithLeader stop to sync the region with leader.
@@ -89,6 +90,8 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
 	go func() {
 		defer logutil.LogPanic()
 		defer s.wg.Done()
+		timer := time.NewTimer(retryInterval)
+		defer timer.Stop()
 		// used to load region from kv storage to cache storage.
bc := s.server.GetBasicCluster() regionStorage := s.server.GetStorage() @@ -139,7 +142,19 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { } } log.Error("server failed to establish sync stream with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), errs.ZapError(err)) - time.Sleep(time.Second) + if !timer.Stop() { + select { + case <-timer.C: // try to drain from the channel + default: + } + } + timer.Reset(retryInterval) + select { + case <-ctx.Done(): + log.Info("stop synchronizing with leader due to context canceled") + return + case <-timer.C: + } continue } @@ -151,7 +166,19 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { if err = stream.CloseSend(); err != nil { log.Error("failed to terminate client stream", errs.ZapError(errs.ErrGRPCCloseSend, err)) } - time.Sleep(time.Second) + if !timer.Stop() { + select { + case <-timer.C: // try to drain from the channel + default: + } + } + timer.Reset(retryInterval) + select { + case <-ctx.Done(): + log.Info("stop synchronizing with leader due to context canceled") + return + case <-timer.C: + } break } if s.history.GetNextIndex() != resp.GetStartIndex() { diff --git a/pkg/syncer/client_test.go b/pkg/syncer/client_test.go index 34c2b383ab3..8301c1c7567 100644 --- a/pkg/syncer/client_test.go +++ b/pkg/syncer/client_test.go @@ -21,9 +21,9 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/mock/mockserver" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/utils/grpcutil" "google.golang.org/grpc/codes" @@ -37,11 +37,13 @@ func TestLoadRegion(t *testing.T) { rs, err := storage.NewStorageWithLevelDBBackend(context.Background(), tempDir, nil) re.NoError(err) - server := &mockServer{ - ctx: context.Background(), - storage: storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), rs), - bc: core.NewBasicCluster(), - } + server := mockserver.NewMockServer( + context.Background(), + nil, + nil, + storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), rs), + core.NewBasicCluster(), + ) for i := 0; i < 30; i++ { rs.SaveRegion(&metapb.Region{Id: uint64(i) + 1}) } @@ -64,11 +66,13 @@ func TestErrorCode(t *testing.T) { tempDir := t.TempDir() rs, err := storage.NewStorageWithLevelDBBackend(context.Background(), tempDir, nil) re.NoError(err) - server := &mockServer{ - ctx: context.Background(), - storage: storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), rs), - bc: core.NewBasicCluster(), - } + server := mockserver.NewMockServer( + context.Background(), + nil, + nil, + storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), rs), + core.NewBasicCluster(), + ) ctx, cancel := context.WithCancel(context.TODO()) rc := NewRegionSyncer(server) conn, err := grpcutil.GetClientConn(ctx, "http://127.0.0.1", nil) @@ -79,46 +83,3 @@ func TestErrorCode(t *testing.T) { re.True(ok) re.Equal(codes.Canceled, ev.Code()) } - -type mockServer struct { - ctx context.Context - member, leader *pdpb.Member - storage storage.Storage - bc *core.BasicCluster -} - -func (s *mockServer) LoopContext() context.Context { - return s.ctx -} - -func (s *mockServer) ClusterID() uint64 { - return 1 -} - -func (s *mockServer) GetMemberInfo() *pdpb.Member { - return s.member -} - -func (s *mockServer) GetLeader() *pdpb.Member { - return s.leader -} - -func (s *mockServer) GetStorage() storage.Storage { - return s.storage -} - 
-func (s *mockServer) Name() string {
-	return "mock-server"
-}
-
-func (s *mockServer) GetRegions() []*core.RegionInfo {
-	return s.bc.GetRegions()
-}
-
-func (s *mockServer) GetTLSConfig() *grpcutil.TLSConfig {
-	return &grpcutil.TLSConfig{}
-}
-
-func (s *mockServer) GetBasicCluster() *core.BasicCluster {
-	return s.bc
-}
diff --git a/pkg/timerpool/pool.go b/pkg/utils/timerutil/pool.go
similarity index 98%
rename from pkg/timerpool/pool.go
rename to pkg/utils/timerutil/pool.go
index 28ffacfc629..2d608b09053 100644
--- a/pkg/timerpool/pool.go
+++ b/pkg/utils/timerutil/pool.go
@@ -4,7 +4,7 @@
 
 // Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133
 
-package timerpool
+package timerutil
 
 import (
 	"sync"
diff --git a/pkg/timerpool/pool_test.go b/pkg/utils/timerutil/pool_test.go
similarity index 98%
rename from pkg/timerpool/pool_test.go
rename to pkg/utils/timerutil/pool_test.go
index d6dffc723a9..f90a305d99f 100644
--- a/pkg/timerpool/pool_test.go
+++ b/pkg/utils/timerutil/pool_test.go
@@ -4,7 +4,7 @@
 
 // Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133
 
-package timerpool
+package timerutil
 
 import (
 	"testing"
diff --git a/pkg/utils/timerutil/util.go b/pkg/utils/timerutil/util.go
new file mode 100644
index 00000000000..7e24671a09e
--- /dev/null
+++ b/pkg/utils/timerutil/util.go
@@ -0,0 +1,32 @@
+// Copyright 2024 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package timerutil
+
+import "time"
+
+// SafeResetTimer is used to reset a timer safely.
+// Before Go 1.23, the only safe way to use Reset was to call Timer.Stop and explicitly drain the timer first.
+// We need to be careful here, see more details in the comments of Timer.Reset.
+// https://pkg.go.dev/time@master#Timer.Reset
+func SafeResetTimer(t *time.Timer, d time.Duration) {
+	// Stop the timer if it's not stopped.
+	if !t.Stop() {
+		select {
+		case <-t.C: // try to drain from the channel
+		default:
+		}
+	}
+	t.Reset(d)
+}
diff --git a/pkg/utils/tsoutil/tso_dispatcher.go b/pkg/utils/tsoutil/tso_dispatcher.go
index 6d1ee2ace28..b2e453e45e2 100644
--- a/pkg/utils/tsoutil/tso_dispatcher.go
+++ b/pkg/utils/tsoutil/tso_dispatcher.go
@@ -24,9 +24,9 @@ import (
 	"github.com/pingcap/log"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/tikv/pd/pkg/errs"
-	"github.com/tikv/pd/pkg/timerpool"
 	"github.com/tikv/pd/pkg/utils/etcdutil"
 	"github.com/tikv/pd/pkg/utils/logutil"
+	"github.com/tikv/pd/pkg/utils/timerutil"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
 )
@@ -209,7 +209,7 @@ func NewTSDeadline(
 	done chan struct{},
 	cancel context.CancelFunc,
 ) *TSDeadline {
-	timer := timerpool.GlobalTimerPool.Get(timeout)
+	timer := timerutil.GlobalTimerPool.Get(timeout)
 	return &TSDeadline{
 		timer:  timer,
 		done:   done,
@@ -230,11 +230,11 @@ func WatchTSDeadline(ctx context.Context, tsDeadlineCh <-chan *TSDeadline) {
 				log.Error("tso proxy request processing is canceled due to timeout", errs.ZapError(errs.ErrProxyTSOTimeout))
 				d.cancel()
-				timerpool.GlobalTimerPool.Put(d.timer)
+				timerutil.GlobalTimerPool.Put(d.timer)
 			case <-d.done:
-				timerpool.GlobalTimerPool.Put(d.timer)
+				timerutil.GlobalTimerPool.Put(d.timer)
 			case <-ctx.Done():
-				timerpool.GlobalTimerPool.Put(d.timer)
+				timerutil.GlobalTimerPool.Put(d.timer)
 				return
 			}
 		case <-ctx.Done():
diff --git a/plugin/scheduler_example/evict_leader.go b/plugin/scheduler_example/evict_leader.go
index 9c8053c1334..0c725abf501 100644
--- a/plugin/scheduler_example/evict_leader.go
+++ b/plugin/scheduler_example/evict_leader.go
@@ -47,7 +47,7 @@ const (
 func init() {
 	schedulers.RegisterSliceDecoderBuilder(EvictLeaderType, func(args []string) schedulers.ConfigDecoder {
 		return func(v interface{}) error {
-			if len(args) != 1 {
+			if len(args) < 1 {
 				return errors.New("should specify the store-id")
 			}
 			conf, ok := v.(*evictLeaderSchedulerConfig)
@@ -99,7 +99,7 @@ type evictLeaderSchedulerConfig struct {
 }
 
 func (conf *evictLeaderSchedulerConfig) BuildWithArgs(args []string) error {
-	if len(args) != 1 {
+	if len(args) < 1 {
 		return errors.New("should specify the store-id")
 	}
 
diff --git a/server/api/scheduler.go b/server/api/scheduler.go
index e28e852b006..22ba6c7c465 100644
--- a/server/api/scheduler.go
+++ b/server/api/scheduler.go
@@ -142,10 +142,23 @@ func (h *schedulerHandler) CreateScheduler(w http.ResponseWriter, r *http.Reques
 		return
 	}
 
-	case schedulers.GrantLeaderName:
-		h.addEvictOrGrant(w, input, schedulers.GrantLeaderName)
-	case schedulers.EvictLeaderName:
-		h.addEvictOrGrant(w, input, schedulers.EvictLeaderName)
+	case schedulers.GrantLeaderName, schedulers.EvictLeaderName:
+		storeID, ok := input["store_id"].(float64)
+		if !ok {
+			h.r.JSON(w, http.StatusBadRequest, "missing store id")
+			return
+		}
+		exist, err := h.AddEvictOrGrant(storeID, name)
+		if err != nil {
+			h.r.JSON(w, http.StatusInternalServerError, err.Error())
+			return
+		}
+		// We should check whether this is the first time the evict-leader-scheduler
+		// is created or just an update to the existing one.
+ if exist { + h.r.JSON(w, http.StatusOK, "The scheduler has been applied to the store.") + return + } case schedulers.ShuffleLeaderName: if err := h.AddShuffleLeaderScheduler(); err != nil { h.r.JSON(w, http.StatusInternalServerError, err.Error()) @@ -204,18 +217,6 @@ func (h *schedulerHandler) CreateScheduler(w http.ResponseWriter, r *http.Reques h.r.JSON(w, http.StatusOK, "The scheduler is created.") } -func (h *schedulerHandler) addEvictOrGrant(w http.ResponseWriter, input map[string]interface{}, name string) { - storeID, ok := input["store_id"].(float64) - if !ok { - h.r.JSON(w, http.StatusBadRequest, "missing store id") - return - } - err := h.AddEvictOrGrant(storeID, name) - if err != nil { - h.r.JSON(w, http.StatusInternalServerError, err.Error()) - } -} - // @Tags scheduler // @Summary Delete a scheduler. // @Param name path string true "The name of the scheduler." diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index c11cead61f7..396a618a173 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -270,7 +270,7 @@ func (c *RaftCluster) InitCluster( c.core, c.opt, c.storage, c.id = basicCluster, opt.(*config.PersistOptions), storage, id c.ctx, c.cancel = context.WithCancel(c.serverCtx) c.labelLevelStats = statistics.NewLabelStatistics() - c.hotStat = statistics.NewHotStat(c.ctx) + c.hotStat = statistics.NewHotStat(c.ctx, basicCluster) c.slowStat = statistics.NewSlowStat(c.ctx) c.progressManager = progress.NewManager() c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit) @@ -2256,6 +2256,7 @@ func (c *RaftCluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) { for _, region := range regions { c.labelLevelStats.Observe(region, c.getStoresWithoutLabelLocked(region, core.EngineKey, core.EngineTiFlash), c.opt.GetLocationLabels()) } + c.labelLevelStats.ClearDefunctRegions() } func (c *RaftCluster) getRegionStoresLocked(region *core.RegionInfo) []*core.StoreInfo { diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index 59a7c431e99..d1c8c8fe9cb 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -601,7 +601,10 @@ func TestRegionHeartbeatHotStat(t *testing.T) { re.NoError(err) cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) - newTestStores(4, "2.0.0") + stores := newTestStores(4, "2.0.0") + for _, store := range stores { + cluster.PutStore(store.GetMeta()) + } peers := []*metapb.Peer{ { Id: 1, @@ -1106,6 +1109,7 @@ func TestRegionLabelIsolationLevel(t *testing.T) { opt.SetReplicationConfig(cfg) re.NoError(err) cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster()) + cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) for i := uint64(1); i <= 4; i++ { var labels []*metapb.StoreLabel @@ -1140,13 +1144,42 @@ func TestRegionLabelIsolationLevel(t *testing.T) { StartKey: []byte{byte(1)}, EndKey: []byte{byte(2)}, } - r := core.NewRegionInfo(region, peers[0]) - re.NoError(cluster.putRegion(r)) + r1 := core.NewRegionInfo(region, peers[0]) + re.NoError(cluster.putRegion(r1)) - cluster.UpdateRegionsLabelLevelStats([]*core.RegionInfo{r}) + cluster.UpdateRegionsLabelLevelStats([]*core.RegionInfo{r1}) counter := cluster.labelLevelStats.GetLabelCounter() re.Equal(0, counter["none"]) re.Equal(1, counter["zone"]) + + region = &metapb.Region{ + Id: 10, + 
diff --git a/server/config/config.go b/server/config/config.go
index e835567ac5b..c6e147c8f25 100644
--- a/server/config/config.go
+++ b/server/config/config.go
@@ -826,13 +826,14 @@ func NormalizeReplicationMode(m string) string {
 
 // DRAutoSyncReplicationConfig is the configuration for auto sync mode between 2 data centers.
 type DRAutoSyncReplicationConfig struct {
-	LabelKey         string            `toml:"label-key" json:"label-key"`
-	Primary          string            `toml:"primary" json:"primary"`
-	DR               string            `toml:"dr" json:"dr"`
-	PrimaryReplicas  int               `toml:"primary-replicas" json:"primary-replicas"`
-	DRReplicas       int               `toml:"dr-replicas" json:"dr-replicas"`
-	WaitStoreTimeout typeutil.Duration `toml:"wait-store-timeout" json:"wait-store-timeout"`
-	PauseRegionSplit bool              `toml:"pause-region-split" json:"pause-region-split,string"`
+	LabelKey           string            `toml:"label-key" json:"label-key"`
+	Primary            string            `toml:"primary" json:"primary"`
+	DR                 string            `toml:"dr" json:"dr"`
+	PrimaryReplicas    int               `toml:"primary-replicas" json:"primary-replicas"`
+	DRReplicas         int               `toml:"dr-replicas" json:"dr-replicas"`
+	WaitStoreTimeout   typeutil.Duration `toml:"wait-store-timeout" json:"wait-store-timeout"`
+	WaitRecoverTimeout typeutil.Duration `toml:"wait-recover-timeout" json:"wait-recover-timeout"`
+	PauseRegionSplit   bool              `toml:"pause-region-split" json:"pause-region-split,string"`
 }
 
 func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) {
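For illustration, a hedged example of how the new wait-recover-timeout key could be written in a config file and decoded into a struct of this shape. The duration type below is a stand-in for typeutil.Duration, and github.com/BurntSushi/toml stands in for PD's real config-loading path; treat this as a sketch, not the actual loader.

package main

import (
	"fmt"
	"time"

	"github.com/BurntSushi/toml"
)

// duration is a stand-in for typeutil.Duration that parses "10s"-style values.
type duration struct{ time.Duration }

func (d *duration) UnmarshalText(text []byte) error {
	v, err := time.ParseDuration(string(text))
	d.Duration = v
	return err
}

// drConfig mirrors the shape of DRAutoSyncReplicationConfig, keeping only the
// fields needed for this example.
type drConfig struct {
	LabelKey           string   `toml:"label-key"`
	WaitStoreTimeout   duration `toml:"wait-store-timeout"`
	WaitRecoverTimeout duration `toml:"wait-recover-timeout"`
}

func main() {
	const data = `
label-key = "zone"
wait-store-timeout = "30s"
wait-recover-timeout = "10s"
`
	var cfg drConfig
	if _, err := toml.Decode(data, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.WaitRecoverTimeout.Duration) // 10s
}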
diff --git a/server/handler.go b/server/handler.go
index 2b7f0eb7c6f..14c4acc10d1 100644
--- a/server/handler.go
+++ b/server/handler.go
@@ -582,10 +582,10 @@ func (h *Handler) redirectSchedulerUpdate(name string, storeID float64) error {
 }
 
 // AddEvictOrGrant add evict leader scheduler or grant leader scheduler.
-func (h *Handler) AddEvictOrGrant(storeID float64, name string) error {
-	if exist, err := h.IsSchedulerExisted(name); !exist {
+func (h *Handler) AddEvictOrGrant(storeID float64, name string) (exist bool, err error) {
+	if exist, err = h.IsSchedulerExisted(name); !exist {
 		if err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerNotFound.FastGenByArgs()) {
-			return err
+			return exist, err
 		}
 		switch name {
 		case schedulers.EvictLeaderName:
@@ -594,13 +594,14 @@ func (h *Handler) AddEvictOrGrant(storeID float64, name string) error {
 			err = h.AddGrantLeaderScheduler(uint64(storeID))
 		}
 		if err != nil {
-			return err
+			return exist, err
 		}
 	} else {
 		if err := h.redirectSchedulerUpdate(name, storeID); err != nil {
-			return err
+			return exist, err
 		}
 		log.Info("update scheduler", zap.String("scheduler-name", name), zap.Uint64("store-id", uint64(storeID)))
+		return exist, nil
 	}
-	return nil
+	return exist, nil
 }
diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go
index 4e571e20374..fb344ecebee 100644
--- a/tests/pdctl/scheduler/scheduler_test.go
+++ b/tests/pdctl/scheduler/scheduler_test.go
@@ -631,6 +631,13 @@ func TestEvictLeaderScheduler(t *testing.T) {
 	output, err := pdctl.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "2"}...)
 	re.NoError(err)
 	re.Contains(string(output), "Success!")
+	re.False(leaderServer.GetRaftCluster().GetStore(2).AllowLeaderTransfer())
+	// execute twice to verify this issue: https://github.com/tikv/pd/issues/8756
+	output, err = pdctl.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "2"}...)
+	re.NoError(err)
+	re.Contains(string(output), "Success!")
+	re.False(leaderServer.GetRaftCluster().GetStore(2).AllowLeaderTransfer())
+
 	failpoint.Enable("github.com/tikv/pd/pkg/schedule/schedulers/buildWithArgsErr", "return(true)")
 	output, err = pdctl.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}...)
 	re.NoError(err)
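The new (exist, err) return lets the HTTP layer tell "created" apart from "already exists, store appended" (issue #8756). A self-contained Go sketch of that contract follows; addEvictOrGrant here is a simplified hypothetical, not the real Handler code.

package main

import (
	"errors"
	"fmt"
)

// addEvictOrGrant mimics the new (exist, err) contract of
// Handler.AddEvictOrGrant: the first call registers the scheduler, and a
// repeated call reports exist == true so the caller can answer differently.
func addEvictOrGrant(registered map[string]bool, name string) (exist bool, err error) {
	if name == "" {
		return false, errors.New("missing scheduler name")
	}
	if registered[name] {
		// Already registered: the real code updates the scheduler's store
		// list here instead of creating a duplicate scheduler.
		return true, nil
	}
	registered[name] = true
	return false, nil
}

func main() {
	registered := map[string]bool{}
	for i := 0; i < 2; i++ {
		exist, err := addEvictOrGrant(registered, "evict-leader-scheduler")
		switch {
		case err != nil:
			fmt.Println("error:", err)
		case exist:
			fmt.Println("The scheduler has been applied to the store.")
		default:
			fmt.Println("The scheduler is created.")
		}
	}
}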
diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go
index fc6991e4179..3b70e837ac5 100644
--- a/tests/server/cluster/cluster_test.go
+++ b/tests/server/cluster/cluster_test.go
@@ -35,6 +35,7 @@ import (
 	"github.com/tikv/pd/pkg/dashboard"
 	"github.com/tikv/pd/pkg/id"
 	"github.com/tikv/pd/pkg/mock/mockid"
+	"github.com/tikv/pd/pkg/mock/mockserver"
 	sc "github.com/tikv/pd/pkg/schedule/config"
 	"github.com/tikv/pd/pkg/schedule/operator"
 	"github.com/tikv/pd/pkg/schedule/schedulers"
@@ -42,6 +43,7 @@ import (
 	"github.com/tikv/pd/pkg/storage"
 	"github.com/tikv/pd/pkg/syncer"
 	"github.com/tikv/pd/pkg/tso"
+	"github.com/tikv/pd/pkg/utils/tempurl"
 	"github.com/tikv/pd/pkg/utils/testutil"
 	"github.com/tikv/pd/pkg/utils/tsoutil"
 	"github.com/tikv/pd/pkg/utils/typeutil"
@@ -1812,3 +1814,45 @@ func TestExternalTimestamp(t *testing.T) {
 		re.Equal(ts, resp4.GetTimestamp())
 	}
 }
+
+func TestFollowerExitSyncTime(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	tc, err := tests.NewTestCluster(ctx, 1)
+	defer tc.Destroy()
+	re.NoError(err)
+	err = tc.RunInitialServers()
+	re.NoError(err)
+	tc.WaitLeader()
+	leaderServer := tc.GetLeaderServer()
+	re.NoError(leaderServer.BootstrapCluster())
+
+	tempDir := t.TempDir()
+	rs, err := storage.NewStorageWithLevelDBBackend(context.Background(), tempDir, nil)
+	re.NoError(err)
+
+	server := mockserver.NewMockServer(
+		context.Background(),
+		&pdpb.Member{MemberId: 1, Name: "test", ClientUrls: []string{tempurl.Alloc()}},
+		nil,
+		storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), rs),
+		core.NewBasicCluster(),
+	)
+	s := syncer.NewRegionSyncer(server)
+	s.StartSyncWithLeader(leaderServer.GetAddr())
+	time.Sleep(time.Second)
+
+	// Record the time when exiting sync
+	startTime := time.Now()
+
+	// Simulate leader change scenario
+	// Directly call StopSyncWithLeader to simulate exit
+	s.StopSyncWithLeader()
+
+	// Calculate time difference
+	elapsedTime := time.Since(startTime)
+
+	// Assert that the sync exit time is within expected range
+	re.Less(elapsedTime, time.Second)
+}
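The assertion above only holds if the follower's sync loop selects on a stop signal instead of sleeping through its retry interval. A minimal, self-contained sketch of that pattern follows; all names are hypothetical, and the real loop lives in pkg/syncer.

package main

import (
	"fmt"
	"time"
)

type regionSyncer struct {
	done   chan struct{} // closed by stopSyncWithLeader
	exited chan struct{} // closed when the sync loop returns
}

func (s *regionSyncer) startSyncWithLeader() {
	go func() {
		defer close(s.exited)
		ticker := time.NewTicker(10 * time.Second) // stands in for the sync retry interval
		defer ticker.Stop()
		for {
			select {
			case <-s.done:
				return // unblock immediately instead of waiting out the ticker
			case <-ticker.C:
				// the real loop pulls region updates from the leader here
			}
		}
	}()
}

func (s *regionSyncer) stopSyncWithLeader() {
	close(s.done)
}

func main() {
	s := &regionSyncer{done: make(chan struct{}), exited: make(chan struct{})}
	s.startSyncWithLeader()
	start := time.Now()
	s.stopSyncWithLeader()
	<-s.exited
	fmt.Println("sync loop exited in", time.Since(start)) // well under a second
}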