diff --git a/pumps/sqs.go b/pumps/sqs.go index b560e0f5f..a00591374 100644 --- a/pumps/sqs.go +++ b/pumps/sqs.go @@ -3,6 +3,8 @@ package pumps import ( "context" "encoding/json" + "time" + "github.com/TykTechnologies/tyk-pump/analytics" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" @@ -12,7 +14,6 @@ import ( "github.com/mitchellh/mapstructure" "github.com/oklog/ulid/v2" "github.com/sirupsen/logrus" - "time" ) type SQSSendMessageBatchAPI interface { @@ -33,8 +34,10 @@ type SQSPump struct { CommonPumpConfig } -var SQSPrefix = "sqs-pump" -var SQSDefaultENV = PUMPS_ENV_PREFIX + "_SQS" + PUMPS_ENV_META_PREFIX +var ( + SQSPrefix = "sqs-pump" + SQSDefaultENV = PUMPS_ENV_PREFIX + "_SQS" + PUMPS_ENV_META_PREFIX +) // SQSConf represents the configuration structure for the Tyk Pump SQS (Simple Queue Service) pump. type SQSConf struct { @@ -56,17 +59,17 @@ type SQSConf struct { // AWSEndpoint is the custom endpoint URL for AWS SQS, if applicable. AWSEndpoint string `mapstructure:"aws_endpoint"` - // AWSDelaySeconds configures the delay (in seconds) before messages become available for processing. - AWSDelaySeconds int32 `mapstructure:"aws_delay_seconds"` - // AWSMessageGroupID specifies the message group ID for ordered processing within the SQS queue. AWSMessageGroupID string `mapstructure:"aws_message_group_id"` - // AWSSQSBatchLimit sets the maximum number of messages in a single batch when sending to the SQS queue. - AWSSQSBatchLimit int `mapstructure:"aws_sqs_batch_limit"` - // AWSMessageIDDeduplicationEnabled enables/disables message deduplication based on unique IDs. AWSMessageIDDeduplicationEnabled bool `mapstructure:"aws_message_id_deduplication_enabled"` + + // AWSDelaySeconds configures the delay (in seconds) before messages become available for processing. + AWSDelaySeconds int32 `mapstructure:"aws_delay_seconds"` + + // AWSSQSBatchLimit sets the maximum number of messages in a single batch when sending to the SQS queue. + AWSSQSBatchLimit int `mapstructure:"aws_sqs_batch_limit"` } func (s *SQSPump) New() Pump { @@ -121,8 +124,16 @@ func (s *SQSPump) WriteData(ctx context.Context, data []interface{}) error { messages := make([]types.SendMessageBatchRequestEntry, len(data)) for i, v := range data { - decoded := v.(analytics.AnalyticsRecord) - decodedMessageByteArray, _ := json.Marshal(decoded) + decoded, ok := v.(analytics.AnalyticsRecord) + if !ok { + s.log.Errorf("Unable to decode message: %v", v) + continue + } + decodedMessageByteArray, err := json.Marshal(decoded) + if err != nil { + s.log.Errorf("Unable to marshal message: %v", err) + continue + } messages[i] = types.SendMessageBatchRequestEntry{ MessageBody: aws.String(string(decodedMessageByteArray)), Id: aws.String(ulid.Make().String()), @@ -148,7 +159,7 @@ func (s *SQSPump) WriteData(ctx context.Context, data []interface{}) error { return SQSError } - s.log.Debug("ElapsedTime in seconds for ", len(data), " records:", time.Now().Sub(startTime)) + s.log.Debug("ElapsedTime in seconds for ", len(data), " records:", time.Since(startTime)) s.log.Info("Purged ", len(data), " records...") return nil } diff --git a/pumps/sqs_test.go b/pumps/sqs_test.go index b23d382df..718fb92dc 100644 --- a/pumps/sqs_test.go +++ b/pumps/sqs_test.go @@ -1,12 +1,16 @@ +// Disabling 'revive' linter as it's complaining about the 'GetQueueUrl' input, which is imported from the AWS SDK. +// +//revive:disable package pumps import ( "context" - "github.com/TykTechnologies/tyk-pump/analytics" - "github.com/aws/aws-sdk-go-v2/aws" "testing" "time" + "github.com/TykTechnologies/tyk-pump/analytics" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/stretchr/testify/assert" ) @@ -68,7 +72,7 @@ func TestSQSPump_WriteData(t *testing.T) { } func TestSQSPump_Chunks(t *testing.T) { - var Calls int = 0 + var Calls int // Mock SQS client mockSQS := &MockSQSSendMessageBatchAPI{ // Implement the required functions for testing