From a3c124225a3edb6879ff72f2c4f5fbe8a5a4dbc7 Mon Sep 17 00:00:00 2001 From: Abhi Kapoor Date: Mon, 16 Dec 2024 12:17:41 -0500 Subject: [PATCH] Watch applicationSet in all namespaces and update env var Signed-off-by: Abhi Kapoor --- charts/kubechecks/values.yaml | 2 +- cmd/root.go | 2 +- docs/usage.md | 2 +- pkg/app_watcher/app_watcher.go | 4 +- pkg/app_watcher/app_watcher_test.go | 2 +- pkg/app_watcher/appset_watcher.go | 110 ++++++++++++++----------- pkg/app_watcher/appset_watcher_test.go | 10 +-- pkg/config/config.go | 2 +- pkg/config/config_test.go | 4 +- 9 files changed, 75 insertions(+), 63 deletions(-) diff --git a/charts/kubechecks/values.yaml b/charts/kubechecks/values.yaml index ba20de3b..9101913f 100644 --- a/charts/kubechecks/values.yaml +++ b/charts/kubechecks/values.yaml @@ -4,7 +4,7 @@ commonLabels: {} configMap: create: false env: {} - # KUBECHECKS_MONITOR_APPS_NAMESPACES: default,namespace-a + # KUBECHECKS_ALLOWED_APPS_NAMESPACES: "*" # KUBECHECKS_ARGOCD_API_INSECURE: "false" # KUBECHECKS_ARGOCD_API_PATH_PREFIX: / # KUBECHECKS_ARGOCD_API_NAMESPACE: argocd diff --git a/cmd/root.go b/cmd/root.go index 0cd425f5..49164230 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -118,7 +118,7 @@ func init() { stringFlag(flags, "replan-comment-msg", "comment message which re-triggers kubechecks on PR.", newStringOpts(). withDefault("kubechecks again")) - stringSliceFlag(flags, "monitor-apps-namespaces", "Additional namespaces other than the ArgoCDNamespace to monitor for applications.") + stringSliceFlag(flags, "additional-apps-namespaces", "Additional namespaces other than the ArgoCDNamespace to monitor for applications.") panicIfError(viper.BindPFlags(flags)) setupLogOutput() diff --git a/docs/usage.md b/docs/usage.md index 1e9213a8..042088b3 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -36,6 +36,7 @@ The full list of supported environment variables is described below: |Env Var|Description|Default Value| |-----------|-------------|------| +|`KUBECHECKS_ADDITIONAL_APPS_NAMESPACES`|Additional namespaces other than the ArgoCDNamespace to monitor for applications.|`[]`| |`KUBECHECKS_ARGOCD_API_INSECURE`|Enable to use insecure connections over TLS to the ArgoCD API server.|`false`| |`KUBECHECKS_ARGOCD_API_NAMESPACE`|ArgoCD namespace where the application watcher will read Custom Resource Definitions (CRD) for Application and ApplicationSet resources.|`argocd`| |`KUBECHECKS_ARGOCD_API_PLAINTEXT`|Enable to use plaintext connections without TLS.|`false`| @@ -58,7 +59,6 @@ The full list of supported environment variables is described below: |`KUBECHECKS_MAX_CONCURRENCT_CHECKS`|Number of concurrent checks to run.|`32`| |`KUBECHECKS_MAX_QUEUE_SIZE`|Size of app diff check queue.|`1024`| |`KUBECHECKS_MONITOR_ALL_APPLICATIONS`|Monitor all applications in argocd automatically.|`true`| -|`KUBECHECKS_MONITOR_APPS_NAMESPACES`|Additional namespaces other than the ArgoCDNamespace to monitor for applications.|`[]`| |`KUBECHECKS_OPENAI_API_TOKEN`|OpenAI API Token.|| |`KUBECHECKS_OTEL_COLLECTOR_HOST`|The OpenTelemetry collector host.|| |`KUBECHECKS_OTEL_COLLECTOR_PORT`|The OpenTelemetry collector port.|| diff --git a/pkg/app_watcher/app_watcher.go b/pkg/app_watcher/app_watcher.go index 71dbe723..e380f36e 100644 --- a/pkg/app_watcher/app_watcher.go +++ b/pkg/app_watcher/app_watcher.go @@ -118,7 +118,7 @@ func (ctrl *ApplicationWatcher) onApplicationDeleted(obj interface{}) { } func (ctrl *ApplicationWatcher) isAppNamespaceAllowed(app *appv1alpha1.Application, cfg config.ServerConfig) bool { - return app.Namespace == cfg.ArgoCDNamespace || glob.MatchStringInList(cfg.MonitorAppsNamespaces, app.Namespace, glob.REGEXP) + return app.Namespace == cfg.ArgoCDNamespace || glob.MatchStringInList(cfg.AdditionalAppsNamespaces, app.Namespace, glob.REGEXP) } /* @@ -137,7 +137,7 @@ func (ctrl *ApplicationWatcher) newApplicationInformerAndLister(refreshTimeout t watchNamespace := cfg.ArgoCDNamespace // If we have at least one additional namespace configured, we need to // watch on them all. - if len(cfg.MonitorAppsNamespaces) > 0 { + if len(cfg.AdditionalAppsNamespaces) > 0 { watchNamespace = "" } diff --git a/pkg/app_watcher/app_watcher_test.go b/pkg/app_watcher/app_watcher_test.go index a09f6583..291959bc 100644 --- a/pkg/app_watcher/app_watcher_test.go +++ b/pkg/app_watcher/app_watcher_test.go @@ -17,7 +17,7 @@ import ( func initTestObjects(t *testing.T) *ApplicationWatcher { cfg, err := config.New() - cfg.MonitorAppsNamespaces = []string{"*"} + cfg.AdditionalAppsNamespaces = []string{"*"} // Handle the error appropriately, e.g., log it or fail the test require.NoError(t, err, "failed to create config") diff --git a/pkg/app_watcher/appset_watcher.go b/pkg/app_watcher/appset_watcher.go index 1f4f48a9..b3aaf3b4 100644 --- a/pkg/app_watcher/appset_watcher.go +++ b/pkg/app_watcher/appset_watcher.go @@ -4,17 +4,19 @@ import ( "context" "fmt" "reflect" - "sync" "time" appv1alpha1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned" - informers "github.com/argoproj/argo-cd/v2/pkg/client/informers/externalversions/application/v1alpha1" applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1" + "github.com/argoproj/argo-cd/v2/util/glob" "github.com/rs/zerolog/log" "github.com/zapier/kubechecks/pkg/appdir" "github.com/zapier/kubechecks/pkg/config" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apiruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" ) @@ -22,8 +24,8 @@ import ( // ApplicationSetWatcher is the controller that watches ArgoCD Application resources via the Kubernetes API type ApplicationSetWatcher struct { applicationClientset appclientset.Interface - appInformer []cache.SharedIndexInformer - appLister []applisters.ApplicationSetLister + appInformer cache.SharedIndexInformer + appLister applisters.ApplicationSetLister vcsToArgoMap appdir.VcsToArgoMap } @@ -39,13 +41,8 @@ func NewApplicationSetWatcher(kubeCfg *rest.Config, vcsToArgoMap appdir.VcsToArg } appInformer, appLister := ctrl.newApplicationSetInformerAndLister(time.Second*30, cfg) - for _, informer := range appInformer { - ctrl.appInformer = append(ctrl.appInformer, informer) - } - for _, lister := range appLister { - ctrl.appLister = append(ctrl.appLister, lister) - } - + ctrl.appInformer = appInformer + ctrl.appLister = appLister return &ctrl, nil } @@ -55,46 +52,65 @@ func (ctrl *ApplicationSetWatcher) Run(ctx context.Context) { defer runtime.HandleCrash() - var wg sync.WaitGroup - wg.Add(len(ctrl.appInformer)) - - for _, informer := range ctrl.appInformer { - go func(inf cache.SharedIndexInformer) { - defer wg.Done() - inf.Run(ctx.Done()) - if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) { - log.Warn().Msg("Timed out waiting for caches to sync") - return - } - }(informer) + go ctrl.appInformer.Run(ctx.Done()) + + if !cache.WaitForCacheSync(ctx.Done(), ctrl.appInformer.HasSynced) { + log.Error().Msg("Timed out waiting for caches to sync") + return } - wg.Wait() + + <-ctx.Done() } -func (ctrl *ApplicationSetWatcher) newApplicationSetInformerAndLister(refreshTimeout time.Duration, cfg config.ServerConfig) (map[string]cache.SharedIndexInformer, map[string]applisters.ApplicationSetLister) { - totalNamespaces := append(cfg.MonitorAppsNamespaces, cfg.ArgoCDNamespace) - totalInformers := make(map[string]cache.SharedIndexInformer) - totalAppSetListers := make(map[string]applisters.ApplicationSetLister) - for _, ns := range totalNamespaces { - log.Debug().Msgf("Creating ApplicationSet informer with namespace: %s", ns) - informer := informers.NewApplicationSetInformer(ctrl.applicationClientset, ns, refreshTimeout, - cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - ) - - AppSetLister := applisters.NewApplicationSetLister(informer.GetIndexer()) - if _, err := informer.AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: ctrl.onApplicationSetAdded, - UpdateFunc: ctrl.onApplicationSetUpdated, - DeleteFunc: ctrl.onApplicationSetDeleted, +func (ctrl *ApplicationSetWatcher) isAppNamespaceAllowed(appSet *appv1alpha1.ApplicationSet, cfg config.ServerConfig) bool { + return appSet.Namespace == cfg.ArgoCDNamespace || glob.MatchStringInList(cfg.AdditionalAppsNamespaces, appSet.Namespace, glob.REGEXP) +} + +func (ctrl *ApplicationSetWatcher) newApplicationSetInformerAndLister(refreshTimeout time.Duration, cfg config.ServerConfig) (cache.SharedIndexInformer, applisters.ApplicationSetLister) { + watchNamespace := cfg.ArgoCDNamespace + // If we have at least one additional namespace configured, we need to + // watch on them all. + if len(cfg.AdditionalAppsNamespaces) > 0 { + watchNamespace = "" + } + + informer := cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (apiruntime.Object, error) { + // We are only interested in apps that exist in namespaces the + // user wants to be enabled. + appList, err := ctrl.applicationClientset.ArgoprojV1alpha1().ApplicationSets(watchNamespace).List(context.TODO(), options) + if err != nil { + return nil, err + } + newItems := []appv1alpha1.ApplicationSet{} + for _, appSet := range appList.Items { + if ctrl.isAppNamespaceAllowed(&appSet, cfg) { + newItems = append(newItems, appSet) + } + } + appList.Items = newItems + return appList, nil }, - ); err != nil { - log.Error().Err(err).Msg("failed to add event handler for Application Set") - } - totalInformers[ns] = informer - totalAppSetListers[ns] = AppSetLister - } - return totalInformers, totalAppSetListers + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return ctrl.applicationClientset.ArgoprojV1alpha1().ApplicationSets(watchNamespace).Watch(context.TODO(), options) + }, + }, + &appv1alpha1.ApplicationSet{}, + refreshTimeout, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) + AppSetLister := applisters.NewApplicationSetLister(informer.GetIndexer()) + if _, err := informer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: ctrl.onApplicationSetAdded, + UpdateFunc: ctrl.onApplicationSetUpdated, + DeleteFunc: ctrl.onApplicationSetDeleted, + }, + ); err != nil { + log.Error().Err(err).Msg("failed to add event handler for Application Set") + } + return informer, AppSetLister } // onAdd is the function executed when the informer notifies the diff --git a/pkg/app_watcher/appset_watcher_test.go b/pkg/app_watcher/appset_watcher_test.go index c4df4262..6c0e83ab 100644 --- a/pkg/app_watcher/appset_watcher_test.go +++ b/pkg/app_watcher/appset_watcher_test.go @@ -16,6 +16,7 @@ import ( func initTestObjectsForAppSets(t *testing.T) *ApplicationSetWatcher { cfg, err := config.New() + cfg.AdditionalAppsNamespaces = []string{"*"} // Handle the error appropriately, e.g., log it or fail the test require.NoError(t, err, "failed to create config") @@ -54,13 +55,8 @@ func initTestObjectsForAppSets(t *testing.T) *ApplicationSetWatcher { } appInformer, appLister := ctrl.newApplicationSetInformerAndLister(time.Second*1, cfg) - for _, informer := range appInformer { - ctrl.appInformer = append(ctrl.appInformer, informer) - } - for _, lister := range appLister { - ctrl.appLister = append(ctrl.appLister, lister) - } - + ctrl.appInformer = appInformer + ctrl.appLister = appLister return ctrl } diff --git a/pkg/config/config.go b/pkg/config/config.go index d2f8bcd7..4171b4f8 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -68,7 +68,7 @@ type ServerConfig struct { WorstPreupgradeState pkg.CommitState `mapstructure:"worst-preupgrade-state"` // misc - MonitorAppsNamespaces []string `mapstructure:"monitor-apps-namespaces"` + AdditionalAppsNamespaces []string `mapstructure:"additional-apps-namespaces"` FallbackK8sVersion string `mapstructure:"fallback-k8s-version"` LabelFilter string `mapstructure:"label-filter"` LogLevel zerolog.Level `mapstructure:"log-level"` diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 41f6d542..b32a1674 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -19,7 +19,7 @@ func TestNew(t *testing.T) { v.Set("argocd-api-plaintext", "true") v.Set("worst-conftest-state", "warning") v.Set("repo-refresh-interval", "10m") - v.Set("monitor-apps-namespaces", "default,kube-system") + v.Set("additional-apps-namespaces", "default,kube-system") cfg, err := NewWithViper(v) require.NoError(t, err) @@ -28,5 +28,5 @@ func TestNew(t *testing.T) { assert.Equal(t, true, cfg.ArgoCDPlainText) assert.Equal(t, pkg.StateWarning, cfg.WorstConfTestState, "worst states can be overridden") assert.Equal(t, time.Minute*10, cfg.RepoRefreshInterval) - assert.Equal(t, []string{"default", "kube-system"}, cfg.MonitorAppsNamespaces) + assert.Equal(t, []string{"default", "kube-system"}, cfg.AdditionalAppsNamespaces) }