Skip to content

Commit

Permalink
✨ feat: accept resource mutators in Move operation
Browse files Browse the repository at this point in the history
Signed-off-by: Tarun Gupta Akirala <[email protected]>
  • Loading branch information
takirala authored and dkoshkin committed Jun 27, 2023
1 parent 14e8e52 commit 6360fa7
Show file tree
Hide file tree
Showing 6 changed files with 265 additions and 36 deletions.
3 changes: 3 additions & 0 deletions api/v1beta1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ const (
// ClusterFinalizer is the finalizer used by the cluster controller to
// cleanup the cluster resources when a Cluster is being deleted.
ClusterFinalizer = "cluster.cluster.x-k8s.io"

// ClusterKind represents the Kind of Cluster.
ClusterKind = "Cluster"
)

// ANCHOR: ClusterSpec
Expand Down
3 changes: 3 additions & 0 deletions api/v1beta1/clusterclass_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
)

// ClusterClassKind represents the Kind of ClusterClass.
const ClusterClassKind = "ClusterClass"

// +kubebuilder:object:root=true
// +kubebuilder:resource:path=clusterclasses,shortName=cc,scope=Namespaced,categories=cluster-api
// +kubebuilder:storageversion
Expand Down
128 changes: 97 additions & 31 deletions cmd/clusterctl/client/cluster/mover.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
Expand All @@ -41,10 +42,13 @@ import (
"sigs.k8s.io/cluster-api/util/yaml"
)

// ResourceMutatorFunc holds the type for mutators to be applied on resources during a move operation.
type ResourceMutatorFunc func(u *unstructured.Unstructured) error

