From 822eb6dfc19c54c8266d2247d245faea493fed91 Mon Sep 17 00:00:00 2001 From: Barak Amar Date: Sun, 8 Oct 2023 16:03:50 +0800 Subject: [PATCH] Revert "revert kv dynamodb client to use sdk v1 (#6706)" (#6714) This reverts commit b1c301f57f7f6aa8494e4443e50bef13eb498c45. --- go.mod | 2 +- go.sum | 2 - pkg/kv/dynamodb/store.go | 181 +++++++++++++++++++-------------------- 3 files changed, 88 insertions(+), 97 deletions(-) diff --git a/go.mod b/go.mod index 0daed8f1697..82e7543aed1 100644 --- a/go.mod +++ b/go.mod @@ -70,7 +70,7 @@ require ( github.com/Shopify/go-lua v0.0.0-20221004153744-91867de107cf github.com/alitto/pond v1.8.2 github.com/antonmedv/expr v1.12.5 - github.com/aws/aws-sdk-go v1.44.56 + github.com/aws/aws-sdk-go v1.43.31 github.com/aws/aws-sdk-go-v2 v1.21.0 github.com/aws/aws-sdk-go-v2/config v1.18.36 github.com/aws/aws-sdk-go-v2/credentials v1.13.35 diff --git a/go.sum b/go.sum index 00d5291897a..5a70f3b7ad4 100644 --- a/go.sum +++ b/go.sum @@ -183,8 +183,6 @@ github.com/aws/aws-sdk-go v1.30.19/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZve github.com/aws/aws-sdk-go v1.37.0/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= github.com/aws/aws-sdk-go v1.43.31 h1:yJZIr8nMV1hXjAvvOLUFqZRJcHV7udPQBfhJqawDzI0= github.com/aws/aws-sdk-go v1.43.31/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= -github.com/aws/aws-sdk-go v1.44.56 h1:bT+lExwagH7djxb6InKUVkEKGPAj5aAPnV85/m1fKro= -github.com/aws/aws-sdk-go v1.44.56/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= github.com/aws/aws-sdk-go-v2 v1.16.2/go.mod h1:ytwTPBG6fXTZLxxeeCCWj2/EMYp/xDUgX+OET6TLNNU= github.com/aws/aws-sdk-go-v2 v1.20.3/go.mod h1:/RfNgGmRxI+iFOB1OeJUyxiU+9s88k3pfHvDagGEp0M= github.com/aws/aws-sdk-go-v2 v1.21.0 h1:gMT0IW+03wtYJhRqTVYn0wLzwdnK9sRMcxmtfGzRdJc= diff --git a/pkg/kv/dynamodb/store.go b/pkg/kv/dynamodb/store.go index 527d92a3f46..4f1e9cb6840 100644 --- a/pkg/kv/dynamodb/store.go +++ b/pkg/kv/dynamodb/store.go @@ -10,13 +10,14 @@ import ( "sync" "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/dynamodb" - "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" - "github.com/aws/aws-sdk-go/service/dynamodb/expression" + "github.com/aws/aws-sdk-go-v2/aws" + awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/cenkalti/backoff/v4" "github.com/treeverse/lakefs/pkg/kv" "github.com/treeverse/lakefs/pkg/kv/kvparams" @@ -26,7 +27,7 @@ import ( type Driver struct{} type Store struct { - svc *dynamodb.DynamoDB + svc *dynamodb.Client params *kvparams.DynamoDB wg sync.WaitGroup logger logging.Logger @@ -36,7 +37,7 @@ type Store struct { type EntriesIterator struct { partitionKey []byte startKey []byte - exclusiveStartKey map[string]*dynamodb.AttributeValue + exclusiveStartKey map[string]types.AttributeValue scanCtx context.Context entry *kv.Entry @@ -73,41 +74,38 @@ func (d *Driver) Open(ctx context.Context, kvParams kvparams.Config) (kv.Store, if params == nil { return nil, fmt.Errorf("missing %s settings: %w", DriverName, kv.ErrDriverConfiguration) } - sess, err := session.NewSessionWithOptions(session.Options{ - SharedConfigState: session.SharedConfigEnable, - Profile: params.AwsProfile, - }) - if err != nil { - return nil, err - } - - cfg := aws.NewConfig() - if params.Endpoint != "" { - cfg.Endpoint = aws.String(params.Endpoint) - } + var opts []func(*config.LoadOptions) error if params.AwsRegion != "" { - cfg = cfg.WithRegion(params.AwsRegion) + opts = append(opts, config.WithRegion(params.AwsRegion)) + } + if params.AwsProfile != "" { + opts = append(opts, config.WithSharedConfigProfile(params.AwsProfile)) } - if params.AwsAccessKeyID != "" { - cfg = cfg.WithCredentials(credentials.NewCredentials( - &credentials.StaticProvider{ - Value: credentials.Value{ - AccessKeyID: params.AwsAccessKeyID, - SecretAccessKey: params.AwsSecretAccessKey, - }, - })) + opts = append(opts, config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider( + params.AwsAccessKeyID, + params.AwsSecretAccessKey, + "", + ))) } const maxConnectionPerHost = 10 - cfg = cfg.WithHTTPClient(&http.Client{ - Transport: &http.Transport{ - MaxConnsPerHost: maxConnectionPerHost, - }, - }) + opts = append(opts, config.WithHTTPClient( + awshttp.NewBuildableClient().WithTransportOptions(func(transport *http.Transport) { + transport.MaxConnsPerHost = maxConnectionPerHost + }))) + + cfg, err := config.LoadDefaultConfig(ctx, opts...) + if err != nil { + return nil, err + } // Create DynamoDB client - svc := dynamodb.New(sess, cfg) + svc := dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) { + if params.Endpoint != "" { + o.BaseEndpoint = ¶ms.Endpoint + } + }) err = setupKeyValueDatabase(ctx, svc, params) if err != nil { @@ -127,53 +125,55 @@ func (d *Driver) Open(ctx context.Context, kvParams kvparams.Config) (kv.Store, } // isTableExist will try to describeTable and return bool status, error is returned only in case err != ResourceNotFoundException -func isTableExist(ctx context.Context, svc *dynamodb.DynamoDB, table string) (bool, error) { - _, err := svc.DescribeTableWithContext(ctx, &dynamodb.DescribeTableInput{ +func isTableExist(ctx context.Context, svc *dynamodb.Client, table string) (bool, error) { + _, err := svc.DescribeTable(ctx, &dynamodb.DescribeTableInput{ TableName: aws.String(table), }) if err != nil { - if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == dynamodb.ErrCodeResourceNotFoundException { + var errResNotFound *types.ResourceNotFoundException + if errors.As(err, &errResNotFound) { return false, nil } - return false, handleClientError(err) + return false, err } return true, nil } // setupKeyValueDatabase setup everything required to enable kv over postgres -func setupKeyValueDatabase(ctx context.Context, svc *dynamodb.DynamoDB, params *kvparams.DynamoDB) error { +func setupKeyValueDatabase(ctx context.Context, svc *dynamodb.Client, params *kvparams.DynamoDB) error { // main kv table exist, err := isTableExist(ctx, svc, params.TableName) if exist || err != nil { return err } - table, err := svc.CreateTableWithContext(ctx, &dynamodb.CreateTableInput{ + table, err := svc.CreateTable(ctx, &dynamodb.CreateTableInput{ TableName: aws.String(params.TableName), - BillingMode: aws.String(dynamodb.BillingModePayPerRequest), // On-Demand - AttributeDefinitions: []*dynamodb.AttributeDefinition{ + BillingMode: types.BillingModePayPerRequest, // On-Demand + AttributeDefinitions: []types.AttributeDefinition{ { AttributeName: aws.String(PartitionKey), - AttributeType: aws.String("B"), + AttributeType: types.ScalarAttributeTypeB, }, { AttributeName: aws.String(ItemKey), - AttributeType: aws.String("B"), + AttributeType: types.ScalarAttributeTypeB, }, }, - KeySchema: []*dynamodb.KeySchemaElement{ + KeySchema: []types.KeySchemaElement{ { AttributeName: aws.String(PartitionKey), - KeyType: aws.String("HASH"), + KeyType: types.KeyTypeHash, }, { AttributeName: aws.String(ItemKey), - KeyType: aws.String("RANGE"), + KeyType: types.KeyTypeRange, }, }, }) if err != nil { - if _, ok := err.(*dynamodb.ResourceInUseException); ok { + var errResInUse *types.ResourceInUseException + if errors.As(err, &errResInUse) { return nil } return err @@ -187,28 +187,28 @@ func setupKeyValueDatabase(ctx context.Context, svc *dynamodb.DynamoDB, params * bo.MaxInterval = maxInterval * time.Second bo.MaxElapsedTime = maxElapsed * time.Second err = backoff.Retry(func() error { - desc, err := svc.DescribeTableWithContext(ctx, &dynamodb.DescribeTableInput{ + desc, err := svc.DescribeTable(ctx, &dynamodb.DescribeTableInput{ TableName: table.TableDescription.TableName, }) if err != nil { // we shouldn't retry on anything but kv.ErrTableNotActive return backoff.Permanent(err) } - if *desc.Table.TableStatus != dynamodb.TableStatusActive { - return fmt.Errorf("table status(%s): %w", *desc.Table.TableStatus, kv.ErrTableNotActive) + if desc.Table.TableStatus != types.TableStatusActive { + return fmt.Errorf("table status(%s): %w", desc.Table.TableStatus, kv.ErrTableNotActive) } return nil }, bo) return err } -func (s *Store) bytesKeyToDynamoKey(partitionKey, key []byte) map[string]*dynamodb.AttributeValue { - return map[string]*dynamodb.AttributeValue{ - PartitionKey: { - B: partitionKey, +func (s *Store) bytesKeyToDynamoKey(partitionKey, key []byte) map[string]types.AttributeValue { + return map[string]types.AttributeValue{ + PartitionKey: &types.AttributeValueMemberB{ + Value: partitionKey, }, - ItemKey: { - B: key, + ItemKey: &types.AttributeValueMemberB{ + Value: key, }, } } @@ -220,15 +220,15 @@ func (s *Store) Get(ctx context.Context, partitionKey, key []byte) (*kv.ValueWit if len(key) == 0 { return nil, kv.ErrMissingKey } - result, err := s.svc.GetItemWithContext(ctx, &dynamodb.GetItemInput{ + result, err := s.svc.GetItem(ctx, &dynamodb.GetItemInput{ TableName: aws.String(s.params.TableName), Key: s.bytesKeyToDynamoKey(partitionKey, key), ConsistentRead: aws.Bool(true), - ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), + ReturnConsumedCapacity: types.ReturnConsumedCapacityTotal, }) const operation = "GetItem" if err != nil { - return nil, fmt.Errorf("get item: %w", handleClientError(err)) + return nil, fmt.Errorf("get item: %w", err) } if result.ConsumedCapacity != nil { dynamoConsumedCapacity.WithLabelValues(operation).Add(*result.ConsumedCapacity.CapacityUnits) @@ -239,7 +239,7 @@ func (s *Store) Get(ctx context.Context, partitionKey, key []byte) (*kv.ValueWit } var item DynKVItem - err = dynamodbattribute.UnmarshalMap(result.Item, &item) + err = attributevalue.UnmarshalMap(result.Item, &item) if err != nil { return nil, fmt.Errorf("unmarshal map: %w", err) } @@ -275,7 +275,7 @@ func (s *Store) setWithOptionalPredicate(ctx context.Context, partitionKey, key, ItemValue: value, } - marshaledItem, err := dynamodbattribute.MarshalMap(item) + marshaledItem, err := attributevalue.MarshalMap(item) if err != nil { return fmt.Errorf("marshal map: %w", err) } @@ -283,7 +283,7 @@ func (s *Store) setWithOptionalPredicate(ctx context.Context, partitionKey, key, input := &dynamodb.PutItemInput{ Item: marshaledItem, TableName: &s.params.TableName, - ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), + ReturnConsumedCapacity: types.ReturnConsumedCapacityTotal, } if usePredicate { switch valuePredicate { @@ -305,13 +305,14 @@ func (s *Store) setWithOptionalPredicate(ctx context.Context, partitionKey, key, } } - resp, err := s.svc.PutItemWithContext(ctx, input) + resp, err := s.svc.PutItem(ctx, input) const operation = "PutItem" if err != nil { - if _, ok := err.(*dynamodb.ConditionalCheckFailedException); ok && usePredicate { + var errConditionalCheckFailed *types.ConditionalCheckFailedException + if usePredicate && errors.As(err, &errConditionalCheckFailed) { return kv.ErrPredicateFailed } - return fmt.Errorf("put item: %w", handleClientError(err)) + return fmt.Errorf("put item: %w", err) } if resp.ConsumedCapacity != nil { dynamoConsumedCapacity.WithLabelValues(operation).Add(*resp.ConsumedCapacity.CapacityUnits) @@ -327,14 +328,14 @@ func (s *Store) Delete(ctx context.Context, partitionKey, key []byte) error { return kv.ErrMissingKey } - resp, err := s.svc.DeleteItemWithContext(ctx, &dynamodb.DeleteItemInput{ + resp, err := s.svc.DeleteItem(ctx, &dynamodb.DeleteItemInput{ TableName: aws.String(s.params.TableName), Key: s.bytesKeyToDynamoKey(partitionKey, key), - ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), + ReturnConsumedCapacity: types.ReturnConsumedCapacityTotal, }) const operation = "DeleteItem" if err != nil { - return fmt.Errorf("delete item: %w", handleClientError(err)) + return fmt.Errorf("delete item: %w", err) } if resp.ConsumedCapacity != nil { dynamoConsumedCapacity.WithLabelValues(operation).Add(*resp.ConsumedCapacity.CapacityUnits) @@ -372,7 +373,8 @@ func (s *Store) Close() { // DropTable used internally for testing purposes func (s *Store) DropTable() error { - _, err := s.svc.DeleteTable(&dynamodb.DeleteTableInput{ + ctx := context.Background() + _, err := s.svc.DeleteTable(ctx, &dynamodb.DeleteTableInput{ TableName: &s.params.TableName, }) return err @@ -387,7 +389,7 @@ func (e *EntriesIterator) SeekGE(key []byte) { } var item DynKVItem e.currEntryIdx = sort.Search(len(e.queryResult.Items), func(i int) bool { - if e.err = dynamodbattribute.UnmarshalMap(e.queryResult.Items[i], &item); e.err != nil { + if e.err = attributevalue.UnmarshalMap(e.queryResult.Items[i], &item); e.err != nil { return false } return bytes.Compare(key, item.ItemKey) <= 0 @@ -409,7 +411,7 @@ func (e *EntriesIterator) Next() bool { } } var item DynKVItem - e.err = dynamodbattribute.UnmarshalMap(e.queryResult.Items[e.currEntryIdx], &item) + e.err = attributevalue.UnmarshalMap(e.queryResult.Items[e.currEntryIdx], &item) if e.err != nil { return false } @@ -434,16 +436,16 @@ func (e *EntriesIterator) Close() { } func (e *EntriesIterator) runQuery() { - expressionAttributeValues := map[string]*dynamodb.AttributeValue{ - ":partitionkey": { - B: e.partitionKey, + expressionAttributeValues := map[string]types.AttributeValue{ + ":partitionkey": &types.AttributeValueMemberB{ + Value: e.partitionKey, }, } keyConditionExpression := PartitionKey + " = :partitionkey" if len(e.startKey) > 0 { keyConditionExpression += " AND " + ItemKey + " >= :fromkey" - expressionAttributeValues[":fromkey"] = &dynamodb.AttributeValue{ - B: e.startKey, + expressionAttributeValues[":fromkey"] = &types.AttributeValueMemberB{ + Value: e.startKey, } } queryInput := &dynamodb.QueryInput{ @@ -453,16 +455,16 @@ func (e *EntriesIterator) runQuery() { ConsistentRead: aws.Bool(true), ScanIndexForward: aws.Bool(true), ExclusiveStartKey: e.exclusiveStartKey, - ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal), + ReturnConsumedCapacity: types.ReturnConsumedCapacityTotal, } if e.limit != 0 { - queryInput.SetLimit(e.limit) + queryInput.Limit = aws.Int32(int32(e.limit)) } - queryResult, err := e.store.svc.QueryWithContext(e.scanCtx, queryInput) + queryResult, err := e.store.svc.Query(e.scanCtx, queryInput) const operation = "Query" if err != nil { - e.err = fmt.Errorf("query: %w", handleClientError(err)) + e.err = fmt.Errorf("query: %w", err) return } dynamoConsumedCapacity.WithLabelValues(operation).Add(*queryResult.ConsumedCapacity.CapacityUnits) @@ -475,11 +477,11 @@ func (e *EntriesIterator) isInRange(key []byte) bool { return false } var maxItem, minItem DynKVItem - e.err = dynamodbattribute.UnmarshalMap(e.queryResult.Items[0], &minItem) + e.err = attributevalue.UnmarshalMap(e.queryResult.Items[0], &minItem) if e.err != nil { return false } - e.err = dynamodbattribute.UnmarshalMap(e.queryResult.Items[len(e.queryResult.Items)-1], &maxItem) + e.err = attributevalue.UnmarshalMap(e.queryResult.Items[len(e.queryResult.Items)-1], &maxItem) if e.err != nil { return false } @@ -528,12 +530,3 @@ func (s *Store) StopPeriodicCheck() { s.cancel = nil } } - -func handleClientError(err error) error { - // extract original error if needed - var reqErr awserr.Error - if errors.As(err, &reqErr) && errors.Is(reqErr.OrigErr(), context.Canceled) { - err = reqErr.OrigErr() - } - return err -}