diff --git a/controllers/reconcile.go b/controllers/reconcile.go index e7f8259..90efcea 100644 --- a/controllers/reconcile.go +++ b/controllers/reconcile.go @@ -23,6 +23,7 @@ import ( kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/storage/names" + "k8s.io/utils/ptr" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" "sigs.k8s.io/cluster-api/controllers/external" "sigs.k8s.io/cluster-api/util" @@ -141,8 +142,12 @@ func (r *MicroK8sControlPlaneReconciler) reconcileMachines(ctx context.Context, var oldVersion, newVersion string if numMachines > 0 { + var err error sort.Sort(SortByCreationTimestamp(machines)) - oldVersion = semver.MajorMinor(*machines[0].Spec.Version) + oldVersion, err = getOldestVersion(machines) + if err != nil { + return ctrl.Result{}, fmt.Errorf("failed to get oldest version: %w", err) + } newVersion = semver.MajorMinor(mcp.Spec.Version) } @@ -202,50 +207,51 @@ func (r *MicroK8sControlPlaneReconciler) reconcileMachines(ctx context.Context, // For each machine, get the node and upgrade it for _, machine := range machines { + if isMachineUpgraded(machine, newVersion) { + logger.Info("Machine already upgraded", "machine", machine.Name, "version", newVersion) + continue + } + + if machine.Status.NodeRef == nil { + logger.Info("Machine does not have a nodeRef yet, requeueing...", "machine", machine.Name) + return ctrl.Result{RequeueAfter: 10 * time.Second}, nil + } // Get the node for the machine node, err := kubeclient.CoreV1().Nodes().Get(ctx, machine.Status.NodeRef.Name, metav1.GetOptions{}) if err != nil { - return ctrl.Result{RequeueAfter: 20 * time.Second}, err + return ctrl.Result{}, fmt.Errorf("failed to get node: %w", err) } logger.Info(fmt.Sprintf("Creating upgrade pod on %s...", node.Name)) pod, err := createUpgradePod(ctx, kubeclient, node.Name, mcp.Spec.Version) if err != nil { - logger.Error(err, "Error creating upgrade pod.") + return ctrl.Result{}, fmt.Errorf("failed to create upgrade pod: %w", err) } - logger.Info("Waiting for upgrade node to be updated to the given version...") - err = waitForNodeUpgrade(ctx, kubeclient, node.Name, mcp.Spec.Version) - if err != nil { - logger.Error(err, "Error waiting for node upgrade.") + logger.Info("Waiting for node to be updated to the given version...", "node", node.Name) + if err := waitForNodeUpgrade(ctx, kubeclient, node.Name, mcp.Spec.Version); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to wait for node upgrade: %w", err) } - time.Sleep(10 * time.Second) - - // Get the current machine + logger.Info("Node upgraded successfully.", "node", node.Name) + // Update the machine version currentMachine := &clusterv1.Machine{} currentMachineName := node.Annotations["cluster.x-k8s.io/machine"] - err = r.Client.Get(ctx, client.ObjectKey{Namespace: cluster.Namespace, Name: currentMachineName}, currentMachine) - if err != nil { - logger.Error(err, "Error getting machine.") + if err := r.Client.Get(ctx, client.ObjectKey{Namespace: cluster.Namespace, Name: currentMachineName}, currentMachine); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to get machine: %w", err) } - // Update the machine version + logger.Info("Updating machine version...", "machine", currentMachine.Name) currentMachine.Spec.Version = &mcp.Spec.Version logger.Info(fmt.Sprintf("Now updating machine %s version to %s...", currentMachine.Name, *currentMachine.Spec.Version)) - err = r.Client.Update(ctx, currentMachine) - if err != nil { - logger.Error(err, "Could not update the machine version. We will retry.") + if err := r.Client.Update(ctx, currentMachine); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to update machine: %w", err) } - time.Sleep(10 * time.Second) - - // wait until pod is deleted logger.Info(fmt.Sprintf("Removing upgrade pod %s from %s...", pod.ObjectMeta.Name, node.Name)) - err = waitForPodDeletion(ctx, kubeclient, pod.ObjectMeta.Name) - if err != nil { - logger.Error(err, "Error waiting for pod deletion.") + if err := waitForPodDeletion(ctx, kubeclient, pod.ObjectMeta.Name); err != nil { + return ctrl.Result{}, fmt.Errorf("failed to wait for pod deletion: %w", err) } logger.Info(fmt.Sprintf("Upgrade of node %s completed.\n", node.Name)) @@ -689,18 +695,25 @@ func (r *MicroK8sControlPlaneReconciler) removeNodeFromDqlite(ctx context.Contex return nil } +// createUpgradePod creates a pod that upgrades the node to the given version. +// If the upgrade pod already exists, it is deleted and a new one will be created. func createUpgradePod(ctx context.Context, kubeclient *kubernetesClient, nodeName string, nodeVersion string) (*corev1.Pod, error) { - nodeVersion = strings.TrimPrefix(semver.MajorMinor(nodeVersion), "v") + podName := "upgrade-pod" - uid := int64(0) - priv := true + // delete the pod if it exists + if err := waitForPodDeletion(ctx, kubeclient, podName); err != nil { + return nil, fmt.Errorf("failed to delete pod %s: %w", podName, err) + } + + nodeVersion = strings.TrimPrefix(semver.MajorMinor(nodeVersion), "v") pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: "upgrade-pod", + Name: podName, }, Spec: corev1.PodSpec{ - NodeName: nodeName, + NodeName: nodeName, + RestartPolicy: corev1.RestartPolicyOnFailure, Containers: []corev1.Container{ { Name: "upgrade", @@ -709,7 +722,7 @@ func createUpgradePod(ctx context.Context, kubeclient *kubernetesClient, nodeNam "su", "-c", }, - SecurityContext: &corev1.SecurityContext{Privileged: &priv, RunAsUser: &uid}, + SecurityContext: &corev1.SecurityContext{Privileged: ptr.To(true), RunAsUser: ptr.To(int64(0))}, Args: []string{ fmt.Sprintf("curl -X POST -H \"Content-Type: application/json\" --unix-socket /run/snapd.socket -d '{\"action\": \"refresh\",\"channel\":\"%s/stable\"}' http://localhost/v2/snaps/microk8s", nodeVersion), }, @@ -736,47 +749,77 @@ func createUpgradePod(ctx context.Context, kubeclient *kubernetesClient, nodeNam pod, err := kubeclient.CoreV1().Pods("default").Create(ctx, pod, metav1.CreateOptions{}) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to create pod %s: %w", podName, err) } return pod, nil } func waitForNodeUpgrade(ctx context.Context, kubeclient *kubernetesClient, nodeName, nodeVersion string) error { - // attempt to connect 60 times. With a wait of 10 secs this should be 600 sec = 10 min - attempts := 60 - for attempts > 0 { + for attempts := 100; attempts > 0; attempts-- { node, err := kubeclient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) if err != nil { - return err + return fmt.Errorf("failed to get node %s: %w", nodeName, err) } currentVersion := semver.MajorMinor(node.Status.NodeInfo.KubeletVersion) nodeVersion = semver.MajorMinor(nodeVersion) if strings.HasPrefix(currentVersion, nodeVersion) { - break + return nil } - time.Sleep(10 * time.Second) - attempts-- + + time.Sleep(3 * time.Second) } - return nil + + return fmt.Errorf("timed out waiting for node %s to be upgraded to version %s", nodeName, nodeVersion) } +// waitForPodDeletion waits for the pod to be deleted. If the pod doesn't exist, it returns nil. func waitForPodDeletion(ctx context.Context, kubeclient *kubernetesClient, podName string) error { - for { - gracePeriod := int64(0) + var err error + for attempts := 5; attempts > 0; attempts-- { deleteOptions := metav1.DeleteOptions{ - GracePeriodSeconds: &gracePeriod, + GracePeriodSeconds: ptr.To(int64(0)), } - err := kubeclient.CoreV1().Pods("default").Delete(ctx, podName, deleteOptions) - time.Sleep(10 * time.Second) - if err != nil { - if apierrors.IsNotFound(err) { - break - } - return err - } else { - break + err = kubeclient.CoreV1().Pods("default").Delete(ctx, podName, deleteOptions) + if err == nil || apierrors.IsNotFound(err) { + return nil } + time.Sleep(3 * time.Second) } - return nil + + return fmt.Errorf("timed out waiting for pod %s to be deleted: %w", podName, err) +} + +// getOldestVersion returns the oldest version of the machines. +func getOldestVersion(machines []clusterv1.Machine) (string, error) { + var v string + for _, m := range machines { + if m.Spec.Version == nil { + // weird! + continue + } + + if v == "" { + v = semver.MajorMinor(*m.Spec.Version) + continue + } + + if semver.Compare(v, *m.Spec.Version) > 0 { + v = semver.MajorMinor(*m.Spec.Version) + } + } + + if v == "" { + return "", fmt.Errorf("no version found") + } + return v, nil +} + +func isMachineUpgraded(m clusterv1.Machine, newVersion string) bool { + if m.Spec.Version == nil { + return false + } + machineVersion := semver.MajorMinor(*m.Spec.Version) + newVersion = semver.MajorMinor(newVersion) // just being extra careful + return semver.Compare(machineVersion, newVersion) == 0 } diff --git a/pkg/clusteragent/clusteragent.go b/pkg/clusteragent/clusteragent.go index 7c0d5db..3a85882 100644 --- a/pkg/clusteragent/clusteragent.go +++ b/pkg/clusteragent/clusteragent.go @@ -147,6 +147,13 @@ func (c *Client) do(ctx context.Context, method, endpoint string, header http.He // createPod creates a pod that runs a curl command. func (c *Client) createPod(ctx context.Context, method, endpoint string, header http.Header, data map[string]any) (*corev1.Pod, error) { + podName := fmt.Sprintf(CallerPodNameFormat, c.nodeName) + + // delete the pod if it exists + if err := c.deletePod(ctx, podName); err != nil { + return nil, fmt.Errorf("failed to delete pod: %w", err) + } + curl, err := c.createCURLString(method, endpoint, header, data) if err != nil { return nil, fmt.Errorf("failed to create curl string: %w", err) @@ -156,7 +163,7 @@ func (c *Client) createPod(ctx context.Context, method, endpoint string, header pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf(CallerPodNameFormat, c.nodeName), + Name: podName, }, Spec: corev1.PodSpec{ NodeName: c.nodeName, @@ -211,7 +218,7 @@ func (c *Client) createCURLString(method, endpoint string, header http.Header, d return req, nil } -// deletePod deletes a pod. +// deletePod deletes a pod. It will succeed if the pod doesn't exist. func (c *Client) deletePod(ctx context.Context, podName string) error { deleteOptions := metav1.DeleteOptions{ GracePeriodSeconds: ptr.To(int64(0)), diff --git a/pkg/clusteragent/clusteragent_test.go b/pkg/clusteragent/clusteragent_test.go index ab0c298..44b3d5a 100644 --- a/pkg/clusteragent/clusteragent_test.go +++ b/pkg/clusteragent/clusteragent_test.go @@ -125,57 +125,114 @@ func TestClient(t *testing.T) { } func TestDo(t *testing.T) { - g := NewWithT(t) - - kubeclient := fake.NewSimpleClientset() - nodeName := "node" - nodeAddress := "5.6.7.8" - port := "1234" - method := "POST" - endpoint := "my/endpoint" - dataKey, dataValue := "dkey", "dvalue" - data := map[string]any{ - dataKey: dataValue, - } - headerKey, headerValue := "hkey", "hvalue" - header := map[string][]string{ - headerKey: {headerValue}, - } + t.Run("Success", func(t *testing.T) { + g := NewWithT(t) + + kubeclient := fake.NewSimpleClientset() + nodeName := "node" + nodeAddress := "5.6.7.8" + port := "1234" + method := "POST" + endpoint := "my/endpoint" + dataKey, dataValue := "dkey", "dvalue" + data := map[string]any{ + dataKey: dataValue, + } + headerKey, headerValue := "hkey", "hvalue" + header := map[string][]string{ + headerKey: {headerValue}, + } - c, err := NewClient(kubeclient, newLogger(), []clusterv1.Machine{ - { - Status: clusterv1.MachineStatus{ - NodeRef: &corev1.ObjectReference{ - Name: nodeName, + c, err := NewClient(kubeclient, newLogger(), []clusterv1.Machine{ + { + Status: clusterv1.MachineStatus{ + NodeRef: &corev1.ObjectReference{ + Name: nodeName, + }, + Addresses: clusterv1.MachineAddresses{ + { + Address: nodeAddress, + }, + }, }, - Addresses: clusterv1.MachineAddresses{ - { - Address: nodeAddress, + }, + }, port, Options{SkipSucceededCheck: true, SkipPodCleanup: true}) + + g.Expect(err).ToNot(HaveOccurred()) + + g.Expect(c.do(context.Background(), method, endpoint, header, data)).To(Succeed()) + + pod, err := kubeclient.CoreV1().Pods(DefaultPodNameSpace).Get(context.Background(), fmt.Sprintf(CallerPodNameFormat, nodeName), v1.GetOptions{}) + g.Expect(err).ToNot(HaveOccurred()) + + g.Expect(pod.Spec.NodeName).To(Equal(nodeName)) + g.Expect(pod.Spec.Containers).To(HaveLen(1)) + + container := pod.Spec.Containers[0] + g.Expect(container.Image).To(Equal(images.CurlImage)) + g.Expect(*container.SecurityContext.Privileged).To(BeTrue()) + g.Expect(*container.SecurityContext.RunAsUser).To(Equal(int64(0))) + g.Expect(container.Command).To(HaveLen(3)) + g.Expect(container.Command[2]).To(Equal(fmt.Sprintf( + "curl -k -X %s -H \"%s: %s\" -d '{\"%s\":\"%s\"}' https://%s:%s/%s", + method, headerKey, headerValue, dataKey, dataValue, nodeAddress, port, endpoint, + ))) + }) + + t.Run("PodAlreadyExists", func(t *testing.T) { + g := NewWithT(t) + + nodeName := "node" + podName := fmt.Sprintf(CallerPodNameFormat, nodeName) + kubeclient := fake.NewSimpleClientset(&corev1.Pod{ObjectMeta: v1.ObjectMeta{Name: podName, Namespace: "default"}}) + nodeAddress := "5.6.7.8" + port := "1234" + method := "POST" + endpoint := "my/endpoint" + dataKey, dataValue := "dkey", "dvalue" + data := map[string]any{ + dataKey: dataValue, + } + headerKey, headerValue := "hkey", "hvalue" + header := map[string][]string{ + headerKey: {headerValue}, + } + + c, err := NewClient(kubeclient, newLogger(), []clusterv1.Machine{ + { + Status: clusterv1.MachineStatus{ + NodeRef: &corev1.ObjectReference{ + Name: nodeName, + }, + Addresses: clusterv1.MachineAddresses{ + { + Address: nodeAddress, + }, }, }, }, - }, - }, port, Options{SkipSucceededCheck: true, SkipPodCleanup: true}) + }, port, Options{SkipSucceededCheck: true, SkipPodCleanup: true}) - g.Expect(err).ToNot(HaveOccurred()) + g.Expect(err).ToNot(HaveOccurred()) - g.Expect(c.do(context.Background(), method, endpoint, header, data)).To(Succeed()) + g.Expect(c.do(context.Background(), method, endpoint, header, data)).To(Succeed()) - pod, err := kubeclient.CoreV1().Pods(DefaultPodNameSpace).Get(context.Background(), fmt.Sprintf(CallerPodNameFormat, nodeName), v1.GetOptions{}) - g.Expect(err).ToNot(HaveOccurred()) + pod, err := kubeclient.CoreV1().Pods(DefaultPodNameSpace).Get(context.Background(), fmt.Sprintf(CallerPodNameFormat, nodeName), v1.GetOptions{}) + g.Expect(err).ToNot(HaveOccurred()) - g.Expect(pod.Spec.NodeName).To(Equal(nodeName)) - g.Expect(pod.Spec.Containers).To(HaveLen(1)) + g.Expect(pod.Spec.NodeName).To(Equal(nodeName)) + g.Expect(pod.Spec.Containers).To(HaveLen(1)) - container := pod.Spec.Containers[0] - g.Expect(container.Image).To(Equal(images.CurlImage)) - g.Expect(*container.SecurityContext.Privileged).To(BeTrue()) - g.Expect(*container.SecurityContext.RunAsUser).To(Equal(int64(0))) - g.Expect(container.Command).To(HaveLen(3)) - g.Expect(container.Command[2]).To(Equal(fmt.Sprintf( - "curl -k -X %s -H \"%s: %s\" -d '{\"%s\":\"%s\"}' https://%s:%s/%s", - method, headerKey, headerValue, dataKey, dataValue, nodeAddress, port, endpoint, - ))) + container := pod.Spec.Containers[0] + g.Expect(container.Image).To(Equal(images.CurlImage)) + g.Expect(*container.SecurityContext.Privileged).To(BeTrue()) + g.Expect(*container.SecurityContext.RunAsUser).To(Equal(int64(0))) + g.Expect(container.Command).To(HaveLen(3)) + g.Expect(container.Command[2]).To(Equal(fmt.Sprintf( + "curl -k -X %s -H \"%s: %s\" -d '{\"%s\":\"%s\"}' https://%s:%s/%s", + method, headerKey, headerValue, dataKey, dataValue, nodeAddress, port, endpoint, + ))) + }) } func shuffleMachines(src []clusterv1.Machine) []clusterv1.Machine {