Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
labuladong committed Oct 30, 2023
1 parent cf2e93a commit 337fd9e
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 7 deletions.
23 changes: 22 additions & 1 deletion pkg/connection/reconcile_geo_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,28 @@ func (r *PulsarGeoReplicationReconciler) Reconcile(ctx context.Context) error {
for i := range r.conn.geoReplications {
r.log.V(1).Info("Reconcile Geo")
geoReplication := &r.conn.geoReplications[i]
if err := r.ReconcileGeoReplication(ctx, r.conn.pulsarAdmin, geoReplication); err != nil {
pulsarAdmin := r.conn.pulsarAdmin
if geoReplication.Spec.ConnectionRef.Name != r.conn.connection.Name {
// If the connectionRef is the remote connection, we need to create a new pulsarAdmin for it
localConnection := &resourcev1alpha1.PulsarConnection{}
namespacedName := types.NamespacedName{
Name: geoReplication.Spec.ConnectionRef.Name,
Namespace: geoReplication.Namespace,
}
if err := r.conn.client.Get(ctx, namespacedName, localConnection); err != nil {
return fmt.Errorf("get local pulsarConnection [%w]", err)
}
cfg, err := MakePulsarAdminConfig(ctx, localConnection, r.conn.client)
if err != nil {
return fmt.Errorf("make pulsar admin config [%w]", err)
}
pulsarAdmin, err = r.conn.creator(*cfg)
if err != nil {
return fmt.Errorf("make pulsar admin [%w]", err)
}
}

if err := r.ReconcileGeoReplication(ctx, pulsarAdmin, geoReplication); err != nil {
return fmt.Errorf("reconcile geo replication [%w]", err)
}
}
Expand Down
17 changes: 11 additions & 6 deletions pkg/connection/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,17 +193,22 @@ func GetValue(ctx context.Context, k8sClient client.Client, namespace string,

// MakePulsarAdminConfig create pulsar admin configuration
func (r *PulsarConnectionReconciler) MakePulsarAdminConfig(ctx context.Context) (*admin.PulsarAdminConfig, error) {
if r.connection.Spec.AdminServiceURL == "" && r.connection.Spec.AdminServiceSecureURL == "" {
return MakePulsarAdminConfig(ctx, r.connection, r.client)
}

// MakePulsarAdminConfig create pulsar admin configuration
func MakePulsarAdminConfig(ctx context.Context, connection *resourcev1alpha1.PulsarConnection,
k8sClient client.Client) (*admin.PulsarAdminConfig, error) {
if connection.Spec.AdminServiceURL == "" && connection.Spec.AdminServiceSecureURL == "" {
return nil, fmt.Errorf("adminServiceURL or adminServiceSecureURL must not be empty")
}

cfg := admin.PulsarAdminConfig{
WebServiceURL: r.connection.Spec.AdminServiceURL,
WebServiceURL: connection.Spec.AdminServiceURL,
}
hasAuth := false
if authn := r.connection.Spec.Authentication; authn != nil {
if authn := connection.Spec.Authentication; authn != nil {
if token := authn.Token; token != nil {
value, err := GetValue(ctx, r.client, r.connection.Namespace, token)
value, err := GetValue(ctx, k8sClient, connection.Namespace, token)
if err != nil {
return nil, err
}
Expand All @@ -217,7 +222,7 @@ func (r *PulsarConnectionReconciler) MakePulsarAdminConfig(ctx context.Context)
cfg.ClientID = oauth2.ClientID
cfg.Audience = oauth2.Audience
cfg.Scope = oauth2.Scope
value, err := GetValue(ctx, r.client, r.connection.Namespace, &oauth2.Key)
value, err := GetValue(ctx, k8sClient, connection.Namespace, &oauth2.Key)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 337fd9e

Please sign in to comment.