Apply maximum block size limit during bloom compaction #11681

Closed
Changes from 2 commits

6 changes: 6 additions & 0 deletions docs/sources/configure/_index.md
@@ -3098,6 +3098,12 @@ shard_streams:
# CLI flag: -bloom-compactor.chunks-batch-size
[bloom_compactor_chunks_batch_size: <int> | default = 100]

# The maximum bloom block size. The default is unlimited. The actual block size
# may exceed this limit, since blooms are appended to a block until the block
# exceeds the maximum block size.
# CLI flag: -bloom-compactor.max-block-size
[bloom_compactor_max_block_size: <int> | default = 0B]

# Length of the n-grams created when computing blooms from log lines.
# CLI flag: -bloom-compactor.ngram-length
[bloom_ngram_length: <int> | default = 4]
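The new `bloom_compactor_max_block_size` setting described above cuts a block only after the block first exceeds the limit, which is why the documentation warns that the actual block size can overshoot it. Below is a minimal sketch of that cutting behaviour, assuming blooms are appended before the size check; the `bloom` and `cutBlocks` names and the sizes used are illustrative only, not Loki's actual compactor code:

```go
package main

import "fmt"

// bloom stands in for a series' bloom filter; only its encoded size matters here.
type bloom struct {
	sizeBytes int
}

// cutBlocks appends blooms to the current block and starts a new block once the
// running size exceeds maxBlockSize (0 means unlimited). Because the check runs
// after appending, a finished block can exceed the limit by one bloom.
func cutBlocks(blooms []bloom, maxBlockSize int) [][]bloom {
	var blocks [][]bloom
	var current []bloom
	currentSize := 0
	for _, b := range blooms {
		current = append(current, b)
		currentSize += b.sizeBytes
		if maxBlockSize > 0 && currentSize > maxBlockSize {
			blocks = append(blocks, current)
			current, currentSize = nil, 0
		}
	}
	if len(current) > 0 {
		blocks = append(blocks, current)
	}
	return blocks
}

func main() {
	// With a 64-byte limit and bloom sizes 40, 30, 50, 20 this produces two
	// blocks of two blooms each; the first ends up at 70 bytes, over the limit.
	for i, blk := range cutBlocks([]bloom{{40}, {30}, {50}, {20}}, 64) {
		fmt.Println("block", i, "holds", len(blk), "blooms")
	}
}
```
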
44 changes: 28 additions & 16 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -504,7 +504,8 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
metasMatchingJob, blocksMatchingJob := matchingBlocks(metas, job)

localDst := createLocalDirName(c.cfg.WorkingDirectory, job)
blockOptions := v1.NewBlockOptions(bt.GetNGramLength(), bt.GetNGramSkip())
maxBlockSize := c.limits.BloomCompactorMaxBlockSize(job.tenantID)
blockOptions := v1.NewBlockOptions(bt.GetNGramLength(), bt.GetNGramSkip(), maxBlockSize)

defer func() {
//clean up the bloom directory
@@ -513,10 +514,12 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
}
}()

var resultingBlock bloomshipper.Block
var resultingBlocks []bloomshipper.Block
defer func() {
if resultingBlock.Data != nil {
_ = resultingBlock.Data.Close()
for _, resultingBlock := range resultingBlocks {
if resultingBlock.Data != nil {
_ = resultingBlock.Data.Close()
}
}
}()

@@ -535,7 +538,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
return err
}

resultingBlock, err = compactNewChunks(ctx, logger, job, bt, storeClient.chunk, builder, c.limits)
resultingBlocks, err = compactNewChunks(ctx, logger, job, bt, storeClient.chunk, builder, c.limits)
if err != nil {
return level.Error(logger).Log("msg", "failed compacting new chunks", "err", err)
}
@@ -546,7 +549,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,

var populate = createPopulateFunc(ctx, job, storeClient, bt, c.limits)

seriesIter := makeSeriesIterFromSeriesMeta(job)
seriesIter := v1.NewPeekingIter[*v1.Series](makeSeriesIterFromSeriesMeta(job))

blockIters, blockPaths, err := makeBlockIterFromBlocks(ctx, logger, c.bloomShipperClient, blocksMatchingJob, c.cfg.WorkingDirectory)
defer func() {
@@ -568,32 +571,41 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
return err
}

resultingBlock, err = mergeCompactChunks(logger, populate, mergeBlockBuilder, blockIters, seriesIter, job)
resultingBlocks, err = mergeCompactChunks(logger, populate, mergeBlockBuilder, blockIters, seriesIter, job)
if err != nil {
level.Error(logger).Log("msg", "failed merging existing blocks with new chunks", "err", err)
return err
}

}

archivePath := filepath.Join(c.cfg.WorkingDirectory, uuid.New().String())
level.Debug(logger).Log("msg", "blocks created", "resultingBlocks", len(resultingBlocks))

blockToUpload, err := bloomshipper.CompressBloomBlock(resultingBlock.BlockRef, archivePath, localDst, logger)
if err != nil {
level.Error(logger).Log("msg", "failed compressing bloom blocks into tar file", "err", err)
return err
blocksToUpload := make([]bloomshipper.Block, 0, len(resultingBlocks))
archivesPaths := make([]string, 0, len(resultingBlocks))
for _, block := range resultingBlocks {
archivePath := filepath.Join(c.cfg.WorkingDirectory, uuid.New().String())
blockToUpload, err := bloomshipper.CompressBloomBlock(block.BlockRef, archivePath, block.BlockRef.BlockPath, logger)
if err != nil {
level.Error(logger).Log("msg", "failed compressing bloom blocks into tar file", "err", err)
return err
}
blocksToUpload = append(blocksToUpload, blockToUpload)
archivesPaths = append(archivesPaths, archivePath)
}

defer func() {
err = os.Remove(archivePath)
if err != nil {
level.Error(logger).Log("msg", "failed removing archive file", "err", err, "file", archivePath)
for _, archivePath := range archivesPaths {
err = os.Remove(archivePath)
if err != nil {
level.Error(logger).Log("msg", "failed removing archive file", "err", err, "file", archivePath)
}
}
}()

// Do not change the signature of PutBlocks yet.
// Once block size is limited potentially, compactNewChunks will return multiple blocks, hence a list is appropriate.
storedBlocks, err := c.bloomShipperClient.PutBlocks(ctx, []bloomshipper.Block{blockToUpload})
storedBlocks, err := c.bloomShipperClient.PutBlocks(ctx, blocksToUpload)
if err != nil {
level.Error(logger).Log("msg", "failed uploading blocks to storage", "err", err)
return err
152 changes: 102 additions & 50 deletions pkg/bloomcompactor/chunkcompactor.go
@@ -31,38 +31,72 @@ type chunkClient interface {
}

type blockBuilder interface {
BuildFrom(itr v1.Iterator[v1.SeriesWithBloom]) (uint32, error)
// BuildFrom builds a block from the given iterator.
// If the data is too large to fit in a single block, the iterator won't be consumed completely. In this case,
// call BuildFrom again with the same iterator to build the next block.
BuildFrom(itr v1.Iterator[v1.SeriesWithBloom]) (string, uint32, v1.SeriesBounds, error)
// Data returns a reader for the last block built by BuildFrom.
Data() (io.ReadSeekCloser, error)
}
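The `BuildFrom` comment above defines the calling convention this PR relies on: a single call may stop early once the block under construction reaches the size limit, and the caller keeps invoking it with the same iterator until nothing is left. A hedged sketch of that driving loop follows, using simplified generic stand-ins rather than the real `v1.PeekingIterator` and `blockBuilder` types (the production loop lives in `buildBlocksFromBlooms` further down):

```go
package sketch

// peekingIter and multiBlockBuilder are hypothetical stand-ins for the real
// iterator and builder interfaces; only the call pattern is the point here.
type peekingIter[T any] interface {
	// Peek reports the next item without consuming it, and whether one exists.
	Peek() (T, bool)
}

type multiBlockBuilder[T any] interface {
	// BuildFrom consumes items until the current block would be full and
	// returns the path of the block it just wrote.
	BuildFrom(it peekingIter[T]) (blockPath string, err error)
}

// buildUntilExhausted keeps building blocks from the same iterator until a
// peek shows there is nothing left, collecting one path per block written.
func buildUntilExhausted[T any](b multiBlockBuilder[T], it peekingIter[T]) ([]string, error) {
	var paths []string
	for {
		if _, ok := it.Peek(); !ok {
			return paths, nil
		}
		path, err := b.BuildFrom(it)
		if err != nil {
			return nil, err
		}
		paths = append(paths, path)
	}
}
```

Peeking before each call avoids building a trailing empty block once the input runs dry, which appears to be why the compactor wraps its series and bloom iterators in `v1.NewPeekingIter`.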

type PersistentBlockBuilder struct {
builder *v1.BlockBuilder
localDst string
blockOptions v1.BlockOptions
lastBlockIdx uint64
baseLocalDst string
currentBlockPath string
}

func NewPersistentBlockBuilder(localDst string, blockOptions v1.BlockOptions) (*PersistentBlockBuilder, error) {
builder := PersistentBlockBuilder{
blockOptions: blockOptions,
baseLocalDst: localDst,
}

return &builder, nil
}

func (p *PersistentBlockBuilder) getNextBuilder() (*v1.BlockBuilder, error) {
// write bloom to a local dir
b, err := v1.NewBlockBuilder(blockOptions, v1.NewDirectoryBlockWriter(localDst))
blockPath := filepath.Join(p.baseLocalDst, fmt.Sprintf("%d", p.lastBlockIdx))
builder, err := v1.NewBlockBuilder(p.blockOptions, v1.NewDirectoryBlockWriter(blockPath))
if err != nil {
return nil, err
}
builder := PersistentBlockBuilder{
builder: b,
localDst: localDst,
}
return &builder, nil
p.currentBlockPath = blockPath
p.lastBlockIdx++
return builder, nil
}

func (p *PersistentBlockBuilder) BuildFrom(itr v1.Iterator[v1.SeriesWithBloom]) (uint32, error) {
return p.builder.BuildFrom(itr)
func (p *PersistentBlockBuilder) BuildFrom(itr v1.Iterator[v1.SeriesWithBloom]) (string, uint32, v1.SeriesBounds, error) {
b, err := p.getNextBuilder()
if err != nil {
return "", 0, v1.SeriesBounds{}, err
}

checksum, bounds, err := b.BuildFrom(itr)
if err != nil {
return "", 0, v1.SeriesBounds{}, err
}

return p.currentBlockPath, checksum, bounds, nil
}

func (p *PersistentBlockBuilder) mergeBuild(builder *v1.MergeBuilder) (uint32, error) {
return builder.Build(p.builder)
func (p *PersistentBlockBuilder) mergeBuild(builder *v1.MergeBuilder) (string, uint32, v1.SeriesBounds, error) {
b, err := p.getNextBuilder()
if err != nil {
return "", 0, v1.SeriesBounds{}, err
}

checksum, bounds, err := builder.Build(b)
if err != nil {
return "", 0, v1.SeriesBounds{}, err
}

return p.currentBlockPath, checksum, bounds, nil
}

func (p *PersistentBlockBuilder) Data() (io.ReadSeekCloser, error) {
blockFile, err := os.Open(filepath.Join(p.localDst, v1.BloomFileName))
blockFile, err := os.Open(filepath.Join(p.currentBlockPath, v1.BloomFileName))
if err != nil {
return nil, err
}
@@ -87,10 +121,13 @@ func makeChunkRefs(chksMetas []tsdbindex.ChunkMeta, tenant string, fp model.Fing
}

func buildBloomFromSeries(seriesMeta seriesMeta, fpRate float64, tokenizer compactorTokenizer, chunks v1.Iterator[[]chunk.Chunk]) (v1.SeriesWithBloom, error) {
chunkRefs := makeChunkRefsFromChunkMetas(seriesMeta.chunkRefs)

// Create a bloom for this series
bloomForChks := v1.SeriesWithBloom{
Series: &v1.Series{
Fingerprint: seriesMeta.seriesFP,
Chunks: chunkRefs,
},
Bloom: &v1.Bloom{
ScalableBloomFilter: *filter.NewDefaultScalableBloomFilter(fpRate),
@@ -106,47 +143,62 @@ func buildBloomFromSeries(seriesMeta seriesMeta, fpRate float64, tokenizer compa
}

// TODO Test this when bloom block size check is implemented
func buildBlockFromBlooms(
func buildBlocksFromBlooms(
ctx context.Context,
logger log.Logger,
builder blockBuilder,
blooms v1.Iterator[v1.SeriesWithBloom],
blooms v1.PeekingIterator[v1.SeriesWithBloom],
job Job,
) (bloomshipper.Block, error) {
) ([]bloomshipper.Block, error) {
// Ensure the context has not been canceled (ie. compactor shutdown has been triggered).
if err := ctx.Err(); err != nil {
return bloomshipper.Block{}, err
}

checksum, err := builder.BuildFrom(blooms)
if err != nil {
level.Error(logger).Log("msg", "failed writing to bloom", "err", err)
return bloomshipper.Block{}, err
return []bloomshipper.Block{}, err
}

data, err := builder.Data()
if err != nil {
level.Error(logger).Log("msg", "failed reading bloom data", "err", err)
return bloomshipper.Block{}, err
}

block := bloomshipper.Block{
BlockRef: bloomshipper.BlockRef{
Ref: bloomshipper.Ref{
TenantID: job.tenantID,
TableName: job.tableName,
MinFingerprint: uint64(job.minFp),
MaxFingerprint: uint64(job.maxFp),
StartTimestamp: job.from,
EndTimestamp: job.through,
Checksum: checksum,
// TODO(salvacorts): Reuse buffer
blocks := make([]bloomshipper.Block, 0)

for {
// Create blocks until the iterator is empty
if _, hasNext := blooms.Peek(); !hasNext {
break
}

if err := ctx.Err(); err != nil {
return []bloomshipper.Block{}, err
}

blockPath, checksum, bounds, err := builder.BuildFrom(blooms)
if err != nil {
level.Error(logger).Log("msg", "failed writing to bloom", "err", err)
return []bloomshipper.Block{}, err
}

data, err := builder.Data()
if err != nil {
level.Error(logger).Log("msg", "failed reading bloom data", "err", err)
return []bloomshipper.Block{}, err
}

blocks = append(blocks, bloomshipper.Block{
BlockRef: bloomshipper.BlockRef{
Ref: bloomshipper.Ref{
TenantID: job.tenantID,
TableName: job.tableName,
MinFingerprint: uint64(bounds.FromFp),
MaxFingerprint: uint64(bounds.ThroughFp),
StartTimestamp: bounds.FromTs,
EndTimestamp: bounds.ThroughTs,
Checksum: checksum,
},
IndexPath: job.indexPath,
BlockPath: blockPath,
},
IndexPath: job.indexPath,
},
Data: data,
Data: data,
})
}

return block, nil
return blocks, nil
}

func createLocalDirName(workingDir string, job Job) string {
Expand All @@ -162,22 +214,22 @@ func compactNewChunks(ctx context.Context,
storeClient chunkClient,
builder blockBuilder,
limits Limits,
) (bloomshipper.Block, error) {
) ([]bloomshipper.Block, error) {
// Ensure the context has not been canceled (ie. compactor shutdown has been triggered).
if err := ctx.Err(); err != nil {
return bloomshipper.Block{}, err
return []bloomshipper.Block{}, err
}

bloomIter := newLazyBloomBuilder(ctx, job, storeClient, bt, logger, limits)
bloomIter := v1.NewPeekingIter[v1.SeriesWithBloom](newLazyBloomBuilder(ctx, job, storeClient, bt, logger, limits))

// Build and upload bloomBlock to storage
block, err := buildBlockFromBlooms(ctx, logger, builder, bloomIter, job)
blocks, err := buildBlocksFromBlooms(ctx, logger, builder, bloomIter, job)
if err != nil {
level.Error(logger).Log("msg", "failed building bloomBlocks", "err", err)
return bloomshipper.Block{}, err
return []bloomshipper.Block{}, err
}

return block, nil
return blocks, nil
}

type lazyBloomBuilder struct {