refactor for dry run support #108

Draft PR: wants to merge 2 commits into base: main
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile

@@ -34,7 +34,7 @@ clean:

 .PHONY: deps
 deps:
-	go get golang.org/x/lint/golint
+	go install golang.org/x/lint/golint

 .PHONY: lint
 lint: deps
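A note on this change: since Go 1.16, `go install` is the supported way to build and install executables, and `go get` of a binary is deprecated. In module-aware mode a bare `go install golang.org/x/lint/golint` resolves the version from go.mod; to pin a version explicitly, or to run outside a module, a version suffix such as `go install golang.org/x/lint/golint@latest` is typically required.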
3 changes: 2 additions & 1 deletion README.md

@@ -17,11 +17,12 @@ pvmigrate --source-sc default --dest-sc mynewsc
 |--------------------------|--------|----------|------------------|---------------------------------------------------------------------------------------------------|
 | --source-sc              | String | ✓        |                  | storage provider name to migrate from                                                               |
 | --dest-sc                | String | ✓        |                  | storage provider name to migrate to                                                                 |
-| --namespace              | String |          |                  | only migrate PVCs within this namespace                                                             |
+| --namespace              | String |          |                  | only migrate PVCs within this namespace                                                             |
 | --rsync-image            | String |          | eeacms/rsync:2.3 | the image to use to copy PVCs - must have 'rsync' on the path                                       |
 | --set-defaults           | Bool   |          | false            | change default storage class from source to dest                                                    |
 | --verbose-copy           | Bool   |          | false            | show output from the rsync command used to copy data between PVCs                                   |
 | --skip-source-validation | Bool   |          | false            | migrate from PVCs using a particular StorageClass name, even if that StorageClass does not exist    |
+| --dry-run                | Bool   |          | false            | do not apply changes to the cluster and instead print what resources would be impacted              |

 ## Process
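(The paired -/+ on the --namespace row appears to be a whitespace-only alignment fix.) With the new flag, the example invocation from the top of the README becomes a read-only preview that reports the PVCs and workloads that would be affected:

pvmigrate --source-sc default --dest-sc mynewsc --dry-run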
63 changes: 48 additions & 15 deletions pkg/migrate/migrate.go

@@ -44,6 +44,7 @@ type Options struct {
 	SetDefaults          bool
 	VerboseCopy          bool
 	SkipSourceValidation bool
+	DryRun               bool
 }

 // Cli uses CLI options to run Migrate
@@ -57,6 +58,7 @@ func Cli() {
 	flag.BoolVar(&options.SetDefaults, "set-defaults", false, "change default storage class from source to dest")
 	flag.BoolVar(&options.VerboseCopy, "verbose-copy", false, "show output from the rsync command used to copy data between PVCs")
 	flag.BoolVar(&options.SkipSourceValidation, "skip-source-validation", false, "migrate from PVCs using a particular StorageClass name, even if that StorageClass does not exist")
+	flag.BoolVar(&options.DryRun, "dry-run", false, "do not apply changes to the cluster and instead print what resources would be impacted")

 	flag.Parse()

@@ -89,16 +91,21 @@ func Migrate(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
 		return err
 	}

-	matchingPVCs, namespaces, err := getPVCs(ctx, w, clientset, options.SourceSCName, options.DestSCName, options.Namespace)
+	matchingPVCs, namespaces, err := getPVCs(ctx, w, clientset, options)
 	if err != nil {
 		return err
 	}

-	err = scaleDownPods(ctx, w, clientset, matchingPVCs, time.Second*5)
+	err = scaleDownPods(ctx, w, clientset, matchingPVCs, time.Second*5, options.DryRun)
 	if err != nil {
 		return fmt.Errorf("failed to scale down pods: %w", err)
 	}

+	if options.DryRun {
+		w.Printf("\nDry Run Complete\n")
+		return nil
+	}
+
 	err = copyAllPVCs(ctx, w, clientset, options.SourceSCName, options.DestSCName, options.RsyncImage, matchingPVCs, options.VerboseCopy, time.Second)
 	if err != nil {
 		return err
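The early return above makes the dry-run flow easy to reason about: getPVCs and scaleDownPods still run (with their mutations suppressed, as the later hunks show), so the operator gets a full report of impacted resources before Migrate exits ahead of copyAllPVCs. For callers driving this through the package API rather than the CLI, a minimal sketch; the module path, the kubeconfig handling, and Migrate returning a bare error are assumptions based on the surrounding code, not confirmed by this diff:

```go
package main

import (
	"context"
	"log"
	"os"

	"github.com/replicatedhq/pvmigrate/pkg/migrate" // assumed module path
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// standard out-of-cluster client-go setup; assumes KUBECONFIG points
	// at the target cluster
	cfg, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		log.Fatal(err)
	}
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		log.Fatal(err)
	}

	logger := log.New(os.Stderr, "", 0)

	// DryRun=true: report matching PVCs and the workloads that would be
	// scaled down, then return before any PVC creation or data copy
	err = migrate.Migrate(context.Background(), logger, clientset, migrate.Options{
		SourceSCName: "default",
		DestSCName:   "mynewsc",
		DryRun:       true,
	})
	if err != nil {
		log.Fatal(err)
	}
}
```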
@@ -404,7 +411,9 @@ func createMigrationPod(ctx context.Context, clientset k8sclient.Interface, ns s
 // a map of namespaces to arrays of original PVCs
 // an array of namespaces that the PVCs were found within
 // an error, if one was encountered
-func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, sourceSCName, destSCName string, Namespace string) (map[string][]corev1.PersistentVolumeClaim, []string, error) {
+func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, options Options) (map[string][]corev1.PersistentVolumeClaim, []string, error) {
+	sourceSCName, destSCName, namespace := options.SourceSCName, options.DestSCName, options.Namespace
+
 	// get PVs using the specified storage provider
 	pvs, err := clientset.CoreV1().PersistentVolumes().List(ctx, metav1.ListOptions{})
 	if err != nil {
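Threading the whole Options struct through getPVCs avoids growing a positional parameter list every time a flag is added, gives the function access to DryRun, and retires the oddly capitalized Namespace parameter in favor of a conventional lowercase local.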
@@ -430,7 +439,7 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
 			return nil, nil, fmt.Errorf("failed to get PVC for PV %s in %s: %w", pv.Spec.ClaimRef.Name, pv.Spec.ClaimRef.Namespace, err)
 		}

-		if pv.Spec.ClaimRef.Namespace == Namespace || Namespace == "" {
+		if pv.Spec.ClaimRef.Namespace == namespace || namespace == "" {
 			matchingPVCs[pv.Spec.ClaimRef.Namespace] = append(matchingPVCs[pv.Spec.ClaimRef.Namespace], *pvc)
 		}
@@ -458,27 +467,39 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
 		return nil, nil, fmt.Errorf("failed to print PVCs: %w", err)
 	}

-	// create new PVCs for each matching PVC
+	// do not create destination PVCs if this is a dry run
+	if !options.DryRun {
+		err = createDestinationPVCs(ctx, w, clientset, destSCName, matchingPVCs, pvsByName)
+		if err != nil {
+			return nil, nil, err
+		}
+	}
+
+	return matchingPVCs, pvcNamespaces, nil
+}
+
+// create new PVCs in the destination storageclass for each matching PVC with matching size and accessMode
+func createDestinationPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, destSCName string, matchingPVCs map[string][]corev1.PersistentVolumeClaim, pvsByName map[string]corev1.PersistentVolume) error {
 	w.Printf("\nCreating new PVCs to migrate data to using the %s StorageClass\n", destSCName)
 	for ns, nsPvcs := range matchingPVCs {
 		for _, nsPvc := range nsPvcs {
 			newName := newPvcName(nsPvc.Name)

 			desiredPV, ok := pvsByName[nsPvc.Spec.VolumeName]
 			if !ok {
-				return nil, nil, fmt.Errorf("failed to find existing PV %s for PVC %s in %s", nsPvc.Spec.VolumeName, nsPvc.Name, ns)
+				return fmt.Errorf("failed to find existing PV %s for PVC %s in %s", nsPvc.Spec.VolumeName, nsPvc.Name, ns)
 			}

 			desiredPvStorage, ok := desiredPV.Spec.Capacity[corev1.ResourceStorage]
 			if !ok {
-				return nil, nil, fmt.Errorf("failed to find storage capacity for PV %s for PVC %s in %s", nsPvc.Spec.VolumeName, nsPvc.Name, ns)
+				return fmt.Errorf("failed to find storage capacity for PV %s for PVC %s in %s", nsPvc.Spec.VolumeName, nsPvc.Name, ns)
 			}

 			// check to see if the desired PVC name already exists (and is appropriate)
 			existingPVC, err := clientset.CoreV1().PersistentVolumeClaims(ns).Get(ctx, newName, metav1.GetOptions{})
 			if err != nil {
 				if !k8serrors.IsNotFound(err) {
-					return nil, nil, fmt.Errorf("failed to find existing PVC: %w", err)
+					return fmt.Errorf("failed to find existing PVC: %w", err)
 				}
 			} else if existingPVC != nil {
 				if existingPVC.Spec.StorageClassName != nil && *existingPVC.Spec.StorageClassName == destSCName {

@@ -488,10 +509,10 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
 					w.Printf("found existing PVC with name %s, not creating new one\n", newName)
 					continue
 				} else {
-					return nil, nil, fmt.Errorf("PVC %s already exists in namespace %s but with size %s instead of %s, cannot create migration target from %s - please delete this to continue", newName, ns, existingSize, desiredPvStorage.String(), nsPvc.Name)
+					return fmt.Errorf("PVC %s already exists in namespace %s but with size %s instead of %s, cannot create migration target from %s - please delete this to continue", newName, ns, existingSize, desiredPvStorage.String(), nsPvc.Name)
 				}
 			} else {
-				return nil, nil, fmt.Errorf("PVC %s already exists in namespace %s but with storage class %v, cannot create migration target from %s - please delete this to continue", newName, ns, existingPVC.Spec.StorageClassName, nsPvc.Name)
+				return fmt.Errorf("PVC %s already exists in namespace %s but with storage class %v, cannot create migration target from %s - please delete this to continue", newName, ns, existingPVC.Spec.StorageClassName, nsPvc.Name)
 			}
 		}

@@ -517,13 +538,12 @@ func getPVCs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
 				},
 			}, metav1.CreateOptions{})
 			if err != nil {
-				return nil, nil, fmt.Errorf("failed to create new PVC %s in %s: %w", newName, ns, err)
+				return fmt.Errorf("failed to create new PVC %s in %s: %w", newName, ns, err)
 			}
 			w.Printf("created new PVC %s with size %v in %s\n", newName, newPVC.Spec.Resources.Requests.Storage().String(), ns)
 		}
 	}

-	return matchingPVCs, pvcNamespaces, nil
+	return nil
 }

 func validateStorageClasses(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, sourceSCName string, destSCName string, skipSourceValidation bool) error {
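This extraction accounts for most of the file's 15 deletions: because createDestinationPVCs returns a bare error, every return nil, nil, fmt.Errorf(...) inside the creation loop collapses to return fmt.Errorf(...), while getPVCs itself keeps its three-value return for existing callers.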
@@ -659,8 +679,8 @@ func mutateSC(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
 // scaleDownPods scales down statefulsets & deployments controlling pods mounting PVCs in a supplied list
 // it will also cleanup WIP migration pods it discovers that happen to be mounting a supplied PVC.
 // if a pod is not created by pvmigrate, and is not controlled by a statefulset/deployment, this function will return an error.
-// if waitForCleanup is true, after scaling down deployments/statefulsets it will wait for all pods to be deleted.
-func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, matchingPVCs map[string][]corev1.PersistentVolumeClaim, checkInterval time.Duration) error {
+// if dryRun is true, it will print the deployments and statefulsets to be scaled but will not actually modify them.
+func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, matchingPVCs map[string][]corev1.PersistentVolumeClaim, checkInterval time.Duration, dryRun bool) error {
 	// get pods using specified PVCs
 	matchingPods := map[string][]corev1.Pod{}
 	matchingPodsCount := 0
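The rewritten comment also fixes a stale line: the old text documented a waitForCleanup parameter that scaleDownPods did not actually accept.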
@@ -759,6 +779,10 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter
 			}

 			w.Printf("scaling StatefulSet %s from %d to 0 in %s\n", ownerName, formerScale, ns)
+			if dryRun {
+				continue
+			}
+
 			_, err = clientset.AppsV1().StatefulSets(ns).Update(ctx, ss, metav1.UpdateOptions{})
 			if err != nil {
 				return fmt.Errorf("failed to scale statefulset %s to zero in %s: %w", ownerName, ns, err)
Expand Down Expand Up @@ -797,6 +821,10 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter
}

w.Printf("scaling Deployment %s from %d to 0 in %s\n", ownerName, formerScale, ns)
if dryRun {
continue
}

_, err = clientset.AppsV1().Deployments(ns).Update(ctx, dep, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to scale statefulset %s to zero in %s: %w", ownerName, ns, err)
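One pre-existing nit visible in this hunk's context: the error message in the Deployments branch still reads "failed to scale statefulset". It is untouched by this PR, but since the adjacent lines are already being edited it would be a cheap fix to say "deployment" here.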
@@ -808,6 +836,11 @@ func scaleDownPods(ctx context.Context, w *log.Logger, clientset k8sclient.Inter
 		}
 	}

+	if dryRun {
+		w.Printf("Not waiting for pods to be cleaned up as this is a dry run\n")
+		return nil
+	}
+
 	// wait for all pods to be deleted
 	w.Printf("\nWaiting for pods with mounted PVCs to be cleaned up\n")
 	time.Sleep(checkInterval / 16)
4 changes: 2 additions & 2 deletions pkg/migrate/migrate_test.go

@@ -904,7 +904,7 @@ func TestGetPVCs(t *testing.T) {
 			req := require.New(t)
 			clientset := fake.NewSimpleClientset(test.resources...)
 			testlog := log.New(testWriter{t: t}, "", 0)
-			originalPVCs, nses, err := getPVCs(context.Background(), testlog, clientset, test.sourceScName, test.destScName, test.namespace)
+			originalPVCs, nses, err := getPVCs(context.Background(), testlog, clientset, Options{SourceSCName: test.sourceScName, DestSCName: test.destScName, Namespace: test.namespace})
 			if !test.wantErr {
 				req.NoError(err)
 			} else {

@@ -1952,7 +1952,7 @@ func Test_scaleDownPods(t *testing.T) {
 			if tt.backgroundFunc != nil {
 				go tt.backgroundFunc(testCtx, testlog, clientset)
 			}
-			err := scaleDownPods(testCtx, testlog, clientset, tt.matchingPVCs, time.Second/20)
+			err := scaleDownPods(testCtx, testlog, clientset, tt.matchingPVCs, time.Second/20, false)
 			if tt.wantErr {
 				req.Error(err)
 				testlog.Printf("got expected error %q", err.Error())
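Both updated call sites keep the new behavior switched off (the Options literal sets no DryRun field, and scaleDownPods gets dryRun=false), so the dry-run branches themselves are not yet exercised. A follow-up table case that calls scaleDownPods(testCtx, testlog, clientset, tt.matchingPVCs, time.Second/20, true) and asserts req.NoError(err) without relying on a backgroundFunc to delete pods would pin down the early return; the analogous TestGetPVCs case would set DryRun: true and assert that no destination PVCs appear in the fake clientset.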