From 71730824c9913792cbc52195ba27a54fbb6caa62 Mon Sep 17 00:00:00 2001 From: ericsyh Date: Sat, 14 Sep 2024 22:07:38 +0800 Subject: [PATCH] optimize the controller reconcile Signed-off-by: ericsyh --- pkg/connection/reconcile_function.go | 6 +++++- pkg/connection/reconcile_geo_replication.go | 6 +++++- pkg/connection/reconcile_package.go | 6 +++++- pkg/connection/reconcile_permission.go | 6 +++++- pkg/connection/reconcile_sink.go | 7 +++++-- pkg/connection/reconcile_source.go | 8 +++++--- pkg/connection/reconcile_tenant.go | 6 +++++- 7 files changed, 35 insertions(+), 10 deletions(-) diff --git a/pkg/connection/reconcile_function.go b/pkg/connection/reconcile_function.go index 9706abc..ed581bc 100644 --- a/pkg/connection/reconcile_function.go +++ b/pkg/connection/reconcile_function.go @@ -71,12 +71,16 @@ func (r *PulsarFunctionReconciler) Observe(ctx context.Context) error { // Reconcile reconciles all functions func (r *PulsarFunctionReconciler) Reconcile(ctx context.Context) error { + errs := []error{} for i := range r.conn.functions { instance := &r.conn.functions[i] if err := r.ReconcileFunction(ctx, r.conn.pulsarAdminV3, instance); err != nil { - return fmt.Errorf("reconcile pulsar function [%w]", err) + errs = append(errs, err) } } + if len(errs) > 0 { + return fmt.Errorf("reconcile pulsar function [%v]", errs) + } return nil } diff --git a/pkg/connection/reconcile_geo_replication.go b/pkg/connection/reconcile_geo_replication.go index daae50b..ca8192e 100644 --- a/pkg/connection/reconcile_geo_replication.go +++ b/pkg/connection/reconcile_geo_replication.go @@ -79,6 +79,7 @@ func (r *PulsarGeoReplicationReconciler) Observe(ctx context.Context) error { // Reconcile reconciles all the geo replication objects func (r *PulsarGeoReplicationReconciler) Reconcile(ctx context.Context) error { + errs := []error{} for i := range r.conn.geoReplications { r.log.V(1).Info("Reconcile Geo") geoReplication := &r.conn.geoReplications[i] @@ -104,7 +105,10 @@ func (r *PulsarGeoReplicationReconciler) Reconcile(ctx context.Context) error { } if err := r.ReconcileGeoReplication(ctx, pulsarAdmin, geoReplication); err != nil { - return fmt.Errorf("reconcile geo replication [%w]", err) + errs = append(errs, err) + } + if len(errs) > 0 { + return fmt.Errorf("reconcile geo replication [%v]", errs) } } return nil diff --git a/pkg/connection/reconcile_package.go b/pkg/connection/reconcile_package.go index 417e279..399a472 100644 --- a/pkg/connection/reconcile_package.go +++ b/pkg/connection/reconcile_package.go @@ -73,12 +73,16 @@ func (r *PulsarPackageReconciler) Observe(ctx context.Context) error { // Reconcile reconciles all topics func (r *PulsarPackageReconciler) Reconcile(ctx context.Context) error { + errs := []error{} for i := range r.conn.packages { pkg := &r.conn.packages[i] if err := r.ReconcilePackage(ctx, r.conn.pulsarAdminV3, pkg); err != nil { - return fmt.Errorf("reconcile pulsar package [%w]", err) + errs = append(errs, err) } } + if len(errs) > 0 { + return fmt.Errorf("reconcile pulsar package [%v]", errs) + } return nil } diff --git a/pkg/connection/reconcile_permission.go b/pkg/connection/reconcile_permission.go index f0eb76e..59101b1 100644 --- a/pkg/connection/reconcile_permission.go +++ b/pkg/connection/reconcile_permission.go @@ -68,12 +68,16 @@ func (r *PulsarPermissionReconciler) Observe(ctx context.Context) error { // Reconcile reconciles all permissions func (r *PulsarPermissionReconciler) Reconcile(ctx context.Context) error { + errs := []error{} for i := range r.conn.permissions { perm := &r.conn.permissions[i] if err := r.ReconcilePermission(ctx, r.conn.pulsarAdmin, perm); err != nil { - return fmt.Errorf("reconcile permission [%w]", err) + errs = append(errs, err) } } + if len(errs) > 0 { + return fmt.Errorf("reconcile permission [%v]", errs) + } return nil } diff --git a/pkg/connection/reconcile_sink.go b/pkg/connection/reconcile_sink.go index df2c139..126c77f 100644 --- a/pkg/connection/reconcile_sink.go +++ b/pkg/connection/reconcile_sink.go @@ -69,13 +69,16 @@ func (r *PulsarSinkReconciler) Observe(ctx context.Context) error { // Reconcile reconciles the object func (r *PulsarSinkReconciler) Reconcile(ctx context.Context) error { r.log.V(1).Info("Start Reconcile") - + errs := []error{} for i := range r.conn.sinks { sink := &r.conn.sinks[i] if err := r.ReconcileSink(ctx, r.conn.pulsarAdminV3, sink); err != nil { - return fmt.Errorf("reconcile sink [%s] [%w]", sink.Name, err) + errs = append(errs, err) } } + if len(errs) > 0 { + return fmt.Errorf("reconcile sink [%v]", errs) + } return nil } diff --git a/pkg/connection/reconcile_source.go b/pkg/connection/reconcile_source.go index a355b17..a8ef32b 100644 --- a/pkg/connection/reconcile_source.go +++ b/pkg/connection/reconcile_source.go @@ -70,15 +70,17 @@ func (r *PulsarSourceReconciler) Observe(ctx context.Context) error { // Reconcile reconciles the object func (r *PulsarSourceReconciler) Reconcile(ctx context.Context) error { r.log.V(1).Info("Start Reconcile") - + errs := []error{} for i := range r.conn.sources { source := &r.conn.sources[i] r.log.Info("Reconcile source", "Name", source.Name) if err := r.ReconcileSource(ctx, r.conn.pulsarAdminV3, source); err != nil { - return fmt.Errorf("reconcile source [%s] [%w]", source.Name, err) + errs = append(errs, err) } } - + if len(errs) > 0 { + return fmt.Errorf("reconcile source [%v]", errs) + } return nil } diff --git a/pkg/connection/reconcile_tenant.go b/pkg/connection/reconcile_tenant.go index 0562185..8e37832 100644 --- a/pkg/connection/reconcile_tenant.go +++ b/pkg/connection/reconcile_tenant.go @@ -69,12 +69,16 @@ func (r *PulsarTenantReconciler) Observe(ctx context.Context) error { // Reconcile reconciles all tenants func (r *PulsarTenantReconciler) Reconcile(ctx context.Context) error { + errs := []error{} for i := range r.conn.tenants { tenant := &r.conn.tenants[i] if err := r.ReconcileTenant(ctx, r.conn.pulsarAdmin, tenant); err != nil { - return fmt.Errorf("reconcile tenant [%w]", err) + errs = append(errs, err) } } + if len(errs) > 0 { + return fmt.Errorf("reconcile tenant [%v]", errs) + } return nil }