diff --git a/pkg/app_watcher/app_watcher.go b/pkg/app_watcher/app_watcher.go index adb39c26..e4bdfe6a 100644 --- a/pkg/app_watcher/app_watcher.go +++ b/pkg/app_watcher/app_watcher.go @@ -152,14 +152,14 @@ func canProcessApp(obj interface{}) (*appv1alpha1.Application, bool) { return nil, false } - for _, src := range app.Spec.Sources { + if src := app.Spec.Source; src != nil { if isGitRepo(src.RepoURL) { return app, true } } - if app.Spec.Source != nil { - if isGitRepo(app.Spec.Source.RepoURL) { + for _, src := range app.Spec.Sources { + if isGitRepo(src.RepoURL) { return app, true } } diff --git a/pkg/appdir/vcstoargomap.go b/pkg/appdir/vcstoargomap.go index 0becae94..eeafc070 100644 --- a/pkg/appdir/vcstoargomap.go +++ b/pkg/appdir/vcstoargomap.go @@ -79,37 +79,39 @@ func (v2a VcsToArgoMap) WalkKustomizeApps(cloneURL string, fs fs.FS) *AppDirecto return result } -func (v2a VcsToArgoMap) AddApp(app *v1alpha1.Application) { - if app.Spec.Source == nil { - log.Warn().Msgf("%s/%s: no source, skipping", app.Namespace, app.Name) - return +func (v2a VcsToArgoMap) processApp(app v1alpha1.Application, fn func(*AppDirectory)) { + + if src := app.Spec.Source; src != nil { + appDirectory := v2a.GetAppsInRepo(src.RepoURL) + fn(appDirectory) } - appDirectory := v2a.GetAppsInRepo(app.Spec.Source.RepoURL) - appDirectory.ProcessApp(*app) + for _, src := range app.Spec.Sources { + appDirectory := v2a.GetAppsInRepo(src.RepoURL) + fn(appDirectory) + } } -func (v2a VcsToArgoMap) UpdateApp(old *v1alpha1.Application, new *v1alpha1.Application) { - if new.Spec.Source == nil { - log.Warn().Msgf("%s/%s: no source, skipping", new.Namespace, new.Name) - return - } +func (v2a VcsToArgoMap) AddApp(app *v1alpha1.Application) { + v2a.processApp(*app, func(directory *AppDirectory) { + directory.AddApp(*app) + }) +} - oldAppDirectory := v2a.GetAppsInRepo(old.Spec.Source.RepoURL) - oldAppDirectory.RemoveApp(*old) +func (v2a VcsToArgoMap) UpdateApp(old *v1alpha1.Application, new *v1alpha1.Application) { + v2a.processApp(*old, func(directory *AppDirectory) { + directory.RemoveApp(*old) + }) - newAppDirectory := v2a.GetAppsInRepo(new.Spec.Source.RepoURL) - newAppDirectory.ProcessApp(*new) + v2a.processApp(*new, func(directory *AppDirectory) { + directory.AddApp(*new) + }) } func (v2a VcsToArgoMap) DeleteApp(app *v1alpha1.Application) { - if app.Spec.Source == nil { - log.Warn().Msgf("%s/%s: no source, skipping", app.Namespace, app.Name) - return - } - - oldAppDirectory := v2a.GetAppsInRepo(app.Spec.Source.RepoURL) - oldAppDirectory.RemoveApp(*app) + v2a.processApp(*app, func(directory *AppDirectory) { + directory.RemoveApp(*app) + }) } func (v2a VcsToArgoMap) GetVcsRepos() []string { diff --git a/pkg/checks/kubeconform/check.go b/pkg/checks/kubeconform/check.go index 2198470b..5dcfdb02 100644 --- a/pkg/checks/kubeconform/check.go +++ b/pkg/checks/kubeconform/check.go @@ -8,8 +8,5 @@ import ( ) func Check(ctx context.Context, request checks.Request) (msg.Result, error) { - return argoCdAppValidate( - ctx, request.Container, request.AppName, request.KubernetesVersion, request.Repo.Directory, - request.YamlManifests, - ) + return argoCdAppValidate(ctx, request.Container, request.AppName, request.KubernetesVersion, request.YamlManifests) } diff --git a/pkg/checks/kubeconform/validate.go b/pkg/checks/kubeconform/validate.go index 71439ef3..21394091 100644 --- a/pkg/checks/kubeconform/validate.go +++ b/pkg/checks/kubeconform/validate.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "os" - "path/filepath" "strings" "github.com/pkg/errors" @@ -20,7 +19,7 @@ import ( var tracer = otel.Tracer("pkg/checks/kubeconform") -func getSchemaLocations(ctx context.Context, ctr container.Container, tempRepoPath string) []string { +func getSchemaLocations(ctr container.Container) []string { cfg := ctr.Config locations := []string{ @@ -33,10 +32,6 @@ func getSchemaLocations(ctx context.Context, ctr container.Container, tempRepoPa if strings.HasPrefix(schemasLocation, "http://") || strings.HasPrefix(schemasLocation, "https://") { locations = append(locations, schemasLocation) } else { - if !filepath.IsAbs(schemasLocation) { - schemasLocation = filepath.Join(tempRepoPath, schemasLocation) - } - if _, err := os.Stat(schemasLocation); err != nil { log.Warn(). Err(err). @@ -65,7 +60,7 @@ func getSchemaLocations(ctx context.Context, ctr container.Container, tempRepoPa return locations } -func argoCdAppValidate(ctx context.Context, ctr container.Container, appName, targetKubernetesVersion, tempRepoPath string, appManifests []string) (msg.Result, error) { +func argoCdAppValidate(ctx context.Context, ctr container.Container, appName, targetKubernetesVersion string, appManifests []string) (msg.Result, error) { _, span := tracer.Start(ctx, "ArgoCdAppValidate") defer span.End() @@ -92,7 +87,7 @@ func argoCdAppValidate(ctx context.Context, ctr container.Container, appName, ta var ( outputString []string - schemaLocations = getSchemaLocations(ctx, ctr, tempRepoPath) + schemaLocations = getSchemaLocations(ctr) ) log.Debug().Msgf("cache location: %s", vOpts.Cache) diff --git a/pkg/checks/kubeconform/validate_test.go b/pkg/checks/kubeconform/validate_test.go index bd502b01..b68324cc 100644 --- a/pkg/checks/kubeconform/validate_test.go +++ b/pkg/checks/kubeconform/validate_test.go @@ -1,7 +1,6 @@ package kubeconform import ( - "context" "fmt" "os" "strings" @@ -16,9 +15,8 @@ import ( ) func TestDefaultGetSchemaLocations(t *testing.T) { - ctx := context.TODO() ctr := container.Container{} - schemaLocations := getSchemaLocations(ctx, ctr, "/some/other/path") + schemaLocations := getSchemaLocations(ctr) // default schema location is "./schemas" assert.Len(t, schemaLocations, 1) @@ -26,7 +24,6 @@ func TestDefaultGetSchemaLocations(t *testing.T) { } func TestGetRemoteSchemaLocations(t *testing.T) { - ctx := context.TODO() ctr := container.Container{} if os.Getenv("CI") == "" { @@ -39,7 +36,7 @@ func TestGetRemoteSchemaLocations(t *testing.T) { // t.Setenv("KUBECHECKS_SCHEMAS_LOCATION", fixture.URL) // doesn't work because viper needs to initialize from root, which doesn't happen viper.Set("schemas-location", []string{fixture.URL}) - schemaLocations := getSchemaLocations(ctx, ctr, "/some/other/path") + schemaLocations := getSchemaLocations(ctr) hasTmpDirPrefix := strings.HasPrefix(schemaLocations[0], "/tmp/schemas") assert.Equal(t, hasTmpDirPrefix, true, "invalid schemas location. Schema location should have prefix /tmp/schemas but has %s", schemaLocations[0]) } diff --git a/pkg/events/runner.go b/pkg/events/runner.go index 29c25242..59b0ce13 100644 --- a/pkg/events/runner.go +++ b/pkg/events/runner.go @@ -11,7 +11,6 @@ import ( "github.com/zapier/kubechecks/pkg" "github.com/zapier/kubechecks/pkg/checks" "github.com/zapier/kubechecks/pkg/container" - "github.com/zapier/kubechecks/pkg/git" "github.com/zapier/kubechecks/pkg/msg" "github.com/zapier/kubechecks/telemetry" ) @@ -22,11 +21,7 @@ type Runner struct { wg sync.WaitGroup } -func newRunner( - ctr container.Container, app v1alpha1.Application, repo *git.Repo, - appName, k8sVersion string, jsonManifests, yamlManifests []string, - logger zerolog.Logger, note *msg.Message, queueApp, removeApp func(application v1alpha1.Application), -) *Runner { +func newRunner(ctr container.Container, app v1alpha1.Application, appName, k8sVersion string, jsonManifests, yamlManifests []string, logger zerolog.Logger, note *msg.Message, queueApp, removeApp func(application v1alpha1.Application)) *Runner { return &Runner{ Request: checks.Request{ App: app, @@ -38,7 +33,6 @@ func newRunner( Note: note, QueueApp: queueApp, RemoveApp: removeApp, - Repo: repo, YamlManifests: yamlManifests, }, } diff --git a/pkg/events/worker.go b/pkg/events/worker.go index 58f0e47b..1a22063f 100644 --- a/pkg/events/worker.go +++ b/pkg/events/worker.go @@ -45,6 +45,32 @@ func (w *worker) run(ctx context.Context) { } } +type pathAndRepoUrl struct { + Path, RepoURL, TargetRevision string +} + +func getAppSources(app v1alpha1.Application) []pathAndRepoUrl { + var items []pathAndRepoUrl + + if src := app.Spec.Source; src != nil { + items = append(items, pathAndRepoUrl{ + Path: src.Path, + RepoURL: src.RepoURL, + TargetRevision: src.TargetRevision, + }) + } + + for _, src := range app.Spec.Sources { + items = append(items, pathAndRepoUrl{ + Path: src.Path, + RepoURL: src.RepoURL, + TargetRevision: src.TargetRevision, + }) + } + + return items +} + // processApp is a function that validates and processes a given application manifest against various checks, // such as ArgoCD schema validation, diff generation, conftest policy validation, and pre-upgrade checks using kubepug. // It takes a context (ctx), application name (app), directory (dir) as input and returns an error if any check fails. @@ -54,27 +80,22 @@ func (w *worker) processApp(ctx context.Context, app v1alpha1.Application) { var ( err error - appName = app.Name - appSrc = app.Spec.Source - appPath = appSrc.Path - appRepoUrl = appSrc.RepoURL + appName = app.Name - logger = w.logger.With(). - Str("app_name", appName). - Str("app_path", appPath). - Logger() + rootLogger = w.logger.With(). + Str("app_name", appName). + Logger() ) ctx, span := tracer.Start(ctx, "processApp", trace.WithAttributes( attribute.String("app", appName), - attribute.String("dir", appPath), )) defer span.End() atomic.AddInt32(&inFlight, 1) defer atomic.AddInt32(&inFlight, -1) - logger.Info().Msg("Processing app") + rootLogger.Info().Msg("Processing app") // Build a new section for this app in the parent comment w.vcsNote.AddNewApp(ctx, appName) @@ -94,47 +115,58 @@ func (w *worker) processApp(ctx context.Context, app v1alpha1.Application) { } }() - repo, err := w.getRepo(ctx, w.ctr.VcsClient, appRepoUrl, appSrc.TargetRevision) - if err != nil { - logger.Error().Err(err).Msg("Unable to clone repository") - w.vcsNote.AddToAppMessage(ctx, appName, msg.Result{ - State: pkg.StateError, - Summary: "failed to clone repo", - Details: fmt.Sprintf("Clone URL: `%s`\nTarget Revision: `%s`\n```\n%s\n```", appRepoUrl, appSrc.TargetRevision, err.Error()), - }) - return - } - repoPath := repo.Directory + var jsonManifests []string + sources := getAppSources(app) + for _, appSrc := range sources { + var ( + appPath = appSrc.Path + appRepoUrl = appSrc.RepoURL + logger = rootLogger.With(). + Str("app_path", appPath). + Logger() + ) + + repo, err := w.getRepo(ctx, w.ctr.VcsClient, appRepoUrl, appSrc.TargetRevision) + if err != nil { + logger.Error().Err(err).Msg("Unable to clone repository") + w.vcsNote.AddToAppMessage(ctx, appName, msg.Result{ + State: pkg.StateError, + Summary: "failed to clone repo", + Details: fmt.Sprintf("Clone URL: `%s`\nTarget Revision: `%s`\n```\n%s\n```", appRepoUrl, appSrc.TargetRevision, err.Error()), + }) + return + } + repoPath := repo.Directory + + logger.Debug().Str("repo_path", repoPath).Msg("Getting manifests") + someJsonManifests, err := w.ctr.ArgoClient.GetManifestsLocal(ctx, appName, repoPath, appPath, app) + if err != nil { + logger.Error().Err(err).Msg("Unable to get manifests") + w.vcsNote.AddToAppMessage(ctx, appName, msg.Result{ + State: pkg.StateError, + Summary: "Unable to get manifests", + Details: fmt.Sprintf("```\n%s\n```", cleanupGetManifestsError(err, repo.Directory)), + }) + return + } - logger.Debug().Str("repo_path", repoPath).Msg("Getting manifests") - jsonManifests, err := w.ctr.ArgoClient.GetManifestsLocal(ctx, appName, repoPath, appPath, app) - if err != nil { - logger.Error().Err(err).Msg("Unable to get manifests") - w.vcsNote.AddToAppMessage(ctx, appName, msg.Result{ - State: pkg.StateError, - Summary: "Unable to get manifests", - Details: fmt.Sprintf("```\n%s\n```", cleanupGetManifestsError(err, repo.Directory)), - }) - return + jsonManifests = append(jsonManifests, someJsonManifests...) } // Argo diff logic wants unformatted manifests but everything else wants them as YAML, so we prepare both yamlManifests := argo_client.ConvertJsonToYamlManifests(jsonManifests) - logger.Trace().Msgf("Manifests:\n%+v\n", yamlManifests) + rootLogger.Trace().Msgf("Manifests:\n%+v\n", yamlManifests) k8sVersion, err := w.ctr.ArgoClient.GetKubernetesVersionByApplication(ctx, app) if err != nil { - logger.Error().Err(err).Msg("Error retrieving the Kubernetes version") + rootLogger.Error().Err(err).Msg("Error retrieving the Kubernetes version") k8sVersion = w.ctr.Config.FallbackK8sVersion } else { k8sVersion = fmt.Sprintf("%s.0", k8sVersion) - logger.Info().Msgf("Kubernetes version: %s", k8sVersion) + rootLogger.Info().Msgf("Kubernetes version: %s", k8sVersion) } - runner := newRunner( - w.ctr, app, repo, appName, k8sVersion, jsonManifests, yamlManifests, logger, w.vcsNote, - w.queueApp, w.removeApp, - ) + runner := newRunner(w.ctr, app, appName, k8sVersion, jsonManifests, yamlManifests, rootLogger, w.vcsNote, w.queueApp, w.removeApp) for _, processor := range w.processors { runner.Run(ctx, processor.Name, processor.Processor, processor.WorstState)