Skip to content

Commit

Permalink
Bloom compactor: Load blocks lazily in batches
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Feb 13, 2024
1 parent 0bb2574 commit 35c21ac
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 35 deletions.
95 changes: 95 additions & 0 deletions pkg/bloomcompactor/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package bloomcompactor

import (
"context"

"github.com/grafana/dskit/multierror"

"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

// blocksFetcher is the consumer-side interface for fetching bloom blocks,
// modeled from `pkg/storage/stores/shipper/bloomshipper.Fetcher` so the
// loader can be tested without a real store.
type blocksFetcher interface {
	// FetchBlocks resolves the given refs into closeable block queriers.
	FetchBlocks(context.Context, []bloomshipper.BlockRef) ([]*bloomshipper.CloseableBlockQuerier, error)
}

// newBatchedBlockLoader returns an iterator over block queriers that fetches
// blocks from the given fetcher lazily, in fixed-size batches, instead of
// loading all of them up front.
// The returned error is currently always nil; the error return is kept for
// forward compatibility of the constructor signature.
func newBatchedBlockLoader(ctx context.Context, fetcher blocksFetcher, blocks []bloomshipper.BlockRef) (*batchedBlockLoader, error) {
	// Number of blocks fetched per FetchBlocks call.
	// TODO: make configurable?
	const defaultBatchSize = 10

	return &batchedBlockLoader{
		ctx:       ctx,
		batchSize: defaultBatchSize,
		source:    blocks,
		fetcher:   fetcher,
	}, nil
}

// batchedBlockLoader implements v1.CloseableIterator over block queriers,
// fetching blocks from a blocksFetcher in batches of batchSize as the
// iterator is advanced.
type batchedBlockLoader struct {
	ctx       context.Context // context passed to every FetchBlocks call
	batchSize int             // how many refs are fetched per batch

	source  []bloomshipper.BlockRef // refs not yet fetched; shrinks as batches are loaded
	fetcher blocksFetcher           // backend that resolves refs into queriers

	batch []*bloomshipper.CloseableBlockQuerier // queriers of the current batch not yet yielded
	cur   *bloomshipper.CloseableBlockQuerier   // element returned by At()
	err   error                                 // error from the most recent fetch, surfaced via Err()
}

// At implements v1.CloseableIterator.
// It returns the querier the iterator currently points at; only valid after
// a successful call to Next.
func (b *batchedBlockLoader) At() *bloomshipper.CloseableBlockQuerier {
	return b.cur
}

// Close implements v1.CloseableIterator.
// It closes the querier the iterator currently points at, if any; queriers
// still pending in the current batch are released via CloseBatch instead.
func (b *batchedBlockLoader) Close() error {
	if b.cur == nil {
		return nil
	}
	return b.cur.Close()
}

// CloseBatch closes the remaining items from the current batch and truncates
// the batch, collecting every close error into a single multierror.
func (b *batchedBlockLoader) CloseBatch() error {
	var errs multierror.MultiError
	for _, querier := range b.batch {
		errs.Add(querier.Close())
	}
	// re-slicing a nil or empty slice to [:0] is a no-op, so no guard needed
	b.batch = b.batch[:0]
	return errs.Err()
}

// Err implements v1.CloseableIterator.
// It returns the error from the most recent batch fetch, or nil.
func (b *batchedBlockLoader) Err() error {
	return b.err
}

// Next implements v1.CloseableIterator.
// It advances to the next block querier, fetching the next batch from the
// underlying fetcher once the current batch is exhausted. It returns false
// when the source is drained or a fetch failed (check Err in that case).
func (b *batchedBlockLoader) Next() bool {
	// serve remaining items from the current batch first
	if len(b.batch) > 0 {
		return b.setNext()
	}

	if len(b.source) == 0 {
		return false
	}

	// setup next batch
	batchSize := min(b.batchSize, len(b.source))
	toFetch := b.source[:batchSize]

	// update source
	b.source = b.source[batchSize:]

	b.batch, b.err = b.fetcher.FetchBlocks(b.ctx, toFetch)
	if b.err != nil {
		return false
	}
	// Guard against a fetcher that returns fewer queriers than requested:
	// an empty batch with a nil error would make setNext panic on index 0.
	if len(b.batch) == 0 {
		return false
	}
	return b.setNext()
}

// setNext pops the first querier off the current batch and makes it the
// iterator's current element, clearing any previous error.
// Callers must ensure the batch is non-empty.
func (b *batchedBlockLoader) setNext() bool {
	b.err = nil
	b.cur, b.batch = b.batch[0], b.batch[1:]
	return true
}
37 changes: 37 additions & 0 deletions pkg/bloomcompactor/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package bloomcompactor

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/atomic"

"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

// dummyBlocksFetcher implements blocksFetcher and records how many times
// FetchBlocks is invoked.
type dummyBlocksFetcher struct {
	count *atomic.Int32 // number of FetchBlocks calls
}

// FetchBlocks implements blocksFetcher. It increments the call counter and
// returns one nil querier per requested ref, never failing.
func (f *dummyBlocksFetcher) FetchBlocks(_ context.Context, blocks []bloomshipper.BlockRef) ([]*bloomshipper.CloseableBlockQuerier, error) {
	f.count.Inc()
	queriers := make([]*bloomshipper.CloseableBlockQuerier, len(blocks))
	return queriers, nil
}

// TestBatchedBlockLoader verifies that the loader yields every block ref
// exactly once and fetches from the backend in batches.
func TestBatchedBlockLoader(t *testing.T) {
	ctx := context.Background()
	f := &dummyBlocksFetcher{count: atomic.NewInt32(0)}

	blocks := make([]bloomshipper.BlockRef, 25)
	blocksIter, err := newBatchedBlockLoader(ctx, f, blocks)
	require.NoError(t, err)

	var count int
	for blocksIter.Next() && blocksIter.Err() == nil {
		count++
	}

	// every block ref is yielded exactly once
	require.Equal(t, len(blocks), count)

	// expected fetch calls: ceil(len(blocks) / batchSize)
	// The previous expression len/batchSize+1 is only correct when len is
	// not a multiple of batchSize; ceiling division holds in both cases.
	expectedCalls := (len(blocks) + blocksIter.batchSize - 1) / blocksIter.batchSize
	require.Equal(t, int32(expectedCalls), f.count.Load())
}
68 changes: 45 additions & 23 deletions pkg/bloomcompactor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"sort"

"github.com/go-kit/log"
Expand Down Expand Up @@ -138,45 +139,36 @@ func (s *SimpleBloomController) buildBlocks(
for _, gap := range plan.gaps {
// Fetch blocks that aren't up to date but are in the desired fingerprint range
// to try and accelerate bloom creation
seriesItr, preExistingBlocks, err := s.loadWorkForGap(ctx, table, tenant, plan.tsdb, gap)
seriesItr, blocksIter, err := s.loadWorkForGap(ctx, table, tenant, plan.tsdb, gap)
if err != nil {
level.Error(logger).Log("msg", "failed to get series and blocks", "err", err)
return errors.Wrap(err, "failed to get series and blocks")
}
// Close all remaining blocks on exit
closePreExistingBlocks := func() {
var closeErrors multierror.MultiError
for _, block := range preExistingBlocks {
closeErrors.Add(block.Close())
}
if err := closeErrors.Err(); err != nil {
level.Error(s.logger).Log("msg", "failed to close blocks", "err", err)
}
}

gen := NewSimpleBloomGenerator(
tenant,
blockOpts,
seriesItr,
s.chunkLoader,
preExistingBlocks,
blocksIter,
s.rwFn,
s.metrics,
log.With(logger, "tsdb", plan.tsdb.Name(), "ownership", gap, "blocks", len(preExistingBlocks)),
log.With(logger, "tsdb", plan.tsdb.Name(), "ownership", gap),
)

_, newBlocks, err := gen.Generate(ctx)
_, loaded, newBlocks, err := gen.Generate(ctx)

if err != nil {
// TODO(owen-d): metrics
level.Error(logger).Log("msg", "failed to generate bloom", "err", err)
closePreExistingBlocks()
s.closeLoadedBlocks(loaded, blocksIter)
return errors.Wrap(err, "failed to generate bloom")
}

client, err := s.bloomStore.Client(table.ModelTime())
if err != nil {
level.Error(logger).Log("msg", "failed to get client", "err", err)
closePreExistingBlocks()
s.closeLoadedBlocks(loaded, blocksIter)
return errors.Wrap(err, "failed to get client")
}

Expand All @@ -195,20 +187,20 @@ func (s *SimpleBloomController) buildBlocks(
built,
); err != nil {
level.Error(logger).Log("msg", "failed to write block", "err", err)
closePreExistingBlocks()
s.closeLoadedBlocks(loaded, blocksIter)
return errors.Wrap(err, "failed to write block")
}
}

if err := newBlocks.Err(); err != nil {
// TODO(owen-d): metrics
level.Error(logger).Log("msg", "failed to generate bloom", "err", err)
closePreExistingBlocks()
s.closeLoadedBlocks(loaded, blocksIter)
return errors.Wrap(err, "failed to generate bloom")
}

// Close pre-existing blocks
closePreExistingBlocks()
s.closeLoadedBlocks(loaded, blocksIter)
}
}

