Skip to content

Commit

Permalink
Bloom compactor: Load blocks lazily in batches (#11919)
Browse files Browse the repository at this point in the history
To avoid loading possibly lots of blocks upfront, this PR introduces
lazy loading of blocks in batches using an iterator that loads blocks on
demand.

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored Feb 13, 2024
1 parent 0bb2574 commit eb8464a
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 35 deletions.
95 changes: 95 additions & 0 deletions pkg/bloomcompactor/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package bloomcompactor

import (
"context"

"github.com/grafana/dskit/multierror"

"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

// interface modeled from `pkg/storage/stores/shipper/bloomshipper.Fetcher`
type blocksFetcher interface {
FetchBlocks(context.Context, []bloomshipper.BlockRef) ([]*bloomshipper.CloseableBlockQuerier, error)
}

func newBatchedBlockLoader(ctx context.Context, fetcher blocksFetcher, blocks []bloomshipper.BlockRef) (*batchedBlockLoader, error) {
return &batchedBlockLoader{
ctx: ctx,
batchSize: 10, // make configurable?
source: blocks,
fetcher: fetcher,
}, nil
}

type batchedBlockLoader struct {
ctx context.Context
batchSize int

source []bloomshipper.BlockRef
fetcher blocksFetcher

batch []*bloomshipper.CloseableBlockQuerier
cur *bloomshipper.CloseableBlockQuerier
err error
}

// At implements v1.CloseableIterator.
func (b *batchedBlockLoader) At() *bloomshipper.CloseableBlockQuerier {
return b.cur
}

// Close implements v1.CloseableIterator.
func (b *batchedBlockLoader) Close() error {
if b.cur != nil {
return b.cur.Close()
}
return nil
}

// CloseBatch closes the remaining items from the current batch
func (b *batchedBlockLoader) CloseBatch() error {
var err multierror.MultiError
for _, cur := range b.batch {
err.Add(cur.Close())
}
if len(b.batch) > 0 {
b.batch = b.batch[:0]
}
return err.Err()
}

// Err implements v1.CloseableIterator.
func (b *batchedBlockLoader) Err() error {
return b.err
}

// Next implements v1.CloseableIterator.
func (b *batchedBlockLoader) Next() bool {
if len(b.batch) > 0 {
return b.setNext()
}

if len(b.source) == 0 {
return false
}

// setup next batch
batchSize := min(b.batchSize, len(b.source))
toFetch := b.source[:batchSize]

// update source
b.source = b.source[batchSize:]

b.batch, b.err = b.fetcher.FetchBlocks(b.ctx, toFetch)
if b.err != nil {
return false
}
return b.setNext()
}

func (b *batchedBlockLoader) setNext() bool {
b.cur, b.err = b.batch[0], nil
b.batch = b.batch[1:]
return true
}
37 changes: 37 additions & 0 deletions pkg/bloomcompactor/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package bloomcompactor

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/atomic"

"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

type dummyBlocksFetcher struct {
count *atomic.Int32
}

func (f *dummyBlocksFetcher) FetchBlocks(_ context.Context, blocks []bloomshipper.BlockRef) ([]*bloomshipper.CloseableBlockQuerier, error) {
f.count.Inc()
return make([]*bloomshipper.CloseableBlockQuerier, len(blocks)), nil
}

func TestBatchedBlockLoader(t *testing.T) {
ctx := context.Background()
f := &dummyBlocksFetcher{count: atomic.NewInt32(0)}

blocks := make([]bloomshipper.BlockRef, 25)
blocksIter, err := newBatchedBlockLoader(ctx, f, blocks)
require.NoError(t, err)

var count int
for blocksIter.Next() && blocksIter.Err() == nil {
count++
}

require.Equal(t, len(blocks), count)
require.Equal(t, int32(len(blocks)/blocksIter.batchSize+1), f.count.Load())
}
68 changes: 45 additions & 23 deletions pkg/bloomcompactor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"sort"

"github.com/go-kit/log"
Expand Down Expand Up @@ -138,45 +139,36 @@ func (s *SimpleBloomController) buildBlocks(
for _, gap := range plan.gaps {
// Fetch blocks that aren't up to date but are in the desired fingerprint range
// to try and accelerate bloom creation
seriesItr, preExistingBlocks, err := s.loadWorkForGap(ctx, table, tenant, plan.tsdb, gap)
seriesItr, blocksIter, err := s.loadWorkForGap(ctx, table, tenant, plan.tsdb, gap)
if err != nil {
level.Error(logger).Log("msg", "failed to get series and blocks", "err", err)
return errors.Wrap(err, "failed to get series and blocks")
}
// Close all remaining blocks on exit
closePreExistingBlocks := func() {
var closeErrors multierror.MultiError
for _, block := range preExistingBlocks {
closeErrors.Add(block.Close())
}
if err := closeErrors.Err(); err != nil {
level.Error(s.logger).Log("msg", "failed to close blocks", "err", err)
}
}

gen := NewSimpleBloomGenerator(
tenant,
blockOpts,
seriesItr,
s.chunkLoader,
preExistingBlocks,
blocksIter,
s.rwFn,
s.metrics,
log.With(logger, "tsdb", plan.tsdb.Name(), "ownership", gap, "blocks", len(preExistingBlocks)),
log.With(logger, "tsdb", plan.tsdb.Name(), "ownership", gap),
)

_, newBlocks, err := gen.Generate(ctx)
_, loaded, newBlocks, err := gen.Generate(ctx)

if err != nil {
// TODO(owen-d): metrics
level.Error(logger).Log("msg", "failed to generate bloom", "err", err)
closePreExistingBlocks()
s.closeLoadedBlocks(loaded, blocksIter)
return errors.Wrap(err, "failed to generate bloom")
}

client, err := s.bloomStore.Client(table.ModelTime())
if err != nil {
level.Error(logger).Log("msg", "failed to get client", "err", err)
closePreExistingBlocks()
s.closeLoadedBlocks(loaded, blocksIter)
return errors.Wrap(err, "failed to get client")
}

Expand All @@ -195,20 +187,20 @@ func (s *SimpleBloomController) buildBlocks(
built,
); err != nil {
level.Error(logger).Log("msg", "failed to write block", "err", err)
closePreExistingBlocks()
s.closeLoadedBlocks(loaded, blocksIter)
return errors.Wrap(err, "failed to write block")
}
}

if err := newBlocks.Err(); err != nil {
// TODO(owen-d): metrics
level.Error(logger).Log("msg", "failed to generate bloom", "err", err)
closePreExistingBlocks()
s.closeLoadedBlocks(loaded, blocksIter)
return errors.Wrap(err, "failed to generate bloom")
}

// Close pre-existing blocks
closePreExistingBlocks()
s.closeLoadedBlocks(loaded, blocksIter)
}
}

Expand All @@ -226,19 +218,49 @@ func (s *SimpleBloomController) loadWorkForGap(
tenant string,
id tsdb.Identifier,
gap gapWithBlocks,
) (v1.CloseableIterator[*v1.Series], []*bloomshipper.CloseableBlockQuerier, error) {
) (v1.CloseableIterator[*v1.Series], v1.CloseableIterator[*bloomshipper.CloseableBlockQuerier], error) {
// load a series iterator for the gap
seriesItr, err := s.tsdbStore.LoadTSDB(ctx, table, tenant, id, gap.bounds)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to load tsdb")
}

blocks, err := s.bloomStore.FetchBlocks(ctx, gap.blocks)
// load a blocks iterator for the gap
fetcher, err := s.bloomStore.Fetcher(table.ModelTime())
if err != nil {
return nil, nil, errors.Wrap(err, "failed to get blocks")
return nil, nil, errors.Wrap(err, "failed to get fetcher")
}

return seriesItr, blocks, nil
blocksIter, err := newBatchedBlockLoader(ctx, fetcher, gap.blocks)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to load blocks")
}

