From 90e3d082a41fefaf09d14286b880c54a28af6c21 Mon Sep 17 00:00:00 2001
From: Jacob Blain Christen
Date: Fri, 13 Aug 2021 09:24:45 -0700
Subject: [PATCH] adjust job node affinity to use hostname value (#126)

Fix job node affinity for nodes with names that do not match the value
of their `kubernetes.io/hostname` label. With this change, node names
are still reported in the .status.applying slice, but jobs are generated
with the correct node affinity by matching on the value of
`kubernetes.io/hostname`.

Fixes #119

Signed-off-by: Jacob Blain Christen
---
 pkg/upgrade/handle_upgrade.go |  9 ++++-----
 pkg/upgrade/job/flags_test.go | 13 ++++++++++++-
 pkg/upgrade/job/job.go        | 20 +++++++++++++-------
 pkg/upgrade/plan/plan.go      | 12 +++++++-----
 4 files changed, 36 insertions(+), 18 deletions(-)

diff --git a/pkg/upgrade/handle_upgrade.go b/pkg/upgrade/handle_upgrade.go
index f297cffc..260b83b0 100644
--- a/pkg/upgrade/handle_upgrade.go
+++ b/pkg/upgrade/handle_upgrade.go
@@ -68,15 +68,14 @@ func (ctl *Controller) handlePlans(ctx context.Context) error {
 			if !upgradeapiv1.PlanLatestResolved.IsTrue(obj) {
 				return objects, status, nil
 			}
-			concurrentNodeNames, err := upgradeplan.SelectConcurrentNodeNames(obj, nodes.Cache())
+			concurrentNodes, err := upgradeplan.SelectConcurrentNodes(obj, nodes.Cache())
 			if err != nil {
 				return objects, status, err
 			}
-			logrus.Debugf("concurrentNodeNames = %q", concurrentNodeNames)
-			for _, nodeName := range concurrentNodeNames {
-				objects = append(objects, upgradejob.New(obj, nodeName, ctl.Name))
+			for _, node := range concurrentNodes {
+				objects = append(objects, upgradejob.New(obj, node, ctl.Name))
+				obj.Status.Applying = append(obj.Status.Applying, node.Name)
 			}
-			obj.Status.Applying = concurrentNodeNames
 			return objects, obj.Status, nil
 		},
 		&generic.GeneratingHandlerOptions{
diff --git a/pkg/upgrade/job/flags_test.go b/pkg/upgrade/job/flags_test.go
index 12b0674f..3ee7d725 100644
--- a/pkg/upgrade/job/flags_test.go
+++ b/pkg/upgrade/job/flags_test.go
@@ -5,6 +5,9 @@ import (
 	"testing"
 
 	upgradeapiv1 "github.com/rancher/system-upgrade-controller/pkg/apis/upgrade.cattle.io/v1"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 )
 
 func TestNew(t *testing.T) {
@@ -20,7 +23,15 @@ func TestNew(t *testing.T) {
 			Spec: upgradeapiv1.PlanSpec{Drain: &upgradeapiv1.DrainSpec{SkipWaitForDeleteTimeout: val}, Upgrade: &upgradeapiv1.ContainerSpec{Image: "image"}},
 		})
 
-		job := New(plan, "node1", "ctr")
+		node := &corev1.Node{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: "node1",
+				Labels: labels.Set{
+					corev1.LabelHostname: "node1",
+				},
+			},
+		}
+		job := New(plan, node, "ctr")
 		t.Logf("%#v", job.Spec.Template.Spec.InitContainers)
 		for _, container := range job.Spec.Template.Spec.InitContainers {
 			if container.Name == "drain" {
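For illustration (a standalone sketch, not part of the patch): the failure mode
being fixed arises when a node's object name differs from its
kubernetes.io/hostname label, as happens on some cloud providers. The node
values below are made up, and the hostnameFor helper name is illustrative; the
patch inlines this lookup in job.New rather than naming it.

	package main

	import (
		"fmt"

		corev1 "k8s.io/api/core/v1"
		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	)

	// hostnameFor mirrors the fallback the patch adds: prefer the
	// kubernetes.io/hostname label value, fall back to the object name.
	// (Reading a nil map with comma-ok is safe in Go, so no nil check
	// is needed here.)
	func hostnameFor(node *corev1.Node) string {
		if hostname, ok := node.Labels[corev1.LabelHostname]; ok {
			return hostname
		}
		return node.Name
	}

	func main() {
		// A node whose name does not match its hostname label
		// (assumed example values).
		node := &corev1.Node{
			ObjectMeta: metav1.ObjectMeta{
				Name:   "ip-10-0-0-1.ec2.internal",
				Labels: map[string]string{corev1.LabelHostname: "ip-10-0-0-1"},
			},
		}
		fmt.Println(hostnameFor(node)) // ip-10-0-0-1
	}

Before this change, the generated job's required affinity carried the node
object's name ("ip-10-0-0-1.ec2.internal" above), which never matches the
kubelet's hostname label on such a node, so the job's pod stayed unschedulable.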
name.SafeConcatName("apply", plan.Name, "on", nodeName, "with", plan.Status.LatestHash), + Name: name.SafeConcatName("apply", plan.Name, "on", node.Name, "with", plan.Status.LatestHash), Namespace: plan.Namespace, Annotations: labels.Set{ upgradeapi.AnnotationTTLSecondsAfterFinished: strconv.FormatInt(int64(TTLSecondsAfterFinished), 10), }, Labels: labels.Set{ upgradeapi.LabelController: controllerName, - upgradeapi.LabelNode: nodeName, + upgradeapi.LabelNode: node.Name, upgradeapi.LabelPlan: plan.Name, upgradeapi.LabelVersion: plan.Status.LatestVersion, labelPlanName: plan.Status.LatestHash, @@ -115,7 +121,7 @@ func New(plan *upgradeapiv1.Plan, nodeName, controllerName string) *batchv1.Job ObjectMeta: metav1.ObjectMeta{ Labels: labels.Set{ upgradeapi.LabelController: controllerName, - upgradeapi.LabelNode: nodeName, + upgradeapi.LabelNode: node.Name, upgradeapi.LabelPlan: plan.Name, upgradeapi.LabelVersion: plan.Status.LatestVersion, labelPlanName: plan.Status.LatestHash, @@ -135,7 +141,7 @@ func New(plan *upgradeapiv1.Plan, nodeName, controllerName string) *batchv1.Job Key: corev1.LabelHostname, Operator: corev1.NodeSelectorOpIn, Values: []string{ - nodeName, + nodeHostname, }, }}, }}, @@ -213,7 +219,7 @@ func New(plan *upgradeapiv1.Plan, nodeName, controllerName string) *batchv1.Job // then we cordon/drain cordon, drain := plan.Spec.Cordon, plan.Spec.Drain if drain != nil { - args := []string{"drain", nodeName, "--pod-selector", `!` + upgradeapi.LabelController} + args := []string{"drain", node.Name, "--pod-selector", `!` + upgradeapi.LabelController} if drain.IgnoreDaemonSets == nil || *plan.Spec.Drain.IgnoreDaemonSets { args = append(args, "--ignore-daemonsets") } @@ -252,7 +258,7 @@ func New(plan *upgradeapiv1.Plan, nodeName, controllerName string) *batchv1.Job podTemplate.Spec.InitContainers = append(podTemplate.Spec.InitContainers, upgradectr.New("cordon", upgradeapiv1.ContainerSpec{ Image: KubectlImage, - Args: []string{"cordon", nodeName}, + Args: []string{"cordon", node.Name}, }, upgradectr.WithSecrets(plan.Spec.Secrets), upgradectr.WithPlanEnvironment(plan.Name, plan.Status), diff --git a/pkg/upgrade/plan/plan.go b/pkg/upgrade/plan/plan.go index d16fb5d8..b93fdda7 100644 --- a/pkg/upgrade/plan/plan.go +++ b/pkg/upgrade/plan/plan.go @@ -123,10 +123,10 @@ func ResolveChannel(ctx context.Context, url, latestVersion, clusterID string) ( return "", fmt.Errorf("unexpected response: %s %s", response.Proto, response.Status) } -func SelectConcurrentNodeNames(plan *upgradeapiv1.Plan, nodeCache corectlv1.NodeCache) ([]string, error) { +func SelectConcurrentNodes(plan *upgradeapiv1.Plan, nodeCache corectlv1.NodeCache) ([]*corev1.Node, error) { var ( applying = plan.Status.Applying - selected []string + selected []*corev1.Node ) nodeSelector, err := metav1.LabelSelectorAsSelector(plan.Spec.NodeSelector) if err != nil { @@ -147,7 +147,7 @@ func SelectConcurrentNodeNames(plan *upgradeapiv1.Plan, nodeCache corectlv1.Node return nil, err } for _, node := range applyingNodes { - selected = append(selected, node.Name) + selected = append(selected, node.DeepCopy()) } requirementNotApplying, err := labels.NewRequirement(corev1.LabelHostname, selection.NotIn, applying) if err != nil { @@ -173,10 +173,12 @@ func SelectConcurrentNodeNames(plan *upgradeapiv1.Plan, nodeCache corectlv1.Node }) for i := 0; i < len(candidateNodes) && int64(len(selected)) < plan.Spec.Concurrency; i++ { - selected = append(selected, candidateNodes[i].Name) + selected = append(selected, candidateNodes[i].DeepCopy()) } } - 
diff --git a/pkg/upgrade/plan/plan.go b/pkg/upgrade/plan/plan.go
index d16fb5d8..b93fdda7 100644
--- a/pkg/upgrade/plan/plan.go
+++ b/pkg/upgrade/plan/plan.go
@@ -123,10 +123,10 @@ func ResolveChannel(ctx context.Context, url, latestVersion, clusterID string) (
 	return "", fmt.Errorf("unexpected response: %s %s", response.Proto, response.Status)
 }
 
-func SelectConcurrentNodeNames(plan *upgradeapiv1.Plan, nodeCache corectlv1.NodeCache) ([]string, error) {
+func SelectConcurrentNodes(plan *upgradeapiv1.Plan, nodeCache corectlv1.NodeCache) ([]*corev1.Node, error) {
 	var (
 		applying = plan.Status.Applying
-		selected []string
+		selected []*corev1.Node
 	)
 	nodeSelector, err := metav1.LabelSelectorAsSelector(plan.Spec.NodeSelector)
 	if err != nil {
@@ -147,7 +147,7 @@ func SelectConcurrentNodeNames(plan *upgradeapiv1.Plan, nodeCache corectlv1.Node
 		return nil, err
 	}
 	for _, node := range applyingNodes {
-		selected = append(selected, node.Name)
+		selected = append(selected, node.DeepCopy())
 	}
 	requirementNotApplying, err := labels.NewRequirement(corev1.LabelHostname, selection.NotIn, applying)
 	if err != nil {
@@ -173,10 +173,12 @@ func SelectConcurrentNodeNames(plan *upgradeapiv1.Plan, nodeCache corectlv1.Node
 		})
 		for i := 0; i < len(candidateNodes) && int64(len(selected)) < plan.Spec.Concurrency; i++ {
-			selected = append(selected, candidateNodes[i].Name)
+			selected = append(selected, candidateNodes[i].DeepCopy())
 		}
 	}
-	sort.Strings(selected)
+	sort.Slice(selected, func(i, j int) bool {
+		return selected[i].Name < selected[j].Name
+	})
 	return selected, nil
 }
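A last standalone sketch (assumed node names, not part of the patch): the
sort.Slice comparator above orders the selected nodes by name, preserving the
deterministic ordering that sort.Strings previously gave the node-name slice
backing .status.applying.

	package main

	import (
		"fmt"
		"sort"

		corev1 "k8s.io/api/core/v1"
		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	)

	func main() {
		// Placeholder nodes; only the comparator behavior matters here.
		selected := []*corev1.Node{
			{ObjectMeta: metav1.ObjectMeta{Name: "node-c"}},
			{ObjectMeta: metav1.ObjectMeta{Name: "node-a"}},
			{ObjectMeta: metav1.ObjectMeta{Name: "node-b"}},
		}
		// Same comparator as SelectConcurrentNodes: order by node name so
		// that .status.applying (built from node.Name) stays deterministic.
		sort.Slice(selected, func(i, j int) bool {
			return selected[i].Name < selected[j].Name
		})
		for _, node := range selected {
			fmt.Println(node.Name) // node-a, node-b, node-c
		}
	}

Returning deep copies of the cached nodes keeps callers from mutating the
shared informer cache, which is why the new signature hands back
[]*corev1.Node built with DeepCopy rather than pointers into the cache.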