Skip to content

Commit

Permalink
Monitor appropriate slowdown responses from DynamoDB store (#7567)
Browse files Browse the repository at this point in the history
* monitor appropriate slowdown responses from dynamodb

* add a dynamodb counter

* use aws sdk to validate throttling errors

* lint
  • Loading branch information
Jonathan-Rosenberg authored Mar 18, 2024
1 parent 54d2f77 commit 6bea5f8
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pkg/kv/dynamodb/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,8 @@ var (
Name: "dynamo_consumed_capacity_total",
Help: "The capacity units consumed by operation.",
}, []string{"operation"})
dynamoSlowdown = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "dynamo_slowdown_total",
Help: "The number of times this operation was slowed down due to throttling.",
}, []string{"operation"})
)
29 changes: 29 additions & 0 deletions pkg/kv/dynamodb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ func (s *Store) Get(ctx context.Context, partitionKey, key []byte) (*kv.ValueWit
})
const operation = "GetItem"
if err != nil {
if s.isSlowDownErr(err) {
s.logger.WithField("partition_key", partitionKey).WithContext(ctx).Error("get item: %w", kv.ErrSlowDown)
dynamoSlowdown.WithLabelValues(operation).Inc()
}
return nil, fmt.Errorf("get item: %w", err)
}
if result.ConsumedCapacity != nil {
Expand Down Expand Up @@ -322,6 +326,10 @@ func (s *Store) setWithOptionalPredicate(ctx context.Context, partitionKey, key,
if usePredicate && errors.As(err, &errConditionalCheckFailed) {
return kv.ErrPredicateFailed
}
if s.isSlowDownErr(err) {
s.logger.WithField("partition_key", partitionKey).WithContext(ctx).Error("put item: %w", kv.ErrSlowDown)
dynamoSlowdown.WithLabelValues(operation).Inc()
}
return fmt.Errorf("put item: %w", err)
}
if resp.ConsumedCapacity != nil {
Expand All @@ -345,6 +353,10 @@ func (s *Store) Delete(ctx context.Context, partitionKey, key []byte) error {
})
const operation = "DeleteItem"
if err != nil {
if s.isSlowDownErr(err) {
s.logger.WithField("partition_key", partitionKey).WithContext(ctx).Error("delete item: %w", kv.ErrSlowDown)
dynamoSlowdown.WithLabelValues(operation).Inc()
}
return fmt.Errorf("delete item: %w", err)
}
if resp.ConsumedCapacity != nil {
Expand Down Expand Up @@ -372,6 +384,10 @@ func (s *Store) Scan(ctx context.Context, partitionKey []byte, options kv.ScanOp
}
it.runQuery()
if it.err != nil {
if s.isSlowDownErr(it.err) {
s.logger.WithField("partition_key", partitionKey).WithContext(ctx).Error("scan: %w", kv.ErrSlowDown)
dynamoSlowdown.WithLabelValues("Scan").Inc()
}
return nil, it.err
}
return it, nil
Expand All @@ -387,6 +403,10 @@ func (s *Store) DropTable() error {
_, err := s.svc.DeleteTable(ctx, &dynamodb.DeleteTableInput{
TableName: &s.params.TableName,
})
if s.isSlowDownErr(err) {
s.logger.WithField("table", s.params.TableName).WithContext(ctx).Error("drop table: %w", kv.ErrSlowDown)
dynamoSlowdown.WithLabelValues("DeleteTable").Inc()
}
return err
}

Expand Down Expand Up @@ -561,3 +581,12 @@ func (s *Store) StopPeriodicCheck() {
s.cancel = nil
}
}

func (s *Store) isSlowDownErr(err error) bool {
for _, te := range retry.DefaultThrottles {
if te.IsErrorThrottle(err).Bool() {
return true
}
}
return false
}

0 comments on commit 6bea5f8

Please sign in to comment.