From e77de93a67c951fb3d10fca4add0d0ee8105a260 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Mon, 15 Jan 2024 16:08:13 +0100 Subject: [PATCH 1/6] Apply maximum block size limit during bloom compaction --- docs/sources/configure/_index.md | 6 + pkg/bloomcompactor/bloomcompactor.go | 44 ++++--- pkg/bloomcompactor/chunkcompactor.go | 153 +++++++++++++++------- pkg/bloomcompactor/chunkcompactor_test.go | 131 +++++++++++++++--- pkg/bloomcompactor/config.go | 1 + pkg/bloomcompactor/mergecompactor.go | 100 ++++++++------ pkg/storage/bloom/v1/archive_test.go | 2 +- pkg/storage/bloom/v1/block_writer.go | 2 + pkg/storage/bloom/v1/builder.go | 122 ++++++++++++++--- pkg/storage/bloom/v1/builder_test.go | 18 +-- pkg/storage/bloom/v1/fuse_test.go | 4 +- pkg/storage/bloom/v1/index.go | 12 +- pkg/storage/bloom/v1/test_util.go | 2 +- pkg/validation/limits.go | 24 ++-- 14 files changed, 452 insertions(+), 169 deletions(-) diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index 51ecb12af62f1..14406ac37cc8c 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -3098,6 +3098,12 @@ shard_streams: # CLI flag: -bloom-compactor.chunks-batch-size [bloom_compactor_chunks_batch_size: | default = 100] +# The maximum bloom block size. Default is unlimited. The actual block size +# might exceed this limit since blooms will be added to blocks until the block +# exceeds the maximum block size. +# CLI flag: -bloom-compactor.max-block-size +[bloom_compactor_max_block_size: | default = 0B] + # Length of the n-grams created when computing blooms from log lines. # CLI flag: -bloom-compactor.ngram-length [bloom_ngram_length: | default = 4] diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index dbe307ff18822..1895b92a6159b 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -504,7 +504,8 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job, metasMatchingJob, blocksMatchingJob := matchingBlocks(metas, job) localDst := createLocalDirName(c.cfg.WorkingDirectory, job) - blockOptions := v1.NewBlockOptions(bt.GetNGramLength(), bt.GetNGramSkip()) + maxBlockSize := c.limits.BloomCompactorMaxBlockSize(job.tenantID) + blockOptions := v1.NewBlockOptions(bt.GetNGramLength(), bt.GetNGramSkip(), maxBlockSize) defer func() { //clean up the bloom directory @@ -513,10 +514,12 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job, } }() - var resultingBlock bloomshipper.Block + var resultingBlocks []bloomshipper.Block defer func() { - if resultingBlock.Data != nil { - _ = resultingBlock.Data.Close() + for _, resultingBlock := range resultingBlocks { + if resultingBlock.Data != nil { + _ = resultingBlock.Data.Close() + } } }() @@ -535,7 +538,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job, return err } - resultingBlock, err = compactNewChunks(ctx, logger, job, bt, storeClient.chunk, builder, c.limits) + resultingBlocks, err = compactNewChunks(ctx, logger, job, bt, storeClient.chunk, builder, c.limits) if err != nil { return level.Error(logger).Log("msg", "failed compacting new chunks", "err", err) } @@ -546,7 +549,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job, var populate = createPopulateFunc(ctx, job, storeClient, bt, c.limits) - seriesIter := makeSeriesIterFromSeriesMeta(job) + seriesIter := v1.NewPeekingIter[*v1.Series](makeSeriesIterFromSeriesMeta(job)) blockIters, 
blockPaths, err := makeBlockIterFromBlocks(ctx, logger, c.bloomShipperClient, blocksMatchingJob, c.cfg.WorkingDirectory)

 	defer func() {
@@ -568,7 +571,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
 			return err
 		}

-		resultingBlock, err = mergeCompactChunks(logger, populate, mergeBlockBuilder, blockIters, seriesIter, job)
+		resultingBlocks, err = mergeCompactChunks(logger, populate, mergeBlockBuilder, blockIters, seriesIter, job)
 		if err != nil {
 			level.Error(logger).Log("msg", "failed merging existing blocks with new chunks", "err", err)
 			return err
@@ -576,24 +579,33 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
 	}

-	archivePath := filepath.Join(c.cfg.WorkingDirectory, uuid.New().String())
+	level.Debug(logger).Log("msg", "blocks created", "resultingBlocks", len(resultingBlocks))

-	blockToUpload, err := bloomshipper.CompressBloomBlock(resultingBlock.BlockRef, archivePath, localDst, logger)
-	if err != nil {
-		level.Error(logger).Log("msg", "failed compressing bloom blocks into tar file", "err", err)
-		return err
+	blocksToUpload := make([]bloomshipper.Block, 0, len(resultingBlocks))
+	archivesPaths := make([]string, 0, len(resultingBlocks))
+	for _, block := range resultingBlocks {
+		archivePath := filepath.Join(c.cfg.WorkingDirectory, uuid.New().String())
+		blockToUpload, err := bloomshipper.CompressBloomBlock(block.BlockRef, archivePath, block.BlockRef.BlockPath, logger)
+		if err != nil {
+			level.Error(logger).Log("msg", "failed compressing bloom blocks into tar file", "err", err)
+			return err
+		}
+		blocksToUpload = append(blocksToUpload, blockToUpload)
+		archivesPaths = append(archivesPaths, archivePath)
 	}

 	defer func() {
-		err = os.Remove(archivePath)
-		if err != nil {
-			level.Error(logger).Log("msg", "failed removing archive file", "err", err, "file", archivePath)
+		for _, archivePath := range archivesPaths {
+			err = os.Remove(archivePath)
+			if err != nil {
+				level.Error(logger).Log("msg", "failed removing archive file", "err", err, "file", archivePath)
+			}
 		}
 	}()

 	// Do not change the signature of PutBlocks yet.
 	// Once block size is limited potentially, compactNewChunks will return multiple blocks, hence a list is appropriate.
-	storedBlocks, err := c.bloomShipperClient.PutBlocks(ctx, []bloomshipper.Block{blockToUpload})
+	storedBlocks, err := c.bloomShipperClient.PutBlocks(ctx, blocksToUpload)
 	if err != nil {
 		level.Error(logger).Log("msg", "failed uploading blocks to storage", "err", err)
 		return err
diff --git a/pkg/bloomcompactor/chunkcompactor.go b/pkg/bloomcompactor/chunkcompactor.go
index c4993ccc62a59..02cc942a603b2 100644
--- a/pkg/bloomcompactor/chunkcompactor.go
+++ b/pkg/bloomcompactor/chunkcompactor.go
@@ -31,38 +31,73 @@ type chunkClient interface {
 }

 type blockBuilder interface {
-	BuildFrom(itr v1.Iterator[v1.SeriesWithBloom]) (uint32, error)
+	// BuildFrom builds a block from the given iterator.
+	// If the data is too large to fit in a single block, the iterator won't be consumed completely. In this case,
+	// call BuildFrom again with the same iterator to build the next block.
+	BuildFrom(itr v1.Iterator[v1.SeriesWithBloom]) (string, uint32, v1.SeriesBounds, error)
+	// Data returns a reader for the last block built by BuildFrom.
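// A minimal usage sketch of the BuildFrom/Data protocol described above
// (illustrative only; drainBlocks is a hypothetical helper, not part of
// this change): read Data after every BuildFrom call, because the builder
// only retains the block it wrote most recently.
//
//	// drainBlocks is a hypothetical caller of this interface.
//	func drainBlocks(itr v1.PeekingIterator[v1.SeriesWithBloom], b blockBuilder) ([]io.ReadSeekCloser, error) {
//		var out []io.ReadSeekCloser
//		for {
//			if _, hasNext := itr.Peek(); !hasNext {
//				return out, nil // every series has been written to some block
//			}
//			if _, _, _, err := b.BuildFrom(itr); err != nil {
//				return nil, err
//			}
//			data, err := b.Data() // reader for the block just built
//			if err != nil {
//				return nil, err
//			}
//			out = append(out, data)
//		}
//	}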
Data() (io.ReadSeekCloser, error) } type PersistentBlockBuilder struct { - builder *v1.BlockBuilder - localDst string + blockOptions v1.BlockOptions + lastBlockIdx uint64 + baseLocalDst string + currentBlockPath string } func NewPersistentBlockBuilder(localDst string, blockOptions v1.BlockOptions) (*PersistentBlockBuilder, error) { + builder := PersistentBlockBuilder{ + blockOptions: blockOptions, + baseLocalDst: localDst, + } + + return &builder, nil +} + +func (p *PersistentBlockBuilder) getNextBuilder() (*v1.BlockBuilder, error) { // write bloom to a local dir - b, err := v1.NewBlockBuilder(blockOptions, v1.NewDirectoryBlockWriter(localDst)) + blockPath := filepath.Join(p.baseLocalDst, fmt.Sprintf("%d", p.lastBlockIdx)) + builder, err := v1.NewBlockBuilder(p.blockOptions, v1.NewDirectoryBlockWriter(blockPath)) if err != nil { return nil, err } - builder := PersistentBlockBuilder{ - builder: b, - localDst: localDst, - } - return &builder, nil + p.currentBlockPath = blockPath + p.lastBlockIdx++ + return builder, nil } -func (p *PersistentBlockBuilder) BuildFrom(itr v1.Iterator[v1.SeriesWithBloom]) (uint32, error) { - return p.builder.BuildFrom(itr) +func (p *PersistentBlockBuilder) BuildFrom(itr v1.Iterator[v1.SeriesWithBloom]) (string, uint32, v1.SeriesBounds, error) { + + b, err := p.getNextBuilder() + if err != nil { + return "", 0, v1.SeriesBounds{}, err + } + + checksum, bounds, err := b.BuildFrom(itr) + if err != nil { + return "", 0, v1.SeriesBounds{}, err + } + + return p.currentBlockPath, checksum, bounds, nil } -func (p *PersistentBlockBuilder) mergeBuild(builder *v1.MergeBuilder) (uint32, error) { - return builder.Build(p.builder) +func (p *PersistentBlockBuilder) mergeBuild(builder *v1.MergeBuilder) (string, uint32, v1.SeriesBounds, error) { + b, err := p.getNextBuilder() + if err != nil { + return "", 0, v1.SeriesBounds{}, err + } + + checksum, bounds, err := builder.Build(b) + if err != nil { + return "", 0, v1.SeriesBounds{}, err + } + + return p.currentBlockPath, checksum, bounds, nil } func (p *PersistentBlockBuilder) Data() (io.ReadSeekCloser, error) { - blockFile, err := os.Open(filepath.Join(p.localDst, v1.BloomFileName)) + blockFile, err := os.Open(filepath.Join(p.currentBlockPath, v1.BloomFileName)) if err != nil { return nil, err } @@ -87,10 +122,13 @@ func makeChunkRefs(chksMetas []tsdbindex.ChunkMeta, tenant string, fp model.Fing } func buildBloomFromSeries(seriesMeta seriesMeta, fpRate float64, tokenizer compactorTokenizer, chunks v1.Iterator[[]chunk.Chunk]) (v1.SeriesWithBloom, error) { + chunkRefs := makeChunkRefsFromChunkMetas(seriesMeta.chunkRefs) + // Create a bloom for this series bloomForChks := v1.SeriesWithBloom{ Series: &v1.Series{ Fingerprint: seriesMeta.seriesFP, + Chunks: chunkRefs, }, Bloom: &v1.Bloom{ ScalableBloomFilter: *filter.NewDefaultScalableBloomFilter(fpRate), @@ -106,47 +144,62 @@ func buildBloomFromSeries(seriesMeta seriesMeta, fpRate float64, tokenizer compa } // TODO Test this when bloom block size check is implemented -func buildBlockFromBlooms( +func buildBlocksFromBlooms( ctx context.Context, logger log.Logger, builder blockBuilder, - blooms v1.Iterator[v1.SeriesWithBloom], + blooms v1.PeekingIterator[v1.SeriesWithBloom], job Job, -) (bloomshipper.Block, error) { +) ([]bloomshipper.Block, error) { // Ensure the context has not been canceled (ie. compactor shutdown has been triggered). 
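// The loop below cuts one block per pass: Peek reports whether series
// remain, BuildFrom consumes input until the block fills up (or the
// iterator is exhausted), and the block's data and series bounds are
// captured before the next pass reuses the same iterator.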
if err := ctx.Err(); err != nil { - return bloomshipper.Block{}, err - } - - checksum, err := builder.BuildFrom(blooms) - if err != nil { - level.Error(logger).Log("msg", "failed writing to bloom", "err", err) - return bloomshipper.Block{}, err + return []bloomshipper.Block{}, err } - data, err := builder.Data() - if err != nil { - level.Error(logger).Log("msg", "failed reading bloom data", "err", err) - return bloomshipper.Block{}, err - } - - block := bloomshipper.Block{ - BlockRef: bloomshipper.BlockRef{ - Ref: bloomshipper.Ref{ - TenantID: job.tenantID, - TableName: job.tableName, - MinFingerprint: uint64(job.minFp), - MaxFingerprint: uint64(job.maxFp), - StartTimestamp: job.from, - EndTimestamp: job.through, - Checksum: checksum, + // TODO(salvacorts): Reuse buffer + blocks := make([]bloomshipper.Block, 0) + + for { + // Create blocks until the iterator is empty + if _, hasNext := blooms.Peek(); !hasNext { + break + } + + if err := ctx.Err(); err != nil { + return []bloomshipper.Block{}, err + } + + blockPath, checksum, bounds, err := builder.BuildFrom(blooms) + if err != nil { + level.Error(logger).Log("msg", "failed writing to bloom", "err", err) + return []bloomshipper.Block{}, err + } + + data, err := builder.Data() + if err != nil { + level.Error(logger).Log("msg", "failed reading bloom data", "err", err) + return []bloomshipper.Block{}, err + } + + blocks = append(blocks, bloomshipper.Block{ + BlockRef: bloomshipper.BlockRef{ + Ref: bloomshipper.Ref{ + TenantID: job.tenantID, + TableName: job.tableName, + MinFingerprint: uint64(bounds.FromFp), + MaxFingerprint: uint64(bounds.ThroughFp), + StartTimestamp: bounds.FromTs, + EndTimestamp: bounds.ThroughTs, + Checksum: checksum, + }, + IndexPath: job.indexPath, + BlockPath: blockPath, }, - IndexPath: job.indexPath, - }, - Data: data, + Data: data, + }) } - return block, nil + return blocks, nil } func createLocalDirName(workingDir string, job Job) string { @@ -162,22 +215,22 @@ func compactNewChunks(ctx context.Context, storeClient chunkClient, builder blockBuilder, limits Limits, -) (bloomshipper.Block, error) { +) ([]bloomshipper.Block, error) { // Ensure the context has not been canceled (ie. compactor shutdown has been triggered). 
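// As with buildBlocksFromBlooms above, this now returns one block per
// size-limited cut instead of a single block for the whole job; the lazy
// bloom builder below is wrapped in a peeking iterator so the block
// cutting loop can detect unconsumed series between cuts.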
if err := ctx.Err(); err != nil { - return bloomshipper.Block{}, err + return []bloomshipper.Block{}, err } - bloomIter := newLazyBloomBuilder(ctx, job, storeClient, bt, logger, limits) + bloomIter := v1.NewPeekingIter[v1.SeriesWithBloom](newLazyBloomBuilder(ctx, job, storeClient, bt, logger, limits)) // Build and upload bloomBlock to storage - block, err := buildBlockFromBlooms(ctx, logger, builder, bloomIter, job) + blocks, err := buildBlocksFromBlooms(ctx, logger, builder, bloomIter, job) if err != nil { level.Error(logger).Log("msg", "failed building bloomBlocks", "err", err) - return bloomshipper.Block{}, err + return []bloomshipper.Block{}, err } - return block, nil + return blocks, nil } type lazyBloomBuilder struct { diff --git a/pkg/bloomcompactor/chunkcompactor_test.go b/pkg/bloomcompactor/chunkcompactor_test.go index 8bc94fd26537a..a6786c8ebe0d2 100644 --- a/pkg/bloomcompactor/chunkcompactor_test.go +++ b/pkg/bloomcompactor/chunkcompactor_test.go @@ -2,6 +2,7 @@ package bloomcompactor import ( "context" + "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" "io" "testing" "time" @@ -70,8 +71,8 @@ func TestChunkCompactor_CompactNewChunks(t *testing.T) { logger := log.NewNopLogger() label := labels.FromStrings("foo", "bar") fp1 := model.Fingerprint(100) - fp2 := model.Fingerprint(999) - fp3 := model.Fingerprint(200) + fp2 := model.Fingerprint(200) + fp3 := model.Fingerprint(999) chunkRef1 := index.ChunkMeta{ Checksum: 1, @@ -107,23 +108,113 @@ func TestChunkCompactor_CompactNewChunks(t *testing.T) { mbt := mockBloomTokenizer{} mcc := mockChunkClient{} - pbb := mockPersistentBlockBuilder{} - - // Run Compaction - compactedBlock, err := compactNewChunks(context.Background(), logger, job, &mbt, &mcc, &pbb, mockLimits{fpRate: fpRate}) - // Validate Compaction Succeeds - require.NoError(t, err) - require.NotNil(t, compactedBlock) - - // Validate Compacted Block has expected data - require.Equal(t, job.tenantID, compactedBlock.TenantID) - require.Equal(t, job.tableName, compactedBlock.TableName) - require.Equal(t, uint64(fp1), compactedBlock.MinFingerprint) - require.Equal(t, uint64(fp2), compactedBlock.MaxFingerprint) - require.Equal(t, model.Time(chunkRef1.MinTime), compactedBlock.StartTimestamp) - require.Equal(t, model.Time(chunkRef2.MaxTime), compactedBlock.EndTimestamp) - require.Equal(t, indexPath, compactedBlock.IndexPath) + for _, tc := range []struct { + name string + blockSize int + expectedBlocks []bloomshipper.BlockRef + }{ + { + name: "unlimited block size", + blockSize: 0, + expectedBlocks: []bloomshipper.BlockRef{ + { + Ref: bloomshipper.Ref{ + TenantID: job.tenantID, + TableName: job.tableName, + MinFingerprint: 100, + MaxFingerprint: 999, + StartTimestamp: 1, + EndTimestamp: 999, + }, + }, + }, + }, + { + name: "limited block size", + blockSize: 1024, // Enough to result into two blocks + expectedBlocks: []bloomshipper.BlockRef{ + { + Ref: bloomshipper.Ref{ + TenantID: job.tenantID, + TableName: job.tableName, + MinFingerprint: 100, + MaxFingerprint: 200, + StartTimestamp: 1, + EndTimestamp: 999, + }, + }, + { + Ref: bloomshipper.Ref{ + TenantID: job.tenantID, + TableName: job.tableName, + MinFingerprint: 999, + MaxFingerprint: 999, + StartTimestamp: 1, + EndTimestamp: 999, + }, + }, + }, + }, + { + name: "block contains at least one series", + blockSize: 1, + expectedBlocks: []bloomshipper.BlockRef{ + { + Ref: bloomshipper.Ref{ + TenantID: job.tenantID, + TableName: job.tableName, + MinFingerprint: 100, + MaxFingerprint: 100, + StartTimestamp: 1, + 
EndTimestamp: 99, + }, + }, + { + Ref: bloomshipper.Ref{ + TenantID: job.tenantID, + TableName: job.tableName, + MinFingerprint: 200, + MaxFingerprint: 200, + StartTimestamp: 1, + EndTimestamp: 999, + }, + }, + { + Ref: bloomshipper.Ref{ + TenantID: job.tenantID, + TableName: job.tableName, + MinFingerprint: 999, + MaxFingerprint: 999, + StartTimestamp: 1, + EndTimestamp: 999, + }, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + localDst := createLocalDirName(t.TempDir(), job) + blockOpts := v1.NewBlockOptions(1, 1, tc.blockSize) + pbb, err := NewPersistentBlockBuilder(localDst, blockOpts) + require.NoError(t, err) + + // Run Compaction + compactedBlocks, err := compactNewChunks(context.Background(), logger, job, &mbt, &mcc, pbb, mockLimits{fpRate: fpRate}) + require.NoError(t, err) + require.Len(t, compactedBlocks, len(tc.expectedBlocks)) + + for i, compactedBlock := range compactedBlocks { + require.Equal(t, tc.expectedBlocks[i].TenantID, compactedBlock.Ref.TenantID) + require.Equal(t, tc.expectedBlocks[i].TableName, compactedBlock.Ref.TableName) + require.Equal(t, tc.expectedBlocks[i].MinFingerprint, compactedBlock.Ref.MinFingerprint) + require.Equal(t, tc.expectedBlocks[i].MaxFingerprint, compactedBlock.Ref.MaxFingerprint) + require.Equal(t, tc.expectedBlocks[i].StartTimestamp, compactedBlock.Ref.StartTimestamp) + require.Equal(t, tc.expectedBlocks[i].EndTimestamp, compactedBlock.Ref.EndTimestamp) + require.Equal(t, indexPath, compactedBlock.IndexPath) + } + }) + } } func TestLazyBloomBuilder(t *testing.T) { @@ -220,8 +311,8 @@ func (mcc *mockChunkClient) GetChunks(_ context.Context, chks []chunk.Chunk) ([] type mockPersistentBlockBuilder struct { } -func (pbb *mockPersistentBlockBuilder) BuildFrom(_ v1.Iterator[v1.SeriesWithBloom]) (uint32, error) { - return 0, nil +func (pbb *mockPersistentBlockBuilder) BuildFrom(_ v1.Iterator[v1.SeriesWithBloom]) (string, uint32, v1.SeriesBounds, error) { + return "", 0, v1.SeriesBounds{}, nil } func (pbb *mockPersistentBlockBuilder) Data() (io.ReadSeekCloser, error) { diff --git a/pkg/bloomcompactor/config.go b/pkg/bloomcompactor/config.go index 884034fdd043d..a7281e18bd8fc 100644 --- a/pkg/bloomcompactor/config.go +++ b/pkg/bloomcompactor/config.go @@ -47,4 +47,5 @@ type Limits interface { BloomNGramLength(tenantID string) int BloomNGramSkip(tenantID string) int BloomFalsePositiveRate(tenantID string) float64 + BloomCompactorMaxBlockSize(tenantID string) int } diff --git a/pkg/bloomcompactor/mergecompactor.go b/pkg/bloomcompactor/mergecompactor.go index 6e2143f75135c..afee30d480dfe 100644 --- a/pkg/bloomcompactor/mergecompactor.go +++ b/pkg/bloomcompactor/mergecompactor.go @@ -4,31 +4,36 @@ import ( "context" "fmt" - "github.com/grafana/dskit/concurrency" - - "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/storage/chunk" - "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/grafana/dskit/concurrency" + + "github.com/grafana/loki/pkg/logproto" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" + "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" ) +func makeChunkRefsFromChunkMetas(chunks index.ChunkMetas) v1.ChunkRefs { + chunkRefs := make(v1.ChunkRefs, 0, len(chunks)) + for _, chk := range chunks { + chunkRefs = append(chunkRefs, v1.ChunkRef{ + Start: chk.From(), + End: chk.Through(), + Checksum: chk.Checksum, + }) + } + return chunkRefs +} + func 
makeSeriesIterFromSeriesMeta(job Job) *v1.SliceIter[*v1.Series] { // Satisfy types for series seriesFromSeriesMeta := make([]*v1.Series, len(job.seriesMetas)) for i, s := range job.seriesMetas { - crefs := make([]v1.ChunkRef, len(s.chunkRefs)) - for j, chk := range s.chunkRefs { - crefs[j] = v1.ChunkRef{ - Start: chk.From(), - End: chk.Through(), - Checksum: chk.Checksum, - } - } + crefs := makeChunkRefsFromChunkMetas(s.chunkRefs) seriesFromSeriesMeta[i] = &v1.Series{ Fingerprint: s.seriesFP, Chunks: crefs, @@ -108,42 +113,57 @@ func createPopulateFunc(ctx context.Context, job Job, storeClient storeClient, b } } -func mergeCompactChunks(logger log.Logger, +func mergeCompactChunks( + logger log.Logger, populate func(*v1.Series, *v1.Bloom) error, mergeBlockBuilder *PersistentBlockBuilder, - blockIters []v1.PeekingIterator[*v1.SeriesWithBloom], seriesIter *v1.SliceIter[*v1.Series], - job Job) (bloomshipper.Block, error) { + blockIters []v1.PeekingIterator[*v1.SeriesWithBloom], + seriesIter v1.PeekingIterator[*v1.Series], + job Job, +) ([]bloomshipper.Block, error) { mergeBuilder := v1.NewMergeBuilder( blockIters, seriesIter, populate) - checksum, err := mergeBlockBuilder.mergeBuild(mergeBuilder) - if err != nil { - level.Error(logger).Log("msg", "failed merging the blooms", "err", err) - return bloomshipper.Block{}, err - } - data, err := mergeBlockBuilder.Data() - if err != nil { - level.Error(logger).Log("msg", "failed reading bloom data", "err", err) - return bloomshipper.Block{}, err - } + // TODO(salvacorts): Reuse buffer + mergedBlocks := make([]bloomshipper.Block, 0) - mergedBlock := bloomshipper.Block{ - BlockRef: bloomshipper.BlockRef{ - Ref: bloomshipper.Ref{ - TenantID: job.tenantID, - TableName: job.tableName, - MinFingerprint: uint64(job.minFp), - MaxFingerprint: uint64(job.maxFp), - StartTimestamp: job.from, - EndTimestamp: job.through, - Checksum: checksum, + for { + // Merge/Create blocks until there are no more series to process + if _, hasNext := seriesIter.Peek(); !hasNext { + break + } + + blockPath, checksum, bounds, err := mergeBlockBuilder.mergeBuild(mergeBuilder) + if err != nil { + level.Error(logger).Log("msg", "failed merging the blooms", "err", err) + return []bloomshipper.Block{}, err + } + data, err := mergeBlockBuilder.Data() + if err != nil { + level.Error(logger).Log("msg", "failed reading bloom data", "err", err) + return []bloomshipper.Block{}, err + } + + mergedBlocks = append(mergedBlocks, bloomshipper.Block{ + BlockRef: bloomshipper.BlockRef{ + Ref: bloomshipper.Ref{ + TenantID: job.tenantID, + TableName: job.tableName, + MinFingerprint: uint64(bounds.FromFp), + MaxFingerprint: uint64(bounds.ThroughFp), + StartTimestamp: bounds.FromTs, + EndTimestamp: bounds.ThroughTs, + Checksum: checksum, + }, + IndexPath: job.indexPath, + BlockPath: blockPath, }, - IndexPath: job.indexPath, - }, - Data: data, + Data: data, + }) } - return mergedBlock, nil + + return mergedBlocks, nil } diff --git a/pkg/storage/bloom/v1/archive_test.go b/pkg/storage/bloom/v1/archive_test.go index 60620c8a5294f..e9541ee07416d 100644 --- a/pkg/storage/bloom/v1/archive_test.go +++ b/pkg/storage/bloom/v1/archive_test.go @@ -33,7 +33,7 @@ func TestArchive(t *testing.T) { require.Nil(t, err) itr := NewSliceIter[SeriesWithBloom](data) - _, err = builder.BuildFrom(itr) + _, _, err = builder.BuildFrom(itr) require.Nil(t, err) reader := NewDirectoryBlockReader(dir1) diff --git a/pkg/storage/bloom/v1/block_writer.go b/pkg/storage/bloom/v1/block_writer.go index 99ab65ef9cd40..3c5d7d20e6a95 100644 
--- a/pkg/storage/bloom/v1/block_writer.go +++ b/pkg/storage/bloom/v1/block_writer.go @@ -19,6 +19,8 @@ const ( type BlockWriter interface { Index() (io.WriteCloser, error) Blooms() (io.WriteCloser, error) + + // Size returns the sum of the index and bloom files sizes in bytes Size() (int, error) // byte size of accumualted index & blooms } diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go index 7b5d0dc3d73ff..031ad06331184 100644 --- a/pkg/storage/bloom/v1/builder.go +++ b/pkg/storage/bloom/v1/builder.go @@ -5,6 +5,7 @@ import ( "fmt" "hash" "io" + "math" "sort" "github.com/pkg/errors" @@ -26,11 +27,12 @@ type BlockOptions struct { type BlockBuilder struct { opts BlockOptions + writer BlockWriter index *IndexBuilder blooms *BloomBlockBuilder } -func NewBlockOptions(NGramLength, NGramSkip uint64) BlockOptions { +func NewBlockOptions(NGramLength, NGramSkip uint64, blockSize int) BlockOptions { return BlockOptions{ schema: Schema{ version: byte(1), @@ -39,6 +41,7 @@ func NewBlockOptions(NGramLength, NGramSkip uint64) BlockOptions { }, SeriesPageSize: 100, BloomPageSize: 10 << 10, // 0.01MB + BlockSize: blockSize, } } @@ -54,6 +57,7 @@ func NewBlockBuilder(opts BlockOptions, writer BlockWriter) (*BlockBuilder, erro return &BlockBuilder{ opts: opts, + writer: writer, index: NewIndexBuilder(opts, index), blooms: NewBloomBlockBuilder(opts, blooms), }, nil @@ -64,26 +68,89 @@ type SeriesWithBloom struct { Bloom *Bloom } -func (b *BlockBuilder) BuildFrom(itr Iterator[SeriesWithBloom]) (uint32, error) { +type updatableBounds SeriesBounds + +func newUpdatableBounds() updatableBounds { + return updatableBounds{ + FromTs: model.Latest, + ThroughTs: model.Earliest, + FromFp: model.Fingerprint(math.MaxInt64), + ThroughFp: model.Fingerprint(0), + } +} + +func (b *updatableBounds) update(series *Series) { + minFrom := model.Latest + maxThrough := model.Earliest + for _, chunk := range series.Chunks { + if minFrom > chunk.Start { + minFrom = chunk.Start + } + if maxThrough < chunk.End { + maxThrough = chunk.End + } + } + + if b.FromTs.After(minFrom) { + b.FromTs = minFrom + } + if b.ThroughTs.Before(maxThrough) { + b.ThroughTs = maxThrough + } + if b.FromFp > series.Fingerprint { + b.FromFp = series.Fingerprint + } + if b.ThroughFp < series.Fingerprint { + b.ThroughFp = series.Fingerprint + } +} + +func (b *BlockBuilder) BuildFrom(itr Iterator[SeriesWithBloom]) (uint32, SeriesBounds, error) { + bounds := newUpdatableBounds() + for itr.Next() { - if err := b.AddSeries(itr.At()); err != nil { - return 0, err + series := itr.At() + if err := b.AddSeries(series); err != nil { + return 0, SeriesBounds{}, err } + bounds.update(series.Series) + + full, err := b.BlockIsFull() + if err != nil { + return 0, SeriesBounds{}, err + } + if full { + break + } } if err := itr.Err(); err != nil { - return 0, errors.Wrap(err, "iterating series with blooms") + return 0, SeriesBounds{}, errors.Wrap(err, "iterating series with blooms") } checksum, err := b.blooms.Close() if err != nil { - return 0, errors.Wrap(err, "closing bloom file") + return 0, SeriesBounds{}, errors.Wrap(err, "closing bloom file") } if err := b.index.Close(); err != nil { - return 0, errors.Wrap(err, "closing series file") + return 0, SeriesBounds{}, errors.Wrap(err, "closing series file") } - return checksum, nil + return checksum, SeriesBounds(bounds), nil +} + +func (b *BlockBuilder) BlockIsFull() (bool, error) { + // if the block size is 0, the max size is unlimited + if b.opts.BlockSize == 0 { + return false, nil + } + + 
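// Size reports the bytes already flushed for the index and bloom files
// combined; the block counts as full once that sum reaches BlockSize.
// Because this check only runs between series, a finished block can
// overshoot the limit by up to one series, matching the documented
// behaviour of -bloom-compactor.max-block-size.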
size, err := b.writer.Size() + if err != nil { + return false, errors.Wrap(err, "getting block size") + } + + return size >= b.opts.BlockSize, nil } func (b *BlockBuilder) AddSeries(series SeriesWithBloom) error { @@ -394,10 +461,12 @@ func (b *IndexBuilder) flushPage() error { DecompressedLen: decompressedLen, SeriesHeader: SeriesHeader{ NumSeries: b.page.Count(), - FromFp: b.fromFp, - ThroughFp: b.previousFp, - FromTs: b.fromTs, - ThroughTs: b.throughTs, + SeriesBounds: SeriesBounds{ + FromFp: b.fromFp, + ThroughFp: b.previousFp, + FromTs: b.fromTs, + ThroughTs: b.throughTs, + }, }, } @@ -487,7 +556,7 @@ type MergeBuilder struct { // 1. merges multiple blocks into a single ordered querier, // i) When two blocks have the same series, it will prefer the one with the most chunks already indexed // 2. iterates through the store, adding chunks to the relevant blooms via the `populate` argument -func NewMergeBuilder(blocks []PeekingIterator[*SeriesWithBloom], store Iterator[*Series], populate func(*Series, *Bloom) error) *MergeBuilder { +func NewMergeBuilder(blocks []PeekingIterator[*SeriesWithBloom], store PeekingIterator[*Series], populate func(*Series, *Bloom) error) *MergeBuilder { return &MergeBuilder{ blocks: blocks, store: store, @@ -495,11 +564,16 @@ func NewMergeBuilder(blocks []PeekingIterator[*SeriesWithBloom], store Iterator[ } } +func (mb *MergeBuilder) HasPendingData() bool { + return mb.store.Next() +} + // NB: this will build one block. Ideally we would build multiple blocks once a target size threshold is met // but this gives us a good starting point. -func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) { +func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, SeriesBounds, error) { var ( nextInBlocks *SeriesWithBloom + bounds = newUpdatableBounds() ) // Turn the list of blocks into a single iterator that returns the next series @@ -563,21 +637,31 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) { }, cur.Bloom, ); err != nil { - return 0, errors.Wrapf(err, "populating bloom for series with fingerprint: %v", nextInStore.Fingerprint) + return 0, SeriesBounds{}, errors.Wrapf(err, "populating bloom for series with fingerprint: %v", nextInStore.Fingerprint) } } if err := builder.AddSeries(*cur); err != nil { - return 0, errors.Wrap(err, "adding series to block") + return 0, SeriesBounds{}, errors.Wrap(err, "adding series to block") + } + + bounds.update(cur.Series) + + full, err := builder.BlockIsFull() + if err != nil { + return 0, SeriesBounds{}, errors.Wrap(err, "checking if block is full") + } + if full { + break } } checksum, err := builder.blooms.Close() if err != nil { - return 0, errors.Wrap(err, "closing bloom file") + return 0, SeriesBounds{}, errors.Wrap(err, "closing bloom file") } if err := builder.index.Close(); err != nil { - return 0, errors.Wrap(err, "closing series file") + return 0, SeriesBounds{}, errors.Wrap(err, "closing series file") } - return checksum, nil + return checksum, SeriesBounds(bounds), nil } diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go index 622e076f97b03..8917bbac05dec 100644 --- a/pkg/storage/bloom/v1/builder_test.go +++ b/pkg/storage/bloom/v1/builder_test.go @@ -67,7 +67,7 @@ func TestBlockBuilderRoundTrip(t *testing.T) { require.Nil(t, err) itr := NewSliceIter[SeriesWithBloom](data) - _, err = builder.BuildFrom(itr) + _, _, err = builder.BuildFrom(itr) require.Nil(t, err) block := NewBlock(tc.reader) querier := NewBlockQuerier(block) @@ -146,7 +146,7 
@@ func TestMergeBuilder(t *testing.T) { require.Nil(t, err) itr := NewSliceIter[SeriesWithBloom](data[min:max]) - _, err = builder.BuildFrom(itr) + _, _, err = builder.BuildFrom(itr) require.Nil(t, err) blocks = append(blocks, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(NewBlock(reader)))) } @@ -158,11 +158,13 @@ func TestMergeBuilder(t *testing.T) { // storage should contain references to all the series we ingested, // regardless of block allocation/overlap. - storeItr := NewMapIter[SeriesWithBloom, *Series]( - NewSliceIter[SeriesWithBloom](data), - func(swb SeriesWithBloom) *Series { - return swb.Series - }, + storeItr := NewPeekingIter[*Series]( + NewMapIter[SeriesWithBloom, *Series]( + NewSliceIter[SeriesWithBloom](data), + func(swb SeriesWithBloom) *Series { + return swb.Series + }, + ), ) // Ensure that the merge builder combines all the blocks correctly @@ -178,7 +180,7 @@ func TestMergeBuilder(t *testing.T) { ) require.Nil(t, err) - _, err = mergeBuilder.Build(builder) + _, _, err = mergeBuilder.Build(builder) require.Nil(t, err) block := NewBlock(reader) diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go index e784ac0168201..6a70a56621fae 100644 --- a/pkg/storage/bloom/v1/fuse_test.go +++ b/pkg/storage/bloom/v1/fuse_test.go @@ -35,7 +35,7 @@ func TestFusedQuerier(t *testing.T) { ) require.Nil(t, err) itr := NewSliceIter[SeriesWithBloom](data) - _, err = builder.BuildFrom(itr) + _, _, err = builder.BuildFrom(itr) require.Nil(t, err) block := NewBlock(reader) querier := NewBlockQuerier(block) @@ -127,7 +127,7 @@ func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Ou ) require.Nil(b, err) itr := NewSliceIter[SeriesWithBloom](data) - _, err = builder.BuildFrom(itr) + _, _, err = builder.BuildFrom(itr) require.Nil(b, err) block := NewBlock(reader) querier := NewBlockQuerier(block) diff --git a/pkg/storage/bloom/v1/index.go b/pkg/storage/bloom/v1/index.go index fb74e6a1638d3..ec3de6b19f60d 100644 --- a/pkg/storage/bloom/v1/index.go +++ b/pkg/storage/bloom/v1/index.go @@ -204,7 +204,11 @@ func (h *SeriesPageHeaderWithOffset) Decode(dec *encoding.Decbuf) error { } type SeriesHeader struct { - NumSeries int + NumSeries int + SeriesBounds +} + +type SeriesBounds struct { FromFp, ThroughFp model.Fingerprint FromTs, ThroughTs model.Time } @@ -220,8 +224,10 @@ func aggregateHeaders(xs []SeriesHeader) SeriesHeader { } res := SeriesHeader{ - FromFp: xs[0].FromFp, - ThroughFp: xs[len(xs)-1].ThroughFp, + SeriesBounds: SeriesBounds{ + FromFp: xs[0].FromFp, + ThroughFp: xs[len(xs)-1].ThroughFp, + }, } for _, x := range xs { diff --git a/pkg/storage/bloom/v1/test_util.go b/pkg/storage/bloom/v1/test_util.go index 3b706297171b3..8e1cfd0fb19e8 100644 --- a/pkg/storage/bloom/v1/test_util.go +++ b/pkg/storage/bloom/v1/test_util.go @@ -38,7 +38,7 @@ func MakeBlockQuerier(t testing.TB, fromFp, throughFp model.Fingerprint, fromTs, ) require.Nil(t, err) itr := NewSliceIter[SeriesWithBloom](data) - _, err = builder.BuildFrom(itr) + _, _, err = builder.BuildFrom(itr) require.Nil(t, err) block := NewBlock(reader) return NewBlockQuerier(block), data diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 45dd34f201e8d..6f8bfe71e0c8c 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -185,15 +185,16 @@ type Limits struct { BloomGatewayShardSize int `yaml:"bloom_gateway_shard_size" json:"bloom_gateway_shard_size"` BloomGatewayEnabled bool `yaml:"bloom_gateway_enable_filtering" json:"bloom_gateway_enable_filtering"` - 
BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"` - BloomCompactorMaxTableAge time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"` - BloomCompactorEnabled bool `yaml:"bloom_compactor_enable_compaction" json:"bloom_compactor_enable_compaction"` - BloomCompactorChunksBatchSize int `yaml:"bloom_compactor_chunks_batch_size" json:"bloom_compactor_chunks_batch_size"` - BloomNGramLength int `yaml:"bloom_ngram_length" json:"bloom_ngram_length"` - BloomNGramSkip int `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip"` - BloomFalsePositiveRate float64 `yaml:"bloom_false_positive_rate" json:"bloom_false_positive_rate"` - BloomGatewayBlocksDownloadingParallelism int `yaml:"bloom_gateway_blocks_downloading_parallelism" json:"bloom_gateway_blocks_downloading_parallelism"` - BloomGatewayCacheKeyInterval time.Duration `yaml:"bloom_gateway_cache_key_interval" json:"bloom_gateway_cache_key_interval"` + BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"` + BloomCompactorMaxTableAge time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"` + BloomCompactorEnabled bool `yaml:"bloom_compactor_enable_compaction" json:"bloom_compactor_enable_compaction"` + BloomCompactorChunksBatchSize int `yaml:"bloom_compactor_chunks_batch_size" json:"bloom_compactor_chunks_batch_size"` + BloomCompactorMaxBlockSize flagext.ByteSize `yaml:"bloom_compactor_max_block_size" json:"bloom_compactor_max_block_size"` + BloomNGramLength int `yaml:"bloom_ngram_length" json:"bloom_ngram_length"` + BloomNGramSkip int `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip"` + BloomFalsePositiveRate float64 `yaml:"bloom_false_positive_rate" json:"bloom_false_positive_rate"` + BloomGatewayBlocksDownloadingParallelism int `yaml:"bloom_gateway_blocks_downloading_parallelism" json:"bloom_gateway_blocks_downloading_parallelism"` + BloomGatewayCacheKeyInterval time.Duration `yaml:"bloom_gateway_cache_key_interval" json:"bloom_gateway_cache_key_interval"` AllowStructuredMetadata bool `yaml:"allow_structured_metadata,omitempty" json:"allow_structured_metadata,omitempty" doc:"description=Allow user to send structured metadata in push payload."` MaxStructuredMetadataSize flagext.ByteSize `yaml:"max_structured_metadata_size" json:"max_structured_metadata_size" doc:"description=Maximum size accepted for structured metadata per log line."` @@ -318,6 +319,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&l.BloomCompactorMaxTableAge, "bloom-compactor.max-table-age", 7*24*time.Hour, "The maximum age of a table before it is compacted. Do not compact tables older than the the configured time. Default to 7 days. 0s means no limit.") f.BoolVar(&l.BloomCompactorEnabled, "bloom-compactor.enable-compaction", false, "Whether to compact chunks into bloom filters.") f.IntVar(&l.BloomCompactorChunksBatchSize, "bloom-compactor.chunks-batch-size", 100, "The batch size of the chunks the bloom-compactor downloads at once.") + f.Var(&l.BloomCompactorMaxBlockSize, "bloom-compactor.max-block-size", "The maximum bloom block size. Default is unlimited. 
The actual block size might exceed this limit since blooms will be added to blocks until the block exceeds the maximum block size.") f.IntVar(&l.BloomNGramLength, "bloom-compactor.ngram-length", 4, "Length of the n-grams created when computing blooms from log lines.") f.IntVar(&l.BloomNGramSkip, "bloom-compactor.ngram-skip", 0, "Skip factor for the n-grams created when computing blooms from log lines.") f.Float64Var(&l.BloomFalsePositiveRate, "bloom-compactor.false-positive-rate", 0.01, "Scalable Bloom Filter desired false-positive rate.") @@ -844,6 +846,10 @@ func (o *Overrides) BloomCompactorChunksBatchSize(userID string) int { return o.getOverridesForUser(userID).BloomCompactorChunksBatchSize } +func (o *Overrides) BloomCompactorMaxBlockSize(userID string) int { + return o.getOverridesForUser(userID).BloomCompactorMaxBlockSize.Val() +} + func (o *Overrides) BloomCompactorShardSize(userID string) int { return o.getOverridesForUser(userID).BloomCompactorShardSize } From a8d2295ef09c2eced250efee46043ae466e3ddf2 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Tue, 16 Jan 2024 15:16:23 +0100 Subject: [PATCH 2/6] Add more tests --- pkg/bloomcompactor/chunkcompactor.go | 1 - pkg/bloomcompactor/chunkcompactor_test.go | 2 +- pkg/storage/bloom/v1/builder.go | 37 ++-- pkg/storage/bloom/v1/builder_test.go | 254 ++++++++++++++-------- 4 files changed, 175 insertions(+), 119 deletions(-) diff --git a/pkg/bloomcompactor/chunkcompactor.go b/pkg/bloomcompactor/chunkcompactor.go index 02cc942a603b2..050ef7cc5b8ec 100644 --- a/pkg/bloomcompactor/chunkcompactor.go +++ b/pkg/bloomcompactor/chunkcompactor.go @@ -68,7 +68,6 @@ func (p *PersistentBlockBuilder) getNextBuilder() (*v1.BlockBuilder, error) { } func (p *PersistentBlockBuilder) BuildFrom(itr v1.Iterator[v1.SeriesWithBloom]) (string, uint32, v1.SeriesBounds, error) { - b, err := p.getNextBuilder() if err != nil { return "", 0, v1.SeriesBounds{}, err diff --git a/pkg/bloomcompactor/chunkcompactor_test.go b/pkg/bloomcompactor/chunkcompactor_test.go index a6786c8ebe0d2..71ce5c39194b5 100644 --- a/pkg/bloomcompactor/chunkcompactor_test.go +++ b/pkg/bloomcompactor/chunkcompactor_test.go @@ -2,7 +2,6 @@ package bloomcompactor import ( "context" - "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" "io" "testing" "time" @@ -17,6 +16,7 @@ import ( "github.com/grafana/loki/pkg/push" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" ) diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go index 031ad06331184..05cf725663437 100644 --- a/pkg/storage/bloom/v1/builder.go +++ b/pkg/storage/bloom/v1/builder.go @@ -110,17 +110,12 @@ func (b *BlockBuilder) BuildFrom(itr Iterator[SeriesWithBloom]) (uint32, SeriesB for itr.Next() { series := itr.At() - if err := b.AddSeries(series); err != nil { - return 0, SeriesBounds{}, err - } - bounds.update(series.Series) - - full, err := b.BlockIsFull() + blockFull, err := b.AddSeries(series) if err != nil { return 0, SeriesBounds{}, err } - if full { + if blockFull { break } } @@ -139,7 +134,7 @@ func (b *BlockBuilder) BuildFrom(itr Iterator[SeriesWithBloom]) (uint32, SeriesB return checksum, SeriesBounds(bounds), nil } -func (b *BlockBuilder) BlockIsFull() (bool, error) { +func (b *BlockBuilder) blockIsFull() (bool, error) { // if the block size is 0, the max size is unlimited if b.opts.BlockSize == 0 { 
return false, nil @@ -153,20 +148,25 @@ func (b *BlockBuilder) BlockIsFull() (bool, error) { return size >= b.opts.BlockSize, nil } -func (b *BlockBuilder) AddSeries(series SeriesWithBloom) error { +func (b *BlockBuilder) AddSeries(series SeriesWithBloom) (bool, error) { offset, err := b.blooms.Append(series) if err != nil { - return errors.Wrapf(err, "writing bloom for series %v", series.Series.Fingerprint) + return false, errors.Wrapf(err, "writing bloom for series %v", series.Series.Fingerprint) } if err := b.index.Append(SeriesWithOffset{ Offset: offset, Series: *series.Series, }); err != nil { - return errors.Wrapf(err, "writing index for series %v", series.Series.Fingerprint) + return false, errors.Wrapf(err, "writing index for series %v", series.Series.Fingerprint) } - return nil + full, err := b.blockIsFull() + if err != nil { + return false, errors.Wrap(err, "checking if block is full") + } + + return full, nil } type BloomBlockBuilder struct { @@ -556,7 +556,7 @@ type MergeBuilder struct { // 1. merges multiple blocks into a single ordered querier, // i) When two blocks have the same series, it will prefer the one with the most chunks already indexed // 2. iterates through the store, adding chunks to the relevant blooms via the `populate` argument -func NewMergeBuilder(blocks []PeekingIterator[*SeriesWithBloom], store PeekingIterator[*Series], populate func(*Series, *Bloom) error) *MergeBuilder { +func NewMergeBuilder(blocks []PeekingIterator[*SeriesWithBloom], store Iterator[*Series], populate func(*Series, *Bloom) error) *MergeBuilder { return &MergeBuilder{ blocks: blocks, store: store, @@ -641,17 +641,12 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, SeriesBounds, erro } } - if err := builder.AddSeries(*cur); err != nil { - return 0, SeriesBounds{}, errors.Wrap(err, "adding series to block") - } - bounds.update(cur.Series) - - full, err := builder.BlockIsFull() + blockFull, err := builder.AddSeries(*cur) if err != nil { - return 0, SeriesBounds{}, errors.Wrap(err, "checking if block is full") + return 0, SeriesBounds{}, errors.Wrap(err, "adding series to block") } - if full { + if blockFull { break } } diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go index 8917bbac05dec..98e00344bc5d3 100644 --- a/pkg/storage/bloom/v1/builder_test.go +++ b/pkg/storage/bloom/v1/builder_test.go @@ -33,9 +33,11 @@ func TestBlockBuilderRoundTrip(t *testing.T) { tmpDir := t.TempDir() for _, tc := range []struct { - desc string - writer BlockWriter - reader BlockReader + desc string + writer BlockWriter + reader BlockReader + maxBlockSize int + iterHasPendingData bool }{ { desc: "in-memory", @@ -47,6 +49,14 @@ func TestBlockBuilderRoundTrip(t *testing.T) { writer: NewDirectoryBlockWriter(tmpDir), reader: NewDirectoryBlockReader(tmpDir), }, + { + desc: "max block size", + writer: NewDirectoryBlockWriter(tmpDir), + reader: NewDirectoryBlockReader(tmpDir), + // Set max block big enough to fit a bunch of series + maxBlockSize: 500 << 10, + iterHasPendingData: true, + }, } { t.Run(tc.desc, func(t *testing.T) { schema := Schema{ @@ -61,14 +71,31 @@ func TestBlockBuilderRoundTrip(t *testing.T) { schema: schema, SeriesPageSize: 100, BloomPageSize: 10 << 10, + BlockSize: tc.maxBlockSize, }, tc.writer, ) require.Nil(t, err) - itr := NewSliceIter[SeriesWithBloom](data) + itr := NewPeekingIter[SeriesWithBloom](NewSliceIter[SeriesWithBloom](data)) _, _, err = builder.BuildFrom(itr) require.Nil(t, err) + + firstPendingSeries, iterHasPendingData := 
itr.Peek() + require.Equal(t, tc.iterHasPendingData, iterHasPendingData) + + processedData := data + if iterHasPendingData { + // If iter is not consumed, cut the data to the first pending series + processedData = make([]SeriesWithBloom, 0, len(data)) + for _, d := range data { + if d.Series.Fingerprint >= firstPendingSeries.Series.Fingerprint { + break + } + processedData = append(processedData, d) + } + } + block := NewBlock(tc.reader) querier := NewBlockQuerier(block) @@ -76,10 +103,11 @@ func TestBlockBuilderRoundTrip(t *testing.T) { require.Nil(t, err) require.Equal(t, block.blooms.schema, schema) - for i := 0; i < len(data); i++ { + // Check processed data can be queried + for i := 0; i < len(processedData); i++ { require.Equal(t, true, querier.Next(), "on iteration %d with error %v", i, querier.Err()) got := querier.At() - require.Equal(t, data[i].Series, got.Series) + require.Equal(t, processedData[i].Series, got.Series) for _, key := range keys[i] { require.True(t, got.Bloom.Test(key)) } @@ -89,109 +117,143 @@ func TestBlockBuilderRoundTrip(t *testing.T) { require.False(t, querier.Next()) // test seek - i := numSeries / 2 - halfData := data[i:] - halfKeys := keys[i:] - require.Nil(t, querier.Seek(halfData[0].Series.Fingerprint)) - for j := 0; j < len(halfData); j++ { - require.Equal(t, true, querier.Next(), "on iteration %d", j) - got := querier.At() - require.Equal(t, halfData[j].Series, got.Series) - for _, key := range halfKeys[j] { - require.True(t, got.Bloom.Test(key)) + if !iterHasPendingData { + i := numSeries / 2 + halfData := data[i:] + halfKeys := keys[i:] + require.Nil(t, querier.Seek(halfData[0].Series.Fingerprint)) + for j := 0; j < len(halfData); j++ { + require.Equal(t, true, querier.Next(), "on iteration %d", j) + got := querier.At() + require.Equal(t, halfData[j].Series, got.Series) + for _, key := range halfKeys[j] { + require.True(t, got.Bloom.Test(key)) + } + require.NoError(t, querier.Err()) } - require.NoError(t, querier.Err()) + require.False(t, querier.Next()) } - require.False(t, querier.Next()) - }) } } func TestMergeBuilder(t *testing.T) { - - nBlocks := 10 - numSeries := 100 - numKeysPerSeries := 100 - blocks := make([]PeekingIterator[*SeriesWithBloom], 0, nBlocks) - data, _ := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000) - blockOpts := BlockOptions{ - schema: Schema{ - version: DefaultSchemaVersion, - encoding: chunkenc.EncSnappy, + for _, tc := range []struct { + desc string + maxBlockSize int + iterHasPendingData bool + }{ + { + desc: "no block limit", }, - SeriesPageSize: 100, - BloomPageSize: 10 << 10, - } + { + desc: "max block size", + // Set max block big enough to fit a bunch of series + maxBlockSize: 10 << 10, + iterHasPendingData: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + nBlocks := 10 + numSeries := 100 + numKeysPerSeries := 100 + blocks := make([]PeekingIterator[*SeriesWithBloom], 0, nBlocks) + data, _ := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000) + blockOpts := BlockOptions{ + schema: Schema{ + version: DefaultSchemaVersion, + encoding: chunkenc.EncSnappy, + }, + SeriesPageSize: 100, + BloomPageSize: 10 << 10, + BlockSize: tc.maxBlockSize, + } - // Build a list of blocks containing overlapping & duplicated parts of the dataset - for i := 0; i < nBlocks; i++ { - // references for linking in memory reader+writer - indexBuf := bytes.NewBuffer(nil) - bloomsBuf := bytes.NewBuffer(nil) - - min := i * numSeries / nBlocks - max := (i + 2) * numSeries / nBlocks // allow some 
overlap - if max > len(data) { - max = len(data) - } - - writer := NewMemoryBlockWriter(indexBuf, bloomsBuf) - reader := NewByteReader(indexBuf, bloomsBuf) - - builder, err := NewBlockBuilder( - blockOpts, - writer, - ) - - require.Nil(t, err) - itr := NewSliceIter[SeriesWithBloom](data[min:max]) - _, _, err = builder.BuildFrom(itr) - require.Nil(t, err) - blocks = append(blocks, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(NewBlock(reader)))) - } + // Build a list of blocks containing overlapping & duplicated parts of the dataset + for i := 0; i < nBlocks; i++ { + // references for linking in memory reader+writer + indexBuf := bytes.NewBuffer(nil) + bloomsBuf := bytes.NewBuffer(nil) - // We're not testing the ability to extend a bloom in this test - pop := func(_ *Series, _ *Bloom) error { - return errors.New("not implemented") - } + min := i * numSeries / nBlocks + max := (i + 2) * numSeries / nBlocks // allow some overlap + if max > len(data) { + max = len(data) + } - // storage should contain references to all the series we ingested, - // regardless of block allocation/overlap. - storeItr := NewPeekingIter[*Series]( - NewMapIter[SeriesWithBloom, *Series]( - NewSliceIter[SeriesWithBloom](data), - func(swb SeriesWithBloom) *Series { - return swb.Series - }, - ), - ) - - // Ensure that the merge builder combines all the blocks correctly - mergeBuilder := NewMergeBuilder(blocks, storeItr, pop) - indexBuf := bytes.NewBuffer(nil) - bloomsBuf := bytes.NewBuffer(nil) - writer := NewMemoryBlockWriter(indexBuf, bloomsBuf) - reader := NewByteReader(indexBuf, bloomsBuf) + writer := NewMemoryBlockWriter(indexBuf, bloomsBuf) + reader := NewByteReader(indexBuf, bloomsBuf) - builder, err := NewBlockBuilder( - blockOpts, - writer, - ) - require.Nil(t, err) + builder, err := NewBlockBuilder( + blockOpts, + writer, + ) - _, _, err = mergeBuilder.Build(builder) - require.Nil(t, err) + require.Nil(t, err) + itr := NewSliceIter[SeriesWithBloom](data[min:max]) + _, _, err = builder.BuildFrom(itr) + require.Nil(t, err) + blocks = append(blocks, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(NewBlock(reader)))) + } - block := NewBlock(reader) - querier := NewBlockQuerier(block) + // We're not testing the ability to extend a bloom in this test + pop := func(_ *Series, _ *Bloom) error { + return errors.New("not implemented") + } - EqualIterators[*SeriesWithBloom]( - t, - func(a, b *SeriesWithBloom) { - require.Equal(t, a.Series, b.Series, "expected %+v, got %+v", a, b) - }, - NewSliceIter[*SeriesWithBloom](PointerSlice(data)), - querier, - ) + // storage should contain references to all the series we ingested, + // regardless of block allocation/overlap. 
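// Wrapping the store in a peeking iterator mirrors the compactor's merge
// path: after Build returns, Peek reveals whether the merge stopped early
// because the block filled up, which the iterHasPendingData assertions
// below depend on.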
+ storeItr := NewPeekingIter[*Series]( + NewMapIter[SeriesWithBloom, *Series]( + NewSliceIter[SeriesWithBloom](data), + func(swb SeriesWithBloom) *Series { + return swb.Series + }, + ), + ) + + // Ensure that the merge builder combines all the blocks correctly + mergeBuilder := NewMergeBuilder(blocks, storeItr, pop) + indexBuf := bytes.NewBuffer(nil) + bloomsBuf := bytes.NewBuffer(nil) + writer := NewMemoryBlockWriter(indexBuf, bloomsBuf) + reader := NewByteReader(indexBuf, bloomsBuf) + + builder, err := NewBlockBuilder( + blockOpts, + writer, + ) + require.Nil(t, err) + + _, _, err = mergeBuilder.Build(builder) + require.Nil(t, err) + + firstPendingSeries, iterHasPendingData := storeItr.Peek() + require.Equal(t, tc.iterHasPendingData, iterHasPendingData) + + processedData := data + if iterHasPendingData { + // If iter is not consumed, cut the data to the first pending series + processedData = make([]SeriesWithBloom, 0, len(data)) + for _, d := range data { + if d.Series.Fingerprint >= firstPendingSeries.Fingerprint { + break + } + processedData = append(processedData, d) + } + } + + block := NewBlock(reader) + querier := NewBlockQuerier(block) + + EqualIterators[*SeriesWithBloom]( + t, + func(a, b *SeriesWithBloom) { + require.Equal(t, a.Series, b.Series, "expected %+v, got %+v", a, b) + }, + NewSliceIter[*SeriesWithBloom](PointerSlice(processedData)), + querier, + ) + }) + } } From b013546e86f1da4a8289619c1e893cf71bbfa203 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Wed, 17 Jan 2024 12:22:43 +0100 Subject: [PATCH 3/6] Add some debugging logs --- pkg/bloomcompactor/chunkcompactor.go | 17 +++++++++++++++-- pkg/bloomcompactor/mergecompactor.go | 17 +++++++++++++++-- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/pkg/bloomcompactor/chunkcompactor.go b/pkg/bloomcompactor/chunkcompactor.go index 050ef7cc5b8ec..fca7959852c3d 100644 --- a/pkg/bloomcompactor/chunkcompactor.go +++ b/pkg/bloomcompactor/chunkcompactor.go @@ -180,7 +180,7 @@ func buildBlocksFromBlooms( return []bloomshipper.Block{}, err } - blocks = append(blocks, bloomshipper.Block{ + block := bloomshipper.Block{ BlockRef: bloomshipper.BlockRef{ Ref: bloomshipper.Ref{ TenantID: job.tenantID, @@ -195,7 +195,20 @@ func buildBlocksFromBlooms( BlockPath: blockPath, }, Data: data, - }) + } + blocks = append(blocks, block) + level.Debug(logger).Log( + "msg", "built bloom block", + "TenantID", block.TenantID, + "MinFingerprint", block.MinFingerprint, + "MaxFingerprint", block.MaxFingerprint, + "StartTimestamp", block.StartTimestamp, + "EndTimestamp", block.EndTimestamp, + "Checksum", block.Checksum, + "TableName", block.TableName, + "IndexPath", block.IndexPath, + "BlockPath", block.BlockPath, + ) } return blocks, nil diff --git a/pkg/bloomcompactor/mergecompactor.go b/pkg/bloomcompactor/mergecompactor.go index afee30d480dfe..97c96540b16cd 100644 --- a/pkg/bloomcompactor/mergecompactor.go +++ b/pkg/bloomcompactor/mergecompactor.go @@ -147,7 +147,7 @@ func mergeCompactChunks( return []bloomshipper.Block{}, err } - mergedBlocks = append(mergedBlocks, bloomshipper.Block{ + mergedBlock := bloomshipper.Block{ BlockRef: bloomshipper.BlockRef{ Ref: bloomshipper.Ref{ TenantID: job.tenantID, @@ -162,7 +162,20 @@ func mergeCompactChunks( BlockPath: blockPath, }, Data: data, - }) + } + mergedBlocks = append(mergedBlocks, mergedBlock) + level.Debug(logger).Log( + "msg", "merged bloom block", + "TenantID", mergedBlock.TenantID, + "MinFingerprint", mergedBlock.MinFingerprint, + "MaxFingerprint", mergedBlock.MaxFingerprint, + 
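// NB: fingerprints are logged as raw integers here, while the equivalent
// debug line in chunkcompactor.go hex-encodes them via fmt.Sprintf("%x").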
"StartTimestamp", mergedBlock.StartTimestamp, + "EndTimestamp", mergedBlock.EndTimestamp, + "Checksum", mergedBlock.Checksum, + "TableName", mergedBlock.TableName, + "IndexPath", mergedBlock.IndexPath, + "BlockPath", mergedBlock.BlockPath, + ) } return mergedBlocks, nil From 373c9fd7d1a0adcdad706b6362c8d882717d024a Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Thu, 18 Jan 2024 11:43:55 +0100 Subject: [PATCH 4/6] Sort series --- pkg/bloomcompactor/bloomcompactor.go | 12 ++++++++++++ pkg/bloomcompactor/chunkcompactor.go | 4 ++-- pkg/storage/bloom/v1/builder.go | 2 +- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index 1895b92a6159b..7c286cd4ed2a2 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -27,6 +27,7 @@ package bloomcompactor import ( "context" "fmt" + "golang.org/x/exp/slices" "math" "math/rand" "os" @@ -423,6 +424,17 @@ func (c *Compactor) compactTenant(ctx context.Context, logger log.Logger, sc sto return nil } + // Sort seriesMetas by fingerprint to avoid overlapping blocks + slices.SortFunc(seriesMetas, func(a, b seriesMeta) int { + if a.seriesFP < b.seriesFP { + return -1 + } + if a.seriesFP > b.seriesFP { + return 1 + } + return 0 + }) + job := NewJob(tenant, tableName, idx.Path(), seriesMetas) jobLogger := log.With(logger, "job", job.String()) c.metrics.compactionRunJobStarted.Inc() diff --git a/pkg/bloomcompactor/chunkcompactor.go b/pkg/bloomcompactor/chunkcompactor.go index fca7959852c3d..58a147498ff20 100644 --- a/pkg/bloomcompactor/chunkcompactor.go +++ b/pkg/bloomcompactor/chunkcompactor.go @@ -200,8 +200,8 @@ func buildBlocksFromBlooms( level.Debug(logger).Log( "msg", "built bloom block", "TenantID", block.TenantID, - "MinFingerprint", block.MinFingerprint, - "MaxFingerprint", block.MaxFingerprint, + "MinFingerprint", fmt.Sprintf("%x", block.MinFingerprint), + "MaxFingerprint", fmt.Sprintf("%x", block.MaxFingerprint), "StartTimestamp", block.StartTimestamp, "EndTimestamp", block.EndTimestamp, "Checksum", block.Checksum, diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go index 05cf725663437..1f39668342707 100644 --- a/pkg/storage/bloom/v1/builder.go +++ b/pkg/storage/bloom/v1/builder.go @@ -74,7 +74,7 @@ func newUpdatableBounds() updatableBounds { return updatableBounds{ FromTs: model.Latest, ThroughTs: model.Earliest, - FromFp: model.Fingerprint(math.MaxInt64), + FromFp: model.Fingerprint(math.MaxUint64), ThroughFp: model.Fingerprint(0), } } From aad748e44323a77dc954e85a083b286bb25535d1 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Thu, 18 Jan 2024 12:47:01 +0100 Subject: [PATCH 5/6] PR feedback --- docs/sources/configure/_index.md | 8 +-- pkg/bloomcompactor/bloomcompactor.go | 24 ++++---- pkg/bloomcompactor/chunkcompactor.go | 15 ++--- pkg/bloomcompactor/chunkcompactor_test.go | 17 ++++-- pkg/bloomcompactor/job.go | 68 ++++++----------------- pkg/bloomcompactor/mergecompactor.go | 16 ++++-- pkg/storage/bloom/v1/builder.go | 20 +++---- pkg/validation/limits.go | 5 +- 8 files changed, 74 insertions(+), 99 deletions(-) diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index 14406ac37cc8c..21f647433439e 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -3098,11 +3098,11 @@ shard_streams: # CLI flag: -bloom-compactor.chunks-batch-size [bloom_compactor_chunks_batch_size: | default = 100] -# The maximum bloom block size. 
Default is unlimited. The actual block size -# might exceed this limit since blooms will be added to blocks until the block -# exceeds the maximum block size. +# The maximum bloom block size. A value of 0 sets an unlimited size. Default is +# 200MB. The actual block size might exceed this limit since blooms will be +# added to blocks until the block exceeds the maximum block size. # CLI flag: -bloom-compactor.max-block-size -[bloom_compactor_max_block_size: | default = 0B] +[bloom_compactor_max_block_size: | default = 200MB] # Length of the n-grams created when computing blooms from log lines. # CLI flag: -bloom-compactor.ngram-length diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index 7c286cd4ed2a2..a13fa4096aa83 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -497,10 +497,10 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job, } metaSearchParams := bloomshipper.MetaSearchParams{ TenantID: job.tenantID, - MinFingerprint: job.minFp, - MaxFingerprint: job.maxFp, - StartTimestamp: job.from, - EndTimestamp: job.through, + MinFingerprint: job.FromFp, + MaxFingerprint: job.ThroughFp, + StartTimestamp: job.FromTs, + EndTimestamp: job.ThroughTs, } var metas []bloomshipper.Meta //TODO Configure pool for these to avoid allocations @@ -544,7 +544,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job, // No matching existing blocks for this job, compact all series from scratch level.Info(logger).Log("msg", "No matching existing blocks for this job, compact all series from scratch") - builder, err := NewPersistentBlockBuilder(localDst, blockOptions) + builder := NewPersistentBlockBuilder(localDst, blockOptions) if err != nil { level.Error(logger).Log("msg", "failed creating block builder", "err", err) return err @@ -561,7 +561,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job, var populate = createPopulateFunc(ctx, job, storeClient, bt, c.limits) - seriesIter := v1.NewPeekingIter[*v1.Series](makeSeriesIterFromSeriesMeta(job)) + seriesIter := makeSeriesIterFromSeriesMeta(job) blockIters, blockPaths, err := makeBlockIterFromBlocks(ctx, logger, c.bloomShipperClient, blocksMatchingJob, c.cfg.WorkingDirectory) defer func() { @@ -577,13 +577,13 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job, return err } - mergeBlockBuilder, err := NewPersistentBlockBuilder(localDst, blockOptions) + mergeBlockBuilder := NewPersistentBlockBuilder(localDst, blockOptions) if err != nil { level.Error(logger).Log("msg", "failed creating block builder", "err", err) return err } - resultingBlocks, err = mergeCompactChunks(logger, populate, mergeBlockBuilder, blockIters, seriesIter, job) + resultingBlocks, err = mergeCompactChunks(ctx, logger, populate, mergeBlockBuilder, blockIters, seriesIter, job) if err != nil { level.Error(logger).Log("msg", "failed merging existing blocks with new chunks", "err", err) return err @@ -635,10 +635,10 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job, Ref: bloomshipper.Ref{ TenantID: job.tenantID, TableName: job.tableName, - MinFingerprint: uint64(job.minFp), - MaxFingerprint: uint64(job.maxFp), - StartTimestamp: job.from, - EndTimestamp: job.through, + MinFingerprint: uint64(job.FromFp), + MaxFingerprint: uint64(job.ThroughFp), + StartTimestamp: job.FromTs, + EndTimestamp: job.ThroughTs, Checksum: rand.Uint32(), // Discuss if checksum is needed for 
Metas, why should we read all data again. }, }, diff --git a/pkg/bloomcompactor/chunkcompactor.go b/pkg/bloomcompactor/chunkcompactor.go index 58a147498ff20..b3c9fbface452 100644 --- a/pkg/bloomcompactor/chunkcompactor.go +++ b/pkg/bloomcompactor/chunkcompactor.go @@ -3,6 +3,7 @@ package bloomcompactor import ( "context" "fmt" + "github.com/pkg/errors" "io" "os" "path/filepath" @@ -46,13 +47,13 @@ type PersistentBlockBuilder struct { currentBlockPath string } -func NewPersistentBlockBuilder(localDst string, blockOptions v1.BlockOptions) (*PersistentBlockBuilder, error) { +func NewPersistentBlockBuilder(localDst string, blockOptions v1.BlockOptions) *PersistentBlockBuilder { builder := PersistentBlockBuilder{ blockOptions: blockOptions, baseLocalDst: localDst, } - return &builder, nil + return &builder } func (p *PersistentBlockBuilder) getNextBuilder() (*v1.BlockBuilder, error) { @@ -121,13 +122,10 @@ func makeChunkRefs(chksMetas []tsdbindex.ChunkMeta, tenant string, fp model.Fing } func buildBloomFromSeries(seriesMeta seriesMeta, fpRate float64, tokenizer compactorTokenizer, chunks v1.Iterator[[]chunk.Chunk]) (v1.SeriesWithBloom, error) { - chunkRefs := makeChunkRefsFromChunkMetas(seriesMeta.chunkRefs) - // Create a bloom for this series bloomForChks := v1.SeriesWithBloom{ Series: &v1.Series{ Fingerprint: seriesMeta.seriesFP, - Chunks: chunkRefs, }, Bloom: &v1.Bloom{ ScalableBloomFilter: *filter.NewDefaultScalableBloomFilter(fpRate), @@ -170,14 +168,13 @@ func buildBlocksFromBlooms( blockPath, checksum, bounds, err := builder.BuildFrom(blooms) if err != nil { - level.Error(logger).Log("msg", "failed writing to bloom", "err", err) - return []bloomshipper.Block{}, err + return []bloomshipper.Block{}, errors.Wrap(err, "failed to write bloom") } data, err := builder.Data() if err != nil { level.Error(logger).Log("msg", "failed reading bloom data", "err", err) - return []bloomshipper.Block{}, err + return []bloomshipper.Block{}, errors.Wrap(err, "failed to read bloom") } block := bloomshipper.Block{ @@ -215,7 +212,7 @@ func buildBlocksFromBlooms( } func createLocalDirName(workingDir string, job Job) string { - dir := fmt.Sprintf("bloomBlock-%s-%s-%s-%s-%d-%d-%s", job.tableName, job.tenantID, job.minFp, job.maxFp, job.from, job.through, uuid.New().String()) + dir := fmt.Sprintf("bloomBlock-%s-%s-%s-%s-%d-%d-%s", job.tableName, job.tenantID, job.FromFp, job.ThroughFp, job.FromTs, job.ThroughTs, uuid.New().String()) return filepath.Join(workingDir, dir) } diff --git a/pkg/bloomcompactor/chunkcompactor_test.go b/pkg/bloomcompactor/chunkcompactor_test.go index 71ce5c39194b5..585155c9369e0 100644 --- a/pkg/bloomcompactor/chunkcompactor_test.go +++ b/pkg/bloomcompactor/chunkcompactor_test.go @@ -196,8 +196,7 @@ func TestChunkCompactor_CompactNewChunks(t *testing.T) { t.Run(tc.name, func(t *testing.T) { localDst := createLocalDirName(t.TempDir(), job) blockOpts := v1.NewBlockOptions(1, 1, tc.blockSize) - pbb, err := NewPersistentBlockBuilder(localDst, blockOpts) - require.NoError(t, err) + pbb := NewPersistentBlockBuilder(localDst, blockOpts) // Run Compaction compactedBlocks, err := compactNewChunks(context.Background(), logger, job, &mbt, &mcc, pbb, mockLimits{fpRate: fpRate}) @@ -290,9 +289,17 @@ type mockBloomTokenizer struct { chunks []chunk.Chunk } -func (mbt *mockBloomTokenizer) PopulateSeriesWithBloom(_ *v1.SeriesWithBloom, c v1.Iterator[[]chunk.Chunk]) error { +func (mbt *mockBloomTokenizer) PopulateSeriesWithBloom(s *v1.SeriesWithBloom, c v1.Iterator[[]chunk.Chunk]) error { for c.Next() { - 
mbt.chunks = append(mbt.chunks, c.At()...) + chunks := c.At() + for _, chk := range chunks { + s.Series.Chunks = append(s.Series.Chunks, v1.ChunkRef{ + Start: chk.From, + End: chk.Through, + Checksum: chk.Checksum, + }) + mbt.chunks = append(mbt.chunks, chk) + } } return nil } @@ -305,7 +312,7 @@ type mockChunkClient struct { func (mcc *mockChunkClient) GetChunks(_ context.Context, chks []chunk.Chunk) ([]chunk.Chunk, error) { mcc.requestCount++ mcc.chunkCount += len(chks) - return nil, nil + return chks, nil } type mockPersistentBlockBuilder struct { diff --git a/pkg/bloomcompactor/job.go b/pkg/bloomcompactor/job.go index bd43293c73cb6..c31caa9e24938 100644 --- a/pkg/bloomcompactor/job.go +++ b/pkg/bloomcompactor/job.go @@ -1,11 +1,10 @@ package bloomcompactor import ( - "math" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" ) @@ -18,10 +17,7 @@ type seriesMeta struct { type Job struct { tableName, tenantID, indexPath string seriesMetas []seriesMeta - - // We compute them lazily. Unset value is 0. - from, through model.Time - minFp, maxFp model.Fingerprint + v1.SeriesBounds } // NewJob returns a new compaction Job. @@ -31,55 +27,23 @@ func NewJob( indexPath string, seriesMetas []seriesMeta, ) Job { - j := Job{ - tenantID: tenantID, - tableName: tableName, - indexPath: indexPath, - seriesMetas: seriesMetas, + bounds := v1.NewUpdatableBounds() + for _, series := range seriesMetas { + bounds.Update(v1.Series{ + Fingerprint: series.seriesFP, + Chunks: makeChunkRefsFromChunkMetas(series.chunkRefs), + }) + } + + return Job{ + tenantID: tenantID, + tableName: tableName, + indexPath: indexPath, + seriesMetas: seriesMetas, + SeriesBounds: v1.SeriesBounds(bounds), } - j.computeBounds() - return j } func (j *Job) String() string { return j.tableName + "_" + j.tenantID + "_" } - -func (j *Job) computeBounds() { - if len(j.seriesMetas) == 0 { - return - } - - minFrom := model.Latest - maxThrough := model.Earliest - - minFp := model.Fingerprint(math.MaxInt64) - maxFp := model.Fingerprint(0) - - for _, seriesMeta := range j.seriesMetas { - // calculate timestamp boundaries - for _, chunkRef := range seriesMeta.chunkRefs { - from, through := chunkRef.Bounds() - if minFrom > from { - minFrom = from - } - if maxThrough < through { - maxThrough = through - } - } - - // calculate fingerprint boundaries - if minFp > seriesMeta.seriesFP { - minFp = seriesMeta.seriesFP - } - if maxFp < seriesMeta.seriesFP { - maxFp = seriesMeta.seriesFP - } - } - - j.from = minFrom - j.through = maxThrough - - j.minFp = minFp - j.maxFp = maxFp -} diff --git a/pkg/bloomcompactor/mergecompactor.go b/pkg/bloomcompactor/mergecompactor.go index 97c96540b16cd..cb0cf77fb7bc0 100644 --- a/pkg/bloomcompactor/mergecompactor.go +++ b/pkg/bloomcompactor/mergecompactor.go @@ -3,6 +3,7 @@ package bloomcompactor import ( "context" "fmt" + "github.com/pkg/errors" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -28,7 +29,7 @@ func makeChunkRefsFromChunkMetas(chunks index.ChunkMetas) v1.ChunkRefs { return chunkRefs } -func makeSeriesIterFromSeriesMeta(job Job) *v1.SliceIter[*v1.Series] { +func makeSeriesIterFromSeriesMeta(job Job) *v1.PeekIter[*v1.Series] { // Satisfy types for series seriesFromSeriesMeta := make([]*v1.Series, len(job.seriesMetas)) @@ -39,7 +40,7 @@ func makeSeriesIterFromSeriesMeta(job Job) *v1.SliceIter[*v1.Series] { Chunks: crefs, } } - return 
v1.NewSliceIter(seriesFromSeriesMeta) + return v1.NewPeekingIter[*v1.Series](v1.NewSliceIter(seriesFromSeriesMeta)) } func makeBlockIterFromBlocks(ctx context.Context, logger log.Logger, @@ -114,6 +115,7 @@ func createPopulateFunc(ctx context.Context, job Job, storeClient storeClient, b } func mergeCompactChunks( + ctx context.Context, logger log.Logger, populate func(*v1.Series, *v1.Bloom) error, mergeBlockBuilder *PersistentBlockBuilder, @@ -136,15 +138,17 @@ func mergeCompactChunks( break } + if err := ctx.Err(); err != nil { + return []bloomshipper.Block{}, err + } + blockPath, checksum, bounds, err := mergeBlockBuilder.mergeBuild(mergeBuilder) if err != nil { - level.Error(logger).Log("msg", "failed merging the blooms", "err", err) - return []bloomshipper.Block{}, err + return []bloomshipper.Block{}, errors.Wrap(err, "failed to merge blooms") } data, err := mergeBlockBuilder.Data() if err != nil { - level.Error(logger).Log("msg", "failed reading bloom data", "err", err) - return []bloomshipper.Block{}, err + return []bloomshipper.Block{}, errors.Wrap(err, "failed to read blooms") } mergedBlock := bloomshipper.Block{ diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go index 1f39668342707..a7b23f37e4c6e 100644 --- a/pkg/storage/bloom/v1/builder.go +++ b/pkg/storage/bloom/v1/builder.go @@ -68,10 +68,10 @@ type SeriesWithBloom struct { Bloom *Bloom } -type updatableBounds SeriesBounds +type UpdatableBounds SeriesBounds -func newUpdatableBounds() updatableBounds { - return updatableBounds{ +func NewUpdatableBounds() UpdatableBounds { + return UpdatableBounds{ FromTs: model.Latest, ThroughTs: model.Earliest, FromFp: model.Fingerprint(math.MaxUint64), @@ -79,7 +79,7 @@ func newUpdatableBounds() updatableBounds { } } -func (b *updatableBounds) update(series *Series) { +func (b *UpdatableBounds) Update(series Series) { minFrom := model.Latest maxThrough := model.Earliest for _, chunk := range series.Chunks { @@ -106,11 +106,11 @@ func (b *updatableBounds) update(series *Series) { } func (b *BlockBuilder) BuildFrom(itr Iterator[SeriesWithBloom]) (uint32, SeriesBounds, error) { - bounds := newUpdatableBounds() + bounds := NewUpdatableBounds() for itr.Next() { series := itr.At() - bounds.update(series.Series) + bounds.Update(*series.Series) blockFull, err := b.AddSeries(series) if err != nil { return 0, SeriesBounds{}, err @@ -134,7 +134,7 @@ func (b *BlockBuilder) BuildFrom(itr Iterator[SeriesWithBloom]) (uint32, SeriesB return checksum, SeriesBounds(bounds), nil } -func (b *BlockBuilder) blockIsFull() (bool, error) { +func (b *BlockBuilder) isBlockFull() (bool, error) { // if the block size is 0, the max size is unlimited if b.opts.BlockSize == 0 { return false, nil @@ -161,7 +161,7 @@ func (b *BlockBuilder) AddSeries(series SeriesWithBloom) (bool, error) { return false, errors.Wrapf(err, "writing index for series %v", series.Series.Fingerprint) } - full, err := b.blockIsFull() + full, err := b.isBlockFull() if err != nil { return false, errors.Wrap(err, "checking if block is full") } @@ -573,7 +573,7 @@ func (mb *MergeBuilder) HasPendingData() bool { func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, SeriesBounds, error) { var ( nextInBlocks *SeriesWithBloom - bounds = newUpdatableBounds() + bounds = NewUpdatableBounds() ) // Turn the list of blocks into a single iterator that returns the next series @@ -641,7 +641,7 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, SeriesBounds, erro } } - bounds.update(cur.Series) + 
bounds.Update(*cur.Series) blockFull, err := builder.AddSeries(*cur) if err != nil { return 0, SeriesBounds{}, errors.Wrap(err, "adding series to block") diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 6f8bfe71e0c8c..b457b41542611 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -55,6 +55,8 @@ const ( defaultMaxStructuredMetadataSize = "64kb" defaultMaxStructuredMetadataCount = 128 + + defaultBloomCompactorMaxBlockSize = "200MB" ) // Limits describe all the limits for users; can be used to describe global default @@ -319,7 +321,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&l.BloomCompactorMaxTableAge, "bloom-compactor.max-table-age", 7*24*time.Hour, "The maximum age of a table before it is compacted. Do not compact tables older than the the configured time. Default to 7 days. 0s means no limit.") f.BoolVar(&l.BloomCompactorEnabled, "bloom-compactor.enable-compaction", false, "Whether to compact chunks into bloom filters.") f.IntVar(&l.BloomCompactorChunksBatchSize, "bloom-compactor.chunks-batch-size", 100, "The batch size of the chunks the bloom-compactor downloads at once.") - f.Var(&l.BloomCompactorMaxBlockSize, "bloom-compactor.max-block-size", "The maximum bloom block size. Default is unlimited. The actual block size might exceed this limit since blooms will be added to blocks until the block exceeds the maximum block size.") + _ = l.BloomCompactorMaxBlockSize.Set(defaultBloomCompactorMaxBlockSize) + f.Var(&l.BloomCompactorMaxBlockSize, "bloom-compactor.max-block-size", "The maximum bloom block size. A value of 0 sets an unlimited size. Default is 200MB. The actual block size might exceed this limit since blooms will be added to blocks until the block exceeds the maximum block size.") f.IntVar(&l.BloomNGramLength, "bloom-compactor.ngram-length", 4, "Length of the n-grams created when computing blooms from log lines.") f.IntVar(&l.BloomNGramSkip, "bloom-compactor.ngram-skip", 0, "Skip factor for the n-grams created when computing blooms from log lines.") f.Float64Var(&l.BloomFalsePositiveRate, "bloom-compactor.false-positive-rate", 0.01, "Scalable Bloom Filter desired false-positive rate.") From ae7e38223bbb8e41c86594bf64be950cfa5ff514 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Thu, 18 Jan 2024 12:57:02 +0100 Subject: [PATCH 6/6] fix format --- pkg/bloomcompactor/bloomcompactor.go | 2 +- pkg/bloomcompactor/chunkcompactor.go | 5 ++--- pkg/bloomcompactor/job.go | 2 +- pkg/bloomcompactor/mergecompactor.go | 2 +- 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index a13fa4096aa83..7719c43e39883 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -27,7 +27,6 @@ package bloomcompactor import ( "context" "fmt" - "golang.org/x/exp/slices" "math" "math/rand" "os" @@ -43,6 +42,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "golang.org/x/exp/slices" "path/filepath" diff --git a/pkg/bloomcompactor/chunkcompactor.go b/pkg/bloomcompactor/chunkcompactor.go index b3c9fbface452..0923501a1f27c 100644 --- a/pkg/bloomcompactor/chunkcompactor.go +++ b/pkg/bloomcompactor/chunkcompactor.go @@ -3,15 +3,14 @@ package bloomcompactor import ( "context" "fmt" - "github.com/pkg/errors" "io" "os" "path/filepath" - "github.com/google/uuid" - "github.com/go-kit/log" "github.com/go-kit/log/level" + 
"github.com/google/uuid" + "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/grafana/loki/pkg/logproto" diff --git a/pkg/bloomcompactor/job.go b/pkg/bloomcompactor/job.go index c31caa9e24938..f4bd0edd747c2 100644 --- a/pkg/bloomcompactor/job.go +++ b/pkg/bloomcompactor/job.go @@ -4,7 +4,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" - "github.com/grafana/loki/pkg/storage/bloom/v1" + v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" ) diff --git a/pkg/bloomcompactor/mergecompactor.go b/pkg/bloomcompactor/mergecompactor.go index cb0cf77fb7bc0..90b2a2d30cc40 100644 --- a/pkg/bloomcompactor/mergecompactor.go +++ b/pkg/bloomcompactor/mergecompactor.go @@ -3,10 +3,10 @@ package bloomcompactor import ( "context" "fmt" - "github.com/pkg/errors" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/pkg/errors" "github.com/grafana/dskit/concurrency"