Skip to content

Commit

Permalink
Updated to write to the queue
Browse files Browse the repository at this point in the history
  • Loading branch information
CGoodwin90 committed Dec 8, 2023
1 parent 2003773 commit c243281
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 22 deletions.
11 changes: 6 additions & 5 deletions internal/handler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ type S3 struct {
}

type InsightsMessage struct {
Payload []PayloadInput `json:"payload"`
BinaryPayload map[string]string `json:"binaryPayload"`
Annotations map[string]string `json:"annotations"`
Labels map[string]string `json:"labels"`
Type string `json:"type,omitempty"`
Payload []PayloadInput `json:"payload"`
BinaryPayload map[string]string `json:"binaryPayload"`
Annotations map[string]string `json:"annotations"`
Labels map[string]string `json:"labels"`
Type string `json:"type,omitempty"`
RequeueAttempts int `json:"requeueAttempts,omitempty"`
}

type PayloadInput struct {
Expand Down
43 changes: 28 additions & 15 deletions internal/handler/messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ type Messaging struct {
EnableDebug bool
ProblemsFromSBOM bool
TrivyServerEndpoint string
RequeueAttempts int
MessageQWriter func(data []byte) error
}

// NewMessaging returns a messaging with config
func NewMessaging(config mq.Config, lagoonAPI LagoonAPI, s3 S3, startupAttempts int, startupInterval int, enableDebug bool, problemsFromSBOM bool, trivyServerEndpoint string) *Messaging {
func NewMessaging(config mq.Config, lagoonAPI LagoonAPI, s3 S3, startupAttempts int, startupInterval int, enableDebug bool, problemsFromSBOM bool, trivyServerEndpoint string, MessageQWriter func(data []byte) error) *Messaging {
return &Messaging{
Config: config,
LagoonAPI: lagoonAPI,
Expand All @@ -33,7 +33,7 @@ func NewMessaging(config mq.Config, lagoonAPI LagoonAPI, s3 S3, startupAttempts
EnableDebug: enableDebug,
ProblemsFromSBOM: problemsFromSBOM,
TrivyServerEndpoint: trivyServerEndpoint,
RequeueAttempts: 0,
MessageQWriter: MessageQWriter,
}
}

Expand Down Expand Up @@ -175,16 +175,21 @@ func (h *Messaging) processMessageQueue(message mq.Message) {
if insights.InsightsType != Direct {
err := h.sendToLagoonS3(incoming, insights, resource)
if err != nil {
// parse error to determine if retry is valid
if err.Error() == "Could not connect to the endpoint URL" || err.Error() == "Connect timeout on endpoint URL" {
slog.Error("Unable to send to S3", "Error", err.Error())
} else {
h.RequeueAttempts++
if h.RequeueAttempts <= 3 {
rejectMessage(true)
} else {
slog.Error("Retries failed, unable to send to S3", "Error", err.Error())
incoming.RequeueAttempts++
updatedMessage, err := json.Marshal(incoming)
if err != nil {
fmt.Printf(err.Error())
}
if incoming.RequeueAttempts <= 3 {
rejectMessage(false)
if err := h.MessageQWriter(updatedMessage); err != nil {
slog.Error("Error re-queueing message", "Error", err.Error())
}
return
} else {
slog.Error("Retries failed, unable to send to S3", "Error", err.Error())
rejectMessage(false)
return
}
}
}
Expand All @@ -201,9 +206,17 @@ func (h *Messaging) processMessageQueue(message mq.Message) {
err := h.sendToLagoonAPI(incoming, resource, insights)

if err != nil {
h.RequeueAttempts++
if h.RequeueAttempts <= 3 {
rejectMessage(true)
incoming.RequeueAttempts++
updatedMessage, err := json.Marshal(incoming)
if err != nil {
fmt.Printf(err.Error())
}
if incoming.RequeueAttempts <= 3 {
rejectMessage(false)
if err := h.MessageQWriter(updatedMessage); err != nil {
slog.Error("Error re-queueing message", "Error", err.Error())
}
return
} else {
slog.Error("Retries failed, unable to send to the API", "Error", err.Error())
rejectMessage(false)
Expand Down
38 changes: 36 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,30 @@ var (
enableDebug bool
problemsFromSBOM bool
trivyServerEndpoint string
config mq.Config
)

func mqWriteObject(data []byte) error {
messageQ, err := mq.New(config)
if err != nil {
return err
}
defer messageQ.Close()

producer, err := messageQ.SyncProducer("lagoon-handler")
if err != nil {
return err
}

err = producer.Produce(data)

if err != nil {
return err
}

return nil
}

func main() {
flag.StringVar(&lagoonAppID, "lagoon-app-id", "insights-handler", "The appID to use that will be sent with messages.")
flag.StringVar(&mqUser, "rabbitmq-username", "guest", "The username of the rabbitmq user.")
Expand Down Expand Up @@ -156,7 +178,7 @@ func main() {
slog.Error("Unable to register filters from disk", "Error", err)
}

config := mq.Config{
config = mq.Config{
ReconnectDelay: time.Duration(rabbitReconnectRetryInterval) * time.Second,
Exchanges: mq.Exchanges{
{
Expand Down Expand Up @@ -195,9 +217,20 @@ func main() {
},
},
},
Producers: mq.Producers{
{
Name: "lagoon-handler",
Exchange: "lagoon-insights",
Sync: true,
Options: mq.Options{
"delivery_mode": "2",
"headers": "",
"content_type": "",
},
},
},
DSN: fmt.Sprintf("amqp://%s:%s@%s/", broker.Username, broker.Password, broker.Hostname),
}

messaging := handler.NewMessaging(config,
graphQLConfig,
s3Config,
Expand All @@ -206,6 +239,7 @@ func main() {
enableDebug,
problemsFromSBOM,
trivyServerEndpoint,
mqWriteObject,
)

// start the consumer
Expand Down

0 comments on commit c243281

Please sign in to comment.