From 57fdd79e53e59c82fdc5666993b0585993112cc6 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 12 Dec 2024 11:33:30 +0800 Subject: [PATCH] statistics: add gc in hot peer cache (#8702) (#8897) close tikv/pd#8698 Signed-off-by: ti-chi-bot Signed-off-by: lhy1024 Co-authored-by: lhy1024 Co-authored-by: lhy1024 Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/mcs/scheduling/server/cluster.go | 2 +- pkg/mock/mockcluster/mockcluster.go | 5 +- pkg/schedule/schedulers/hot_region_test.go | 18 +++ pkg/statistics/hot_cache.go | 6 +- pkg/statistics/hot_peer_cache.go | 37 ++++- pkg/statistics/hot_peer_cache_test.go | 163 ++++++++++++++------- pkg/statistics/hot_stat.go | 5 +- pkg/statistics/utils/topn.go | 11 +- pkg/statistics/utils/topn_test.go | 4 +- server/cluster/cluster_test.go | 5 +- server/cluster/scheduling_controller.go | 2 +- 11 files changed, 183 insertions(+), 75 deletions(-) diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index 3a0b17100fd..647706b0afd 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -75,7 +75,7 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, ruleManager: ruleManager, labelerManager: labelerManager, persistConfig: persistConfig, - hotStat: statistics.NewHotStat(ctx), + hotStat: statistics.NewHotStat(ctx, basicCluster), labelStats: statistics.NewLabelStatistics(), regionStats: statistics.NewRegionStatistics(basicCluster, persistConfig, ruleManager), storage: storage, diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 6cf7ae143df..00759d9b902 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -63,11 +63,12 @@ type Cluster struct { // NewCluster creates a new Cluster func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster { + bc := core.NewBasicCluster() c := &Cluster{ ctx: ctx, - BasicCluster: core.NewBasicCluster(), + BasicCluster: bc, IDAllocator: mockid.NewIDAllocator(), - HotStat: statistics.NewHotStat(ctx), + HotStat: statistics.NewHotStat(ctx, bc), HotBucketCache: buckets.NewBucketsCache(ctx), PersistOptions: opts, suspectRegions: map[uint64]struct{}{}, diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index c9b138e3b32..29a4417344f 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -1658,6 +1658,9 @@ func TestHotCacheUpdateCache(t *testing.T) { cancel, _, tc, _ := prepareSchedulersTest() defer cancel() tc.SetHotRegionCacheHitsThreshold(0) + for i := 0; i < 3; i++ { + tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) + } // For read flow addRegionInfo(tc, utils.Read, []testRegionInfo{ @@ -1725,6 +1728,9 @@ func TestHotCacheKeyThresholds(t *testing.T) { cancel, _, tc, _ := prepareSchedulersTest() defer cancel() tc.SetHotRegionCacheHitsThreshold(0) + for i := 0; i < 6; i++ { + tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) + } addRegionInfo(tc, utils.Read, []testRegionInfo{ {1, []uint64{1, 2, 3}, 0, 1, 0}, {2, []uint64{1, 2, 3}, 0, 1 * units.KiB, 0}, @@ -1743,6 +1749,9 @@ func TestHotCacheKeyThresholds(t *testing.T) { { // many regions cancel, _, tc, _ := prepareSchedulersTest() defer cancel() + for i := 0; i < 3; i++ { + tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) + } regions := []testRegionInfo{} for i := 1; i <= 1000; i += 2 { regions = append(regions, @@ -1797,6 +1806,9 @@ func TestHotCacheByteAndKey(t *testing.T) { cancel, _, tc, _ := prepareSchedulersTest() defer cancel() tc.SetHotRegionCacheHitsThreshold(0) + for i := 0; i < 3; i++ { + tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) + } statistics.ThresholdsUpdateInterval = 0 defer func() { statistics.ThresholdsUpdateInterval = 8 * time.Second @@ -1923,6 +1935,9 @@ func TestHotCacheCheckRegionFlow(t *testing.T) { func checkHotCacheCheckRegionFlow(re *require.Assertions, testCase testHotCacheCheckRegionFlowCase, enablePlacementRules bool) { cancel, _, tc, oc := prepareSchedulersTest() defer cancel() + for i := 0; i < 3; i++ { + tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) + } tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.SetEnablePlacementRules(enablePlacementRules) labels := []string{"zone", "host"} @@ -1998,6 +2013,9 @@ func TestHotCacheCheckRegionFlowWithDifferentThreshold(t *testing.T) { func checkHotCacheCheckRegionFlowWithDifferentThreshold(re *require.Assertions, enablePlacementRules bool) { cancel, _, tc, _ := prepareSchedulersTest() defer cancel() + for i := 0; i < 3; i++ { + tc.PutStore(core.NewStoreInfo(&metapb.Store{Id: uint64(i + 1)})) + } tc.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.Version4_0)) tc.SetEnablePlacementRules(enablePlacementRules) labels := []string{"zone", "host"} diff --git a/pkg/statistics/hot_cache.go b/pkg/statistics/hot_cache.go index 799fb240d10..68b496c26fa 100644 --- a/pkg/statistics/hot_cache.go +++ b/pkg/statistics/hot_cache.go @@ -38,11 +38,11 @@ type HotCache struct { } // NewHotCache creates a new hot spot cache. -func NewHotCache(ctx context.Context) *HotCache { +func NewHotCache(ctx context.Context, cluster *core.BasicCluster) *HotCache { w := &HotCache{ ctx: ctx, - writeCache: NewHotPeerCache(ctx, utils.Write), - readCache: NewHotPeerCache(ctx, utils.Read), + writeCache: NewHotPeerCache(ctx, cluster, utils.Write), + readCache: NewHotPeerCache(ctx, cluster, utils.Read), } go w.updateItems(w.readCache.taskQueue, w.runReadTask) go w.updateItems(w.writeCache.taskQueue, w.runWriteTask) diff --git a/pkg/statistics/hot_peer_cache.go b/pkg/statistics/hot_peer_cache.go index 0e35e0e23be..f5700f3fa83 100644 --- a/pkg/statistics/hot_peer_cache.go +++ b/pkg/statistics/hot_peer_cache.go @@ -60,6 +60,7 @@ type thresholds struct { // hotPeerCache saves the hot peer's statistics. type hotPeerCache struct { kind utils.RWType + cluster *core.BasicCluster peersOfStore map[uint64]*utils.TopN // storeID -> hot peers storesOfRegion map[uint64]map[uint64]struct{} // regionID -> storeIDs regionsOfStore map[uint64]map[uint64]struct{} // storeID -> regionIDs @@ -67,13 +68,14 @@ type hotPeerCache struct { taskQueue *chanx.UnboundedChan[FlowItemTask] thresholdsOfStore map[uint64]*thresholds // storeID -> thresholds metrics map[uint64][utils.ActionTypeLen]prometheus.Gauge // storeID -> metrics - // TODO: consider to remove store info when store is offline. + lastGCTime time.Time } // NewHotPeerCache creates a hotPeerCache -func NewHotPeerCache(ctx context.Context, kind utils.RWType) *hotPeerCache { +func NewHotPeerCache(ctx context.Context, cluster *core.BasicCluster, kind utils.RWType) *hotPeerCache { return &hotPeerCache{ kind: kind, + cluster: cluster, peersOfStore: make(map[uint64]*utils.TopN), storesOfRegion: make(map[uint64]map[uint64]struct{}), regionsOfStore: make(map[uint64]map[uint64]struct{}), @@ -114,6 +116,7 @@ func (f *hotPeerCache) updateStat(item *HotPeerStat) { return } f.incMetrics(item.actionType, item.StoreID) + f.gc() } func (f *hotPeerCache) incMetrics(action utils.ActionType, storeID uint64) { @@ -544,6 +547,36 @@ func (f *hotPeerCache) removeItem(item *HotPeerStat) { } } +func (f *hotPeerCache) gc() { + if time.Since(f.lastGCTime) < f.topNTTL { + return + } + f.lastGCTime = time.Now() + // remove tombstone stores + stores := make(map[uint64]struct{}) + for _, storeID := range f.cluster.GetStores() { + stores[storeID.GetID()] = struct{}{} + } + for storeID := range f.peersOfStore { + if _, ok := stores[storeID]; !ok { + delete(f.peersOfStore, storeID) + delete(f.regionsOfStore, storeID) + delete(f.thresholdsOfStore, storeID) + delete(f.metrics, storeID) + } + } + // remove expired items + for _, peers := range f.peersOfStore { + regions := peers.RemoveExpired() + for _, regionID := range regions { + delete(f.storesOfRegion, regionID) + for storeID := range f.regionsOfStore { + delete(f.regionsOfStore[storeID], regionID) + } + } + } +} + // removeAllItem removes all items of the cache. // It is used for test. func (f *hotPeerCache) removeAllItem() { diff --git a/pkg/statistics/hot_peer_cache_test.go b/pkg/statistics/hot_peer_cache_test.go index 36f922d3830..8e69e4e4141 100644 --- a/pkg/statistics/hot_peer_cache_test.go +++ b/pkg/statistics/hot_peer_cache_test.go @@ -18,6 +18,7 @@ import ( "context" "math/rand" "sort" + "sync" "testing" "time" @@ -26,28 +27,12 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/mock/mockid" "github.com/tikv/pd/pkg/movingaverage" "github.com/tikv/pd/pkg/statistics/utils" "github.com/tikv/pd/pkg/utils/typeutil" ) -func TestStoreTimeUnsync(t *testing.T) { - re := require.New(t) - cache := NewHotPeerCache(context.Background(), utils.Write) - intervals := []uint64{120, 60} - for _, interval := range intervals { - region := buildRegion(utils.Write, 3, interval) - checkAndUpdate(re, cache, region, 3) - { - stats := cache.RegionStats(0) - re.Len(stats, 3) - for _, s := range stats { - re.Len(s, 1) - } - } - } -} - type operator int const ( @@ -79,8 +64,9 @@ func TestCache(t *testing.T) { utils.Read: 3, // all peers utils.Write: 3, // all peers } - cache := NewHotPeerCache(context.Background(), test.kind) - region := buildRegion(test.kind, 3, 60) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, test.kind) + region := buildRegion(cluster, test.kind, 3, 60) checkAndUpdate(re, cache, region, defaultSize[test.kind]) checkHit(re, cache, region, test.kind, utils.Add) // all peers are new @@ -252,12 +238,31 @@ func pickFollower(region *core.RegionInfo) (index int, peer *metapb.Peer) { return dst, meta.Peers[dst] } -func buildRegion(kind utils.RWType, peerCount int, interval uint64) *core.RegionInfo { - peers := newPeers(peerCount, - func(i int) uint64 { return uint64(10000 + i) }, - func(i int) uint64 { return uint64(i) }) +var ( + idAllocator *mockid.IDAllocator + once sync.Once +) + +func getIDAllocator() *mockid.IDAllocator { + once.Do(func() { + idAllocator = mockid.NewIDAllocator() + }) + return idAllocator +} + +func buildRegion(cluster *core.BasicCluster, kind utils.RWType, peerCount int, interval uint64) (region *core.RegionInfo) { + peers := make([]*metapb.Peer, 0, peerCount) + for i := 0; i < peerCount; i++ { + id, _ := getIDAllocator().Alloc() + storeID, _ := getIDAllocator().Alloc() + peers = append(peers, &metapb.Peer{ + Id: id, + StoreId: storeID, + }) + } + id, _ := getIDAllocator().Alloc() meta := &metapb.Region{ - Id: 1000, + Id: id, Peers: peers, StartKey: []byte(""), EndKey: []byte(""), @@ -268,7 +273,7 @@ func buildRegion(kind utils.RWType, peerCount int, interval uint64) *core.Region switch kind { case utils.Read: - return core.NewRegionInfo( + region = core.NewRegionInfo( meta, leader, core.SetReportInterval(0, interval), @@ -277,7 +282,7 @@ func buildRegion(kind utils.RWType, peerCount int, interval uint64) *core.Region core.SetReadQuery(1024*interval), ) case utils.Write: - return core.NewRegionInfo( + region = core.NewRegionInfo( meta, leader, core.SetReportInterval(0, interval), @@ -285,31 +290,21 @@ func buildRegion(kind utils.RWType, peerCount int, interval uint64) *core.Region core.SetWrittenKeys(10*units.MiB*interval), core.SetWrittenQuery(1024*interval), ) - default: - return nil } -} - -type genID func(i int) uint64 - -func newPeers(n int, pid genID, sid genID) []*metapb.Peer { - peers := make([]*metapb.Peer, 0, n) - for i := 1; i <= n; i++ { - peer := &metapb.Peer{ - Id: pid(i), - } - peer.StoreId = sid(i) - peers = append(peers, peer) + for _, peer := range region.GetPeers() { + cluster.PutStore(core.NewStoreInfo(&metapb.Store{Id: peer.GetStoreId()}, core.SetLastHeartbeatTS(time.Now()))) } - return peers + return region } func TestUpdateHotPeerStat(t *testing.T) { re := require.New(t) - cache := NewHotPeerCache(context.Background(), utils.Read) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Read) storeID, regionID := uint64(1), uint64(2) peer := &metapb.Peer{StoreId: storeID} region := core.NewRegionInfo(&metapb.Region{Id: regionID, Peers: []*metapb.Peer{peer}}, peer) + cluster.PutStore(core.NewStoreInfo(&metapb.Store{Id: storeID}, core.SetLastHeartbeatTS(time.Now()))) // we statistic read peer info from store heartbeat rather than region heartbeat m := utils.RegionHeartBeatReportInterval / utils.StoreHeartBeatReportInterval ThresholdsUpdateInterval = 0 @@ -400,8 +395,10 @@ func TestThresholdWithUpdateHotPeerStat(t *testing.T) { } func testMetrics(re *require.Assertions, interval, byteRate, expectThreshold float64) { - cache := NewHotPeerCache(context.Background(), utils.Read) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Read) storeID := uint64(1) + cluster.PutStore(core.NewStoreInfo(&metapb.Store{Id: storeID}, core.SetLastHeartbeatTS(time.Now()))) re.GreaterOrEqual(byteRate, utils.MinHotThresholds[utils.RegionReadBytes]) ThresholdsUpdateInterval = 0 defer func() { @@ -447,8 +444,9 @@ func TestRemoveFromCache(t *testing.T) { interval := uint64(5) checkers := []check{checkAndUpdate, checkAndUpdateWithOrdering} for _, checker := range checkers { - cache := NewHotPeerCache(context.Background(), utils.Write) - region := buildRegion(utils.Write, peerCount, interval) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Write) + region := buildRegion(cluster, utils.Write, peerCount, interval) // prepare intervalSums := make(map[uint64]int) for i := 1; i <= 200; i++ { @@ -482,8 +480,9 @@ func TestRemoveFromCacheRandom(t *testing.T) { for _, peerCount := range peerCounts { for _, interval := range intervals { for _, checker := range checkers { - cache := NewHotPeerCache(context.Background(), utils.Write) - region := buildRegion(utils.Write, peerCount, interval) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Write) + region := buildRegion(cluster, utils.Write, peerCount, interval) target := uint64(10) intervalSums := make(map[uint64]int) @@ -536,8 +535,9 @@ func checkCoolDown(re *require.Assertions, cache *hotPeerCache, region *core.Reg func TestCoolDownTransferLeader(t *testing.T) { re := require.New(t) - cache := NewHotPeerCache(context.Background(), utils.Read) - region := buildRegion(utils.Read, 3, 60) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Read) + region := buildRegion(cluster, utils.Read, 3, 60) moveLeader := func() { _, region = schedule(re, movePeer, region, 10) @@ -569,8 +569,9 @@ func TestCoolDownTransferLeader(t *testing.T) { } testCases := []func(){moveLeader, transferLeader, movePeer, addReplica, removeReplica} for _, testCase := range testCases { - cache = NewHotPeerCache(context.Background(), utils.Read) - region = buildRegion(utils.Read, 3, 60) + cluster = core.NewBasicCluster() + cache = NewHotPeerCache(context.Background(), cluster, utils.Read) + region = buildRegion(cluster, utils.Read, 3, 60) for i := 1; i <= 200; i++ { checkAndUpdate(re, cache, region) } @@ -582,8 +583,9 @@ func TestCoolDownTransferLeader(t *testing.T) { // See issue #4510 func TestCacheInherit(t *testing.T) { re := require.New(t) - cache := NewHotPeerCache(context.Background(), utils.Read) - region := buildRegion(utils.Read, 3, 10) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Read) + region := buildRegion(cluster, utils.Read, 3, 10) // prepare for i := 1; i <= 200; i++ { checkAndUpdate(re, cache, region) @@ -673,13 +675,16 @@ func TestHotPeerCacheTopNThreshold(t *testing.T) { re := require.New(t) testWithUpdateInterval := func(interval time.Duration) { ThresholdsUpdateInterval = interval - cache := NewHotPeerCache(context.Background(), utils.Write) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Write) now := time.Now() + storeID := uint64(1) for id := uint64(0); id < 100; id++ { meta := &metapb.Region{ Id: id, - Peers: []*metapb.Peer{{Id: id, StoreId: 1}}, + Peers: []*metapb.Peer{{Id: id, StoreId: storeID}}, } + cluster.PutStore(core.NewStoreInfo(&metapb.Store{Id: storeID}, core.SetLastHeartbeatTS(time.Now()))) region := core.NewRegionInfo(meta, meta.Peers[0], core.SetWrittenBytes(id*6000), core.SetWrittenKeys(id*6000), core.SetWrittenQuery(id*6000)) for i := 0; i < 10; i++ { start := uint64(now.Add(time.Minute * time.Duration(i)).Unix()) @@ -714,9 +719,53 @@ func TestHotPeerCacheTopNThreshold(t *testing.T) { testWithUpdateInterval(0) } +func TestRemoveExpireItems(t *testing.T) { + re := require.New(t) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Write) + cache.topNTTL = 100 * time.Millisecond + // case1: remove expired items + region1 := buildRegion(cluster, utils.Write, 3, 10) + checkAndUpdate(re, cache, region1) + re.NotEmpty(cache.storesOfRegion[region1.GetID()]) + time.Sleep(cache.topNTTL) + region2 := buildRegion(cluster, utils.Write, 3, 10) + checkAndUpdate(re, cache, region2) + re.Empty(cache.storesOfRegion[region1.GetID()]) + re.NotEmpty(cache.storesOfRegion[region2.GetID()]) + time.Sleep(cache.topNTTL) + // case2: remove items when the store is not exist + re.NotNil(cache.peersOfStore[region1.GetLeader().GetStoreId()]) + re.NotNil(cache.peersOfStore[region2.GetLeader().GetStoreId()]) + cluster.ResetStores() + re.Empty(cluster.GetStores()) + region3 := buildRegion(cluster, utils.Write, 3, 10) + checkAndUpdate(re, cache, region3) + re.Nil(cache.peersOfStore[region1.GetLeader().GetStoreId()]) + re.Nil(cache.peersOfStore[region2.GetLeader().GetStoreId()]) + re.NotEmpty(cache.regionsOfStore[region3.GetLeader().GetStoreId()]) +} + +func TestDifferentReportInterval(t *testing.T) { + re := require.New(t) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Write) + region := buildRegion(cluster, utils.Write, 3, 5) + for _, interval := range []uint64{120, 60, 30} { + region = region.Clone(core.SetReportInterval(0, interval)) + checkAndUpdate(re, cache, region, 3) + stats := cache.RegionStats(0) + re.Len(stats, 3) + for _, s := range stats { + re.Len(s, 1) + } + } +} + func BenchmarkCheckRegionFlow(b *testing.B) { - cache := NewHotPeerCache(context.Background(), utils.Read) - region := buildRegion(utils.Read, 3, 10) + cluster := core.NewBasicCluster() + cache := NewHotPeerCache(context.Background(), cluster, utils.Read) + region := buildRegion(cluster, utils.Read, 3, 10) peerInfos := make([]*core.PeerInfo, 0) for _, peer := range region.GetPeers() { peerInfo := core.NewPeerInfo(peer, region.GetLoads(), 10) diff --git a/pkg/statistics/hot_stat.go b/pkg/statistics/hot_stat.go index 3cda52aba89..c9276f11b20 100644 --- a/pkg/statistics/hot_stat.go +++ b/pkg/statistics/hot_stat.go @@ -17,6 +17,7 @@ package statistics import ( "context" + "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/statistics/buckets" ) @@ -28,9 +29,9 @@ type HotStat struct { } // NewHotStat creates the container to hold cluster's hotspot statistics. -func NewHotStat(ctx context.Context) *HotStat { +func NewHotStat(ctx context.Context, cluster *core.BasicCluster) *HotStat { return &HotStat{ - HotCache: NewHotCache(ctx), + HotCache: NewHotCache(ctx, cluster), StoresStats: NewStoresStats(), HotBucketCache: buckets.NewBucketsCache(ctx), } diff --git a/pkg/statistics/utils/topn.go b/pkg/statistics/utils/topn.go index 7ab6c6eaf3e..c010f1f8481 100644 --- a/pkg/statistics/utils/topn.go +++ b/pkg/statistics/utils/topn.go @@ -97,15 +97,14 @@ func (tn *TopN) Put(item TopNItem) (isUpdate bool) { isUpdate = stn.Put(item) } tn.ttlLst.Put(item.ID()) - tn.maintain() return } // RemoveExpired deletes all expired items. -func (tn *TopN) RemoveExpired() { +func (tn *TopN) RemoveExpired() []uint64 { tn.rw.Lock() defer tn.rw.Unlock() - tn.maintain() + return tn.maintain() } // Remove deletes the item by given ID and returns it. @@ -116,16 +115,18 @@ func (tn *TopN) Remove(id uint64) (item TopNItem) { item = stn.Remove(id) } _ = tn.ttlLst.Remove(id) - tn.maintain() return } -func (tn *TopN) maintain() { +func (tn *TopN) maintain() []uint64 { + ids := make([]uint64, 0) for _, id := range tn.ttlLst.TakeExpired() { for _, stn := range tn.topns { stn.Remove(id) + ids = append(ids, id) } } + return ids } type singleTopN struct { diff --git a/pkg/statistics/utils/topn_test.go b/pkg/statistics/utils/topn_test.go index f92d5a61f34..4f2bc5e4f3b 100644 --- a/pkg/statistics/utils/topn_test.go +++ b/pkg/statistics/utils/topn_test.go @@ -208,6 +208,7 @@ func TestTTL(t *testing.T) { putPerm(re, tn, Total, func(x int) float64 { return float64(-x) }, false /*insert*/) + re.Len(tn.GetAll(), Total) time.Sleep(900 * time.Millisecond) { @@ -217,6 +218,8 @@ func TestTTL(t *testing.T) { } re.True(tn.Put(item)) } + re.Len(tn.RemoveExpired(), (Total-1)*DimLen) + for i := 3; i < Total; i += 3 { item := &item{id: uint64(i), values: []float64{float64(-i) + 100}} for k := 1; k < DimLen; k++ { @@ -224,7 +227,6 @@ func TestTTL(t *testing.T) { } re.False(tn.Put(item)) } - tn.RemoveExpired() re.Equal(Total/3+1, tn.Len()) items := tn.GetAllTopN(0) diff --git a/server/cluster/cluster_test.go b/server/cluster/cluster_test.go index dc0f7966761..c65369fa63a 100644 --- a/server/cluster/cluster_test.go +++ b/server/cluster/cluster_test.go @@ -602,7 +602,10 @@ func TestRegionHeartbeatHotStat(t *testing.T) { re.NoError(err) cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend()) cluster.coordinator = schedule.NewCoordinator(ctx, cluster, nil) - newTestStores(4, "2.0.0") + stores := newTestStores(4, "2.0.0") + for _, store := range stores { + cluster.PutStore(store.GetMeta()) + } peers := []*metapb.Peer{ { Id: 1, diff --git a/server/cluster/scheduling_controller.go b/server/cluster/scheduling_controller.go index 322ccc94d0e..6a63060022f 100644 --- a/server/cluster/scheduling_controller.go +++ b/server/cluster/scheduling_controller.go @@ -67,7 +67,7 @@ func newSchedulingController(parentCtx context.Context, basicCluster *core.Basic BasicCluster: basicCluster, opt: opt, labelStats: statistics.NewLabelStatistics(), - hotStat: statistics.NewHotStat(parentCtx), + hotStat: statistics.NewHotStat(parentCtx, basicCluster), slowStat: statistics.NewSlowStat(parentCtx), regionStats: statistics.NewRegionStatistics(basicCluster, opt, ruleManager), }