Apply maximum block size limit during bloom compaction #11681

Closed · wants to merge 6 commits
6 changes: 6 additions & 0 deletions docs/sources/configure/_index.md
@@ -3098,6 +3098,12 @@ shard_streams:
# CLI flag: -bloom-compactor.chunks-batch-size
[bloom_compactor_chunks_batch_size: <int> | default = 100]

# The maximum bloom block size. A value of 0 sets an unlimited size. Default is
# 200MB. The actual block size might exceed this limit since blooms will be
# added to blocks until the block exceeds the maximum block size.
# CLI flag: -bloom-compactor.max-block-size
[bloom_compactor_max_block_size: <int> | default = 200MB]

# Length of the n-grams created when computing blooms from log lines.
# CLI flag: -bloom-compactor.ngram-length
[bloom_ngram_length: <int> | default = 4]
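A note on the size-limit semantics documented above: a block is only cut after the bloom that pushes it past the limit has been appended, so the final block can overshoot `-bloom-compactor.max-block-size` by roughly one bloom, and `0` disables the limit entirely. Below is a minimal sketch of that behaviour; the `bloomWithSize` type and `cutBlocks` helper are illustrative stand-ins, not code from this PR.

```go
package main

import "fmt"

// bloomWithSize is a hypothetical stand-in for a series bloom plus its encoded size.
type bloomWithSize struct {
	seriesFP uint64
	bytes    int
}

// cutBlocks sketches the documented behaviour: blooms are appended to the current
// block until its size exceeds maxBlockSize, so the final size of a block can
// overshoot the limit by up to one bloom. A maxBlockSize of 0 means "no limit".
func cutBlocks(blooms []bloomWithSize, maxBlockSize int) [][]bloomWithSize {
	var blocks [][]bloomWithSize
	var current []bloomWithSize
	currentSize := 0

	for _, b := range blooms {
		current = append(current, b)
		currentSize += b.bytes
		if maxBlockSize > 0 && currentSize > maxBlockSize {
			blocks = append(blocks, current)
			current, currentSize = nil, 0
		}
	}
	if len(current) > 0 {
		blocks = append(blocks, current)
	}
	return blocks
}

func main() {
	blooms := []bloomWithSize{{1, 120}, {2, 90}, {3, 150}, {4, 60}}
	// With a 200-byte limit the first block holds fingerprints 1-2 (210 bytes,
	// slightly over the limit) and the second block holds fingerprints 3-4.
	fmt.Println(cutBlocks(blooms, 200))
}
```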
74 changes: 49 additions & 25 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -42,6 +42,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"golang.org/x/exp/slices"

"path/filepath"

@@ -423,6 +424,17 @@ func (c *Compactor) compactTenant(ctx context.Context, logger log.Logger, sc sto
return nil
}

// Sort seriesMetas by fingerprint to avoid overlapping blocks
slices.SortFunc(seriesMetas, func(a, b seriesMeta) int {
if a.seriesFP < b.seriesFP {
return -1
}
if a.seriesFP > b.seriesFP {
return 1
}
return 0
})

job := NewJob(tenant, tableName, idx.Path(), seriesMetas)
jobLogger := log.With(logger, "job", job.String())
c.metrics.compactionRunJobStarted.Inc()
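Why the sort added in the hunk above matters: with `seriesMetas` in ascending fingerprint order, blocks that are cut sequentially cover disjoint fingerprint ranges, so their block refs never overlap. A small illustrative sketch of that property; the plain `uint64` fingerprints and the `boundsForSortedSeries` helper are hypothetical and not part of this PR.

```go
package main

import (
	"fmt"

	"golang.org/x/exp/slices"
)

// boundsForSortedSeries shows why the sort matters: when fingerprints arrive in
// ascending order and blocks are cut sequentially (here, every perBlock series),
// the resulting [min, max] fingerprint ranges are disjoint.
func boundsForSortedSeries(fps []uint64, perBlock int) [][2]uint64 {
	slices.Sort(fps) // same ordering guarantee as the SortFunc above
	var bounds [][2]uint64
	for start := 0; start < len(fps); start += perBlock {
		end := start + perBlock
		if end > len(fps) {
			end = len(fps)
		}
		bounds = append(bounds, [2]uint64{fps[start], fps[end-1]})
	}
	return bounds
}

func main() {
	fps := []uint64{42, 7, 99, 13, 57, 3}
	// Sorted: 3 7 13 42 57 99 -> blocks [3,13] and [42,99]: no overlap.
	fmt.Println(boundsForSortedSeries(fps, 3))
}
```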
@@ -485,10 +497,10 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
}
metaSearchParams := bloomshipper.MetaSearchParams{
TenantID: job.tenantID,
MinFingerprint: job.minFp,
MaxFingerprint: job.maxFp,
StartTimestamp: job.from,
EndTimestamp: job.through,
MinFingerprint: job.FromFp,
MaxFingerprint: job.ThroughFp,
StartTimestamp: job.FromTs,
EndTimestamp: job.ThroughTs,
}
var metas []bloomshipper.Meta
//TODO Configure pool for these to avoid allocations
@@ -504,7 +516,8 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
metasMatchingJob, blocksMatchingJob := matchingBlocks(metas, job)

localDst := createLocalDirName(c.cfg.WorkingDirectory, job)
blockOptions := v1.NewBlockOptions(bt.GetNGramLength(), bt.GetNGramSkip())
maxBlockSize := c.limits.BloomCompactorMaxBlockSize(job.tenantID)
blockOptions := v1.NewBlockOptions(bt.GetNGramLength(), bt.GetNGramSkip(), maxBlockSize)

defer func() {
//clean up the bloom directory
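The new `c.limits.BloomCompactorMaxBlockSize(job.tenantID)` call resolves the limit per tenant before it is handed to `v1.NewBlockOptions`. The sketch below only illustrates the general per-tenant override pattern; the `limits` struct and its field names are made up for the example and do not reflect Loki's actual `Limits` implementation.

```go
package main

import "fmt"

// limits is a simplified stand-in for the per-tenant limits used by the compactor.
// The real implementation resolves overrides from runtime config; here a plain map
// plays that role.
type limits struct {
	defaultMaxBlockSize int            // e.g. 200 << 20 for the documented 200MB default
	tenantOverride      map[string]int // per-tenant overrides
}

// BloomCompactorMaxBlockSize mirrors the accessor called in runCompact above:
// it returns the tenant-specific limit if one is set, otherwise the default.
func (l limits) BloomCompactorMaxBlockSize(tenantID string) int {
	if v, ok := l.tenantOverride[tenantID]; ok {
		return v
	}
	return l.defaultMaxBlockSize
}

func main() {
	l := limits{
		defaultMaxBlockSize: 200 << 20,
		tenantOverride:      map[string]int{"tenant-a": 50 << 20},
	}
	fmt.Println(l.BloomCompactorMaxBlockSize("tenant-a")) // 52428800
	fmt.Println(l.BloomCompactorMaxBlockSize("tenant-b")) // 209715200
}
```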
@@ -513,10 +526,12 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
}
}()

var resultingBlock bloomshipper.Block
var resultingBlocks []bloomshipper.Block
defer func() {
if resultingBlock.Data != nil {
_ = resultingBlock.Data.Close()
for _, resultingBlock := range resultingBlocks {
if resultingBlock.Data != nil {
_ = resultingBlock.Data.Close()
}
}
}()

@@ -529,13 +544,13 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
// No matching existing blocks for this job, compact all series from scratch
level.Info(logger).Log("msg", "No matching existing blocks for this job, compact all series from scratch")

builder, err := NewPersistentBlockBuilder(localDst, blockOptions)
builder := NewPersistentBlockBuilder(localDst, blockOptions)
if err != nil {
level.Error(logger).Log("msg", "failed creating block builder", "err", err)
return err
}

resultingBlock, err = compactNewChunks(ctx, logger, job, bt, storeClient.chunk, builder, c.limits)
resultingBlocks, err = compactNewChunks(ctx, logger, job, bt, storeClient.chunk, builder, c.limits)
if err != nil {
return level.Error(logger).Log("msg", "failed compacting new chunks", "err", err)
}
@@ -562,38 +577,47 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
return err
}

mergeBlockBuilder, err := NewPersistentBlockBuilder(localDst, blockOptions)
mergeBlockBuilder := NewPersistentBlockBuilder(localDst, blockOptions)
if err != nil {
level.Error(logger).Log("msg", "failed creating block builder", "err", err)
return err
}

resultingBlock, err = mergeCompactChunks(logger, populate, mergeBlockBuilder, blockIters, seriesIter, job)
resultingBlocks, err = mergeCompactChunks(ctx, logger, populate, mergeBlockBuilder, blockIters, seriesIter, job)
if err != nil {
level.Error(logger).Log("msg", "failed merging existing blocks with new chunks", "err", err)
return err
}

}

archivePath := filepath.Join(c.cfg.WorkingDirectory, uuid.New().String())
level.Debug(logger).Log("msg", "blocks created", "resultingBlocks", len(resultingBlocks))

blockToUpload, err := bloomshipper.CompressBloomBlock(resultingBlock.BlockRef, archivePath, localDst, logger)
if err != nil {
level.Error(logger).Log("msg", "failed compressing bloom blocks into tar file", "err", err)
return err
blocksToUpload := make([]bloomshipper.Block, 0, len(resultingBlocks))
archivesPaths := make([]string, 0, len(resultingBlocks))
for _, block := range resultingBlocks {
archivePath := filepath.Join(c.cfg.WorkingDirectory, uuid.New().String())
blockToUpload, err := bloomshipper.CompressBloomBlock(block.BlockRef, archivePath, block.BlockRef.BlockPath, logger)
if err != nil {
level.Error(logger).Log("msg", "failed compressing bloom blocks into tar file", "err", err)
return err
}
blocksToUpload = append(blocksToUpload, blockToUpload)
archivesPaths = append(archivesPaths, archivePath)
}

defer func() {
err = os.Remove(archivePath)
if err != nil {
level.Error(logger).Log("msg", "failed removing archive file", "err", err, "file", archivePath)
for _, archivePath := range archivesPaths {
err = os.Remove(archivePath)
if err != nil {
level.Error(logger).Log("msg", "failed removing archive file", "err", err, "file", archivePath)
}
}
}()

// Do not change the signature of PutBlocks yet.
// Once block size is limited potentially, compactNewChunks will return multiple blocks, hence a list is appropriate.
storedBlocks, err := c.bloomShipperClient.PutBlocks(ctx, []bloomshipper.Block{blockToUpload})
storedBlocks, err := c.bloomShipperClient.PutBlocks(ctx, blocksToUpload)
if err != nil {
level.Error(logger).Log("msg", "failed uploading blocks to storage", "err", err)
return err
@@ -611,10 +635,10 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
Ref: bloomshipper.Ref{
TenantID: job.tenantID,
TableName: job.tableName,
MinFingerprint: uint64(job.minFp),
MaxFingerprint: uint64(job.maxFp),
StartTimestamp: job.from,
EndTimestamp: job.through,
MinFingerprint: uint64(job.FromFp),
MaxFingerprint: uint64(job.ThroughFp),
StartTimestamp: job.FromTs,
EndTimestamp: job.ThroughTs,
Checksum: rand.Uint32(), // Discuss if checksum is needed for Metas, why should we read all data again.
},
},