// ObjectMover defines methods for moving Cluster API objects to another management cluster.
type ObjectMover interface {
// Move moves all the Cluster API objects existing in a namespace (or from all the namespaces if empty) to a target management cluster.
Move(namespace string, toCluster Client, dryRun bool) error
Move(namespace string, toCluster Client, dryRun bool, mutators ...ResourceMutatorFunc) error

// ToDirectory writes all the Cluster API objects existing in a namespace (or from all the namespaces if empty) to a target directory.
ToDirectory(namespace string, directory string) error
Expand All @@ -63,7 +67,7 @@ type objectMover struct {
// ensure objectMover implements the ObjectMover interface.
var _ ObjectMover = &objectMover{}

func (o *objectMover) Move(namespace string, toCluster Client, dryRun bool) error {
func (o *objectMover) Move(namespace string, toCluster Client, dryRun bool, mutators ...ResourceMutatorFunc) error {
log := logf.Log
log.Info("Performing move...")
o.dryRun = dryRun
Expand Down Expand Up @@ -91,7 +95,7 @@ func (o *objectMover) Move(namespace string, toCluster Client, dryRun bool) erro
proxy = toCluster.Proxy()
}

return o.move(objectGraph, proxy)
return o.move(objectGraph, proxy, mutators...)
}

func (o *objectMover) ToDirectory(namespace string, directory string) error {
Expand Down Expand Up @@ -308,7 +312,7 @@ func getMachineObj(proxy Proxy, machine *node, machineObj *clusterv1.Machine) er
}

// Move moves all the Cluster API objects existing in a namespace (or from all the namespaces if empty) to a target management cluster.
func (o *objectMover) move(graph *objectGraph, toProxy Proxy) error {
func (o *objectMover) move(graph *objectGraph, toProxy Proxy, mutators ...ResourceMutatorFunc) error {
log := logf.Log

clusters := graph.getClusters()
Expand All @@ -328,11 +332,9 @@ func (o *objectMover) move(graph *objectGraph, toProxy Proxy) error {
return errors.Wrap(err, "error pausing ClusterClasses")
}

// Ensure all the expected target namespaces are in place before creating objects.
log.V(1).Info("Creating target namespaces, if missing")
if err := o.ensureNamespaces(graph, toProxy); err != nil {
return err
}
// Nb. DO NOT call ensureNamespaces at this point because:
// - namespace will be ensured to exist before creating the resource.
// - If it's done here, we might create a namespace that can end up unused on target cluster (due to mutators).

// Define the move sequence by processing the ownerReference chain, so we ensure that a Kubernetes object is moved only after its owners.
// The sequence is bases on object graph nodes, each one representing a Kubernetes object; nodes are grouped, so bulk of nodes can be moved in parallel. e.g.
Expand All @@ -344,11 +346,15 @@ func (o *objectMover) move(graph *objectGraph, toProxy Proxy) error {
// Create all objects group by group, ensuring all the ownerReferences are re-created.
log.Info("Creating objects in the target cluster")
for groupIndex := 0; groupIndex < len(moveSequence.groups); groupIndex++ {
if err := o.createGroup(moveSequence.getGroup(groupIndex), toProxy); err != nil {
if err := o.createGroup(moveSequence.getGroup(groupIndex), toProxy, mutators...); err != nil {
return err
}
}

// Nb. mutators used after this point (after creating the resources on target clusters) are mainly intended for
// using the right namespace to fetch the resource from the target cluster.
// mutators affecting non metadata fields are no-op after this point.

// Delete all objects group by group in reverse order.
log.Info("Deleting objects from the source cluster")
for groupIndex := len(moveSequence.groups) - 1; groupIndex >= 0; groupIndex-- {
Expand All @@ -359,13 +365,13 @@ func (o *objectMover) move(graph *objectGraph, toProxy Proxy) error {

// Resume the ClusterClasses in the target management cluster, so the controllers start reconciling it.
log.V(1).Info("Resuming the target ClusterClasses")
if err := setClusterClassPause(toProxy, clusterClasses, false, o.dryRun); err != nil {
if err := setClusterClassPause(toProxy, clusterClasses, false, o.dryRun, mutators...); err != nil {
return errors.Wrap(err, "error resuming ClusterClasses")
}

// Reset the pause field on the Cluster object in the target management cluster, so the controllers start reconciling it.
log.V(1).Info("Resuming the target cluster")
return setClusterPause(toProxy, clusters, false, o.dryRun)
return setClusterPause(toProxy, clusters, false, o.dryRun, mutators...)
}

func (o *objectMover) toDirectory(graph *objectGraph, directory string) error {
Expand Down Expand Up @@ -532,7 +538,7 @@ func getMoveSequence(graph *objectGraph) *moveSequence {
}

// setClusterPause sets the paused field on nodes referring to Cluster objects.
func setClusterPause(proxy Proxy, clusters []*node, value bool, dryRun bool) error {
func setClusterPause(proxy Proxy, clusters []*node, value bool, dryRun bool, mutators ...ResourceMutatorFunc) error {
if dryRun {
return nil
}
Expand All @@ -553,7 +559,7 @@ func setClusterPause(proxy Proxy, clusters []*node, value bool, dryRun bool) err

// Nb. The operation is wrapped in a retry loop to make setClusterPause more resilient to unexpected conditions.
if err := retryWithExponentialBackoff(setClusterPauseBackoff, func() error {
return patchCluster(proxy, cluster, patch)
return patchCluster(proxy, cluster, patch, mutators...)
}); err != nil {
return errors.Wrapf(err, "error setting Cluster.Spec.Paused=%t", value)
}
Expand All @@ -562,7 +568,7 @@ func setClusterPause(proxy Proxy, clusters []*node, value bool, dryRun bool) err
}

// setClusterClassPause sets the paused annotation on nodes referring to ClusterClass objects.
func setClusterClassPause(proxy Proxy, clusterclasses []*node, pause bool, dryRun bool) error {
func setClusterClassPause(proxy Proxy, clusterclasses []*node, pause bool, dryRun bool, mutators ...ResourceMutatorFunc) error {
if dryRun {
return nil
}
Expand All @@ -580,7 +586,7 @@ func setClusterClassPause(proxy Proxy, clusterclasses []*node, pause bool, dryRu

// Nb. The operation is wrapped in a retry loop to make setClusterClassPause more resilient to unexpected conditions.
if err := retryWithExponentialBackoff(setClusterClassPauseBackoff, func() error {
return pauseClusterClass(proxy, clusterclass, pause)
return pauseClusterClass(proxy, clusterclass, pause, mutators...)
}); err != nil {
return errors.Wrapf(err, "error updating ClusterClass %s/%s", clusterclass.identity.Namespace, clusterclass.identity.Name)
}
Expand All @@ -589,19 +595,29 @@ func setClusterClassPause(proxy Proxy, clusterclasses []*node, pause bool, dryRu
}

// patchCluster applies a patch to a node referring to a Cluster object.
func patchCluster(proxy Proxy, cluster *node, patch client.Patch) error {
func patchCluster(proxy Proxy, n *node, patch client.Patch, mutators ...ResourceMutatorFunc) error {
cFrom, err := proxy.NewClient()
if err != nil {
return err
}

clusterObj := &clusterv1.Cluster{}
clusterObjKey := client.ObjectKey{
Namespace: cluster.identity.Namespace,
Name: cluster.identity.Name,
// Since the patch has been generated already in caller of this function, the ONLY affect that mutators can have
// here is on namespace of the resource.
clusterObj, err := applyMutators(&clusterv1.Cluster{
TypeMeta: metav1.TypeMeta{
Kind: clusterv1.ClusterKind,
APIVersion: clusterv1.GroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: n.identity.Name,
Namespace: n.identity.Namespace,
},
}, mutators...)
if err != nil {
return err
}

if err := cFrom.Get(ctx, clusterObjKey, clusterObj); err != nil {
if err := cFrom.Get(ctx, client.ObjectKeyFromObject(clusterObj), clusterObj); err != nil {
return errors.Wrapf(err, "error reading Cluster %s/%s",
clusterObj.GetNamespace(), clusterObj.GetName())
}
Expand All @@ -614,18 +630,35 @@ func patchCluster(proxy Proxy, cluster *node, patch client.Patch) error {
return nil
}

func pauseClusterClass(proxy Proxy, n *node, pause bool) error {
func pauseClusterClass(proxy Proxy, n *node, pause bool, mutators ...ResourceMutatorFunc) error {
cFrom, err := proxy.NewClient()
if err != nil {
return errors.Wrap(err, "error creating client")
}

// Get the ClusterClass from the server
// Get a mutated copy of the ClusterClass to identify the target namespace.
// The ClusterClass could have been moved to a different namespace after the move.
mutatedClusterClass, err := applyMutators(&clusterv1.ClusterClass{
TypeMeta: metav1.TypeMeta{
Kind: clusterv1.ClusterClassKind,
APIVersion: clusterv1.GroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: n.identity.Name,
Namespace: n.identity.Namespace,
}}, mutators...)
if err != nil {
return err
}

clusterClass := &clusterv1.ClusterClass{}
// Construct an object key using the mutatedClusterClass reflecting any changes to the namespace.
clusterClassObjKey := client.ObjectKey{
Name: n.identity.Name,
Namespace: n.identity.Namespace,
Name: mutatedClusterClass.GetName(),
Namespace: mutatedClusterClass.GetNamespace(),
}
// Get a copy of the ClusterClass.
// This will ensure that any other changes from the mutator are ignored here as we work with a fresh copy of the cluster class.
if err := cFrom.Get(ctx, clusterClassObjKey, clusterClass); err != nil {
return errors.Wrapf(err, "error reading ClusterClass %s/%s", n.identity.Namespace, n.identity.Name)
}
Expand Down Expand Up @@ -735,7 +768,7 @@ func (o *objectMover) ensureNamespace(toProxy Proxy, namespace string) error {
return err
}

// If the namespace does not exists, create it.
// If the namespace does not exist, create it.
ns = &corev1.Namespace{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Expand All @@ -753,15 +786,18 @@ func (o *objectMover) ensureNamespace(toProxy Proxy, namespace string) error {
}

// createGroup creates all the Kubernetes objects into the target management cluster corresponding to the object graph nodes in a moveGroup.
func (o *objectMover) createGroup(group moveGroup, toProxy Proxy) error {
func (o *objectMover) createGroup(group moveGroup, toProxy Proxy, mutators ...ResourceMutatorFunc) error {
createTargetObjectBackoff := newWriteBackoff()
errList := []error{}

// Maintain a cache of namespaces that have been verified to already exist.
// Nb. This prevents us from making repetitive (and expensive) calls in listing all namespaces to ensure a namespace exists before creating a resource.
existingNamespaces := sets.New[string]()
for _, nodeToCreate := range group {
// Creates the Kubernetes object corresponding to the nodeToCreate.
// Nb. The operation is wrapped in a retry loop to make move more resilient to unexpected conditions.
err := retryWithExponentialBackoff(createTargetObjectBackoff, func() error {
return o.createTargetObject(nodeToCreate, toProxy)
return o.createTargetObject(nodeToCreate, toProxy, mutators, existingNamespaces)
})
if err != nil {
errList = append(errList, err)
Expand Down Expand Up @@ -820,7 +856,7 @@ func (o *objectMover) restoreGroup(group moveGroup, toProxy Proxy) error {
}

// createTargetObject creates the Kubernetes object in the target Management cluster corresponding to the object graph node, taking care of restoring the OwnerReference with the owner nodes, if any.
func (o *objectMover) createTargetObject(nodeToCreate *node, toProxy Proxy) error {
func (o *objectMover) createTargetObject(nodeToCreate *node, toProxy Proxy, mutators []ResourceMutatorFunc, existingNamespaces sets.Set[string]) error {
log := logf.Log
log.V(1).Info("Creating", nodeToCreate.identity.Kind, nodeToCreate.identity.Name, "Namespace", nodeToCreate.identity.Namespace)

Expand Down Expand Up @@ -853,7 +889,7 @@ func (o *objectMover) createTargetObject(nodeToCreate *node, toProxy Proxy) erro
// Removes current OwnerReferences
obj.SetOwnerReferences(nil)

// Rebuild the owne reference chain
// Rebuild the owner reference chain
o.buildOwnerChain(obj, nodeToCreate)

// FIXME Workaround for https://github.com/kubernetes/kubernetes/issues/32220. Remove when the issue is fixed.
Expand All @@ -868,6 +904,17 @@ func (o *objectMover) createTargetObject(nodeToCreate *node, toProxy Proxy) erro
return err
}

obj, err = applyMutators(obj, mutators...)
if err != nil {
return err
}
// Applying mutators MAY change the namespace, so ensure the namespace exists before creating the resource.
if !nodeToCreate.isGlobal && !existingNamespaces.Has(obj.GetNamespace()) {
if err = o.ensureNamespace(toProxy, obj.GetNamespace()); err != nil {
return err
}
existingNamespaces.Insert(obj.GetNamespace())
}
oldManagedFields := obj.GetManagedFields()
if err := cTo.Create(ctx, obj); err != nil {
if !apierrors.IsAlreadyExists(err) {
Expand Down Expand Up @@ -1188,3 +1235,22 @@ func patchTopologyManagedFields(ctx context.Context, oldManagedFields []metav1.M
}
return nil
}

func applyMutators(object client.Object, mutators ...ResourceMutatorFunc) (*unstructured.Unstructured, error) {
if object == nil {
return nil, nil
}
u := &unstructured.Unstructured{}
to, err := runtime.DefaultUnstructuredConverter.ToUnstructured(object)
if err != nil {
return nil, err
}
u.SetUnstructuredContent(to)
for _, mutator := range mutators {
if err := mutator(u); err != nil {
return nil, errors.Wrapf(err, "error applying resource mutator to %q %s/%s",
u.GroupVersionKind(), object.GetNamespace(), object.GetName())
}
}
return u, nil
}
Loading

0 comments on commit 6360fa7

Please sign in to comment.