Skip to content

Commit

Permalink
fix: reload the pulsar connection when the auth secret is updated (#146)
Browse files Browse the repository at this point in the history
* fix: trigger the pulsarconnection reconcile when the auth secret is updated

* fix: lint error
  • Loading branch information
FushuWang authored Sep 6, 2023
1 parent 286c252 commit b492211
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 7 deletions.
43 changes: 43 additions & 0 deletions controllers/pulsarconnection_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@ package controllers
import (
"context"
"fmt"
"reflect"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -164,6 +168,45 @@ func (r *PulsarConnectionReconciler) SetupWithManager(mgr ctrl.Manager, options
Watches(&source.Kind{Type: &resourcev1alpha1.PulsarGeoReplication{}},
handler.EnqueueRequestsFromMapFunc(ConnectionRefMapper),
builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Watches(&source.Kind{Type: &corev1.Secret{}},
handler.EnqueueRequestsFromMapFunc(r.findSecretsForConnection),
builder.WithPredicates(secretPredicate())).
WithOptions(options).
Complete(r)
}

func (r *PulsarConnectionReconciler) findSecretsForConnection(secret client.Object) []reconcile.Request {
ctx := context.Background()
conns := &resourcev1alpha1.PulsarConnectionList{}
err := r.List(ctx, conns, client.InNamespace(secret.GetNamespace()))
if err != nil {
r.Log.Error(err, "List secrets to match connection failed")
}
var requests []reconcile.Request
for _, i := range conns.Items {
auth := i.Spec.Authentication
if auth != nil && auth.Token != nil && auth.Token.SecretRef != nil {
if auth.Token.SecretRef.Name == secret.GetName() {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Name: i.GetName(),
Namespace: i.GetNamespace(),
},
})
}
}
}

return requests
}

func secretPredicate() predicate.Predicate {
return predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
if e.ObjectOld == nil || e.ObjectNew == nil {
return false
}
return !reflect.DeepEqual(e.ObjectOld.GetResourceVersion(), e.ObjectNew.GetResourceVersion())
},
}
}
11 changes: 4 additions & 7 deletions pkg/connection/reconcile_geo_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,9 @@ func (r *PulsarGeoReplicationReconciler) Observe(ctx context.Context) error {
r.log.V(1).Info("Observed geo replication items", "Count", len(geoList.Items))

r.conn.geoReplications = geoList.Items
if !r.conn.hasUnreadyResource {
for i := range r.conn.geoReplications {
if !resourcev1alpha1.IsPulsarResourceReady(&r.conn.geoReplications[i]) {
r.conn.hasUnreadyResource = true
break
}
}
// Force the `hasUnreadyResource` to be `true`` to trigger the PulsarConnection reload the auth config
if len(geoList.Items) != 0 {
r.conn.hasUnreadyResource = true
}

r.log.V(1).Info("Observe Done")
Expand All @@ -70,6 +66,7 @@ func (r *PulsarGeoReplicationReconciler) Observe(ctx context.Context) error {
// Reconcile reconciles all the geo replication objects
func (r *PulsarGeoReplicationReconciler) Reconcile(ctx context.Context) error {
for i := range r.conn.geoReplications {
r.log.V(1).Info("Reconcile Geo")
geoReplication := &r.conn.geoReplications[i]
if err := r.ReconcileGeoReplication(ctx, r.conn.pulsarAdmin, geoReplication); err != nil {
return fmt.Errorf("reconcile geo replication [%w]", err)
Expand Down
2 changes: 2 additions & 0 deletions pkg/connection/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func (r *PulsarConnectionReconciler) Observe(ctx context.Context) error {
// Reconcile reconciles all resources
func (r *PulsarConnectionReconciler) Reconcile(ctx context.Context) error {
var err error

if !r.hasUnreadyResource {
if !r.connection.DeletionTimestamp.IsZero() {
if len(r.tenants) == 0 && len(r.namespaces) == 0 && len(r.topics) == 0 {
Expand All @@ -108,6 +109,7 @@ func (r *PulsarConnectionReconciler) Reconcile(ctx context.Context) error {
r.log.Info("Doesn't have unReady resource")
return nil
}
r.log.Info("have unReady resource")

if r.connection.Spec.AdminServiceURL == "" && r.connection.Spec.AdminServiceSecureURL != "" {
r.connection.Spec.AdminServiceURL = r.connection.Spec.AdminServiceSecureURL
Expand Down

0 comments on commit b492211

Please sign in to comment.