From 3210ff23f4e61ef0eb685683acdbcb5792a6dda3 Mon Sep 17 00:00:00 2001 From: Andrew Lavery Date: Sat, 16 Jul 2022 18:04:50 -0400 Subject: [PATCH 1/2] refactor getPVCs to take the full options object also factor out the 'createDestinationPVCs' component into its own function --- Makefile | 2 +- pkg/migrate/migrate.go | 34 ++++++++++++++++++++++------------ pkg/migrate/migrate_test.go | 2 +- 3 files changed, 24 insertions(+), 14 deletions(-) diff --git a/Makefile b/Makefile index fc40e4a..cd477f3 100644 --- a/Makefile +++ b/Makefile @@ -34,7 +34,7 @@ clean: .PHONY: deps deps: - go get golang.org/x/lint/golint + go install golang.org/x/lint/golint .PHONY: lint lint: deps diff --git a/pkg/migrate/migrate.go b/pkg/migrate/migrate.go index 06d38f1..cc65840 100644 --- a/pkg/migrate/migrate.go +++ b/pkg/migrate/migrate.go @@ -89,7 +89,7 @@ func Migrate(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, return err } - matchingPVCs, namespaces, err := getPVCs(ctx, w, clientset, options.SourceSCName, options.DestSCName, options.Namespace) + matchingPVCs, namespaces, err := getPVCs(ctx, w, clientset, options) if err != nil { return err } @@ -404,7 +404,9 @@ func createMigrationPod(ctx context.Context, clientset k8sclient.Interface, ns s // a map of namespaces to arrays of original PVCs // an array of namespaces that the PVCs were found within // an error, if one was encountered -func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, sourceSCName, destSCName string, Namespace string) (map[string][]corev1.PersistentVolumeClaim, []string, error) { +func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, options Options) (map[string][]corev1.PersistentVolumeClaim, []string, error) { + sourceSCName, destSCName, namespace := options.SourceSCName, options.DestSCName, options.Namespace + // get PVs using the specified storage provider pvs, err := clientset.CoreV1().PersistentVolumes().List(ctx, metav1.ListOptions{}) if err != nil { @@ -430,7 +432,7 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, return nil, nil, fmt.Errorf("failed to get PVC for PV %s in %s: %w", pv.Spec.ClaimRef.Name, pv.Spec.ClaimRef.Namespace, err) } - if pv.Spec.ClaimRef.Namespace == Namespace || Namespace == "" { + if pv.Spec.ClaimRef.Namespace == namespace || namespace == "" { matchingPVCs[pv.Spec.ClaimRef.Namespace] = append(matchingPVCs[pv.Spec.ClaimRef.Namespace], *pvc) } @@ -458,7 +460,16 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, return nil, nil, fmt.Errorf("failed to print PVCs: %w", err) } - // create new PVCs for each matching PVC + err = createDestinationPVCs(ctx, w, clientset, destSCName, matchingPVCs, pvsByName) + if err != nil { + return nil, nil, err + } + + return matchingPVCs, pvcNamespaces, nil +} + +// create new PVCs in the destination storageclass for each matching PVC with matching size and accessMode +func createDestinationPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, destSCName string, matchingPVCs map[string][]corev1.PersistentVolumeClaim, pvsByName map[string]corev1.PersistentVolume) error { w.Printf("\nCreating new PVCs to migrate data to using the %s StorageClass\n", destSCName) for ns, nsPvcs := range matchingPVCs { for _, nsPvc := range nsPvcs { @@ -466,19 +477,19 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, desiredPV, ok := pvsByName[nsPvc.Spec.VolumeName] if !ok { - return nil, nil, fmt.Errorf("failed to 
find existing PV %s for PVC %s in %s", nsPvc.Spec.VolumeName, nsPvc.Name, ns) + return fmt.Errorf("failed to find existing PV %s for PVC %s in %s", nsPvc.Spec.VolumeName, nsPvc.Name, ns) } desiredPvStorage, ok := desiredPV.Spec.Capacity[corev1.ResourceStorage] if !ok { - return nil, nil, fmt.Errorf("failed to find storage capacity for PV %s for PVC %s in %s", nsPvc.Spec.VolumeName, nsPvc.Name, ns) + return fmt.Errorf("failed to find storage capacity for PV %s for PVC %s in %s", nsPvc.Spec.VolumeName, nsPvc.Name, ns) } // check to see if the desired PVC name already exists (and is appropriate) existingPVC, err := clientset.CoreV1().PersistentVolumeClaims(ns).Get(ctx, newName, metav1.GetOptions{}) if err != nil { if !k8serrors.IsNotFound(err) { - return nil, nil, fmt.Errorf("failed to find existing PVC: %w", err) + return fmt.Errorf("failed to find existing PVC: %w", err) } } else if existingPVC != nil { if existingPVC.Spec.StorageClassName != nil && *existingPVC.Spec.StorageClassName == destSCName { @@ -488,10 +499,10 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, w.Printf("found existing PVC with name %s, not creating new one\n", newName) continue } else { - return nil, nil, fmt.Errorf("PVC %s already exists in namespace %s but with size %s instead of %s, cannot create migration target from %s - please delete this to continue", newName, ns, existingSize, desiredPvStorage.String(), nsPvc.Name) + return fmt.Errorf("PVC %s already exists in namespace %s but with size %s instead of %s, cannot create migration target from %s - please delete this to continue", newName, ns, existingSize, desiredPvStorage.String(), nsPvc.Name) } } else { - return nil, nil, fmt.Errorf("PVC %s already exists in namespace %s but with storage class %v, cannot create migration target from %s - please delete this to continue", newName, ns, existingPVC.Spec.StorageClassName, nsPvc.Name) + return fmt.Errorf("PVC %s already exists in namespace %s but with storage class %v, cannot create migration target from %s - please delete this to continue", newName, ns, existingPVC.Spec.StorageClassName, nsPvc.Name) } } @@ -517,13 +528,12 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, }, }, metav1.CreateOptions{}) if err != nil { - return nil, nil, fmt.Errorf("failed to create new PVC %s in %s: %w", newName, ns, err) + return fmt.Errorf("failed to create new PVC %s in %s: %w", newName, ns, err) } w.Printf("created new PVC %s with size %v in %s\n", newName, newPVC.Spec.Resources.Requests.Storage().String(), ns) } } - - return matchingPVCs, pvcNamespaces, nil + return nil } func validateStorageClasses(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, sourceSCName string, destSCName string, skipSourceValidation bool) error { diff --git a/pkg/migrate/migrate_test.go b/pkg/migrate/migrate_test.go index 575f4a6..75e8733 100644 --- a/pkg/migrate/migrate_test.go +++ b/pkg/migrate/migrate_test.go @@ -904,7 +904,7 @@ func TestGetPVCs(t *testing.T) { req := require.New(t) clientset := fake.NewSimpleClientset(test.resources...) 
testlog := log.New(testWriter{t: t}, "", 0) - originalPVCs, nses, err := getPVCs(context.Background(), testlog, clientset, test.sourceScName, test.destScName, test.namespace) + originalPVCs, nses, err := getPVCs(context.Background(), testlog, clientset, Options{SourceSCName: test.sourceScName, DestSCName: test.destScName, Namespace: test.namespace}) if !test.wantErr { req.NoError(err) } else { From c82fe26fdf0c1827f089b73319135276d78ca916 Mon Sep 17 00:00:00 2001 From: Andrew Lavery Date: Sat, 16 Jul 2022 21:31:14 -0400 Subject: [PATCH 2/2] first implementation of dry run --- README.md | 3 ++- pkg/migrate/migrate.go | 35 +++++++++++++++++++++++++++++------ pkg/migrate/migrate_test.go | 2 +- 3 files changed, 32 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index d86e41d..fa9c6bb 100644 --- a/README.md +++ b/README.md @@ -17,11 +17,12 @@ pvmigrate --source-sc default --dest-sc mynewsc |--------------------------|--------|----------|------------------|--------------------------------------------------------------------------------------------------| | --source-sc | String | ✓ | | storage provider name to migrate from | | --dest-sc | String | ✓ | | storage provider name to migrate to | -| --namespace | String | | | only migrate PVCs within this namespace | +| --namespace | String | | | only migrate PVCs within this namespace | | --rsync-image | String | | eeacms/rsync:2.3 | the image to use to copy PVCs - must have 'rsync' on the path | | --set-defaults | Bool | | false | change default storage class from source to dest | | --verbose-copy | Bool | | false | show output from the rsync command used to copy data between PVCs | | --skip-source-validation | Bool | | false | migrate from PVCs using a particular StorageClass name, even if that StorageClass does not exist | +| --dry-run | Bool | | false | do not apply changes to the cluster and instead print what resources would be impacted | ## Process diff --git a/pkg/migrate/migrate.go b/pkg/migrate/migrate.go index cc65840..689e245 100644 --- a/pkg/migrate/migrate.go +++ b/pkg/migrate/migrate.go @@ -44,6 +44,7 @@ type Options struct { SetDefaults bool VerboseCopy bool SkipSourceValidation bool + DryRun bool } // Cli uses CLI options to run Migrate @@ -57,6 +58,7 @@ func Cli() { flag.BoolVar(&options.SetDefaults, "set-defaults", false, "change default storage class from source to dest") flag.BoolVar(&options.VerboseCopy, "verbose-copy", false, "show output from the rsync command used to copy data between PVCs") flag.BoolVar(&options.SkipSourceValidation, "skip-source-validation", false, "migrate from PVCs using a particular StorageClass name, even if that StorageClass does not exist") + flag.BoolVar(&options.DryRun, "dry-run", false, "do not apply changes to the cluster and instead print what resources would be impacted") flag.Parse() @@ -94,11 +96,16 @@ func Migrate(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, return err } - err = scaleDownPods(ctx, w, clientset, matchingPVCs, time.Second*5) + err = scaleDownPods(ctx, w, clientset, matchingPVCs, time.Second*5, options.DryRun) if err != nil { return fmt.Errorf("failed to scale down pods: %w", err) } + if options.DryRun { + w.Printf("\nDry Run Complete\n") + return nil + } + err = copyAllPVCs(ctx, w, clientset, options.SourceSCName, options.DestSCName, options.RsyncImage, matchingPVCs, options.VerboseCopy, time.Second) if err != nil { return err @@ -460,9 +467,12 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, return nil, 
nil, fmt.Errorf("failed to print PVCs: %w", err) } - err = createDestinationPVCs(ctx, w, clientset, destSCName, matchingPVCs, pvsByName) - if err != nil { - return nil, nil, err + // do not create destination PVCs if this is a dry run + if !options.DryRun { + err = createDestinationPVCs(ctx, w, clientset, destSCName, matchingPVCs, pvsByName) + if err != nil { + return nil, nil, err + } } return matchingPVCs, pvcNamespaces, nil @@ -669,8 +679,8 @@ func mutateSC(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, // scaleDownPods scales down statefulsets & deployments controlling pods mounting PVCs in a supplied list // it will also cleanup WIP migration pods it discovers that happen to be mounting a supplied PVC. // if a pod is not created by pvmigrate, and is not controlled by a statefulset/deployment, this function will return an error. -// if waitForCleanup is true, after scaling down deployments/statefulsets it will wait for all pods to be deleted. -func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, matchingPVCs map[string][]corev1.PersistentVolumeClaim, checkInterval time.Duration) error { +// if dryRun is true, it will print the deployments and statefulsets to be scaled but will not actually modify them. +func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, matchingPVCs map[string][]corev1.PersistentVolumeClaim, checkInterval time.Duration, dryRun bool) error { // get pods using specified PVCs matchingPods := map[string][]corev1.Pod{} matchingPodsCount := 0 @@ -769,6 +779,10 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter } w.Printf("scaling StatefulSet %s from %d to 0 in %s\n", ownerName, formerScale, ns) + if dryRun { + continue + } + _, err = clientset.AppsV1().StatefulSets(ns).Update(ctx, ss, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("failed to scale statefulset %s to zero in %s: %w", ownerName, ns, err) @@ -807,6 +821,10 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter } w.Printf("scaling Deployment %s from %d to 0 in %s\n", ownerName, formerScale, ns) + if dryRun { + continue + } + _, err = clientset.AppsV1().Deployments(ns).Update(ctx, dep, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("failed to scale statefulset %s to zero in %s: %w", ownerName, ns, err) @@ -818,6 +836,11 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter } } + if dryRun { + w.Printf("Not waiting for pods to be cleaned up as this is a dry run\n") + return nil + } + // wait for all pods to be deleted w.Printf("\nWaiting for pods with mounted PVCs to be cleaned up\n") time.Sleep(checkInterval / 16) diff --git a/pkg/migrate/migrate_test.go b/pkg/migrate/migrate_test.go index 75e8733..4c35b1c 100644 --- a/pkg/migrate/migrate_test.go +++ b/pkg/migrate/migrate_test.go @@ -1952,7 +1952,7 @@ func Test_scaleDownPods(t *testing.T) { if tt.backgroundFunc != nil { go tt.backgroundFunc(testCtx, testlog, clientset) } - err := scaleDownPods(testCtx, testlog, clientset, tt.matchingPVCs, time.Second/20) + err := scaleDownPods(testCtx, testlog, clientset, tt.matchingPVCs, time.Second/20, false) if tt.wantErr { req.Error(err) testlog.Printf("got expected error %q", err.Error())
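
As a rough illustration of how the two patches fit together, below is a minimal sketch of a caller that previews a migration using the refactored `Options`-based `Migrate` entry point with the new `DryRun` field. The module import path and the kubeconfig loading are assumptions (the patches only show `pkg/migrate/migrate.go`, the `Options` struct, and the `Migrate` function); adjust them to the actual repository layout.

```go
package main

import (
	"context"
	"log"
	"os"

	// assumed module path; the patch only shows pkg/migrate/migrate.go
	"github.com/replicatedhq/pvmigrate/pkg/migrate"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a clientset from the local kubeconfig (assumed setup; any
	// kubernetes.Interface works, as the fake clientset in the tests shows).
	config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		log.Fatal(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		log.Fatal(err)
	}

	logger := log.New(os.Stdout, "", 0)

	// With DryRun set, getPVCs still discovers and prints matching PVCs but
	// skips creating destination PVCs, scaleDownPods only prints the
	// statefulsets/deployments it would scale down, and Migrate returns
	// after printing "Dry Run Complete".
	opts := migrate.Options{
		SourceSCName: "default",
		DestSCName:   "mynewsc",
		DryRun:       true,
	}
	if err := migrate.Migrate(context.Background(), logger, clientset, opts); err != nil {
		log.Fatal(err)
	}
}
```

The equivalent CLI invocation added by the second patch would be `pvmigrate --source-sc default --dest-sc mynewsc --dry-run`, mirroring the usage shown in the README.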