Skip to content

Commit

Permalink
Don't scale down Longhorn pods before pvmigrate runs (#4868)
Browse files Browse the repository at this point in the history
* don't scale down Longhorn pods before pvmigrate runs
  • Loading branch information
diamonwiggins authored Oct 9, 2023
1 parent a5f03a4 commit bfc4b43
Showing 1 changed file with 0 additions and 280 deletions.
280 changes: 0 additions & 280 deletions pkg/cli/longhorn.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,11 @@ import (
"fmt"
"log"
"strconv"
"strings"
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -25,8 +22,6 @@ import (
lhv1b1 "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta1"
promv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
"github.com/spf13/cobra"

"github.com/replicatedhq/kurl/pkg/k8sutil"
)

var scaleDownReplicasWaitTime = 5 * time.Minute
Expand Down Expand Up @@ -189,9 +184,6 @@ func NewLonghornPrepareForMigration(cli CLI) *cobra.Command {
}
logger.Print("All Longhorn volumes and nodes are healthy.")

if err := scaleDownPodsUsingLonghorn(cmd.Context(), logger, cli); err != nil {
return fmt.Errorf("error scaling down pods using longhorn volumes: %w", err)
}
logger.Print("Environment is ready for the Longhorn migration.")
return nil
},
Expand Down Expand Up @@ -253,28 +245,6 @@ func scaleUpPodsUsingLonghorn(ctx context.Context, logger *log.Logger, cli clien
return nil
}

// scaleDownPodsUsingLonghorn scales down all pods using Longhorn volumes.
// The ekco operator and prometheus are scaled down first, then every
// workload found to be mounting a Longhorn volume is scaled to zero.
func scaleDownPodsUsingLonghorn(ctx context.Context, logger *log.Logger, cli client.Client) error {
	logger.Print("Scaling down pods using Longhorn volumes.")

	// Stop the operators before touching the workloads they manage.
	if err := scaleEkco(ctx, logger, cli, 0); err != nil {
		return fmt.Errorf("error scaling down ekco operator: %w", err)
	}
	if err := scaleDownPrometheus(ctx, logger, cli); err != nil {
		return fmt.Errorf("error scaling down prometheus: %w", err)
	}

	workloads, err := getObjectsUsingLonghorn(ctx, cli)
	if err != nil {
		return fmt.Errorf("error getting objects using longhorn: %w", err)
	}
	for _, workload := range workloads {
		if err := scaleDownObject(ctx, logger, cli, workload); err != nil {
			return err
		}
	}

	logger.Print("Pods using Longhorn volumes have been scaled down.")
	return nil
}

