Skip to content

Commit

Permalink
optimize the controller reconcile (#257)
Browse files Browse the repository at this point in the history
Signed-off-by: ericsyh <[email protected]>
  • Loading branch information
ericsyh authored Sep 16, 2024
1 parent 242cc11 commit a710667
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 10 deletions.
6 changes: 5 additions & 1 deletion pkg/connection/reconcile_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,16 @@ func (r *PulsarFunctionReconciler) Observe(ctx context.Context) error {

// Reconcile reconciles all functions
func (r *PulsarFunctionReconciler) Reconcile(ctx context.Context) error {
errs := []error{}
for i := range r.conn.functions {
instance := &r.conn.functions[i]
if err := r.ReconcileFunction(ctx, r.conn.pulsarAdminV3, instance); err != nil {
return fmt.Errorf("reconcile pulsar function [%w]", err)
errs = append(errs, err)
}
}
if len(errs) > 0 {
return fmt.Errorf("reconcile pulsar function [%v]", errs)
}
return nil
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/connection/reconcile_geo_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (r *PulsarGeoReplicationReconciler) Observe(ctx context.Context) error {

// Reconcile reconciles all the geo replication objects
func (r *PulsarGeoReplicationReconciler) Reconcile(ctx context.Context) error {
errs := []error{}
for i := range r.conn.geoReplications {
r.log.V(1).Info("Reconcile Geo")
geoReplication := &r.conn.geoReplications[i]
Expand All @@ -104,7 +105,10 @@ func (r *PulsarGeoReplicationReconciler) Reconcile(ctx context.Context) error {
}

if err := r.ReconcileGeoReplication(ctx, pulsarAdmin, geoReplication); err != nil {
return fmt.Errorf("reconcile geo replication [%w]", err)
errs = append(errs, err)
}
if len(errs) > 0 {
return fmt.Errorf("reconcile geo replication [%v]", errs)
}
}
return nil
Expand Down
6 changes: 5 additions & 1 deletion pkg/connection/reconcile_package.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,16 @@ func (r *PulsarPackageReconciler) Observe(ctx context.Context) error {

// Reconcile reconciles all topics
func (r *PulsarPackageReconciler) Reconcile(ctx context.Context) error {
errs := []error{}
for i := range r.conn.packages {
pkg := &r.conn.packages[i]
if err := r.ReconcilePackage(ctx, r.conn.pulsarAdminV3, pkg); err != nil {
return fmt.Errorf("reconcile pulsar package [%w]", err)
errs = append(errs, err)
}
}
if len(errs) > 0 {
return fmt.Errorf("reconcile pulsar package [%v]", errs)
}
return nil
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/connection/reconcile_permission.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,16 @@ func (r *PulsarPermissionReconciler) Observe(ctx context.Context) error {

// Reconcile reconciles all permissions
func (r *PulsarPermissionReconciler) Reconcile(ctx context.Context) error {
errs := []error{}
for i := range r.conn.permissions {
perm := &r.conn.permissions[i]
if err := r.ReconcilePermission(ctx, r.conn.pulsarAdmin, perm); err != nil {
return fmt.Errorf("reconcile permission [%w]", err)
errs = append(errs, err)
}
}
if len(errs) > 0 {
return fmt.Errorf("reconcile permission [%v]", errs)
}

return nil
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/connection/reconcile_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,16 @@ func (r *PulsarSinkReconciler) Observe(ctx context.Context) error {
// Reconcile reconciles the object
func (r *PulsarSinkReconciler) Reconcile(ctx context.Context) error {
r.log.V(1).Info("Start Reconcile")

errs := []error{}
for i := range r.conn.sinks {
sink := &r.conn.sinks[i]
if err := r.ReconcileSink(ctx, r.conn.pulsarAdminV3, sink); err != nil {
return fmt.Errorf("reconcile sink [%s] [%w]", sink.Name, err)
errs = append(errs, err)
}
}
if len(errs) > 0 {
return fmt.Errorf("reconcile sink [%v]", errs)
}

return nil
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/connection/reconcile_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,17 @@ func (r *PulsarSourceReconciler) Observe(ctx context.Context) error {
// Reconcile reconciles the object
func (r *PulsarSourceReconciler) Reconcile(ctx context.Context) error {
r.log.V(1).Info("Start Reconcile")

errs := []error{}
for i := range r.conn.sources {
source := &r.conn.sources[i]
r.log.Info("Reconcile source", "Name", source.Name)
if err := r.ReconcileSource(ctx, r.conn.pulsarAdminV3, source); err != nil {
return fmt.Errorf("reconcile source [%s] [%w]", source.Name, err)
errs = append(errs, err)
}
}

if len(errs) > 0 {
return fmt.Errorf("reconcile source [%v]", errs)
}
return nil
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/connection/reconcile_tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,16 @@ func (r *PulsarTenantReconciler) Observe(ctx context.Context) error {

// Reconcile reconciles all tenants
func (r *PulsarTenantReconciler) Reconcile(ctx context.Context) error {
errs := []error{}
for i := range r.conn.tenants {
tenant := &r.conn.tenants[i]
if err := r.ReconcileTenant(ctx, r.conn.pulsarAdmin, tenant); err != nil {
return fmt.Errorf("reconcile tenant [%w]", err)
errs = append(errs, err)
}
}
if len(errs) > 0 {
return fmt.Errorf("reconcile tenant [%v]", errs)
}
return nil
}

Expand Down

0 comments on commit a710667

Please sign in to comment.