Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add workers flag to validate input command #2218

Merged
merged 1 commit into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 43 additions & 20 deletions cmd/validate/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
"runtime/trace"
"sort"
"strings"
"sync"

hd "github.com/MakeNowJust/heredoc"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"

"github.com/enterprise-contract/ec-cli/internal/applicationsnapshot"
Expand All @@ -50,8 +50,10 @@
policy policy.Policy
policyConfiguration string
strict bool
workers int
}{
strict: true,
strict: true,
workers: 5,
}
cmd := &cobra.Command{
Use: "input",
Expand Down Expand Up @@ -116,23 +118,25 @@
policyInput []byte
}

ch := make(chan result, len(data.filePaths))
showSuccesses, _ := cmd.Flags().GetBool("show-successes")

var lock sync.WaitGroup
// Set numWorkers to the value from our flag. The default is 5.
numWorkers := data.workers

showSuccesses, _ := cmd.Flags().GetBool("show-successes")
jobs := make(chan string, len(data.filePaths))
results := make(chan result, len(data.filePaths))

for _, f := range data.filePaths {
lock.Add(1)
go func(fpath string) {
// worker function processes one file path at a time.
worker := func(id int, jobs <-chan string, results chan<- result) {
log.Debugf("Starting worker %d", id)
for fpath := range jobs {
ctx := cmd.Context()
var task *trace.Task
if trace.IsEnabled() {
ctx, task = trace.NewTask(ctx, "ec:validate-input")
trace.Logf(ctx, "", "workerID=%d, file=%s", id, fpath)

Check warning on line 137 in cmd/validate/input.go

View check run for this annotation

Codecov / codecov/patch

cmd/validate/input.go#L137

Added line #L137 was not covered by tests
}

defer lock.Done()

out, err := validate(ctx, fpath, data.policy, data.info)
res := result{
err: err,
Expand All @@ -141,7 +145,7 @@
Success: err == nil,
},
}
// Skip on err to not panic. Error is return on routine completion.

if err == nil {
res.input.Violations = out.Violations()
res.input.Warnings = out.Warnings()
Expand All @@ -151,43 +155,59 @@
if showSuccesses {
res.input.Successes = successes
}
res.data = out.Data
if containsOutput(data.output, "data") {
res.data = out.Data
}

Check warning on line 160 in cmd/validate/input.go

View check run for this annotation

Codecov / codecov/patch

cmd/validate/input.go#L159-L160

Added lines #L159 - L160 were not covered by tests
res.input.Success = (len(res.input.Violations) == 0)
res.policyInput = out.PolicyInput
}
res.input.Success = err == nil && len(res.input.Violations) == 0

if task != nil {
task.End()
}
ch <- res
}(f)
results <- res
}
log.Debugf("Done with worker %d", id)
}

lock.Wait()
close(ch)
// Start the worker pool
for i := 0; i < numWorkers; i++ {
go worker(i, jobs, results)
}

// Push all jobs (file paths) to the jobs channel
for _, f := range data.filePaths {
jobs <- f
}
close(jobs)

var inputs []input.Input
var evaluatorData [][]evaluator.Data
var manyPolicyInput [][]byte
var allErrors error = nil

for r := range ch {
// Collect all results
for i := 0; i < len(data.filePaths); i++ {
r := <-results
if r.err != nil {
e := fmt.Errorf("error validating file %s: %w", r.input.FilePath, r.err)
allErrors = errors.Join(allErrors, e)
} else {
inputs = append(inputs, r.input)
// evaluator data is duplicated per component, so only collect it once.
// evaluator data is duplicated per input, so only collect it once.
if len(evaluatorData) == 0 && containsOutput(data.output, "data") {
evaluatorData = append(evaluatorData, r.data)
}
manyPolicyInput = append(manyPolicyInput, r.policyInput)
}
}
close(results)

if allErrors != nil {
return allErrors
}

// Ensure some consistency in output.
// Sort inputs for consistent output
sort.Slice(inputs, func(i, j int) bool {
return inputs[i].FilePath > inputs[j].FilePath
})
Expand Down Expand Up @@ -240,6 +260,9 @@
violations, include the title and the description of the failed policy
rule.`))

cmd.Flags().IntVar(&data.workers, "workers", data.workers, hd.Doc(`
Number of workers to use for validation. Defaults to 5.`))

if err := cmd.MarkFlagRequired("file"); err != nil {
panic(err)
}
Expand Down
Loading
Loading