diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md
index 51ecb12af62f1..14406ac37cc8c 100644
--- a/docs/sources/configure/_index.md
+++ b/docs/sources/configure/_index.md
@@ -3098,6 +3098,12 @@ shard_streams:
 # CLI flag: -bloom-compactor.chunks-batch-size
 [bloom_compactor_chunks_batch_size: <int> | default = 100]
 
+# The maximum bloom block size. Default is unlimited. The actual block size
+# might exceed this limit since blooms will be added to blocks until the block
+# exceeds the maximum block size.
+# CLI flag: -bloom-compactor.max-block-size
+[bloom_compactor_max_block_size: <int> | default = 0B]
+
 # Length of the n-grams created when computing blooms from log lines.
 # CLI flag: -bloom-compactor.ngram-length
 [bloom_ngram_length: <int> | default = 4]
diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go
index dbe307ff18822..1895b92a6159b 100644
--- a/pkg/bloomcompactor/bloomcompactor.go
+++ b/pkg/bloomcompactor/bloomcompactor.go
@@ -504,7 +504,8 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
 	metasMatchingJob, blocksMatchingJob := matchingBlocks(metas, job)
 
 	localDst := createLocalDirName(c.cfg.WorkingDirectory, job)
-	blockOptions := v1.NewBlockOptions(bt.GetNGramLength(), bt.GetNGramSkip())
+	maxBlockSize := c.limits.BloomCompactorMaxBlockSize(job.tenantID)
+	blockOptions := v1.NewBlockOptions(bt.GetNGramLength(), bt.GetNGramSkip(), maxBlockSize)
 
 	defer func() {
 		//clean up the bloom directory
@@ -513,10 +514,12 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
 		}
 	}()
 
-	var resultingBlock bloomshipper.Block
+	var resultingBlocks []bloomshipper.Block
 	defer func() {
-		if resultingBlock.Data != nil {
-			_ = resultingBlock.Data.Close()
+		for _, resultingBlock := range resultingBlocks {
+			if resultingBlock.Data != nil {
+				_ = resultingBlock.Data.Close()
+			}
 		}
 	}()
 
@@ -535,7 +538,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
 			return err
 		}
 
-		resultingBlock, err = compactNewChunks(ctx, logger, job, bt, storeClient.chunk, builder, c.limits)
+		resultingBlocks, err = compactNewChunks(ctx, logger, job, bt, storeClient.chunk, builder, c.limits)
 		if err != nil {
 			return level.Error(logger).Log("msg", "failed compacting new chunks", "err", err)
 		}
@@ -546,7 +549,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
 
 		var populate = createPopulateFunc(ctx, job, storeClient, bt, c.limits)
 
-		seriesIter := makeSeriesIterFromSeriesMeta(job)
+		seriesIter := v1.NewPeekingIter[*v1.Series](makeSeriesIterFromSeriesMeta(job))
 
 		blockIters, blockPaths, err := makeBlockIterFromBlocks(ctx, logger, c.bloomShipperClient, blocksMatchingJob, c.cfg.WorkingDirectory)
 		defer func() {
@@ -568,7 +571,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
 			return err
 		}
 
-		resultingBlock, err = mergeCompactChunks(logger, populate, mergeBlockBuilder, blockIters, seriesIter, job)
+		resultingBlocks, err = mergeCompactChunks(logger, populate, mergeBlockBuilder, blockIters, seriesIter, job)
 		if err != nil {
 			level.Error(logger).Log("msg", "failed merging existing blocks with new chunks", "err", err)
 			return err
@@ -576,24 +579,33 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
 		}
 	}
 
-	archivePath := filepath.Join(c.cfg.WorkingDirectory, uuid.New().String())
+	level.Debug(logger).Log("msg", "blocks created", "resultingBlocks", len(resultingBlocks))
 
-	blockToUpload, err := bloomshipper.CompressBloomBlock(resultingBlock.BlockRef, archivePath, localDst, logger)
-	if err != nil {
-		level.Error(logger).Log("msg", "failed compressing bloom blocks into tar file", "err", err)
-		return err
+	blocksToUpload := make([]bloomshipper.Block, 0, len(resultingBlocks))
+	archivesPaths := make([]string, 0, len(resultingBlocks))
+	for _, block := range resultingBlocks {
+		archivePath := filepath.Join(c.cfg.WorkingDirectory, uuid.New().String())
+		blockToUpload, err := bloomshipper.CompressBloomBlock(block.BlockRef, archivePath, block.BlockRef.BlockPath, logger)
+		if err != nil {
+			level.Error(logger).Log("msg", "failed compressing bloom blocks into tar file", "err", err)
+			return err
+		}
+		blocksToUpload = append(blocksToUpload, blockToUpload)
+		archivesPaths = append(archivesPaths, archivePath)
 	}
 
 	defer func() {
-		err = os.Remove(archivePath)
-		if err != nil {
-			level.Error(logger).Log("msg", "failed removing archive file", "err", err, "file", archivePath)
+		for _, archivePath := range archivesPaths {
+			err = os.Remove(archivePath)
+			if err != nil {
+				level.Error(logger).Log("msg", "failed removing archive file", "err", err, "file", archivePath)
+			}
 		}
 	}()
 
 	// Do not change the signature of PutBlocks yet.
 	// Once block size is limited potentially, compactNewChunks will return multiple blocks, hence a list is appropriate.
-	storedBlocks, err := c.bloomShipperClient.PutBlocks(ctx, []bloomshipper.Block{blockToUpload})
+	storedBlocks, err := c.bloomShipperClient.PutBlocks(ctx, blocksToUpload)
 	if err != nil {
 		level.Error(logger).Log("msg", "failed uploading blocks to storage", "err", err)
 		return err
diff --git a/pkg/bloomcompactor/chunkcompactor.go b/pkg/bloomcompactor/chunkcompactor.go
index c4993ccc62a59..02cc942a603b2 100644
--- a/pkg/bloomcompactor/chunkcompactor.go
+++ b/pkg/bloomcompactor/chunkcompactor.go
@@ -31,38 +31,73 @@ type chunkClient interface {
 }
 
 type blockBuilder interface {
-	BuildFrom(itr v1.Iterator[v1.SeriesWithBloom]) (uint32, error)
+	// BuildFrom builds a block from the given iterator.
+	// If the data is too large to fit in a single block, the iterator won't be consumed completely. In this case,
+	// call BuildFrom again with the same iterator to build the next block.
+	BuildFrom(itr v1.Iterator[v1.SeriesWithBloom]) (string, uint32, v1.SeriesBounds, error)
+	// Data returns a reader for the last block built by BuildFrom.
 	Data() (io.ReadSeekCloser, error)
 }
 
 type PersistentBlockBuilder struct {
-	builder  *v1.BlockBuilder
-	localDst string
+	blockOptions     v1.BlockOptions
+	lastBlockIdx     uint64
+	baseLocalDst     string
+	currentBlockPath string
 }
 
 func NewPersistentBlockBuilder(localDst string, blockOptions v1.BlockOptions) (*PersistentBlockBuilder, error) {
+	builder := PersistentBlockBuilder{
+		blockOptions: blockOptions,
+		baseLocalDst: localDst,
+	}
+
+	return &builder, nil
+}
+
+func (p *PersistentBlockBuilder) getNextBuilder() (*v1.BlockBuilder, error) {
 	// write bloom to a local dir
-	b, err := v1.NewBlockBuilder(blockOptions, v1.NewDirectoryBlockWriter(localDst))
+	blockPath := filepath.Join(p.baseLocalDst, fmt.Sprintf("%d", p.lastBlockIdx))
+	builder, err := v1.NewBlockBuilder(p.blockOptions, v1.NewDirectoryBlockWriter(blockPath))
 	if err != nil {
 		return nil, err
 	}
-	builder := PersistentBlockBuilder{
-		builder:  b,
-		localDst: localDst,
-	}
-	return &builder, nil
+
+	p.currentBlockPath = blockPath
+	p.lastBlockIdx++
+	return builder, nil
 }
 
-func (p *PersistentBlockBuilder) BuildFrom(itr v1.Iterator[v1.SeriesWithBloom]) (uint32, error) {
-	return p.builder.BuildFrom(itr)
+func (p *PersistentBlockBuilder) BuildFrom(itr v1.Iterator[v1.SeriesWithBloom]) (string, uint32, v1.SeriesBounds, error) {
+
+	b, err := p.getNextBuilder()
+	if err != nil {
+		return "", 0, v1.SeriesBounds{}, err
+	}
+
+	checksum, bounds, err := b.BuildFrom(itr)
+	if err != nil {
+		return "", 0, v1.SeriesBounds{}, err
+	}
+
+	return p.currentBlockPath, checksum, bounds, nil
 }
 
-func (p *PersistentBlockBuilder) mergeBuild(builder *v1.MergeBuilder) (uint32, error) {
-	return builder.Build(p.builder)
+func (p *PersistentBlockBuilder) mergeBuild(builder *v1.MergeBuilder) (string, uint32, v1.SeriesBounds, error) {
+	b, err := p.getNextBuilder()
+	if err != nil {
+		return "", 0, v1.SeriesBounds{}, err
+	}
+
+	checksum, bounds, err := builder.Build(b)
+	if err != nil {
+		return "", 0, v1.SeriesBounds{}, err
+	}
+
+	return p.currentBlockPath, checksum, bounds, nil
 }
 
 func (p *PersistentBlockBuilder) Data() (io.ReadSeekCloser, error) {
-	blockFile, err := os.Open(filepath.Join(p.localDst, v1.BloomFileName))
+	blockFile, err := os.Open(filepath.Join(p.currentBlockPath, v1.BloomFileName))
 	if err != nil {
 		return nil, err
 	}
@@ -87,10 +122,13 @@ func makeChunkRefs(chksMetas []tsdbindex.ChunkMeta, tenant string, fp model.Fing
 }
 
 func buildBloomFromSeries(seriesMeta seriesMeta, fpRate float64, tokenizer compactorTokenizer, chunks v1.Iterator[[]chunk.Chunk]) (v1.SeriesWithBloom, error) {
+	chunkRefs := makeChunkRefsFromChunkMetas(seriesMeta.chunkRefs)
+
 	// Create a bloom for this series
 	bloomForChks := v1.SeriesWithBloom{
 		Series: &v1.Series{
 			Fingerprint: seriesMeta.seriesFP,
+			Chunks:      chunkRefs,
 		},
 		Bloom: &v1.Bloom{
 			ScalableBloomFilter: *filter.NewDefaultScalableBloomFilter(fpRate),
@@ -106,47 +144,62 @@ func buildBloomFromSeries(seriesMeta seriesMeta, fpRate float64, tokenizer compa
 }
 
 // TODO Test this when bloom block size check is implemented
-func buildBlockFromBlooms(
+func buildBlocksFromBlooms(
 	ctx context.Context,
 	logger log.Logger,
 	builder blockBuilder,
-	blooms v1.Iterator[v1.SeriesWithBloom],
+	blooms v1.PeekingIterator[v1.SeriesWithBloom],
 	job Job,
-) (bloomshipper.Block, error) {
+) ([]bloomshipper.Block, error) {
 	// Ensure the context has not been canceled (ie. compactor shutdown has been triggered).
 	if err := ctx.Err(); err != nil {
-		return bloomshipper.Block{}, err
-	}
-
-	checksum, err := builder.BuildFrom(blooms)
-	if err != nil {
-		level.Error(logger).Log("msg", "failed writing to bloom", "err", err)
-		return bloomshipper.Block{}, err
+		return []bloomshipper.Block{}, err
 	}
 
-	data, err := builder.Data()
-	if err != nil {
-		level.Error(logger).Log("msg", "failed reading bloom data", "err", err)
-		return bloomshipper.Block{}, err
-	}
-
-	block := bloomshipper.Block{
-		BlockRef: bloomshipper.BlockRef{
-			Ref: bloomshipper.Ref{
-				TenantID:       job.tenantID,
-				TableName:      job.tableName,
-				MinFingerprint: uint64(job.minFp),
-				MaxFingerprint: uint64(job.maxFp),
-				StartTimestamp: job.from,
-				EndTimestamp:   job.through,
-				Checksum:       checksum,
+	// TODO(salvacorts): Reuse buffer
+	blocks := make([]bloomshipper.Block, 0)
+
+	for {
+		// Create blocks until the iterator is empty
+		if _, hasNext := blooms.Peek(); !hasNext {
+			break
+		}
+
+		if err := ctx.Err(); err != nil {
+			return []bloomshipper.Block{}, err
+		}
+
+		blockPath, checksum, bounds, err := builder.BuildFrom(blooms)
+		if err != nil {
+			level.Error(logger).Log("msg", "failed writing to bloom", "err", err)
+			return []bloomshipper.Block{}, err
+		}
+
+		data, err := builder.Data()
+		if err != nil {
+			level.Error(logger).Log("msg", "failed reading bloom data", "err", err)
+			return []bloomshipper.Block{}, err
+		}
+
+		blocks = append(blocks, bloomshipper.Block{
+			BlockRef: bloomshipper.BlockRef{
+				Ref: bloomshipper.Ref{
+					TenantID:       job.tenantID,
+					TableName:      job.tableName,
+					MinFingerprint: uint64(bounds.FromFp),
+					MaxFingerprint: uint64(bounds.ThroughFp),
+					StartTimestamp: bounds.FromTs,
+					EndTimestamp:   bounds.ThroughTs,
+					Checksum:       checksum,
+				},
+				IndexPath: job.indexPath,
+				BlockPath: blockPath,
 			},
-			IndexPath: job.indexPath,
-		},
-		Data: data,
+			Data: data,
+		})
 	}
 
-	return block, nil
+	return blocks, nil
 }
 
 func createLocalDirName(workingDir string, job Job) string {
@@ -162,22 +215,22 @@ func compactNewChunks(ctx context.Context,
 	storeClient chunkClient,
 	builder blockBuilder,
 	limits Limits,
-) (bloomshipper.Block, error) {
+) ([]bloomshipper.Block, error) {
 	// Ensure the context has not been canceled (ie. compactor shutdown has been triggered).
 	if err := ctx.Err(); err != nil {
-		return bloomshipper.Block{}, err
+		return []bloomshipper.Block{}, err
 	}
 
-	bloomIter := newLazyBloomBuilder(ctx, job, storeClient, bt, logger, limits)
+	bloomIter := v1.NewPeekingIter[v1.SeriesWithBloom](newLazyBloomBuilder(ctx, job, storeClient, bt, logger, limits))
 
 	// Build and upload bloomBlock to storage
-	block, err := buildBlockFromBlooms(ctx, logger, builder, bloomIter, job)
+	blocks, err := buildBlocksFromBlooms(ctx, logger, builder, bloomIter, job)
 	if err != nil {
 		level.Error(logger).Log("msg", "failed building bloomBlocks", "err", err)
-		return bloomshipper.Block{}, err
+		return []bloomshipper.Block{}, err
 	}
 
-	return block, nil
+	return blocks, nil
 }
 
 type lazyBloomBuilder struct {
diff --git a/pkg/bloomcompactor/chunkcompactor_test.go b/pkg/bloomcompactor/chunkcompactor_test.go
index 8bc94fd26537a..a6786c8ebe0d2 100644
--- a/pkg/bloomcompactor/chunkcompactor_test.go
+++ b/pkg/bloomcompactor/chunkcompactor_test.go
@@ -2,6 +2,7 @@ package bloomcompactor
 
 import (
 	"context"
+	"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
 	"io"
 	"testing"
 	"time"
@@ -70,8 +71,8 @@ func TestChunkCompactor_CompactNewChunks(t *testing.T) {
 	logger := log.NewNopLogger()
 	label := labels.FromStrings("foo", "bar")
 	fp1 := model.Fingerprint(100)
-	fp2 := model.Fingerprint(999)
-	fp3 := model.Fingerprint(200)
+	fp2 := model.Fingerprint(200)
+	fp3 := model.Fingerprint(999)
 
 	chunkRef1 := index.ChunkMeta{
 		Checksum: 1,
@@ -107,23 +108,113 @@ func TestChunkCompactor_CompactNewChunks(t *testing.T) {
 
 	mbt := mockBloomTokenizer{}
 	mcc := mockChunkClient{}
-	pbb := mockPersistentBlockBuilder{}
-
-	// Run Compaction
-	compactedBlock, err := compactNewChunks(context.Background(), logger, job, &mbt, &mcc, &pbb, mockLimits{fpRate: fpRate})
-
-	// Validate Compaction Succeeds
-	require.NoError(t, err)
-	require.NotNil(t, compactedBlock)
-
-	// Validate Compacted Block has expected data
-	require.Equal(t, job.tenantID, compactedBlock.TenantID)
-	require.Equal(t, job.tableName, compactedBlock.TableName)
-	require.Equal(t, uint64(fp1), compactedBlock.MinFingerprint)
-	require.Equal(t, uint64(fp2), compactedBlock.MaxFingerprint)
-	require.Equal(t, model.Time(chunkRef1.MinTime), compactedBlock.StartTimestamp)
-	require.Equal(t, model.Time(chunkRef2.MaxTime), compactedBlock.EndTimestamp)
-	require.Equal(t, indexPath, compactedBlock.IndexPath)
+	for _, tc := range []struct {
+		name           string
+		blockSize      int
+		expectedBlocks []bloomshipper.BlockRef
+	}{
+		{
+			name:      "unlimited block size",
+			blockSize: 0,
+			expectedBlocks: []bloomshipper.BlockRef{
+				{
+					Ref: bloomshipper.Ref{
+						TenantID:       job.tenantID,
+						TableName:      job.tableName,
+						MinFingerprint: 100,
+						MaxFingerprint: 999,
+						StartTimestamp: 1,
+						EndTimestamp:   999,
+					},
+				},
+			},
+		},
+		{
+			name:      "limited block size",
+			blockSize: 1024, // Enough to result into two blocks
+			expectedBlocks: []bloomshipper.BlockRef{
+				{
+					Ref: bloomshipper.Ref{
+						TenantID:       job.tenantID,
+						TableName:      job.tableName,
+						MinFingerprint: 100,
+						MaxFingerprint: 200,
+						StartTimestamp: 1,
+						EndTimestamp:   999,
+					},
+				},
+				{
+					Ref: bloomshipper.Ref{
+						TenantID:       job.tenantID,
+						TableName:      job.tableName,
+						MinFingerprint: 999,
+						MaxFingerprint: 999,
+						StartTimestamp: 1,
+						EndTimestamp:   999,
+					},
+				},
+			},
+		},
+		{
+			name:      "block contains at least one series",
+			blockSize: 1,
+			expectedBlocks: []bloomshipper.BlockRef{
+				{
+					Ref: bloomshipper.Ref{
+						TenantID:       job.tenantID,
+						TableName:      job.tableName,
+						MinFingerprint: 100,
+						MaxFingerprint: 100,
+						StartTimestamp: 1,
+						EndTimestamp:   99,
+					},
+				},
+				{
+					Ref: bloomshipper.Ref{
+						TenantID:       job.tenantID,
+						TableName:      job.tableName,
+						MinFingerprint: 200,
+						MaxFingerprint: 200,
+						StartTimestamp: 1,
+						EndTimestamp:   999,
+					},
+				},
+				{
+					Ref: bloomshipper.Ref{
+						TenantID:       job.tenantID,
+						TableName:      job.tableName,
+						MinFingerprint: 999,
+						MaxFingerprint: 999,
+						StartTimestamp: 1,
+						EndTimestamp:   999,
+					},
+				},
+			},
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			localDst := createLocalDirName(t.TempDir(), job)
+			blockOpts := v1.NewBlockOptions(1, 1, tc.blockSize)
+			pbb, err := NewPersistentBlockBuilder(localDst, blockOpts)
+			require.NoError(t, err)
+
+			// Run Compaction
+			compactedBlocks, err := compactNewChunks(context.Background(), logger, job, &mbt, &mcc, pbb, mockLimits{fpRate: fpRate})
+			require.NoError(t, err)
+			require.Len(t, compactedBlocks, len(tc.expectedBlocks))
+
+			for i, compactedBlock := range compactedBlocks {
+				require.Equal(t, tc.expectedBlocks[i].TenantID, compactedBlock.Ref.TenantID)
+				require.Equal(t, tc.expectedBlocks[i].TableName, compactedBlock.Ref.TableName)
+				require.Equal(t, tc.expectedBlocks[i].MinFingerprint, compactedBlock.Ref.MinFingerprint)
+				require.Equal(t, tc.expectedBlocks[i].MaxFingerprint, compactedBlock.Ref.MaxFingerprint)
+				require.Equal(t, tc.expectedBlocks[i].StartTimestamp, compactedBlock.Ref.StartTimestamp)
+				require.Equal(t, tc.expectedBlocks[i].EndTimestamp, compactedBlock.Ref.EndTimestamp)
+				require.Equal(t, indexPath, compactedBlock.IndexPath)
+			}
+		})
+	}
 }
 
 func TestLazyBloomBuilder(t *testing.T) {
@@ -220,8 +311,8 @@ func (mcc *mockChunkClient) GetChunks(_ context.Context, chks []chunk.Chunk) ([]
 type mockPersistentBlockBuilder struct {
 }
 
-func (pbb *mockPersistentBlockBuilder) BuildFrom(_ v1.Iterator[v1.SeriesWithBloom]) (uint32, error) {
-	return 0, nil
+func (pbb *mockPersistentBlockBuilder) BuildFrom(_ v1.Iterator[v1.SeriesWithBloom]) (string, uint32, v1.SeriesBounds, error) {
+	return "", 0, v1.SeriesBounds{}, nil
 }
 
 func (pbb *mockPersistentBlockBuilder) Data() (io.ReadSeekCloser, error) {
diff --git a/pkg/bloomcompactor/config.go b/pkg/bloomcompactor/config.go
index 884034fdd043d..a7281e18bd8fc 100644
--- a/pkg/bloomcompactor/config.go
+++ b/pkg/bloomcompactor/config.go
@@ -47,4 +47,5 @@ type Limits interface {
 	BloomNGramLength(tenantID string) int
 	BloomNGramSkip(tenantID string) int
 	BloomFalsePositiveRate(tenantID string) float64
+	BloomCompactorMaxBlockSize(tenantID string) int
 }
diff --git a/pkg/bloomcompactor/mergecompactor.go b/pkg/bloomcompactor/mergecompactor.go
index 6e2143f75135c..afee30d480dfe 100644
--- a/pkg/bloomcompactor/mergecompactor.go
+++ b/pkg/bloomcompactor/mergecompactor.go
@@ -4,31 +4,36 @@ import (
 	"context"
 	"fmt"
 
-	"github.com/grafana/dskit/concurrency"
-
-	"github.com/grafana/loki/pkg/logproto"
-	"github.com/grafana/loki/pkg/storage/chunk"
-
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
+	"github.com/grafana/dskit/concurrency"
+
+	"github.com/grafana/loki/pkg/logproto"
 	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
+	"github.com/grafana/loki/pkg/storage/chunk"
 	"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
+	"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index"
 )
 
+func makeChunkRefsFromChunkMetas(chunks index.ChunkMetas) v1.ChunkRefs {
+	chunkRefs := make(v1.ChunkRefs, 0, len(chunks))
+	for _, chk := range chunks {
+		chunkRefs = append(chunkRefs, v1.ChunkRef{
+			Start:    chk.From(),
+			End:      chk.Through(),
+			Checksum: chk.Checksum,
+		})
+	}
+	return chunkRefs
+}
+
 func makeSeriesIterFromSeriesMeta(job Job) *v1.SliceIter[*v1.Series] {
 	// Satisfy types for series
 	seriesFromSeriesMeta := make([]*v1.Series, len(job.seriesMetas))
 
 	for i, s := range job.seriesMetas {
-		crefs := make([]v1.ChunkRef, len(s.chunkRefs))
-		for j, chk := range s.chunkRefs {
-			crefs[j] = v1.ChunkRef{
-				Start:    chk.From(),
-				End:      chk.Through(),
-				Checksum: chk.Checksum,
-			}
-		}
+		crefs := makeChunkRefsFromChunkMetas(s.chunkRefs)
 		seriesFromSeriesMeta[i] = &v1.Series{
 			Fingerprint: s.seriesFP,
 			Chunks:      crefs,
@@ -108,42 +113,57 @@ func createPopulateFunc(ctx context.Context, job Job, storeClient storeClient, b
 	}
 }
 
-func mergeCompactChunks(logger log.Logger,
+func mergeCompactChunks(
+	logger log.Logger,
 	populate func(*v1.Series, *v1.Bloom) error,
 	mergeBlockBuilder *PersistentBlockBuilder,
-	blockIters []v1.PeekingIterator[*v1.SeriesWithBloom], seriesIter *v1.SliceIter[*v1.Series],
-	job Job) (bloomshipper.Block, error) {
+	blockIters []v1.PeekingIterator[*v1.SeriesWithBloom],
+	seriesIter v1.PeekingIterator[*v1.Series],
+	job Job,
+) ([]bloomshipper.Block, error) {
 	mergeBuilder := v1.NewMergeBuilder(
 		blockIters,
 		seriesIter,
 		populate)
-	checksum, err := mergeBlockBuilder.mergeBuild(mergeBuilder)
-	if err != nil {
-		level.Error(logger).Log("msg", "failed merging the blooms", "err", err)
-		return bloomshipper.Block{}, err
-	}
 
-	data, err := mergeBlockBuilder.Data()
-	if err != nil {
-		level.Error(logger).Log("msg", "failed reading bloom data", "err", err)
-		return bloomshipper.Block{}, err
-	}
+	// TODO(salvacorts): Reuse buffer
+	mergedBlocks := make([]bloomshipper.Block, 0)
 
-	mergedBlock := bloomshipper.Block{
-		BlockRef: bloomshipper.BlockRef{
-			Ref: bloomshipper.Ref{
-				TenantID:       job.tenantID,
-				TableName:      job.tableName,
-				MinFingerprint: uint64(job.minFp),
-				MaxFingerprint: uint64(job.maxFp),
-				StartTimestamp: job.from,
-				EndTimestamp:   job.through,
-				Checksum:       checksum,
+	for {
+		// Merge/Create blocks until there are no more series to process
+		if _, hasNext := seriesIter.Peek(); !hasNext {
+			break
+		}
+
+		blockPath, checksum, bounds, err := mergeBlockBuilder.mergeBuild(mergeBuilder)
+		if err != nil {
+			level.Error(logger).Log("msg", "failed merging the blooms", "err", err)
+			return []bloomshipper.Block{}, err
+		}
+		data, err := mergeBlockBuilder.Data()
+		if err != nil {
+			level.Error(logger).Log("msg", "failed reading bloom data", "err", err)
+			return []bloomshipper.Block{}, err
+		}
+
+		mergedBlocks = append(mergedBlocks, bloomshipper.Block{
+			BlockRef: bloomshipper.BlockRef{
+				Ref: bloomshipper.Ref{
+					TenantID:       job.tenantID,
+					TableName:      job.tableName,
+					MinFingerprint: uint64(bounds.FromFp),
+					MaxFingerprint: uint64(bounds.ThroughFp),
+					StartTimestamp: bounds.FromTs,
+					EndTimestamp:   bounds.ThroughTs,
+					Checksum:       checksum,
+				},
+				IndexPath: job.indexPath,
+				BlockPath: blockPath,
 			},
-			IndexPath: job.indexPath,
-		},
-		Data: data,
+			Data: data,
+		})
 	}
 
-	return mergedBlock, nil
+
+	return mergedBlocks, nil
 }
diff --git a/pkg/storage/bloom/v1/block_writer.go b/pkg/storage/bloom/v1/block_writer.go
index 99ab65ef9cd40..3c5d7d20e6a95 100644
--- a/pkg/storage/bloom/v1/block_writer.go
+++ b/pkg/storage/bloom/v1/block_writer.go
@@ -19,6 +19,8 @@ const (
 
 type BlockWriter interface {
 	Index() (io.WriteCloser, error)
 	Blooms() (io.WriteCloser, error)
+
+	// Size returns the sum of the index and bloom file sizes in bytes
 	Size() (int, error) // byte size of accumualted index & blooms
 }
diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go
index 7b5d0dc3d73ff..031ad06331184 100644
--- a/pkg/storage/bloom/v1/builder.go
+++ b/pkg/storage/bloom/v1/builder.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"hash"
 	"io"
+	"math"
 	"sort"
 
 	"github.com/pkg/errors"
@@ -26,11 +27,12 @@ type BlockOptions struct {
 
 type BlockBuilder struct {
 	opts BlockOptions
+	writer BlockWriter
 
 	index  *IndexBuilder
 	blooms *BloomBlockBuilder
 }
 
-func NewBlockOptions(NGramLength, NGramSkip uint64) BlockOptions {
+func NewBlockOptions(NGramLength, NGramSkip uint64, blockSize int) BlockOptions {
 	return BlockOptions{
 		schema: Schema{
 			version: byte(1),
@@ -39,6 +41,7 @@ func NewBlockOptions(NGramLength, NGramSkip uint64) BlockOptions {
 		},
 		SeriesPageSize: 100,
 		BloomPageSize:  10 << 10, // 0.01MB
+		BlockSize:      blockSize,
 	}
 }
 
@@ -54,6 +57,7 @@ func NewBlockBuilder(opts BlockOptions, writer BlockWriter) (*BlockBuilder, erro
 
 	return &BlockBuilder{
 		opts: opts,
+		writer: writer,
 		index:  NewIndexBuilder(opts, index),
 		blooms: NewBloomBlockBuilder(opts, blooms),
 	}, nil
@@ -64,26 +68,89 @@ type SeriesWithBloom struct {
 	Bloom  *Bloom
 }
 
-func (b *BlockBuilder) BuildFrom(itr Iterator[SeriesWithBloom]) (uint32, error) {
+type updatableBounds SeriesBounds
+
+func newUpdatableBounds() updatableBounds {
+	return updatableBounds{
+		FromTs:    model.Latest,
+		ThroughTs: model.Earliest,
+		FromFp:    model.Fingerprint(math.MaxInt64),
+		ThroughFp: model.Fingerprint(0),
+	}
+}
+
+func (b *updatableBounds) update(series *Series) {
+	minFrom := model.Latest
+	maxThrough := model.Earliest
+	for _, chunk := range series.Chunks {
+		if minFrom > chunk.Start {
+			minFrom = chunk.Start
+		}
+		if maxThrough < chunk.End {
+			maxThrough = chunk.End
+		}
+	}
+
+	if b.FromTs.After(minFrom) {
+		b.FromTs = minFrom
+	}
+	if b.ThroughTs.Before(maxThrough) {
+		b.ThroughTs = maxThrough
+	}
+	if b.FromFp > series.Fingerprint {
+		b.FromFp = series.Fingerprint
+	}
+	if b.ThroughFp < series.Fingerprint {
+		b.ThroughFp = series.Fingerprint
+	}
+}
+
+func (b *BlockBuilder) BuildFrom(itr Iterator[SeriesWithBloom]) (uint32, SeriesBounds, error) {
+	bounds := newUpdatableBounds()
+
 	for itr.Next() {
-		if err := b.AddSeries(itr.At()); err != nil {
-			return 0, err
+		series := itr.At()
+		if err := b.AddSeries(series); err != nil {
+			return 0, SeriesBounds{}, err
 		}
+		bounds.update(series.Series)
+
+		full, err := b.BlockIsFull()
+		if err != nil {
+			return 0, SeriesBounds{}, err
+		}
+		if full {
+			break
+		}
 	}
 
 	if err := itr.Err(); err != nil {
-		return 0, errors.Wrap(err, "iterating series with blooms")
+		return 0, SeriesBounds{}, errors.Wrap(err, "iterating series with blooms")
 	}
 
 	checksum, err := b.blooms.Close()
 	if err != nil {
-		return 0, errors.Wrap(err, "closing bloom file")
+		return 0, SeriesBounds{}, errors.Wrap(err, "closing bloom file")
 	}
 
 	if err := b.index.Close(); err != nil {
-		return 0, errors.Wrap(err, "closing series file")
+		return 0, SeriesBounds{}, errors.Wrap(err, "closing series file")
 	}
-	return checksum, nil
+
+	return checksum, SeriesBounds(bounds), nil
+}
+
+func (b *BlockBuilder) BlockIsFull() (bool, error) {
+	// if the block size is 0, the max size is unlimited
+	if b.opts.BlockSize == 0 {
+		return false, nil
+	}
+
+	size, err := b.writer.Size()
+	if err != nil {
+		return false, errors.Wrap(err, "getting block size")
+	}
+
+	return size >= b.opts.BlockSize, nil
 }
 
 func (b *BlockBuilder) AddSeries(series SeriesWithBloom) error {
@@ -394,10 +461,12 @@ func (b *IndexBuilder) flushPage() error {
 		DecompressedLen: decompressedLen,
 		SeriesHeader: SeriesHeader{
 			NumSeries: b.page.Count(),
-			FromFp:    b.fromFp,
-			ThroughFp: b.previousFp,
-			FromTs:    b.fromTs,
-			ThroughTs: b.throughTs,
+			SeriesBounds: SeriesBounds{
+				FromFp:    b.fromFp,
+				ThroughFp: b.previousFp,
+				FromTs:    b.fromTs,
+				ThroughTs: b.throughTs,
+			},
 		},
 	}
 
@@ -487,7 +556,7 @@ type MergeBuilder struct {
 // 1. merges multiple blocks into a single ordered querier,
 //    i) When two blocks have the same series, it will prefer the one with the most chunks already indexed
 // 2. iterates through the store, adding chunks to the relevant blooms via the `populate` argument
-func NewMergeBuilder(blocks []PeekingIterator[*SeriesWithBloom], store Iterator[*Series], populate func(*Series, *Bloom) error) *MergeBuilder {
+func NewMergeBuilder(blocks []PeekingIterator[*SeriesWithBloom], store PeekingIterator[*Series], populate func(*Series, *Bloom) error) *MergeBuilder {
 	return &MergeBuilder{
 		blocks: blocks,
 		store:  store,
@@ -495,11 +564,16 @@ func NewMergeBuilder(blocks []PeekingIterator[*SeriesWithBloom], store Iterator[
 	}
 }
 
+func (mb *MergeBuilder) HasPendingData() bool {
+	return mb.store.Next()
+}
+
 // NB: this will build one block. Ideally we would build multiple blocks once a target size threshold is met
 // but this gives us a good starting point.
-func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) {
+func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, SeriesBounds, error) {
 	var (
 		nextInBlocks *SeriesWithBloom
+		bounds       = newUpdatableBounds()
 	)
 
 	// Turn the list of blocks into a single iterator that returns the next series
@@ -563,21 +637,31 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) {
 			},
 			cur.Bloom,
 		); err != nil {
-			return 0, errors.Wrapf(err, "populating bloom for series with fingerprint: %v", nextInStore.Fingerprint)
+			return 0, SeriesBounds{}, errors.Wrapf(err, "populating bloom for series with fingerprint: %v", nextInStore.Fingerprint)
 			}
 		}
 
 		if err := builder.AddSeries(*cur); err != nil {
-			return 0, errors.Wrap(err, "adding series to block")
+			return 0, SeriesBounds{}, errors.Wrap(err, "adding series to block")
+		}
+
+		bounds.update(cur.Series)
+
+		full, err := builder.BlockIsFull()
+		if err != nil {
+			return 0, SeriesBounds{}, errors.Wrap(err, "checking if block is full")
+		}
+		if full {
+			break
 		}
 	}
 
 	checksum, err := builder.blooms.Close()
 	if err != nil {
-		return 0, errors.Wrap(err, "closing bloom file")
+		return 0, SeriesBounds{}, errors.Wrap(err, "closing bloom file")
 	}
 	if err := builder.index.Close(); err != nil {
-		return 0, errors.Wrap(err, "closing series file")
+		return 0, SeriesBounds{}, errors.Wrap(err, "closing series file")
 	}
-	return checksum, nil
+
+	return checksum, SeriesBounds(bounds), nil
 }
diff --git a/pkg/storage/bloom/v1/index.go b/pkg/storage/bloom/v1/index.go
index fb74e6a1638d3..ec3de6b19f60d 100644
--- a/pkg/storage/bloom/v1/index.go
+++ b/pkg/storage/bloom/v1/index.go
@@ -204,7 +204,11 @@ func (h *SeriesPageHeaderWithOffset) Decode(dec *encoding.Decbuf) error {
 }
 
 type SeriesHeader struct {
-	NumSeries         int
+	NumSeries int
+	SeriesBounds
+}
+
+type SeriesBounds struct {
 	FromFp, ThroughFp model.Fingerprint
 	FromTs, ThroughTs model.Time
 }
@@ -220,8 +224,10 @@ func aggregateHeaders(xs []SeriesHeader) SeriesHeader {
 	}
 
 	res := SeriesHeader{
-		FromFp:    xs[0].FromFp,
-		ThroughFp: xs[len(xs)-1].ThroughFp,
+		SeriesBounds: SeriesBounds{
+			FromFp:    xs[0].FromFp,
+			ThroughFp: xs[len(xs)-1].ThroughFp,
+		},
 	}
 
 	for _, x := range xs {
diff --git a/pkg/storage/bloom/v1/test_util.go b/pkg/storage/bloom/v1/test_util.go
index 3b706297171b3..8e1cfd0fb19e8 100644
--- a/pkg/storage/bloom/v1/test_util.go
+++ b/pkg/storage/bloom/v1/test_util.go
@@ -38,7 +38,7 @@ func MakeBlockQuerier(t testing.TB, fromFp, throughFp model.Fingerprint, fromTs,
 	)
 	require.Nil(t, err)
 	itr := NewSliceIter[SeriesWithBloom](data)
-	_, err = builder.BuildFrom(itr)
+	_, _, err = builder.BuildFrom(itr)
 	require.Nil(t, err)
 	block := NewBlock(reader)
 	return NewBlockQuerier(block), data
diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go
index 45dd34f201e8d..6f8bfe71e0c8c 100644
--- a/pkg/validation/limits.go
+++ b/pkg/validation/limits.go
@@ -185,15 +185,16 @@ type Limits struct {
 	BloomGatewayShardSize int  `yaml:"bloom_gateway_shard_size" json:"bloom_gateway_shard_size"`
 	BloomGatewayEnabled   bool `yaml:"bloom_gateway_enable_filtering" json:"bloom_gateway_enable_filtering"`
 
-	BloomCompactorShardSize                  int           `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"`
-	BloomCompactorMaxTableAge                time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"`
-	BloomCompactorEnabled                    bool          `yaml:"bloom_compactor_enable_compaction" json:"bloom_compactor_enable_compaction"`
-	BloomCompactorChunksBatchSize            int           `yaml:"bloom_compactor_chunks_batch_size" json:"bloom_compactor_chunks_batch_size"`
-	BloomNGramLength                         int           `yaml:"bloom_ngram_length" json:"bloom_ngram_length"`
-	BloomNGramSkip                           int           `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip"`
-	BloomFalsePositiveRate                   float64       `yaml:"bloom_false_positive_rate" json:"bloom_false_positive_rate"`
-	BloomGatewayBlocksDownloadingParallelism int           `yaml:"bloom_gateway_blocks_downloading_parallelism" json:"bloom_gateway_blocks_downloading_parallelism"`
-	BloomGatewayCacheKeyInterval             time.Duration `yaml:"bloom_gateway_cache_key_interval" json:"bloom_gateway_cache_key_interval"`
+	BloomCompactorShardSize                  int              `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"`
+	BloomCompactorMaxTableAge                time.Duration    `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"`
+	BloomCompactorEnabled                    bool             `yaml:"bloom_compactor_enable_compaction" json:"bloom_compactor_enable_compaction"`
+	BloomCompactorChunksBatchSize            int              `yaml:"bloom_compactor_chunks_batch_size" json:"bloom_compactor_chunks_batch_size"`
+	BloomCompactorMaxBlockSize               flagext.ByteSize `yaml:"bloom_compactor_max_block_size" json:"bloom_compactor_max_block_size"`
+	BloomNGramLength                         int              `yaml:"bloom_ngram_length" json:"bloom_ngram_length"`
+	BloomNGramSkip                           int              `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip"`
+	BloomFalsePositiveRate                   float64          `yaml:"bloom_false_positive_rate" json:"bloom_false_positive_rate"`
+	BloomGatewayBlocksDownloadingParallelism int              `yaml:"bloom_gateway_blocks_downloading_parallelism" json:"bloom_gateway_blocks_downloading_parallelism"`
+	BloomGatewayCacheKeyInterval             time.Duration    `yaml:"bloom_gateway_cache_key_interval" json:"bloom_gateway_cache_key_interval"`
 
 	AllowStructuredMetadata   bool             `yaml:"allow_structured_metadata,omitempty" json:"allow_structured_metadata,omitempty" doc:"description=Allow user to send structured metadata in push payload."`
 	MaxStructuredMetadataSize flagext.ByteSize `yaml:"max_structured_metadata_size" json:"max_structured_metadata_size" doc:"description=Maximum size accepted for structured metadata per log line."`
@@ -318,6 +319,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 	f.DurationVar(&l.BloomCompactorMaxTableAge, "bloom-compactor.max-table-age", 7*24*time.Hour, "The maximum age of a table before it is compacted. Do not compact tables older than the the configured time. Default to 7 days. 0s means no limit.")
 	f.BoolVar(&l.BloomCompactorEnabled, "bloom-compactor.enable-compaction", false, "Whether to compact chunks into bloom filters.")
 	f.IntVar(&l.BloomCompactorChunksBatchSize, "bloom-compactor.chunks-batch-size", 100, "The batch size of the chunks the bloom-compactor downloads at once.")
+	f.Var(&l.BloomCompactorMaxBlockSize, "bloom-compactor.max-block-size", "The maximum bloom block size. Default is unlimited. The actual block size might exceed this limit since blooms will be added to blocks until the block exceeds the maximum block size.")
 	f.IntVar(&l.BloomNGramLength, "bloom-compactor.ngram-length", 4, "Length of the n-grams created when computing blooms from log lines.")
 	f.IntVar(&l.BloomNGramSkip, "bloom-compactor.ngram-skip", 0, "Skip factor for the n-grams created when computing blooms from log lines.")
 	f.Float64Var(&l.BloomFalsePositiveRate, "bloom-compactor.false-positive-rate", 0.01, "Scalable Bloom Filter desired false-positive rate.")
@@ -844,6 +846,10 @@ func (o *Overrides) BloomCompactorChunksBatchSize(userID string) int {
 	return o.getOverridesForUser(userID).BloomCompactorChunksBatchSize
}
 
+func (o *Overrides) BloomCompactorMaxBlockSize(userID string) int {
+	return o.getOverridesForUser(userID).BloomCompactorMaxBlockSize.Val()
+}
+
 func (o *Overrides) BloomCompactorShardSize(userID string) int {
 	return o.getOverridesForUser(userID).BloomCompactorShardSize
 }
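
Editor's note (not part of the patch): the core pattern this diff introduces — in BlockBuilder.BuildFrom, MergeBuilder.Build, and the loops in buildBlocksFromBlooms/mergeCompactChunks — is to keep draining a peekable iterator into the current block and start a new block once the writer reports that the configured maximum size has been reached, with 0 meaning unlimited and every block taking at least one series (so a block may exceed the limit). The following self-contained Go sketch illustrates that loop with simplified, hypothetical types; series, peekingIter, and buildBlocks are illustrative names, not Loki APIs.

package main

import "fmt"

// series is a simplified stand-in for v1.SeriesWithBloom; the real type carries
// a scalable bloom filter plus chunk references.
type series struct {
	fingerprint uint64
	sizeBytes   int
}

// peekingIter mimics the peeking-iterator contract the compactor relies on:
// peek reports whether more items remain without consuming them.
type peekingIter struct {
	items []series
	pos   int
}

func (it *peekingIter) peek() (series, bool) {
	if it.pos >= len(it.items) {
		return series{}, false
	}
	return it.items[it.pos], true
}

func (it *peekingIter) next() series {
	s := it.items[it.pos]
	it.pos++
	return s
}

// buildBlocks drains the iterator into one or more blocks, cutting a new block
// once the accumulated size reaches maxBlockSize. maxBlockSize == 0 means
// unlimited, and a block always takes at least one series, so a block may
// exceed the limit — matching the wording of -bloom-compactor.max-block-size.
func buildBlocks(it *peekingIter, maxBlockSize int) [][]series {
	var blocks [][]series
	for {
		if _, ok := it.peek(); !ok {
			return blocks // iterator exhausted: no more blocks to build
		}
		var block []series
		size := 0
		for {
			if _, ok := it.peek(); !ok {
				break
			}
			cur := it.next()
			block = append(block, cur)
			size += cur.sizeBytes
			if maxBlockSize > 0 && size >= maxBlockSize {
				break // block is "full"; remaining series go into the next block
			}
		}
		blocks = append(blocks, block)
	}
}

func main() {
	it := &peekingIter{items: []series{
		{fingerprint: 100, sizeBytes: 600},
		{fingerprint: 200, sizeBytes: 600},
		{fingerprint: 999, sizeBytes: 600},
	}}
	for i, b := range buildBlocks(it, 1024) {
		fmt.Printf("block %d holds %d series\n", i, len(b))
	}
}

With a 1024-byte limit this yields two blocks (two series, then one), which mirrors the "limited block size" case in the new TestChunkCompactor_CompactNewChunks table.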
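A second illustrative sketch, also not taken from the PR: because a job can now produce several blocks, each block reports its own fingerprint/timestamp bounds (the new v1.SeriesBounds aggregated by updatableBounds) instead of reusing the job-wide min/max, which is what lets every uploaded BlockRef carry per-block MinFingerprint/MaxFingerprint and Start/EndTimestamp. The types below (chunkRef, seriesRef, bounds) are simplified stand-ins.

package main

import (
	"fmt"
	"math"
)

// chunkRef and seriesRef are simplified stand-ins for v1.ChunkRef and v1.Series.
type chunkRef struct{ start, end int64 }

type seriesRef struct {
	fingerprint uint64
	chunks      []chunkRef
}

// bounds mirrors the idea behind v1.SeriesBounds: the min/max fingerprint and
// min/max chunk timestamps of everything added to a single block.
type bounds struct {
	fromFp, throughFp uint64
	fromTs, throughTs int64
}

func newBounds() bounds {
	// Start inverted so the first update always narrows the range.
	return bounds{fromFp: math.MaxUint64, throughFp: 0, fromTs: math.MaxInt64, throughTs: 0}
}

func (b *bounds) update(s seriesRef) {
	for _, c := range s.chunks {
		if c.start < b.fromTs {
			b.fromTs = c.start
		}
		if c.end > b.throughTs {
			b.throughTs = c.end
		}
	}
	if s.fingerprint < b.fromFp {
		b.fromFp = s.fingerprint
	}
	if s.fingerprint > b.throughFp {
		b.throughFp = s.fingerprint
	}
}

func main() {
	b := newBounds()
	b.update(seriesRef{fingerprint: 100, chunks: []chunkRef{{start: 1, end: 99}}})
	b.update(seriesRef{fingerprint: 200, chunks: []chunkRef{{start: 1, end: 999}}})
	fmt.Printf("fp [%d, %d], ts [%d, %d]\n", b.fromFp, b.throughFp, b.fromTs, b.throughTs)
}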