Skip to content

Commit

Permalink
linting
Browse files Browse the repository at this point in the history
  • Loading branch information
mativm02 committed Jan 23, 2024
1 parent 20d26b2 commit f6375c0
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 15 deletions.
35 changes: 23 additions & 12 deletions pumps/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package pumps
import (
"context"
"encoding/json"
"time"

"github.com/TykTechnologies/tyk-pump/analytics"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
Expand All @@ -12,7 +14,6 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/oklog/ulid/v2"
"github.com/sirupsen/logrus"
"time"
)

type SQSSendMessageBatchAPI interface {
Expand All @@ -33,8 +34,10 @@ type SQSPump struct {
CommonPumpConfig
}

var SQSPrefix = "sqs-pump"
var SQSDefaultENV = PUMPS_ENV_PREFIX + "_SQS" + PUMPS_ENV_META_PREFIX
var (
SQSPrefix = "sqs-pump"
SQSDefaultENV = PUMPS_ENV_PREFIX + "_SQS" + PUMPS_ENV_META_PREFIX
)

// SQSConf represents the configuration structure for the Tyk Pump SQS (Simple Queue Service) pump.
type SQSConf struct {
Expand All @@ -56,17 +59,17 @@ type SQSConf struct {
// AWSEndpoint is the custom endpoint URL for AWS SQS, if applicable.
AWSEndpoint string `mapstructure:"aws_endpoint"`

// AWSDelaySeconds configures the delay (in seconds) before messages become available for processing.
AWSDelaySeconds int32 `mapstructure:"aws_delay_seconds"`

// AWSMessageGroupID specifies the message group ID for ordered processing within the SQS queue.
AWSMessageGroupID string `mapstructure:"aws_message_group_id"`

// AWSSQSBatchLimit sets the maximum number of messages in a single batch when sending to the SQS queue.
AWSSQSBatchLimit int `mapstructure:"aws_sqs_batch_limit"`

// AWSMessageIDDeduplicationEnabled enables/disables message deduplication based on unique IDs.
AWSMessageIDDeduplicationEnabled bool `mapstructure:"aws_message_id_deduplication_enabled"`

// AWSDelaySeconds configures the delay (in seconds) before messages become available for processing.
AWSDelaySeconds int32 `mapstructure:"aws_delay_seconds"`

// AWSSQSBatchLimit sets the maximum number of messages in a single batch when sending to the SQS queue.
AWSSQSBatchLimit int `mapstructure:"aws_sqs_batch_limit"`
}

func (s *SQSPump) New() Pump {
Expand Down Expand Up @@ -121,8 +124,16 @@ func (s *SQSPump) WriteData(ctx context.Context, data []interface{}) error {

messages := make([]types.SendMessageBatchRequestEntry, len(data))
for i, v := range data {
decoded := v.(analytics.AnalyticsRecord)
decodedMessageByteArray, _ := json.Marshal(decoded)
decoded, ok := v.(analytics.AnalyticsRecord)
if !ok {
s.log.Errorf("Unable to decode message: %v", v)
continue
}
decodedMessageByteArray, err := json.Marshal(decoded)
if err != nil {
s.log.Errorf("Unable to marshal message: %v", err)
continue
}
messages[i] = types.SendMessageBatchRequestEntry{
MessageBody: aws.String(string(decodedMessageByteArray)),
Id: aws.String(ulid.Make().String()),
Expand All @@ -148,7 +159,7 @@ func (s *SQSPump) WriteData(ctx context.Context, data []interface{}) error {

return SQSError
}
s.log.Debug("ElapsedTime in seconds for ", len(data), " records:", time.Now().Sub(startTime))
s.log.Debug("ElapsedTime in seconds for ", len(data), " records:", time.Since(startTime))
s.log.Info("Purged ", len(data), " records...")
return nil
}
Expand Down
10 changes: 7 additions & 3 deletions pumps/sqs_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
// Disabling 'revive' linter as it's complaining about the 'GetQueueUrl' input, which is imported from the AWS SDK.
//
//revive:disable
package pumps

import (
"context"
"github.com/TykTechnologies/tyk-pump/analytics"
"github.com/aws/aws-sdk-go-v2/aws"
"testing"
"time"

"github.com/TykTechnologies/tyk-pump/analytics"
"github.com/aws/aws-sdk-go-v2/aws"

"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -68,7 +72,7 @@ func TestSQSPump_WriteData(t *testing.T) {
}

func TestSQSPump_Chunks(t *testing.T) {
var Calls int = 0
var Calls int
// Mock SQS client
mockSQS := &MockSQSSendMessageBatchAPI{
// Implement the required functions for testing
Expand Down

0 comments on commit f6375c0

Please sign in to comment.