From adbd23c5d6f9ad5f30f4df6a819a8674e5c9316e Mon Sep 17 00:00:00 2001 From: Lulu Zhang Date: Mon, 2 Dec 2024 15:27:23 -0800 Subject: [PATCH] [raft] rebalance leases in driver (#7992) --- enterprise/server/raft/driver/driver.go | 271 +++++++++++++++---- enterprise/server/raft/driver/driver_test.go | 121 ++++++++- enterprise/server/raft/store/store.go | 1 + 3 files changed, 345 insertions(+), 48 deletions(-) diff --git a/enterprise/server/raft/driver/driver.go b/enterprise/server/raft/driver/driver.go index 939b72bb559..a5a8a85b222 100644 --- a/enterprise/server/raft/driver/driver.go +++ b/enterprise/server/raft/driver/driver.go @@ -53,7 +53,8 @@ const ( DriverRemoveDeadReplica DriverAddReplica DriverReplaceDeadReplica - DriverConsiderRebalance + DriverRebalanceReplica + DriverRebalanceLease ) const ( @@ -67,23 +68,30 @@ const ( // considered around the mean. replicaCountMeanRatioThreshold = .05 // The minimum number of ranges by which a store must deviate from the mean - // to be considerred above or below the mean. + // to be considered above or below the mean. minReplicaCountThreshold = 2 + + // Similar to replicaCountMeanRatioThreshold, but for lease count + // instead. + leaseCountMeanRatioThreshold = .05 + // The minimum number of leases by which a store must deviate from the mean + // to be considered above or below the mean. + minLeaseCountThreshold = 2 ) func (a DriverAction) Priority() float64 { switch a { case DriverReplaceDeadReplica: - return 500 + return 600 case DriverAddReplica: - return 400 + return 500 case DriverRemoveDeadReplica: - return 300 + return 400 case DriverRemoveReplica: - return 200 + return 300 case DriverSplitRange: - return 100 - case DriverConsiderRebalance, DriverNoop: + return 200 + case DriverRebalanceReplica, DriverRebalanceLease, DriverNoop: return 0 default: alert.UnexpectedEvent("unknown-driver-action", "unknown driver action %s", a) @@ -103,8 +111,10 @@ func (a DriverAction) String() string { return "add-replica" case DriverReplaceDeadReplica: return "replace-dead-replica" - case DriverConsiderRebalance: - return "consider-rebalance" + case DriverRebalanceReplica: + return "consider-rebalance-replica" + case DriverRebalanceLease: + return "consider-rebalance-lease" case DriverSplitRange: return "split-range" default: @@ -126,6 +136,7 @@ type IStore interface { RemoveReplica(ctx context.Context, req *rfpb.RemoveReplicaRequest) (*rfpb.RemoveReplicaResponse, error) GetReplicaStates(ctx context.Context, rd *rfpb.RangeDescriptor) map[uint64]constants.ReplicaState SplitRange(ctx context.Context, req *rfpb.SplitRangeRequest) (*rfpb.SplitRangeResponse, error) + TransferLeadership(ctx context.Context, req *rfpb.TransferLeadershipRequest) (*rfpb.TransferLeadershipResponse, error) NHID() string } @@ -137,11 +148,12 @@ func computeQuorum(numNodes int) int { } // computeAction computes the action needed and its priority.
-func (rq *Queue) computeAction(replicas []*rfpb.ReplicaDescriptor, usage *rfpb.ReplicaUsage) (DriverAction, float64) { +func (rq *Queue) computeAction(rd *rfpb.RangeDescriptor, usage *rfpb.ReplicaUsage, localReplicaID uint64) (DriverAction, float64) { if rq.storeMap == nil { action := DriverNoop return action, action.Priority() } + replicas := rd.GetReplicas() curReplicas := len(replicas) if curReplicas == 0 { action := DriverNoop return action, action.Priority() } @@ -200,7 +212,32 @@ } } - action := DriverConsiderRebalance + if rd.GetRangeId() == constants.MetaRangeID { + // Do not try to re-balance the meta-range. + // + // When the meta-range is moved onto a different node, the range cache has to + // update its range descriptor. Before the range descriptor gets updated, + // SyncPropose to all other ranges can fail temporarily because the range + // descriptor is not current. Therefore, we should only move the meta-range + // when it's absolutely necessary. + action := DriverNoop + return action, action.Priority() + } + + // Check whether there are any replica or lease rebalance opportunities. + storesWithStats := rq.storeMap.GetStoresWithStats() + op := rq.findRebalanceReplicaOp(rd, storesWithStats, localReplicaID) + if op != nil { + action := DriverRebalanceReplica + return action, action.Priority() + } + op = rq.findRebalanceLeaseOp(rd, localReplicaID) + if op != nil { + action := DriverRebalanceLease + return action, action.Priority() + } + + action := DriverNoop return action, action.Priority() } @@ -304,29 +341,12 @@ func (rq *Queue) shouldQueue(ctx context.Context, repl IReplica) (bool, float64) rq.log.Errorf("failed to get Usage of replica c%dn%d", repl.RangeID(), repl.ReplicaID()) } - action, priority := rq.computeAction(rd.GetReplicas(), usage) + action, priority := rq.computeAction(rd, usage, repl.ReplicaID()) if action == DriverNoop { rq.log.Debugf("should not queue because no-op") return false, 0 - } else if action != DriverConsiderRebalance { - return true, priority } - - if rd.GetRangeId() == constants.MetaRangeID { - // Do not try to re-balance meta-range. - // - // When meta-range is moved onto a different node, range cache has to - // update its range descriptor. Before the range descriptor get updated, - // SyncPropose to all other ranges can fail temporarily because the range - // descriptor is not current. Therefore, we should only move meta-range - // when it's absolutely necessary. - return false, 0 - } - - // For DriverConsiderRebalance check if there are rebalance opportunities.
- storesWithStats := rq.storeMap.GetStoresWithStats() - op := rq.findRebalanceOp(rd, storesWithStats, repl.ReplicaID()) - return op != nil, 0 + return true, priority } func (rq *Queue) pushLocked(item *pqItem) { @@ -513,9 +533,10 @@ func (rq *Queue) findNodeForAllocation(rd *rfpb.RangeDescriptor, storesWithStats } type change struct { - addOp *rfpb.AddReplicaRequest - removeOp *rfpb.RemoveReplicaRequest - splitOp *rfpb.SplitRangeRequest + addOp *rfpb.AddReplicaRequest + removeOp *rfpb.RemoveReplicaRequest + splitOp *rfpb.SplitRangeRequest + transferLeadershipOp *rfpb.TransferLeadershipRequest } func (rq *Queue) splitRange(rd *rfpb.RangeDescriptor) *change { @@ -598,7 +619,7 @@ func compareOp(op1 *rebalanceOp, op2 *rebalanceOp) int { return compareByScore(op1.to, op2.to) } -func canConvergeByRebalance(choice *rebalanceChoice, allStores *storemap.StoresWithStats) bool { +func canConvergeByRebalanceReplica(choice *rebalanceChoice, allStores *storemap.StoresWithStats) bool { if len(choice.candidates) == 0 { return false } @@ -620,6 +641,28 @@ func canConvergeByRebalance(choice *rebalanceChoice, allStores *storemap.StoresW return false } +func canConvergeByRebalanceLease(choice *rebalanceChoice, allStores *storemap.StoresWithStats) bool { + if len(choice.candidates) == 0 { + return false + } + overfullThreshold := int64(math.Ceil(aboveMeanLeaseCountThreshold(allStores.LeaseCount.Mean))) + // The existing store is too far above the mean. + if choice.existing.usage.LeaseCount > overfullThreshold { + return true + } + + // The existing store is above the mean, but not too far above it; rebalance if at least one other store is too far below the mean. + if float64(choice.existing.usage.LeaseCount) > allStores.LeaseCount.Mean { + underfullThreshold := int64(math.Floor(belowMeanLeaseCountThreshold(allStores.LeaseCount.Mean))) + for _, c := range choice.candidates { + if c.usage.LeaseCount < underfullThreshold { + return true + } + } + } + return false +} + func findReplicaWithNHID(rd *rfpb.RangeDescriptor, nhid string) (uint64, error) { for _, replica := range rd.GetReplicas() { if replica.GetNhid() == nhid { @@ -629,17 +672,17 @@ func findReplicaWithNHID(rd *rfpb.RangeDescriptor, nhid string) (uint64, error) return 0, status.InternalErrorf("cannot find replica with NHID: %s", nhid) } -func (rq *Queue) rebalance(rd *rfpb.RangeDescriptor, localRepl IReplica) *change { +func (rq *Queue) rebalanceReplica(rd *rfpb.RangeDescriptor, localRepl IReplica) *change { storesWithStats := rq.storeMap.GetStoresWithStats() - op := rq.findRebalanceOp(rd, storesWithStats, localRepl.ReplicaID()) + op := rq.findRebalanceReplicaOp(rd, storesWithStats, localRepl.ReplicaID()) if op == nil { return nil } - rq.log.Debugf("found rebalance op for range %d: %s -> %s", rd.GetRangeId(), op.from.nhid, op.to.nhid) + rq.log.Debugf("found rebalance replica op for range %d: %s -> %s", rd.GetRangeId(), op.from.nhid, op.to.nhid) replicaID, err := findReplicaWithNHID(rd, op.from.usage.GetNode().GetNhid()) if err != nil { - rq.log.Errorf("failed to rebalance: %s", err) + rq.log.Errorf("failed to rebalance replica: %s", err) return nil } @@ -655,7 +698,92 @@ } } +func (rq *Queue) rebalanceLease(rd *rfpb.RangeDescriptor, localRepl IReplica) *change { + op := rq.findRebalanceLeaseOp(rd, localRepl.ReplicaID()) + if op == nil {
+ return nil + } + rq.log.Debugf("found rebalance lease op for range %d: %s -> %s", rd.GetRangeId(), op.from.nhid, op.to.nhid) + // Transfer leadership to the replica on the destination (op.to) store. + replicaID, err := findReplicaWithNHID(rd, op.to.usage.GetNode().GetNhid()) + if err != nil { + rq.log.Errorf("failed to rebalance lease: %s", err) + return nil + } + return &change{ + transferLeadershipOp: &rfpb.TransferLeadershipRequest{ + RangeId: rd.GetRangeId(), + TargetReplicaId: replicaID, + }, + } +} + +func (rq *Queue) findRebalanceLeaseOp(rd *rfpb.RangeDescriptor, localReplicaID uint64) *rebalanceOp { + var existing *candidate + nhids := make([]string, 0, len(rd.GetReplicas())) + existingNHID := "" + for _, repl := range rd.GetReplicas() { + nhids = append(nhids, repl.GetNhid()) + if repl.GetReplicaId() == localReplicaID { + existingNHID = repl.GetNhid() + } + } + storesWithStats := rq.storeMap.GetStoresWithStatsFromIDs(nhids) + allStores := make(map[string]*candidate) + for _, su := range storesWithStats.Usages { + nhid := su.GetNode().GetNhid() + store := &candidate{ + nhid: nhid, + usage: su, + } + allStores[nhid] = store + if nhid == existingNHID { + existing = store + } + } + + if existing == nil { + rq.log.Warningf("failed to find existing store for c%dn%d", rd.GetRangeId(), localReplicaID) + return nil + } + + existing.leaseCount = existing.usage.LeaseCount + existing.leaseCountMeanLevel = leaseCountMeanLevel(storesWithStats, existing.usage) + choice := &rebalanceChoice{ + existing: existing, + candidates: make([]*candidate, 0, len(rd.GetReplicas())-1), + } + for _, repl := range rd.GetReplicas() { + if repl.GetReplicaId() == localReplicaID { + continue + } + store, ok := allStores[repl.GetNhid()] + if !ok { + // The store might not be available. + continue + } + choice.candidates = append(choice.candidates, &candidate{ + nhid: repl.GetNhid(), + usage: store.usage, + leaseCount: store.usage.LeaseCount, + leaseCountMeanLevel: leaseCountMeanLevel(storesWithStats, store.usage), + }) + } + if !canConvergeByRebalanceLease(choice, storesWithStats) { + return nil + } + + best := slices.MaxFunc(choice.candidates, compareByScoreAndID) + + if compareByScore(best, existing) < 0 { + return nil + } + return &rebalanceOp{ + from: existing, + to: best, + } +} + +func (rq *Queue) findRebalanceReplicaOp(rd *rfpb.RangeDescriptor, storesWithStats *storemap.StoresWithStats, localReplicaID uint64) *rebalanceOp { allStores := make(map[string]*candidate) existingStores := make(map[string]*candidate) @@ -722,8 +850,9 @@ if !needRebalance { for _, choice := range choices { - if canConvergeByRebalance(choice, storesWithStats) { + if canConvergeByRebalanceReplica(choice, storesWithStats) { needRebalance = true + break } } } @@ -915,6 +1044,14 @@ func (rq *Queue) applyChange(ctx context.Context, change *change) error { } rq.log.Infof("RemoveReplicaRequest finished: %+v", change.removeOp) } + if change.transferLeadershipOp != nil { + _, err := rq.store.TransferLeadership(ctx, change.transferLeadershipOp) + if err != nil { + rq.log.Errorf("TransferLeadership %+v err: %s", change.transferLeadershipOp, err) + return err + } + rq.log.Infof("TransferLeadershipRequest finished: %+v", change.transferLeadershipOp) + } return nil } @@ -930,7 +1067,7 @@ func (rq *Queue) processReplica(ctx context.Context, repl IReplica) (bool, error if err != nil { rq.log.Errorf("failed to get Usage of replica c%dn%d", repl.RangeID(), repl.ReplicaID()) } - action, _ := rq.computeAction(rd.GetReplicas(), usage) + action,
_ := rq.computeAction(rd, usage, repl.ReplicaID()) var change *change @@ -951,9 +1088,12 @@ func (rq *Queue) processReplica(ctx context.Context, repl IReplica) (bool, error case DriverRemoveDeadReplica: rq.log.Debugf("remove dead replica (range_id: %d)", repl.RangeID()) change = rq.removeDeadReplica(rd) - case DriverConsiderRebalance: - rq.log.Debugf("consider rebalance: (range_id: %d)", repl.RangeID()) - change = rq.rebalance(rd, repl) + case DriverRebalanceReplica: + rq.log.Debugf("consider rebalance replica: (range_id: %d)", repl.RangeID()) + change = rq.rebalanceReplica(rd, repl) + case DriverRebalanceLease: + rq.log.Debugf("consider rebalance lease: (range_id: %d)", repl.RangeID()) + change = rq.rebalanceLease(rd, repl) } if change == nil { @@ -966,7 +1106,7 @@ func (rq *Queue) processReplica(ctx context.Context, repl IReplica) (bool, error rq.log.Warningf("Error apply change to range_id: %d: %s", repl.RangeID(), err) } - if action == DriverNoop || action == DriverConsiderRebalance { + if action == DriverNoop || action == DriverRebalanceReplica || action == DriverRebalanceLease { return false, err } return true, err @@ -1011,6 +1151,14 @@ func belowMeanReplicaCountThreshold(mean float64) float64 { return mean - math.Max(mean*replicaCountMeanRatioThreshold, minReplicaCountThreshold) } +func aboveMeanLeaseCountThreshold(mean float64) float64 { + return mean + math.Max(mean*leaseCountMeanRatioThreshold, minLeaseCountThreshold) +} + +func belowMeanLeaseCountThreshold(mean float64) float64 { + return mean - math.Max(mean*leaseCountMeanRatioThreshold, minLeaseCountThreshold) +} + type meanLevel int const ( @@ -1025,6 +1173,8 @@ type candidate struct { fullDisk bool replicaCountMeanLevel meanLevel replicaCount int64 + leaseCount int64 + leaseCountMeanLevel meanLevel } // compare returns @@ -1059,6 +1209,23 @@ func compareByScore(a *candidate, b *candidate) int { } else if a.replicaCount > b.replicaCount { return -int(math.Ceil(diff / float64(a.replicaCount) * 10)) } + + // [10, 12] or [-12, -10] + if a.leaseCountMeanLevel != b.leaseCountMeanLevel { + score := int(10 + math.Abs(float64(a.leaseCountMeanLevel-b.leaseCountMeanLevel))) + if a.leaseCountMeanLevel > b.leaseCountMeanLevel { + return score + } + return -score + } + + // (-10, 10) + leaseCountDiff := math.Abs(float64(a.leaseCount - b.leaseCount)) + if a.leaseCount < b.leaseCount { + return int(math.Ceil(leaseCountDiff / float64(b.leaseCount) * 10)) + } else if a.leaseCount > b.leaseCount { + return -int(math.Ceil(leaseCountDiff / float64(a.leaseCount) * 10)) + } return 0 } @@ -1080,3 +1247,15 @@ func replicaCountMeanLevel(storesWithStats *storemap.StoresWithStats, su *rfpb.S } return aroundMean } + +func leaseCountMeanLevel(storesWithStats *storemap.StoresWithStats, su *rfpb.StoreUsage) meanLevel { + maxLeaseCount := aboveMeanLeaseCountThreshold(storesWithStats.LeaseCount.Mean) + minLeaseCount := belowMeanLeaseCountThreshold(storesWithStats.LeaseCount.Mean) + curLeaseCount := float64(su.GetLeaseCount()) + if curLeaseCount < minLeaseCount { + return belowMean + } else if curLeaseCount >= maxLeaseCount { + return aboveMean + } + return aroundMean +} diff --git a/enterprise/server/raft/driver/driver_test.go b/enterprise/server/raft/driver/driver_test.go index 3692f17d23e..c1441a2151c 100644 --- a/enterprise/server/raft/driver/driver_test.go +++ b/enterprise/server/raft/driver/driver_test.go @@ -447,7 +447,7 @@ func TestFindReplicaForRemoval(t *testing.T) { } } -func TestRebalance(t *testing.T) { +func TestRebalanceReplica(t
*testing.T) { localReplicaID := uint64(1) tests := []struct { desc string usages []*rfpb.StoreUsage rd *rfpb.RangeDescriptor expected *rebalanceOp }{ @@ -657,7 +657,7 @@ storeMap := newTestStoreMap(tc.usages) rq := &Queue{log: log.NamedSubLogger("test"), storeMap: storeMap} storesWithStats := storemap.CreateStoresWithStats(tc.usages) - actual := rq.findRebalanceOp(tc.rd, storesWithStats, localReplicaID) + actual := rq.findRebalanceReplicaOp(tc.rd, storesWithStats, localReplicaID) if tc.expected != nil { require.NotNil(t, actual) require.Equal(t, tc.expected.from.nhid, actual.from.nhid) @@ -668,3 +668,120 @@ }) } } + +func TestRebalanceLeases(t *testing.T) { + localReplicaID := uint64(1) + tests := []struct { + desc string + usages []*rfpb.StoreUsage + rd *rfpb.RangeDescriptor + expected *rebalanceOp + }{ + { + desc: "move-lease-to-node-far-below-mean", + rd: &rfpb.RangeDescriptor{ + RangeId: 1, + Replicas: []*rfpb.ReplicaDescriptor{ + {RangeId: 1, ReplicaId: 1, Nhid: proto.String("nhid-1")}, // local + {RangeId: 1, ReplicaId: 2, Nhid: proto.String("nhid-2")}, + {RangeId: 1, ReplicaId: 3, Nhid: proto.String("nhid-3")}, + }, + }, + usages: []*rfpb.StoreUsage{ + { + Node: &rfpb.NodeDescriptor{Nhid: "nhid-1"}, + LeaseCount: 70, + }, + { + Node: &rfpb.NodeDescriptor{Nhid: "nhid-2"}, + LeaseCount: 10, + }, + { + Node: &rfpb.NodeDescriptor{Nhid: "nhid-3"}, + LeaseCount: 20, + }, + { + Node: &rfpb.NodeDescriptor{Nhid: "nhid-4"}, + LeaseCount: 20, + }, + }, + expected: &rebalanceOp{ + from: &candidate{nhid: "nhid-1"}, + to: &candidate{nhid: "nhid-2"}, + }, + }, + { + desc: "no-rebalance-when-around-mean", + rd: &rfpb.RangeDescriptor{ + RangeId: 1, + Replicas: []*rfpb.ReplicaDescriptor{ + {RangeId: 1, ReplicaId: 1, Nhid: proto.String("nhid-1")}, // local + {RangeId: 1, ReplicaId: 2, Nhid: proto.String("nhid-2")}, + {RangeId: 1, ReplicaId: 3, Nhid: proto.String("nhid-3")}, + }, + }, + usages: []*rfpb.StoreUsage{ + { + Node: &rfpb.NodeDescriptor{Nhid: "nhid-1"}, + LeaseCount: 30, + }, + { + Node: &rfpb.NodeDescriptor{Nhid: "nhid-2"}, + LeaseCount: 31, + }, + { + Node: &rfpb.NodeDescriptor{Nhid: "nhid-3"}, + LeaseCount: 29, + }, + }, + expected: nil, + }, + { + desc: "no-rebalance-with-good-choice", + rd: &rfpb.RangeDescriptor{ + RangeId: 1, + Replicas: []*rfpb.ReplicaDescriptor{ + {RangeId: 1, ReplicaId: 1, Nhid: proto.String("nhid-1")}, // local + {RangeId: 1, ReplicaId: 2, Nhid: proto.String("nhid-2")}, + {RangeId: 1, ReplicaId: 3, Nhid: proto.String("nhid-3")}, + }, + }, + usages: []*rfpb.StoreUsage{ + { + Node: &rfpb.NodeDescriptor{Nhid: "nhid-1"}, + LeaseCount: 70, + }, + { + Node: &rfpb.NodeDescriptor{Nhid: "nhid-2"}, + LeaseCount: 69, + }, + { + Node: &rfpb.NodeDescriptor{Nhid: "nhid-3"}, + LeaseCount: 69, + }, + { + Node: &rfpb.NodeDescriptor{Nhid: "nhid-4"}, + LeaseCount: 5, + }, + }, + expected: nil, + }, + } + for _, tc := range tests { + t.Run(tc.desc, func(t *testing.T) { + storeMap := newTestStoreMap(tc.usages) + rq := &Queue{log: log.NamedSubLogger("test"), storeMap: storeMap} + actual := rq.findRebalanceLeaseOp(tc.rd, localReplicaID) + if tc.expected != nil { + require.NotNil(t, actual) + require.Equal(t, tc.expected.from.nhid, actual.from.nhid) + require.Equal(t, tc.expected.to.nhid, actual.to.nhid) + } else { + if actual != nil { + log.Infof("actual: from: %s to %s", actual.from.nhid, actual.to.nhid) + } + require.Nil(t, actual) + } + }) + } +} diff --git a/enterprise/server/raft/store/store.go b/enterprise/server/raft/store/store.go index f730cfa158b..c9da92683de 100644 ---
a/enterprise/server/raft/store/store.go +++ b/enterprise/server/raft/store/store.go @@ -867,6 +867,7 @@ func (s *Store) IsLeader(rangeID uint64) bool { } func (s *Store) TransferLeadership(ctx context.Context, req *rfpb.TransferLeadershipRequest) (*rfpb.TransferLeadershipResponse, error) { + log.Debugf("request to transfer leadership of range %d to replica %d", req.GetRangeId(), req.GetTargetReplicaId()) if err := s.nodeHost.RequestLeaderTransfer(req.GetRangeId(), req.GetTargetReplicaId()); err != nil { return nil, err }