Skip to content

Commit

Permalink
KV scan paginate with exponential jumps (#8002)
Browse files Browse the repository at this point in the history
  • Loading branch information
guy-har authored Jul 25, 2024
1 parent 04287d1 commit 62803f8
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 4 deletions.
1 change: 1 addition & 0 deletions pkg/kv/cosmosdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ func (e *EntriesIterator) Next() bool {

// handleBatchSizeChange handles running query after the first query ran with limited batch size.
// The reason we switch the batch size is to avoid issues like https://github.com/treeverse/lakeFS/issues/7864
// as opposed to the exponential backoff approach in dynamoDB here we use a dynamic page size and let Cosmos DB manage paging.
func (e *EntriesIterator) handleBatchSizeChange() error {
e.startKey = e.entry.Key
e.batchSize = dynamicPageSize
Expand Down
16 changes: 14 additions & 2 deletions pkg/kv/dynamodb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,12 +390,12 @@ func (s *Store) Scan(ctx context.Context, partitionKey []byte, options kv.ScanOp
startKey: options.KeyStart,
scanCtx: ctx,
store: s,
limit: int(s.params.ScanLimit),
limit: int(firstScanLimit),
}

// Setting the limit just for the first scan to avoid issues like
// https://github.com/treeverse/lakeFS/issues/7864
it.runQuery(int(firstScanLimit))
it.runQuery(it.limit)
if it.err != nil {
err := it.err
if s.isSlowDownErr(it.err) {
Expand Down Expand Up @@ -456,6 +456,7 @@ func (e *EntriesIterator) Next() bool {
return false
}
e.exclusiveStartKey = e.queryResult.LastEvaluatedKey
e.doubleAndCapLimit()
e.runQuery(e.limit)
if e.err != nil {
return false
Expand All @@ -474,6 +475,17 @@ func (e *EntriesIterator) Next() bool {
return true
}

// doubleAndCapLimit doubles the limit up to the maximum allowed by the store
// this is done to avoid:
// 1. limit being too small and causing multiple queries on one side
// 2. limit being too large and causing a single query consuming too much capacity
func (e *EntriesIterator) doubleAndCapLimit() {
e.limit *= 2
if e.limit > int(e.store.params.ScanLimit) {
e.limit = int(e.store.params.ScanLimit)
}
}

func (e *EntriesIterator) Entry() *kv.Entry {
return e.entry
}
Expand Down
17 changes: 15 additions & 2 deletions pkg/kv/postgres/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,11 +327,11 @@ func (s *Store) Scan(ctx context.Context, partitionKey []byte, options kv.ScanOp
ctx: ctx,
partitionKey: partitionKey,
startKey: options.KeyStart,
limit: s.Params.ScanPageSize,
limit: firstScanLimit,
store: s,
includeStart: true,
}
it.runQuery(firstScanLimit)
it.runQuery(it.limit)
if it.err != nil {
return nil, it.err
}
Expand All @@ -355,6 +355,7 @@ func (e *EntriesIterator) Next() bool {
key := e.entries[e.currEntryIdx].Key
e.startKey = key
e.includeStart = false
e.doubleAndCapLimit()
e.runQuery(e.limit)
if e.err != nil || len(e.entries) == 0 {
return false
Expand All @@ -364,10 +365,22 @@ func (e *EntriesIterator) Next() bool {
return true
}

// DoubleAndCapLimit doubles the limit up to the maximum allowed by the store
// this is to avoid
// 1. limit being too small and causing multiple queries on one sid
// 2. limit being too large and causing a single query to be too slow
func (e *EntriesIterator) doubleAndCapLimit() {
e.limit *= 2
if e.limit > e.store.Params.ScanPageSize {
e.limit = e.store.Params.ScanPageSize
}
}

func (e *EntriesIterator) SeekGE(key []byte) {
if !e.isInRange(key) {
e.startKey = key
e.includeStart = true
e.doubleAndCapLimit()
e.runQuery(e.limit)
return
}
Expand Down

0 comments on commit 62803f8

Please sign in to comment.