Skip to content

Commit

Permalink
Add workers flag to validate input command
Browse files Browse the repository at this point in the history
This commit reworks the concurrency model of `validate input` to be like
that found in `validate image`. This commit adds a `--workers` flag,
with a default of 5 that can be used to configure how many simultaneous
fetches occur. This avoids situations where validating multiple input
files can trigger HTTP 429 errors.

Signed-off-by: robnester-rh <[email protected]>
  • Loading branch information
robnester-rh committed Dec 12, 2024
1 parent b7fb577 commit 7987753
Show file tree
Hide file tree
Showing 2 changed files with 333 additions and 20 deletions.
63 changes: 43 additions & 20 deletions cmd/validate/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
"runtime/trace"
"sort"
"strings"
"sync"

hd "github.com/MakeNowJust/heredoc"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"

"github.com/enterprise-contract/ec-cli/internal/applicationsnapshot"
Expand All @@ -50,8 +50,10 @@ func validateInputCmd(validate InputValidationFunc) *cobra.Command {
policy policy.Policy
policyConfiguration string
strict bool
workers int
}{
strict: true,
strict: true,
workers: 5,
}
cmd := &cobra.Command{
Use: "input",
Expand Down Expand Up @@ -116,23 +118,25 @@ func validateInputCmd(validate InputValidationFunc) *cobra.Command {
policyInput []byte
}

ch := make(chan result, len(data.filePaths))
showSuccesses, _ := cmd.Flags().GetBool("show-successes")

var lock sync.WaitGroup
// Set numWorkers to the value from our flag. The default is 5.
numWorkers := data.workers

showSuccesses, _ := cmd.Flags().GetBool("show-successes")
jobs := make(chan string, len(data.filePaths))
results := make(chan result, len(data.filePaths))

for _, f := range data.filePaths {
lock.Add(1)
go func(fpath string) {
// worker function processes one file path at a time.
worker := func(id int, jobs <-chan string, results chan<- result) {
log.Debugf("Starting worker %d", id)
for fpath := range jobs {
ctx := cmd.Context()
var task *trace.Task
if trace.IsEnabled() {
ctx, task = trace.NewTask(ctx, "ec:validate-input")
trace.Logf(ctx, "", "workerID=%d, file=%s", id, fpath)
}

defer lock.Done()

out, err := validate(ctx, fpath, data.policy, data.info)
res := result{
err: err,
Expand All @@ -141,7 +145,7 @@ func validateInputCmd(validate InputValidationFunc) *cobra.Command {
Success: err == nil,
},
}
// Skip on err to not panic. Error is return on routine completion.

if err == nil {
res.input.Violations = out.Violations()
res.input.Warnings = out.Warnings()
Expand All @@ -151,43 +155,59 @@ func validateInputCmd(validate InputValidationFunc) *cobra.Command {
if showSuccesses {
res.input.Successes = successes
}
res.data = out.Data
if containsOutput(data.output, "data") {
res.data = out.Data
}
res.input.Success = (len(res.input.Violations) == 0)
res.policyInput = out.PolicyInput
}
res.input.Success = err == nil && len(res.input.Violations) == 0

if task != nil {
task.End()
}
ch <- res
}(f)
results <- res
}
log.Debugf("Done with worker %d", id)
}

lock.Wait()
close(ch)
// Start the worker pool
for i := 0; i < numWorkers; i++ {
go worker(i, jobs, results)
}

// Push all jobs (file paths) to the jobs channel
for _, f := range data.filePaths {
jobs <- f
}
close(jobs)

var inputs []input.Input
var evaluatorData [][]evaluator.Data
var manyPolicyInput [][]byte
var allErrors error = nil

for r := range ch {
// Collect all results
for i := 0; i < len(data.filePaths); i++ {
r := <-results
if r.err != nil {
e := fmt.Errorf("error validating file %s: %w", r.input.FilePath, r.err)
allErrors = errors.Join(allErrors, e)
} else {
inputs = append(inputs, r.input)
// evaluator data is duplicated per component, so only collect it once.
// evaluator data is duplicated per input, so only collect it once.
if len(evaluatorData) == 0 && containsOutput(data.output, "data") {
evaluatorData = append(evaluatorData, r.data)
}
manyPolicyInput = append(manyPolicyInput, r.policyInput)
}
}
close(results)

if allErrors != nil {
return allErrors
}

// Ensure some consistency in output.
// Sort inputs for consistent output
sort.Slice(inputs, func(i, j int) bool {
return inputs[i].FilePath > inputs[j].FilePath
})
Expand Down Expand Up @@ -240,6 +260,9 @@ func validateInputCmd(validate InputValidationFunc) *cobra.Command {
violations, include the title and the description of the failed policy
rule.`))

cmd.Flags().IntVar(&data.workers, "workers", data.workers, hd.Doc(`
Number of workers to use for validation. Defaults to 5.`))

if err := cmd.MarkFlagRequired("file"); err != nil {
panic(err)
}
Expand Down
Loading

0 comments on commit 7987753

Please sign in to comment.