diff --git a/cmd/fleet-manager/scheme/scheme.go b/cmd/fleet-manager/scheme/scheme.go index 67dba1d8a..3c0ad4bec 100644 --- a/cmd/fleet-manager/scheme/scheme.go +++ b/cmd/fleet-manager/scheme/scheme.go @@ -22,6 +22,7 @@ import ( kubescheme "k8s.io/client-go/kubernetes/scheme" applicationapi "kurator.dev/kurator/pkg/apis/apps/v1alpha1" + backupapi "kurator.dev/kurator/pkg/apis/backups/v1alpha1" clusterv1alpha1 "kurator.dev/kurator/pkg/apis/cluster/v1alpha1" fleetapi "kurator.dev/kurator/pkg/apis/fleet/v1alpha1" ) @@ -36,4 +37,5 @@ func init() { _ = clusterv1alpha1.AddToScheme(Scheme) _ = hrapiv2b1.AddToScheme(Scheme) _ = applicationapi.AddToScheme(Scheme) + _ = backupapi.AddToScheme(Scheme) } diff --git a/pkg/client/client.go b/pkg/client/client.go index 65024539b..8a3864358 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -46,8 +46,9 @@ type Client struct { crd crdclientset.Interface helm *helmclient.Client - karmada karmadaclientset.Interface - prom promclient.Interface + karmada karmadaclientset.Interface + prom promclient.Interface + // it currently only support k8s core API and velero API, because only these schemes are registered ctrlRuntimeClient client.Client } diff --git a/pkg/fleet-manager/backup_controller.go b/pkg/fleet-manager/backup_controller.go index 22edf0937..e7d8bfd61 100644 --- a/pkg/fleet-manager/backup_controller.go +++ b/pkg/fleet-manager/backup_controller.go @@ -16,7 +16,9 @@ package fleet import ( "context" "fmt" + "strings" + "github.com/go-logr/logr" "github.com/pkg/errors" velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -35,6 +37,7 @@ import ( type BackupManager struct { client.Client Scheme *runtime.Scheme + logger logr.Logger } // SetupWithManager sets up the controller with the Manager. @@ -46,12 +49,13 @@ func (b *BackupManager) SetupWithManager(ctx context.Context, mgr ctrl.Manager, } func (b *BackupManager) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, reterr error) { - log := ctrl.LoggerFrom(ctx) + b.logger = ctrl.LoggerFrom(ctx).WithValues("backup", req.NamespacedName) + backup := &backupapi.Backup{} if err := b.Client.Get(ctx, req.NamespacedName, backup); err != nil { if apierrors.IsNotFound(err) { - log.Info("backup object not found", "backup", req) + b.logger.Info("backup object not found", "backup", req) return ctrl.Result{}, nil } @@ -66,8 +70,7 @@ func (b *BackupManager) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl } // Setup deferred function to handle patching the object at the end of the reconciler defer func() { - patchOpts := []patch.Option{} - if err := patchHelper.Patch(ctx, backup, patchOpts...); err != nil { + if err := patchHelper.Patch(ctx, backup); err != nil { reterr = utilerrors.NewAggregate([]error{reterr, errors.Wrapf(err, "failed to patch %s %s", backup.Name, req.NamespacedName)}) } }() @@ -75,7 +78,6 @@ func (b *BackupManager) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl // Check and add finalizer if not present if !controllerutil.ContainsFinalizer(backup, BackupFinalizer) { controllerutil.AddFinalizer(backup, BackupFinalizer) - return ctrl.Result{}, nil } // Handle deletion @@ -89,17 +91,15 @@ func (b *BackupManager) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl // reconcileBackup handles the main reconcile logic for a Backup object. func (b *BackupManager) reconcileBackup(ctx context.Context, backup *backupapi.Backup) (ctrl.Result, error) { - log := ctrl.LoggerFrom(ctx) - // Fetch destination clusters destinationClusters, err := fetchDestinationClusters(ctx, b.Client, backup.Namespace, backup.Spec.Destination) if err != nil { - log.Error(err, "failed to fetch destination clusters for backup", "backupName", backup.Name) + b.logger.Error(err, "failed to fetch destination clusters for backup", "backupName", backup.Name) return ctrl.Result{}, err } // Apply velero backup resource in target clusters result, err := b.reconcileBackupResources(ctx, backup, destinationClusters) - if err != nil || result.Requeue || result.RequeueAfter > 0 { + if err != nil { return result, err } // Collect velero backup resource status to current backup @@ -108,31 +108,44 @@ func (b *BackupManager) reconcileBackup(ctx context.Context, backup *backupapi.B // reconcileBackupResources converts the backup resources into velero backup resources that can be used by Velero on the target clusters, and applies each of these backup resources to the respective target clusters. func (b *BackupManager) reconcileBackupResources(ctx context.Context, backup *backupapi.Backup, destinationClusters map[ClusterKey]*fleetCluster) (ctrl.Result, error) { - log := ctrl.LoggerFrom(ctx) - backupLabel := generateVeleroInstanceLabel(BackupNameLabel, backup.Name, backup.Spec.Destination.Fleet) + // Add tasks of syncVeleroObj func + var tasks []func() error if isScheduleBackup(backup) { // Handle scheduled backups for clusterKey, clusterAccess := range destinationClusters { - veleroScheduleName := generateVeleroResourceName(clusterKey.Name, BackupKind, backup.Name) + veleroScheduleName := generateVeleroResourceName(clusterKey.Name, BackupKind, backup.Namespace, backup.Name) veleroSchedule := buildVeleroScheduleInstance(&backup.Spec, backupLabel, veleroScheduleName) - if err := syncVeleroObj(ctx, clusterKey, clusterAccess, veleroSchedule); err != nil { - log.Error(err, "failed to create velero schedule instance for backup", "backupName", backup.Name) - return ctrl.Result{}, err - } + task := newSyncVeleroTaskFunc(ctx, clusterAccess, veleroSchedule) + tasks = append(tasks, task) } } else { // Handle one time backups for clusterKey, clusterAccess := range destinationClusters { - veleroBackupName := generateVeleroResourceName(clusterKey.Name, BackupKind, backup.Name) + veleroBackupName := generateVeleroResourceName(clusterKey.Name, BackupKind, backup.Namespace, backup.Name) veleroBackup := buildVeleroBackupInstance(&backup.Spec, backupLabel, veleroBackupName) - if err := syncVeleroObj(ctx, clusterKey, clusterAccess, veleroBackup); err != nil { - log.Error(err, "failed to create velero backup instance for backup", "backupName", backup.Name) - return ctrl.Result{}, err - } + task := newSyncVeleroTaskFunc(ctx, clusterAccess, veleroBackup) + tasks = append(tasks, task) } } + + // Parallel process syncVeleroObj func + errs := parallelProcess(tasks) + // Check for errors + var errorList []string + for _, err := range errs { + if err != nil { + b.logger.Error(err, "Error encountered during parallel processing", "backupName", backup.Name) + errorList = append(errorList, err.Error()) + } + } + + if len(errorList) > 0 { + // Return all errs + return ctrl.Result{}, fmt.Errorf("encountered %d errors during processing: %s", len(errorList), strings.Join(errorList, "; ")) + } + return ctrl.Result{}, nil } @@ -154,17 +167,15 @@ func (b *BackupManager) reconcileBackupStatus(ctx context.Context, backup *backu // reconcileOneTimeBackupStatus updates the status of a one-time Backup object by checking the status of corresponding Velero backup resources in each target cluster. // It determines whether to requeue the reconciliation based on the completion status of all Velero backup resources. func (b *BackupManager) reconcileOneTimeBackupStatus(ctx context.Context, backup *backupapi.Backup, destinationClusters map[ClusterKey]*fleetCluster, statusMap map[string]*backupapi.BackupDetails) (ctrl.Result, error) { - log := ctrl.LoggerFrom(ctx) - // Loop through each target cluster to retrieve the status of Velero backup resources using the client associated with the respective target cluster. for clusterKey, clusterAccess := range destinationClusters { - name := generateVeleroResourceName(clusterKey.Name, BackupKind, backup.Name) + name := generateVeleroResourceName(clusterKey.Name, BackupKind, backup.Namespace, backup.Name) veleroBackup := &velerov1.Backup{} // Use the client of the target cluster to get the status of Velero backup resources err := getResourceFromClusterClient(ctx, name, VeleroNamespace, *clusterAccess, veleroBackup) if err != nil { - log.Error(err, "failed to create velero backup instance for sync one time backup status", "backupName", backup.Name) + b.logger.Error(err, "failed to create velero backup instance for sync one time backup status", "backupName", backup.Name) return ctrl.Result{}, err } @@ -197,16 +208,14 @@ func (b *BackupManager) reconcileOneTimeBackupStatus(ctx context.Context, backup // reconcileScheduleBackupStatus manages the status synchronization for scheduled Backup objects. // If the backup type is "schedule", new backups will be continuously generated, hence the status synchronization will be executed continuously. func (b *BackupManager) reconcileScheduleBackupStatus(ctx context.Context, schedule *backupapi.Backup, destinationClusters map[ClusterKey]*fleetCluster, statusMap map[string]*backupapi.BackupDetails) (ctrl.Result, error) { - log := ctrl.LoggerFrom(ctx) - // Loop through each target cluster to retrieve the status of Velero backup created by schedule resources using the client associated with the respective target cluster. for clusterKey, clusterAccess := range destinationClusters { - name := generateVeleroResourceName(clusterKey.Name, BackupKind, schedule.Name) + name := generateVeleroResourceName(clusterKey.Name, BackupKind, schedule.Namespace, schedule.Name) veleroSchedule := &velerov1.Schedule{} // Use the client of the target cluster to get the status of Velero backup resources err := getResourceFromClusterClient(ctx, name, VeleroNamespace, *clusterAccess, veleroSchedule) if err != nil { - log.Error(err, "Unable to get velero schedule", "scheduleName", name) + b.logger.Error(err, "Unable to get velero schedule", "scheduleName", name) return ctrl.Result{}, err } @@ -214,7 +223,7 @@ func (b *BackupManager) reconcileScheduleBackupStatus(ctx context.Context, sched backupList := &velerov1.BackupList{} listErr := listResourcesFromClusterClient(ctx, VeleroNamespace, velerov1.ScheduleNameLabel, veleroSchedule.Name, *clusterAccess, backupList) if listErr != nil { - log.Info("Unable to list velero backups for velero schedule", "scheduleName", veleroSchedule.Name) + b.logger.Info("Unable to list velero backups for velero schedule", "scheduleName", veleroSchedule.Name) return ctrl.Result{}, listErr } @@ -224,7 +233,7 @@ func (b *BackupManager) reconcileScheduleBackupStatus(ctx context.Context, sched // If a schedule backup record cannot be found, the potential reasons are: // 1. The backup task hasn't been triggered by schedule. // 2. An issue occurred, but we can not get information directly from the status of schedules.velero.io - log.Info("No completed backups found for schedule", "scheduleName", veleroSchedule.Name) + b.logger.Info("No completed backups found for schedule", "scheduleName", veleroSchedule.Name) } // Sync schedule backup status with most recent complete backup @@ -248,7 +257,7 @@ func (b *BackupManager) reconcileScheduleBackupStatus(ctx context.Context, sched // Get the next reconcile interval cronInterval, err := GetCronInterval(schedule.Spec.Schedule) if err != nil { - log.Error(err, "failed to get cron Interval of backup.spec.schedule", "backupName", schedule.Name, "cronExpression", schedule.Spec.Schedule) + b.logger.Error(err, "failed to get cron Interval of backup.spec.schedule", "backupName", schedule.Name, "cronExpression", schedule.Spec.Schedule) return ctrl.Result{}, err } // If all backups are complete,requeue the reconciliation after a long cronInterval. @@ -260,14 +269,12 @@ func (b *BackupManager) reconcileScheduleBackupStatus(ctx context.Context, sched // reconcileDeleteBackup handles the deletion process of a Backup object. func (b *BackupManager) reconcileDeleteBackup(ctx context.Context, backup *backupapi.Backup) (ctrl.Result, error) { - log := ctrl.LoggerFrom(ctx) - // Fetch backup destination clusters destinationClusters, err := fetchDestinationClusters(ctx, b.Client, backup.Namespace, backup.Spec.Destination) if err != nil { - log.Error(err, "failed to fetch destination clusters when delete backup", "backupName", backup.Name) + b.logger.Error(err, "failed to fetch destination clusters when delete backup", "backupName", backup.Name) controllerutil.RemoveFinalizer(backup, BackupFinalizer) - log.Info("Removed finalizer due to fetch destination clusters error", "backupName", backup.Name) + b.logger.Info("Removed finalizer due to fetch destination clusters error", "backupName", backup.Name) return ctrl.Result{}, err } @@ -280,7 +287,7 @@ func (b *BackupManager) reconcileDeleteBackup(ctx context.Context, backup *backu // Delete all related velero schedule or backup instance if err := deleteResourcesInClusters(ctx, VeleroNamespace, BackupNameLabel, backup.Name, destinationClusters, objList); err != nil { - log.Error(err, "failed to delete velero schedule or backup Instances when delete backup", "backupName", backup.Name) + b.logger.Error(err, "failed to delete velero schedule or backup Instances when delete backup", "backupName", backup.Name) return ctrl.Result{}, err } diff --git a/pkg/fleet-manager/backup_restore_migrate_shared.go b/pkg/fleet-manager/backup_restore_migrate_shared.go index 8e8be7f69..32d6589f3 100644 --- a/pkg/fleet-manager/backup_restore_migrate_shared.go +++ b/pkg/fleet-manager/backup_restore_migrate_shared.go @@ -18,6 +18,7 @@ import ( "fmt" "reflect" "sort" + "sync" "time" "github.com/robfig/cron/v3" @@ -141,7 +142,13 @@ func buildVeleroBackupSpec(backupPolicy *backupapi.BackupPolicy) velerov1.Backup } } -func syncVeleroObj(ctx context.Context, clusterKey ClusterKey, cluster *fleetCluster, veleroObj client.Object) error { +func newSyncVeleroTaskFunc(ctx context.Context, clusterAccess *fleetCluster, obj client.Object) func() error { + return func() error { + return syncVeleroObj(ctx, clusterAccess, obj) + } +} + +func syncVeleroObj(ctx context.Context, cluster *fleetCluster, veleroObj client.Object) error { // Get the client clusterClient := cluster.client.CtrlRuntimeClient() @@ -231,8 +238,9 @@ func generateVeleroResourceObjectMeta(veleroResourceName string, labels map[stri } } -func generateVeleroResourceName(clusterName, creatorKind, creatorName string) string { - return clusterName + "-" + creatorKind + "-" + creatorName +// generateVeleroResourceName generate a name uniquely across object store +func generateVeleroResourceName(clusterName, creatorKind, creatorNamespace, creatorName string) string { + return clusterName + "-" + creatorKind + "-" + creatorNamespace + "-" + creatorName } // MostRecentCompletedBackup returns the most recent backup that's completed from a list of backups. @@ -304,3 +312,25 @@ func listResourcesFromClusterClient(ctx context.Context, namespace string, label } return clusterClient.List(ctx, objList, opts) } + +// parallelProcess runs the provided tasks concurrently and collects any errors. +func parallelProcess(tasks []func() error) []error { + var errs []error + var errMutex sync.Mutex + var wg sync.WaitGroup + + for _, task := range tasks { + wg.Add(1) + go func(task func() error) { + defer wg.Done() + if err := task(); err != nil { + errMutex.Lock() + errs = append(errs, err) + errMutex.Unlock() + } + }(task) + } + + wg.Wait() + return errs +}