From a75b4c80019cbe65d1f8f07def007b7601b33dae Mon Sep 17 00:00:00 2001 From: Masoud Haghbin Date: Mon, 20 Nov 2023 13:24:36 +0100 Subject: [PATCH] add conditions for groupID --- pumps/sqs.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/pumps/sqs.go b/pumps/sqs.go index d8688bd24..2858899b7 100644 --- a/pumps/sqs.go +++ b/pumps/sqs.go @@ -3,17 +3,15 @@ package pumps import ( "context" "encoding/json" + "github.com/TykTechnologies/tyk-pump/analytics" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/mitchellh/mapstructure" - "strconv" - "time" - - "github.com/TykTechnologies/tyk-pump/analytics" "github.com/sirupsen/logrus" + "time" ) type SQSSendMessageBatchAPI interface { @@ -98,15 +96,18 @@ func (s *SQSPump) Init(config interface{}) error { func (s *SQSPump) WriteData(ctx context.Context, data []interface{}) error { s.log.Info("Attempting to write ", len(data), " records...") + startTime := time.Now() messages := make([]types.SendMessageBatchRequestEntry, len(data)) for i, v := range data { decoded := v.(analytics.AnalyticsRecord) decodedMessageByteArray, _ := json.Marshal(decoded) messages[i] = types.SendMessageBatchRequestEntry{ - MessageBody: aws.String(string(decodedMessageByteArray)), - MessageGroupId: aws.String(s.SQSConf.AWSMessageGroupID), - Id: aws.String(strconv.FormatInt(time.Now().UnixNano(), 10)), + MessageBody: aws.String(string(decodedMessageByteArray)), + Id: aws.String(decoded.GetObjectID().String()), + } + if s.SQSConf.AWSMessageGroupID != "" { + messages[i].MessageGroupId = aws.String(s.SQSConf.AWSMessageGroupID) } } SQSError := s.write(ctx, messages) @@ -115,6 +116,7 @@ func (s *SQSPump) WriteData(ctx context.Context, data []interface{}) error { return SQSError } + s.log.Debug("ElapsedTime in seconds for ", len(data), " records:", time.Now().Sub(startTime)) s.log.Info("Purged ", len(data), " records...") return nil } @@ -128,7 +130,7 @@ func (s *SQSPump) write(c context.Context, messages []types.SendMessageBatchRequ } sMInput := &sqs.SendMessageBatchInput{ Entries: messages[i:end], - QueueUrl: aws.String(s.SQSConf.QueueName), + QueueUrl: s.SQSQueueURL, } if _, err := s.SQSClient.SendMessageBatch(c, sMInput); err != nil {