From 1392463ea9c64ea1c93678a49be78992321f2fdb Mon Sep 17 00:00:00 2001 From: Leon Date: Mon, 25 Nov 2024 17:21:48 +0800 Subject: [PATCH 1/3] chore: filter events to reconcile by CRD API version --- controllers/apps/cluster_controller.go | 2 +- .../apps/clusterdefinition_controller.go | 2 +- controllers/apps/component_controller.go | 4 +- .../apps/componentdefinition_controller.go | 2 +- .../apps/componentversion_controller.go | 2 +- .../configconstraint_controller.go | 2 +- .../configuration/configuration_controller.go | 2 +- .../configuration/reconfigure_controller.go | 2 +- .../apps/servicedescriptor_controller.go | 2 +- .../apps/shardingdefinition_controller.go | 2 +- .../apps/sidecardefinition_controller.go | 2 +- .../apps/transformer_cluster_normalization.go | 51 ++++++++- .../dataprotection/actionset_controller.go | 2 +- .../dataprotection/backup_controller.go | 2 +- .../dataprotection/backuppolicy_controller.go | 2 +- .../backuppolicytemplate_controller.go | 2 +- .../dataprotection/backuprepo_controller.go | 2 +- .../backupschedule_controller.go | 2 +- controllers/dataprotection/gc_controller.go | 2 +- .../log_collection_controller.go | 2 +- .../dataprotection/restore_controller.go | 2 +- .../storageprovider_controller.go | 2 +- .../volumepopulator_controller.go | 2 +- .../nodecountscaler_controller.go | 3 +- controllers/extensions/addon_controller.go | 2 +- controllers/k8score/event_controller.go | 2 +- .../operations/opsdefinition_controller.go | 2 +- .../operations/opsrequest_controller.go | 2 +- .../workloads/instanceset_controller.go | 4 +- pkg/constant/annotations.go | 3 + pkg/controller/component/component.go | 1 + pkg/controller/factory/builder.go | 1 + pkg/controllerutil/predicate.go | 105 +++++++++++++++++- 33 files changed, 188 insertions(+), 34 deletions(-) diff --git a/controllers/apps/cluster_controller.go b/controllers/apps/cluster_controller.go index 7a081ecb3b5..d1f8979b4e7 100644 --- a/controllers/apps/cluster_controller.go +++ b/controllers/apps/cluster_controller.go @@ -171,7 +171,7 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct // SetupWithManager sets up the controller with the Manager. func (r *ClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { - return intctrlutil.NewNamespacedControllerManagedBy(mgr). + return intctrlutil.NewControllerManagedBy(mgr, &appsv1.Cluster{}, &appsv1.Component{}). For(&appsv1.Cluster{}). WithOptions(controller.Options{ MaxConcurrentReconciles: int(math.Ceil(viper.GetFloat64(constant.CfgKBReconcileWorkers) / 4)), diff --git a/controllers/apps/clusterdefinition_controller.go b/controllers/apps/clusterdefinition_controller.go index c44b7bdce7c..5ab3d9300f1 100644 --- a/controllers/apps/clusterdefinition_controller.go +++ b/controllers/apps/clusterdefinition_controller.go @@ -96,7 +96,7 @@ func (r *ClusterDefinitionReconciler) Reconcile(ctx context.Context, req ctrl.Re // SetupWithManager sets up the controller with the Manager. func (r *ClusterDefinitionReconciler) SetupWithManager(mgr ctrl.Manager) error { - return intctrlutil.NewNamespacedControllerManagedBy(mgr). + return intctrlutil.NewControllerManagedBy(mgr, &appsv1.ClusterDefinition{}). For(&appsv1.ClusterDefinition{}). Complete(r) } diff --git a/controllers/apps/component_controller.go b/controllers/apps/component_controller.go index ba22615ded8..f74a3a34ef3 100644 --- a/controllers/apps/component_controller.go +++ b/controllers/apps/component_controller.go @@ -196,7 +196,7 @@ func (r *ComponentReconciler) SetupWithManager(mgr ctrl.Manager, multiClusterMgr } func (r *ComponentReconciler) setupWithManager(mgr ctrl.Manager) error { - b := intctrlutil.NewNamespacedControllerManagedBy(mgr). + b := intctrlutil.NewControllerManagedBy(mgr, &appsv1.Component{}, &workloads.InstanceSet{}). For(&appsv1.Component{}). WithOptions(controller.Options{ MaxConcurrentReconciles: viper.GetInt(constant.CfgKBReconcileWorkers), @@ -220,7 +220,7 @@ func (r *ComponentReconciler) setupWithManager(mgr ctrl.Manager) error { } func (r *ComponentReconciler) setupWithMultiClusterManager(mgr ctrl.Manager, multiClusterMgr multicluster.Manager) error { - b := intctrlutil.NewNamespacedControllerManagedBy(mgr). + b := intctrlutil.NewControllerManagedBy(mgr, &appsv1.Component{}, &workloads.InstanceSet{}). For(&appsv1.Component{}). WithOptions(controller.Options{ MaxConcurrentReconciles: viper.GetInt(constant.CfgKBReconcileWorkers), diff --git a/controllers/apps/componentdefinition_controller.go b/controllers/apps/componentdefinition_controller.go index 8cd0bf896d3..ef11b36e005 100644 --- a/controllers/apps/componentdefinition_controller.go +++ b/controllers/apps/componentdefinition_controller.go @@ -86,7 +86,7 @@ func (r *ComponentDefinitionReconciler) Reconcile(ctx context.Context, req ctrl. // SetupWithManager sets up the controller with the Manager. func (r *ComponentDefinitionReconciler) SetupWithManager(mgr ctrl.Manager) error { - return intctrlutil.NewNamespacedControllerManagedBy(mgr). + return intctrlutil.NewControllerManagedBy(mgr, &appsv1.ComponentDefinition{}). For(&appsv1.ComponentDefinition{}). Complete(r) } diff --git a/controllers/apps/componentversion_controller.go b/controllers/apps/componentversion_controller.go index 57a57890a69..eaff1d6c342 100644 --- a/controllers/apps/componentversion_controller.go +++ b/controllers/apps/componentversion_controller.go @@ -91,7 +91,7 @@ func (r *ComponentVersionReconciler) Reconcile(ctx context.Context, req ctrl.Req // SetupWithManager sets up the controller with the Manager. func (r *ComponentVersionReconciler) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). + return intctrlutil.NewControllerManagedBy(mgr, &appsv1.ComponentVersion{}, &appsv1.ComponentDefinition{}). For(&appsv1.ComponentVersion{}). Watches(&appsv1.ComponentDefinition{}, handler.EnqueueRequestsFromMapFunc(r.compatibleCompVersion)). Complete(r) diff --git a/controllers/apps/configuration/configconstraint_controller.go b/controllers/apps/configuration/configconstraint_controller.go index 5ebd5d16065..5fc10fac609 100644 --- a/controllers/apps/configuration/configconstraint_controller.go +++ b/controllers/apps/configuration/configconstraint_controller.go @@ -117,7 +117,7 @@ func (r *ConfigConstraintReconciler) Reconcile(ctx context.Context, req ctrl.Req // SetupWithManager sets up the controller with the Manager. func (r *ConfigConstraintReconciler) SetupWithManager(mgr ctrl.Manager) error { - return intctrlutil.NewNamespacedControllerManagedBy(mgr). + return intctrlutil.NewControllerManagedBy(mgr). For(&appsv1beta1.ConfigConstraint{}). // for other resource Owns(&corev1.ConfigMap{}). // TODO(leon) diff --git a/controllers/apps/configuration/configuration_controller.go b/controllers/apps/configuration/configuration_controller.go index dcfa9b77d8e..3856465284e 100644 --- a/controllers/apps/configuration/configuration_controller.go +++ b/controllers/apps/configuration/configuration_controller.go @@ -189,7 +189,7 @@ func (r *ConfigurationReconciler) runTasks(taskCtx TaskContext, tasks []Task) (e // SetupWithManager sets up the controller with the Manager. func (r *ConfigurationReconciler) SetupWithManager(mgr ctrl.Manager, multiClusterMgr multicluster.Manager) error { - b := intctrlutil.NewNamespacedControllerManagedBy(mgr). + b := intctrlutil.NewControllerManagedBy(mgr). For(&appsv1alpha1.Configuration{}). WithOptions(controller.Options{ MaxConcurrentReconciles: int(math.Ceil(viper.GetFloat64(constant.CfgKBReconcileWorkers) / 2)), diff --git a/controllers/apps/configuration/reconfigure_controller.go b/controllers/apps/configuration/reconfigure_controller.go index 3df54e4e958..6efd1953e27 100644 --- a/controllers/apps/configuration/reconfigure_controller.go +++ b/controllers/apps/configuration/reconfigure_controller.go @@ -136,7 +136,7 @@ func (r *ReconfigureReconciler) Reconcile(ctx context.Context, req ctrl.Request) // SetupWithManager sets up the controller with the Manager. func (r *ReconfigureReconciler) SetupWithManager(mgr ctrl.Manager, multiClusterMgr multicluster.Manager) error { - b := intctrlutil.NewNamespacedControllerManagedBy(mgr). + b := intctrlutil.NewControllerManagedBy(mgr). For(&corev1.ConfigMap{}). WithOptions(controller.Options{ MaxConcurrentReconciles: int(math.Ceil(viper.GetFloat64(constant.CfgKBReconcileWorkers) / 4)), diff --git a/controllers/apps/servicedescriptor_controller.go b/controllers/apps/servicedescriptor_controller.go index 2ae0e054c4c..739147ee5b3 100644 --- a/controllers/apps/servicedescriptor_controller.go +++ b/controllers/apps/servicedescriptor_controller.go @@ -106,7 +106,7 @@ func (r *ServiceDescriptorReconciler) Reconcile(ctx context.Context, req ctrl.Re // SetupWithManager sets up the controller with the Manager. func (r *ServiceDescriptorReconciler) SetupWithManager(mgr ctrl.Manager) error { - return intctrlutil.NewNamespacedControllerManagedBy(mgr). + return intctrlutil.NewControllerManagedBy(mgr). For(&appsv1.ServiceDescriptor{}). Complete(r) } diff --git a/controllers/apps/shardingdefinition_controller.go b/controllers/apps/shardingdefinition_controller.go index c847bf22479..cbf14138dda 100644 --- a/controllers/apps/shardingdefinition_controller.go +++ b/controllers/apps/shardingdefinition_controller.go @@ -105,7 +105,7 @@ func (r *ShardingDefinitionReconciler) Reconcile(ctx context.Context, req ctrl.R // SetupWithManager sets up the controller with the Manager. func (r *ShardingDefinitionReconciler) SetupWithManager(mgr ctrl.Manager) error { - return intctrlutil.NewNamespacedControllerManagedBy(mgr). + return intctrlutil.NewControllerManagedBy(mgr). For(&appsv1.ShardingDefinition{}). Complete(r) } diff --git a/controllers/apps/sidecardefinition_controller.go b/controllers/apps/sidecardefinition_controller.go index 80cb9b57993..37465557e6b 100644 --- a/controllers/apps/sidecardefinition_controller.go +++ b/controllers/apps/sidecardefinition_controller.go @@ -108,7 +108,7 @@ func (r *SidecarDefinitionReconciler) Reconcile(ctx context.Context, req ctrl.Re // SetupWithManager sets up the controller with the Manager. func (r *SidecarDefinitionReconciler) SetupWithManager(mgr ctrl.Manager) error { - return intctrlutil.NewNamespacedControllerManagedBy(mgr). + return intctrlutil.NewControllerManagedBy(mgr). For(&appsv1.SidecarDefinition{}). Watches(&appsv1.ComponentDefinition{}, handler.EnqueueRequestsFromMapFunc(r.matchedCompDefinition)). Complete(r) diff --git a/controllers/apps/transformer_cluster_normalization.go b/controllers/apps/transformer_cluster_normalization.go index 1d643f9c819..e6d61493ddd 100644 --- a/controllers/apps/transformer_cluster_normalization.go +++ b/controllers/apps/transformer_cluster_normalization.go @@ -22,6 +22,7 @@ package apps import ( "fmt" + "golang.org/x/exp/maps" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" @@ -81,7 +82,7 @@ func (t *clusterNormalizationTransformer) Transform(ctx graph.TransformContext, // write-back the resolved definitions and service versions to cluster spec. t.writeBackCompNShardingSpecs(transCtx) - return nil + return t.patchCRDAPIVersionKey(transCtx) } func (t *clusterNormalizationTransformer) resolveCompsNShardings(transCtx *clusterTransformContext) ([]*appsv1.ClusterComponentSpec, []*appsv1.ClusterSharding, error) { @@ -409,3 +410,51 @@ func (t *clusterNormalizationTransformer) writeBackCompNShardingSpecs(transCtx * transCtx.Cluster.Spec.Shardings = shardings } } + +func (t *clusterNormalizationTransformer) patchCRDAPIVersionKey(transCtx *clusterTransformContext) error { + apiVersions := map[string][]string{} + + add := func(k, v string) { + if _, ok := apiVersions[k]; !ok { + apiVersions[k] = []string{} + } + apiVersions[k] = append(apiVersions[k], v) + } + + from := func(name string, annotations map[string]string) { + if annotations == nil { + add("", name) + } else { + add(annotations[constant.CRDAPIVersionAnnotationKey], name) + } + } + + if transCtx.clusterDef != nil { + from(transCtx.clusterDef.Name, transCtx.clusterDef.Annotations) + } else { + for _, compDef := range transCtx.componentDefs { + from(compDef.Name, compDef.Annotations) + } + for _, shardingDef := range transCtx.shardingDefs { + from(shardingDef.Name, shardingDef.Annotations) + } + } + + if len(apiVersions) > 1 { + return fmt.Errorf("multiple CRD API versions found: %v", apiVersions) + } + + apiVersion := "" + if len(apiVersions) == 1 { + apiVersion = maps.Keys(apiVersions)[0] + if transCtx.Cluster.Annotations == nil { + transCtx.Cluster.Annotations = make(map[string]string) + } + transCtx.Cluster.Annotations[constant.CRDAPIVersionAnnotationKey] = apiVersion + } + + if controllerutil.IsSupportedCRDAPIVersion(apiVersion) { + return nil + } + return graph.ErrPrematureStop // un-supported CRD API version, stop the transformation +} diff --git a/controllers/dataprotection/actionset_controller.go b/controllers/dataprotection/actionset_controller.go index 787977e47c0..0af71c4daac 100644 --- a/controllers/dataprotection/actionset_controller.go +++ b/controllers/dataprotection/actionset_controller.go @@ -92,7 +92,7 @@ func (r *ActionSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( // SetupWithManager sets up the controller with the Manager. func (r *ActionSetReconciler) SetupWithManager(mgr ctrl.Manager) error { - return intctrlutil.NewNamespacedControllerManagedBy(mgr). + return intctrlutil.NewControllerManagedBy(mgr). For(&dpv1alpha1.ActionSet{}). Complete(r) } diff --git a/controllers/dataprotection/backup_controller.go b/controllers/dataprotection/backup_controller.go index 3595876b8e0..31d78742043 100644 --- a/controllers/dataprotection/backup_controller.go +++ b/controllers/dataprotection/backup_controller.go @@ -141,7 +141,7 @@ func (r *BackupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr // SetupWithManager sets up the controller with the Manager. func (r *BackupReconciler) SetupWithManager(mgr ctrl.Manager) error { - b := intctrlutil.NewNamespacedControllerManagedBy(mgr). + b := intctrlutil.NewControllerManagedBy(mgr). For(&dpv1alpha1.Backup{}). WithOptions(controller.Options{ MaxConcurrentReconciles: viper.GetInt(dptypes.CfgDataProtectionReconcileWorkers), diff --git a/controllers/dataprotection/backuppolicy_controller.go b/controllers/dataprotection/backuppolicy_controller.go index bd0bcff3f02..ec93b3721fc 100644 --- a/controllers/dataprotection/backuppolicy_controller.go +++ b/controllers/dataprotection/backuppolicy_controller.go @@ -124,7 +124,7 @@ func (r *BackupPolicyReconciler) validateBackupPolicy(backupPolicy *dpv1alpha1.B // SetupWithManager sets up the controller with the Manager. func (r *BackupPolicyReconciler) SetupWithManager(mgr ctrl.Manager) error { - return intctrlutil.NewNamespacedControllerManagedBy(mgr). + return intctrlutil.NewControllerManagedBy(mgr). For(&dpv1alpha1.BackupPolicy{}). Complete(r) } diff --git a/controllers/dataprotection/backuppolicytemplate_controller.go b/controllers/dataprotection/backuppolicytemplate_controller.go index 40d46cedcc4..3c7f8a1da94 100644 --- a/controllers/dataprotection/backuppolicytemplate_controller.go +++ b/controllers/dataprotection/backuppolicytemplate_controller.go @@ -174,7 +174,7 @@ func (r *BackupPolicyTemplateReconciler) compatibleBackupPolicyTemplate(ctx cont // SetupWithManager sets up the controller with the Manager. func (r *BackupPolicyTemplateReconciler) SetupWithManager(mgr ctrl.Manager) error { - return intctrlutil.NewNamespacedControllerManagedBy(mgr). + return intctrlutil.NewControllerManagedBy(mgr). For(&dpv1alpha1.BackupPolicyTemplate{}). Watches(&appsv1.ComponentDefinition{}, handler.EnqueueRequestsFromMapFunc(r.compatibleBackupPolicyTemplate)). Complete(r) diff --git a/controllers/dataprotection/backuprepo_controller.go b/controllers/dataprotection/backuprepo_controller.go index 08d54f13aa5..db168841b35 100644 --- a/controllers/dataprotection/backuprepo_controller.go +++ b/controllers/dataprotection/backuprepo_controller.go @@ -1497,7 +1497,7 @@ func (r *BackupRepoReconciler) SetupWithManager(mgr ctrl.Manager) error { }); err != nil { return err } - b := intctrlutil.NewNamespacedControllerManagedBy(mgr). + b := intctrlutil.NewControllerManagedBy(mgr). For(&dpv1alpha1.BackupRepo{}). Watches(&dpv1alpha1.StorageProvider{}, handler.EnqueueRequestsFromMapFunc(r.mapProviderToRepos)). Watches(&dpv1alpha1.Backup{}, handler.EnqueueRequestsFromMapFunc(r.mapBackupToRepo)). diff --git a/controllers/dataprotection/backupschedule_controller.go b/controllers/dataprotection/backupschedule_controller.go index 46bacf489d4..aaeb7ebfc71 100644 --- a/controllers/dataprotection/backupschedule_controller.go +++ b/controllers/dataprotection/backupschedule_controller.go @@ -96,7 +96,7 @@ func (r *BackupScheduleReconciler) Reconcile(ctx context.Context, req ctrl.Reque // SetupWithManager sets up the controller with the Manager. func (r *BackupScheduleReconciler) SetupWithManager(mgr ctrl.Manager) error { - b := intctrlutil.NewNamespacedControllerManagedBy(mgr). + b := intctrlutil.NewControllerManagedBy(mgr). For(&dpv1alpha1.BackupSchedule{}) // Compatible with kubernetes versions prior to K8s 1.21, only supports batch v1beta1. diff --git a/controllers/dataprotection/gc_controller.go b/controllers/dataprotection/gc_controller.go index 72dafe8c070..153b52f5217 100644 --- a/controllers/dataprotection/gc_controller.go +++ b/controllers/dataprotection/gc_controller.go @@ -62,7 +62,7 @@ func NewGCReconciler(mgr ctrl.Manager) *GCReconciler { // taken care of. Other events will be filtered to decrease the load on the controller. func (r *GCReconciler) SetupWithManager(mgr ctrl.Manager) error { s := dputils.NewPeriodicalEnqueueSource(mgr.GetClient(), &dpv1alpha1.BackupList{}, r.frequency, dputils.PeriodicalEnqueueSourceOption{}) - return intctrlutil.NewNamespacedControllerManagedBy(mgr). + return intctrlutil.NewControllerManagedBy(mgr). For(&dpv1alpha1.Backup{}, builder.WithPredicates(predicate.NewPredicateFuncs(func(client.Object) bool { return false }))). WatchesRawSource(s, nil). Complete(r) diff --git a/controllers/dataprotection/log_collection_controller.go b/controllers/dataprotection/log_collection_controller.go index a3726012ad5..5f89c407ceb 100644 --- a/controllers/dataprotection/log_collection_controller.go +++ b/controllers/dataprotection/log_collection_controller.go @@ -101,7 +101,7 @@ func (r *LogCollectionReconciler) Reconcile(ctx context.Context, req ctrl.Reques // SetupWithManager sets up the controller with the Manager. func (r *LogCollectionReconciler) SetupWithManager(mgr ctrl.Manager) error { - return intctrlutil.NewNamespacedControllerManagedBy(mgr). + return intctrlutil.NewControllerManagedBy(mgr). For(&batchv1.Job{}, builder.WithPredicates( failedJobUpdatePredicate{ Funcs: predicate.NewPredicateFuncs(func(object client.Object) bool { return false }), diff --git a/controllers/dataprotection/restore_controller.go b/controllers/dataprotection/restore_controller.go index 16842c989e5..c9c54cac98d 100644 --- a/controllers/dataprotection/restore_controller.go +++ b/controllers/dataprotection/restore_controller.go @@ -102,7 +102,7 @@ func (r *RestoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct // SetupWithManager sets up the controller with the Manager. func (r *RestoreReconciler) SetupWithManager(mgr ctrl.Manager) error { - return intctrlutil.NewNamespacedControllerManagedBy(mgr). + return intctrlutil.NewControllerManagedBy(mgr). For(&dpv1alpha1.Restore{}). Owns(&batchv1.Job{}). Watches(&batchv1.Job{}, handler.EnqueueRequestsFromMapFunc(r.parseRestoreJob)). diff --git a/controllers/dataprotection/storageprovider_controller.go b/controllers/dataprotection/storageprovider_controller.go index 9549efd6f6c..03c31fa5fc9 100644 --- a/controllers/dataprotection/storageprovider_controller.go +++ b/controllers/dataprotection/storageprovider_controller.go @@ -219,7 +219,7 @@ func (r *StorageProviderReconciler) deleteExternalResources( // SetupWithManager sets up the controller with the Manager. func (r *StorageProviderReconciler) SetupWithManager(mgr ctrl.Manager) error { - b := intctrlutil.NewNamespacedControllerManagedBy(mgr). + b := intctrlutil.NewControllerManagedBy(mgr). For(&dpv1alpha1.StorageProvider{}) mapCSIDriverToProvider := handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, object client.Object) []reconcile.Request { diff --git a/controllers/dataprotection/volumepopulator_controller.go b/controllers/dataprotection/volumepopulator_controller.go index f95be89d707..be41bea4c4c 100644 --- a/controllers/dataprotection/volumepopulator_controller.go +++ b/controllers/dataprotection/volumepopulator_controller.go @@ -99,7 +99,7 @@ func (r *VolumePopulatorReconciler) Reconcile(ctx context.Context, req ctrl.Requ // SetupWithManager sets up the controller with the Manager. func (r *VolumePopulatorReconciler) SetupWithManager(mgr ctrl.Manager) error { - return intctrlutil.NewNamespacedControllerManagedBy(mgr). + return intctrlutil.NewControllerManagedBy(mgr). For(&corev1.PersistentVolumeClaim{}). Owns(&batchv1.Job{}). Complete(r) diff --git a/controllers/experimental/nodecountscaler_controller.go b/controllers/experimental/nodecountscaler_controller.go index dafa1b4d6ae..f20e534a4c4 100644 --- a/controllers/experimental/nodecountscaler_controller.go +++ b/controllers/experimental/nodecountscaler_controller.go @@ -32,6 +32,7 @@ import ( appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1" experimental "github.com/apecloud/kubeblocks/apis/experimental/v1alpha1" "github.com/apecloud/kubeblocks/pkg/controller/kubebuilderx" + intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil" ) // NodeCountScalerReconciler reconciles a NodeCountScaler object @@ -69,7 +70,7 @@ func (r *NodeCountScalerReconciler) Reconcile(ctx context.Context, req ctrl.Requ // SetupWithManager sets up the controller with the Manager. func (r *NodeCountScalerReconciler) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). + return intctrlutil.NewControllerManagedBy(mgr). For(&experimental.NodeCountScaler{}). Watches(&corev1.Node{}, &nodeScalingHandler{r.Client}). Watches(&appsv1.Cluster{}, &clusterHandler{r.Client}). diff --git a/controllers/extensions/addon_controller.go b/controllers/extensions/addon_controller.go index d84ee588dbd..a656e7b8805 100644 --- a/controllers/extensions/addon_controller.go +++ b/controllers/extensions/addon_controller.go @@ -153,7 +153,7 @@ func (r *AddonReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl // SetupWithManager sets up the controller with the Manager. func (r *AddonReconciler) SetupWithManager(mgr ctrl.Manager) error { - return intctrlutil.NewNamespacedControllerManagedBy(mgr). + return intctrlutil.NewControllerManagedBy(mgr). For(&extensionsv1alpha1.Addon{}). Watches(&batchv1.Job{}, handler.EnqueueRequestsFromMapFunc(r.findAddonJobs)). WithOptions(controller.Options{ diff --git a/controllers/k8score/event_controller.go b/controllers/k8score/event_controller.go index 42a4e1954e4..5bf2a295dda 100644 --- a/controllers/k8score/event_controller.go +++ b/controllers/k8score/event_controller.go @@ -98,7 +98,7 @@ func (r *EventReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl // SetupWithManager sets up the controller with the Manager. func (r *EventReconciler) SetupWithManager(mgr ctrl.Manager, multiClusterMgr multicluster.Manager) error { - b := intctrlutil.NewNamespacedControllerManagedBy(mgr). + b := intctrlutil.NewControllerManagedBy(mgr). For(&corev1.Event{}) if multiClusterMgr != nil { diff --git a/controllers/operations/opsdefinition_controller.go b/controllers/operations/opsdefinition_controller.go index 2bf9bfbf8c9..42b0f17bf38 100644 --- a/controllers/operations/opsdefinition_controller.go +++ b/controllers/operations/opsdefinition_controller.go @@ -102,7 +102,7 @@ func (r *OpsDefinitionReconciler) updateStatusUnavailable(reqCtx intctrlutil.Req // SetupWithManager sets up the controller with the Manager. func (r *OpsDefinitionReconciler) SetupWithManager(mgr ctrl.Manager) error { - return intctrlutil.NewNamespacedControllerManagedBy(mgr). + return intctrlutil.NewControllerManagedBy(mgr). For(&opsv1alpha1.OpsDefinition{}). Complete(r) } diff --git a/controllers/operations/opsrequest_controller.go b/controllers/operations/opsrequest_controller.go index be4ec744e45..f54caa9839e 100644 --- a/controllers/operations/opsrequest_controller.go +++ b/controllers/operations/opsrequest_controller.go @@ -90,7 +90,7 @@ func (r *OpsRequestReconciler) Reconcile(ctx context.Context, req ctrl.Request) // SetupWithManager sets up the controller with the Manager. func (r *OpsRequestReconciler) SetupWithManager(mgr ctrl.Manager) error { - return intctrlutil.NewNamespacedControllerManagedBy(mgr). + return intctrlutil.NewControllerManagedBy(mgr). For(&opsv1alpha1.OpsRequest{}). WithOptions(controller.Options{ MaxConcurrentReconciles: int(math.Ceil(viper.GetFloat64(constant.CfgKBReconcileWorkers) / 2)), diff --git a/controllers/workloads/instanceset_controller.go b/controllers/workloads/instanceset_controller.go index c723e61245a..72463950d64 100644 --- a/controllers/workloads/instanceset_controller.go +++ b/controllers/workloads/instanceset_controller.go @@ -112,7 +112,7 @@ func (r *InstanceSetReconciler) SetupWithManager(mgr ctrl.Manager, multiClusterM func (r *InstanceSetReconciler) setupWithManager(mgr ctrl.Manager, ctx *handler.FinderContext) error { itsFinder := handler.NewLabelFinder(&workloads.InstanceSet{}, instanceset.WorkloadsManagedByLabelKey, workloads.Kind, instanceset.WorkloadsInstanceLabelKey) podHandler := handler.NewBuilder(ctx).AddFinder(itsFinder).Build() - return intctrlutil.NewNamespacedControllerManagedBy(mgr). + return intctrlutil.NewControllerManagedBy(mgr, &workloads.InstanceSet{}). For(&workloads.InstanceSet{}). WithOptions(controller.Options{ MaxConcurrentReconciles: viper.GetInt(constant.CfgKBReconcileWorkers), @@ -132,7 +132,7 @@ func (r *InstanceSetReconciler) setupWithMultiClusterManager(mgr ctrl.Manager, // TODO: modify handler.getObjectFromKey to support running Job in data clusters jobHandler := handler.NewBuilder(ctx).AddFinder(delegatorFinder).Build() - b := intctrlutil.NewNamespacedControllerManagedBy(mgr). + b := intctrlutil.NewControllerManagedBy(mgr, &workloads.InstanceSet{}). For(&workloads.InstanceSet{}). WithOptions(controller.Options{ MaxConcurrentReconciles: viper.GetInt(constant.CfgKBReconcileWorkers), diff --git a/pkg/constant/annotations.go b/pkg/constant/annotations.go index 17f73be3c34..a9b0d41c674 100644 --- a/pkg/constant/annotations.go +++ b/pkg/constant/annotations.go @@ -21,6 +21,9 @@ package constant // annotations defined by KubeBlocks const ( + // CRDAPIVersionAnnotationKey indicates the CRD API version of the object. + CRDAPIVersionAnnotationKey = "kubeblocks.io/crd-api-version" + ClusterSnapshotAnnotationKey = "kubeblocks.io/cluster-snapshot" // ClusterSnapshotAnnotationKey saves the snapshot of cluster. EncryptedSystemAccountsAnnotationKey = "kubeblocks.io/encrypted-system-accounts" // EncryptedSystemAccountsAnnotationKey saves the encrypted system accounts. OpsRequestAnnotationKey = "kubeblocks.io/ops-request" // OpsRequestAnnotationKey OpsRequest annotation key in Cluster diff --git a/pkg/controller/component/component.go b/pkg/controller/component/component.go index 43d2400990f..9678d516481 100644 --- a/pkg/controller/component/component.go +++ b/pkg/controller/component/component.go @@ -62,6 +62,7 @@ func BuildComponent(cluster *appsv1.Cluster, compSpec *appsv1.ClusterComponentSp } compBuilder := builder.NewComponentBuilder(cluster.Namespace, FullName(cluster.Name, compSpec.Name), compSpec.ComponentDef). AddAnnotations(constant.KubeBlocksGenerationKey, strconv.FormatInt(cluster.Generation, 10)). + AddAnnotations(constant.CRDAPIVersionAnnotationKey, appsv1.GroupVersion.String()). AddAnnotations(constant.KBAppClusterUIDKey, string(cluster.UID)). AddAnnotationsInMap(inheritedAnnotations(cluster)). AddAnnotationsInMap(annotations). // annotations added by the cluster controller diff --git a/pkg/controller/factory/builder.go b/pkg/controller/factory/builder.go index 424a630c438..61b4d4fea46 100644 --- a/pkg/controller/factory/builder.go +++ b/pkg/controller/factory/builder.go @@ -66,6 +66,7 @@ func BuildInstanceSet(synthesizedComp *component.SynthesizedComponent, component AddLabelsInMap(constant.GetCompLabels(clusterName, compName)). AddLabelsInMap(synthesizedComp.StaticLabels). AddAnnotations(constant.KubeBlocksGenerationKey, synthesizedComp.Generation). + AddAnnotations(constant.CRDAPIVersionAnnotationKey, workloads.GroupVersion.String()). AddAnnotationsInMap(map[string]string{ constant.AppComponentLabelKey: compDefName, constant.KBAppServiceVersionKey: synthesizedComp.ServiceVersion, diff --git a/pkg/controllerutil/predicate.go b/pkg/controllerutil/predicate.go index 360e86e27ce..c59cb3a2c75 100644 --- a/pkg/controllerutil/predicate.go +++ b/pkg/controllerutil/predicate.go @@ -20,6 +20,7 @@ along with this program. If not, see . package controllerutil import ( + "reflect" "strings" "k8s.io/apimachinery/pkg/util/sets" @@ -29,17 +30,91 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/predicate" + appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1" + workloadsv1 "github.com/apecloud/kubeblocks/apis/workloads/v1" "github.com/apecloud/kubeblocks/pkg/constant" viper "github.com/apecloud/kubeblocks/pkg/viperx" ) var ( - managedNamespaces *sets.Set[string] + // pkg reconciler resource sub-resources operation + // experimentalv1alpha1 NodeCountScalerReconciler NodeCountScaler corev1.Node w + // appsv1alpha1.Cluster w + // extensionsv1alpha1 AddonReconciler Addon batchv1.Job w + // corev1 EventReconciler Event + // workloadsv1alpha1 InstanceSetReconciler InstanceSet corev1.Pod w + // corev1.PersistentVolumeClaim o + // batchv1.Job o + // corev1.Service o + // corev1.ConfigMap o + // appsv1beta1 ConfigConstraintReconciler ConfigConstraint corev1.ConfigMap o + // appsv1alpha1 OpsRequestReconciler OpsRequest appsv1alpha1.Cluster w + // workloadsv1alpha1.InstanceSet w + // dpv1alpha1.Backup w + // corev1.PersistentVolumeClaim w + // corev1.Pod w + // batchv1.Job o + // dpv1alpha1.Restore o + // ReconfigureReconciler corev1.ConfigMap + // ConfigurationReconciler Configuration corev1.ConfigMap o + // ClusterReconciler Cluster appsv1alpha1.Component o + // corev1.Service o + // corev1.Secret o + // dpv1alpha1.BackupPolicy o + // dpv1alpha1.BackupSchedule o + // SystemAccountReconciler Cluster corev1.Secret o + // batchv1.Job w + // ComponentReconciler Component workloads.InstanceSet o + // corev1.Service o + // corev1.Secret o + // corev1.ConfigMap o + // dpv1alpha1.Backup o + // dpv1alpha1.Restore o + // corev1.PersistentVolumeClaim w + // batchv1.Job o + // appsv1alpha1.Configuration w + // rbacv1.ClusterRoleBinding o/w + // rbacv1.RoleBinding o/w + // corev1.ServiceAccount o/w + // BackupPolicyTemplateReconciler BackupPolicyTemplate appsv1alpha1.ComponentDefinition w + // ComponentClassReconciler ComponentClassDefinition + // ClusterVersionReconciler ClusterVersion + // ServiceDescriptorReconciler ServiceDescriptor + // ClusterDefinitionReconciler ClusterDefinition + // OpsDefinitionReconciler OpsDefinition + // ComponentDefinitionReconciler ComponentDefinition + // ComponentVersionReconciler ComponentVersion appsv1alpha1.ComponentDefinition w + // + // has new version: - filter by api version label/annotation + // addon: ClusterDefinition, ComponentDefinition, ComponentVersion, BackupPolicyTemplate + // user:ServiceDescriptor, Cluster + // controller: Component, InstanceSet + // unchanged:NodeCountScaler, Addon - the new operator will be responsible for these + // deleted:ClusterVersion, ComponentClassDefinition - nothing to do + // group changed:OpsRequest, OpsDefinition, ConfigConstraint, Configuration - nothing to do + // TODO: + // EventReconciler.Event + + managedNamespaces *sets.Set[string] + supportedCRDAPIVersions = sets.New[string]( + // ClusterDefinition, ComponentDefinition, ComponentVersion, Cluster, Component + appsv1.GroupVersion.String(), + // InstanceSet + workloadsv1.GroupVersion.String(), + ) ) -func NewNamespacedControllerManagedBy(mgr manager.Manager) *builder.Builder { - return ctrl.NewControllerManagedBy(mgr). +func IsSupportedCRDAPIVersion(apiVersion string) bool { + return supportedCRDAPIVersions.Has(apiVersion) +} + +func NewControllerManagedBy(mgr manager.Manager, objs ...client.Object) *builder.Builder { + b := ctrl.NewControllerManagedBy(mgr). WithEventFilter(predicate.NewPredicateFuncs(namespacePredicateFilter)) + if len(objs) > 0 { + b.WithEventFilter(predicate.NewPredicateFuncs(newAPIVersionPredicateFilter(objs))) + } + return b } func namespacePredicateFilter(object client.Object) bool { @@ -56,3 +131,27 @@ func namespacePredicateFilter(object client.Object) bool { } return managedNamespaces.Has(object.GetNamespace()) } + +func newAPIVersionPredicateFilter(objs []client.Object) func(client.Object) bool { + return func(obj client.Object) bool { + annotations := obj.GetAnnotations() + if annotations != nil { + apiVersion, ok := annotations[constant.CRDAPIVersionAnnotationKey] + if ok { + return IsSupportedCRDAPIVersion(apiVersion) + } + } + switch reflect.TypeOf(obj) { + case reflect.TypeOf(&appsv1.Cluster{}): + return true + case reflect.TypeOf(&appsv1.ClusterDefinition{}), + reflect.TypeOf(&appsv1.ComponentDefinition{}), + reflect.TypeOf(&appsv1.ComponentVersion{}), + reflect.TypeOf(&appsv1.Component{}), + reflect.TypeOf(&workloadsv1.InstanceSet{}): + return false + default: + return true + } + } +} From 517105022ceea76b8a24067fce8092a1145ed3a9 Mon Sep 17 00:00:00 2001 From: Leon Date: Fri, 29 Nov 2024 11:49:23 +0800 Subject: [PATCH 2/3] fix test --- pkg/testutil/apps/clusterdef_factory.go | 2 ++ pkg/testutil/apps/componentdefinition_factory.go | 2 ++ pkg/testutil/apps/componentversion_factory.go | 2 ++ 3 files changed, 6 insertions(+) diff --git a/pkg/testutil/apps/clusterdef_factory.go b/pkg/testutil/apps/clusterdef_factory.go index bb8e6d224aa..10a4f7dc493 100644 --- a/pkg/testutil/apps/clusterdef_factory.go +++ b/pkg/testutil/apps/clusterdef_factory.go @@ -21,6 +21,7 @@ package apps import ( appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1" + "github.com/apecloud/kubeblocks/pkg/constant" ) type MockClusterDefFactory struct { @@ -33,6 +34,7 @@ func NewClusterDefFactory(name string) *MockClusterDefFactory { &appsv1.ClusterDefinition{ Spec: appsv1.ClusterDefinitionSpec{}, }, f) + f.AddAnnotations(constant.CRDAPIVersionAnnotationKey, appsv1.GroupVersion.String()) return f } diff --git a/pkg/testutil/apps/componentdefinition_factory.go b/pkg/testutil/apps/componentdefinition_factory.go index 0d8e586512f..8871c693202 100644 --- a/pkg/testutil/apps/componentdefinition_factory.go +++ b/pkg/testutil/apps/componentdefinition_factory.go @@ -28,6 +28,7 @@ import ( rbacv1 "k8s.io/api/rbac/v1" kbappsv1 "github.com/apecloud/kubeblocks/apis/apps/v1" + "github.com/apecloud/kubeblocks/pkg/constant" ) type MockComponentDefinitionFactory struct { @@ -49,6 +50,7 @@ func NewComponentDefinitionFactoryExt(name, provider, description, serviceKind, ServiceVersion: serviceVersion, }, }, f) + f.AddAnnotations(constant.CRDAPIVersionAnnotationKey, kbappsv1.GroupVersion.String()) return f } diff --git a/pkg/testutil/apps/componentversion_factory.go b/pkg/testutil/apps/componentversion_factory.go index b025f7ad640..74ee27e0850 100644 --- a/pkg/testutil/apps/componentversion_factory.go +++ b/pkg/testutil/apps/componentversion_factory.go @@ -21,6 +21,7 @@ package apps import ( appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1" + "github.com/apecloud/kubeblocks/pkg/constant" ) type MockComponentVersionFactory struct { @@ -35,6 +36,7 @@ func NewComponentVersionFactory(name string) *MockComponentVersionFactory { Releases: []appsv1.ComponentVersionRelease{}, }, }, f) + f.AddAnnotations(constant.CRDAPIVersionAnnotationKey, appsv1.GroupVersion.String()) return f } From c88da1b29276a09536983a688a5c0370238c411e Mon Sep 17 00:00:00 2001 From: Leon Date: Fri, 29 Nov 2024 11:52:51 +0800 Subject: [PATCH 3/3] update --- .../apps/clusterdefinition_controller.go | 14 ++++++++------ .../apps/transformer_cluster_normalization.go | 14 ++------------ .../apps/transformer_cluster_validation.go | 5 ----- .../workloads/instanceset_controller_test.go | 1 + pkg/controllerutil/predicate.go | 19 ++++++++----------- 5 files changed, 19 insertions(+), 34 deletions(-) diff --git a/controllers/apps/clusterdefinition_controller.go b/controllers/apps/clusterdefinition_controller.go index 5ab3d9300f1..d498dfa4cf5 100644 --- a/controllers/apps/clusterdefinition_controller.go +++ b/controllers/apps/clusterdefinition_controller.go @@ -367,12 +367,14 @@ func defaultClusterTopology(clusterDef *appsv1.ClusterDefinition) *appsv1.Cluste // referredClusterTopology returns the cluster topology which has name @name. func referredClusterTopology(clusterDef *appsv1.ClusterDefinition, name string) *appsv1.ClusterTopology { - if len(name) == 0 { - return defaultClusterTopology(clusterDef) - } - for i, topology := range clusterDef.Spec.Topologies { - if topology.Name == name { - return &clusterDef.Spec.Topologies[i] + if clusterDef != nil { + if len(name) == 0 { + return defaultClusterTopology(clusterDef) + } + for i, topology := range clusterDef.Spec.Topologies { + if topology.Name == name { + return &clusterDef.Spec.Topologies[i] + } } } return nil diff --git a/controllers/apps/transformer_cluster_normalization.go b/controllers/apps/transformer_cluster_normalization.go index e6d61493ddd..b842e6aa4ed 100644 --- a/controllers/apps/transformer_cluster_normalization.go +++ b/controllers/apps/transformer_cluster_normalization.go @@ -414,19 +414,9 @@ func (t *clusterNormalizationTransformer) writeBackCompNShardingSpecs(transCtx * func (t *clusterNormalizationTransformer) patchCRDAPIVersionKey(transCtx *clusterTransformContext) error { apiVersions := map[string][]string{} - add := func(k, v string) { - if _, ok := apiVersions[k]; !ok { - apiVersions[k] = []string{} - } - apiVersions[k] = append(apiVersions[k], v) - } - from := func(name string, annotations map[string]string) { - if annotations == nil { - add("", name) - } else { - add(annotations[constant.CRDAPIVersionAnnotationKey], name) - } + key := annotations[constant.CRDAPIVersionAnnotationKey] + apiVersions[key] = append(apiVersions[key], name) } if transCtx.clusterDef != nil { diff --git a/controllers/apps/transformer_cluster_validation.go b/controllers/apps/transformer_cluster_validation.go index ebbb0a97de0..d90ea50de80 100644 --- a/controllers/apps/transformer_cluster_validation.go +++ b/controllers/apps/transformer_cluster_validation.go @@ -146,7 +146,6 @@ func loadNCheckClusterDefinition(transCtx *clusterTransformContext, cluster *app return err } } - if cd != nil { if cd.Generation != cd.Status.ObservedGeneration { return fmt.Errorf("the referenced ClusterDefinition is not up to date: %s", cd.Name) @@ -155,10 +154,6 @@ func loadNCheckClusterDefinition(transCtx *clusterTransformContext, cluster *app return fmt.Errorf("the referenced ClusterDefinition is unavailable: %s", cd.Name) } } - - if cd == nil { - cd = &appsv1.ClusterDefinition{} - } transCtx.clusterDef = cd return nil } diff --git a/controllers/workloads/instanceset_controller_test.go b/controllers/workloads/instanceset_controller_test.go index a372d660120..48901cb0ee6 100644 --- a/controllers/workloads/instanceset_controller_test.go +++ b/controllers/workloads/instanceset_controller_test.go @@ -68,6 +68,7 @@ var _ = Describe("InstanceSet Controller", func() { } its := builder.NewInstanceSetBuilder(testCtx.DefaultNamespace, name). AddMatchLabelsInMap(commonLabels). + AddAnnotations(constant.CRDAPIVersionAnnotationKey, workloads.GroupVersion.String()). SetTemplate(template). AddCustomHandler(action). GetObject() diff --git a/pkg/controllerutil/predicate.go b/pkg/controllerutil/predicate.go index c59cb3a2c75..d65b0073118 100644 --- a/pkg/controllerutil/predicate.go +++ b/pkg/controllerutil/predicate.go @@ -141,17 +141,14 @@ func newAPIVersionPredicateFilter(objs []client.Object) func(client.Object) bool return IsSupportedCRDAPIVersion(apiVersion) } } - switch reflect.TypeOf(obj) { - case reflect.TypeOf(&appsv1.Cluster{}): - return true - case reflect.TypeOf(&appsv1.ClusterDefinition{}), - reflect.TypeOf(&appsv1.ComponentDefinition{}), - reflect.TypeOf(&appsv1.ComponentVersion{}), - reflect.TypeOf(&appsv1.Component{}), - reflect.TypeOf(&workloadsv1.InstanceSet{}): - return false - default: - return true + if reflect.TypeOf(obj) == reflect.TypeOf(&appsv1.Cluster{}) { + return true // to resolve the CRD API version of the cluster } + for _, wobj := range objs { + if reflect.TypeOf(obj) == reflect.TypeOf(wobj) { + return false // watched objects, but has no CRD API version, it may be the old version + } + } + return true } }