Expand All @@ -226,19 +218,49 @@ func (s *SimpleBloomController) loadWorkForGap(
tenant string,
id tsdb.Identifier,
gap gapWithBlocks,
) (v1.CloseableIterator[*v1.Series], []*bloomshipper.CloseableBlockQuerier, error) {
) (v1.CloseableIterator[*v1.Series], v1.CloseableIterator[*bloomshipper.CloseableBlockQuerier], error) {
// load a series iterator for the gap
seriesItr, err := s.tsdbStore.LoadTSDB(ctx, table, tenant, id, gap.bounds)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to load tsdb")
}

blocks, err := s.bloomStore.FetchBlocks(ctx, gap.blocks)
// load a blocks iterator for the gap
fetcher, err := s.bloomStore.Fetcher(table.ModelTime())
if err != nil {
return nil, nil, errors.Wrap(err, "failed to get blocks")
return nil, nil, errors.Wrap(err, "failed to get fetcher")
}

return seriesItr, blocks, nil
blocksIter, err := newBatchedBlockLoader(ctx, fetcher, gap.blocks)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to load blocks")
}

return seriesItr, blocksIter, nil
}

// closeLoadedBlocks closes every block querier that was handed out during
// generation (toClose) plus any queriers still pending inside the blocks
// iterator, logging (not returning) any accumulated close errors.
func (s *SimpleBloomController) closeLoadedBlocks(toClose []io.Closer, it v1.CloseableIterator[*bloomshipper.CloseableBlockQuerier]) {
	// close loaded blocks
	var err multierror.MultiError
	for _, closer := range toClose {
		err.Add(closer.Close())
	}

	switch itr := it.(type) {
	case *batchedBlockLoader:
		// close remaining loaded blocks from batch without fetching more
		err.Add(itr.CloseBatch())
	default:
		// close remaining loaded blocks
		// NOTE(review): draining a generic iterator may load further
		// elements just to close them — confirm that is intended for
		// non-batched implementations.
		for itr.Next() && itr.Err() == nil {
			err.Add(itr.At().Close())
		}
	}

	// log error
	if err.Err() != nil {
		level.Error(s.logger).Log("msg", "failed to close blocks", "err", err)
	}
}

