Skip to content

Commit

Permalink
move PreFetchHook to WatcherConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
marcoandredinis committed Nov 25, 2024
1 parent 787bba3 commit 56f10d1
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 34 deletions.
31 changes: 8 additions & 23 deletions lib/srv/discovery/common/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type WatcherConfig struct {
DiscoveryGroup string
// Origin is used to specify what type of origin watcher's resources are
Origin string
// PreFetchHookFn is called before starting a new fetch cycle.
PreFetchHookFn func()
}

// CheckAndSetDefaults validates the config.
Expand Down Expand Up @@ -96,36 +98,19 @@ type Watcher struct {
ctx context.Context
// resourcesC is a channel where fetched resourcess are sent.
resourcesC chan (types.ResourcesWithLabels)
// preFetchHookFn is called before starting a new fetch cycle.
preFetchHookFn func()
}

// WatcherOption is a functional option for the Watcher.
type WatcherOption func(*Watcher)

// WithPreFetchHookFn sets a function that gets called before each new iteration.
func WithPreFetchHookFn(f func()) WatcherOption {
return func(w *Watcher) {
w.preFetchHookFn = f
}
}

// NewWatcher returns a new instance of a common discovery watcher.
func NewWatcher(ctx context.Context, config WatcherConfig, options ...WatcherOption) (*Watcher, error) {
func NewWatcher(ctx context.Context, config WatcherConfig) (*Watcher, error) {
if err := config.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
watcher := &Watcher{

return &Watcher{
cfg: config,
ctx: ctx,
resourcesC: make(chan types.ResourcesWithLabels),
}

for _, opt := range options {
opt(watcher)
}

return watcher, nil
}, nil
}

// Start starts fetching cloud resources and sending them to the channel.
Expand Down Expand Up @@ -158,8 +143,8 @@ func (w *Watcher) Start() {

// fetchAndSend fetches resources from all fetchers and sends them to the channel.
func (w *Watcher) fetchAndSend() {
if w.preFetchHookFn != nil {
w.preFetchHookFn()
if w.cfg.PreFetchHookFn != nil {
w.cfg.PreFetchHookFn()
}

var (
Expand Down
8 changes: 4 additions & 4 deletions lib/srv/discovery/common/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ func TestWatcher(t *testing.T) {
Interval: time.Hour,
Clock: clock,
Origin: types.OriginCloud,
},
WithPreFetchHookFn(func() {
fetchIterations.Add(1)
}))
PreFetchHookFn: func() {
fetchIterations.Add(1)
},
})
require.NoError(t, err)
go watcher.Start()

Expand Down
6 changes: 3 additions & 3 deletions lib/srv/discovery/database_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ func (s *Server) startDatabaseWatchers() error {
TriggerFetchC: s.newDiscoveryConfigChangedSub(),
Origin: types.OriginCloud,
Clock: s.clock,
PreFetchHookFn: func() {
s.awsRDSResourcesStatus.reset()
},
},
common.WithPreFetchHookFn(func() {
s.awsRDSResourcesStatus.reset()
}),
)
if err != nil {
return trace.Wrap(err)
Expand Down
7 changes: 3 additions & 4 deletions lib/srv/discovery/kube_integration_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,10 @@ func (s *Server) startKubeIntegrationWatchers() error {
Interval: s.PollInterval,
Origin: types.OriginCloud,
TriggerFetchC: s.newDiscoveryConfigChangedSub(),
},
common.WithPreFetchHookFn(func() {
PreFetchHookFn: func() {
s.awsEKSResourcesStatus.reset()
}),
)
},
})
if err != nil {
return trace.Wrap(err)
}
Expand Down

0 comments on commit 56f10d1

Please sign in to comment.