Skip to content

Commit

Permalink
handle multi source applications
Browse files Browse the repository at this point in the history
this also removes the ability to reference a schemas dir relative to the root repo, but I'm hoping it's unused, as it's complicated to reason about or manage. a central schemas repo makes more sense.
  • Loading branch information
djeebus committed Oct 21, 2024
1 parent 2d7b205 commit 5274205
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 87 deletions.
6 changes: 3 additions & 3 deletions pkg/app_watcher/app_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,14 @@ func canProcessApp(obj interface{}) (*appv1alpha1.Application, bool) {
return nil, false
}

for _, src := range app.Spec.Sources {
if src := app.Spec.Source; src != nil {
if isGitRepo(src.RepoURL) {
return app, true
}
}

if app.Spec.Source != nil {
if isGitRepo(app.Spec.Source.RepoURL) {
for _, src := range app.Spec.Sources {
if isGitRepo(src.RepoURL) {
return app, true
}
}
Expand Down
46 changes: 24 additions & 22 deletions pkg/appdir/vcstoargomap.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,37 +79,39 @@ func (v2a VcsToArgoMap) WalkKustomizeApps(cloneURL string, fs fs.FS) *AppDirecto
return result
}

func (v2a VcsToArgoMap) AddApp(app *v1alpha1.Application) {
if app.Spec.Source == nil {
log.Warn().Msgf("%s/%s: no source, skipping", app.Namespace, app.Name)
return
func (v2a VcsToArgoMap) processApp(app v1alpha1.Application, fn func(*AppDirectory)) {

if src := app.Spec.Source; src != nil {
appDirectory := v2a.GetAppsInRepo(src.RepoURL)
fn(appDirectory)
}

appDirectory := v2a.GetAppsInRepo(app.Spec.Source.RepoURL)
appDirectory.ProcessApp(*app)
for _, src := range app.Spec.Sources {
appDirectory := v2a.GetAppsInRepo(src.RepoURL)
fn(appDirectory)
}
}

func (v2a VcsToArgoMap) UpdateApp(old *v1alpha1.Application, new *v1alpha1.Application) {
if new.Spec.Source == nil {
log.Warn().Msgf("%s/%s: no source, skipping", new.Namespace, new.Name)
return
}
func (v2a VcsToArgoMap) AddApp(app *v1alpha1.Application) {
v2a.processApp(*app, func(directory *AppDirectory) {
directory.AddApp(*app)
})
}

oldAppDirectory := v2a.GetAppsInRepo(old.Spec.Source.RepoURL)
oldAppDirectory.RemoveApp(*old)
func (v2a VcsToArgoMap) UpdateApp(old *v1alpha1.Application, new *v1alpha1.Application) {
v2a.processApp(*old, func(directory *AppDirectory) {
directory.RemoveApp(*old)
})

newAppDirectory := v2a.GetAppsInRepo(new.Spec.Source.RepoURL)
newAppDirectory.ProcessApp(*new)
v2a.processApp(*new, func(directory *AppDirectory) {
directory.AddApp(*new)
})
}

func (v2a VcsToArgoMap) DeleteApp(app *v1alpha1.Application) {
if app.Spec.Source == nil {
log.Warn().Msgf("%s/%s: no source, skipping", app.Namespace, app.Name)
return
}

oldAppDirectory := v2a.GetAppsInRepo(app.Spec.Source.RepoURL)
oldAppDirectory.RemoveApp(*app)
v2a.processApp(*app, func(directory *AppDirectory) {
directory.RemoveApp(*app)
})
}

func (v2a VcsToArgoMap) GetVcsRepos() []string {
Expand Down
5 changes: 1 addition & 4 deletions pkg/checks/kubeconform/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,5 @@ import (
)

func Check(ctx context.Context, request checks.Request) (msg.Result, error) {
return argoCdAppValidate(
ctx, request.Container, request.AppName, request.KubernetesVersion, request.Repo.Directory,
request.YamlManifests,
)
return argoCdAppValidate(ctx, request.Container, request.AppName, request.KubernetesVersion, request.YamlManifests)
}
11 changes: 3 additions & 8 deletions pkg/checks/kubeconform/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"io"
"os"
"path/filepath"
"strings"

"github.com/pkg/errors"
Expand All @@ -20,7 +19,7 @@ import (

var tracer = otel.Tracer("pkg/checks/kubeconform")

func getSchemaLocations(ctx context.Context, ctr container.Container, tempRepoPath string) []string {
func getSchemaLocations(ctr container.Container) []string {
cfg := ctr.Config

locations := []string{
Expand All @@ -33,10 +32,6 @@ func getSchemaLocations(ctx context.Context, ctr container.Container, tempRepoPa
if strings.HasPrefix(schemasLocation, "http://") || strings.HasPrefix(schemasLocation, "https://") {
locations = append(locations, schemasLocation)
} else {
if !filepath.IsAbs(schemasLocation) {
schemasLocation = filepath.Join(tempRepoPath, schemasLocation)
}

if _, err := os.Stat(schemasLocation); err != nil {
log.Warn().
Err(err).
Expand Down Expand Up @@ -65,7 +60,7 @@ func getSchemaLocations(ctx context.Context, ctr container.Container, tempRepoPa
return locations
}

func argoCdAppValidate(ctx context.Context, ctr container.Container, appName, targetKubernetesVersion, tempRepoPath string, appManifests []string) (msg.Result, error) {
func argoCdAppValidate(ctx context.Context, ctr container.Container, appName, targetKubernetesVersion string, appManifests []string) (msg.Result, error) {
_, span := tracer.Start(ctx, "ArgoCdAppValidate")
defer span.End()

Expand All @@ -92,7 +87,7 @@ func argoCdAppValidate(ctx context.Context, ctr container.Container, appName, ta

var (
outputString []string
schemaLocations = getSchemaLocations(ctx, ctr, tempRepoPath)
schemaLocations = getSchemaLocations(ctr)
)

log.Debug().Msgf("cache location: %s", vOpts.Cache)
Expand Down
7 changes: 2 additions & 5 deletions pkg/checks/kubeconform/validate_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package kubeconform

import (
"context"
"fmt"
"os"
"strings"
Expand All @@ -16,17 +15,15 @@ import (
)

func TestDefaultGetSchemaLocations(t *testing.T) {
ctx := context.TODO()
ctr := container.Container{}
schemaLocations := getSchemaLocations(ctx, ctr, "/some/other/path")
schemaLocations := getSchemaLocations(ctr)

// default schema location is "./schemas"
assert.Len(t, schemaLocations, 1)
assert.Equal(t, "default", schemaLocations[0])
}

func TestGetRemoteSchemaLocations(t *testing.T) {
ctx := context.TODO()
ctr := container.Container{}

if os.Getenv("CI") == "" {
Expand All @@ -39,7 +36,7 @@ func TestGetRemoteSchemaLocations(t *testing.T) {

// t.Setenv("KUBECHECKS_SCHEMAS_LOCATION", fixture.URL) // doesn't work because viper needs to initialize from root, which doesn't happen
viper.Set("schemas-location", []string{fixture.URL})
schemaLocations := getSchemaLocations(ctx, ctr, "/some/other/path")
schemaLocations := getSchemaLocations(ctr)
hasTmpDirPrefix := strings.HasPrefix(schemaLocations[0], "/tmp/schemas")
assert.Equal(t, hasTmpDirPrefix, true, "invalid schemas location. Schema location should have prefix /tmp/schemas but has %s", schemaLocations[0])
}
8 changes: 1 addition & 7 deletions pkg/events/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/zapier/kubechecks/pkg"
"github.com/zapier/kubechecks/pkg/checks"
"github.com/zapier/kubechecks/pkg/container"
"github.com/zapier/kubechecks/pkg/git"
"github.com/zapier/kubechecks/pkg/msg"
"github.com/zapier/kubechecks/telemetry"
)
Expand All @@ -22,11 +21,7 @@ type Runner struct {
wg sync.WaitGroup
}

func newRunner(
ctr container.Container, app v1alpha1.Application, repo *git.Repo,
appName, k8sVersion string, jsonManifests, yamlManifests []string,
logger zerolog.Logger, note *msg.Message, queueApp, removeApp func(application v1alpha1.Application),
) *Runner {
func newRunner(ctr container.Container, app v1alpha1.Application, appName, k8sVersion string, jsonManifests, yamlManifests []string, logger zerolog.Logger, note *msg.Message, queueApp, removeApp func(application v1alpha1.Application)) *Runner {
return &Runner{
Request: checks.Request{
App: app,
Expand All @@ -38,7 +33,6 @@ func newRunner(
Note: note,
QueueApp: queueApp,
RemoveApp: removeApp,
Repo: repo,
YamlManifests: yamlManifests,
},
}
Expand Down
108 changes: 70 additions & 38 deletions pkg/events/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,32 @@ func (w *worker) run(ctx context.Context) {
}
}

type pathAndRepoUrl struct {
Path, RepoURL, TargetRevision string
}

func getAppSources(app v1alpha1.Application) []pathAndRepoUrl {
var items []pathAndRepoUrl

if src := app.Spec.Source; src != nil {
items = append(items, pathAndRepoUrl{
Path: src.Path,
RepoURL: src.RepoURL,
TargetRevision: src.TargetRevision,
})
}

for _, src := range app.Spec.Sources {
items = append(items, pathAndRepoUrl{
Path: src.Path,
RepoURL: src.RepoURL,
TargetRevision: src.TargetRevision,
})
}

return items
}

// processApp is a function that validates and processes a given application manifest against various checks,
// such as ArgoCD schema validation, diff generation, conftest policy validation, and pre-upgrade checks using kubepug.
// It takes a context (ctx), application name (app), directory (dir) as input and returns an error if any check fails.
Expand All @@ -54,27 +80,22 @@ func (w *worker) processApp(ctx context.Context, app v1alpha1.Application) {
var (
err error

appName = app.Name
appSrc = app.Spec.Source
appPath = appSrc.Path
appRepoUrl = appSrc.RepoURL
appName = app.Name

logger = w.logger.With().
Str("app_name", appName).
Str("app_path", appPath).
Logger()
rootLogger = w.logger.With().
Str("app_name", appName).
Logger()
)

ctx, span := tracer.Start(ctx, "processApp", trace.WithAttributes(
attribute.String("app", appName),
attribute.String("dir", appPath),
))
defer span.End()

atomic.AddInt32(&inFlight, 1)
defer atomic.AddInt32(&inFlight, -1)

logger.Info().Msg("Processing app")
rootLogger.Info().Msg("Processing app")

// Build a new section for this app in the parent comment
w.vcsNote.AddNewApp(ctx, appName)
Expand All @@ -94,47 +115,58 @@ func (w *worker) processApp(ctx context.Context, app v1alpha1.Application) {
}
}()

repo, err := w.getRepo(ctx, w.ctr.VcsClient, appRepoUrl, appSrc.TargetRevision)
if err != nil {
logger.Error().Err(err).Msg("Unable to clone repository")
w.vcsNote.AddToAppMessage(ctx, appName, msg.Result{
State: pkg.StateError,
Summary: "failed to clone repo",
Details: fmt.Sprintf("Clone URL: `%s`\nTarget Revision: `%s`\n```\n%s\n```", appRepoUrl, appSrc.TargetRevision, err.Error()),
})
return
}
repoPath := repo.Directory
var jsonManifests []string
sources := getAppSources(app)
for _, appSrc := range sources {
var (
appPath = appSrc.Path
appRepoUrl = appSrc.RepoURL
logger = rootLogger.With().
Str("app_path", appPath).
Logger()
)

repo, err := w.getRepo(ctx, w.ctr.VcsClient, appRepoUrl, appSrc.TargetRevision)
if err != nil {
logger.Error().Err(err).Msg("Unable to clone repository")
w.vcsNote.AddToAppMessage(ctx, appName, msg.Result{
State: pkg.StateError,
Summary: "failed to clone repo",
Details: fmt.Sprintf("Clone URL: `%s`\nTarget Revision: `%s`\n```\n%s\n```", appRepoUrl, appSrc.TargetRevision, err.Error()),
})
return
}
repoPath := repo.Directory

logger.Debug().Str("repo_path", repoPath).Msg("Getting manifests")
someJsonManifests, err := w.ctr.ArgoClient.GetManifestsLocal(ctx, appName, repoPath, appPath, app)
if err != nil {
logger.Error().Err(err).Msg("Unable to get manifests")
w.vcsNote.AddToAppMessage(ctx, appName, msg.Result{
State: pkg.StateError,
Summary: "Unable to get manifests",
Details: fmt.Sprintf("```\n%s\n```", cleanupGetManifestsError(err, repo.Directory)),
})
return
}

logger.Debug().Str("repo_path", repoPath).Msg("Getting manifests")
jsonManifests, err := w.ctr.ArgoClient.GetManifestsLocal(ctx, appName, repoPath, appPath, app)
if err != nil {
logger.Error().Err(err).Msg("Unable to get manifests")
w.vcsNote.AddToAppMessage(ctx, appName, msg.Result{
State: pkg.StateError,
Summary: "Unable to get manifests",
Details: fmt.Sprintf("```\n%s\n```", cleanupGetManifestsError(err, repo.Directory)),
})
return
jsonManifests = append(jsonManifests, someJsonManifests...)
}

// Argo diff logic wants unformatted manifests but everything else wants them as YAML, so we prepare both
yamlManifests := argo_client.ConvertJsonToYamlManifests(jsonManifests)
logger.Trace().Msgf("Manifests:\n%+v\n", yamlManifests)
rootLogger.Trace().Msgf("Manifests:\n%+v\n", yamlManifests)

k8sVersion, err := w.ctr.ArgoClient.GetKubernetesVersionByApplication(ctx, app)
if err != nil {
logger.Error().Err(err).Msg("Error retrieving the Kubernetes version")
rootLogger.Error().Err(err).Msg("Error retrieving the Kubernetes version")
k8sVersion = w.ctr.Config.FallbackK8sVersion
} else {
k8sVersion = fmt.Sprintf("%s.0", k8sVersion)
logger.Info().Msgf("Kubernetes version: %s", k8sVersion)
rootLogger.Info().Msgf("Kubernetes version: %s", k8sVersion)
}

runner := newRunner(
w.ctr, app, repo, appName, k8sVersion, jsonManifests, yamlManifests, logger, w.vcsNote,
w.queueApp, w.removeApp,
)
runner := newRunner(w.ctr, app, appName, k8sVersion, jsonManifests, yamlManifests, rootLogger, w.vcsNote, w.queueApp, w.removeApp)

for _, processor := range w.processors {
runner.Run(ctx, processor.Name, processor.Processor, processor.WorstState)
Expand Down

0 comments on commit 5274205

Please sign in to comment.