Skip to content

Commit

Permalink
Handle upgrade pod already exists (#73)
Browse files Browse the repository at this point in the history
Fixes: #72
  • Loading branch information
HomayoonAlimohammadi authored Nov 6, 2024
1 parent 14d9900 commit 925ee32
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 94 deletions.
145 changes: 94 additions & 51 deletions controllers/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/storage/names"
"k8s.io/utils/ptr"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/external"
"sigs.k8s.io/cluster-api/util"
Expand Down Expand Up @@ -141,8 +142,12 @@ func (r *MicroK8sControlPlaneReconciler) reconcileMachines(ctx context.Context,
var oldVersion, newVersion string

if numMachines > 0 {
var err error
sort.Sort(SortByCreationTimestamp(machines))
oldVersion = semver.MajorMinor(*machines[0].Spec.Version)
oldVersion, err = getOldestVersion(machines)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to get oldest version: %w", err)
}
newVersion = semver.MajorMinor(mcp.Spec.Version)
}

Expand Down Expand Up @@ -202,50 +207,51 @@ func (r *MicroK8sControlPlaneReconciler) reconcileMachines(ctx context.Context,

// For each machine, get the node and upgrade it
for _, machine := range machines {
if isMachineUpgraded(machine, newVersion) {
logger.Info("Machine already upgraded", "machine", machine.Name, "version", newVersion)
continue
}

if machine.Status.NodeRef == nil {
logger.Info("Machine does not have a nodeRef yet, requeueing...", "machine", machine.Name)
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}

// Get the node for the machine
node, err := kubeclient.CoreV1().Nodes().Get(ctx, machine.Status.NodeRef.Name, metav1.GetOptions{})
if err != nil {
return ctrl.Result{RequeueAfter: 20 * time.Second}, err
return ctrl.Result{}, fmt.Errorf("failed to get node: %w", err)
}

logger.Info(fmt.Sprintf("Creating upgrade pod on %s...", node.Name))
pod, err := createUpgradePod(ctx, kubeclient, node.Name, mcp.Spec.Version)
if err != nil {
logger.Error(err, "Error creating upgrade pod.")
return ctrl.Result{}, fmt.Errorf("failed to create upgrade pod: %w", err)
}

logger.Info("Waiting for upgrade node to be updated to the given version...")
err = waitForNodeUpgrade(ctx, kubeclient, node.Name, mcp.Spec.Version)
if err != nil {
logger.Error(err, "Error waiting for node upgrade.")
logger.Info("Waiting for node to be updated to the given version...", "node", node.Name)
if err := waitForNodeUpgrade(ctx, kubeclient, node.Name, mcp.Spec.Version); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to wait for node upgrade: %w", err)
}

time.Sleep(10 * time.Second)

// Get the current machine
logger.Info("Node upgraded successfully.", "node", node.Name)
// Update the machine version
currentMachine := &clusterv1.Machine{}
currentMachineName := node.Annotations["cluster.x-k8s.io/machine"]
err = r.Client.Get(ctx, client.ObjectKey{Namespace: cluster.Namespace, Name: currentMachineName}, currentMachine)
if err != nil {
logger.Error(err, "Error getting machine.")
if err := r.Client.Get(ctx, client.ObjectKey{Namespace: cluster.Namespace, Name: currentMachineName}, currentMachine); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to get machine: %w", err)
}

// Update the machine version
logger.Info("Updating machine version...", "machine", currentMachine.Name)
currentMachine.Spec.Version = &mcp.Spec.Version
logger.Info(fmt.Sprintf("Now updating machine %s version to %s...", currentMachine.Name, *currentMachine.Spec.Version))
err = r.Client.Update(ctx, currentMachine)
if err != nil {
logger.Error(err, "Could not update the machine version. We will retry.")
if err := r.Client.Update(ctx, currentMachine); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to update machine: %w", err)
}

time.Sleep(10 * time.Second)

// wait until pod is deleted
logger.Info(fmt.Sprintf("Removing upgrade pod %s from %s...", pod.ObjectMeta.Name, node.Name))
err = waitForPodDeletion(ctx, kubeclient, pod.ObjectMeta.Name)
if err != nil {
logger.Error(err, "Error waiting for pod deletion.")
if err := waitForPodDeletion(ctx, kubeclient, pod.ObjectMeta.Name); err != nil {
return ctrl.Result{}, fmt.Errorf("failed to wait for pod deletion: %w", err)
}

logger.Info(fmt.Sprintf("Upgrade of node %s completed.\n", node.Name))
Expand Down Expand Up @@ -689,18 +695,25 @@ func (r *MicroK8sControlPlaneReconciler) removeNodeFromDqlite(ctx context.Contex
return nil
}

// createUpgradePod creates a pod that upgrades the node to the given version.
// If the upgrade pod already exists, it is deleted and a new one will be created.
func createUpgradePod(ctx context.Context, kubeclient *kubernetesClient, nodeName string, nodeVersion string) (*corev1.Pod, error) {
nodeVersion = strings.TrimPrefix(semver.MajorMinor(nodeVersion), "v")
podName := "upgrade-pod"

uid := int64(0)
priv := true
// delete the pod if it exists
if err := waitForPodDeletion(ctx, kubeclient, podName); err != nil {
return nil, fmt.Errorf("failed to delete pod %s: %w", podName, err)
}

nodeVersion = strings.TrimPrefix(semver.MajorMinor(nodeVersion), "v")

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "upgrade-pod",
Name: podName,
},
Spec: corev1.PodSpec{
NodeName: nodeName,
NodeName: nodeName,
RestartPolicy: corev1.RestartPolicyOnFailure,
Containers: []corev1.Container{
{
Name: "upgrade",
Expand All @@ -709,7 +722,7 @@ func createUpgradePod(ctx context.Context, kubeclient *kubernetesClient, nodeNam
"su",
"-c",
},
SecurityContext: &corev1.SecurityContext{Privileged: &priv, RunAsUser: &uid},
SecurityContext: &corev1.SecurityContext{Privileged: ptr.To(true), RunAsUser: ptr.To(int64(0))},
Args: []string{
fmt.Sprintf("curl -X POST -H \"Content-Type: application/json\" --unix-socket /run/snapd.socket -d '{\"action\": \"refresh\",\"channel\":\"%s/stable\"}' http://localhost/v2/snaps/microk8s", nodeVersion),
},
Expand All @@ -736,47 +749,77 @@ func createUpgradePod(ctx context.Context, kubeclient *kubernetesClient, nodeNam

pod, err := kubeclient.CoreV1().Pods("default").Create(ctx, pod, metav1.CreateOptions{})
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to create pod %s: %w", podName, err)
}

return pod, nil
}

func waitForNodeUpgrade(ctx context.Context, kubeclient *kubernetesClient, nodeName, nodeVersion string) error {
// attempt to connect 60 times. With a wait of 10 secs this should be 600 sec = 10 min
attempts := 60
for attempts > 0 {
for attempts := 100; attempts > 0; attempts-- {
node, err := kubeclient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
return err
return fmt.Errorf("failed to get node %s: %w", nodeName, err)
}
currentVersion := semver.MajorMinor(node.Status.NodeInfo.KubeletVersion)
nodeVersion = semver.MajorMinor(nodeVersion)
if strings.HasPrefix(currentVersion, nodeVersion) {
break
return nil
}
time.Sleep(10 * time.Second)
attempts--

time.Sleep(3 * time.Second)
}
return nil

return fmt.Errorf("timed out waiting for node %s to be upgraded to version %s", nodeName, nodeVersion)
}

// waitForPodDeletion waits for the pod to be deleted. If the pod doesn't exist, it returns nil.
func waitForPodDeletion(ctx context.Context, kubeclient *kubernetesClient, podName string) error {
for {
gracePeriod := int64(0)
var err error
for attempts := 5; attempts > 0; attempts-- {
deleteOptions := metav1.DeleteOptions{
GracePeriodSeconds: &gracePeriod,
GracePeriodSeconds: ptr.To(int64(0)),
}
err := kubeclient.CoreV1().Pods("default").Delete(ctx, podName, deleteOptions)
time.Sleep(10 * time.Second)
if err != nil {
if apierrors.IsNotFound(err) {
break
}
return err
} else {
break
err = kubeclient.CoreV1().Pods("default").Delete(ctx, podName, deleteOptions)
if err == nil || apierrors.IsNotFound(err) {
return nil
}
time.Sleep(3 * time.Second)
}
return nil

return fmt.Errorf("timed out waiting for pod %s to be deleted: %w", podName, err)
}

// getOldestVersion returns the lowest major.minor version (for example
// "v1.27") found in the Spec.Version of the given machines.
//
// Machines whose Spec.Version is nil, or is not a valid semantic version
// (semver.MajorMinor returns "" for those), are skipped. An error is returned
// when no machine carries a usable version.
func getOldestVersion(machines []clusterv1.Machine) (string, error) {
	var oldest string
	for i := range machines {
		version := machines[i].Spec.Version
		if version == nil {
			continue
		}

		// Normalise first so that we always compare major.minor against
		// major.minor. An invalid version must be skipped here: semver.Compare
		// treats it as lower than any valid version, which would otherwise
		// overwrite the running minimum with an empty string.
		majorMinor := semver.MajorMinor(*version)
		if majorMinor == "" {
			continue
		}

		if oldest == "" || semver.Compare(majorMinor, oldest) < 0 {
			oldest = majorMinor
		}
	}

	if oldest == "" {
		return "", fmt.Errorf("no version found")
	}
	return oldest, nil
}

// isMachineUpgraded reports whether the machine's Spec.Version matches
// newVersion when both are reduced to their major.minor form. A machine
// without a Spec.Version is never considered upgraded.
func isMachineUpgraded(m clusterv1.Machine, newVersion string) bool {
	current := m.Spec.Version
	if current == nil {
		return false
	}

	// Both sides are truncated to major.minor so that patch-level and
	// pre-release differences do not affect the result.
	have := semver.MajorMinor(*current)
	want := semver.MajorMinor(newVersion)
	return semver.Compare(have, want) == 0
}
11 changes: 9 additions & 2 deletions pkg/clusteragent/clusteragent.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,13 @@ func (c *Client) do(ctx context.Context, method, endpoint string, header http.He

// createPod creates a pod that runs a curl command.
func (c *Client) createPod(ctx context.Context, method, endpoint string, header http.Header, data map[string]any) (*corev1.Pod, error) {
podName := fmt.Sprintf(CallerPodNameFormat, c.nodeName)

// delete the pod if it exists
if err := c.deletePod(ctx, podName); err != nil {
return nil, fmt.Errorf("failed to delete pod: %w", err)
}

curl, err := c.createCURLString(method, endpoint, header, data)
if err != nil {
return nil, fmt.Errorf("failed to create curl string: %w", err)
Expand All @@ -156,7 +163,7 @@ func (c *Client) createPod(ctx context.Context, method, endpoint string, header

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf(CallerPodNameFormat, c.nodeName),
Name: podName,
},
Spec: corev1.PodSpec{
NodeName: c.nodeName,
Expand Down Expand Up @@ -211,7 +218,7 @@ func (c *Client) createCURLString(method, endpoint string, header http.Header, d
return req, nil
}

// deletePod deletes a pod.
// deletePod deletes a pod. It will succeed if the pod doesn't exist.
func (c *Client) deletePod(ctx context.Context, podName string) error {
deleteOptions := metav1.DeleteOptions{
GracePeriodSeconds: ptr.To(int64(0)),
Expand Down
Loading

0 comments on commit 925ee32

Please sign in to comment.