Skip to content

Commit

Permalink
Merge branch 'main' into more_transformers
Browse files Browse the repository at this point in the history
  • Loading branch information
bomoko committed Feb 4, 2024
2 parents 45bb25f + efc27d2 commit c10077e
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 27 deletions.
20 changes: 13 additions & 7 deletions internal/handler/insightsFactsParserFilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ func processFactsInsightsData(h *Messaging, insights InsightsData, v string, api
slog.Error("Error reading insights data", "Error", err)
}

facts := processFactsFromJSON(logger, res, source)
facts, err := processFactsFromJSON(logger, res, source)
if err != nil {
return nil, "", err
}

facts, err = KeyFactsFilter(facts)
if err != nil {
return nil, "", err
Expand All @@ -46,18 +50,17 @@ func processFactsInsightsData(h *Messaging, insights InsightsData, v string, api
return nil, "", nil
}

func processFactsFromJSON(logger *slog.Logger, facts []byte, source string) []LagoonFact {
func processFactsFromJSON(logger *slog.Logger, facts []byte, source string) ([]LagoonFact, error) {
var factsInput []LagoonFact

var factsPayload FactsPayload
err := json.Unmarshal(facts, &factsPayload)
if err != nil {
logger.Error(err.Error())
panic("Can't unmarshal facts")
return factsInput, err
}

if len(factsPayload.Facts) == 0 {
return factsInput
return factsInput, nil
}

var filteredFacts []LagoonFact
Expand All @@ -82,10 +85,13 @@ func processFactsFromJSON(logger *slog.Logger, facts []byte, source string) []La
Type: FactTypeText,
}
logger.Debug("Processing fact", "name", f.Name, "value", f.Value)
fact, _ = ProcessLagoonFactAgainstRegisteredFilters(fact, f)
fact, err = ProcessLagoonFactAgainstRegisteredFilters(fact, f)
if err != nil {
return factsInput, err
}
factsInput = append(factsInput, fact)
}
return factsInput
return factsInput, nil
}