return seriesItr, blocksIter, nil
}

func (s *SimpleBloomController) closeLoadedBlocks(toClose []io.Closer, it v1.CloseableIterator[*bloomshipper.CloseableBlockQuerier]) {
// close loaded blocks
var err multierror.MultiError
for _, closer := range toClose {
err.Add(closer.Close())
}

switch itr := it.(type) {
case *batchedBlockLoader:
// close remaining loaded blocks from batch
err.Add(itr.CloseBatch())
default:
// close remaining loaded blocks
for itr.Next() && itr.Err() == nil {
err.Add(itr.At().Close())
}
}

// log error
if err.Err() != nil {
level.Error(s.logger).Log("msg", "failed to close blocks", "err", err)
}
}

type gapWithBlocks struct {
Expand Down
30 changes: 20 additions & 10 deletions pkg/bloomcompactor/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package bloomcompactor
import (
"context"
"fmt"
"io"
"math"
"time"

Expand Down Expand Up @@ -39,17 +40,15 @@ func (k Keyspace) Cmp(other Keyspace) v1.BoundsCheck {
// Store is likely bound within. This allows specifying impls like ShardedStore<Store>
// to only request the shard-range needed from the existing store.
type BloomGenerator interface {
Generate(ctx context.Context) (skippedBlocks []*v1.Block, results v1.Iterator[*v1.Block], err error)
Generate(ctx context.Context) (skippedBlocks []v1.BlockMetadata, toClose []io.Closer, results v1.Iterator[*v1.Block], err error)
}

// Simple implementation of a BloomGenerator.
type SimpleBloomGenerator struct {
userID string
store v1.Iterator[*v1.Series]
chunkLoader ChunkLoader
// TODO(owen-d): blocks need not be all downloaded prior. Consider implementing
// as an iterator of iterators, where each iterator is a batch of overlapping blocks.
blocks []*bloomshipper.CloseableBlockQuerier
blocksIter v1.CloseableIterator[*bloomshipper.CloseableBlockQuerier]

// options to build blocks with
opts v1.BlockOptions
Expand All @@ -71,7 +70,7 @@ func NewSimpleBloomGenerator(
opts v1.BlockOptions,
store v1.Iterator[*v1.Series],
chunkLoader ChunkLoader,
blocks []*bloomshipper.CloseableBlockQuerier,
blocksIter v1.CloseableIterator[*bloomshipper.CloseableBlockQuerier],
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
metrics *Metrics,
logger log.Logger,
Expand All @@ -81,7 +80,7 @@ func NewSimpleBloomGenerator(
opts: opts,
store: store,
chunkLoader: chunkLoader,
blocks: blocks,
blocksIter: blocksIter,
logger: log.With(logger, "component", "bloom_generator"),
readWriterFn: readWriterFn,
metrics: metrics,
Expand All @@ -108,9 +107,15 @@ func (s *SimpleBloomGenerator) populator(ctx context.Context) func(series *v1.Se

}

func (s *SimpleBloomGenerator) Generate(ctx context.Context) (skippedBlocks []v1.BlockMetadata, results v1.Iterator[*v1.Block], err error) {
blocksMatchingSchema := make([]*bloomshipper.CloseableBlockQuerier, 0, len(s.blocks))
for _, block := range s.blocks {
func (s *SimpleBloomGenerator) Generate(ctx context.Context) ([]v1.BlockMetadata, []io.Closer, v1.Iterator[*v1.Block], error) {
skippedBlocks := make([]v1.BlockMetadata, 0)
toClose := make([]io.Closer, 0)
blocksMatchingSchema := make([]*bloomshipper.CloseableBlockQuerier, 0)

for s.blocksIter.Next() && s.blocksIter.Err() == nil {
block := s.blocksIter.At()
toClose = append(toClose, block)

logger := log.With(s.logger, "block", block.BlockRef)
md, err := block.Metadata()
schema := md.Options.Schema
Expand All @@ -130,11 +135,16 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) (skippedBlocks []v1
blocksMatchingSchema = append(blocksMatchingSchema, block)
}

if s.blocksIter.Err() != nil {
// should we ignore the error and continue with the blocks we got?
return skippedBlocks, toClose, v1.NewSliceIter([]*v1.Block{}), s.blocksIter.Err()
}

level.Debug(s.logger).Log("msg", "generating bloom filters for blocks", "num_blocks", len(blocksMatchingSchema), "skipped_blocks", len(skippedBlocks), "schema", fmt.Sprintf("%+v", s.opts.Schema))

series := v1.NewPeekingIter(s.store)
blockIter := NewLazyBlockBuilderIterator(ctx, s.opts, s.populator(ctx), s.readWriterFn, series, blocksMatchingSchema)
return skippedBlocks, blockIter, nil
return skippedBlocks, toClose, blockIter, nil
}

// LazyBlockBuilderIterator is a lazy iterator over blocks that builds
Expand Down
5 changes: 3 additions & 2 deletions pkg/bloomcompactor/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,14 @@ func dummyBloomGen(opts v1.BlockOptions, store v1.Iterator[*v1.Series], blocks [
BlockQuerier: v1.NewBlockQuerier(b),
})
}
blocksIter := v1.NewCloseableIterator(v1.NewSliceIter(bqs))

return NewSimpleBloomGenerator(
"fake",
opts,
store,
dummyChunkLoader{},
bqs,
blocksIter,
func() (v1.BlockWriter, v1.BlockReader) {
indexBuf := bytes.NewBuffer(nil)
bloomsBuf := bytes.NewBuffer(nil)
Expand Down Expand Up @@ -130,7 +131,7 @@ func TestSimpleBloomGenerator(t *testing.T) {
)

gen := dummyBloomGen(tc.toSchema, storeItr, sourceBlocks)
skipped, results, err := gen.Generate(context.Background())
skipped, _, results, err := gen.Generate(context.Background())
require.Nil(t, err)
require.Equal(t, tc.numSkipped, len(skipped))

Expand Down
12 changes: 12 additions & 0 deletions pkg/storage/bloom/v1/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,18 @@ type CloseableIterator[T any] interface {
Close() error
}

func NewCloseableIterator[T io.Closer](itr Iterator[T]) *CloseIter[T] {
return &CloseIter[T]{itr}
}

type CloseIter[T io.Closer] struct {
Iterator[T]
}

func (i *CloseIter[T]) Close() error {
return i.At().Close()
}

type PeekingCloseableIterator[T any] interface {
PeekingIterator[T]
CloseableIterator[T]
Expand Down

0 comments on commit eb8464a

Please sign in to comment.