Skip to content

Commit

Permalink
Watch applications in all namespace and fitler by the ones that are a…
Browse files Browse the repository at this point in the history
…llowed

Signed-off-by: Abhi Kapoor <[email protected]>
  • Loading branch information
abhi-kapoor committed Dec 15, 2024
1 parent b14eac3 commit 7a7f11a
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 53 deletions.
111 changes: 64 additions & 47 deletions pkg/app_watcher/app_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@ import (
"fmt"
"reflect"
"strings"
"sync"
"time"

appv1alpha1 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
appclientset "github.com/argoproj/argo-cd/v2/pkg/client/clientset/versioned"
informers "github.com/argoproj/argo-cd/v2/pkg/client/informers/externalversions/application/v1alpha1"
applisters "github.com/argoproj/argo-cd/v2/pkg/client/listers/application/v1alpha1"
"github.com/argoproj/argo-cd/v2/util/glob"
"github.com/rs/zerolog/log"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apiruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

Expand All @@ -24,8 +26,8 @@ import (
// ApplicationWatcher is the controller that watches ArgoCD Application resources via the Kubernetes API
type ApplicationWatcher struct {
applicationClientset appclientset.Interface
appInformer []cache.SharedIndexInformer
appLister []applisters.ApplicationLister
appInformer cache.SharedIndexInformer
appLister applisters.ApplicationLister

vcsToArgoMap appdir.VcsToArgoMap
}
Expand All @@ -45,12 +47,8 @@ func NewApplicationWatcher(kubeCfg *rest.Config, vcsToArgoMap appdir.VcsToArgoMa
}

appInformer, appLister := ctrl.newApplicationInformerAndLister(time.Second*30, cfg)
for _, informer := range appInformer {
ctrl.appInformer = append(ctrl.appInformer, informer)
}
for _, lister := range appLister {
ctrl.appLister = append(ctrl.appLister, lister)
}
ctrl.appInformer = appInformer
ctrl.appLister = appLister
return &ctrl, nil
}

Expand All @@ -60,19 +58,14 @@ func (ctrl *ApplicationWatcher) Run(ctx context.Context, processors int) {

defer runtime.HandleCrash()

var wg sync.WaitGroup
wg.Add(len(ctrl.appInformer))

for _, informer := range ctrl.appInformer {
go func(inf cache.SharedIndexInformer) {
defer wg.Done()
inf.Run(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
log.Warn().Msg("Timed out waiting for caches to sync")
}
}(informer)
go ctrl.appInformer.Run(ctx.Done())

if !cache.WaitForCacheSync(ctx.Done(), ctrl.appInformer.HasSynced) {
log.Error().Msg("Timed out waiting for caches to sync")
return
}
wg.Wait()

<-ctx.Done()
}

// onAdd is the function executed when the informer notifies the
Expand Down Expand Up @@ -124,6 +117,10 @@ func (ctrl *ApplicationWatcher) onApplicationDeleted(obj interface{}) {
ctrl.vcsToArgoMap.DeleteApp(app)
}

func (ctrl *ApplicationWatcher) isAppNamespaceAllowed(app *appv1alpha1.Application, cfg config.ServerConfig) bool {
return app.Namespace == cfg.ArgoCDNamespace || glob.MatchStringInList(cfg.MonitorAppsNamespaces, app.Namespace, glob.REGEXP)
}

/*
newApplicationInformerAndLister, is part of the ApplicationWatcher struct. It sets up a Kubernetes SharedIndexInformer
and a Lister for Argo CD Applications.
Expand All @@ -135,32 +132,52 @@ that need to observe the object.
newApplicationInformerAndLister use the data from the informer's cache to provide a read-optimized view of the cache which reduces
the load on the API Server and hides some complexity.
*/
func (ctrl *ApplicationWatcher) newApplicationInformerAndLister(refreshTimeout time.Duration, cfg config.ServerConfig) (map[string]cache.SharedIndexInformer, map[string]applisters.ApplicationLister) {
totalNamespaces := append(cfg.MonitorAppsNamespaces, cfg.ArgoCDNamespace)

totalInformers := make(map[string]cache.SharedIndexInformer)
totalListers := make(map[string]applisters.ApplicationLister)

for _, ns := range totalNamespaces {
log.Debug().Msgf("Creating Application informer with namespace: %s", ns)
informer := informers.NewApplicationInformer(ctrl.applicationClientset, ns, refreshTimeout,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)

lister := applisters.NewApplicationLister(informer.GetIndexer())
if _, err := informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: ctrl.onApplicationAdded,
UpdateFunc: ctrl.onApplicationUpdated,
DeleteFunc: ctrl.onApplicationDeleted,
func (ctrl *ApplicationWatcher) newApplicationInformerAndLister(refreshTimeout time.Duration, cfg config.ServerConfig) (cache.SharedIndexInformer, applisters.ApplicationLister) {

watchNamespace := cfg.ArgoCDNamespace
// If we have at least one additional namespace configured, we need to
// watch on them all.
if len(cfg.MonitorAppsNamespaces) > 0 {
watchNamespace = ""
}

informer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (apiruntime.Object, error) {
// We are only interested in apps that exist in namespaces the
// user wants to be enabled.
appList, err := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(watchNamespace).List(context.TODO(), options)
if err != nil {
return nil, err
}
newItems := []appv1alpha1.Application{}
for _, app := range appList.Items {
if ctrl.isAppNamespaceAllowed(&app, cfg) {
newItems = append(newItems, app)
}
}
appList.Items = newItems
return appList, nil
},
); err != nil {
log.Error().Err(err).Msg("failed to add event handler")
}
totalInformers[ns] = informer
totalListers[ns] = lister
}
return totalInformers, totalListers
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return ctrl.applicationClientset.ArgoprojV1alpha1().Applications(watchNamespace).Watch(context.TODO(), options)
},
},
&appv1alpha1.Application{},
refreshTimeout,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
lister := applisters.NewApplicationLister(informer.GetIndexer())
if _, err := informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: ctrl.onApplicationAdded,
UpdateFunc: ctrl.onApplicationUpdated,
DeleteFunc: ctrl.onApplicationDeleted,
},
); err != nil {
log.Error().Err(err).Msg("failed to add event handler")
}
return informer, lister
}

func canProcessApp(obj interface{}) (*appv1alpha1.Application, bool) {
Expand Down
10 changes: 4 additions & 6 deletions pkg/app_watcher/app_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

func initTestObjects(t *testing.T) *ApplicationWatcher {
cfg, err := config.New()
cfg.MonitorAppsNamespaces = []string{"*"}
// Handle the error appropriately, e.g., log it or fail the test
require.NoError(t, err, "failed to create config")

Expand All @@ -41,12 +42,9 @@ func initTestObjects(t *testing.T) *ApplicationWatcher {
}

appInformer, appLister := ctrl.newApplicationInformerAndLister(time.Second*1, cfg)
for _, informer := range appInformer {
ctrl.appInformer = append(ctrl.appInformer, informer)
}
for _, lister := range appLister {
ctrl.appLister = append(ctrl.appLister, lister)
}
ctrl.appInformer = appInformer
ctrl.appLister = appLister

return ctrl
}

Expand Down

0 comments on commit 7a7f11a

Please sign in to comment.