diff --git a/internal/handler/messaging.go b/internal/handler/messaging.go index b3091bf..9a17879 100644 --- a/internal/handler/messaging.go +++ b/internal/handler/messaging.go @@ -61,6 +61,25 @@ func (h *Messaging) processMessageQueue(message mq.Message) { } }(message) + rejectRequeue := func(message mq.Message) func(func(bool), *InsightsMessage, int, string, error) { + return func(rejectMessage func(bool), incoming *InsightsMessage, retryAttemptLimit int, target string, err error) { + incoming.RequeueAttempts++ + updatedMessage, jsonErr := json.Marshal(incoming) + if jsonErr != nil { + slog.Error(jsonErr.Error()) + } + if incoming.RequeueAttempts <= retryAttemptLimit { + rejectMessage(false) + if qErr := h.MessageQWriter(updatedMessage); qErr != nil { + slog.Error("Error re-queueing message", "Error", qErr.Error()) + } + } else { + slog.Error(fmt.Sprintf("Retries failed, unable to send to %s", target), "Error", err.Error()) + rejectMessage(false) + } + } + }(message) + incoming := &InsightsMessage{} err := json.Unmarshal(message.Body(), incoming) @@ -175,22 +194,7 @@ func (h *Messaging) processMessageQueue(message mq.Message) { if insights.InsightsType != Direct { err := h.sendToLagoonS3(incoming, insights, resource) if err != nil { - incoming.RequeueAttempts++ - updatedMessage, err := json.Marshal(incoming) - if err != nil { - fmt.Printf(err.Error()) - } - if incoming.RequeueAttempts <= 3 { - rejectMessage(false) - if err := h.MessageQWriter(updatedMessage); err != nil { - slog.Error("Error re-queueing message", "Error", err.Error()) - } - return - } else { - slog.Error("Retries failed, unable to send to S3", "Error", err.Error()) - rejectMessage(false) - return - } + rejectRequeue(rejectMessage, incoming, 3, "S3", err) } } } @@ -206,22 +210,7 @@ func (h *Messaging) processMessageQueue(message mq.Message) { err := h.sendToLagoonAPI(incoming, resource, insights) if err != nil { - incoming.RequeueAttempts++ - updatedMessage, err := json.Marshal(incoming) - if err != nil { - fmt.Printf(err.Error()) - } - if incoming.RequeueAttempts <= 3 { - rejectMessage(false) - if err := h.MessageQWriter(updatedMessage); err != nil { - slog.Error("Error re-queueing message", "Error", err.Error()) - } - return - } else { - slog.Error("Retries failed, unable to send to the API", "Error", err.Error()) - rejectMessage(false) - return - } + rejectRequeue(rejectMessage, incoming, 3, "Lagoon API", err) } } }