func isPrometheusInstalled(ctx context.Context, cli client.Client) (bool, error) {
nsn := types.NamespacedName{Name: prometheusNamespace}
if err := cli.Get(ctx, nsn, &corev1.Namespace{}); err != nil {
Expand All @@ -286,67 +256,6 @@ func isPrometheusInstalled(ctx context.Context, cli client.Client) (bool, error)
return true, nil
}

// scaleDownPrometheus scales down prometheus to zero replicas.
//
// The original replica count is stashed in the pvmigrateScaleDownAnnotation
// annotation (only on the first run, so reruns do not clobber it with zero)
// so the scale-up path can restore it later. After patching the Prometheus
// resource, this waits for the prometheus StatefulSet status to report zero
// replicas and then for its pods to disappear.
func scaleDownPrometheus(ctx context.Context, logger *log.Logger, cli client.Client) error {
	if installed, err := isPrometheusInstalled(ctx, cli); err != nil {
		return fmt.Errorf("error scaling down prometheus: %w", err)
	} else if !installed {
		return nil
	}

	nsn := types.NamespacedName{Namespace: prometheusNamespace, Name: prometheusName}
	var prometheus promv1.Prometheus
	if err := cli.Get(ctx, nsn, &prometheus); err != nil {
		if errors.IsNotFound(err) {
			return nil
		}
		return fmt.Errorf("error getting prometheus: %w", err)
	}

	patch := map[string]interface{}{
		"spec": map[string]interface{}{
			"replicas": 0,
		},
	}
	// Record the pre-migration replica count only once: if the annotation is
	// already present a previous run stored it and we must not overwrite it.
	if _, ok := prometheus.Annotations[pvmigrateScaleDownAnnotation]; !ok {
		promReplicas := int32(0)
		if prometheus.Spec.Replicas != nil {
			promReplicas = *prometheus.Spec.Replicas
		}
		patch["metadata"] = map[string]interface{}{
			"annotations": map[string]string{
				pvmigrateScaleDownAnnotation: fmt.Sprintf("%d", promReplicas),
			},
		}
	}

	rawPatch, err := json.Marshal(patch)
	if err != nil {
		return fmt.Errorf("error creating prometheus patch: %w", err)
	}
	if err := cli.Patch(ctx, &prometheus, client.RawPatch(types.MergePatchType, rawPatch)); err != nil {
		return fmt.Errorf("error scaling prometheus: %w", err)
	}

	// Fix: log before waiting, not after the wait has already finished.
	logger.Print("Waiting for prometheus StatefulSet to scale down.")
	var st appsv1.StatefulSet
	if err := wait.PollUntilContextTimeout(ctx, 3*time.Second, 5*time.Minute, true, func(ctx2 context.Context) (bool, error) {
		nsn = types.NamespacedName{Namespace: prometheusNamespace, Name: prometheusStatefulSetName}
		if err := cli.Get(ctx2, nsn, &st); err != nil {
			return false, fmt.Errorf("error getting prometheus statefulset: %w", err)
		}
		return st.Status.Replicas == 0 && st.Status.UpdatedReplicas == 0, nil
	}); err != nil {
		return fmt.Errorf("error waiting for prometheus statefulset to scale: %w", err)
	}

	// Bug fix: the prometheus pods live in prometheusNamespace; the previous
	// code waited on ekcoNamespace and therefore never watched the right pods.
	selector := labels.SelectorFromSet(st.Spec.Selector.MatchLabels)
	if err := waitForPodsToBeScaledDown(ctx, logger, cli, prometheusNamespace, selector); err != nil {
		return fmt.Errorf("error waiting for prometheus to scale down: %w", err)
	}
	return nil
}

// scaleUpPrometheus scales up prometheus.
func scaleUpPrometheus(ctx context.Context, cli client.Client) error {
if installed, err := isPrometheusInstalled(ctx, cli); err != nil {
Expand Down Expand Up @@ -437,195 +346,6 @@ func waitForPodsToBeScaledDown(ctx context.Context, logger *log.Logger, cli clie
})
}

// scaleDownObject scales down a deployment or a statefulset to 0 replicas.
// The previous replica count is preserved in the pvmigrateScaleDownAnnotation
// annotation (first run only) and the call blocks until the workload's pods
// have been scaled down.
func scaleDownObject(ctx context.Context, logger *log.Logger, cli client.Client, obj client.Object) error {
	objKind := strings.ToLower(fmt.Sprintf("%T", obj))
	logger.Printf("Scaling down %s %s/%s", objKind, obj.GetNamespace(), obj.GetName())

	var podSelector labels.Selector
	var prevReplicas *int32
	switch typed := obj.(type) {
	case *appsv1.Deployment:
		prevReplicas = typed.Spec.Replicas
		typed.Spec.Replicas = ptr.To(int32(0))
		podSelector = labels.SelectorFromSet(typed.Spec.Selector.MatchLabels)
	case *appsv1.StatefulSet:
		prevReplicas = typed.Spec.Replicas
		typed.Spec.Replicas = ptr.To(int32(0))
		podSelector = labels.SelectorFromSet(typed.Spec.Selector.MatchLabels)
	default:
		return fmt.Errorf("unsupported object type %T", obj)
	}
	if prevReplicas == nil {
		prevReplicas = ptr.To(int32(0))
	}

	// Stash the original replica count so the scale-up path can restore it,
	// but never overwrite a value stored by an earlier run.
	ann := obj.GetAnnotations()
	if ann == nil {
		ann = map[string]string{}
	}
	if _, exists := ann[pvmigrateScaleDownAnnotation]; !exists {
		ann[pvmigrateScaleDownAnnotation] = fmt.Sprintf("%d", *prevReplicas)
		obj.SetAnnotations(ann)
	}

	if err := cli.Update(ctx, obj); err != nil {
		return fmt.Errorf("error scaling down %s %s/%s: %w", objKind, obj.GetNamespace(), obj.GetName(), err)
	}
	if err := waitForPodsToBeScaledDown(ctx, logger, cli, obj.GetNamespace(), podSelector); err != nil {
		return fmt.Errorf("error waiting for %s %s/%s to scale down: %w", objKind, obj.GetNamespace(), obj.GetName(), err)
	}
	return nil
}

// getObjectsUsingLonghorn returns all objects that use Longhorn volumes. Only deployments, statefulsets,
// and replicasets are supported (as those are the only types supported by pvmigrate).
// Controllers are deduplicated by UID so each workload appears once even when
// several of its pods mount Longhorn volumes.
func getObjectsUsingLonghorn(ctx context.Context, cli client.Client) ([]client.Object, error) {
	pods, err := getPodsUsingLonghorn(ctx, cli)
	if err != nil {
		return nil, fmt.Errorf("error getting pods using longhorn: %w", err)
	}
	var objects []client.Object
	controllers := make(map[string]client.Object)
	for _, pod := range pods {
		ctrl := metav1.GetControllerOf(&pod)
		if ctrl == nil {
			// Bug fix: arguments were reversed (name/namespace); the message
			// follows the namespace/name convention used elsewhere.
			return nil, fmt.Errorf("pod %s/%s has no owners and can't be migrated", pod.Namespace, pod.Name)
		}
		uid, obj, err := getOwnerObject(ctx, cli, pod.Namespace, *ctrl)
		if err != nil {
			return nil, fmt.Errorf("error getting owner object for pod %s/%s: %w", pod.Namespace, pod.Name, err)
		}
		controllers[*uid] = obj
	}

	// convert map to slice
	for _, obj := range controllers {
		objects = append(objects, obj)
	}

	return objects, nil
}

// getOwnerObject returns the object referred by the provided owner reference. Only deployments, statefulsets,
// and replicasets are supported (as those are the only types supported by pvmigrate).
// For a ReplicaSet owner the chain is followed one level up to its owning
// Deployment. The returned string is the UID of the resolved controller.
func getOwnerObject(ctx context.Context, cli client.Client, namespace string, owner metav1.OwnerReference) (*string, client.Object, error) {
	switch owner.Kind {
	case "StatefulSet":
		nsn := types.NamespacedName{Namespace: namespace, Name: owner.Name}
		var sset appsv1.StatefulSet
		if err := cli.Get(ctx, nsn, &sset); err != nil {
			return nil, nil, fmt.Errorf("error getting statefulset %s: %w", nsn, err)
		}
		return (*string)(&sset.UID), &sset, nil
	case "ReplicaSet":
		nsn := types.NamespacedName{Namespace: namespace, Name: owner.Name}
		var rset appsv1.ReplicaSet
		if err := cli.Get(ctx, nsn, &rset); err != nil {
			return nil, nil, fmt.Errorf("error getting replicaset %s: %w", nsn, err)
		}
		ctrl := metav1.GetControllerOf(&rset)
		if ctrl == nil {
			// Bug fix: arguments were reversed (name/namespace); use the
			// namespace/name order the rest of the file follows.
			return nil, nil, fmt.Errorf("no owner found for replicaset %s/%s", namespace, owner.Name)
		} else if ctrl.Kind != "Deployment" {
			return nil, nil, fmt.Errorf(
				"expected replicaset %s in %s to have a deployment as owner, found %s instead",
				owner.Name, namespace, ctrl.Kind,
			)
		}
		nsn = types.NamespacedName{Namespace: namespace, Name: ctrl.Name}
		var deployment appsv1.Deployment
		if err := cli.Get(ctx, nsn, &deployment); err != nil {
			return nil, nil, fmt.Errorf("error getting deployment %s: %w", nsn, err)
		}
		return (*string)(&deployment.UID), &deployment, nil
	default:
		return nil, nil, fmt.Errorf(
			"scaling pods controlled by a %s is not supported, please delete the pods controlled by "+
				"%s in %s before retrying", owner.Kind, owner.Kind, namespace,
		)
	}
}

// getPodsUsingLonghorn returns all pods that mount Longhorn volumes.
// Each pod is returned at most once, even if it mounts several Longhorn PVCs.
func getPodsUsingLonghorn(ctx context.Context, cli client.Client) ([]corev1.Pod, error) {
	pvcs, err := getLonghornPersistenVolumeClaims(ctx, cli)
	if err != nil {
		return nil, fmt.Errorf("error getting longhorn persistent volume claims: %w", err)
	}
	var pods corev1.PodList
	if err := cli.List(ctx, &pods); err != nil {
		return nil, fmt.Errorf("error listing pods: %w", err)
	}
	var podsUsingLonghorn []corev1.Pod
	// Bug fix: iterating PVCs in the outer loop appended a pod once per
	// matching PVC; iterating pods and breaking on the first match yields
	// each pod exactly once.
	for _, pod := range pods.Items {
		for _, pvc := range pvcs {
			if k8sutil.PodHasPVC(pod, pvc.Namespace, pvc.Name) {
				podsUsingLonghorn = append(podsUsingLonghorn, pod)
				break
			}
		}
	}
	return podsUsingLonghorn, nil
}

// getLonghornPersistenVolumeClaims returns all persistent volume claims that are claiming Longhorn volumes.
// Every Longhorn-backed PV must have a ClaimRef; an unbound PV is treated as
// an error the operator has to resolve before retrying.
func getLonghornPersistenVolumeClaims(ctx context.Context, cli client.Client) ([]corev1.PersistentVolumeClaim, error) {
	volumes, err := getLonghornPersistenVolumes(ctx, cli)
	if err != nil {
		return nil, fmt.Errorf("error getting longhorn persistent volumes: %w", err)
	}

	claims := make([]corev1.PersistentVolumeClaim, 0, len(volumes))
	for _, vol := range volumes {
		claimRef := vol.Spec.ClaimRef
		if claimRef == nil {
			return nil, fmt.Errorf("pv %s does not have an associated pvc, resolve this before rerunning", vol.Name)
		}
		key := types.NamespacedName{Namespace: claimRef.Namespace, Name: claimRef.Name}
		var claim corev1.PersistentVolumeClaim
		if err := cli.Get(ctx, key, &claim); err != nil {
			return nil, fmt.Errorf("error getting persistent volume claim %s: %w", key, err)
		}
		claims = append(claims, claim)
	}
	return claims, nil
}

// getLonghornPersistenVolumes returns all persistent volumes that are backed by Longhorn.
// A PV qualifies when its storage class name matches one of the Longhorn
// storage classes present in the cluster.
func getLonghornPersistenVolumes(ctx context.Context, cli client.Client) ([]corev1.PersistentVolume, error) {
	storageClasses, err := getLonghornStorageClasses(ctx, cli)
	if err != nil {
		return nil, fmt.Errorf("error getting longhorn storage classes: %w", err)
	}

	// Index the Longhorn storage class names for O(1) membership checks.
	scNames := make(map[string]struct{}, len(storageClasses))
	for _, sc := range storageClasses {
		scNames[sc.Name] = struct{}{}
	}

	var allPVs corev1.PersistentVolumeList
	if err := cli.List(ctx, &allPVs); err != nil {
		return nil, fmt.Errorf("error listing persistent volumes: %w", err)
	}

	var longhornPVs []corev1.PersistentVolume
	for _, pv := range allPVs.Items {
		if _, ok := scNames[pv.Spec.StorageClassName]; ok {
			longhornPVs = append(longhornPVs, pv)
		}
	}
	return longhornPVs, nil
}

// getLonghornStorageClasses returns all storage classes that use the Longhorn provisioner.
// A storage class counts as Longhorn when its provisioner name contains the
// substring "longhorn".
func getLonghornStorageClasses(ctx context.Context, cli client.Client) ([]storagev1.StorageClass, error) {
	var allClasses storagev1.StorageClassList
	if err := cli.List(ctx, &allClasses); err != nil {
		return nil, fmt.Errorf("error listing storage classes: %w", err)
	}

	var matched []storagev1.StorageClass
	for _, sc := range allClasses.Items {
		if !strings.Contains(sc.Provisioner, "longhorn") {
			continue
		}
		matched = append(matched, sc)
	}
	return matched, nil
}

// scaleDownReplicas scales down the number of replicas for all volumes to 1. Returns a bool indicating if any
// of the volumes were scaled down.
func scaleDownReplicas(ctx context.Context, logger *log.Logger, cli client.Client) (bool, error) {
Expand Down

0 comments on commit bfc4b43

Please sign in to comment.