scheduler: consider leader score when evict leader #8912

Open · wants to merge 1 commit into master
4 changes: 2 additions & 2 deletions pkg/schedule/checker/rule_checker.go
@@ -320,7 +320,7 @@ func (c *RuleChecker) fixLooseMatchPeer(region *core.RegionInfo, fit *placement.
     if region.GetLeader().GetId() != peer.GetId() && rf.Rule.Role == placement.Leader {
         ruleCheckerFixLeaderRoleCounter.Inc()
         if c.allowLeader(fit, peer) {
-            return operator.CreateTransferLeaderOperator("fix-leader-role", c.cluster, region, peer.GetStoreId(), []uint64{}, 0)
+            return operator.CreateTransferLeaderOperator("fix-leader-role", c.cluster, region, peer.GetStoreId(), 0)
         }
         ruleCheckerNotAllowLeaderCounter.Inc()
         return nil, errPeerCannotBeLeader
@@ -329,7 +329,7 @@ func (c *RuleChecker) fixLooseMatchPeer(region *core.RegionInfo, fit *placement.
         ruleCheckerFixFollowerRoleCounter.Inc()
         for _, p := range region.GetPeers() {
             if c.allowLeader(fit, p) {
-                return operator.CreateTransferLeaderOperator("fix-follower-role", c.cluster, region, p.GetStoreId(), []uint64{}, 0)
+                return operator.CreateTransferLeaderOperator("fix-follower-role", c.cluster, region, p.GetStoreId(), 0)
             }
         }
         ruleCheckerNoNewLeaderCounter.Inc()
2 changes: 1 addition & 1 deletion pkg/schedule/config/config_provider.go
@@ -65,7 +65,6 @@ type SchedulerConfigProvider interface {
     IsTraceRegionFlow() bool

     GetTolerantSizeRatio() float64
-    GetLeaderSchedulePolicy() constant.SchedulePolicy

     IsDebugMetricsEnabled() bool
     IsDiagnosticAllowed() bool
@@ -112,6 +111,7 @@ type SharedConfigProvider interface {
     IsCrossTableMergeEnabled() bool
     IsOneWayMergeEnabled() bool
     GetMergeScheduleLimit() uint64
+    GetLeaderSchedulePolicy() constant.SchedulePolicy
     GetRegionScoreFormulaVersion() string
     GetSchedulerMaxWaitingOperator() uint64
     GetStoreLimitByType(uint64, storelimit.Type) float64
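
Moving GetLeaderSchedulePolicy from SchedulerConfigProvider to SharedConfigProvider keeps it callable from scheduler code because, in pd's config_provider.go, the scheduler-level interface embeds the shared one. A minimal, self-contained sketch of that embedding relationship (types reduced to the one relevant method; SchedulePolicy stands in for constant.SchedulePolicy, and mockConf is a hypothetical implementation):

    package main

    import "fmt"

    // SchedulePolicy stands in for constant.SchedulePolicy.
    type SchedulePolicy string

    // SharedConfigProvider now owns the leader schedule policy.
    type SharedConfigProvider interface {
        GetLeaderSchedulePolicy() SchedulePolicy
    }

    // SchedulerConfigProvider embeds the shared provider, so callers holding
    // the scheduler-level interface can still read the policy.
    type SchedulerConfigProvider interface {
        SharedConfigProvider
    }

    type mockConf struct{}

    func (mockConf) GetLeaderSchedulePolicy() SchedulePolicy { return "count" }

    func main() {
        var c SchedulerConfigProvider = mockConf{}
        fmt.Println(c.GetLeaderSchedulePolicy()) // prints: count
    }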
18 changes: 18 additions & 0 deletions pkg/schedule/filter/comparer.go
@@ -40,6 +40,24 @@ func RegionScoreComparer(conf config.SharedConfigProvider) StoreComparer {
     }
 }

+// LeaderScoreComparer creates a StoreComparer to sort store by leader
+// score.
+func LeaderScoreComparer(conf config.SchedulerConfigProvider) StoreComparer {
+    leaderSchedulePolicy := conf.GetLeaderSchedulePolicy()
+    return func(a, b *core.StoreInfo) int {
+        sa := a.LeaderScore(leaderSchedulePolicy, 0)
+        sb := b.LeaderScore(leaderSchedulePolicy, 0)
+        switch {
+        case sa > sb:
+            return 1
+        case sa < sb:
+            return -1
+        default:
+            return 0
+        }
+    }
+}
+
 // IsolationComparer creates a StoreComparer to sort store by isolation score.
 func IsolationComparer(locationLabels []string, regionStores []*core.StoreInfo) StoreComparer {
     return func(a, b *core.StoreInfo) int {
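
A comparer of this shape plugs into candidate sorting (see the Sort call in evict_leader.go below). A sketch of picking the store with the lowest leader score, assuming pd's package layout and the LeaderScore(policy, delta) method used above; pickLowestLeaderScore is a hypothetical helper, not part of this change:

    import (
        "sort"

        "github.com/tikv/pd/pkg/core"
        "github.com/tikv/pd/pkg/schedule/config"
        "github.com/tikv/pd/pkg/schedule/filter"
    )

    // pickLowestLeaderScore sorts stores ascending by leader score and
    // returns the first one, or nil when the slice is empty.
    func pickLowestLeaderScore(stores []*core.StoreInfo, conf config.SchedulerConfigProvider) *core.StoreInfo {
        if len(stores) == 0 {
            return nil
        }
        cmp := filter.LeaderScoreComparer(conf)
        sort.Slice(stores, func(i, j int) bool { return cmp(stores[i], stores[j]) < 0 })
        return stores[0]
    }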
2 changes: 1 addition & 1 deletion pkg/schedule/handler/handler.go
@@ -418,7 +418,7 @@ func (h *Handler) AddTransferLeaderOperator(regionID uint64, storeID uint64) err
         return errors.Errorf("region has no voter in store %v", storeID)
     }

-    op, err := operator.CreateTransferLeaderOperator("admin-transfer-leader", c, region, newLeader.GetStoreId(), []uint64{}, operator.OpAdmin)
+    op, err := operator.CreateTransferLeaderOperator("admin-transfer-leader", c, region, newLeader.GetStoreId(), operator.OpAdmin)
     if err != nil {
         log.Debug("fail to create transfer leader operator", errs.ZapError(err))
         return err
19 changes: 0 additions & 19 deletions pkg/schedule/operator/builder.go
@@ -296,25 +296,6 @@ func (b *Builder) SetLeader(storeID uint64) *Builder {
     return b
 }

-// SetLeaders records all valid target leaders in Builder.
-func (b *Builder) SetLeaders(storeIDs []uint64) *Builder {
-    if b.err != nil {
-        return b
-    }
-    sort.Slice(storeIDs, func(i, j int) bool { return storeIDs[i] < storeIDs[j] })
-    for _, storeID := range storeIDs {
-        peer := b.targetPeers[storeID]
-        if peer == nil || core.IsLearner(peer) || b.unhealthyPeers[storeID] != nil {
-            continue
-        }
-        b.targetLeaderStoreIDs = append(b.targetLeaderStoreIDs, storeID)
-    }
-    // Don't need to check if there's valid target, because `targetLeaderStoreIDs`
-    // can be empty if this is not a multi-target evict leader operation. Besides,
-    // `targetLeaderStoreID` must be valid and there must be at least one valid target.
-    return b
-}
-
 // SetPeers resets the target peer list.
 //
 // If peer's ID is 0, the builder will allocate a new ID later. If current
3 changes: 1 addition & 2 deletions pkg/schedule/operator/create_operator.go
@@ -78,10 +78,9 @@ func CreateRemovePeerOperator(desc string, ci sche.SharedCluster, kind OpKind, r
 }

 // CreateTransferLeaderOperator creates an operator that transfers the leader from a source store to a target store.
-func CreateTransferLeaderOperator(desc string, ci sche.SharedCluster, region *core.RegionInfo, targetStoreID uint64, targetStoreIDs []uint64, kind OpKind) (*Operator, error) {
+func CreateTransferLeaderOperator(desc string, ci sche.SharedCluster, region *core.RegionInfo, targetStoreID uint64, kind OpKind) (*Operator, error) {
     return NewBuilder(desc, ci, region, SkipOriginJointStateCheck).
         SetLeader(targetStoreID).
-        SetLeaders(targetStoreIDs).
         Build(kind)
 }
2 changes: 1 addition & 1 deletion pkg/schedule/operator/create_operator_test.go
@@ -423,7 +423,7 @@ func (suite *createOperatorTestSuite) TestCreateTransferLeaderOperator() {
     }
     for _, testCase := range testCases {
         region := core.NewRegionInfo(&metapb.Region{Id: 1, Peers: testCase.originPeers}, testCase.originPeers[0])
-        op, err := CreateTransferLeaderOperator("test", suite.cluster, region, testCase.targetLeaderStoreID, []uint64{}, 0)
+        op, err := CreateTransferLeaderOperator("test", suite.cluster, region, testCase.targetLeaderStoreID, 0)

         if testCase.isErr {
             re.Error(err)
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/balance_leader.go
@@ -535,7 +535,7 @@ func (s *balanceLeaderScheduler) createOperator(solver *solver, collector *plan.
     }
     solver.Step++
     defer func() { solver.Step-- }()
-    op, err := operator.CreateTransferLeaderOperator(s.GetName(), solver, solver.Region, solver.targetStoreID(), []uint64{}, operator.OpLeader)
+    op, err := operator.CreateTransferLeaderOperator(s.GetName(), solver, solver.Region, solver.targetStoreID(), operator.OpLeader)
     if err != nil {
         log.Debug("fail to create balance leader operator", errs.ZapError(err))
         if collector != nil {
29 changes: 19 additions & 10 deletions pkg/schedule/schedulers/evict_leader.go
@@ -361,19 +361,12 @@ func scheduleEvictLeaderOnce(r *rand.Rand, name string, cluster sche.SchedulerCl
         filters = append(filters, &filter.StoreStateFilter{ActionScope: name, TransferLeader: true, OperatorLevel: constant.Urgent})
         candidates := filter.NewCandidates(r, cluster.GetFollowerStores(region)).
             FilterTarget(cluster.GetSchedulerConfig(), nil, nil, filters...)
-        // Compatible with old TiKV transfer leader logic.
-        target := candidates.RandomPick()
-        targets := candidates.PickAll()
-        // `targets` MUST contains `target`, so only needs to check if `target` is nil here.
-        if target == nil {
+
+        if len(candidates.Stores) == 0 {
             evictLeaderNoTargetStoreCounter.Inc()
             continue
         }
-        targetIDs := make([]uint64, 0, len(targets))
-        for _, t := range targets {
-            targetIDs = append(targetIDs, t.GetID())
-        }
-        op, err := operator.CreateTransferLeaderOperator(name, cluster, region, target.GetID(), targetIDs, operator.OpLeader)
+        op, err := createOperatorWithSort(name, cluster, candidates, region)
         if err != nil {
             log.Debug("fail to create evict leader operator", errs.ZapError(err))
             continue
@@ -385,6 +378,22 @@
     return ops
 }

+func createOperatorWithSort(name string, cluster sche.SchedulerCluster, candidates *filter.StoreCandidates, region *core.RegionInfo) (*operator.Operator, error) {
+    // Try the store with the lowest leader score first.
+    candidates.Sort(filter.LeaderScoreComparer(cluster.GetSchedulerConfig()))
+    var (
+        op  *operator.Operator
+        err error
+    )
+    for _, target := range candidates.Stores {
+        op, err = operator.CreateTransferLeaderOperator(name, cluster, region, target.GetID(), operator.OpLeader)
+        if op != nil && err == nil {
+            return op, err
+        }
+    }
+    return op, err
+}
+
 type evictLeaderHandler struct {
     rd     *render.Render
     config *evictLeaderSchedulerConfig
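
The loop in createOperatorWithSort is a first-success fallback: candidates are tried in ascending leader-score order, and the last (op, err) pair is surfaced only if no target yields an operator, so an empty candidate list returns (nil, nil), which the caller has already guarded against. The same pattern in isolation, as a sketch; firstBuildable is a hypothetical helper, not part of this change:

    // firstBuildable returns the first operator that builds successfully,
    // otherwise the last (op, err) pair seen; (nil, nil) for an empty slice.
    func firstBuildable(stores []*core.StoreInfo, build func(*core.StoreInfo) (*operator.Operator, error)) (*operator.Operator, error) {
        var (
            op  *operator.Operator
            err error
        )
        for _, s := range stores {
            if op, err = build(s); op != nil && err == nil {
                return op, nil
            }
        }
        return op, err
    }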
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/grant_hot_region.go
@@ -314,7 +314,7 @@ func (s *grantHotRegionScheduler) transfer(cluster sche.SchedulerCluster, region
         dstStore := &metapb.Peer{StoreId: destStoreIDs[i]}

         if isLeader {
-            op, err = operator.CreateTransferLeaderOperator(s.GetName()+"-leader", cluster, srcRegion, dstStore.StoreId, []uint64{}, operator.OpLeader)
+            op, err = operator.CreateTransferLeaderOperator(s.GetName()+"-leader", cluster, srcRegion, dstStore.StoreId, operator.OpLeader)
         } else {
             op, err = operator.CreateMovePeerOperator(s.GetName()+"-move", cluster, srcRegion, operator.OpRegion|operator.OpLeader, srcStore.GetID(), dstStore)
         }
1 change: 0 additions & 1 deletion pkg/schedule/schedulers/hot_region.go
@@ -1473,7 +1473,6 @@ func (bs *balanceSolver) createOperator(region *core.RegionInfo, srcStoreID, dst
             bs,
             region,
             dstStoreID,
-            []uint64{},
             operator.OpHotRegion)
     } else {
         srcPeer := region.GetStorePeer(srcStoreID) // checked in `filterHotPeers`
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/hot_region_test.go
@@ -148,7 +148,7 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) {
             case movePeer:
                 op, err = operator.CreateMovePeerOperator("move-peer-test", tc, region, operator.OpAdmin, 2, &metapb.Peer{Id: region.GetID()*10000 + 1, StoreId: 4})
             case transferLeader:
-                op, err = operator.CreateTransferLeaderOperator("transfer-leader-test", tc, region, 2, []uint64{}, operator.OpAdmin)
+                op, err = operator.CreateTransferLeaderOperator("transfer-leader-test", tc, region, 2, operator.OpAdmin)
             }
             re.NoError(err)
             re.NotNil(op)
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/label.go
@@ -100,7 +100,7 @@ func (s *labelScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) ([]*ope
             continue
         }

-        op, err := operator.CreateTransferLeaderOperator("label-reject-leader", cluster, region, target.GetID(), []uint64{}, operator.OpLeader)
+        op, err := operator.CreateTransferLeaderOperator("label-reject-leader", cluster, region, target.GetID(), operator.OpLeader)
         if err != nil {
             log.Debug("fail to create transfer label reject leader operator", errs.ZapError(err))
             return nil, nil
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/shuffle_leader.go
@@ -88,7 +88,7 @@ func (s *shuffleLeaderScheduler) Schedule(cluster sche.SchedulerCluster, _ bool)
         shuffleLeaderNoFollowerCounter.Inc()
         return nil, nil
     }
-    op, err := operator.CreateTransferLeaderOperator(s.GetName(), cluster, region, targetStore.GetID(), []uint64{}, operator.OpAdmin)
+    op, err := operator.CreateTransferLeaderOperator(s.GetName(), cluster, region, targetStore.GetID(), operator.OpAdmin)
     if err != nil {
         log.Debug("fail to create shuffle leader operator", errs.ZapError(err))
         return nil, nil
21 changes: 12 additions & 9 deletions pkg/schedule/schedulers/transfer_witness_leader.go
@@ -98,19 +98,22 @@ func scheduleTransferWitnessLeader(r *rand.Rand, name string, cluster sche.Sched
     filters = append(filters, filter.NewExcludedFilter(name, nil, unhealthyPeerStores),
         &filter.StoreStateFilter{ActionScope: name, TransferLeader: true, OperatorLevel: constant.Urgent})
     candidates := filter.NewCandidates(r, cluster.GetFollowerStores(region)).FilterTarget(cluster.GetSchedulerConfig(), nil, nil, filters...)
-    // Compatible with old TiKV transfer leader logic.
-    target := candidates.RandomPick()
-    targets := candidates.PickAll()
-    // `targets` MUST contains `target`, so only needs to check if `target` is nil here.
-    if target == nil {
+    if len(candidates.Stores) == 0 {
         transferWitnessLeaderNoTargetStoreCounter.Inc()
         return nil, errors.New("no target store to schedule")
     }
-    targetIDs := make([]uint64, 0, len(targets))
-    for _, t := range targets {
-        targetIDs = append(targetIDs, t.GetID())
+    // TODO: also sort the candidates by leader score, as evict leader does.
+    var (
+        op  *operator.Operator
+        err error
+    )
+    for _, target := range candidates.Stores {
+        op, err = operator.CreateTransferLeaderOperator(name, cluster, region, target.GetID(), operator.OpLeader)
+        if op != nil && err == nil {
+            return op, err
+        }
     }
-    return operator.CreateTransferLeaderOperator(name, cluster, region, target.GetID(), targetIDs, operator.OpWitnessLeader)
+    return op, err
 }

 // RecvRegionInfo receives a checked region from coordinator
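
Per the TODO above, this scheduler still tries candidates in filter order rather than by leader score. If the TODO is picked up, the sort could mirror createOperatorWithSort in evict_leader.go; a one-line sketch under the same assumptions (hypothetical follow-up, not part of this change):

    // Hypothetical follow-up: try the follower with the lowest leader score first.
    candidates.Sort(filter.LeaderScoreComparer(cluster.GetSchedulerConfig()))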
2 changes: 1 addition & 1 deletion plugin/scheduler_example/evict_leader.go
@@ -228,7 +228,7 @@ func (s *evictLeaderScheduler) Schedule(cluster sche.SchedulerCluster, _ bool) (
         if target == nil {
             continue
         }
-        op, err := operator.CreateTransferLeaderOperator(s.GetName(), cluster, region, target.GetID(), []uint64{}, operator.OpLeader)
+        op, err := operator.CreateTransferLeaderOperator(s.GetName(), cluster, region, target.GetID(), operator.OpLeader)
         if err != nil {
             log.Debug("fail to create evict leader operator", errs.ZapError(err))
             continue