Skip to content

Commit

Permalink
[v14] Add support for custom SQS consumer lock and disabling consumer (
Browse files Browse the repository at this point in the history
…#47612)

* Add support for custom SQS consumer lock and disabling consumer

* Update lib/events/athena/athena.go

Co-authored-by: Edoardo Spadolini <[email protected]>

* Use LockNameComponents for constructing a lock name

* Add a test for disabled consumer

* Fix URI in disabled consumer test

* Address feedback about ConsumerLockName being a single string

* Update lib/events/athena/athena.go

Co-authored-by: rosstimothy <[email protected]>

* Make linter happy

---------

Co-authored-by: Edoardo Spadolini <[email protected]>
Co-authored-by: rosstimothy <[email protected]>
  • Loading branch information
3 people authored Oct 16, 2024
1 parent 9179d2e commit 9de80bd
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 13 deletions.
45 changes: 34 additions & 11 deletions lib/events/athena/athena.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ type Config struct {
BatchMaxItems int
// BatchMaxInterval defined interval at which parquet files will be created (optional).
BatchMaxInterval time.Duration
// ConsumerLockName defines a name of a SQS consumer lock (optional).
// If provided, it will be prefixed with "athena/" to avoid accidental
// collision with existing locks.
ConsumerLockName string
// ConsumerDisabled defines if SQS consumer should be disabled (optional).
ConsumerDisabled bool

// Clock is a clock interface, used in tests.
Clock clockwork.Clock
Expand Down Expand Up @@ -414,6 +420,16 @@ func (cfg *Config) SetFromURL(url *url.URL) error {
}
cfg.BatchMaxInterval = dur
}
if consumerLockName := url.Query().Get("consumerLockName"); consumerLockName != "" {
cfg.ConsumerLockName = consumerLockName
}
if val := url.Query().Get("consumerDisabled"); val != "" {
boolVal, err := strconv.ParseBool(val)
if err != nil {
return trace.BadParameter("invalid consumerDisabled value: %v", err)
}
cfg.ConsumerDisabled = boolVal
}

return nil
}
Expand Down Expand Up @@ -481,20 +497,23 @@ func New(ctx context.Context, cfg Config) (*Log, error) {
return nil, trace.Wrap(err)
}

consumerCtx, consumerCancel := context.WithCancel(ctx)

consumer, err := newConsumer(cfg, consumerCancel)
if err != nil {
return nil, trace.Wrap(err)
}

l := &Log{
publisher: newPublisherFromAthenaConfig(cfg),
querier: querier,
consumerCloser: consumer,
publisher: newPublisherFromAthenaConfig(cfg),
querier: querier,
}

go consumer.run(consumerCtx)
if !cfg.ConsumerDisabled {
consumerCtx, consumerCancel := context.WithCancel(ctx)

consumer, err := newConsumer(cfg, consumerCancel)
if err != nil {
return nil, trace.Wrap(err)
}

l.consumerCloser = consumer

go consumer.run(consumerCtx)
}

return l, nil
}
Expand Down Expand Up @@ -524,6 +543,10 @@ func (l *Log) Close() error {
return trace.Wrap(l.consumerCloser.Close())
}

func (l *Log) IsConsumerDisabled() bool {
return l.consumerCloser == nil
}

var isAlphanumericOrUnderscoreRe = regexp.MustCompile("^[a-zA-Z0-9_]+$")

func isAlphanumericOrUnderscore(s string) bool {
Expand Down
6 changes: 5 additions & 1 deletion lib/events/athena/athena_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,15 @@ func TestConfig_SetFromURL(t *testing.T) {
},
{
name: "params to batcher",
url: "athena://db.tbl/?queueURL=https://queueURL&batchMaxItems=1000&batchMaxInterval=10s",
url: "athena://db.tbl/?queueURL=https://queueURL&batchMaxItems=1000&batchMaxInterval=10s&consumerLockName=mylock&consumerDisabled=true",
want: Config{
TableName: "tbl",
Database: "db",
QueueURL: "https://queueURL",
BatchMaxItems: 1000,
BatchMaxInterval: 10 * time.Second,
ConsumerLockName: "mylock",
ConsumerDisabled: true,
},
},
{
Expand Down Expand Up @@ -183,6 +185,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) {
GetQueryResultsInterval: 100 * time.Millisecond,
BatchMaxItems: 20000,
BatchMaxInterval: 1 * time.Minute,
ConsumerLockName: "",
PublisherConsumerAWSConfig: dummyAWSCfg,
StorerQuerierAWSConfig: dummyAWSCfg,
Backend: mockBackend{},
Expand All @@ -208,6 +211,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) {
GetQueryResultsInterval: 100 * time.Millisecond,
BatchMaxItems: 20000,
BatchMaxInterval: 1 * time.Minute,
ConsumerLockName: "",
PublisherConsumerAWSConfig: dummyAWSCfg,
StorerQuerierAWSConfig: dummyAWSCfg,
Backend: mockBackend{},
Expand Down
8 changes: 7 additions & 1 deletion lib/events/athena/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type consumer struct {
storeLocationBucket string
batchMaxItems int
batchMaxInterval time.Duration
consumerLockName string

// perDateFileParquetWriter returns file writer per date.
// Added in config to allow testing.
Expand Down Expand Up @@ -153,6 +154,7 @@ func newConsumer(cfg Config, cancelFn context.CancelFunc) (*consumer, error) {
storeLocationBucket: cfg.locationS3Bucket,
batchMaxItems: cfg.BatchMaxItems,
batchMaxInterval: cfg.BatchMaxInterval,
consumerLockName: cfg.ConsumerLockName,
collectConfig: collectCfg,
sqsDeleter: sqsClient,
queueURL: cfg.QueueURL,
Expand Down Expand Up @@ -251,10 +253,14 @@ func (c *consumer) runContinuouslyOnSingleAuth(ctx context.Context, eventsProces
case <-ctx.Done():
return
default:
lockName := []string{"athena", c.consumerLockName}
if c.consumerLockName == "" {
lockName = []string{"athena_lock"}
}
err := backend.RunWhileLocked(ctx, backend.RunWhileLockedConfig{
LockConfiguration: backend.LockConfiguration{
Backend: c.backend,
LockNameComponents: []string{"athena_lock"},
LockNameComponents: lockName,
// TTL is higher then batchMaxInterval because we want to optimize
// for low backend writes.
TTL: 5 * c.batchMaxInterval,
Expand Down
10 changes: 10 additions & 0 deletions lib/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,16 @@ func TestAthenaAuditLogSetup(t *testing.T) {
require.True(t, ok, "invalid logger type, got %T", v)
},
},
{
name: "valid athena config with disabled consumer",
uris: []string{sampleAthenaURI + "&consumerDisabled=true"},
externalAudit: externalAuditStorageDisabled,
wantFn: func(t *testing.T, alog events.AuditLogger) {
v, ok := alog.(*athena.Log)
require.True(t, ok, "invalid logger type, got %T", v)
require.True(t, v.IsConsumerDisabled(), "consumer is not disabled")
},
},
{
name: "config with rate limit - should use events.SearchEventsLimiter",
uris: []string{sampleAthenaURI + "&limiterRefillAmount=3&limiterBurst=2"},
Expand Down

0 comments on commit 9de80bd

Please sign in to comment.