forked from grafana/loki
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Bloom compactor: Load blocks lazily in batches (grafana#11919)
To avoid loading possibly lots of blocks upfront, this PR introduces lazy loading of blocks in batches using an iterator that loads blocks on demand. Signed-off-by: Christian Haudum <[email protected]>
- Loading branch information
Showing
6 changed files
with
212 additions
and
35 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
package bloomcompactor | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/grafana/dskit/multierror" | ||
|
||
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" | ||
) | ||
|
||
// interface modeled from `pkg/storage/stores/shipper/bloomshipper.Fetcher` | ||
type blocksFetcher interface { | ||
FetchBlocks(context.Context, []bloomshipper.BlockRef) ([]*bloomshipper.CloseableBlockQuerier, error) | ||
} | ||
|
||
func newBatchedBlockLoader(ctx context.Context, fetcher blocksFetcher, blocks []bloomshipper.BlockRef) (*batchedBlockLoader, error) { | ||
return &batchedBlockLoader{ | ||
ctx: ctx, | ||
batchSize: 10, // make configurable? | ||
source: blocks, | ||
fetcher: fetcher, | ||
}, nil | ||
} | ||
|
||
type batchedBlockLoader struct { | ||
ctx context.Context | ||
batchSize int | ||
|
||
source []bloomshipper.BlockRef | ||
fetcher blocksFetcher | ||
|
||
batch []*bloomshipper.CloseableBlockQuerier | ||
cur *bloomshipper.CloseableBlockQuerier | ||
err error | ||
} | ||
|
||
// At implements v1.CloseableIterator. | ||
func (b *batchedBlockLoader) At() *bloomshipper.CloseableBlockQuerier { | ||
return b.cur | ||
} | ||
|
||
// Close implements v1.CloseableIterator. | ||
func (b *batchedBlockLoader) Close() error { | ||
if b.cur != nil { | ||
return b.cur.Close() | ||
} | ||
return nil | ||
} | ||
|
||
// CloseBatch closes the remaining items from the current batch | ||
func (b *batchedBlockLoader) CloseBatch() error { | ||
var err multierror.MultiError | ||
for _, cur := range b.batch { | ||
err.Add(cur.Close()) | ||
} | ||
if len(b.batch) > 0 { | ||
b.batch = b.batch[:0] | ||
} | ||
return err.Err() | ||
} | ||
|
||
// Err implements v1.CloseableIterator. | ||
func (b *batchedBlockLoader) Err() error { | ||
return b.err | ||
} | ||
|
||
// Next implements v1.CloseableIterator. | ||
func (b *batchedBlockLoader) Next() bool { | ||
if len(b.batch) > 0 { | ||
return b.setNext() | ||
} | ||
|
||
if len(b.source) == 0 { | ||
return false | ||
} | ||
|
||
// setup next batch | ||
batchSize := min(b.batchSize, len(b.source)) | ||
toFetch := b.source[:batchSize] | ||
|
||
// update source | ||
b.source = b.source[batchSize:] | ||
|
||
b.batch, b.err = b.fetcher.FetchBlocks(b.ctx, toFetch) | ||
if b.err != nil { | ||
return false | ||
} | ||
return b.setNext() | ||
} | ||
|
||
func (b *batchedBlockLoader) setNext() bool { | ||
b.cur, b.err = b.batch[0], nil | ||
b.batch = b.batch[1:] | ||
return true | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
package bloomcompactor | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
"go.uber.org/atomic" | ||
|
||
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" | ||
) | ||
|
||
type dummyBlocksFetcher struct { | ||
count *atomic.Int32 | ||
} | ||
|
||
func (f *dummyBlocksFetcher) FetchBlocks(_ context.Context, blocks []bloomshipper.BlockRef) ([]*bloomshipper.CloseableBlockQuerier, error) { | ||
f.count.Inc() | ||
return make([]*bloomshipper.CloseableBlockQuerier, len(blocks)), nil | ||
} | ||
|
||
func TestBatchedBlockLoader(t *testing.T) { | ||
ctx := context.Background() | ||
f := &dummyBlocksFetcher{count: atomic.NewInt32(0)} | ||
|
||
blocks := make([]bloomshipper.BlockRef, 25) | ||
blocksIter, err := newBatchedBlockLoader(ctx, f, blocks) | ||
require.NoError(t, err) | ||
|
||
var count int | ||
for blocksIter.Next() && blocksIter.Err() == nil { | ||
count++ | ||
} | ||
|
||
require.Equal(t, len(blocks), count) | ||
require.Equal(t, int32(len(blocks)/blocksIter.batchSize+1), f.count.Load()) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters