diff --git a/pkg/kv/cosmosdb/store.go b/pkg/kv/cosmosdb/store.go index f37f951df1f..b75e50eddf9 100644 --- a/pkg/kv/cosmosdb/store.go +++ b/pkg/kv/cosmosdb/store.go @@ -356,11 +356,10 @@ func (s *Store) Scan(ctx context.Context, partitionKey []byte, options kv.ScanOp store: s, partitionKey: partitionKey, startKey: options.KeyStart, - limit: options.BatchSize, queryCtx: ctx, encoding: encoding, } - if err := it.runQuery(); err != nil { + if err := it.runQuery(options.BatchSize); err != nil { return nil, convertError(err) } return it, nil @@ -373,7 +372,6 @@ type EntriesIterator struct { store *Store partitionKey []byte startKey []byte - limit int entry *kv.Entry err error @@ -445,7 +443,8 @@ func (e *EntriesIterator) Next() bool { func (e *EntriesIterator) SeekGE(key []byte) { e.startKey = key if !e.isInRange() { - if err := e.runQuery(); err != nil { + // '-1' Used for dynamic page size. + if err := e.runQuery(-1); err != nil { e.err = convertError(err) } return @@ -476,11 +475,11 @@ func (e *EntriesIterator) Close() { e.err = kv.ErrClosedEntries } -func (e *EntriesIterator) runQuery() error { +func (e *EntriesIterator) runQuery(limit int) error { pk := azcosmos.NewPartitionKeyString(encoding.EncodeToString(e.partitionKey)) e.queryPager = e.store.containerClient.NewQueryItemsPager("select * from c where c.key >= @start order by c.key", pk, &azcosmos.QueryOptions{ ConsistencyLevel: e.store.consistencyLevel.ToPtr(), - PageSizeHint: int32(e.limit), + PageSizeHint: int32(limit), QueryParameters: []azcosmos.QueryParameter{{ Name: "@start", Value: encoding.EncodeToString(e.startKey), diff --git a/pkg/kv/dynamodb/store.go b/pkg/kv/dynamodb/store.go index fc1ad746b49..27ae0d5a259 100644 --- a/pkg/kv/dynamodb/store.go +++ b/pkg/kv/dynamodb/store.go @@ -45,7 +45,7 @@ type EntriesIterator struct { store *Store queryResult *dynamodb.QueryOutput currEntryIdx int - limit int64 + limit int } type DynKVItem struct { @@ -380,19 +380,22 @@ func (s *Store) Scan(ctx context.Context, partitionKey []byte, options kv.ScanOp return nil, kv.ErrMissingPartitionKey } // limit set to the minimum 'params.ScanLimit' and 'options.BatchSize', unless 0 (not set) - limit := s.params.ScanLimit + firstScanLimit := s.params.ScanLimit batchSize := int64(options.BatchSize) - if batchSize != 0 && limit != 0 && batchSize < limit { - limit = batchSize + if batchSize != 0 && firstScanLimit != 0 && batchSize < firstScanLimit { + firstScanLimit = batchSize } it := &EntriesIterator{ partitionKey: partitionKey, startKey: options.KeyStart, scanCtx: ctx, store: s, - limit: limit, + limit: int(s.params.ScanLimit), } - it.runQuery() + + // Setting the limit just for the first scan to avoid issues like + // https://github.com/treeverse/lakeFS/issues/7864 + it.runQuery(int(firstScanLimit)) if it.err != nil { err := it.err if s.isSlowDownErr(it.err) { @@ -427,7 +430,7 @@ func (e *EntriesIterator) SeekGE(key []byte) { if !e.isInRange(key) { e.startKey = key e.exclusiveStartKey = nil - e.runQuery() + e.runQuery(e.limit) return } var item DynKVItem @@ -453,7 +456,7 @@ func (e *EntriesIterator) Next() bool { return false } e.exclusiveStartKey = e.queryResult.LastEvaluatedKey - e.runQuery() + e.runQuery(e.limit) if e.err != nil { return false } @@ -483,7 +486,7 @@ func (e *EntriesIterator) Close() { e.err = kv.ErrClosedEntries } -func (e *EntriesIterator) runQuery() { +func (e *EntriesIterator) runQuery(limit int) { expressionAttributeValues := map[string]types.AttributeValue{ ":partitionkey": &types.AttributeValueMemberB{ Value: e.partitionKey, @@ -505,9 +508,8 @@ func (e *EntriesIterator) runQuery() { ExclusiveStartKey: e.exclusiveStartKey, ReturnConsumedCapacity: types.ReturnConsumedCapacityTotal, } - if e.limit != 0 { - queryInput.Limit = aws.Int32(int32(e.limit)) - } + + queryInput.Limit = aws.Int32(int32(limit)) queryResult, err := e.store.svc.Query(e.scanCtx, queryInput) const operation = "Query" diff --git a/pkg/kv/postgres/store.go b/pkg/kv/postgres/store.go index 67b0c28f14d..99aa9631dad 100644 --- a/pkg/kv/postgres/store.go +++ b/pkg/kv/postgres/store.go @@ -34,9 +34,9 @@ type EntriesIterator struct { includeStart bool store *Store entries []kv.Entry - limit int currEntryIdx int err error + limit int } const ( @@ -318,20 +318,20 @@ func (s *Store) Scan(ctx context.Context, partitionKey []byte, options kv.ScanOp return nil, kv.ErrMissingPartitionKey } - // limit based on the minimum between ScanPageSize and ScanOptions batch size - limit := s.Params.ScanPageSize + // firstScanLimit based on the minimum between ScanPageSize and ScanOptions batch size + firstScanLimit := s.Params.ScanPageSize if options.BatchSize != 0 && s.Params.ScanPageSize != 0 && options.BatchSize < s.Params.ScanPageSize { - limit = options.BatchSize + firstScanLimit = options.BatchSize } it := &EntriesIterator{ ctx: ctx, partitionKey: partitionKey, startKey: options.KeyStart, + limit: s.Params.ScanPageSize, store: s, - limit: limit, includeStart: true, } - it.runQuery() + it.runQuery(firstScanLimit) if it.err != nil { return nil, it.err } @@ -355,7 +355,7 @@ func (e *EntriesIterator) Next() bool { key := e.entries[e.currEntryIdx].Key e.startKey = key e.includeStart = false - e.runQuery() + e.runQuery(e.limit) if e.err != nil || len(e.entries) == 0 { return false } @@ -368,7 +368,7 @@ func (e *EntriesIterator) SeekGE(key []byte) { if !e.isInRange(key) { e.startKey = key e.includeStart = true - e.runQuery() + e.runQuery(e.limit) return } for i := range e.entries { @@ -397,19 +397,19 @@ func (e *EntriesIterator) Close() { e.err = kv.ErrClosedEntries } -func (e *EntriesIterator) runQuery() { +func (e *EntriesIterator) runQuery(scanLimit int) { var ( rows pgx.Rows err error ) if e.startKey == nil { - rows, err = e.store.Pool.Query(e.ctx, `SELECT partition_key,key,value FROM `+e.store.Params.SanitizedTableName+` WHERE partition_key=$1 ORDER BY key LIMIT $2`, e.partitionKey, e.limit) + rows, err = e.store.Pool.Query(e.ctx, `SELECT partition_key,key,value FROM `+e.store.Params.SanitizedTableName+` WHERE partition_key=$1 ORDER BY key LIMIT $2`, e.partitionKey, scanLimit) } else { compareOp := ">=" if !e.includeStart { compareOp = ">" } - rows, err = e.store.Pool.Query(e.ctx, `SELECT partition_key,key,value FROM `+e.store.Params.SanitizedTableName+` WHERE partition_key=$1 AND key `+compareOp+` $2 ORDER BY key LIMIT $3`, e.partitionKey, e.startKey, e.limit) + rows, err = e.store.Pool.Query(e.ctx, `SELECT partition_key,key,value FROM `+e.store.Params.SanitizedTableName+` WHERE partition_key=$1 AND key `+compareOp+` $2 ORDER BY key LIMIT $3`, e.partitionKey, e.startKey, scanLimit) } if err != nil { e.err = fmt.Errorf("postgres scan: %w", err)