From f05c19bd973ba232e14723019617303d3dcadf0a Mon Sep 17 00:00:00 2001
From: Xieql
Date: Thu, 12 Oct 2023 15:10:01 +0800
Subject: [PATCH] backup: init backup controller

Signed-off-by: Xieql
---
 cmd/fleet-manager/backup/backup.go            |  41 +++
 cmd/fleet-manager/main.go                     |   7 +-
 .../charts/fleet-manager/templates/rbac.yaml  |  10 +
 pkg/client/client.go                          |  38 ++-
 pkg/fleet-manager/backup_controller.go        | 291 +++++++++++++++++
 .../backup_restore_migrate_shared.go          | 306 ++++++++++++++++++
 6 files changed, 685 insertions(+), 8 deletions(-)
 create mode 100644 cmd/fleet-manager/backup/backup.go
 create mode 100644 pkg/fleet-manager/backup_controller.go
 create mode 100644 pkg/fleet-manager/backup_restore_migrate_shared.go

diff --git a/cmd/fleet-manager/backup/backup.go b/cmd/fleet-manager/backup/backup.go
new file mode 100644
index 000000000..aa27a1e1f
--- /dev/null
+++ b/cmd/fleet-manager/backup/backup.go
@@ -0,0 +1,41 @@
+/*
+Copyright Kurator Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package backup
+
+import (
+	"context"
+
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/controller"
+
+	"kurator.dev/kurator/cmd/fleet-manager/options"
+	fleet "kurator.dev/kurator/pkg/fleet-manager"
+)
+
+var log = ctrl.Log.WithName("backup")
+
+func InitControllers(ctx context.Context, opts *options.Options, mgr ctrl.Manager) error {
+	if err := (&fleet.BackupManager{
+		Client: mgr.GetClient(),
+		Scheme: mgr.GetScheme(),
+	}).SetupWithManager(ctx, mgr, controller.Options{MaxConcurrentReconciles: opts.Concurrency, RecoverPanic: true}); err != nil {
+		log.Error(err, "unable to create controller", "controller", "Backup")
+		return err
+	}
+
+	return nil
+}
diff --git a/cmd/fleet-manager/main.go b/cmd/fleet-manager/main.go
index 082ccdc90..837b43c0b 100644
--- a/cmd/fleet-manager/main.go
+++ b/cmd/fleet-manager/main.go
@@ -35,6 +35,7 @@ import (
 	ctrl "sigs.k8s.io/controller-runtime"
 
 	"kurator.dev/kurator/cmd/fleet-manager/application"
+	"kurator.dev/kurator/cmd/fleet-manager/backup"
 	"kurator.dev/kurator/cmd/fleet-manager/options"
 	"kurator.dev/kurator/cmd/fleet-manager/scheme"
 	fleet "kurator.dev/kurator/pkg/fleet-manager"
@@ -138,7 +139,11 @@ func run(ctx context.Context, opts *options.Options) error {
 	}
 
 	if err = application.InitControllers(ctx, opts, mgr); err != nil {
-		return fmt.Errorf("application init fail, %w", err)
+		return fmt.Errorf("application init controllers fail, %w", err)
+	}
+
+	if err = backup.InitControllers(ctx, opts, mgr); err != nil {
+		return fmt.Errorf("backup init controllers fail, %w", err)
 	}
 
 	log.Info("starting manager", "version", version.Get().String())
diff --git a/manifests/charts/fleet-manager/templates/rbac.yaml b/manifests/charts/fleet-manager/templates/rbac.yaml
index b2b13f966..7c5271c2e 100644
--- a/manifests/charts/fleet-manager/templates/rbac.yaml
+++ b/manifests/charts/fleet-manager/templates/rbac.yaml
@@ -23,6 +23,16 @@ rules:
     - patch
    - update
    - watch
+  - apiGroups:
+    - backup.kurator.dev
+    resources:
+    - '*'
+    verbs:
+    - get
+    - list
+    - patch
+    - update
+    - watch
  - apiGroups:
    - kustomize.toolkit.fluxcd.io
    resources:
diff --git a/pkg/client/client.go b/pkg/client/client.go
index dd2bb1d1d..65024539b 100644
--- a/pkg/client/client.go
+++ b/pkg/client/client.go
@@ -27,13 +27,17 @@ import (
 	karmadaclientset "github.com/karmada-io/karmada/pkg/generated/clientset/versioned"
 	promclient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned"
 	"github.com/sirupsen/logrus"
+	veleroapi "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
 	helmclient "helm.sh/helm/v3/pkg/kube"
+	corev1 "k8s.io/api/core/v1"
 	crdclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/cli-runtime/pkg/genericclioptions"
 	kubeclient "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/clientcmd"
+	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/yaml"
 )
@@ -42,8 +46,9 @@ type Client struct {
 	crd  crdclientset.Interface
 	helm *helmclient.Client
 
-	karmada karmadaclientset.Interface
-	prom    promclient.Interface
+	karmada           karmadaclientset.Interface
+	prom              promclient.Interface
+	ctrlRuntimeClient client.Client
 }
 
 func NewClient(rest genericclioptions.RESTClientGetter) (*Client, error) {
@@ -58,12 +63,27 @@ func NewClient(rest genericclioptions.RESTClientGetter) (*Client, error) {
 	karmadaClient := karmadaclientset.NewForConfigOrDie(c)
 	promClient := promclient.NewForConfigOrDie(c)
 
+	// create a scheme and register the core and Velero types on it
+	scheme := runtime.NewScheme()
+	if err := corev1.AddToScheme(scheme); err != nil {
+		return nil, fmt.Errorf("failed to add corev1 to scheme: %v", err)
+	}
+	if err := veleroapi.AddToScheme(scheme); err != nil {
+		return nil, fmt.Errorf("failed to add veleroapi to scheme: %v", err)
+	}
+	// create a controller-runtime client with the scheme
+	ctrlRuntimeClient, err := client.New(c, client.Options{Scheme: scheme})
+	if err != nil {
+		return nil, err
+	}
+
 	return &Client{
-		kube:    kubeClient,
-		helm:    helmClient,
-		crd:     crdClientSet,
-		karmada: karmadaClient,
-		prom:    promClient,
+		kube:              kubeClient,
+		helm:              helmClient,
+		crd:               crdClientSet,
+		karmada:           karmadaClient,
+		prom:              promClient,
+		ctrlRuntimeClient: ctrlRuntimeClient,
 	}, nil
 }
@@ -179,3 +199,7 @@ func (c *Client) NewClusterHelmClient(clusterName string) (helmclient.Interface,
 	clusterGetter := NewRESTClientGetter(clusterConfig)
 	return helmclient.New(clusterGetter), nil
 }
+
+func (c *Client) CtrlRuntimeClient() client.Client {
+	return c.ctrlRuntimeClient
+}
diff --git a/pkg/fleet-manager/backup_controller.go b/pkg/fleet-manager/backup_controller.go
new file mode 100644
index 000000000..22edf0937
--- /dev/null
+++ b/pkg/fleet-manager/backup_controller.go
@@ -0,0 +1,291 @@
+/*
+Copyright Kurator Authors.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package fleet
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/pkg/errors"
+	velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/runtime"
+	utilerrors "k8s.io/apimachinery/pkg/util/errors"
+	"sigs.k8s.io/cluster-api/util/patch"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/controller"
+	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+
+	backupapi "kurator.dev/kurator/pkg/apis/backups/v1alpha1"
+)
+
+// BackupManager reconciles a Backup object
+type BackupManager struct {
+	client.Client
+	Scheme *runtime.Scheme
+}
+
+// SetupWithManager sets up the controller with the Manager.
+func (b *BackupManager) SetupWithManager(ctx context.Context, mgr ctrl.Manager, options controller.Options) error {
+	return ctrl.NewControllerManagedBy(mgr).
+		For(&backupapi.Backup{}).
+		WithOptions(options).
+		Complete(b)
+}
+
+func (b *BackupManager) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, reterr error) {
+	log := ctrl.LoggerFrom(ctx)
+	backup := &backupapi.Backup{}
+
+	if err := b.Client.Get(ctx, req.NamespacedName, backup); err != nil {
+		if apierrors.IsNotFound(err) {
+			log.Info("backup object not found", "backup", req)
+			return ctrl.Result{}, nil
+		}
+
+		// Error reading the object - requeue the request.
+		return ctrl.Result{}, err
+	}
+
+	// Initialize the patch helper
+	patchHelper, err := patch.NewHelper(backup, b.Client)
+	if err != nil {
+		return ctrl.Result{}, errors.Wrapf(err, "failed to init patch helper for backup %s", req.NamespacedName)
+	}
+	// Set up a deferred function to patch the object at the end of the reconcile
+	defer func() {
+		patchOpts := []patch.Option{}
+		if err := patchHelper.Patch(ctx, backup, patchOpts...); err != nil {
+			reterr = utilerrors.NewAggregate([]error{reterr, errors.Wrapf(err, "failed to patch %s %s", backup.Name, req.NamespacedName)})
+		}
+	}()
+
+	// Add the finalizer if it is not present; the deferred patch above persists it
+	if !controllerutil.ContainsFinalizer(backup, BackupFinalizer) {
+		controllerutil.AddFinalizer(backup, BackupFinalizer)
+		return ctrl.Result{}, nil
+	}
+
+	// Handle deletion
+	if backup.GetDeletionTimestamp() != nil {
+		return b.reconcileDeleteBackup(ctx, backup)
+	}
+
+	// Handle the main reconcile logic
+	return b.reconcileBackup(ctx, backup)
+}
+
+// reconcileBackup handles the main reconcile logic for a Backup object.
+func (b *BackupManager) reconcileBackup(ctx context.Context, backup *backupapi.Backup) (ctrl.Result, error) {
+	log := ctrl.LoggerFrom(ctx)
+
+	// Fetch destination clusters
+	destinationClusters, err := fetchDestinationClusters(ctx, b.Client, backup.Namespace, backup.Spec.Destination)
+	if err != nil {
+		log.Error(err, "failed to fetch destination clusters for backup", "backupName", backup.Name)
+		return ctrl.Result{}, err
+	}
+	// Apply the velero backup resources in the target clusters
+	result, err := b.reconcileBackupResources(ctx, backup, destinationClusters)
+	if err != nil || result.Requeue || result.RequeueAfter > 0 {
+		return result, err
+	}
+	// Collect the velero backup resource status into the current backup
+	return b.reconcileBackupStatus(ctx, backup, destinationClusters)
+}
+
+// reconcileBackupResources converts the Backup object into the Velero resources expected on the target clusters
+// and applies one instance to each destination cluster.
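+// For example, for a Backup named "daily" and a fleet cluster "cluster1", the Velero object applied to
+// "cluster1" is named "cluster1-backup-daily" (see generateVeleroResourceName) and labeled with
+// kurator.dev/backup-name=daily, so it can be located for status collection and cleaned up on deletion.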
+func (b *BackupManager) reconcileBackupResources(ctx context.Context, backup *backupapi.Backup, destinationClusters map[ClusterKey]*fleetCluster) (ctrl.Result, error) {
+	log := ctrl.LoggerFrom(ctx)
+
+	backupLabel := generateVeleroInstanceLabel(BackupNameLabel, backup.Name, backup.Spec.Destination.Fleet)
+
+	if isScheduleBackup(backup) {
+		// Handle scheduled backups
+		for clusterKey, clusterAccess := range destinationClusters {
+			veleroScheduleName := generateVeleroResourceName(clusterKey.Name, BackupKind, backup.Name)
+			veleroSchedule := buildVeleroScheduleInstance(&backup.Spec, backupLabel, veleroScheduleName)
+			if err := syncVeleroObj(ctx, clusterKey, clusterAccess, veleroSchedule); err != nil {
+				log.Error(err, "failed to create velero schedule instance for backup", "backupName", backup.Name)
+				return ctrl.Result{}, err
+			}
+		}
+	} else {
+		// Handle one-time backups
+		for clusterKey, clusterAccess := range destinationClusters {
+			veleroBackupName := generateVeleroResourceName(clusterKey.Name, BackupKind, backup.Name)
+			veleroBackup := buildVeleroBackupInstance(&backup.Spec, backupLabel, veleroBackupName)
+			if err := syncVeleroObj(ctx, clusterKey, clusterAccess, veleroBackup); err != nil {
+				log.Error(err, "failed to create velero backup instance for backup", "backupName", backup.Name)
+				return ctrl.Result{}, err
+			}
+		}
+	}
+	return ctrl.Result{}, nil
+}
+
+// reconcileBackupStatus updates the synchronization status of each backup resource.
+func (b *BackupManager) reconcileBackupStatus(ctx context.Context, backup *backupapi.Backup, destinationClusters map[ClusterKey]*fleetCluster) (ctrl.Result, error) {
+	// Initialize a map with the status entries currently recorded. The combination of detail.ClusterName,
+	// detail.ClusterKind, and detail.BackupNameInCluster uniquely identifies a Velero backup object.
+	statusMap := make(map[string]*backupapi.BackupDetails)
+	for _, detail := range backup.Status.Details {
+		key := fmt.Sprintf("%s-%s-%s", detail.ClusterName, detail.ClusterKind, detail.BackupNameInCluster)
+		statusMap[key] = detail
+	}
+	if isScheduleBackup(backup) {
+		return b.reconcileScheduleBackupStatus(ctx, backup, destinationClusters, statusMap)
+	}
+	return b.reconcileOneTimeBackupStatus(ctx, backup, destinationClusters, statusMap)
+}
+
+// reconcileOneTimeBackupStatus updates the status of a one-time Backup object by checking the status of the
+// corresponding Velero backup resource in each target cluster.
+// It decides whether to requeue the reconciliation based on the completion status of all Velero backup resources.
+func (b *BackupManager) reconcileOneTimeBackupStatus(ctx context.Context, backup *backupapi.Backup, destinationClusters map[ClusterKey]*fleetCluster, statusMap map[string]*backupapi.BackupDetails) (ctrl.Result, error) {
+	log := ctrl.LoggerFrom(ctx)
+
+	// Loop through each target cluster and retrieve the status of its Velero backup resource with the client
+	// associated with that cluster.
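+	// Each result is keyed by "<clusterName>-<clusterKind>-<backupName>", matching the keys in statusMap,
+	// so an existing status entry is updated in place and a missing one is appended.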
+	for clusterKey, clusterAccess := range destinationClusters {
+		name := generateVeleroResourceName(clusterKey.Name, BackupKind, backup.Name)
+		veleroBackup := &velerov1.Backup{}
+
+		// Use the client of the target cluster to get the status of the Velero backup resource
+		err := getResourceFromClusterClient(ctx, name, VeleroNamespace, *clusterAccess, veleroBackup)
+		if err != nil {
+			log.Error(err, "failed to get velero backup instance when syncing one-time backup status", "backupName", backup.Name)
+			return ctrl.Result{}, err
+		}
+
+		key := fmt.Sprintf("%s-%s-%s", clusterKey.Name, clusterKey.Kind, veleroBackup.Name)
+		if detail, exists := statusMap[key]; exists {
+			// If a matching entry is found, update the existing BackupDetails object with the new status.
+			detail.BackupStatusInCluster = &veleroBackup.Status
+		} else {
+			// If no matching entry is found, create a new BackupDetails object and append it to the backup's status details.
+			currentBackupDetails := &backupapi.BackupDetails{
+				ClusterName:           clusterKey.Name,
+				ClusterKind:           clusterKey.Kind,
+				BackupNameInCluster:   veleroBackup.Name,
+				BackupStatusInCluster: &veleroBackup.Status,
+			}
+			backup.Status.Details = append(backup.Status.Details, currentBackupDetails)
+		}
+	}
+
+	// Decide whether to requeue based on the completion status of all Velero backup resources.
+	// If all backups are complete, exit directly without requeuing.
+	// Otherwise, requeue the reconciliation after StatusSyncInterval.
+	if allBackupsCompleted(backup.Status) {
+		return ctrl.Result{}, nil
+	}
+	return ctrl.Result{RequeueAfter: StatusSyncInterval}, nil
+}
+
+// reconcileScheduleBackupStatus manages the status synchronization for scheduled Backup objects.
+// Because a schedule keeps generating new backups, this status synchronization runs continuously.
+func (b *BackupManager) reconcileScheduleBackupStatus(ctx context.Context, schedule *backupapi.Backup, destinationClusters map[ClusterKey]*fleetCluster, statusMap map[string]*backupapi.BackupDetails) (ctrl.Result, error) {
+	log := ctrl.LoggerFrom(ctx)
+
+	// Loop through each target cluster and retrieve the status of the Velero backups created by the schedule
+	// with the client associated with that cluster.
+	for clusterKey, clusterAccess := range destinationClusters {
+		name := generateVeleroResourceName(clusterKey.Name, BackupKind, schedule.Name)
+		veleroSchedule := &velerov1.Schedule{}
+		// Use the client of the target cluster to get the status of the Velero schedule resource
+		err := getResourceFromClusterClient(ctx, name, VeleroNamespace, *clusterAccess, veleroSchedule)
+		if err != nil {
+			log.Error(err, "Unable to get velero schedule", "scheduleName", name)
+			return ctrl.Result{}, err
+		}
+
+		// Fetch all velero backups created by the velero schedule
+		backupList := &velerov1.BackupList{}
+		listErr := listResourcesFromClusterClient(ctx, VeleroNamespace, velerov1.ScheduleNameLabel, veleroSchedule.Name, *clusterAccess, backupList)
+		if listErr != nil {
+			log.Info("Unable to list velero backups for velero schedule", "scheduleName", veleroSchedule.Name)
+			return ctrl.Result{}, listErr
+		}
+
+		// Fetch the most recent completed backup
+		veleroBackup := MostRecentCompletedBackup(backupList.Items)
+		if len(veleroBackup.Name) == 0 {
+			// If no completed schedule backup record can be found, the potential reasons are:
+			// 1. The backup task hasn't been triggered by the schedule yet.
+			// 2. An issue occurred, but we cannot get that information directly from the status of schedules.velero.io.
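+			// Either way, we fall through: the status entry below records an empty backup name until a completed backup appears.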
+			log.Info("No completed backups found for schedule", "scheduleName", veleroSchedule.Name)
+		}
+
+		// Sync the schedule backup status with the most recent completed backup
+		key := fmt.Sprintf("%s-%s-%s", clusterKey.Name, clusterKey.Kind, veleroBackup.Name)
+		if detail, exists := statusMap[key]; exists {
+			// If a matching entry is found, update the existing BackupDetails object with the new status.
+			detail.BackupStatusInCluster = &veleroBackup.Status
+		} else {
+			// If no matching entry is found, create a new BackupDetails object and append it to the schedule's status details.
+			currentBackupDetails := &backupapi.BackupDetails{
+				ClusterName:           clusterKey.Name,
+				ClusterKind:           clusterKey.Kind,
+				BackupNameInCluster:   veleroBackup.Name,
+				BackupStatusInCluster: &veleroBackup.Status,
+			}
+			schedule.Status.Details = append(schedule.Status.Details, currentBackupDetails)
+		}
+	}
+
+	if allBackupsCompleted(schedule.Status) {
+		// Get the next reconcile interval
+		cronInterval, err := GetCronInterval(schedule.Spec.Schedule)
+		if err != nil {
+			log.Error(err, "failed to get cron interval of backup.spec.schedule", "backupName", schedule.Name, "cronExpression", schedule.Spec.Schedule)
+			return ctrl.Result{}, err
+		}
+		// If all backups are complete, requeue the reconciliation after the longer cronInterval.
+		return ctrl.Result{RequeueAfter: cronInterval}, nil
+	}
+	// If not all backups are complete, requeue the reconciliation after the shorter StatusSyncInterval.
+	return ctrl.Result{RequeueAfter: StatusSyncInterval}, nil
+}
+
+// reconcileDeleteBackup handles the deletion process of a Backup object.
+func (b *BackupManager) reconcileDeleteBackup(ctx context.Context, backup *backupapi.Backup) (ctrl.Result, error) {
+	log := ctrl.LoggerFrom(ctx)
+
+	// Fetch the backup destination clusters
+	destinationClusters, err := fetchDestinationClusters(ctx, b.Client, backup.Namespace, backup.Spec.Destination)
+	if err != nil {
+		log.Error(err, "failed to fetch destination clusters when deleting backup", "backupName", backup.Name)
+		controllerutil.RemoveFinalizer(backup, BackupFinalizer)
+		log.Info("Removed finalizer due to fetch destination clusters error", "backupName", backup.Name)
+		return ctrl.Result{}, err
+	}
+
+	var objList client.ObjectList
+	if isScheduleBackup(backup) {
+		objList = &velerov1.ScheduleList{}
+	} else {
+		objList = &velerov1.BackupList{}
+	}
+
+	// Delete all related velero schedule or backup instances
+	if err := deleteResourcesInClusters(ctx, VeleroNamespace, BackupNameLabel, backup.Name, destinationClusters, objList); err != nil {
+		log.Error(err, "failed to delete velero schedule or backup instances when deleting backup", "backupName", backup.Name)
+		return ctrl.Result{}, err
+	}
+
+	// Remove the finalizer
+	controllerutil.RemoveFinalizer(backup, BackupFinalizer)
+
+	return ctrl.Result{}, nil
+}
diff --git a/pkg/fleet-manager/backup_restore_migrate_shared.go b/pkg/fleet-manager/backup_restore_migrate_shared.go
new file mode 100644
index 000000000..b25f3bbd5
--- /dev/null
+++ b/pkg/fleet-manager/backup_restore_migrate_shared.go
@@ -0,0 +1,306 @@
+/*
+Copyright Kurator Authors.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package fleet
+
+import (
+	"context"
+	"fmt"
+	"reflect"
+	"sort"
+	"time"
+
+	"github.com/robfig/cron"
+	velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/types"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
+
+	backupapi "kurator.dev/kurator/pkg/apis/backups/v1alpha1"
+	fleetapi "kurator.dev/kurator/pkg/apis/fleet/v1alpha1"
+	"kurator.dev/kurator/pkg/fleet-manager/plugin"
+)
+
+const (
+	// BackupNameLabel is the label key used to identify a backup by name.
+	BackupNameLabel = "kurator.dev/backup-name"
+	// RestoreNameLabel is the label key used to identify a restore by name.
+	RestoreNameLabel = "kurator.dev/restore-name"
+	// MigrateNameLabel is the label key used to identify a migrate by name.
+	MigrateNameLabel = "kurator.dev/migrate-name"
+
+	BackupKind  = "backup"
+	RestoreKind = "restore"
+	MigrateKind = "migrate"
+
+	// VeleroNamespace is the default namespace where all Velero resources are created. It is a constant namespace used by Velero.
+	VeleroNamespace = "velero"
+
+	BackupFinalizer  = "backup.kurator.dev"
+	RestoreFinalizer = "restore.kurator.dev"
+	MigrateFinalizer = "migrate.kurator.dev"
+
+	// StatusSyncInterval specifies the requeue interval for status synchronization. It determines how frequently the status is checked and updated.
+	StatusSyncInterval = 30 * time.Second
+)
+
+// fetchDestinationClusters retrieves the clusters of the fleet referenced by the destination and filters them by
+// the 'Clusters' defined in the destination. It returns a map of the selected clusters along with any error encountered.
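+// If destination.Clusters is empty, every cluster in the fleet is selected; referencing a cluster that is not
+// recorded in the fleet is an error.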
+func fetchDestinationClusters(ctx context.Context, kubeClient client.Client, namespace string, destination backupapi.Destination) (map[ClusterKey]*fleetCluster, error) {
+	// Fetch the fleet instance
+	fleet := &fleetapi.Fleet{}
+	fleetKey := client.ObjectKey{
+		Namespace: namespace,
+		Name:      destination.Fleet,
+	}
+	if err := kubeClient.Get(ctx, fleetKey, fleet); err != nil {
+		return nil, fmt.Errorf("failed to retrieve fleet instance '%s' in namespace '%s': %w", destination.Fleet, namespace, err)
+	}
+
+	fleetClusters, err := buildFleetClusters(ctx, kubeClient, fleet)
+	if err != nil {
+		return nil, fmt.Errorf("failed to build fleet clusters from fleet instance '%s': %w", fleet.Name, err)
+	}
+
+	// If no destination.Clusters are defined, return all clusters
+	if len(destination.Clusters) == 0 {
+		return fleetClusters, nil
+	}
+
+	selectedFleetCluster := make(map[ClusterKey]*fleetCluster)
+	// Check and return the clusters with their clients
+	for _, cluster := range destination.Clusters {
+		name := cluster.Name
+		kind := cluster.Kind
+		found := false
+		for key, f := range fleetClusters {
+			if key.Name == name && key.Kind == kind {
+				found = true
+				selectedFleetCluster[key] = f
+			}
+		}
+		if !found {
+			return nil, fmt.Errorf("cluster with name '%s' and kind '%s' in destination is not recorded in fleet '%s'", name, kind, fleet.Name)
+		}
+	}
+	return selectedFleetCluster, nil
+}
+
+// buildVeleroBackupInstance constructs a Velero Backup instance configured to perform a backup operation on the specified cluster.
+func buildVeleroBackupInstance(backupSpec *backupapi.BackupSpec, labels map[string]string, veleroBackupName string) *velerov1.Backup {
+	veleroBackup := &velerov1.Backup{
+		ObjectMeta: generateVeleroResourceObjectMeta(veleroBackupName, labels),
+		Spec:       buildVeleroBackupSpec(backupSpec.Policy),
+	}
+	return veleroBackup
+}
+
+// buildVeleroScheduleInstance constructs a Velero Schedule instance configured to run backup operations on the specified cluster on a cron schedule.
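+// The Schedule wraps the same BackupSpec a one-time backup would use as its template and carries the cron
+// expression from backup.Spec.Schedule, so Velero itself creates the periodic Backup objects in the target cluster.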
+func buildVeleroScheduleInstance(backupSpec *backupapi.BackupSpec, labels map[string]string, veleroBackupName string) *velerov1.Schedule {
+	veleroSchedule := &velerov1.Schedule{
+		ObjectMeta: generateVeleroResourceObjectMeta(veleroBackupName, labels),
+		Spec: velerov1.ScheduleSpec{
+			Template: buildVeleroBackupSpec(backupSpec.Policy),
+			Schedule: backupSpec.Schedule,
+		},
+	}
+	return veleroSchedule
+}
+
+func buildVeleroBackupSpec(backupPolicy *backupapi.BackupPolicy) velerov1.BackupSpec {
+	if backupPolicy == nil {
+		return velerov1.BackupSpec{}
+	}
+	return velerov1.BackupSpec{
+		TTL:                              backupPolicy.TTL,
+		OrderedResources:                 backupPolicy.OrderedResources,
+		IncludedNamespaces:               backupPolicy.ResourceFilter.IncludedNamespaces,
+		ExcludedNamespaces:               backupPolicy.ResourceFilter.ExcludedNamespaces,
+		IncludedResources:                backupPolicy.ResourceFilter.IncludedResources,
+		ExcludedResources:                backupPolicy.ResourceFilter.ExcludedResources,
+		IncludeClusterResources:          backupPolicy.ResourceFilter.IncludeClusterResources,
+		IncludedClusterScopedResources:   backupPolicy.ResourceFilter.IncludedClusterScopedResources,
+		ExcludedClusterScopedResources:   backupPolicy.ResourceFilter.ExcludedClusterScopedResources,
+		IncludedNamespaceScopedResources: backupPolicy.ResourceFilter.IncludedNamespaceScopedResources,
+		ExcludedNamespaceScopedResources: backupPolicy.ResourceFilter.ExcludedNamespaceScopedResources,
+		LabelSelector:                    backupPolicy.ResourceFilter.LabelSelector,
+		OrLabelSelectors:                 backupPolicy.ResourceFilter.OrLabelSelectors,
+	}
+}
+
+func syncVeleroObj(ctx context.Context, clusterKey ClusterKey, cluster *fleetCluster, veleroObj client.Object) error {
+	// Get the client of the target cluster
+	clusterClient := cluster.client.CtrlRuntimeClient()
+
+	// Create or update the velero object
+	_, syncErr := controllerutil.CreateOrUpdate(ctx, clusterClient, veleroObj, func() error {
+		// veleroObj already contains the desired state; no additional mutation is needed in this mutateFn.
+		return nil
+	})
+
+	return syncErr
+}
+
+// allBackupsCompleted checks whether the status of every Velero backup in the clusters has been collected and
+// verifies that they completed successfully.
+// All backups have completed successfully only when every BackupStatusInCluster.Phase is marked completed.
+func allBackupsCompleted(status backupapi.BackupStatus) bool {
+	for _, detail := range status.Details {
+		if detail.BackupStatusInCluster == nil || detail.BackupStatusInCluster.Phase != velerov1.BackupPhaseCompleted {
+			return false
+		}
+	}
+	return true
+}
+
+// deleteResourcesInClusters deletes instances of a Kubernetes resource that match the specified label key and value.
+// It iterates over all destination clusters and performs the deletion in each of them.
+// Parameters:
+//   - ctx: context to carry out the API requests
+//   - namespace: the namespace the resources live in
+//   - labelKey: the key of the label used to filter the resources
+//   - labelValue: the value of the label used to filter the resources
+//   - destinationClusters: a map containing information about the destination clusters
+//   - objList: an empty instance of the resource list object to be filled with retrieved data; it must implement client.ObjectList.
+//
+// Returns:
+//   - error: if any error occurs during the deletion process
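+//
+// The concrete list type selects what gets deleted: callers pass &velerov1.ScheduleList{} for scheduled backups
+// and &velerov1.BackupList{} for one-time backups; the 'Items' field is extracted via reflection below.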
+func deleteResourcesInClusters(ctx context.Context, namespace, labelKey string, labelValue string, destinationClusters map[ClusterKey]*fleetCluster, objList client.ObjectList) error {
+	// Log setup
+	log := ctrl.LoggerFrom(ctx)
+
+	// Iterate over each destination cluster
+	for clusterKey, clusterAccess := range destinationClusters {
+		// List the resources using the helper function
+		if err := listResourcesFromClusterClient(ctx, namespace, labelKey, labelValue, *clusterAccess, objList); err != nil {
+			log.Error(err, "Failed to list resources in cluster", "ClusterName", clusterKey.Name, "Namespace", namespace, "LabelKey", labelKey, "LabelValue", labelValue)
+			return err
+		}
+
+		// Extract Items using reflection
+		itemsValue := reflect.ValueOf(objList).Elem().FieldByName("Items")
+		if !itemsValue.IsValid() {
+			err := fmt.Errorf("failed to extract 'Items' from object list using reflection")
+			log.Error(err, "Reflection error")
+			return err
+		}
+
+		for i := 0; i < itemsValue.Len(); i++ {
+			item := itemsValue.Index(i).Addr().Interface().(client.Object)
+
+			clusterClient := clusterAccess.client.CtrlRuntimeClient()
+
+			if err := clusterClient.Delete(ctx, item); err != nil && !apierrors.IsNotFound(err) {
+				log.Error(err, "Failed to delete resource in cluster", "ResourceName", item.GetName(), "ResourceNamespace", item.GetNamespace(), "ClusterName", clusterKey.Name)
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+// isScheduleBackup returns true if backup.Spec.Schedule is set; otherwise the backup is a regular one-time backup.
+func isScheduleBackup(backup *backupapi.Backup) bool {
+	return len(backup.Spec.Schedule) != 0
+}
+
+func generateVeleroInstanceLabel(createdByLabel, creatorName, fleetName string) map[string]string {
+	return map[string]string{
+		createdByLabel:  creatorName,
+		FleetLabel:      fleetName,
+		FleetPluginName: plugin.BackupPluginName,
+	}
+}
+
+func generateVeleroResourceObjectMeta(veleroResourceName string, labels map[string]string) metav1.ObjectMeta {
+	return metav1.ObjectMeta{
+		Name:      veleroResourceName,
+		Namespace: VeleroNamespace,
+		Labels:    labels,
+	}
+}
+
+func generateVeleroResourceName(clusterName, creatorKind, creatorName string) string {
+	return clusterName + "-" + creatorKind + "-" + creatorName
+}
+
+// MostRecentCompletedBackup returns the most recent completed backup from a list of backups.
+// Adapted from https://github.com/vmware-tanzu/velero/blob/release-1.12/pkg/controller/restore_controller.go
+func MostRecentCompletedBackup(backups []velerov1.Backup) velerov1.Backup {
+	sort.Slice(backups, func(i, j int) bool {
+		var iStartTime, jStartTime time.Time
+		if backups[i].Status.StartTimestamp != nil {
+			iStartTime = backups[i].Status.StartTimestamp.Time
+		}
+		if backups[j].Status.StartTimestamp != nil {
+			jStartTime = backups[j].Status.StartTimestamp.Time
+		}
+		return iStartTime.After(jStartTime)
+	})
+
+	for _, backup := range backups {
+		if backup.Status.Phase == velerov1.BackupPhaseCompleted {
+			return backup
+		}
+	}
+
+	return velerov1.Backup{}
+}
+
+// GetCronInterval returns the interval between two consecutive runs of a cron expression.
+func GetCronInterval(cronExpr string) (time.Duration, error) {
+	parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
+	schedule, err := parser.Parse(cronExpr)
+	if err != nil {
+		return 0, err
+	}
+
+	now := time.Now()
+	nextRun := schedule.Next(now)
+	nextNextRun := schedule.Next(nextRun)
+
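+	// For example, with the expression "0 0 * * *" (daily at midnight), nextNextRun.Sub(nextRun) is 24 hours,
+	// so the reconciliation is requeued roughly once per backup period.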
+	// Adding a 30-second delay to avoid timing issues.
+	// Without this delay, we risk checking just before a new backup starts,
+	// seeing the previous backup's "completed" status, and missing the new one.
+	const delay = 30 * time.Second
+	interval := nextNextRun.Sub(nextRun) + delay
+
+	return interval, nil
+}
+
+// getResourceFromClusterClient retrieves a specific Kubernetes resource from the given cluster.
+func getResourceFromClusterClient(ctx context.Context, name, namespace string, clusterAccess fleetCluster, obj client.Object) error {
+	clusterClient := clusterAccess.client.CtrlRuntimeClient()
+
+	resourceKey := types.NamespacedName{
+		Name:      name,
+		Namespace: namespace,
+	}
+	return clusterClient.Get(ctx, resourceKey, obj)
+}
+
+// listResourcesFromClusterClient lists resources from a cluster, filtered by the given namespace and label.
+func listResourcesFromClusterClient(ctx context.Context, namespace string, labelKey string, labelValue string, clusterAccess fleetCluster, objList client.ObjectList) error {
+	// Get the cluster client
+	clusterClient := clusterAccess.client.CtrlRuntimeClient()
+	// Build the label selector
+	labelSelector := labels.Set(map[string]string{labelKey: labelValue}).AsSelector()
+
+	// List the resources
+	opts := &client.ListOptions{
+		Namespace:     namespace,
+		LabelSelector: labelSelector,
+	}
+	return clusterClient.List(ctx, objList, opts)
+}
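
As a usage sketch (not part of this patch): a minimal Backup object that this controller would reconcile might
look like the following. The apiVersion is inferred from the backup.kurator.dev RBAC group and the v1alpha1 API
package imported above; the lowerCamel field names are assumptions based on the Go types, not confirmed by this diff.

    apiVersion: backup.kurator.dev/v1alpha1
    kind: Backup
    metadata:
      name: daily
      namespace: default
    spec:
      schedule: "0 0 * * *"   # omit to create a one-time Velero Backup instead of a Schedule
      destination:
        fleet: quickstart     # Fleet object in the same namespace as this Backup
      policy:
        resourceFilter:
          includedNamespaces:
            - default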