From 6bea5f82840e0262948b91781d4df89eb9014ec7 Mon Sep 17 00:00:00 2001 From: Jonathan Rosenberg <96974219+Jonathan-Rosenberg@users.noreply.github.com> Date: Mon, 18 Mar 2024 20:56:20 +0200 Subject: [PATCH] Monitor appropriate slowdown responses from DynamoDB store (#7567) * monitor appropriate slowdown responses from dynamodb * add a dynamodb counter * use aws sdk to validate throttling errors * lint --- pkg/kv/dynamodb/stats.go | 4 ++++ pkg/kv/dynamodb/store.go | 29 +++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/pkg/kv/dynamodb/stats.go b/pkg/kv/dynamodb/stats.go index ca5a420a9ce..5624c391feb 100644 --- a/pkg/kv/dynamodb/stats.go +++ b/pkg/kv/dynamodb/stats.go @@ -10,4 +10,8 @@ var ( Name: "dynamo_consumed_capacity_total", Help: "The capacity units consumed by operation.", }, []string{"operation"}) + dynamoSlowdown = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "dynamo_slowdown_total", + Help: "The number of times this operation was slowed down due to throttling.", + }, []string{"operation"}) ) diff --git a/pkg/kv/dynamodb/store.go b/pkg/kv/dynamodb/store.go index a50c83eb489..71fc4bb374c 100644 --- a/pkg/kv/dynamodb/store.go +++ b/pkg/kv/dynamodb/store.go @@ -238,6 +238,10 @@ func (s *Store) Get(ctx context.Context, partitionKey, key []byte) (*kv.ValueWit }) const operation = "GetItem" if err != nil { + if s.isSlowDownErr(err) { + s.logger.WithField("partition_key", partitionKey).WithContext(ctx).Error("get item: %w", kv.ErrSlowDown) + dynamoSlowdown.WithLabelValues(operation).Inc() + } return nil, fmt.Errorf("get item: %w", err) } if result.ConsumedCapacity != nil { @@ -322,6 +326,10 @@ func (s *Store) setWithOptionalPredicate(ctx context.Context, partitionKey, key, if usePredicate && errors.As(err, &errConditionalCheckFailed) { return kv.ErrPredicateFailed } + if s.isSlowDownErr(err) { + s.logger.WithField("partition_key", partitionKey).WithContext(ctx).Error("put item: %w", kv.ErrSlowDown) + dynamoSlowdown.WithLabelValues(operation).Inc() + } return fmt.Errorf("put item: %w", err) } if resp.ConsumedCapacity != nil { @@ -345,6 +353,10 @@ func (s *Store) Delete(ctx context.Context, partitionKey, key []byte) error { }) const operation = "DeleteItem" if err != nil { + if s.isSlowDownErr(err) { + s.logger.WithField("partition_key", partitionKey).WithContext(ctx).Error("delete item: %w", kv.ErrSlowDown) + dynamoSlowdown.WithLabelValues(operation).Inc() + } return fmt.Errorf("delete item: %w", err) } if resp.ConsumedCapacity != nil { @@ -372,6 +384,10 @@ func (s *Store) Scan(ctx context.Context, partitionKey []byte, options kv.ScanOp } it.runQuery() if it.err != nil { + if s.isSlowDownErr(it.err) { + s.logger.WithField("partition_key", partitionKey).WithContext(ctx).Error("scan: %w", kv.ErrSlowDown) + dynamoSlowdown.WithLabelValues("Scan").Inc() + } return nil, it.err } return it, nil @@ -387,6 +403,10 @@ func (s *Store) DropTable() error { _, err := s.svc.DeleteTable(ctx, &dynamodb.DeleteTableInput{ TableName: &s.params.TableName, }) + if s.isSlowDownErr(err) { + s.logger.WithField("table", s.params.TableName).WithContext(ctx).Error("drop table: %w", kv.ErrSlowDown) + dynamoSlowdown.WithLabelValues("DeleteTable").Inc() + } return err } @@ -561,3 +581,12 @@ func (s *Store) StopPeriodicCheck() { s.cancel = nil } } + +func (s *Store) isSlowDownErr(err error) bool { + for _, te := range retry.DefaultThrottles { + if te.IsErrorThrottle(err).Bool() { + return true + } + } + return false +}