Skip to content

Commit

Permalink
add conditions for groupID
Browse files Browse the repository at this point in the history
  • Loading branch information
Masoud Haghbin committed Nov 20, 2023
1 parent f67cae3 commit a75b4c8
Showing 1 changed file with 10 additions and 8 deletions.
18 changes: 10 additions & 8 deletions pumps/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,15 @@ package pumps
import (
"context"
"encoding/json"
"github.com/TykTechnologies/tyk-pump/analytics"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/mitchellh/mapstructure"
"strconv"
"time"

"github.com/TykTechnologies/tyk-pump/analytics"
"github.com/sirupsen/logrus"
"time"
)

type SQSSendMessageBatchAPI interface {
Expand Down Expand Up @@ -98,15 +96,18 @@ func (s *SQSPump) Init(config interface{}) error {

func (s *SQSPump) WriteData(ctx context.Context, data []interface{}) error {
s.log.Info("Attempting to write ", len(data), " records...")
startTime := time.Now()

messages := make([]types.SendMessageBatchRequestEntry, len(data))
for i, v := range data {
decoded := v.(analytics.AnalyticsRecord)
decodedMessageByteArray, _ := json.Marshal(decoded)
messages[i] = types.SendMessageBatchRequestEntry{
MessageBody: aws.String(string(decodedMessageByteArray)),
MessageGroupId: aws.String(s.SQSConf.AWSMessageGroupID),
Id: aws.String(strconv.FormatInt(time.Now().UnixNano(), 10)),
MessageBody: aws.String(string(decodedMessageByteArray)),
Id: aws.String(decoded.GetObjectID().String()),
}
if s.SQSConf.AWSMessageGroupID != "" {
messages[i].MessageGroupId = aws.String(s.SQSConf.AWSMessageGroupID)
}
}
SQSError := s.write(ctx, messages)
Expand All @@ -115,6 +116,7 @@ func (s *SQSPump) WriteData(ctx context.Context, data []interface{}) error {

return SQSError
}
s.log.Debug("ElapsedTime in seconds for ", len(data), " records:", time.Now().Sub(startTime))
s.log.Info("Purged ", len(data), " records...")
return nil
}
Expand All @@ -128,7 +130,7 @@ func (s *SQSPump) write(c context.Context, messages []types.SendMessageBatchRequ
}
sMInput := &sqs.SendMessageBatchInput{
Entries: messages[i:end],
QueueUrl: aws.String(s.SQSConf.QueueName),
QueueUrl: s.SQSQueueURL,
}

if _, err := s.SQSClient.SendMessageBatch(c, sMInput); err != nil {
Expand Down

0 comments on commit a75b4c8

Please sign in to comment.