diff --git a/pkg/connection/reconcile_geo_replication.go b/pkg/connection/reconcile_geo_replication.go index f43e07c5..38dba051 100644 --- a/pkg/connection/reconcile_geo_replication.go +++ b/pkg/connection/reconcile_geo_replication.go @@ -77,7 +77,28 @@ func (r *PulsarGeoReplicationReconciler) Reconcile(ctx context.Context) error { for i := range r.conn.geoReplications { r.log.V(1).Info("Reconcile Geo") geoReplication := &r.conn.geoReplications[i] - if err := r.ReconcileGeoReplication(ctx, r.conn.pulsarAdmin, geoReplication); err != nil { + pulsarAdmin := r.conn.pulsarAdmin + if geoReplication.Spec.ConnectionRef.Name != r.conn.connection.Name { + // If the connectionRef is the remote connection, we need to create a new pulsarAdmin for it + localConnection := &resourcev1alpha1.PulsarConnection{} + namespacedName := types.NamespacedName{ + Name: geoReplication.Spec.ConnectionRef.Name, + Namespace: geoReplication.Namespace, + } + if err := r.conn.client.Get(ctx, namespacedName, localConnection); err != nil { + return fmt.Errorf("get local pulsarConnection [%w]", err) + } + cfg, err := MakePulsarAdminConfig(ctx, localConnection, r.conn.client) + if err != nil { + return fmt.Errorf("make pulsar admin config [%w]", err) + } + pulsarAdmin, err = r.conn.creator(*cfg) + if err != nil { + return fmt.Errorf("make pulsar admin [%w]", err) + } + } + + if err := r.ReconcileGeoReplication(ctx, pulsarAdmin, geoReplication); err != nil { return fmt.Errorf("reconcile geo replication [%w]", err) } } diff --git a/pkg/connection/reconciler.go b/pkg/connection/reconciler.go index c41bf8f3..f6df61f1 100644 --- a/pkg/connection/reconciler.go +++ b/pkg/connection/reconciler.go @@ -193,17 +193,22 @@ func GetValue(ctx context.Context, k8sClient client.Client, namespace string, // MakePulsarAdminConfig create pulsar admin configuration func (r *PulsarConnectionReconciler) MakePulsarAdminConfig(ctx context.Context) (*admin.PulsarAdminConfig, error) { - if r.connection.Spec.AdminServiceURL == "" && r.connection.Spec.AdminServiceSecureURL == "" { + return MakePulsarAdminConfig(ctx, r.connection, r.client) +} + +// MakePulsarAdminConfig create pulsar admin configuration +func MakePulsarAdminConfig(ctx context.Context, connection *resourcev1alpha1.PulsarConnection, + k8sClient client.Client) (*admin.PulsarAdminConfig, error) { + if connection.Spec.AdminServiceURL == "" && connection.Spec.AdminServiceSecureURL == "" { return nil, fmt.Errorf("adminServiceURL or adminServiceSecureURL must not be empty") } - cfg := admin.PulsarAdminConfig{ - WebServiceURL: r.connection.Spec.AdminServiceURL, + WebServiceURL: connection.Spec.AdminServiceURL, } hasAuth := false - if authn := r.connection.Spec.Authentication; authn != nil { + if authn := connection.Spec.Authentication; authn != nil { if token := authn.Token; token != nil { - value, err := GetValue(ctx, r.client, r.connection.Namespace, token) + value, err := GetValue(ctx, k8sClient, connection.Namespace, token) if err != nil { return nil, err } @@ -217,7 +222,7 @@ func (r *PulsarConnectionReconciler) MakePulsarAdminConfig(ctx context.Context) cfg.ClientID = oauth2.ClientID cfg.Audience = oauth2.Audience cfg.Scope = oauth2.Scope - value, err := GetValue(ctx, r.client, r.connection.Namespace, &oauth2.Key) + value, err := GetValue(ctx, k8sClient, connection.Namespace, &oauth2.Key) if err != nil { return nil, err }