Skip to content

Commit

Permalink
adjust job node affinity to use hostname value (rancher#126)
Browse files Browse the repository at this point in the history
Fix job node affinity for nodes with names that do not match the value
of their `kubernetes.io/hostname` label. With this change, node names
are still reported in the .status.applying slice but jobs are generated
with correct node-affinity via matching on the value of
`kubernetes.io/hostname`.

Fixes rancher#119

Signed-off-by: Jacob Blain Christen <[email protected]>
  • Loading branch information
dweomer authored Aug 13, 2021
1 parent b4e5a67 commit 90e3d08
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 18 deletions.
9 changes: 4 additions & 5 deletions pkg/upgrade/handle_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,14 @@ func (ctl *Controller) handlePlans(ctx context.Context) error {
if !upgradeapiv1.PlanLatestResolved.IsTrue(obj) {
return objects, status, nil
}
concurrentNodeNames, err := upgradeplan.SelectConcurrentNodeNames(obj, nodes.Cache())
concurrentNodes, err := upgradeplan.SelectConcurrentNodes(obj, nodes.Cache())
if err != nil {
return objects, status, err
}
logrus.Debugf("concurrentNodeNames = %q", concurrentNodeNames)
for _, nodeName := range concurrentNodeNames {
objects = append(objects, upgradejob.New(obj, nodeName, ctl.Name))
for _, node := range concurrentNodes {
objects = append(objects, upgradejob.New(obj, node, ctl.Name))
obj.Status.Applying = append(obj.Status.Applying, node.Name)
}
obj.Status.Applying = concurrentNodeNames
return objects, obj.Status, nil
},
&generic.GeneratingHandlerOptions{
Expand Down
13 changes: 12 additions & 1 deletion pkg/upgrade/job/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"testing"

upgradeapiv1 "github.com/rancher/system-upgrade-controller/pkg/apis/upgrade.cattle.io/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
)

func TestNew(t *testing.T) {
Expand All @@ -20,7 +23,15 @@ func TestNew(t *testing.T) {
Spec: upgradeapiv1.PlanSpec{Drain: &upgradeapiv1.DrainSpec{SkipWaitForDeleteTimeout: val},
Upgrade: &upgradeapiv1.ContainerSpec{Image: "image"}},
})
job := New(plan, "node1", "ctr")
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
Labels: labels.Set{
corev1.LabelHostname: "node1",
},
},
}
job := New(plan, node, "ctr")
t.Logf("%#v", job.Spec.Template.Spec.InitContainers)
for _, container := range job.Spec.Template.Spec.InitContainers {
if container.Name == "drain" {
Expand Down
20 changes: 13 additions & 7 deletions pkg/upgrade/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,25 @@ var (
ConditionFailed = condition.Cond(batchv1.JobFailed)
)

func New(plan *upgradeapiv1.Plan, nodeName, controllerName string) *batchv1.Job {
func New(plan *upgradeapiv1.Plan, node *corev1.Node, controllerName string) *batchv1.Job {
hostPathDirectory := corev1.HostPathDirectory
labelPlanName := upgradeapi.LabelPlanName(plan.Name)
nodeHostname := node.Name
if node.Labels != nil {
if hostname, ok := node.Labels[corev1.LabelHostname]; ok {
nodeHostname = hostname
}
}
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: name.SafeConcatName("apply", plan.Name, "on", nodeName, "with", plan.Status.LatestHash),
Name: name.SafeConcatName("apply", plan.Name, "on", node.Name, "with", plan.Status.LatestHash),
Namespace: plan.Namespace,
Annotations: labels.Set{
upgradeapi.AnnotationTTLSecondsAfterFinished: strconv.FormatInt(int64(TTLSecondsAfterFinished), 10),
},
Labels: labels.Set{
upgradeapi.LabelController: controllerName,
upgradeapi.LabelNode: nodeName,
upgradeapi.LabelNode: node.Name,
upgradeapi.LabelPlan: plan.Name,
upgradeapi.LabelVersion: plan.Status.LatestVersion,
labelPlanName: plan.Status.LatestHash,
Expand All @@ -115,7 +121,7 @@ func New(plan *upgradeapiv1.Plan, nodeName, controllerName string) *batchv1.Job
ObjectMeta: metav1.ObjectMeta{
Labels: labels.Set{
upgradeapi.LabelController: controllerName,
upgradeapi.LabelNode: nodeName,
upgradeapi.LabelNode: node.Name,
upgradeapi.LabelPlan: plan.Name,
upgradeapi.LabelVersion: plan.Status.LatestVersion,
labelPlanName: plan.Status.LatestHash,
Expand All @@ -135,7 +141,7 @@ func New(plan *upgradeapiv1.Plan, nodeName, controllerName string) *batchv1.Job
Key: corev1.LabelHostname,
Operator: corev1.NodeSelectorOpIn,
Values: []string{
nodeName,
nodeHostname,
},
}},
}},
Expand Down Expand Up @@ -213,7 +219,7 @@ func New(plan *upgradeapiv1.Plan, nodeName, controllerName string) *batchv1.Job
// then we cordon/drain
cordon, drain := plan.Spec.Cordon, plan.Spec.Drain
if drain != nil {
args := []string{"drain", nodeName, "--pod-selector", `!` + upgradeapi.LabelController}
args := []string{"drain", node.Name, "--pod-selector", `!` + upgradeapi.LabelController}
if drain.IgnoreDaemonSets == nil || *plan.Spec.Drain.IgnoreDaemonSets {
args = append(args, "--ignore-daemonsets")
}
Expand Down Expand Up @@ -252,7 +258,7 @@ func New(plan *upgradeapiv1.Plan, nodeName, controllerName string) *batchv1.Job
podTemplate.Spec.InitContainers = append(podTemplate.Spec.InitContainers,
upgradectr.New("cordon", upgradeapiv1.ContainerSpec{
Image: KubectlImage,
Args: []string{"cordon", nodeName},
Args: []string{"cordon", node.Name},
},
upgradectr.WithSecrets(plan.Spec.Secrets),
upgradectr.WithPlanEnvironment(plan.Name, plan.Status),
Expand Down
12 changes: 7 additions & 5 deletions pkg/upgrade/plan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ func ResolveChannel(ctx context.Context, url, latestVersion, clusterID string) (
return "", fmt.Errorf("unexpected response: %s %s", response.Proto, response.Status)
}

func SelectConcurrentNodeNames(plan *upgradeapiv1.Plan, nodeCache corectlv1.NodeCache) ([]string, error) {
func SelectConcurrentNodes(plan *upgradeapiv1.Plan, nodeCache corectlv1.NodeCache) ([]*corev1.Node, error) {
var (
applying = plan.Status.Applying
selected []string
selected []*corev1.Node
)
nodeSelector, err := metav1.LabelSelectorAsSelector(plan.Spec.NodeSelector)
if err != nil {
Expand All @@ -147,7 +147,7 @@ func SelectConcurrentNodeNames(plan *upgradeapiv1.Plan, nodeCache corectlv1.Node
return nil, err
}
for _, node := range applyingNodes {
selected = append(selected, node.Name)
selected = append(selected, node.DeepCopy())
}
requirementNotApplying, err := labels.NewRequirement(corev1.LabelHostname, selection.NotIn, applying)
if err != nil {
Expand All @@ -173,10 +173,12 @@ func SelectConcurrentNodeNames(plan *upgradeapiv1.Plan, nodeCache corectlv1.Node
})

for i := 0; i < len(candidateNodes) && int64(len(selected)) < plan.Spec.Concurrency; i++ {
selected = append(selected, candidateNodes[i].Name)
selected = append(selected, candidateNodes[i].DeepCopy())
}
}
sort.Strings(selected)
// Order the selected nodes by name so the returned slice is deterministic.
// The less function must compare element i against element j; comparing an
// element with itself is always false and leaves the slice unsorted.
sort.Slice(selected, func(i, j int) bool {
	return selected[i].Name < selected[j].Name
})
return selected, nil
}

Expand Down

0 comments on commit 90e3d08

Please sign in to comment.