diff --git a/README.md b/README.md index aac70ef3b..6e74388a4 100644 --- a/README.md +++ b/README.md @@ -67,6 +67,7 @@ The table below provides details on the fields within each `tyk_analytics` recor - [Kafka](#kafka-config) - [Stdout](#stdout) (i.e. for use by Datadog logging agent in Kubernetes) - [Timestream](#timestream-config) +- [AWS SQS](#SQS-config) # Configuration: @@ -1268,6 +1269,76 @@ TYK_PMP_PUMPS_CSV_TYPE=csv TYK_PMP_PUMPS_CSV_META_CSVDIR=./ ``` +## SQS Config + +#### Authentication & Prerequisite + +We must authenticate ourselves by providing credentials to AWS. This pump uses the official AWS GO SDK, so instructions on how to authenticate can be found on [their documentation here](https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/#specifying-credentials). + +#### Config Fields + +`aws_queue_name` - Specifies the name of the AWS Simple Queue Service (SQS) queue where messages will be sent + +`aws_message_group_id` - Specifies the name of the AWS Simple Queue Service (SQS) queue where messages will be sent + +`aws_sqs_batch_limit` - Sets the maximum number of messages to include in a single batch when sending messages to the SQS queue + +`aws_message_id_deduplication_enabled` - Enables or disables the deduplication of messages based on unique message IDs to prevent unintended duplicates in the queu + +`aws_delay_seconds` - Configures the delay (in seconds) before messages sent to the SQS queue become available for processing. + +When you initialize a Timestream Pump, the SDK uses its default credential chain to find AWS credentials. This default credential chain looks for credentials in the following order: + +- Environment variables. + - Static Credentials (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_SESSION_TOKEN`) + - Web Identity Token (`AWS_WEB_IDENTITY_TOKEN_FILE`) +- Shared configuration files. + - SDK defaults to credentials file under `.aws` folder that is placed in the home folder on your computer. +- If your application uses an ECS task definition or RunTask API operation, IAM role for tasks. +- If your application is running on an Amazon EC2 instance, IAM role for Amazon EC2. + +If no credentials are provided, SQS Pump won't be able to connect. + +###### JSON / Conf File + +```json + "sqs": { + "type": "sqs", + "meta": { + "log_field_name": "tyk-analytics-record", + "format": "json", + "aws_queue_name": "access-logs-queue.fifo", + "aws_region": "us-east-1", + "aws_key": "key", + "aws_secret": "secret", + "aws_endpoint": "http://aws-endpoint:4566", + "aws_message_group_id": "message_group_id", + "aws_sqs_batch_limit": 10, + "aws_message_id_deduplication_enabled": true, + "aws_delay_seconds": 0 + } + }, +``` + +###### Env Variables + +``` +# SQS Pump Configuration +TYK_PMP_PUMPS_SQS_TYPE=sqs +TYK_PMP_PUMPS_SQS_META_LOGFIELDNAME=tyk-analytics-record +TYK_PMP_PUMPS_SQS_META_FORMAT=json +TYK_PMP_PUMPS_SQS_META_AWSQUEUENAME=access-logs-queue.fifo +TYK_PMP_PUMPS_SQS_META_AWSREGION=us-east-1 +TYK_PMP_PUMPS_SQS_META_AWSKEY=key +TYK_PMP_PUMPS_SQS_META_AWSSECRET=secret +TYK_PMP_PUMPS_SQS_META_AWSENDPOINT=http://aws-endpoint:4566 +TYK_PMP_PUMPS_SQS_META_AWSMESSAGEGROUPID=message_group_id +TYK_PMP_PUMPS_SQS_META_AWSSQSBATCHLIMIT=10 +TYK_PMP_PUMPS_SQS_META_AWSMESSAGEIDDEDUPLICATIONENABLED=true +TYK_PMP_PUMPS_SQS_META_AWSDELAYSECONDS=0 + +``` + # Base Pump Configurations The following configurations can be added to any Pump. Keep reading for an example. diff --git a/go.mod b/go.mod index 1859a4523..ca1f98595 100644 --- a/go.mod +++ b/go.mod @@ -7,8 +7,10 @@ require ( github.com/TykTechnologies/gorpc v0.0.0-20210624160652-fe65bda0ccb9 github.com/TykTechnologies/murmur3 v0.0.0-20230310161213-aad17efd5632 github.com/TykTechnologies/storage v1.0.8 - github.com/aws/aws-sdk-go-v2 v1.16.14 + github.com/aws/aws-sdk-go-v2 v1.22.1 github.com/aws/aws-sdk-go-v2/config v1.9.0 + github.com/aws/aws-sdk-go-v2/credentials v1.5.0 + github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0 github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0 github.com/cenkalti/backoff/v4 v4.0.2 github.com/fatih/structs v1.1.0 @@ -24,6 +26,7 @@ require ( github.com/logzio/logzio-go v0.0.0-20200316143903-ac8fc0e2910e github.com/mitchellh/mapstructure v1.3.1 github.com/moesif/moesifapi-go v1.0.6 + github.com/oklog/ulid/v2 v2.1.0 github.com/olivere/elastic/v7 v7.0.28 github.com/oschwald/maxminddb-golang v1.11.0 github.com/pkg/errors v0.9.1 @@ -55,16 +58,15 @@ require ( github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect - github.com/aws/aws-sdk-go-v2/credentials v1.5.0 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.7.0 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.1 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.2.5 // indirect github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.4.0 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.5.0 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.8.0 // indirect - github.com/aws/smithy-go v1.13.2 // indirect + github.com/aws/smithy-go v1.16.0 // indirect github.com/beeker1121/goque v0.0.0-20170321141813-4044bc29b280 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect diff --git a/go.sum b/go.sum index b7cf4aeb4..59c39d931 100644 --- a/go.sum +++ b/go.sum @@ -60,24 +60,28 @@ github.com/aws/aws-sdk-go v1.29.11/go.mod h1:1KvfttTE3SPKMpo8g2c6jL3ZKfXtFvKscTg github.com/aws/aws-sdk-go v1.40.32/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= github.com/aws/aws-sdk-go-v2 v1.10.0/go.mod h1:U/EyyVvKtzmFeQQcca7eBotKdlpcP2zzU6bXBYcf7CE= github.com/aws/aws-sdk-go-v2 v1.11.2/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ= -github.com/aws/aws-sdk-go-v2 v1.16.14 h1:db6GvO4Z2UqHt5gvT0lr6J5x5P+oQ7bdRzczVaRekMU= -github.com/aws/aws-sdk-go-v2 v1.16.14/go.mod h1:s/G+UV29dECbF5rf+RNj1xhlmvoNurGSr+McVSRj59w= +github.com/aws/aws-sdk-go-v2 v1.22.1 h1:sjnni/AuoTXxHitsIdT0FwmqUuNUuHtufcVDErVFT9U= +github.com/aws/aws-sdk-go-v2 v1.22.1/go.mod h1:Kd0OJtkW3Q0M0lUWGszapWjEvrXDzRW+D21JNsroB+c= github.com/aws/aws-sdk-go-v2/config v1.9.0 h1:SkREVSwi+J8MSdjhJ96jijZm5ZDNleI0E4hHCNivh7s= github.com/aws/aws-sdk-go-v2/config v1.9.0/go.mod h1:qhK5NNSgo9/nOSMu3HyE60WHXZTWTHTgd5qtIF44vOQ= github.com/aws/aws-sdk-go-v2/credentials v1.5.0 h1:r6470olsn2qyOe2aLzK6q+wfO3dzNcMujRT3gqBgBB8= github.com/aws/aws-sdk-go-v2/credentials v1.5.0/go.mod h1:kvqTkpzQmzri9PbsiTY+LvwFzM0gY19emlAWwBOJMb0= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.7.0 h1:FKaqk7geL3oIqSwGJt5SWUKj8uJ+qLZNqlBuqq6sFyA= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.7.0/go.mod h1:KqEkRkxm/+1Pd/rENRNbQpfblDBYeg5HDSqjB6ks8hA= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 h1:XJLnluKuUxQG255zPNe+04izXl7GSyUVafIsgfv9aw4= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2/go.mod h1:SgKKNBIoDC/E1ZCDhhMW3yalWjwuLjMcpLzsM/QQnWo= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2 h1:EauRoYZVNPlidZSZJDscjJBQ22JhVF2+tdteatax2Ak= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1 h1:fi1ga6WysOyYb5PAf3Exd6B5GiSNpnZim4h1rhlBqx0= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1/go.mod h1:V5CY8wNurvPUibTi9mwqUqpiFZ5LnioKWIFUDtIzdI8= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2/go.mod h1:xT4XX6w5Sa3dhg50JrYyy3e4WPYo/+WjY/BXtqXVunU= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.1 h1:ZpaV/j48RlPc4AmOZuPv22pJliXjXq8/reL63YzyFnw= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.1/go.mod h1:R8aXraabD2e3qv1csxM14/X9WF4wFMIY0kH4YEtYD5M= github.com/aws/aws-sdk-go-v2/internal/ini v1.2.5 h1:zPxLGWALExNepElO0gYgoqsbqTlt4ZCrhZ7XlfJ+Qlw= github.com/aws/aws-sdk-go-v2/internal/ini v1.2.5/go.mod h1:6ZBTuDmvpCOD4Sf1i2/I3PgftlEcDGgvi8ocq64oQEg= github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 h1:ru9+IpkVIuDvIkm9Q0DEjtWHnh6ITDoZo8fH2dIjlqQ= github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3/go.mod h1:zOyLMYyg60yyZpOCniAUuibWVqTU4TuLmMa/Wh4P+HA= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.4.0 h1:/T5wKsw/po118HEDvnSE8YU7TESxvZbYM2rnn+Oi7Kk= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.4.0/go.mod h1:X5/JuOxPLU/ogICgDTtnpfaQzdQJO0yKDcpoxWLLJ8Y= +github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0 h1:21QmEZkOnaJ4SPRFhhN+8MV5ewb0j1lxTg+RPp0mUeE= +github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0/go.mod h1:E02a07/HTyJEHFpp+WMRh33xuNVdsd8WCbLlODeT4lU= github.com/aws/aws-sdk-go-v2/service/sso v1.5.0 h1:VnrCAJTp1bDxU79UuW/D4z7bwZ7xOc7JjDKpqXL/m04= github.com/aws/aws-sdk-go-v2/service/sso v1.5.0/go.mod h1:GsqaJOJeOfeYD88/2vHWKXegvDRofDqWwC5i48A2kgs= github.com/aws/aws-sdk-go-v2/service/sts v1.8.0 h1:7N7RsEVvUcvEg7jrWKU5AnSi4/6b6eY9+wG1g6W4ExE= @@ -86,8 +90,8 @@ github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0 h1:/4djuASUYOns1ZhCO github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0/go.mod h1:VN4yDJwgYOO6AzHPE8+QeBwK6wUMOFkSCogZFWifdVc= github.com/aws/smithy-go v1.8.1/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= -github.com/aws/smithy-go v1.13.2 h1:TBLKyeJfXTrTXRHmsv4qWt9IQGYyWThLYaJWSahTOGE= -github.com/aws/smithy-go v1.13.2/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= +github.com/aws/smithy-go v1.16.0 h1:gJZEH/Fqh+RsvlJ1Zt4tVAtV6bKkp3cC+R6FCZMNzik= +github.com/aws/smithy-go v1.16.0/go.mod h1:NukqUGpCZIILqqiV0NIjeFh24kd/FAa4beRb6nbIUPE= github.com/beeker1121/goque v0.0.0-20170321141813-4044bc29b280 h1:ZgW7EEoTQvz27wleAVF3XVBqc6eBFqB4BNw4Awg4BN8= github.com/beeker1121/goque v0.0.0-20170321141813-4044bc29b280/go.mod h1:L6dOWBhDOnxUVQsb0wkLve0VCnt2xJW/MI8pdRX4ANw= github.com/benbjohnson/tmpl v1.1.0/go.mod h1:N7W0NUGWuG26caFrID5sE4tvyLaKVp1fbV3Vr+MCul8= @@ -218,7 +222,6 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= @@ -398,6 +401,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU= +github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ= github.com/olivere/elastic v6.2.31+incompatible h1:zwJIIsgfiDBuDS3sb6MCbm/e03BPEJoGZvqevZXM254= github.com/olivere/elastic v6.2.31+incompatible/go.mod h1:J+q1zQJTgAz9woqsbVRqGeB5G1iqDKVBWLNSYW8yfJ8= github.com/olivere/elastic/v7 v7.0.12/go.mod h1:14rWX28Pnh3qCKYRVnSGXWLf9MbLonYS/4FDCY3LAPo= @@ -422,6 +427,7 @@ github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYr github.com/oschwald/maxminddb-golang v1.11.0 h1:aSXMqYR/EPNjGE8epgqwDay+P30hCBZIveY0WZbAWh0= github.com/oschwald/maxminddb-golang v1.11.0/go.mod h1:YmVI+H0zh3ySFR3w+oz8PCfglAFj3PuCmui13+P9zDg= github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE= +github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= github.com/peterh/liner v1.0.1-0.20180619022028-8c1271fcf47f/go.mod h1:xIteQHvHuaLYG9IFj6mSxM0fCKrs34IrEQUhOYuGPHc= github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= diff --git a/pumps/init.go b/pumps/init.go index 86f9393a6..544988ef4 100644 --- a/pumps/init.go +++ b/pumps/init.go @@ -36,4 +36,5 @@ func init() { AvailablePumps["sql-graph"] = &GraphSQLPump{} AvailablePumps["sql-graph-aggregate"] = &GraphSQLAggregatePump{} AvailablePumps["resurfaceio"] = &ResurfacePump{} + AvailablePumps["sqs"] = &SQSPump{} } diff --git a/pumps/sqs.go b/pumps/sqs.go new file mode 100644 index 000000000..a00591374 --- /dev/null +++ b/pumps/sqs.go @@ -0,0 +1,206 @@ +package pumps + +import ( + "context" + "encoding/json" + "time" + + "github.com/TykTechnologies/tyk-pump/analytics" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" + "github.com/mitchellh/mapstructure" + "github.com/oklog/ulid/v2" + "github.com/sirupsen/logrus" +) + +type SQSSendMessageBatchAPI interface { + GetQueueUrl(ctx context.Context, + params *sqs.GetQueueUrlInput, + optFns ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error) + + SendMessageBatch(ctx context.Context, + params *sqs.SendMessageBatchInput, + optFns ...func(*sqs.Options)) (*sqs.SendMessageBatchOutput, error) +} + +type SQSPump struct { + SQSClient SQSSendMessageBatchAPI + SQSQueueURL *string + SQSConf *SQSConf + log *logrus.Entry + CommonPumpConfig +} + +var ( + SQSPrefix = "sqs-pump" + SQSDefaultENV = PUMPS_ENV_PREFIX + "_SQS" + PUMPS_ENV_META_PREFIX +) + +// SQSConf represents the configuration structure for the Tyk Pump SQS (Simple Queue Service) pump. +type SQSConf struct { + // EnvPrefix defines the prefix for environment variables related to this SQS configuration. + EnvPrefix string `mapstructure:"meta_env_prefix"` + + // QueueName specifies the name of the AWS Simple Queue Service (SQS) queue for message delivery. + QueueName string `mapstructure:"aws_queue_name"` + + // AWSRegion sets the AWS region where the SQS queue is located. + AWSRegion string `mapstructure:"aws_region"` + + // AWSSecret is the AWS secret key used for authentication. + AWSSecret string `mapstructure:"aws_secret"` + + // AWSKey is the AWS access key ID used for authentication. + AWSKey string `mapstructure:"aws_key"` + + // AWSEndpoint is the custom endpoint URL for AWS SQS, if applicable. + AWSEndpoint string `mapstructure:"aws_endpoint"` + + // AWSMessageGroupID specifies the message group ID for ordered processing within the SQS queue. + AWSMessageGroupID string `mapstructure:"aws_message_group_id"` + + // AWSMessageIDDeduplicationEnabled enables/disables message deduplication based on unique IDs. + AWSMessageIDDeduplicationEnabled bool `mapstructure:"aws_message_id_deduplication_enabled"` + + // AWSDelaySeconds configures the delay (in seconds) before messages become available for processing. + AWSDelaySeconds int32 `mapstructure:"aws_delay_seconds"` + + // AWSSQSBatchLimit sets the maximum number of messages in a single batch when sending to the SQS queue. + AWSSQSBatchLimit int `mapstructure:"aws_sqs_batch_limit"` +} + +func (s *SQSPump) New() Pump { + newPump := SQSPump{} + return &newPump +} + +func (s *SQSPump) GetName() string { + return "SQS Pump" +} + +func (s *SQSPump) GetEnvPrefix() string { + return s.SQSConf.EnvPrefix +} + +func (s *SQSPump) Init(config interface{}) error { + s.SQSConf = &SQSConf{} + s.log = log.WithField("prefix", SQSPrefix) + + err := mapstructure.Decode(config, &s.SQSConf) + if err != nil { + s.log.Fatal("Failed to decode configuration: ", err) + return err + } + + processPumpEnvVars(s, s.log, s.SQSConf, SQSDefaultENV) + + s.SQSClient, err = s.NewSQSPublisher() + if err != nil { + s.log.Fatal("Failed to create sqs client: ", err) + return err + } + // Get URL of queue + gQInput := &sqs.GetQueueUrlInput{ + QueueName: aws.String(s.SQSConf.QueueName), + } + + result, err := s.SQSClient.GetQueueUrl(context.TODO(), gQInput) + if err != nil { + return err + } + s.SQSQueueURL = result.QueueUrl + + s.log.Info(s.GetName() + " Initialized") + + return nil +} + +func (s *SQSPump) WriteData(ctx context.Context, data []interface{}) error { + s.log.Info("Attempting to write ", len(data), " records...") + startTime := time.Now() + + messages := make([]types.SendMessageBatchRequestEntry, len(data)) + for i, v := range data { + decoded, ok := v.(analytics.AnalyticsRecord) + if !ok { + s.log.Errorf("Unable to decode message: %v", v) + continue + } + decodedMessageByteArray, err := json.Marshal(decoded) + if err != nil { + s.log.Errorf("Unable to marshal message: %v", err) + continue + } + messages[i] = types.SendMessageBatchRequestEntry{ + MessageBody: aws.String(string(decodedMessageByteArray)), + Id: aws.String(ulid.Make().String()), + } + if s.SQSConf.AWSMessageGroupID != "" { + messages[i].MessageGroupId = aws.String(s.SQSConf.AWSMessageGroupID) + } + if s.SQSConf.AWSDelaySeconds != 0 { + messages[i].DelaySeconds = s.SQSConf.AWSDelaySeconds + } + + // for FIFO SQS + if s.SQSConf.AWSMessageGroupID != "" { + messages[i].MessageGroupId = aws.String(s.SQSConf.AWSMessageGroupID) + } + if s.SQSConf.AWSMessageIDDeduplicationEnabled { + messages[i].MessageDeduplicationId = messages[i].Id + } + } + SQSError := s.write(ctx, messages) + if SQSError != nil { + s.log.WithError(SQSError).Error("unable to write message") + + return SQSError + } + s.log.Debug("ElapsedTime in seconds for ", len(data), " records:", time.Since(startTime)) + s.log.Info("Purged ", len(data), " records...") + return nil +} + +func (s *SQSPump) write(c context.Context, messages []types.SendMessageBatchRequestEntry) error { + log.Debug(messages) + for i := 0; i < len(messages); i += s.SQSConf.AWSSQSBatchLimit { + end := i + s.SQSConf.AWSSQSBatchLimit + + if end > len(messages) { + end = len(messages) + } + sMInput := &sqs.SendMessageBatchInput{ + Entries: messages[i:end], + QueueUrl: s.SQSQueueURL, + } + + if _, err := s.SQSClient.SendMessageBatch(c, sMInput); err != nil { + return err + } + } + + return nil +} + +func (s *SQSPump) NewSQSPublisher() (c *sqs.Client, err error) { + cfg, err := config.LoadDefaultConfig(context.TODO(), + config.WithRegion(s.SQSConf.AWSRegion), + ) + if err != nil { + return nil, err + } + + client := sqs.NewFromConfig(cfg, func(options *sqs.Options) { + if s.SQSConf.AWSEndpoint != "" { + options.BaseEndpoint = aws.String(s.SQSConf.AWSEndpoint) + } + if s.SQSConf.AWSKey != "" && s.SQSConf.AWSSecret != "" { + options.Credentials = credentials.NewStaticCredentialsProvider(s.SQSConf.AWSKey, s.SQSConf.AWSSecret, "") + } + }) + + return client, nil +} diff --git a/pumps/sqs_test.go b/pumps/sqs_test.go new file mode 100644 index 000000000..718fb92dc --- /dev/null +++ b/pumps/sqs_test.go @@ -0,0 +1,117 @@ +// Disabling 'revive' linter as it's complaining about the 'GetQueueUrl' input, which is imported from the AWS SDK. +// +//revive:disable +package pumps + +import ( + "context" + "testing" + "time" + + "github.com/TykTechnologies/tyk-pump/analytics" + "github.com/aws/aws-sdk-go-v2/aws" + + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/stretchr/testify/assert" +) + +// MockSQSSendMessageBatchAPI is a mock implementation of SQSSendMessageBatchAPI for testing purposes. +type MockSQSSendMessageBatchAPI struct { + GetQueueUrlFunc func(ctx context.Context, params *sqs.GetQueueUrlInput, optFns ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error) + SendMessageBatchFunc func(ctx context.Context, params *sqs.SendMessageBatchInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageBatchOutput, error) +} + +func (m *MockSQSSendMessageBatchAPI) GetQueueUrl(ctx context.Context, params *sqs.GetQueueUrlInput, optFns ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error) { + return m.GetQueueUrlFunc(ctx, params, optFns...) +} + +func (m *MockSQSSendMessageBatchAPI) SendMessageBatch(ctx context.Context, params *sqs.SendMessageBatchInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageBatchOutput, error) { + return m.SendMessageBatchFunc(ctx, params, optFns...) +} + +func TestSQSPump_WriteData(t *testing.T) { + // Mock SQS client + mockSQS := &MockSQSSendMessageBatchAPI{ + // Implement the required functions for testing + GetQueueUrlFunc: func(ctx context.Context, params *sqs.GetQueueUrlInput, optFns ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error) { + // Implement the mock behavior for GetQueueUrl + return &sqs.GetQueueUrlOutput{QueueUrl: aws.String("mockQueueUrl")}, nil + }, + SendMessageBatchFunc: func(ctx context.Context, params *sqs.SendMessageBatchInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageBatchOutput, error) { + // Implement the mock behavior for SendMessageBatch + return &sqs.SendMessageBatchOutput{}, nil + }, + } + + // Create an instance of SQSPump with the mock SQS client + sqsPump := &SQSPump{ + SQSClient: mockSQS, + SQSQueueURL: aws.String("mockQueueUrl"), + SQSConf: &SQSConf{ + QueueName: "test-queue", + AWSSQSBatchLimit: 10, + }, + log: log.WithField("prefix", SQSPrefix), // You might want to provide a mock logger for testing + CommonPumpConfig: CommonPumpConfig{}, // You might want to set CommonPumpConfig fields for testing + } + + // Create a context for testing + ctx := context.TODO() + + // Create mock data for testing + keys := make([]interface{}, 3) + keys[0] = analytics.AnalyticsRecord{APIID: "api111", OrgID: "123", TimeStamp: time.Now()} + keys[1] = analytics.AnalyticsRecord{APIID: "api123", OrgID: "1234", TimeStamp: time.Now()} + keys[2] = analytics.AnalyticsRecord{APIID: "api321", OrgID: "12345", TimeStamp: time.Now()} + + // Perform the WriteData operation + err := sqsPump.WriteData(ctx, keys) + + // Assert that no error occurred during WriteData + assert.NoError(t, err, "Unexpected error during WriteData") +} + +func TestSQSPump_Chunks(t *testing.T) { + var Calls int + // Mock SQS client + mockSQS := &MockSQSSendMessageBatchAPI{ + // Implement the required functions for testing + GetQueueUrlFunc: func(ctx context.Context, params *sqs.GetQueueUrlInput, optFns ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error) { + // Implement the mock behavior for GetQueueUrl + return &sqs.GetQueueUrlOutput{QueueUrl: aws.String("mockQueueUrl")}, nil + }, + SendMessageBatchFunc: func(ctx context.Context, params *sqs.SendMessageBatchInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageBatchOutput, error) { + // Implement the mock behavior for SendMessageBatch + Calls++ + return &sqs.SendMessageBatchOutput{}, nil + }, + } + + // Create an instance of SQSPump with the mock SQS client + sqsPump := &SQSPump{ + SQSClient: mockSQS, + SQSQueueURL: aws.String("mockQueueUrl"), + SQSConf: &SQSConf{ + QueueName: "test-queue", + AWSSQSBatchLimit: 1, + }, + log: log.WithField("prefix", SQSPrefix), // You might want to provide a mock logger for testing + CommonPumpConfig: CommonPumpConfig{}, // You might want to set CommonPumpConfig fields for testing + } + + // Create a context for testing + ctx := context.TODO() + + // Create mock data for testing + keys := make([]interface{}, 3) + keys[0] = analytics.AnalyticsRecord{APIID: "api111", OrgID: "123", TimeStamp: time.Now()} + keys[1] = analytics.AnalyticsRecord{APIID: "api123", OrgID: "1234", TimeStamp: time.Now()} + keys[2] = analytics.AnalyticsRecord{APIID: "api321", OrgID: "12345", TimeStamp: time.Now()} + + // Perform the WriteData operation + err := sqsPump.WriteData(ctx, keys) + + // Assert that no error occurred during WriteData + assert.NoError(t, err, "Unexpected error during WriteData") + assert.Equal(t, len(keys), Calls) +}