Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bloom compactor: Load blocks lazily in batches #11919

Merged
merged 1 commit into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions pkg/bloomcompactor/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package bloomcompactor

import (
"context"

"github.com/grafana/dskit/multierror"

"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

// blocksFetcher fetches the block queriers for a set of block refs in a
// single call.
// interface modeled from `pkg/storage/stores/shipper/bloomshipper.Fetcher`
type blocksFetcher interface {
	FetchBlocks(context.Context, []bloomshipper.BlockRef) ([]*bloomshipper.CloseableBlockQuerier, error)
}

func newBatchedBlockLoader(ctx context.Context, fetcher blocksFetcher, blocks []bloomshipper.BlockRef) (*batchedBlockLoader, error) {
return &batchedBlockLoader{
ctx: ctx,
batchSize: 10, // make configurable?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this should be configurable

source: blocks,
fetcher: fetcher,
}, nil
}

// batchedBlockLoader iterates over block queriers for a list of block refs,
// fetching them from the fetcher in batches of batchSize.
type batchedBlockLoader struct {
	ctx       context.Context
	batchSize int // number of refs passed to each FetchBlocks call

	source  []bloomshipper.BlockRef // refs not yet fetched
	fetcher blocksFetcher

	batch []*bloomshipper.CloseableBlockQuerier // fetched but not yet yielded
	cur   *bloomshipper.CloseableBlockQuerier   // element returned by At
	err   error                                 // error from the most recent fetch
}

// At implements v1.CloseableIterator.
// It returns the querier most recently advanced to by Next; nil before the
// first successful Next call.
func (b *batchedBlockLoader) At() *bloomshipper.CloseableBlockQuerier {
	return b.cur
}

// Close implements v1.CloseableIterator.
// Only the querier currently pointed at by At is closed; queriers still
// buffered in the batch are handled by CloseBatch.
func (b *batchedBlockLoader) Close() error {
	if b.cur == nil {
		return nil
	}
	return b.cur.Close()
}

// CloseBatch closes the remaining items from the current batch and resets
// the batch to empty, aggregating any Close errors into one.
func (b *batchedBlockLoader) CloseBatch() error {
	var errs multierror.MultiError
	for i := range b.batch {
		errs.Add(b.batch[i].Close())
	}
	// keep the backing array; slicing a nil/empty batch is a no-op
	b.batch = b.batch[:0]
	return errs.Err()
}

// Err implements v1.CloseableIterator.
// A non-nil value indicates the most recent batch fetch failed.
func (b *batchedBlockLoader) Err() error {
	return b.err
}

// Next implements v1.CloseableIterator.
// It advances to the next block querier, fetching a new batch from the
// remaining source refs when the current batch is exhausted. Returns false
// once all refs are consumed or a fetch fails (surfaced via Err).
func (b *batchedBlockLoader) Next() bool {
	if len(b.batch) > 0 {
		return b.setNext()
	}

	// Fetch batches until one yields at least one querier. This guards
	// against a fetcher that returns an empty result without an error, which
	// would otherwise panic indexing b.batch[0] in setNext.
	for len(b.source) > 0 {
		// setup next batch
		batchSize := min(b.batchSize, len(b.source))
		toFetch := b.source[:batchSize]

		// update source
		b.source = b.source[batchSize:]

		b.batch, b.err = b.fetcher.FetchBlocks(b.ctx, toFetch)
		if b.err != nil {
			return false
		}
		if len(b.batch) > 0 {
			return b.setNext()
		}
	}
	return false
}

// setNext pops the head of the current batch into cur and clears any stale
// error. Returns false (instead of panicking on the index) when the batch
// is empty.
func (b *batchedBlockLoader) setNext() bool {
	if len(b.batch) == 0 {
		return false
	}
	b.cur, b.err = b.batch[0], nil
	b.batch = b.batch[1:]
	return true
}
37 changes: 37 additions & 0 deletions pkg/bloomcompactor/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package bloomcompactor

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/atomic"

"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

// dummyBlocksFetcher is a test double for blocksFetcher that records how
// many times FetchBlocks is invoked.
type dummyBlocksFetcher struct {
	count *atomic.Int32 // number of FetchBlocks calls observed
}

// FetchBlocks counts the invocation and returns one nil querier per
// requested block ref.
func (f *dummyBlocksFetcher) FetchBlocks(_ context.Context, blocks []bloomshipper.BlockRef) ([]*bloomshipper.CloseableBlockQuerier, error) {
	f.count.Inc()
	queriers := make([]*bloomshipper.CloseableBlockQuerier, len(blocks))
	return queriers, nil
}

// TestBatchedBlockLoader verifies the loader yields every block exactly once
// and calls the fetcher once per batch.
func TestBatchedBlockLoader(t *testing.T) {
	ctx := context.Background()
	f := &dummyBlocksFetcher{count: atomic.NewInt32(0)}

	blocks := make([]bloomshipper.BlockRef, 25)
	blocksIter, err := newBatchedBlockLoader(ctx, f, blocks)
	require.NoError(t, err)

	var count int
	for blocksIter.Next() && blocksIter.Err() == nil {
		count++
	}
	// iteration must terminate because the source is exhausted, not because
	// of an error
	require.NoError(t, blocksIter.Err())

	require.Equal(t, len(blocks), count)
	// ceiling division: correct even when len(blocks) is a multiple of
	// batchSize, unlike len/batchSize+1 which over-counts in that case
	expectedCalls := (len(blocks) + blocksIter.batchSize - 1) / blocksIter.batchSize
	require.Equal(t, int32(expectedCalls), f.count.Load())
}
68 changes: 45 additions & 23 deletions pkg/bloomcompactor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"sort"

"github.com/go-kit/log"
Expand Down Expand Up @@ -138,45 +139,36 @@ func (s *SimpleBloomController) buildBlocks(
for _, gap := range plan.gaps {
// Fetch blocks that aren't up to date but are in the desired fingerprint range
// to try and accelerate bloom creation
seriesItr, preExistingBlocks, err := s.loadWorkForGap(ctx, table, tenant, plan.tsdb, gap)
seriesItr, blocksIter, err := s.loadWorkForGap(ctx, table, tenant, plan.tsdb, gap)
if err != nil {
level.Error(logger).Log("msg", "failed to get series and blocks", "err", err)
return errors.Wrap(err, "failed to get series and blocks")
}
// Close all remaining blocks on exit
closePreExistingBlocks := func() {
var closeErrors multierror.MultiError
for _, block := range preExistingBlocks {
closeErrors.Add(block.Close())
}
if err := closeErrors.Err(); err != nil {
level.Error(s.logger).Log("msg", "failed to close blocks", "err", err)
}
}

gen := NewSimpleBloomGenerator(
tenant,
blockOpts,
seriesItr,
s.chunkLoader,
preExistingBlocks,
blocksIter,
s.rwFn,
s.metrics,
log.With(logger, "tsdb", plan.tsdb.Name(), "ownership", gap, "blocks", len(preExistingBlocks)),
log.With(logger, "tsdb", plan.tsdb.Name(), "ownership", gap),
)

_, newBlocks, err := gen.Generate(ctx)
_, loaded, newBlocks, err := gen.Generate(ctx)

if err != nil {
// TODO(owen-d): metrics
level.Error(logger).Log("msg", "failed to generate bloom", "err", err)
closePreExistingBlocks()
s.closeLoadedBlocks(loaded, blocksIter)
return errors.Wrap(err, "failed to generate bloom")
}

client, err := s.bloomStore.Client(table.ModelTime())
if err != nil {
level.Error(logger).Log("msg", "failed to get client", "err", err)
closePreExistingBlocks()
s.closeLoadedBlocks(loaded, blocksIter)
return errors.Wrap(err, "failed to get client")
}

Expand All @@ -195,20 +187,20 @@ func (s *SimpleBloomController) buildBlocks(
built,
); err != nil {
level.Error(logger).Log("msg", "failed to write block", "err", err)
closePreExistingBlocks()
s.closeLoadedBlocks(loaded, blocksIter)
return errors.Wrap(err, "failed to write block")
}
}

if err := newBlocks.Err(); err != nil {
// TODO(owen-d): metrics
level.Error(logger).Log("msg", "failed to generate bloom", "err", err)
closePreExistingBlocks()
s.closeLoadedBlocks(loaded, blocksIter)
return errors.Wrap(err, "failed to generate bloom")
}

// Close pre-existing blocks
closePreExistingBlocks()
s.closeLoadedBlocks(loaded, blocksIter)
}
}

