Merge pull request kubernetes#6164 from artemvmin/scale-down-drainability

Convert scale-down checks to drainability rules
k8s-ci-robot authored Oct 11, 2023
2 parents e7bf3ec + 33e300f commit 133fdc7
Showing 35 changed files with 2,809 additions and 1,401 deletions.
10 changes: 10 additions & 0 deletions cluster-autoscaler/core/scaledown/pdb/basic.go
@@ -63,6 +63,16 @@ func (t *basicRemainingPdbTracker) GetPdbs() []*policyv1.PodDisruptionBudget {
return pdbs
}

func (t *basicRemainingPdbTracker) MatchingPdbs(pod *apiv1.Pod) []*policyv1.PodDisruptionBudget {
var pdbs []*policyv1.PodDisruptionBudget
for _, pdbInfo := range t.pdbInfos {
if pod.Namespace == pdbInfo.pdb.Namespace && pdbInfo.selector.Matches(labels.Set(pod.Labels)) {
pdbs = append(pdbs, pdbInfo.pdb)
}
}
return pdbs
}

func (t *basicRemainingPdbTracker) CanRemovePods(pods []*apiv1.Pod) (canRemove, inParallel bool, blockingPod *drain.BlockingPod) {
inParallel = true
for _, pdbInfo := range t.pdbInfos {
2 changes: 2 additions & 0 deletions cluster-autoscaler/core/scaledown/pdb/pdb.go
@@ -28,6 +28,8 @@ type RemainingPdbTracker interface {
SetPdbs(pdbs []*policyv1.PodDisruptionBudget) error
// GetPdbs returns the current remaining PDBs.
GetPdbs() []*policyv1.PodDisruptionBudget
// MatchingPdbs returns all PDBs matching the pod.
MatchingPdbs(pod *apiv1.Pod) []*policyv1.PodDisruptionBudget

// CanRemovePods checks if the set of pods can be removed.
// inParallel indicates if the pods can be removed in parallel. If it is false
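As an aside, a minimal usage sketch of the new MatchingPdbs method; the helper name, the surrounding package, and the import paths below are assumptions made for this example, not part of the change. It treats a pod as evictable only if every remaining PDB that selects it still has disruption budget available.

package example

import (
	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
)

// canEvictSinglePod is a hypothetical helper: it collects every remaining PDB
// matching the pod and allows eviction only if each of them currently permits
// at least one more disruption.
func canEvictSinglePod(tracker pdb.RemainingPdbTracker, pod *apiv1.Pod) bool {
	for _, matched := range tracker.MatchingPdbs(pod) {
		if matched.Status.DisruptionsAllowed < 1 {
			return false
		}
	}
	return true
}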
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/static_autoscaler_test.go
@@ -153,7 +153,7 @@ func (m *onNodeGroupDeleteMock) Delete(id string) error {

func setUpScaleDownActuator(ctx *context.AutoscalingContext, autoscalingOptions config.AutoscalingOptions) {
deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), deleteOptions, rules.Default(), NewTestProcessors(ctx).NodeGroupConfigProcessor)
ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), deleteOptions, rules.Default(deleteOptions), NewTestProcessors(ctx).NodeGroupConfigProcessor)
}

func TestStaticAutoscalerRunOnce(t *testing.T) {
2 changes: 1 addition & 1 deletion cluster-autoscaler/main.go
@@ -481,7 +481,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
if autoscalingOptions.ParallelDrain {
sdCandidatesSorting := previouscandidates.NewPreviousCandidates()
scaleDownCandidatesComparers = []scaledowncandidates.CandidatesComparer{
emptycandidates.NewEmptySortingProcessor(emptycandidates.NewNodeInfoGetter(opts.ClusterSnapshot), deleteOptions, rules.Default()),
emptycandidates.NewEmptySortingProcessor(emptycandidates.NewNodeInfoGetter(opts.ClusterSnapshot), deleteOptions, rules.Default(deleteOptions)),
sdCandidatesSorting,
}
opts.Processors.ScaleDownCandidatesNotifier.Register(sdCandidatesSorting)
9 changes: 3 additions & 6 deletions cluster-autoscaler/simulator/cluster.go
@@ -123,9 +123,6 @@ func (r *RemovalSimulator) FindNodesToRemove(
timestamp time.Time,
remainingPdbTracker pdb.RemainingPdbTracker,
) (nodesToRemove []NodeToBeRemoved, unremovableNodes []*UnremovableNode) {
result := make([]NodeToBeRemoved, 0)
unremovable := make([]*UnremovableNode, 0)

destinationMap := make(map[string]bool, len(destinations))
for _, destination := range destinations {
destinationMap[destination] = true
@@ -134,12 +131,12 @@ func (r *RemovalSimulator) FindNodesToRemove(
for _, nodeName := range candidates {
rn, urn := r.SimulateNodeRemoval(nodeName, destinationMap, timestamp, remainingPdbTracker)
if rn != nil {
result = append(result, *rn)
nodesToRemove = append(nodesToRemove, *rn)
} else if urn != nil {
unremovable = append(unremovable, urn)
unremovableNodes = append(unremovableNodes, urn)
}
}
return result, unremovable
return nodesToRemove, unremovableNodes
}

// SimulateNodeRemoval simulates removing a node from the cluster to check
37 changes: 12 additions & 25 deletions cluster-autoscaler/simulator/cluster_test.go
@@ -136,14 +136,11 @@ func TestFindNodesToRemove(t *testing.T) {
fullNodeInfo.AddPod(pod4)

emptyNodeToRemove := NodeToBeRemoved{
Node: emptyNode,
PodsToReschedule: []*apiv1.Pod{},
DaemonSetPods: []*apiv1.Pod{},
Node: emptyNode,
}
drainableNodeToRemove := NodeToBeRemoved{
Node: drainableNode,
PodsToReschedule: []*apiv1.Pod{pod1, pod2},
DaemonSetPods: []*apiv1.Pod{},
}

clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot()
@@ -152,25 +149,19 @@ func TestFindNodesToRemove(t *testing.T) {
tracker := NewUsageTracker()

tests := []findNodesToRemoveTestConfig{
// just an empty node, should be removed
{
name: "just an empty node, should be removed",
pods: []*apiv1.Pod{},
candidates: []string{emptyNode.Name},
allNodes: []*apiv1.Node{emptyNode},
toRemove: []NodeToBeRemoved{emptyNodeToRemove},
unremovable: []*UnremovableNode{},
name: "just an empty node, should be removed",
candidates: []string{emptyNode.Name},
allNodes: []*apiv1.Node{emptyNode},
toRemove: []NodeToBeRemoved{emptyNodeToRemove},
},
// just a drainable node, but nowhere for pods to go to
{
name: "just a drainable node, but nowhere for pods to go to",
pods: []*apiv1.Pod{pod1, pod2},
candidates: []string{drainableNode.Name},
allNodes: []*apiv1.Node{drainableNode},
toRemove: []NodeToBeRemoved{},
unremovable: []*UnremovableNode{{Node: drainableNode, Reason: NoPlaceToMovePods}},
},
// drainable node, and a mostly empty node that can take its pods
{
name: "drainable node, and a mostly empty node that can take its pods",
pods: []*apiv1.Pod{pod1, pod2, pod3},
@@ -179,23 +170,19 @@ func TestFindNodesToRemove(t *testing.T) {
toRemove: []NodeToBeRemoved{drainableNodeToRemove},
unremovable: []*UnremovableNode{{Node: nonDrainableNode, Reason: BlockedByPod, BlockingPod: &drain.BlockingPod{Pod: pod3, Reason: drain.NotReplicated}}},
},
// drainable node, and a full node that cannot fit anymore pods
{
name: "drainable node, and a full node that cannot fit anymore pods",
pods: []*apiv1.Pod{pod1, pod2, pod4},
candidates: []string{drainableNode.Name},
allNodes: []*apiv1.Node{drainableNode, fullNode},
toRemove: []NodeToBeRemoved{},
unremovable: []*UnremovableNode{{Node: drainableNode, Reason: NoPlaceToMovePods}},
},
// 4 nodes, 1 empty, 1 drainable
{
name: "4 nodes, 1 empty, 1 drainable",
pods: []*apiv1.Pod{pod1, pod2, pod3, pod4},
candidates: []string{emptyNode.Name, drainableNode.Name},
allNodes: []*apiv1.Node{emptyNode, drainableNode, fullNode, nonDrainableNode},
toRemove: []NodeToBeRemoved{emptyNodeToRemove, drainableNodeToRemove},
unremovable: []*UnremovableNode{},
name: "4 nodes, 1 empty, 1 drainable",
pods: []*apiv1.Pod{pod1, pod2, pod3, pod4},
candidates: []string{emptyNode.Name, drainableNode.Name},
allNodes: []*apiv1.Node{emptyNode, drainableNode, fullNode, nonDrainableNode},
toRemove: []NodeToBeRemoved{emptyNodeToRemove, drainableNodeToRemove},
},
}

@@ -209,8 +196,8 @@ func TestFindNodesToRemove(t *testing.T) {
r := NewRemovalSimulator(registry, clusterSnapshot, predicateChecker, tracker, testDeleteOptions(), nil, false)
toRemove, unremovable := r.FindNodesToRemove(test.candidates, destinations, time.Now(), nil)
fmt.Printf("Test scenario: %s, found len(toRemove)=%v, expected len(test.toRemove)=%v\n", test.name, len(toRemove), len(test.toRemove))
assert.Equal(t, toRemove, test.toRemove)
assert.Equal(t, unremovable, test.unremovable)
assert.Equal(t, test.toRemove, toRemove)
assert.Equal(t, test.unremovable, unremovable)
})
}
}
57 changes: 16 additions & 41 deletions cluster-autoscaler/simulator/drain.go
@@ -17,7 +17,6 @@ limitations under the License.
package simulator

import (
"fmt"
"time"

apiv1 "k8s.io/api/core/v1"
@@ -31,66 +30,42 @@ import (
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// GetPodsToMove returns a list of pods that should be moved elsewhere
// and a list of DaemonSet pods that should be evicted if the node
// is drained. Raises error if there is an unreplicated pod.
// Based on kubectl drain code. If listers is nil it makes an assumption that RC, DS, Jobs and RS were deleted
// along with their pods (no abandoned pods with dangling created-by annotation).
// If listers is not nil it checks whether RC, DS, Jobs and RS that created these pods
// still exist.
// TODO(x13n): Rewrite GetPodsForDeletionOnNodeDrain into a set of DrainabilityRules.
// GetPodsToMove returns a list of pods that should be moved elsewhere and a
// list of DaemonSet pods that should be evicted if the node is drained.
// Raises error if there is an unreplicated pod.
// Based on kubectl drain code. If listers is nil it makes an assumption that
// RC, DS, Jobs and RS were deleted along with their pods (no abandoned pods
// with dangling created-by annotation).
// If listers is not nil it checks whether RC, DS, Jobs and RS that created
// these pods still exist.
func GetPodsToMove(nodeInfo *schedulerframework.NodeInfo, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, listers kube_util.ListerRegistry, remainingPdbTracker pdb.RemainingPdbTracker, timestamp time.Time) (pods []*apiv1.Pod, daemonSetPods []*apiv1.Pod, blockingPod *drain.BlockingPod, err error) {
var drainPods, drainDs []*apiv1.Pod
if drainabilityRules == nil {
drainabilityRules = rules.Default()
drainabilityRules = rules.Default(deleteOptions)
}
if remainingPdbTracker == nil {
remainingPdbTracker = pdb.NewBasicRemainingPdbTracker()
}
drainCtx := &drainability.DrainContext{
RemainingPdbTracker: remainingPdbTracker,
DeleteOptions: deleteOptions,
Listers: listers,
Timestamp: timestamp,
}
for _, podInfo := range nodeInfo.Pods {
pod := podInfo.Pod
status := drainabilityRules.Drainable(drainCtx, pod)
switch status.Outcome {
case drainability.UndefinedOutcome:
pods = append(pods, podInfo.Pod)
case drainability.DrainOk:
case drainability.UndefinedOutcome, drainability.DrainOk:
if pod_util.IsDaemonSetPod(pod) {
drainDs = append(drainDs, pod)
daemonSetPods = append(daemonSetPods, pod)
} else {
drainPods = append(drainPods, pod)
pods = append(pods, pod)
}
case drainability.BlockDrain:
blockingPod = &drain.BlockingPod{
return nil, nil, &drain.BlockingPod{
Pod: pod,
Reason: status.BlockingReason,
}
err = status.Error
return
}, status.Error
}
}

pods, daemonSetPods, blockingPod, err = drain.GetPodsForDeletionOnNodeDrain(
pods,
remainingPdbTracker.GetPdbs(),
deleteOptions.SkipNodesWithSystemPods,
deleteOptions.SkipNodesWithLocalStorage,
deleteOptions.SkipNodesWithCustomControllerPods,
listers,
int32(deleteOptions.MinReplicaCount),
timestamp)
pods = append(pods, drainPods...)
daemonSetPods = append(daemonSetPods, drainDs...)
if err != nil {
return pods, daemonSetPods, blockingPod, err
}
if canRemove, _, blockingPodInfo := remainingPdbTracker.CanRemovePods(pods); !canRemove {
pod := blockingPodInfo.Pod
return []*apiv1.Pod{}, []*apiv1.Pod{}, blockingPodInfo, fmt.Errorf("not enough pod disruption budget to move %s/%s", pod.Namespace, pod.Name)
}

return pods, daemonSetPods, nil, nil
}
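To make the rule conversion concrete, a sketch of a custom drainability rule shaped after the Drainable(drainCtx, pod) call and the Outcome values used in GetPodsToMove above. The struct literal for drainability.Status, the rule's shape, and the import paths are inferred from this diff rather than taken from the package, so treat this as an illustrative assumption, not the project's documented API.

package example

import (
	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability"
)

// skipTerminalPodsRule is a hypothetical rule: pods that already reached a
// terminal phase need no rescheduling, so they are reported as safe to drain;
// all other pods are left to the remaining rules via UndefinedOutcome.
type skipTerminalPodsRule struct{}

func (skipTerminalPodsRule) Drainable(drainCtx *drainability.DrainContext, pod *apiv1.Pod) drainability.Status {
	if pod.Status.Phase == apiv1.PodSucceeded || pod.Status.Phase == apiv1.PodFailed {
		return drainability.Status{Outcome: drainability.DrainOk}
	}
	return drainability.Status{Outcome: drainability.UndefinedOutcome}
}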