diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go index 493cc09d9063e..5b6a30f6bab72 100644 --- a/lib/events/athena/athena.go +++ b/lib/events/athena/athena.go @@ -122,6 +122,12 @@ type Config struct { BatchMaxItems int // BatchMaxInterval defined interval at which parquet files will be created (optional). BatchMaxInterval time.Duration + // ConsumerLockName defines a name of a SQS consumer lock (optional). + // If provided, it will be prefixed with "athena/" to avoid accidental + // collision with existing locks. + ConsumerLockName string + // ConsumerDisabled defines if SQS consumer should be disabled (optional). + ConsumerDisabled bool // Clock is a clock interface, used in tests. Clock clockwork.Clock @@ -414,6 +420,16 @@ func (cfg *Config) SetFromURL(url *url.URL) error { } cfg.BatchMaxInterval = dur } + if consumerLockName := url.Query().Get("consumerLockName"); consumerLockName != "" { + cfg.ConsumerLockName = consumerLockName + } + if val := url.Query().Get("consumerDisabled"); val != "" { + boolVal, err := strconv.ParseBool(val) + if err != nil { + return trace.BadParameter("invalid consumerDisabled value: %v", err) + } + cfg.ConsumerDisabled = boolVal + } return nil } @@ -481,20 +497,23 @@ func New(ctx context.Context, cfg Config) (*Log, error) { return nil, trace.Wrap(err) } - consumerCtx, consumerCancel := context.WithCancel(ctx) - - consumer, err := newConsumer(cfg, consumerCancel) - if err != nil { - return nil, trace.Wrap(err) - } - l := &Log{ - publisher: newPublisherFromAthenaConfig(cfg), - querier: querier, - consumerCloser: consumer, + publisher: newPublisherFromAthenaConfig(cfg), + querier: querier, } - go consumer.run(consumerCtx) + if !cfg.ConsumerDisabled { + consumerCtx, consumerCancel := context.WithCancel(ctx) + + consumer, err := newConsumer(cfg, consumerCancel) + if err != nil { + return nil, trace.Wrap(err) + } + + l.consumerCloser = consumer + + go consumer.run(consumerCtx) + } return l, nil } @@ -524,6 +543,10 @@ func (l *Log) Close() error { return trace.Wrap(l.consumerCloser.Close()) } +func (l *Log) IsConsumerDisabled() bool { + return l.consumerCloser == nil +} + var isAlphanumericOrUnderscoreRe = regexp.MustCompile("^[a-zA-Z0-9_]+$") func isAlphanumericOrUnderscore(s string) bool { diff --git a/lib/events/athena/athena_test.go b/lib/events/athena/athena_test.go index fb908c5aeb3de..baa4430db6a0d 100644 --- a/lib/events/athena/athena_test.go +++ b/lib/events/athena/athena_test.go @@ -89,13 +89,15 @@ func TestConfig_SetFromURL(t *testing.T) { }, { name: "params to batcher", - url: "athena://db.tbl/?queueURL=https://queueURL&batchMaxItems=1000&batchMaxInterval=10s", + url: "athena://db.tbl/?queueURL=https://queueURL&batchMaxItems=1000&batchMaxInterval=10s&consumerLockName=mylock&consumerDisabled=true", want: Config{ TableName: "tbl", Database: "db", QueueURL: "https://queueURL", BatchMaxItems: 1000, BatchMaxInterval: 10 * time.Second, + ConsumerLockName: "mylock", + ConsumerDisabled: true, }, }, { @@ -183,6 +185,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) { GetQueryResultsInterval: 100 * time.Millisecond, BatchMaxItems: 20000, BatchMaxInterval: 1 * time.Minute, + ConsumerLockName: "", PublisherConsumerAWSConfig: dummyAWSCfg, StorerQuerierAWSConfig: dummyAWSCfg, Backend: mockBackend{}, @@ -208,6 +211,7 @@ func TestConfig_CheckAndSetDefaults(t *testing.T) { GetQueryResultsInterval: 100 * time.Millisecond, BatchMaxItems: 20000, BatchMaxInterval: 1 * time.Minute, + ConsumerLockName: "", PublisherConsumerAWSConfig: dummyAWSCfg, StorerQuerierAWSConfig: dummyAWSCfg, Backend: mockBackend{}, diff --git a/lib/events/athena/consumer.go b/lib/events/athena/consumer.go index df1a27f909a73..2b0417ec07a44 100644 --- a/lib/events/athena/consumer.go +++ b/lib/events/athena/consumer.go @@ -75,6 +75,7 @@ type consumer struct { storeLocationBucket string batchMaxItems int batchMaxInterval time.Duration + consumerLockName string // perDateFileParquetWriter returns file writer per date. // Added in config to allow testing. @@ -153,6 +154,7 @@ func newConsumer(cfg Config, cancelFn context.CancelFunc) (*consumer, error) { storeLocationBucket: cfg.locationS3Bucket, batchMaxItems: cfg.BatchMaxItems, batchMaxInterval: cfg.BatchMaxInterval, + consumerLockName: cfg.ConsumerLockName, collectConfig: collectCfg, sqsDeleter: sqsClient, queueURL: cfg.QueueURL, @@ -251,10 +253,14 @@ func (c *consumer) runContinuouslyOnSingleAuth(ctx context.Context, eventsProces case <-ctx.Done(): return default: + lockName := []string{"athena", c.consumerLockName} + if c.consumerLockName == "" { + lockName = []string{"athena_lock"} + } err := backend.RunWhileLocked(ctx, backend.RunWhileLockedConfig{ LockConfiguration: backend.LockConfiguration{ Backend: c.backend, - LockNameComponents: []string{"athena_lock"}, + LockNameComponents: lockName, // TTL is higher then batchMaxInterval because we want to optimize // for low backend writes. TTL: 5 * c.batchMaxInterval, diff --git a/lib/service/service_test.go b/lib/service/service_test.go index 9162d5b46c686..a735e325d1e9a 100644 --- a/lib/service/service_test.go +++ b/lib/service/service_test.go @@ -530,6 +530,16 @@ func TestAthenaAuditLogSetup(t *testing.T) { require.True(t, ok, "invalid logger type, got %T", v) }, }, + { + name: "valid athena config with disabled consumer", + uris: []string{sampleAthenaURI + "&consumerDisabled=true"}, + externalAudit: externalAuditStorageDisabled, + wantFn: func(t *testing.T, alog events.AuditLogger) { + v, ok := alog.(*athena.Log) + require.True(t, ok, "invalid logger type, got %T", v) + require.True(t, v.IsConsumerDisabled(), "consumer is not disabled") + }, + }, { name: "config with rate limit - should use events.SearchEventsLimiter", uris: []string{sampleAthenaURI + "&limiterRefillAmount=3&limiterBurst=2"},