Expand All @@ -226,19 +218,49 @@ func (s *SimpleBloomController) loadWorkForGap(
tenant string,
id tsdb.Identifier,
gap gapWithBlocks,
) (v1.CloseableIterator[*v1.Series], []*bloomshipper.CloseableBlockQuerier, error) {
) (v1.CloseableIterator[*v1.Series], v1.CloseableIterator[*bloomshipper.CloseableBlockQuerier], error) {
// load a series iterator for the gap
seriesItr, err := s.tsdbStore.LoadTSDB(ctx, table, tenant, id, gap.bounds)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to load tsdb")
}

blocks, err := s.bloomStore.FetchBlocks(ctx, gap.blocks)
// load a blocks iterator for the gap
fetcher, err := s.bloomStore.Fetcher(table.ModelTime())
if err != nil {
return nil, nil, errors.Wrap(err, "failed to get blocks")
return nil, nil, errors.Wrap(err, "failed to get fetcher")
}

return seriesItr, blocks, nil
blocksIter, err := newBatchedBlockLoader(ctx, fetcher, gap.blocks)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to load blocks")
}

return seriesItr, blocksIter, nil
}

func (s *SimpleBloomController) closeLoadedBlocks(toClose []io.Closer, it v1.CloseableIterator[*bloomshipper.CloseableBlockQuerier]) {
// close loaded blocks
var err multierror.MultiError
for _, closer := range toClose {
err.Add(closer.Close())
}

switch itr := it.(type) {
case *batchedBlockLoader:
// close remaining loaded blocks from batch
err.Add(itr.CloseBatch())
default:
// close remaining loaded blocks
for itr.Next() && itr.Err() == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would we ever reach here? As far as I can see closeLoadedBlocks is only called from buildBlocks which will pass whatever loadWorkForGap returns.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the function accepts an interface v1.CloseableInterator[*bloomshipper.CloseableBlockQuerier] and not a concrete type, it could be possible that a non-batched version of the iterator is passed.

err.Add(itr.At().Close())
}
}

