Skip to content

Commit

Permalink
add deduplication for FIFO Queues
Browse files Browse the repository at this point in the history
  • Loading branch information
Masoud Haghbin committed Dec 13, 2023
1 parent e667c5b commit 88cce13
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 11 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ require (
github.com/influxdata/influxdb v1.8.10
github.com/influxdata/influxdb-client-go/v2 v2.6.0
github.com/kelseyhightower/envconfig v1.4.0
github.com/kr/pretty v0.3.1
github.com/logzio/logzio-go v0.0.0-20200316143903-ac8fc0e2910e
github.com/mitchellh/mapstructure v1.3.1
github.com/moesif/moesifapi-go v1.0.6
github.com/oklog/ulid/v2 v2.1.0
github.com/olivere/elastic/v7 v7.0.28
github.com/oschwald/maxminddb-golang v1.11.0
github.com/pkg/errors v0.9.1
Expand Down
7 changes: 6 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
Expand Down Expand Up @@ -342,6 +341,7 @@ github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFB
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
Expand Down Expand Up @@ -400,6 +400,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/olivere/elastic v6.2.31+incompatible h1:zwJIIsgfiDBuDS3sb6MCbm/e03BPEJoGZvqevZXM254=
github.com/olivere/elastic v6.2.31+incompatible/go.mod h1:J+q1zQJTgAz9woqsbVRqGeB5G1iqDKVBWLNSYW8yfJ8=
github.com/olivere/elastic/v7 v7.0.12/go.mod h1:14rWX28Pnh3qCKYRVnSGXWLf9MbLonYS/4FDCY3LAPo=
Expand All @@ -424,11 +426,13 @@ github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYr
github.com/oschwald/maxminddb-golang v1.11.0 h1:aSXMqYR/EPNjGE8epgqwDay+P30hCBZIveY0WZbAWh0=
github.com/oschwald/maxminddb-golang v1.11.0/go.mod h1:YmVI+H0zh3ySFR3w+oz8PCfglAFj3PuCmui13+P9zDg=
github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/peterh/liner v1.0.1-0.20180619022028-8c1271fcf47f/go.mod h1:xIteQHvHuaLYG9IFj6mSxM0fCKrs34IrEQUhOYuGPHc=
github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A=
github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -461,6 +465,7 @@ github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1
github.com/robertkowalski/graylog-golang v0.0.0-20151121031040-e5295cfa2827 h1:D2Xs0bSuqpKnUOOlK4yu6lloeOs4+oD+pjbOfsxgWu0=
github.com/robertkowalski/graylog-golang v0.0.0-20151121031040-e5295cfa2827/go.mod h1:jONcYFk83vUF1lv0aERAwaFtDM9wUW4BMGmlnpLJyZY=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
Expand Down
32 changes: 22 additions & 10 deletions pumps/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/kr/pretty"
"github.com/mitchellh/mapstructure"
"github.com/oklog/ulid/v2"
"github.com/sirupsen/logrus"
"time"
)
Expand Down Expand Up @@ -37,15 +39,16 @@ var SQSDefaultENV = PUMPS_ENV_PREFIX + "_SQS" + PUMPS_ENV_META_PREFIX

// @PumpConf SQS
type SQSConf struct {
EnvPrefix string `mapstructure:"meta_env_prefix"`
QueueName string `mapstructure:"aws_queue_name"`
AWSRegion string `mapstructure:"aws_region"`
AWSSecret string `mapstructure:"aws_secret"`
AWSKey string `mapstructure:"aws_key"`
AWSEndpoint string `mapstructure:"aws_endpoint"`
AWSDelaySeconds int32 `mapstructure:"aws_delay_seconds"`
AWSMessageGroupID string `mapstructure:"aws_message_group_id"`
AWSSQSBatchLimit int `mapstructure:"aws_sqs_batch_limit"`
EnvPrefix string `mapstructure:"meta_env_prefix"`
QueueName string `mapstructure:"aws_queue_name"`
AWSRegion string `mapstructure:"aws_region"`
AWSSecret string `mapstructure:"aws_secret"`
AWSKey string `mapstructure:"aws_key"`
AWSEndpoint string `mapstructure:"aws_endpoint"`
AWSDelaySeconds int32 `mapstructure:"aws_delay_seconds"`
AWSMessageGroupID string `mapstructure:"aws_message_group_id"`
AWSSQSBatchLimit int `mapstructure:"aws_sqs_batch_limit"`
AWSMessageIDDeduplicationEnabled bool `mapstructure:"aws_message_id_deduplication_enabled"`
}

func (s *SQSPump) New() Pump {
Expand Down Expand Up @@ -104,14 +107,22 @@ func (s *SQSPump) WriteData(ctx context.Context, data []interface{}) error {
decodedMessageByteArray, _ := json.Marshal(decoded)
messages[i] = types.SendMessageBatchRequestEntry{
MessageBody: aws.String(string(decodedMessageByteArray)),
Id: aws.String(decoded.GetObjectID().String()),
Id: aws.String(ulid.Make().String()),
}
if s.SQSConf.AWSMessageGroupID != "" {
messages[i].MessageGroupId = aws.String(s.SQSConf.AWSMessageGroupID)
}
if s.SQSConf.AWSDelaySeconds != 0 {
messages[i].DelaySeconds = s.SQSConf.AWSDelaySeconds
}

// for FIFO SQS
if s.SQSConf.AWSMessageGroupID != "" {
messages[i].MessageGroupId = aws.String(s.SQSConf.AWSMessageGroupID)
}
if s.SQSConf.AWSMessageIDDeduplicationEnabled {
messages[i].MessageDeduplicationId = messages[i].Id
}
}
SQSError := s.write(ctx, messages)
if SQSError != nil {
Expand All @@ -125,6 +136,7 @@ func (s *SQSPump) WriteData(ctx context.Context, data []interface{}) error {
}

func (s *SQSPump) write(c context.Context, messages []types.SendMessageBatchRequestEntry) error {
pretty.Print(messages)
for i := 0; i < len(messages); i += s.SQSConf.AWSSQSBatchLimit {
end := i + s.SQSConf.AWSSQSBatchLimit

Expand Down

0 comments on commit 88cce13

Please sign in to comment.