diff --git a/pkg/bloomcompactor/batch.go b/pkg/bloomcompactor/batch.go new file mode 100644 index 0000000000000..2d43f83219df9 --- /dev/null +++ b/pkg/bloomcompactor/batch.go @@ -0,0 +1,95 @@ +package bloomcompactor + +import ( + "context" + + "github.com/grafana/dskit/multierror" + + "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" +) + +// interface modeled from `pkg/storage/stores/shipper/bloomshipper.Fetcher` +type blocksFetcher interface { + FetchBlocks(context.Context, []bloomshipper.BlockRef) ([]*bloomshipper.CloseableBlockQuerier, error) +} + +func newBatchedBlockLoader(ctx context.Context, fetcher blocksFetcher, blocks []bloomshipper.BlockRef) (*batchedBlockLoader, error) { + return &batchedBlockLoader{ + ctx: ctx, + batchSize: 10, // make configurable? + source: blocks, + fetcher: fetcher, + }, nil +} + +type batchedBlockLoader struct { + ctx context.Context + batchSize int + + source []bloomshipper.BlockRef + fetcher blocksFetcher + + batch []*bloomshipper.CloseableBlockQuerier + cur *bloomshipper.CloseableBlockQuerier + err error +} + +// At implements v1.CloseableIterator. +func (b *batchedBlockLoader) At() *bloomshipper.CloseableBlockQuerier { + return b.cur +} + +// Close implements v1.CloseableIterator. +func (b *batchedBlockLoader) Close() error { + if b.cur != nil { + return b.cur.Close() + } + return nil +} + +// CloseBatch closes the remaining items from the current batch +func (b *batchedBlockLoader) CloseBatch() error { + var err multierror.MultiError + for _, cur := range b.batch { + err.Add(cur.Close()) + } + if len(b.batch) > 0 { + b.batch = b.batch[:0] + } + return err.Err() +} + +// Err implements v1.CloseableIterator. +func (b *batchedBlockLoader) Err() error { + return b.err +} + +// Next implements v1.CloseableIterator. +func (b *batchedBlockLoader) Next() bool { + if len(b.batch) > 0 { + return b.setNext() + } + + if len(b.source) == 0 { + return false + } + + // setup next batch + batchSize := min(b.batchSize, len(b.source)) + toFetch := b.source[:batchSize] + + // update source + b.source = b.source[batchSize:] + + b.batch, b.err = b.fetcher.FetchBlocks(b.ctx, toFetch) + if b.err != nil { + return false + } + return b.setNext() +} + +func (b *batchedBlockLoader) setNext() bool { + b.cur, b.err = b.batch[0], nil + b.batch = b.batch[1:] + return true +} diff --git a/pkg/bloomcompactor/batch_test.go b/pkg/bloomcompactor/batch_test.go new file mode 100644 index 0000000000000..a1922bf931b86 --- /dev/null +++ b/pkg/bloomcompactor/batch_test.go @@ -0,0 +1,37 @@ +package bloomcompactor + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + + "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" +) + +type dummyBlocksFetcher struct { + count *atomic.Int32 +} + +func (f *dummyBlocksFetcher) FetchBlocks(_ context.Context, blocks []bloomshipper.BlockRef) ([]*bloomshipper.CloseableBlockQuerier, error) { + f.count.Inc() + return make([]*bloomshipper.CloseableBlockQuerier, len(blocks)), nil +} + +func TestBatchedBlockLoader(t *testing.T) { + ctx := context.Background() + f := &dummyBlocksFetcher{count: atomic.NewInt32(0)} + + blocks := make([]bloomshipper.BlockRef, 25) + blocksIter, err := newBatchedBlockLoader(ctx, f, blocks) + require.NoError(t, err) + + var count int + for blocksIter.Next() && blocksIter.Err() == nil { + count++ + } + + require.Equal(t, len(blocks), count) + require.Equal(t, int32(len(blocks)/blocksIter.batchSize+1), f.count.Load()) +} diff --git a/pkg/bloomcompactor/controller.go b/pkg/bloomcompactor/controller.go index 38831ef932e69..47d9627d92e1a 100644 --- a/pkg/bloomcompactor/controller.go +++ b/pkg/bloomcompactor/controller.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "io" "sort" "github.com/go-kit/log" @@ -138,45 +139,36 @@ func (s *SimpleBloomController) buildBlocks( for _, gap := range plan.gaps { // Fetch blocks that aren't up to date but are in the desired fingerprint range // to try and accelerate bloom creation - seriesItr, preExistingBlocks, err := s.loadWorkForGap(ctx, table, tenant, plan.tsdb, gap) + seriesItr, blocksIter, err := s.loadWorkForGap(ctx, table, tenant, plan.tsdb, gap) if err != nil { level.Error(logger).Log("msg", "failed to get series and blocks", "err", err) return errors.Wrap(err, "failed to get series and blocks") } - // Close all remaining blocks on exit - closePreExistingBlocks := func() { - var closeErrors multierror.MultiError - for _, block := range preExistingBlocks { - closeErrors.Add(block.Close()) - } - if err := closeErrors.Err(); err != nil { - level.Error(s.logger).Log("msg", "failed to close blocks", "err", err) - } - } gen := NewSimpleBloomGenerator( tenant, blockOpts, seriesItr, s.chunkLoader, - preExistingBlocks, + blocksIter, s.rwFn, s.metrics, - log.With(logger, "tsdb", plan.tsdb.Name(), "ownership", gap, "blocks", len(preExistingBlocks)), + log.With(logger, "tsdb", plan.tsdb.Name(), "ownership", gap), ) - _, newBlocks, err := gen.Generate(ctx) + _, loaded, newBlocks, err := gen.Generate(ctx) + if err != nil { // TODO(owen-d): metrics level.Error(logger).Log("msg", "failed to generate bloom", "err", err) - closePreExistingBlocks() + s.closeLoadedBlocks(loaded, blocksIter) return errors.Wrap(err, "failed to generate bloom") } client, err := s.bloomStore.Client(table.ModelTime()) if err != nil { level.Error(logger).Log("msg", "failed to get client", "err", err) - closePreExistingBlocks() + s.closeLoadedBlocks(loaded, blocksIter) return errors.Wrap(err, "failed to get client") } @@ -195,7 +187,7 @@ func (s *SimpleBloomController) buildBlocks( built, ); err != nil { level.Error(logger).Log("msg", "failed to write block", "err", err) - closePreExistingBlocks() + s.closeLoadedBlocks(loaded, blocksIter) return errors.Wrap(err, "failed to write block") } } @@ -203,12 +195,12 @@ func (s *SimpleBloomController) buildBlocks( if err := newBlocks.Err(); err != nil { // TODO(owen-d): metrics level.Error(logger).Log("msg", "failed to generate bloom", "err", err) - closePreExistingBlocks() + s.closeLoadedBlocks(loaded, blocksIter) return errors.Wrap(err, "failed to generate bloom") } // Close pre-existing blocks - closePreExistingBlocks() + s.closeLoadedBlocks(loaded, blocksIter) } } @@ -226,19 +218,49 @@ func (s *SimpleBloomController) loadWorkForGap( tenant string, id tsdb.Identifier, gap gapWithBlocks, -) (v1.CloseableIterator[*v1.Series], []*bloomshipper.CloseableBlockQuerier, error) { +) (v1.CloseableIterator[*v1.Series], v1.CloseableIterator[*bloomshipper.CloseableBlockQuerier], error) { // load a series iterator for the gap seriesItr, err := s.tsdbStore.LoadTSDB(ctx, table, tenant, id, gap.bounds) if err != nil { return nil, nil, errors.Wrap(err, "failed to load tsdb") } - blocks, err := s.bloomStore.FetchBlocks(ctx, gap.blocks) + // load a blocks iterator for the gap + fetcher, err := s.bloomStore.Fetcher(table.ModelTime()) if err != nil { - return nil, nil, errors.Wrap(err, "failed to get blocks") + return nil, nil, errors.Wrap(err, "failed to get fetcher") } - return seriesItr, blocks, nil + blocksIter, err := newBatchedBlockLoader(ctx, fetcher, gap.blocks) + if err != nil { + return nil, nil, errors.Wrap(err, "failed to load blocks") + } + + return seriesItr, blocksIter, nil +} + +func (s *SimpleBloomController) closeLoadedBlocks(toClose []io.Closer, it v1.CloseableIterator[*bloomshipper.CloseableBlockQuerier]) { + // close loaded blocks + var err multierror.MultiError + for _, closer := range toClose { + err.Add(closer.Close()) + } + + switch itr := it.(type) { + case *batchedBlockLoader: + // close remaining loaded blocks from batch + err.Add(itr.CloseBatch()) + default: + // close remaining loaded blocks + for itr.Next() && itr.Err() == nil { + err.Add(itr.At().Close()) + } + } + + // log error + if err.Err() != nil { + level.Error(s.logger).Log("msg", "failed to close blocks", "err", err) + } } type gapWithBlocks struct { diff --git a/pkg/bloomcompactor/spec.go b/pkg/bloomcompactor/spec.go index 58dd2674895ed..4a1125082ca54 100644 --- a/pkg/bloomcompactor/spec.go +++ b/pkg/bloomcompactor/spec.go @@ -3,6 +3,7 @@ package bloomcompactor import ( "context" "fmt" + "io" "math" "time" @@ -39,7 +40,7 @@ func (k Keyspace) Cmp(other Keyspace) v1.BoundsCheck { // Store is likely bound within. This allows specifying impls like ShardedStore // to only request the shard-range needed from the existing store. type BloomGenerator interface { - Generate(ctx context.Context) (skippedBlocks []*v1.Block, results v1.Iterator[*v1.Block], err error) + Generate(ctx context.Context) (skippedBlocks []v1.BlockMetadata, toClose []io.Closer, results v1.Iterator[*v1.Block], err error) } // Simple implementation of a BloomGenerator. @@ -47,9 +48,7 @@ type SimpleBloomGenerator struct { userID string store v1.Iterator[*v1.Series] chunkLoader ChunkLoader - // TODO(owen-d): blocks need not be all downloaded prior. Consider implementing - // as an iterator of iterators, where each iterator is a batch of overlapping blocks. - blocks []*bloomshipper.CloseableBlockQuerier + blocksIter v1.CloseableIterator[*bloomshipper.CloseableBlockQuerier] // options to build blocks with opts v1.BlockOptions @@ -71,7 +70,7 @@ func NewSimpleBloomGenerator( opts v1.BlockOptions, store v1.Iterator[*v1.Series], chunkLoader ChunkLoader, - blocks []*bloomshipper.CloseableBlockQuerier, + blocksIter v1.CloseableIterator[*bloomshipper.CloseableBlockQuerier], readWriterFn func() (v1.BlockWriter, v1.BlockReader), metrics *Metrics, logger log.Logger, @@ -81,7 +80,7 @@ func NewSimpleBloomGenerator( opts: opts, store: store, chunkLoader: chunkLoader, - blocks: blocks, + blocksIter: blocksIter, logger: log.With(logger, "component", "bloom_generator"), readWriterFn: readWriterFn, metrics: metrics, @@ -108,9 +107,15 @@ func (s *SimpleBloomGenerator) populator(ctx context.Context) func(series *v1.Se } -func (s *SimpleBloomGenerator) Generate(ctx context.Context) (skippedBlocks []v1.BlockMetadata, results v1.Iterator[*v1.Block], err error) { - blocksMatchingSchema := make([]*bloomshipper.CloseableBlockQuerier, 0, len(s.blocks)) - for _, block := range s.blocks { +func (s *SimpleBloomGenerator) Generate(ctx context.Context) ([]v1.BlockMetadata, []io.Closer, v1.Iterator[*v1.Block], error) { + skippedBlocks := make([]v1.BlockMetadata, 0) + toClose := make([]io.Closer, 0) + blocksMatchingSchema := make([]*bloomshipper.CloseableBlockQuerier, 0) + + for s.blocksIter.Next() && s.blocksIter.Err() == nil { + block := s.blocksIter.At() + toClose = append(toClose, block) + logger := log.With(s.logger, "block", block.BlockRef) md, err := block.Metadata() schema := md.Options.Schema @@ -130,11 +135,16 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) (skippedBlocks []v1 blocksMatchingSchema = append(blocksMatchingSchema, block) } + if s.blocksIter.Err() != nil { + // should we ignore the error and continue with the blocks we got? + return skippedBlocks, toClose, v1.NewSliceIter([]*v1.Block{}), s.blocksIter.Err() + } + level.Debug(s.logger).Log("msg", "generating bloom filters for blocks", "num_blocks", len(blocksMatchingSchema), "skipped_blocks", len(skippedBlocks), "schema", fmt.Sprintf("%+v", s.opts.Schema)) series := v1.NewPeekingIter(s.store) blockIter := NewLazyBlockBuilderIterator(ctx, s.opts, s.populator(ctx), s.readWriterFn, series, blocksMatchingSchema) - return skippedBlocks, blockIter, nil + return skippedBlocks, toClose, blockIter, nil } // LazyBlockBuilderIterator is a lazy iterator over blocks that builds diff --git a/pkg/bloomcompactor/spec_test.go b/pkg/bloomcompactor/spec_test.go index 44b1fa26a4d1f..bb4fde6cc2359 100644 --- a/pkg/bloomcompactor/spec_test.go +++ b/pkg/bloomcompactor/spec_test.go @@ -71,13 +71,14 @@ func dummyBloomGen(opts v1.BlockOptions, store v1.Iterator[*v1.Series], blocks [ BlockQuerier: v1.NewBlockQuerier(b), }) } + blocksIter := v1.NewCloseableIterator(v1.NewSliceIter(bqs)) return NewSimpleBloomGenerator( "fake", opts, store, dummyChunkLoader{}, - bqs, + blocksIter, func() (v1.BlockWriter, v1.BlockReader) { indexBuf := bytes.NewBuffer(nil) bloomsBuf := bytes.NewBuffer(nil) @@ -130,7 +131,7 @@ func TestSimpleBloomGenerator(t *testing.T) { ) gen := dummyBloomGen(tc.toSchema, storeItr, sourceBlocks) - skipped, results, err := gen.Generate(context.Background()) + skipped, _, results, err := gen.Generate(context.Background()) require.Nil(t, err) require.Equal(t, tc.numSkipped, len(skipped)) diff --git a/pkg/storage/bloom/v1/util.go b/pkg/storage/bloom/v1/util.go index d980a9ecc4df1..3b9e0631b715d 100644 --- a/pkg/storage/bloom/v1/util.go +++ b/pkg/storage/bloom/v1/util.go @@ -247,6 +247,18 @@ type CloseableIterator[T any] interface { Close() error } +func NewCloseableIterator[T io.Closer](itr Iterator[T]) *CloseIter[T] { + return &CloseIter[T]{itr} +} + +type CloseIter[T io.Closer] struct { + Iterator[T] +} + +func (i *CloseIter[T]) Close() error { + return i.At().Close() +} + type PeekingCloseableIterator[T any] interface { PeekingIterator[T] CloseableIterator[T]