From e369b52a7ca98148a5c56e35d845abc47b788d3b Mon Sep 17 00:00:00 2001 From: jstuart Date: Fri, 1 Nov 2024 10:49:28 -0500 Subject: [PATCH 1/3] Resolve ImageManifests concurrently This should speed up resolving manifests when there's a greater number of components. https://issues.redhat.com/browse/EC-975 --- cmd/validate/image_test.go | 35 +++--- internal/applicationsnapshot/input.go | 119 +++++++++++++-------- internal/applicationsnapshot/input_test.go | 18 ++-- 3 files changed, 102 insertions(+), 70 deletions(-) diff --git a/cmd/validate/image_test.go b/cmd/validate/image_test.go index 5d8ad1c0b..be44dada2 100644 --- a/cmd/validate/image_test.go +++ b/cmd/validate/image_test.go @@ -173,13 +173,17 @@ func Test_determineInputSpec(t *testing.T) { { "name": "single-container-app", "containerImage": "quay.io/hacbs-contract-demo/single-container-app:62c06bf" - } + }, ] }`, }, spec: &app.SnapshotSpec{ Application: "app1", Components: []app.SnapshotComponent{ + { + Name: "single-container-app", + ContainerImage: "quay.io/hacbs-contract-demo/single-container-app:62c06bf", + }, { Name: "nodejs", ContainerImage: "quay.io/hacbs-contract-demo/single-nodejs-app:877418e", @@ -188,10 +192,6 @@ func Test_determineInputSpec(t *testing.T) { Name: "petclinic", ContainerImage: "quay.io/hacbs-contract-demo/spring-petclinic:dc80a7f", }, - { - Name: "single-container-app", - ContainerImage: "quay.io/hacbs-contract-demo/single-container-app:62c06bf", - }, }, }, }, @@ -213,6 +213,10 @@ func Test_determineInputSpec(t *testing.T) { spec: &app.SnapshotSpec{ Application: "app1", Components: []app.SnapshotComponent{ + { + Name: "single-container-app", + ContainerImage: "quay.io/hacbs-contract-demo/single-container-app:62c06bf", + }, { Name: "nodejs", ContainerImage: "quay.io/hacbs-contract-demo/single-nodejs-app:877418e", @@ -221,10 +225,6 @@ func Test_determineInputSpec(t *testing.T) { Name: "petclinic", ContainerImage: "quay.io/hacbs-contract-demo/spring-petclinic:dc80a7f", }, - { - Name: "single-container-app", - ContainerImage: "quay.io/hacbs-contract-demo/single-container-app:62c06bf", - }, }, }, }, @@ -236,6 +236,10 @@ func Test_determineInputSpec(t *testing.T) { spec: &app.SnapshotSpec{ Application: "app1", Components: []app.SnapshotComponent{ + { + Name: "single-container-app", + ContainerImage: "quay.io/hacbs-contract-demo/single-container-app:62c06bf", + }, { Name: "nodejs", ContainerImage: "quay.io/hacbs-contract-demo/single-nodejs-app:877418e", @@ -244,10 +248,6 @@ func Test_determineInputSpec(t *testing.T) { Name: "petclinic", ContainerImage: "quay.io/hacbs-contract-demo/spring-petclinic:dc80a7f", }, - { - Name: "single-container-app", - ContainerImage: "quay.io/hacbs-contract-demo/single-container-app:62c06bf", - }, }, }, }, @@ -259,6 +259,10 @@ func Test_determineInputSpec(t *testing.T) { spec: &app.SnapshotSpec{ Application: "app1", Components: []app.SnapshotComponent{ + { + Name: "single-container-app", + ContainerImage: "quay.io/hacbs-contract-demo/single-container-app:62c06bf", + }, { Name: "nodejs", ContainerImage: "quay.io/hacbs-contract-demo/single-nodejs-app:877418e", @@ -267,10 +271,6 @@ func Test_determineInputSpec(t *testing.T) { Name: "petclinic", ContainerImage: "quay.io/hacbs-contract-demo/spring-petclinic:dc80a7f", }, - { - Name: "single-container-app", - ContainerImage: "quay.io/hacbs-contract-demo/single-container-app:62c06bf", - }, }, }, }, @@ -291,6 +291,7 @@ func Test_determineInputSpec(t *testing.T) { } else { assert.NoError(t, err) } + assert.Equal(t, c.spec, s) }) } diff --git a/internal/applicationsnapshot/input.go b/internal/applicationsnapshot/input.go index 9b09690cb..1b0e2194a 100644 --- a/internal/applicationsnapshot/input.go +++ b/internal/applicationsnapshot/input.go @@ -21,6 +21,8 @@ import ( "errors" "fmt" "runtime/trace" + "sort" + "sync" "github.com/google/go-containerregistry/pkg/name" app "github.com/konflux-ci/application-api/api/v1alpha1" @@ -181,6 +183,55 @@ func readSnapshotSource(input []byte) (app.SnapshotSpec, error) { return file, nil } +func imageIndexWorker(client oci.Client, component app.SnapshotComponent, componentChan chan<- app.SnapshotComponent, errorsChan chan<- error, wg *sync.WaitGroup) { + defer wg.Done() + + componentChan <- component + + ref, err := name.ParseReference(component.ContainerImage) + if err != nil { + errorsChan <- fmt.Errorf("unable to parse container image %s: %w", component.ContainerImage, err) + return + } + + desc, err := client.Head(ref) + if err != nil { + errorsChan <- fmt.Errorf("unable to fetch descriptior for container image %s: %w", ref, err) + return + } + + if !desc.MediaType.IsIndex() { + return + } + + index, err := client.Index(ref) + if err != nil { + errorsChan <- fmt.Errorf("unable to fetch index for container image %s: %w", component.ContainerImage, err) + return + } + + indexManifest, err := index.IndexManifest() + if err != nil { + errorsChan <- fmt.Errorf("unable to fetch index manifest for container image %s: %w", component.ContainerImage, err) + return + } + + // Add the platform-specific image references (Image Manifests) to the list of components so + // each is validated as well as the multi-platform image reference (Image Index). + for i, manifest := range indexManifest.Manifests { + var arch string + if manifest.Platform != nil && manifest.Platform.Architecture != "" { + arch = manifest.Platform.Architecture + } else { + arch = fmt.Sprintf("noarch-%d", i) + } + archComponent := component + archComponent.Name = fmt.Sprintf("%s-%s-%s", component.Name, manifest.Digest, arch) + archComponent.ContainerImage = fmt.Sprintf("%s@%s", ref.Context().Name(), manifest.Digest) + componentChan <- archComponent + } +} + func expandImageIndex(ctx context.Context, snap *app.SnapshotSpec) { if trace.IsEnabled() { region := trace.StartRegion(ctx, "ec:expand-image-index") @@ -189,59 +240,39 @@ func expandImageIndex(ctx context.Context, snap *app.SnapshotSpec) { client := oci.NewClient(ctx) // For an image index, remove the original component and replace it with an expanded component with all its image manifests - var components []app.SnapshotComponent // Do not raise an error if the image is inaccessible, it will be handled as a violation when evaluated against the policy // This is to retain the original behavior of the `ec validate` command. - var allErrors error = nil - for _, component := range snap.Components { - // Assume the image is not an image index or it isn't accessible - components = append(components, component) - ref, err := name.ParseReference(component.ContainerImage) - if err != nil { - allErrors = errors.Join(allErrors, fmt.Errorf("unable to parse container image %s: %w", component.ContainerImage, err)) - continue - } - desc, err := client.Head(ref) - if err != nil { - allErrors = errors.Join(allErrors, fmt.Errorf("unable to fetch descriptior for container image %s: %w", ref, err)) - continue - } + componentChan := make(chan app.SnapshotComponent, len(snap.Components)) + errorsChan := make(chan error, len(snap.Components)) + var wg sync.WaitGroup + for _, component := range snap.Components { + wg.Add(1) + // fetch manifests concurrently + go imageIndexWorker(client, component, componentChan, errorsChan, &wg) + } - if !desc.MediaType.IsIndex() { - continue - } + go func() { + wg.Wait() + close(componentChan) + close(errorsChan) + }() - index, err := client.Index(ref) - if err != nil { - allErrors = errors.Join(allErrors, fmt.Errorf("unable to fetch index for container image %s: %w", component.ContainerImage, err)) - continue - } + var components []app.SnapshotComponent + for component := range componentChan { + components = append(components, component) + } + snap.Components = components - indexManifest, err := index.IndexManifest() - if err != nil { - allErrors = errors.Join(allErrors, fmt.Errorf("unable to fetch index manifest for container image %s: %w", component.ContainerImage, err)) - continue - } + sort.Slice(snap.Components, func(i, j int) bool { + return snap.Components[i].ContainerImage < snap.Components[j].ContainerImage + }) - // Add the platform-specific image references (Image Manifests) to the list of components so - // each is validated as well as the multi-platform image reference (Image Index). - for i, manifest := range indexManifest.Manifests { - var arch string - if manifest.Platform != nil && manifest.Platform.Architecture != "" { - arch = manifest.Platform.Architecture - } else { - arch = fmt.Sprintf("noarch-%d", i) - } - archComponent := component - archComponent.Name = fmt.Sprintf("%s-%s-%s", component.Name, manifest.Digest, arch) - archComponent.ContainerImage = fmt.Sprintf("%s@%s", ref.Context().Name(), manifest.Digest) - components = append(components, archComponent) - } + var allErrors error = nil + for err := range errorsChan { + allErrors = errors.Join(allErrors, err) } - snap.Components = components - if allErrors != nil { log.Warnf("Encountered error while checking for Image Index: %v", allErrors) } diff --git a/internal/applicationsnapshot/input_test.go b/internal/applicationsnapshot/input_test.go index 33247421e..d1ac77d32 100644 --- a/internal/applicationsnapshot/input_test.go +++ b/internal/applicationsnapshot/input_test.go @@ -119,15 +119,15 @@ func Test_DetermineInputSpec(t *testing.T) { }, want: &app.SnapshotSpec{ Components: []app.SnapshotComponent{ - snapshot.Components[0], - { - Name: "Named", - ContainerImage: "registry.io/repository/image:different", - }, { Name: "Unnamed", ContainerImage: "registry.io/repository/image:another", }, + { + Name: "Named", + ContainerImage: "registry.io/repository/image:different", + }, + snapshot.Components[0], }, }, }, @@ -140,14 +140,14 @@ func Test_DetermineInputSpec(t *testing.T) { }, want: &app.SnapshotSpec{ Components: []app.SnapshotComponent{ - { - Name: "Named", - ContainerImage: imageRef, - }, { Name: "Set name", ContainerImage: "registry.io/repository/image:another", }, + { + Name: "Named", + ContainerImage: imageRef, + }, }, }, }, From 8df63cf177f3cd2457babc2b4d7b33fd2e573966 Mon Sep 17 00:00:00 2001 From: jstuart Date: Fri, 1 Nov 2024 15:48:12 -0500 Subject: [PATCH 2/3] set concurrency limit to num workers --- internal/applicationsnapshot/input.go | 37 ++++++++++++++++----------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/internal/applicationsnapshot/input.go b/internal/applicationsnapshot/input.go index 1b0e2194a..f7c4c8991 100644 --- a/internal/applicationsnapshot/input.go +++ b/internal/applicationsnapshot/input.go @@ -22,13 +22,13 @@ import ( "fmt" "runtime/trace" "sort" - "sync" "github.com/google/go-containerregistry/pkg/name" app "github.com/konflux-ci/application-api/api/v1alpha1" log "github.com/sirupsen/logrus" "github.com/spf13/afero" "golang.org/x/exp/slices" + "golang.org/x/sync/errgroup" "sigs.k8s.io/yaml" "github.com/enterprise-contract/ec-cli/internal/kubernetes" @@ -183,10 +183,16 @@ func readSnapshotSource(input []byte) (app.SnapshotSpec, error) { return file, nil } -func imageIndexWorker(client oci.Client, component app.SnapshotComponent, componentChan chan<- app.SnapshotComponent, errorsChan chan<- error, wg *sync.WaitGroup) { - defer wg.Done() - - componentChan <- component +// For an image index, remove the original component and replace it with an expanded component with all its image manifests +// Do not raise an error if the image is inaccessible, it will be handled as a violation when evaluated against the policy +// This is to retain the original behavior of the `ec validate` command. +func imageIndexWorker(client oci.Client, component app.SnapshotComponent, componentChan chan<- []app.SnapshotComponent, errorsChan chan<- error) { + var components []app.SnapshotComponent + components = append(components, component) + // to avoid adding to componentsChan before each return + defer func() { + componentChan <- components + }() ref, err := name.ParseReference(component.ContainerImage) if err != nil { @@ -228,7 +234,7 @@ func imageIndexWorker(client oci.Client, component app.SnapshotComponent, compon archComponent := component archComponent.Name = fmt.Sprintf("%s-%s-%s", component.Name, manifest.Digest, arch) archComponent.ContainerImage = fmt.Sprintf("%s@%s", ref.Context().Name(), manifest.Digest) - componentChan <- archComponent + components = append(components, archComponent) } } @@ -239,28 +245,29 @@ func expandImageIndex(ctx context.Context, snap *app.SnapshotSpec) { } client := oci.NewClient(ctx) - // For an image index, remove the original component and replace it with an expanded component with all its image manifests - // Do not raise an error if the image is inaccessible, it will be handled as a violation when evaluated against the policy - // This is to retain the original behavior of the `ec validate` command. - componentChan := make(chan app.SnapshotComponent, len(snap.Components)) + workers := 5 + componentChan := make(chan []app.SnapshotComponent, len(snap.Components)) errorsChan := make(chan error, len(snap.Components)) - var wg sync.WaitGroup + g, _ := errgroup.WithContext(ctx) + g.SetLimit(workers) for _, component := range snap.Components { - wg.Add(1) // fetch manifests concurrently - go imageIndexWorker(client, component, componentChan, errorsChan, &wg) + g.Go(func() error { + imageIndexWorker(client, component, componentChan, errorsChan) + return nil + }) } go func() { - wg.Wait() + _ = g.Wait() close(componentChan) close(errorsChan) }() var components []app.SnapshotComponent for component := range componentChan { - components = append(components, component) + components = append(components, component...) } snap.Components = components From eb0fefacffe2e6ee0b24bc63789e34b7081ea624 Mon Sep 17 00:00:00 2001 From: jstuart Date: Mon, 4 Nov 2024 17:56:18 -0600 Subject: [PATCH 3/3] use environment variable for workers --- internal/applicationsnapshot/input.go | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/internal/applicationsnapshot/input.go b/internal/applicationsnapshot/input.go index f7c4c8991..fef181d06 100644 --- a/internal/applicationsnapshot/input.go +++ b/internal/applicationsnapshot/input.go @@ -20,8 +20,10 @@ import ( "context" "errors" "fmt" + "os" "runtime/trace" "sort" + "strconv" "github.com/google/go-containerregistry/pkg/name" app "github.com/konflux-ci/application-api/api/v1alpha1" @@ -36,7 +38,11 @@ import ( "github.com/enterprise-contract/ec-cli/internal/utils/oci" ) -const unnamed = "Unnamed" +const ( + unnamed = "Unnamed" + workersEnvVar = "IMAGE_INDEX_WORKERS" + defaultWorkers = 5 +) type Input struct { File string // Deprecated: replaced by images @@ -246,11 +252,10 @@ func expandImageIndex(ctx context.Context, snap *app.SnapshotSpec) { client := oci.NewClient(ctx) - workers := 5 componentChan := make(chan []app.SnapshotComponent, len(snap.Components)) errorsChan := make(chan error, len(snap.Components)) g, _ := errgroup.WithContext(ctx) - g.SetLimit(workers) + g.SetLimit(imageWorkers()) for _, component := range snap.Components { // fetch manifests concurrently g.Go(func() error { @@ -285,3 +290,13 @@ func expandImageIndex(ctx context.Context, snap *app.SnapshotSpec) { } log.Debugf("Snap component after expanding the image index is %v", snap.Components) } + +func imageWorkers() int { + workers := defaultWorkers + if value, exists := os.LookupEnv(workersEnvVar); exists { + if parsed, err := strconv.Atoi(value); err == nil { + workers = parsed + } + } + return workers +}