Skip to content

Commit

Permalink
retry
Browse files Browse the repository at this point in the history
  • Loading branch information
labuladong committed Aug 14, 2024
1 parent e7c5d30 commit 42d4763
Show file tree
Hide file tree
Showing 11 changed files with 419 additions and 2 deletions.
6 changes: 5 additions & 1 deletion controllers/pulsarconnection_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type PulsarConnectionReconciler struct {
Log logr.Logger
Recorder record.EventRecorder
PulsarAdminCreator admin.PulsarAdminCreator
Retryer *utils.ReconcileRetryer
}

//nolint:lll
Expand Down Expand Up @@ -109,11 +110,12 @@ func (r *PulsarConnectionReconciler) Reconcile(ctx context.Context, req ctrl.Req

r.Log.Info("Reconciling PulsarConnection", "name", pulsarConnection.Name, "namespace", pulsarConnection.Namespace)

reconciler := connection.MakeReconciler(r.Log, r.Client, r.PulsarAdminCreator, pulsarConnection)
reconciler := connection.MakeReconciler(r.Log, r.Client, r.PulsarAdminCreator, pulsarConnection, r.Retryer)
if err := reconciler.Observe(ctx); err != nil {
return ctrl.Result{}, err
}
if err := reconciler.Reconcile(ctx); err != nil {
r.Retryer.CreateIfAbsent(pulsarConnection)
return ctrl.Result{}, err
}

Expand Down Expand Up @@ -241,8 +243,10 @@ func (r *PulsarConnectionReconciler) SetupWithManager(mgr ctrl.Manager, options
Watches(&source.Kind{Type: &corev1.Secret{}},
handler.EnqueueRequestsFromMapFunc(r.findSecretsForConnection),
builder.WithPredicates(secretPredicate())).
Watches(&source.Channel{Source: r.Retryer.Source()}, &handler.EnqueueRequestForObject{}).
WithOptions(options).
Complete(r)

Check failure on line 249 in controllers/pulsarconnection_controller.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary trailing newline (whitespace)
}

func (r *PulsarConnectionReconciler) findSecretsForConnection(secret client.Object) []reconcile.Request {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/apache/pulsar-client-go v0.12.1
github.com/go-logr/logr v1.2.1
github.com/onsi/ginkgo v1.16.5
github.com/onsi/ginkgo/v2 v2.1.3
github.com/onsi/gomega v1.19.0
github.com/xhit/go-str2duration/v2 v2.1.0
go.uber.org/zap v1.19.1
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLe
github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
Expand Down Expand Up @@ -381,13 +382,15 @@ github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/ginkgo/v2 v2.1.3 h1:e/3Cwtogj0HA+25nMP1jCMDIf8RtRYbGwGGuBIFztkc=
github.com/onsi/ginkgo/v2 v2.1.3/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
github.com/onsi/gomega v1.19.0 h1:4ieX6qQjPP/BfC3mpsAtIGGlxTWPeA3Inl/7DtXw1tw=
github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
Expand Down Expand Up @@ -624,6 +627,7 @@ golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210825183410-e898025ed96a/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"

"github.com/streamnative/pulsar-resources-operator/pkg/feature"
"github.com/streamnative/pulsar-resources-operator/pkg/utils"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -105,6 +106,7 @@ func main() {
Log: ctrl.Log.WithName("controllers").WithName("PulsarConnection"),
Recorder: mgr.GetEventRecorderFor("pulsarconnection-controller"),
PulsarAdminCreator: admin.NewPulsarAdmin,
Retryer: utils.NewReconcileRetryer(5, utils.NewEventSource(ctrl.Log.WithName("eventSource"))),
}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: 1}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "PulsarConnection")
os.Exit(1)
Expand Down
1 change: 1 addition & 0 deletions pkg/connection/reconcile_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin
for _, err := range policyErrs {
msg += err.Error() + ";\n"
}
r.conn.retry()
meta.SetStatusCondition(&topic.Status.Conditions,
NewTopicErrorCondition(topic.Generation, resourcev1alpha1.ConditionTopicPolicyReady, msg))
} else {
Expand Down
9 changes: 8 additions & 1 deletion pkg/connection/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,21 @@ type PulsarConnectionReconciler struct {
pulsarAdmin admin.PulsarAdmin
pulsarAdminV3 admin.PulsarAdmin
reconcilers []reconciler.Interface

retryer *utils.ReconcileRetryer
}

var _ reconciler.Interface = &PulsarConnectionReconciler{}

// MakeReconciler creates resource reconcilers
func MakeReconciler(log logr.Logger, k8sClient client.Client, creator admin.PulsarAdminCreator,
connection *resourcev1alpha1.PulsarConnection) reconciler.Interface {
connection *resourcev1alpha1.PulsarConnection, retryer *utils.ReconcileRetryer) reconciler.Interface {
r := &PulsarConnectionReconciler{
log: log,
connection: connection,
creator: creator,
client: k8sClient,
retryer: retryer,
}
r.reconcilers = []reconciler.Interface{
makeGeoReplicationReconciler(r),
Expand All @@ -87,6 +90,10 @@ func makeSubResourceLog(r *PulsarConnectionReconciler, name string) logr.Logger
fmt.Sprintf("%s/%s", r.connection.Namespace, r.connection.Name))
}

func (r *PulsarConnectionReconciler) retry() {
r.retryer.CreateIfAbsent(r.connection)
}

// Observe checks the updates of object
func (r *PulsarConnectionReconciler) Observe(ctx context.Context) error {
r.log.Info("Start PulsarConnectionReconciler Observe")
Expand Down
98 changes: 98 additions & 0 deletions pkg/utils/event_source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright (c) 2020 StreamNative, Inc.. All Rights Reserved.

package utils

import (
"sync"
"time"

"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
)

type Event struct {

Check warning on line 14 in pkg/utils/event_source.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported type Event should have comment or be unexported (revive)
client.Object
}

// EventSource is a custom event source that can be used to trigger reconcile
type EventSource struct {
Log logr.Logger
Events chan event.GenericEvent
eventMap map[string]*time.Timer
mu sync.Mutex
}

func NewEventSource(log logr.Logger) *EventSource {

Check warning on line 26 in pkg/utils/event_source.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported function NewEventSource should have comment or be unexported (revive)
return &EventSource{
Log: log,
Events: make(chan event.GenericEvent, 20),
eventMap: make(map[string]*time.Timer),
}
}

// CreateIfAbsent triggers reconcile after delay, idempotent operation for the same key
func (s *EventSource) CreateIfAbsent(delay time.Duration, obj client.Object, key string) {
if delay <= 0 {
return
}
s.mu.Lock()
defer s.mu.Unlock()

if _, ok := s.eventMap[key]; ok {
return
}
s.Log.Info("Will trigger reconcile after delay", "Key", key,
"Delay", delay, "Name", obj.GetName(), "Namespace", obj.GetNamespace())
// add a little jitter
delay += time.Second * 2
s.eventMap[key] = time.AfterFunc(delay, func() {
s.mu.Lock()
defer s.mu.Unlock()
s.Log.Info("Trigger reconcile",
"Key", key, "Name", obj.GetName(), "Namespace", obj.GetNamespace())
s.Events <- event.GenericEvent{Object: obj}
delete(s.eventMap, key)
})
}

func (s *EventSource) Update(key string, delay time.Duration) {

Check warning on line 59 in pkg/utils/event_source.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported method EventSource.Update should have comment or be unexported (revive)
s.mu.Lock()
defer s.mu.Unlock()
s.Log.Info("Update reconcile event", "Key", key)
if timer, ok := s.eventMap[key]; ok {
timer.Reset(delay)
} else {
s.Log.Info("No reconcile event found", "Key", key)
}
}

func (s *EventSource) Contains(key string) bool {

Check warning on line 70 in pkg/utils/event_source.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported method EventSource.Contains should have comment or be unexported (revive)
s.mu.Lock()
defer s.mu.Unlock()

_, ok := s.eventMap[key]
return ok
}

func (s *EventSource) Remove(key string) {

Check warning on line 78 in pkg/utils/event_source.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported method EventSource.Remove should have comment or be unexported (revive)
s.mu.Lock()
defer s.mu.Unlock()

s.Log.Info("Remove reconcile event", "Key", key)
if timer, ok := s.eventMap[key]; ok {
timer.Stop()
delete(s.eventMap, key)
}
}

func (s *EventSource) Close() {

Check warning on line 89 in pkg/utils/event_source.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported method EventSource.Close should have comment or be unexported (revive)
s.mu.Lock()
defer s.mu.Unlock()

for key, timer := range s.eventMap {
timer.Stop()
delete(s.eventMap, key)
}
close(s.Events)
}
60 changes: 60 additions & 0 deletions pkg/utils/event_source_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) 2020 StreamNative, Inc.. All Rights Reserved.

package utils

import (
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/event"
)

var _ = Context("TestEventSource", func() {
It("should trigger reconcile after delay", func() {
source := NewEventSource(ctrl.Log.WithName("test"))
key := "test"

Check failure on line 18 in pkg/utils/event_source_test.go

View workflow job for this annotation

GitHub Actions / lint

string `test` has 3 occurrences, make it a constant (goconst)
Expect(source.Events).ShouldNot(BeClosed())
source.CreateIfAbsent(3*time.Second, &corev1.Pod{}, key)
Eventually(source.Events, "10s").Should(
Receive(Equal(event.GenericEvent{Object: &corev1.Pod{}})),
)
// key should be removed from the event map
Expect(source.Contains(key)).Should(BeFalse())
source.Close()
Expect(source.Events).Should(BeClosed())
})

It("should be idempotent for the same key", func() {
source := NewEventSource(ctrl.Log.WithName("test"))
key := "test"
Expect(source.Events).ShouldNot(BeClosed())

source.CreateIfAbsent(3*time.Second, &corev1.Pod{}, key)
source.CreateIfAbsent(10*time.Second, &corev1.Pod{}, key)

Eventually(source.Events, "10s").
Should(Receive(Equal(event.GenericEvent{Object: &corev1.Pod{}})))
// key should be removed from the event map
Expect(source.Contains(key)).Should(BeFalse())
source.Close()
Expect(source.Events).Should(BeClosed())
})

It("should be removed", func() {
source := NewEventSource(ctrl.Log.WithName("test"))
key := "test"
Expect(source.Events).ShouldNot(BeClosed())

source.CreateIfAbsent(2*time.Second, &corev1.Pod{}, key)
source.Remove(key)
// key should be removed from the event map
Expect(source.Contains(key)).Should(BeFalse())
Consistently(source.Events, "3s").
ShouldNot(Receive(Equal(event.GenericEvent{Object: &corev1.Pod{}})))
source.Close()
Expect(source.Events).Should(BeClosed())
})
})
Loading

0 comments on commit 42d4763

Please sign in to comment.