// log error
if err.Err() != nil {
level.Error(s.logger).Log("msg", "failed to close blocks", "err", err)
}
}

type gapWithBlocks struct {
Expand Down
30 changes: 20 additions & 10 deletions pkg/bloomcompactor/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package bloomcompactor
import (
"context"
"fmt"
"io"
"math"
"time"

Expand Down Expand Up @@ -39,17 +40,15 @@ func (k Keyspace) Cmp(other Keyspace) v1.BoundsCheck {
// Store is likely bound within. This allows specifying impls like ShardedStore<Store>
// to only request the shard-range needed from the existing store.
type BloomGenerator interface {
Generate(ctx context.Context) (skippedBlocks []*v1.Block, results v1.Iterator[*v1.Block], err error)
Generate(ctx context.Context) (skippedBlocks []v1.BlockMetadata, toClose []io.Closer, results v1.Iterator[*v1.Block], err error)
}

// Simple implementation of a BloomGenerator.
type SimpleBloomGenerator struct {
userID string
store v1.Iterator[*v1.Series]
chunkLoader ChunkLoader
// TODO(owen-d): blocks need not be all downloaded prior. Consider implementing
// as an iterator of iterators, where each iterator is a batch of overlapping blocks.
blocks []*bloomshipper.CloseableBlockQuerier
blocksIter v1.CloseableIterator[*bloomshipper.CloseableBlockQuerier]

// options to build blocks with
opts v1.BlockOptions
Expand All @@ -71,7 +70,7 @@ func NewSimpleBloomGenerator(
opts v1.BlockOptions,
store v1.Iterator[*v1.Series],
chunkLoader ChunkLoader,
blocks []*bloomshipper.CloseableBlockQuerier,
blocksIter v1.CloseableIterator[*bloomshipper.CloseableBlockQuerier],
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
metrics *Metrics,
logger log.Logger,
Expand All @@ -81,7 +80,7 @@ func NewSimpleBloomGenerator(
opts: opts,
store: store,
chunkLoader: chunkLoader,
blocks: blocks,
blocksIter: blocksIter,
logger: log.With(logger, "component", "bloom_generator"),
readWriterFn: readWriterFn,
metrics: metrics,
Expand All @@ -108,9 +107,15 @@ func (s *SimpleBloomGenerator) populator(ctx context.Context) func(series *v1.Se

}

func (s *SimpleBloomGenerator) Generate(ctx context.Context) (skippedBlocks []v1.BlockMetadata, results v1.Iterator[*v1.Block], err error) {
blocksMatchingSchema := make([]*bloomshipper.CloseableBlockQuerier, 0, len(s.blocks))
for _, block := range s.blocks {
func (s *SimpleBloomGenerator) Generate(ctx context.Context) ([]v1.BlockMetadata, []io.Closer, v1.Iterator[*v1.Block], error) {
skippedBlocks := make([]v1.BlockMetadata, 0)
toClose := make([]io.Closer, 0)
blocksMatchingSchema := make([]*bloomshipper.CloseableBlockQuerier, 0)

for s.blocksIter.Next() && s.blocksIter.Err() == nil {
block := s.blocksIter.At()
toClose = append(toClose, block)

logger := log.With(s.logger, "block", block.BlockRef)
md, err := block.Metadata()
schema := md.Options.Schema
Expand All @@ -130,11 +135,16 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) (skippedBlocks []v1
blocksMatchingSchema = append(blocksMatchingSchema, block)
}

if s.blocksIter.Err() != nil {
// should we ignore the error and continue with the blocks we got?
return skippedBlocks, toClose, v1.NewSliceIter([]*v1.Block{}), s.blocksIter.Err()
}

level.Debug(s.logger).Log("msg", "generating bloom filters for blocks", "num_blocks", len(blocksMatchingSchema), "skipped_blocks", len(skippedBlocks), "schema", fmt.Sprintf("%+v", s.opts.Schema))

series := v1.NewPeekingIter(s.store)
blockIter := NewLazyBlockBuilderIterator(ctx, s.opts, s.populator(ctx), s.readWriterFn, series, blocksMatchingSchema)
return skippedBlocks, blockIter, nil
return skippedBlocks, toClose, blockIter, nil
}

// LazyBlockBuilderIterator is a lazy iterator over blocks that builds
Expand Down
5 changes: 3 additions & 2 deletions pkg/bloomcompactor/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,14 @@ func dummyBloomGen(opts v1.BlockOptions, store v1.Iterator[*v1.Series], blocks [
BlockQuerier: v1.NewBlockQuerier(b),
})
}
blocksIter := v1.NewCloseableIterator(v1.NewSliceIter(bqs))

return NewSimpleBloomGenerator(
"fake",
opts,
store,
dummyChunkLoader{},
bqs,
blocksIter,
func() (v1.BlockWriter, v1.BlockReader) {
indexBuf := bytes.NewBuffer(nil)
bloomsBuf := bytes.NewBuffer(nil)
Expand Down Expand Up @@ -130,7 +131,7 @@ func TestSimpleBloomGenerator(t *testing.T) {
)

gen := dummyBloomGen(tc.toSchema, storeItr, sourceBlocks)
skipped, results, err := gen.Generate(context.Background())
skipped, _, results, err := gen.Generate(context.Background())
require.Nil(t, err)
require.Equal(t, tc.numSkipped, len(skipped))

Expand Down
12 changes: 12 additions & 0 deletions pkg/storage/bloom/v1/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,18 @@ type CloseableIterator[T any] interface {
Close() error
}

// NewCloseableIterator wraps itr so that it satisfies CloseableIterator;
// closing the wrapper closes the element currently pointed at by At.
func NewCloseableIterator[T io.Closer](itr Iterator[T]) *CloseIter[T] {
	c := CloseIter[T]{Iterator: itr}
	return &c
}

// CloseIter adapts an Iterator whose elements are io.Closers into a
// CloseableIterator by embedding the underlying iterator.
type CloseIter[T io.Closer] struct {
	Iterator[T]
}

// Close closes only the element currently pointed at by At.
// NOTE(review): remaining elements are not drained or closed here — confirm
// callers close each yielded element themselves.
func (i *CloseIter[T]) Close() error {
	return i.At().Close()
}

type PeekingCloseableIterator[T any] interface {
PeekingIterator[T]
CloseableIterator[T]
Expand Down
Loading