func init() {
Expand Down
2 changes: 1 addition & 1 deletion internal/handler/insightsParserFilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func processSbomInsightsData(h *Messaging, insights InsightsData, v string, apiC

decoder := cdx.NewBOMDecoder(bytes.NewReader(b), cdx.BOMFileFormatJSON)
if err = decoder.Decode(bom); err != nil {
panic(err)
return nil, "", err
}
}

Expand Down
63 changes: 45 additions & 18 deletions internal/handler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,13 +225,13 @@ func (h *Messaging) Consumer() {
var err error
messageQueue, err = mq.New(h.Config)
if err != nil {
log.Println(err,
fmt.Sprintf(
"Failed to initialize message queue manager, retrying in %d seconds, attempt %d/%d",
h.ConnectionRetryInterval,
attempt,
h.ConnectionAttempts,
),
slog.Error(fmt.Sprintf(
"Failed to initialize message queue manager, retrying in %d seconds, attempt %d/%d",
h.ConnectionRetryInterval,
attempt,
h.ConnectionAttempts,
),
"error", err.Error(),
)
time.Sleep(time.Duration(h.ConnectionRetryInterval) * time.Second)
}
Expand Down Expand Up @@ -291,62 +291,81 @@ func (h *Messaging) sendToLagoonAPI(incoming *InsightsMessage, resource Resource

if insights.InputPayload == Payload && insights.LagoonType == Facts {
for _, p := range incoming.Payload {
parserFilterLoopForPayloads(insights, p, h, apiClient, resource)
err := parserFilterLoopForPayloads(insights, p, h, apiClient, resource)
if err != nil {
return err
}
}
}

if insights.InputPayload == BinaryPayload && insights.LagoonType == Facts {
for _, p := range incoming.BinaryPayload {
parserFilterLoopForBinaryPayloads(insights, p, h, apiClient, resource)
err := parserFilterLoopForBinaryPayloads(insights, p, h, apiClient, resource)
if err != nil {
return err
}
}
}

return nil
}

func parserFilterLoopForBinaryPayloads(insights InsightsData, p string, h *Messaging, apiClient graphql.Client, resource ResourceDestination) {
func parserFilterLoopForBinaryPayloads(insights InsightsData, p string, h *Messaging, apiClient graphql.Client, resource ResourceDestination) error {
for _, filter := range parserFilters {

result, source, err := filter(h, insights, p, apiClient, resource)
if err != nil {
slog.Error("Error running filter", "error", err.Error())
return err
}

processResultset(result, err, h, apiClient, resource, source)
err = processResultset(result, err, h, apiClient, resource, source)
if err != nil {
return err
}
}
return nil
}

func parserFilterLoopForPayloads(insights InsightsData, p PayloadInput, h *Messaging, apiClient graphql.Client, resource ResourceDestination) {
func parserFilterLoopForPayloads(insights InsightsData, p PayloadInput, h *Messaging, apiClient graphql.Client, resource ResourceDestination) error {
for _, filter := range parserFilters {
var result []interface{}
var source string

json, err := json.Marshal(p)
if err != nil {
slog.Error("Error marshalling data", "error", err.Error())
return err
}

result, source, err = filter(h, insights, fmt.Sprintf("%s", json), apiClient, resource)
if err != nil {
slog.Error("Error Filtering payload", "error", err.Error())
return err
}

processResultset(result, err, h, apiClient, resource, source)
err = processResultset(result, err, h, apiClient, resource, source)
if err != nil {
return err
}
}
return nil
}

// processResultset will send results as facts to the lagoon api after processing via a parser filter
func processResultset(result []interface{}, err error, h *Messaging, apiClient graphql.Client, resource ResourceDestination, source string) {
func processResultset(result []interface{}, err error, h *Messaging, apiClient graphql.Client, resource ResourceDestination, source string) error {
project, environment, apiErr := determineResourceFromLagoonAPI(apiClient, resource)
if apiErr != nil {
log.Println(apiErr)
slog.Error(apiErr.Error())
return apiErr
}

// Even if we don't find any new facts, we need to delete the existing ones
// since these may be the end product of a filter process
apiErr = h.deleteExistingFactsBySource(apiClient, environment, source, project)
if apiErr != nil {
log.Printf("%s", apiErr.Error())
slog.Error(apiErr.Error())
return apiErr
}

for _, r := range result {
Expand All @@ -355,15 +374,23 @@ func processResultset(result []interface{}, err error, h *Messaging, apiClient g
err = h.sendFactsToLagoonAPI([]LagoonFact{fact}, apiClient, resource, source)
if err != nil {
slog.Error("Error sending facts to Lagoon API", "error", err.Error())
return err
}
} else if facts, ok := r.([]LagoonFact); ok {
// Handle slice of facts
h.sendFactsToLagoonAPI(facts, apiClient, resource, source)
err = h.sendFactsToLagoonAPI(facts, apiClient, resource, source)
if err != nil {
slog.Error("Error sending facts to Lagoon API", "error", err.Error())
return err
}
} else {
// Unexpected type returned from filter()
slog.Error(fmt.Sprintf("unexpected type returned from filter(): %T\n", r))
err := fmt.Errorf("unexpected type returned from filter(): %T\n", r)
slog.Error(err.Error())
return err
}
}
return nil
}

func (h *Messaging) sendFactsToLagoonAPI(facts []LagoonFact, apiClient graphql.Client, resource ResourceDestination, source string) error {
Expand Down
1 change: 0 additions & 1 deletion internal/handler/messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ func (h *Messaging) processMessageQueue(message mq.Message) {
err := h.sendToLagoonAPI(incoming, resource, insights)

if err != nil {
//log.Printf("Unable to send to the api: %s", err.Error())
slog.Error("Unable to send to the API", "Error", err.Error())
rejectMessage(false)
return
Expand Down

0 comments on commit c10077e

Please sign in to comment.