type gapWithBlocks struct {
Expand Down
30 changes: 20 additions & 10 deletions pkg/bloomcompactor/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package bloomcompactor
import (
"context"
"fmt"
"io"
"math"
"time"

Expand Down Expand Up @@ -39,17 +40,15 @@ func (k Keyspace) Cmp(other Keyspace) v1.BoundsCheck {
// Store is likely bound within. This allows specifying impls like ShardedStore<Store>
// to only request the shard-range needed from the existing store.
type BloomGenerator interface {
Generate(ctx context.Context) (skippedBlocks []*v1.Block, results v1.Iterator[*v1.Block], err error)
Generate(ctx context.Context) (skippedBlocks []v1.BlockMetadata, toClose []io.Closer, results v1.Iterator[*v1.Block], err error)
}

// Simple implementation of a BloomGenerator.
type SimpleBloomGenerator struct {
userID string
store v1.Iterator[*v1.Series]
chunkLoader ChunkLoader
// TODO(owen-d): blocks need not be all downloaded prior. Consider implementing
// as an iterator of iterators, where each iterator is a batch of overlapping blocks.
blocks []*bloomshipper.CloseableBlockQuerier
blocksIter v1.CloseableIterator[*bloomshipper.CloseableBlockQuerier]

// options to build blocks with
opts v1.BlockOptions
Expand All @@ -71,7 +70,7 @@ func NewSimpleBloomGenerator(
opts v1.BlockOptions,
store v1.Iterator[*v1.Series],
chunkLoader ChunkLoader,
blocks []*bloomshipper.CloseableBlockQuerier,
blocksIter v1.CloseableIterator[*bloomshipper.CloseableBlockQuerier],
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
metrics *Metrics,
logger log.Logger,
Expand All @@ -81,7 +80,7 @@ func NewSimpleBloomGenerator(
opts: opts,
store: store,
chunkLoader: chunkLoader,
blocks: blocks,
blocksIter: blocksIter,
logger: log.With(logger, "component", "bloom_generator"),
readWriterFn: readWriterFn,
metrics: metrics,
Expand All @@ -108,9 +107,15 @@ func (s *SimpleBloomGenerator) populator(ctx context.Context) func(series *v1.Se

}

func (s *SimpleBloomGenerator) Generate(ctx context.Context) (skippedBlocks []v1.BlockMetadata, results v1.Iterator[*v1.Block], err error) {
blocksMatchingSchema := make([]*bloomshipper.CloseableBlockQuerier, 0, len(s.blocks))
for _, block := range s.blocks {
func (s *SimpleBloomGenerator) Generate(ctx context.Context) ([]v1.BlockMetadata, []io.Closer, v1.Iterator[*v1.Block], error) {
skippedBlocks := make([]v1.BlockMetadata, 0)
toClose := make([]io.Closer, 0)
blocksMatchingSchema := make([]*bloomshipper.CloseableBlockQuerier, 0)

for s.blocksIter.Next() && s.blocksIter.Err() == nil {
block := s.blocksIter.At()
toClose = append(toClose, block)

logger := log.With(s.logger, "block", block.BlockRef)
md, err := block.Metadata()
schema := md.Options.Schema
Expand All @@ -130,11 +135,16 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) (skippedBlocks []v1
blocksMatchingSchema = append(blocksMatchingSchema, block)
}

if s.blocksIter.Err() != nil {
// should we ignore the error and continue with the blocks we got?
return skippedBlocks, toClose, v1.NewSliceIter([]*v1.Block{}), s.blocksIter.Err()
}

level.Debug(s.logger).Log("msg", "generating bloom filters for blocks", "num_blocks", len(blocksMatchingSchema), "skipped_blocks", len(skippedBlocks), "schema", fmt.Sprintf("%+v", s.opts.Schema))

series := v1.NewPeekingIter(s.store)
blockIter := NewLazyBlockBuilderIterator(ctx, s.opts, s.populator(ctx), s.readWriterFn, series, blocksMatchingSchema)
return skippedBlocks, blockIter, nil
return skippedBlocks, toClose, blockIter, nil
}

// LazyBlockBuilderIterator is a lazy iterator over blocks that builds
Expand Down
5 changes: 3 additions & 2 deletions pkg/bloomcompactor/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,14 @@ func dummyBloomGen(opts v1.BlockOptions, store v1.Iterator[*v1.Series], blocks [
BlockQuerier: v1.NewBlockQuerier(b),
})
}
blocksIter := v1.NewCloseableIterator(v1.NewSliceIter(bqs))

return NewSimpleBloomGenerator(
"fake",
opts,
store,
dummyChunkLoader{},
bqs,
blocksIter,
func() (v1.BlockWriter, v1.BlockReader) {
indexBuf := bytes.NewBuffer(nil)
bloomsBuf := bytes.NewBuffer(nil)
Expand Down Expand Up @@ -130,7 +131,7 @@ func TestSimpleBloomGenerator(t *testing.T) {
)

gen := dummyBloomGen(tc.toSchema, storeItr, sourceBlocks)
skipped, results, err := gen.Generate(context.Background())
skipped, _, results, err := gen.Generate(context.Background())
require.Nil(t, err)
require.Equal(t, tc.numSkipped, len(skipped))

Expand Down
12 changes: 12 additions & 0 deletions pkg/storage/bloom/v1/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,18 @@ type CloseableIterator[T any] interface {
Close() error
}

// NewCloseableIterator wraps an Iterator over closeable elements so that it
// satisfies CloseableIterator.
func NewCloseableIterator[T io.Closer](itr Iterator[T]) *CloseIter[T] {
	return &CloseIter[T]{itr}
}

// CloseIter adapts an Iterator whose elements are io.Closers into a
// CloseableIterator by embedding the underlying iterator.
type CloseIter[T io.Closer] struct {
	Iterator[T]
}

// Close implements CloseableIterator by closing the element the iterator
// currently points at.
// NOTE(review): only the current element is closed — remaining elements are
// not, and calling Close before the first Next invokes Close on whatever
// At() returns at that point; confirm callers guarantee a valid current
// element.
func (i *CloseIter[T]) Close() error {
	return i.At().Close()
}

type PeekingCloseableIterator[T any] interface {
PeekingIterator[T]
CloseableIterator[T]
Expand Down

0 comments on commit 35c21ac

Please sign in to comment.