From 7a7f11a8a056d3494d2014e1b3a0e6b26976eb47 Mon Sep 17 00:00:00 2001 From: Abhi Kapoor Date: Sun, 15 Dec 2024 14:55:19 -0500 Subject: [PATCH] Watch applications in all namespace and fitler by the ones that are allowed Signed-off-by: Abhi Kapoor --- pkg/app_watcher/app_watcher.go | 111 ++++++++++++++++------------ pkg/app_watcher/app_watcher_test.go | 10 +-- 2 files changed, 68 insertions(+), 53 deletions(-) diff --git a/pkg/app_watcher/app_watcher.go b/pkg/app_watcher/app_watcher.go index 821988bb..71dbe723 100644 --- a/pkg/app_watcher/app_watcher.go +++ b/pkg/app_watcher/app_watcher.go @@ -5,15 +5,17 @@ import ( "fmt" "reflect" "strings" - "sync" "time" appv1alpha1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned" - informers "github.com/argoproj/argo-cd/v2/pkg/client/informers/externalversions/application/v1alpha1" applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1" + "github.com/argoproj/argo-cd/v2/util/glob" "github.com/rs/zerolog/log" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apiruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" @@ -24,8 +26,8 @@ import ( // ApplicationWatcher is the controller that watches ArgoCD Application resources via the Kubernetes API type ApplicationWatcher struct { applicationClientset appclientset.Interface - appInformer []cache.SharedIndexInformer - appLister []applisters.ApplicationLister + appInformer cache.SharedIndexInformer + appLister applisters.ApplicationLister vcsToArgoMap appdir.VcsToArgoMap } @@ -45,12 +47,8 @@ func NewApplicationWatcher(kubeCfg *rest.Config, vcsToArgoMap appdir.VcsToArgoMa } appInformer, appLister := ctrl.newApplicationInformerAndLister(time.Second*30, cfg) - for _, informer := range appInformer { - ctrl.appInformer = append(ctrl.appInformer, informer) - } - for _, lister := range appLister { - ctrl.appLister = append(ctrl.appLister, lister) - } + ctrl.appInformer = appInformer + ctrl.appLister = appLister return &ctrl, nil } @@ -60,19 +58,14 @@ func (ctrl *ApplicationWatcher) Run(ctx context.Context, processors int) { defer runtime.HandleCrash() - var wg sync.WaitGroup - wg.Add(len(ctrl.appInformer)) - - for _, informer := range ctrl.appInformer { - go func(inf cache.SharedIndexInformer) { - defer wg.Done() - inf.Run(ctx.Done()) - if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) { - log.Warn().Msg("Timed out waiting for caches to sync") - } - }(informer) + go ctrl.appInformer.Run(ctx.Done()) + + if !cache.WaitForCacheSync(ctx.Done(), ctrl.appInformer.HasSynced) { + log.Error().Msg("Timed out waiting for caches to sync") + return } - wg.Wait() + + <-ctx.Done() } // onAdd is the function executed when the informer notifies the @@ -124,6 +117,10 @@ func (ctrl *ApplicationWatcher) onApplicationDeleted(obj interface{}) { ctrl.vcsToArgoMap.DeleteApp(app) } +func (ctrl *ApplicationWatcher) isAppNamespaceAllowed(app *appv1alpha1.Application, cfg config.ServerConfig) bool { + return app.Namespace == cfg.ArgoCDNamespace || glob.MatchStringInList(cfg.MonitorAppsNamespaces, app.Namespace, glob.REGEXP) +} + /* newApplicationInformerAndLister, is part of the ApplicationWatcher struct. It sets up a Kubernetes SharedIndexInformer and a Lister for Argo CD Applications. @@ -135,32 +132,52 @@ that need to observe the object. newApplicationInformerAndLister use the data from the informer's cache to provide a read-optimized view of the cache which reduces the load on the API Server and hides some complexity. */ -func (ctrl *ApplicationWatcher) newApplicationInformerAndLister(refreshTimeout time.Duration, cfg config.ServerConfig) (map[string]cache.SharedIndexInformer, map[string]applisters.ApplicationLister) { - totalNamespaces := append(cfg.MonitorAppsNamespaces, cfg.ArgoCDNamespace) - - totalInformers := make(map[string]cache.SharedIndexInformer) - totalListers := make(map[string]applisters.ApplicationLister) - - for _, ns := range totalNamespaces { - log.Debug().Msgf("Creating Application informer with namespace: %s", ns) - informer := informers.NewApplicationInformer(ctrl.applicationClientset, ns, refreshTimeout, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) - - lister := applisters.NewApplicationLister(informer.GetIndexer()) - if _, err := informer.AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: ctrl.onApplicationAdded, - UpdateFunc: ctrl.onApplicationUpdated, - DeleteFunc: ctrl.onApplicationDeleted, +func (ctrl *ApplicationWatcher) newApplicationInformerAndLister(refreshTimeout time.Duration, cfg config.ServerConfig) (cache.SharedIndexInformer, applisters.ApplicationLister) { + + watchNamespace := cfg.ArgoCDNamespace + // If we have at least one additional namespace configured, we need to + // watch on them all. + if len(cfg.MonitorAppsNamespaces) > 0 { + watchNamespace = "" + } + + informer := cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (apiruntime.Object, error) { + // We are only interested in apps that exist in namespaces the + // user wants to be enabled. + appList, err := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(watchNamespace).List(context.TODO(), options) + if err != nil { + return nil, err + } + newItems := []appv1alpha1.Application{} + for _, app := range appList.Items { + if ctrl.isAppNamespaceAllowed(&app, cfg) { + newItems = append(newItems, app) + } + } + appList.Items = newItems + return appList, nil }, - ); err != nil { - log.Error().Err(err).Msg("failed to add event handler") - } - totalInformers[ns] = informer - totalListers[ns] = lister - } - return totalInformers, totalListers + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return ctrl.applicationClientset.ArgoprojV1alpha1().Applications(watchNamespace).Watch(context.TODO(), options) + }, + }, + &appv1alpha1.Application{}, + refreshTimeout, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + lister := applisters.NewApplicationLister(informer.GetIndexer()) + if _, err := informer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: ctrl.onApplicationAdded, + UpdateFunc: ctrl.onApplicationUpdated, + DeleteFunc: ctrl.onApplicationDeleted, + }, + ); err != nil { + log.Error().Err(err).Msg("failed to add event handler") + } + return informer, lister } func canProcessApp(obj interface{}) (*appv1alpha1.Application, bool) { diff --git a/pkg/app_watcher/app_watcher_test.go b/pkg/app_watcher/app_watcher_test.go index 56af213e..a09f6583 100644 --- a/pkg/app_watcher/app_watcher_test.go +++ b/pkg/app_watcher/app_watcher_test.go @@ -17,6 +17,7 @@ import ( func initTestObjects(t *testing.T) *ApplicationWatcher { cfg, err := config.New() + cfg.MonitorAppsNamespaces = []string{"*"} // Handle the error appropriately, e.g., log it or fail the test require.NoError(t, err, "failed to create config") @@ -41,12 +42,9 @@ func initTestObjects(t *testing.T) *ApplicationWatcher { } appInformer, appLister := ctrl.newApplicationInformerAndLister(time.Second*1, cfg) - for _, informer := range appInformer { - ctrl.appInformer = append(ctrl.appInformer, informer) - } - for _, lister := range appLister { - ctrl.appLister = append(ctrl.appLister, lister) - } + ctrl.appInformer = appInformer + ctrl.appLister = appLister + return ctrl }