Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TT-10675 add SQS Pump Backend support #740

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ The table below provides details on the fields within each `tyk_analytics` recor
- [Kafka](#kafka-config)
- [Stdout](#stdout) (i.e. for use by Datadog logging agent in Kubernetes)
- [Timestream](#timestream-config)
- [AWS SQS](#SQS-config)

# Configuration:

Expand Down Expand Up @@ -1268,6 +1269,76 @@ TYK_PMP_PUMPS_CSV_TYPE=csv
TYK_PMP_PUMPS_CSV_META_CSVDIR=./
```

## SQS Config

#### Authentication & Prerequisite

We must authenticate ourselves by providing credentials to AWS. This pump uses the official AWS GO SDK, so instructions on how to authenticate can be found on [their documentation here](https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/#specifying-credentials).

#### Config Fields

`aws_queue_name` - Specifies the name of the AWS Simple Queue Service (SQS) queue where messages will be sent

`aws_message_group_id` - Specifies the name of the AWS Simple Queue Service (SQS) queue where messages will be sent

`aws_sqs_batch_limit` - Sets the maximum number of messages to include in a single batch when sending messages to the SQS queue

`aws_message_id_deduplication_enabled` - Enables or disables the deduplication of messages based on unique message IDs to prevent unintended duplicates in the queu

`aws_delay_seconds` - Configures the delay (in seconds) before messages sent to the SQS queue become available for processing.

When you initialize a Timestream Pump, the SDK uses its default credential chain to find AWS credentials. This default credential chain looks for credentials in the following order:

- Environment variables.
- Static Credentials (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_SESSION_TOKEN`)
- Web Identity Token (`AWS_WEB_IDENTITY_TOKEN_FILE`)
- Shared configuration files.
- SDK defaults to credentials file under `.aws` folder that is placed in the home folder on your computer.
- If your application uses an ECS task definition or RunTask API operation, IAM role for tasks.
- If your application is running on an Amazon EC2 instance, IAM role for Amazon EC2.

If no credentials are provided, SQS Pump won't be able to connect.

###### JSON / Conf File

```json
"sqs": {
"type": "sqs",
"meta": {
"log_field_name": "tyk-analytics-record",
"format": "json",
"aws_queue_name": "access-logs-queue.fifo",
"aws_region": "us-east-1",
"aws_key": "key",
"aws_secret": "secret",
"aws_endpoint": "http://aws-endpoint:4566",
"aws_message_group_id": "message_group_id",
"aws_sqs_batch_limit": 10,
"aws_message_id_deduplication_enabled": true,
"aws_delay_seconds": 0
}
},
```

###### Env Variables

```
# SQS Pump Configuration
TYK_PMP_PUMPS_SQS_TYPE=sqs
TYK_PMP_PUMPS_SQS_META_LOGFIELDNAME=tyk-analytics-record
TYK_PMP_PUMPS_SQS_META_FORMAT=json
TYK_PMP_PUMPS_SQS_META_AWSQUEUENAME=access-logs-queue.fifo
TYK_PMP_PUMPS_SQS_META_AWSREGION=us-east-1
TYK_PMP_PUMPS_SQS_META_AWSKEY=key
TYK_PMP_PUMPS_SQS_META_AWSSECRET=secret
TYK_PMP_PUMPS_SQS_META_AWSENDPOINT=http://aws-endpoint:4566
TYK_PMP_PUMPS_SQS_META_AWSMESSAGEGROUPID=message_group_id
TYK_PMP_PUMPS_SQS_META_AWSSQSBATCHLIMIT=10
TYK_PMP_PUMPS_SQS_META_AWSMESSAGEIDDEDUPLICATIONENABLED=true
TYK_PMP_PUMPS_SQS_META_AWSDELAYSECONDS=0

```

# Base Pump Configurations

The following configurations can be added to any Pump. Keep reading for an example.
Expand Down
12 changes: 7 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ require (
github.com/TykTechnologies/gorpc v0.0.0-20210624160652-fe65bda0ccb9
github.com/TykTechnologies/murmur3 v0.0.0-20230310161213-aad17efd5632
github.com/TykTechnologies/storage v1.0.8
github.com/aws/aws-sdk-go-v2 v1.16.14
github.com/aws/aws-sdk-go-v2 v1.22.1
github.com/aws/aws-sdk-go-v2/config v1.9.0
github.com/aws/aws-sdk-go-v2/credentials v1.5.0
github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0
github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0
github.com/cenkalti/backoff/v4 v4.0.2
github.com/fatih/structs v1.1.0
Expand All @@ -24,6 +26,7 @@ require (
github.com/logzio/logzio-go v0.0.0-20200316143903-ac8fc0e2910e
github.com/mitchellh/mapstructure v1.3.1
github.com/moesif/moesifapi-go v1.0.6
github.com/oklog/ulid/v2 v2.1.0
github.com/olivere/elastic/v7 v7.0.28
github.com/oschwald/maxminddb-golang v1.11.0
github.com/pkg/errors v0.9.1
Expand Down Expand Up @@ -54,16 +57,15 @@ require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.5.0 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.7.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.2.5 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.4.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.5.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.8.0 // indirect
github.com/aws/smithy-go v1.13.2 // indirect
github.com/aws/smithy-go v1.16.0 // indirect
github.com/beeker1121/goque v0.0.0-20170321141813-4044bc29b280 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
Expand Down
20 changes: 13 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -58,24 +58,28 @@ github.com/aws/aws-sdk-go v1.29.11/go.mod h1:1KvfttTE3SPKMpo8g2c6jL3ZKfXtFvKscTg
github.com/aws/aws-sdk-go v1.40.32/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q=
github.com/aws/aws-sdk-go-v2 v1.10.0/go.mod h1:U/EyyVvKtzmFeQQcca7eBotKdlpcP2zzU6bXBYcf7CE=
github.com/aws/aws-sdk-go-v2 v1.11.2/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ=
github.com/aws/aws-sdk-go-v2 v1.16.14 h1:db6GvO4Z2UqHt5gvT0lr6J5x5P+oQ7bdRzczVaRekMU=
github.com/aws/aws-sdk-go-v2 v1.16.14/go.mod h1:s/G+UV29dECbF5rf+RNj1xhlmvoNurGSr+McVSRj59w=
github.com/aws/aws-sdk-go-v2 v1.22.1 h1:sjnni/AuoTXxHitsIdT0FwmqUuNUuHtufcVDErVFT9U=
github.com/aws/aws-sdk-go-v2 v1.22.1/go.mod h1:Kd0OJtkW3Q0M0lUWGszapWjEvrXDzRW+D21JNsroB+c=
github.com/aws/aws-sdk-go-v2/config v1.9.0 h1:SkREVSwi+J8MSdjhJ96jijZm5ZDNleI0E4hHCNivh7s=
github.com/aws/aws-sdk-go-v2/config v1.9.0/go.mod h1:qhK5NNSgo9/nOSMu3HyE60WHXZTWTHTgd5qtIF44vOQ=
github.com/aws/aws-sdk-go-v2/credentials v1.5.0 h1:r6470olsn2qyOe2aLzK6q+wfO3dzNcMujRT3gqBgBB8=
github.com/aws/aws-sdk-go-v2/credentials v1.5.0/go.mod h1:kvqTkpzQmzri9PbsiTY+LvwFzM0gY19emlAWwBOJMb0=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.7.0 h1:FKaqk7geL3oIqSwGJt5SWUKj8uJ+qLZNqlBuqq6sFyA=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.7.0/go.mod h1:KqEkRkxm/+1Pd/rENRNbQpfblDBYeg5HDSqjB6ks8hA=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 h1:XJLnluKuUxQG255zPNe+04izXl7GSyUVafIsgfv9aw4=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2/go.mod h1:SgKKNBIoDC/E1ZCDhhMW3yalWjwuLjMcpLzsM/QQnWo=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2 h1:EauRoYZVNPlidZSZJDscjJBQ22JhVF2+tdteatax2Ak=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1 h1:fi1ga6WysOyYb5PAf3Exd6B5GiSNpnZim4h1rhlBqx0=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1/go.mod h1:V5CY8wNurvPUibTi9mwqUqpiFZ5LnioKWIFUDtIzdI8=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2/go.mod h1:xT4XX6w5Sa3dhg50JrYyy3e4WPYo/+WjY/BXtqXVunU=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.1 h1:ZpaV/j48RlPc4AmOZuPv22pJliXjXq8/reL63YzyFnw=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.1/go.mod h1:R8aXraabD2e3qv1csxM14/X9WF4wFMIY0kH4YEtYD5M=
github.com/aws/aws-sdk-go-v2/internal/ini v1.2.5 h1:zPxLGWALExNepElO0gYgoqsbqTlt4ZCrhZ7XlfJ+Qlw=
github.com/aws/aws-sdk-go-v2/internal/ini v1.2.5/go.mod h1:6ZBTuDmvpCOD4Sf1i2/I3PgftlEcDGgvi8ocq64oQEg=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 h1:ru9+IpkVIuDvIkm9Q0DEjtWHnh6ITDoZo8fH2dIjlqQ=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3/go.mod h1:zOyLMYyg60yyZpOCniAUuibWVqTU4TuLmMa/Wh4P+HA=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.4.0 h1:/T5wKsw/po118HEDvnSE8YU7TESxvZbYM2rnn+Oi7Kk=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.4.0/go.mod h1:X5/JuOxPLU/ogICgDTtnpfaQzdQJO0yKDcpoxWLLJ8Y=
github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0 h1:21QmEZkOnaJ4SPRFhhN+8MV5ewb0j1lxTg+RPp0mUeE=
github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0/go.mod h1:E02a07/HTyJEHFpp+WMRh33xuNVdsd8WCbLlODeT4lU=
github.com/aws/aws-sdk-go-v2/service/sso v1.5.0 h1:VnrCAJTp1bDxU79UuW/D4z7bwZ7xOc7JjDKpqXL/m04=
github.com/aws/aws-sdk-go-v2/service/sso v1.5.0/go.mod h1:GsqaJOJeOfeYD88/2vHWKXegvDRofDqWwC5i48A2kgs=
github.com/aws/aws-sdk-go-v2/service/sts v1.8.0 h1:7N7RsEVvUcvEg7jrWKU5AnSi4/6b6eY9+wG1g6W4ExE=
Expand All @@ -84,8 +88,8 @@ github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0 h1:/4djuASUYOns1ZhCO
github.com/aws/aws-sdk-go-v2/service/timestreamwrite v1.9.0/go.mod h1:VN4yDJwgYOO6AzHPE8+QeBwK6wUMOFkSCogZFWifdVc=
github.com/aws/smithy-go v1.8.1/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/aws/smithy-go v1.13.2 h1:TBLKyeJfXTrTXRHmsv4qWt9IQGYyWThLYaJWSahTOGE=
github.com/aws/smithy-go v1.13.2/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/aws/smithy-go v1.16.0 h1:gJZEH/Fqh+RsvlJ1Zt4tVAtV6bKkp3cC+R6FCZMNzik=
github.com/aws/smithy-go v1.16.0/go.mod h1:NukqUGpCZIILqqiV0NIjeFh24kd/FAa4beRb6nbIUPE=
github.com/beeker1121/goque v0.0.0-20170321141813-4044bc29b280 h1:ZgW7EEoTQvz27wleAVF3XVBqc6eBFqB4BNw4Awg4BN8=
github.com/beeker1121/goque v0.0.0-20170321141813-4044bc29b280/go.mod h1:L6dOWBhDOnxUVQsb0wkLve0VCnt2xJW/MI8pdRX4ANw=
github.com/benbjohnson/tmpl v1.1.0/go.mod h1:N7W0NUGWuG26caFrID5sE4tvyLaKVp1fbV3Vr+MCul8=
Expand Down Expand Up @@ -216,7 +220,6 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
Expand Down Expand Up @@ -396,6 +399,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/olivere/elastic v6.2.31+incompatible h1:zwJIIsgfiDBuDS3sb6MCbm/e03BPEJoGZvqevZXM254=
github.com/olivere/elastic v6.2.31+incompatible/go.mod h1:J+q1zQJTgAz9woqsbVRqGeB5G1iqDKVBWLNSYW8yfJ8=
github.com/olivere/elastic/v7 v7.0.12/go.mod h1:14rWX28Pnh3qCKYRVnSGXWLf9MbLonYS/4FDCY3LAPo=
Expand All @@ -420,6 +425,7 @@ github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYr
github.com/oschwald/maxminddb-golang v1.11.0 h1:aSXMqYR/EPNjGE8epgqwDay+P30hCBZIveY0WZbAWh0=
github.com/oschwald/maxminddb-golang v1.11.0/go.mod h1:YmVI+H0zh3ySFR3w+oz8PCfglAFj3PuCmui13+P9zDg=
github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/peterh/liner v1.0.1-0.20180619022028-8c1271fcf47f/go.mod h1:xIteQHvHuaLYG9IFj6mSxM0fCKrs34IrEQUhOYuGPHc=
github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
Expand Down
1 change: 1 addition & 0 deletions pumps/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ func init() {
AvailablePumps["sql-graph"] = &GraphSQLPump{}
AvailablePumps["sql-graph-aggregate"] = &GraphSQLAggregatePump{}
AvailablePumps["resurfaceio"] = &ResurfacePump{}
AvailablePumps["sqs"] = &SQSPump{}
}
195 changes: 195 additions & 0 deletions pumps/sqs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package pumps

import (
"context"
"encoding/json"
"github.com/TykTechnologies/tyk-pump/analytics"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/mitchellh/mapstructure"
"github.com/oklog/ulid/v2"
"github.com/sirupsen/logrus"
"time"
)

type SQSSendMessageBatchAPI interface {
GetQueueUrl(ctx context.Context,
params *sqs.GetQueueUrlInput,
optFns ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error)

SendMessageBatch(ctx context.Context,
params *sqs.SendMessageBatchInput,
optFns ...func(*sqs.Options)) (*sqs.SendMessageBatchOutput, error)
}

type SQSPump struct {
SQSClient SQSSendMessageBatchAPI
SQSQueueURL *string
SQSConf *SQSConf
log *logrus.Entry
CommonPumpConfig
}

var SQSPrefix = "sqs-pump"
var SQSDefaultENV = PUMPS_ENV_PREFIX + "_SQS" + PUMPS_ENV_META_PREFIX

// SQSConf represents the configuration structure for the Tyk Pump SQS (Simple Queue Service) pump.
type SQSConf struct {
// EnvPrefix defines the prefix for environment variables related to this SQS configuration.
EnvPrefix string `mapstructure:"meta_env_prefix"`

// QueueName specifies the name of the AWS Simple Queue Service (SQS) queue for message delivery.
QueueName string `mapstructure:"aws_queue_name"`

// AWSRegion sets the AWS region where the SQS queue is located.
AWSRegion string `mapstructure:"aws_region"`

// AWSSecret is the AWS secret key used for authentication.
AWSSecret string `mapstructure:"aws_secret"`

// AWSKey is the AWS access key ID used for authentication.
AWSKey string `mapstructure:"aws_key"`

// AWSEndpoint is the custom endpoint URL for AWS SQS, if applicable.
AWSEndpoint string `mapstructure:"aws_endpoint"`

// AWSDelaySeconds configures the delay (in seconds) before messages become available for processing.
AWSDelaySeconds int32 `mapstructure:"aws_delay_seconds"`

// AWSMessageGroupID specifies the message group ID for ordered processing within the SQS queue.
AWSMessageGroupID string `mapstructure:"aws_message_group_id"`

// AWSSQSBatchLimit sets the maximum number of messages in a single batch when sending to the SQS queue.
AWSSQSBatchLimit int `mapstructure:"aws_sqs_batch_limit"`

// AWSMessageIDDeduplicationEnabled enables/disables message deduplication based on unique IDs.
AWSMessageIDDeduplicationEnabled bool `mapstructure:"aws_message_id_deduplication_enabled"`
}

func (s *SQSPump) New() Pump {
newPump := SQSPump{}
return &newPump
}

func (s *SQSPump) GetName() string {
return "SQS Pump"
}

func (s *SQSPump) GetEnvPrefix() string {
return s.SQSConf.EnvPrefix
}

func (s *SQSPump) Init(config interface{}) error {
s.SQSConf = &SQSConf{}
s.log = log.WithField("prefix", SQSPrefix)

err := mapstructure.Decode(config, &s.SQSConf)
if err != nil {
s.log.Fatal("Failed to decode configuration: ", err)
return err
}

processPumpEnvVars(s, s.log, s.SQSConf, SQSDefaultENV)

s.SQSClient, err = s.NewSQSPublisher()
if err != nil {
s.log.Fatal("Failed to create sqs client: ", err)
return err
}
// Get URL of queue
gQInput := &sqs.GetQueueUrlInput{
QueueName: aws.String(s.SQSConf.QueueName),
}

result, err := s.SQSClient.GetQueueUrl(context.TODO(), gQInput)
if err != nil {
return err
}
s.SQSQueueURL = result.QueueUrl

s.log.Info(s.GetName() + " Initialized")

return nil
}

func (s *SQSPump) WriteData(ctx context.Context, data []interface{}) error {
s.log.Info("Attempting to write ", len(data), " records...")
startTime := time.Now()

messages := make([]types.SendMessageBatchRequestEntry, len(data))
for i, v := range data {
decoded := v.(analytics.AnalyticsRecord)
decodedMessageByteArray, _ := json.Marshal(decoded)
messages[i] = types.SendMessageBatchRequestEntry{
MessageBody: aws.String(string(decodedMessageByteArray)),
Id: aws.String(ulid.Make().String()),
}
if s.SQSConf.AWSMessageGroupID != "" {
messages[i].MessageGroupId = aws.String(s.SQSConf.AWSMessageGroupID)
}
if s.SQSConf.AWSDelaySeconds != 0 {
messages[i].DelaySeconds = s.SQSConf.AWSDelaySeconds
}

// for FIFO SQS
if s.SQSConf.AWSMessageGroupID != "" {
messages[i].MessageGroupId = aws.String(s.SQSConf.AWSMessageGroupID)
}
if s.SQSConf.AWSMessageIDDeduplicationEnabled {
messages[i].MessageDeduplicationId = messages[i].Id
}
}
SQSError := s.write(ctx, messages)
if SQSError != nil {
s.log.WithError(SQSError).Error("unable to write message")

return SQSError
}
s.log.Debug("ElapsedTime in seconds for ", len(data), " records:", time.Now().Sub(startTime))
s.log.Info("Purged ", len(data), " records...")
return nil
}

func (s *SQSPump) write(c context.Context, messages []types.SendMessageBatchRequestEntry) error {
log.Debug(messages)
for i := 0; i < len(messages); i += s.SQSConf.AWSSQSBatchLimit {
end := i + s.SQSConf.AWSSQSBatchLimit

if end > len(messages) {
end = len(messages)
}
sMInput := &sqs.SendMessageBatchInput{
Entries: messages[i:end],
QueueUrl: s.SQSQueueURL,
}

if _, err := s.SQSClient.SendMessageBatch(c, sMInput); err != nil {
return err
}
}

return nil
}

func (s *SQSPump) NewSQSPublisher() (c *sqs.Client, err error) {
cfg, err := config.LoadDefaultConfig(context.TODO(),
config.WithRegion(s.SQSConf.AWSRegion),
)
if err != nil {
return nil, err
}

client := sqs.NewFromConfig(cfg, func(options *sqs.Options) {
if s.SQSConf.AWSEndpoint != "" {
options.BaseEndpoint = aws.String(s.SQSConf.AWSEndpoint)
}
if s.SQSConf.AWSKey != "" && s.SQSConf.AWSSecret != "" {
options.Credentials = credentials.NewStaticCredentialsProvider(s.SQSConf.AWSKey, s.SQSConf.AWSSecret, "")
}
})

return client, nil
}
Loading
Loading