Skip to content

Commit

Permalink
Change ack logic for items processing
Browse files Browse the repository at this point in the history
  • Loading branch information
bomoko committed Oct 1, 2023
1 parent e462013 commit 207e463
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 124 deletions.
44 changes: 28 additions & 16 deletions internal/handler/messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ func NewMessaging(config mq.Config, lagoonAPI LagoonAPI, s3 S3, startupAttempts
func (h *Messaging) processMessageQueue(message mq.Message) {
var insights InsightsData
var resource ResourceDestination

// set up defer to ack the message after we're done processing

defer func(message mq.Message) {
// Ack to remove from queue
err := message.Ack(false)
Expand All @@ -48,6 +50,26 @@ func (h *Messaging) processMessageQueue(message mq.Message) {
}
}(message)

acknowledgeMessage := func(message mq.Message) func() {
return func() {
// Ack to remove from queue
err := message.Ack(false)
if err != nil {
fmt.Printf("Failed to acknowledge message: %s\n", err.Error())
}
}
}(message)

rejectMessage := func(message mq.Message) func(bool) {
return func(requeue bool) {
// Ack to remove from queue
err := message.Reject(requeue)
if err != nil {
fmt.Printf("Failed to requect message: %s\n", err.Error())
}
}
}(message)

incoming := &InsightsMessage{}
json.Unmarshal(message.Body(), incoming)

Expand All @@ -56,6 +78,7 @@ func (h *Messaging) processMessageQueue(message mq.Message) {
if incoming.Type == "direct.facts" || incoming.Type == "direct.problems" {
resp := processItemsDirectly(message, h)
log.Println(resp)
acknowledgeMessage()
return
}

Expand Down Expand Up @@ -108,19 +131,7 @@ func (h *Messaging) processMessageQueue(message mq.Message) {
if insights.InputType != "" {
switch insights.InputType {
case "sbom", "sbom-gz":

insights.InsightsType = Sbom
// We actually want to decompress the payload here so that they're all processed the same way
//decodeGzipString(incoming.BinaryPayload[0])
//for n, d := range incoming.BinaryPayload {
// // let's try and decompress the binary payload here
// data, err := decodeGzipString(d)
// // TODO: I think there may be a potential issue here if the type isn't gzip, so should probably test
// if err != nil {
//
// }
//}

case "image", "image-gz":
insights.InsightsType = Image
case "direct":
Expand All @@ -135,10 +146,7 @@ func (h *Messaging) processMessageQueue(message mq.Message) {
if h.EnableDebug {
log.Printf("[DEBUG] no payload was found")
}
err := message.Reject(false)
if err != nil {
fmt.Printf("Unable to reject payload: %s\n", err.Error())
}
rejectMessage(false)
return
}
if len(incoming.Payload) != 0 {
Expand All @@ -160,6 +168,7 @@ func (h *Messaging) processMessageQueue(message mq.Message) {
err := h.sendToLagoonS3(incoming, insights, resource)
if err != nil {
log.Printf("Unable to send to S3: %s", err.Error())
// TODO: do we reque here? Reject
}
}
}
Expand All @@ -176,7 +185,10 @@ func (h *Messaging) processMessageQueue(message mq.Message) {

if err != nil {
log.Printf("Unable to send to the api: %s", err.Error())
rejectMessage(false)
return
}
}
}
acknowledgeMessage()
}
111 changes: 4 additions & 107 deletions internal/handler/trivyProcessing.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,53 +8,29 @@ import (
"github.com/Khan/genqlient/graphql"
"github.com/aquasecurity/trivy/pkg/commands/artifact"
"github.com/aquasecurity/trivy/pkg/flag"
"github.com/aquasecurity/trivy/pkg/sbom/cyclonedx"
"github.com/aquasecurity/trivy/pkg/types"
"github.com/uselagoon/lagoon/services/insights-handler/internal/lagoonclient"
"io"
"net/http"
"os"
"os/exec"
"strings"
"sync"
"time"
)

const problemSource = "insights-handler-grype"

type sbomQueueItem struct {
EnvironmentId int
Service string
SBOM cyclonedx.BOM
}

type sbomQueue struct {
Items []sbomQueueItem
Lock sync.Mutex
GrypeLocation string
Messaging Messaging
}

var queue = sbomQueue{
Items: []sbomQueueItem{},
Lock: sync.Mutex{},
}

func SbomToProblems(apiClient graphql.Client, trivyRemoteAddress string, bomWriteDirectory string, environmentId int, service string, sbom cdx.BOM) error {
fmt.Println("AAA")
rep, err := executeProcessingTrivy(trivyRemoteAddress, bomWriteDirectory, sbom)
if err != nil {
return err
return fmt.Errorf("unable to execute trivy processing: %v", err.Error())
}
fmt.Println("BBB")
problems, err := trivyReportToProblems(environmentId, problemSource, service, rep)
if err != nil {
return err
return fmt.Errorf("unable to execute trivy processing - converting trivy report to problems: %v", err.Error())
}
fmt.Println("CCC")
err = writeProblemsArrayToApi(apiClient, environmentId, problemSource, service, problems)
if err != nil {
return err
return fmt.Errorf("unable to execute trivy processing- writing problems to api: %v", err.Error())
}
return nil
}
Expand Down Expand Up @@ -85,7 +61,6 @@ func convertBOMToProblemsArray(environment int, source string, service string, b
//here we need to ensure that there are actually vulnerabilities
if v.Ratings != nil && len(*v.Ratings) > 0 {

//TODO: this is gross, fix it.
p.Severity = lagoonclient.ProblemSeverityRating(strings.ToUpper(string((*v.Ratings)[0].Severity)))
var sevScore float64

Expand Down Expand Up @@ -120,7 +95,7 @@ func writeProblemsArrayToApi(apiClient graphql.Client, environment int, source s
return nil
}

func testTrivyServerIsAlive(trivyRemoteAddress string) (bool, error) {
func IsTrivyServerIsAlive(trivyRemoteAddress string) (bool, error) {
resp, err := http.Get(fmt.Sprintf("%v/healthz", trivyRemoteAddress))
if err != nil {
return false, err
Expand Down Expand Up @@ -253,7 +228,6 @@ func trivyReportToProblems(environment int, source string, service string, repor
Data: "{}",
AssociatedPackage: "",
Description: v.Vulnerability.Description,
// Severity:
}

if len(v.Vulnerability.References) > 0 {
Expand All @@ -269,80 +243,3 @@ func trivyReportToProblems(environment int, source string, service string, repor
fmt.Println(ret)
return ret, nil
}

func executeProcessing(grypeLocation string, bom cyclonedx.BOM) (cyclonedx.BOM, error) {
cmd := exec.Command(grypeLocation, "-o", "cyclonedx-json")
// Set up pipes for stdin, stdout, and stderr
stdin, err := cmd.StdinPipe()
if err != nil {
fmt.Println("Failed to create stdin pipe:", err)
return cyclonedx.BOM{}, err
}

stdout, err := cmd.StdoutPipe()
if err != nil {
fmt.Println("Failed to create stdout pipe:", err)
return cyclonedx.BOM{}, err
}
defer stdout.Close()

stderr, err := cmd.StderrPipe()
if err != nil {
fmt.Println("Failed to create stderr pipe:", err)
return cyclonedx.BOM{}, err
}
defer stderr.Close()

sbomString, err := json.Marshal(bom)
if err != nil {
return cyclonedx.BOM{}, err
}
//let's push the sbom into the stdin
if err := cmd.Start(); err != nil {
fmt.Println("Failed to start command:", err)
return cyclonedx.BOM{}, err
}

go func() {
defer stdin.Close()
_, err = io.WriteString(stdin, string(sbomString))
}()

if err != nil {
fmt.Println("Could not write to grype", err)
return cyclonedx.BOM{}, err
}

//execute
// Read from stdout
output := make([]byte, 0) // Buffer to store the output
buf := make([]byte, 1024) // Read buffer
for {
n, err := stdout.Read(buf)
if err != nil && err != io.EOF {
fmt.Println("Failed to read from stdout:", err)
return cyclonedx.BOM{}, err
}
if n == 0 {
break
}
output = append(output, buf[:n]...)
}

//fmt.Println("Output:", string(output))

// Wait for the command to finish
if err := cmd.Wait(); err != nil {
fmt.Println("Command execution failed:", err)
return cyclonedx.BOM{}, err
}

var ret cyclonedx.BOM
err = json.Unmarshal(output, &ret)
if err != nil {
fmt.Println("Unable to unmarshal data")
return ret, err
}

return ret, nil
}
2 changes: 1 addition & 1 deletion internal/handler/trivyProcessing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func Test_executeProcessingTrivy(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {

// check if a server is available to run the test
serverUp, err := testTrivyServerIsAlive(tt.args.trivyRemoteAddress)
serverUp, err := IsTrivyServerIsAlive(tt.args.trivyRemoteAddress)

if err != nil {
t.Errorf("Unable to connect to trivy server: %v", err.Error())
Expand Down

0 comments on commit 207e463

Please sign in to comment.