diff --git a/pkg/kv/cosmosdb/store.go b/pkg/kv/cosmosdb/store.go index 73b17c061e2..881534d29ce 100644 --- a/pkg/kv/cosmosdb/store.go +++ b/pkg/kv/cosmosdb/store.go @@ -454,6 +454,7 @@ func (e *EntriesIterator) Next() bool { // handleBatchSizeChange handles running query after the first query ran with limited batch size. // The reason we switch the batch size is to avoid issues like https://github.com/treeverse/lakeFS/issues/7864 +// as opposed to the exponential backoff approach in dynamoDB here we use a dynamic page size and let Cosmos DB manage paging. func (e *EntriesIterator) handleBatchSizeChange() error { e.startKey = e.entry.Key e.batchSize = dynamicPageSize diff --git a/pkg/kv/dynamodb/store.go b/pkg/kv/dynamodb/store.go index 27ae0d5a259..d9f4f420625 100644 --- a/pkg/kv/dynamodb/store.go +++ b/pkg/kv/dynamodb/store.go @@ -390,12 +390,12 @@ func (s *Store) Scan(ctx context.Context, partitionKey []byte, options kv.ScanOp startKey: options.KeyStart, scanCtx: ctx, store: s, - limit: int(s.params.ScanLimit), + limit: int(firstScanLimit), } // Setting the limit just for the first scan to avoid issues like // https://github.com/treeverse/lakeFS/issues/7864 - it.runQuery(int(firstScanLimit)) + it.runQuery(it.limit) if it.err != nil { err := it.err if s.isSlowDownErr(it.err) { @@ -456,6 +456,7 @@ func (e *EntriesIterator) Next() bool { return false } e.exclusiveStartKey = e.queryResult.LastEvaluatedKey + e.doubleAndCapLimit() e.runQuery(e.limit) if e.err != nil { return false @@ -474,6 +475,17 @@ func (e *EntriesIterator) Next() bool { return true } +// doubleAndCapLimit doubles the limit up to the maximum allowed by the store +// this is done to avoid: +// 1. limit being too small and causing multiple queries on one side +// 2. limit being too large and causing a single query consuming too much capacity +func (e *EntriesIterator) doubleAndCapLimit() { + e.limit *= 2 + if e.limit > int(e.store.params.ScanLimit) { + e.limit = int(e.store.params.ScanLimit) + } +} + func (e *EntriesIterator) Entry() *kv.Entry { return e.entry } diff --git a/pkg/kv/postgres/store.go b/pkg/kv/postgres/store.go index 99aa9631dad..525c925ee92 100644 --- a/pkg/kv/postgres/store.go +++ b/pkg/kv/postgres/store.go @@ -327,11 +327,11 @@ func (s *Store) Scan(ctx context.Context, partitionKey []byte, options kv.ScanOp ctx: ctx, partitionKey: partitionKey, startKey: options.KeyStart, - limit: s.Params.ScanPageSize, + limit: firstScanLimit, store: s, includeStart: true, } - it.runQuery(firstScanLimit) + it.runQuery(it.limit) if it.err != nil { return nil, it.err } @@ -355,6 +355,7 @@ func (e *EntriesIterator) Next() bool { key := e.entries[e.currEntryIdx].Key e.startKey = key e.includeStart = false + e.doubleAndCapLimit() e.runQuery(e.limit) if e.err != nil || len(e.entries) == 0 { return false @@ -364,10 +365,22 @@ func (e *EntriesIterator) Next() bool { return true } +// DoubleAndCapLimit doubles the limit up to the maximum allowed by the store +// this is to avoid +// 1. limit being too small and causing multiple queries on one sid +// 2. limit being too large and causing a single query to be too slow +func (e *EntriesIterator) doubleAndCapLimit() { + e.limit *= 2 + if e.limit > e.store.Params.ScanPageSize { + e.limit = e.store.Params.ScanPageSize + } +} + func (e *EntriesIterator) SeekGE(key []byte) { if !e.isInRange(key) { e.startKey = key e.includeStart = true + e.doubleAndCapLimit() e.runQuery(e.limit) return }