Apply maximum block size limit during bloom compaction
salvacorts committed Jan 15, 2024
1 parent 86f2001 commit c416b19
Showing 11 changed files with 439 additions and 158 deletions.
6 changes: 6 additions & 0 deletions docs/sources/configure/_index.md
@@ -3098,6 +3098,12 @@ shard_streams:
# CLI flag: -bloom-compactor.chunks-batch-size
[bloom_compactor_chunks_batch_size: <int> | default = 100]

# The maximum bloom block size. Default is unlimited. The actual block size
# might exceed this limit since blooms will be added to blocks until the block
# exceeds the maximum block size.
# CLI flag: -bloom-compactor.max-block-size
[bloom_compactor_max_block_size: <int> | default = 0B]

# Length of the n-grams created when computing blooms from log lines.
# CLI flag: -bloom-compactor.ngram-length
[bloom_ngram_length: <int> | default = 4]
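The new size cap is advisory rather than strict: a block is only cut after it has grown past the limit, so the last bloom added can push a block over `bloom-compactor.max-block-size`. The standalone Go sketch below (not Loki code; `cutBlocks` and the `bloom` type are hypothetical) illustrates why finished blocks can overshoot the configured limit, with `0` meaning unlimited as in the `0B` default above.

```go
package main

import "fmt"

// bloom is a stand-in for an encoded series bloom; only its size matters here.
type bloom struct {
	name string
	size int // encoded size in bytes
}

// cutBlocks is a hypothetical helper (not Loki code) mimicking the documented
// behaviour: blooms are appended to the current block until the block grows
// past maxBlockSize, so a finished block can overshoot the limit by up to one
// bloom. maxBlockSize == 0 means "unlimited", matching the 0B default.
func cutBlocks(blooms []bloom, maxBlockSize int) [][]bloom {
	var blocks [][]bloom
	var current []bloom
	currentSize := 0

	for _, b := range blooms {
		current = append(current, b)
		currentSize += b.size

		// Cut the block only after it has exceeded the limit.
		if maxBlockSize > 0 && currentSize > maxBlockSize {
			blocks = append(blocks, current)
			current, currentSize = nil, 0
		}
	}
	if len(current) > 0 {
		blocks = append(blocks, current)
	}
	return blocks
}

func main() {
	blooms := []bloom{{"a", 40}, {"b", 70}, {"c", 30}, {"d", 10}}
	for i, blk := range cutBlocks(blooms, 100) {
		total := 0
		for _, b := range blk {
			total += b.size
		}
		fmt.Printf("block %d: %d blooms, %dB (limit 100B)\n", i, len(blk), total)
	}
}
```

With a 100B limit, the first block here ends up at 110B because the 70B bloom is added before the cut, which is the behaviour the documentation describes.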
44 changes: 28 additions & 16 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -504,7 +504,8 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
metasMatchingJob, blocksMatchingJob := matchingBlocks(metas, job)

localDst := createLocalDirName(c.cfg.WorkingDirectory, job)
blockOptions := v1.NewBlockOptions(bt.GetNGramLength(), bt.GetNGramSkip())
maxBlockSize := c.limits.BloomCompactorMaxBlockSize(job.tenantID)
blockOptions := v1.NewBlockOptions(bt.GetNGramLength(), bt.GetNGramSkip(), maxBlockSize)

defer func() {
//clean up the bloom directory
@@ -513,10 +514,12 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
}
}()

var resultingBlock bloomshipper.Block
var resultingBlocks []bloomshipper.Block
defer func() {
if resultingBlock.Data != nil {
_ = resultingBlock.Data.Close()
for _, resultingBlock := range resultingBlocks {
if resultingBlock.Data != nil {
_ = resultingBlock.Data.Close()
}
}
}()

@@ -535,7 +538,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
return err
}

resultingBlock, err = compactNewChunks(ctx, logger, job, bt, storeClient.chunk, builder, c.limits)
resultingBlocks, err = compactNewChunks(ctx, logger, job, bt, storeClient.chunk, builder, c.limits)
if err != nil {
return level.Error(logger).Log("msg", "failed compacting new chunks", "err", err)
}
@@ -546,7 +549,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,

var populate = createPopulateFunc(ctx, job, storeClient, bt, c.limits)

seriesIter := makeSeriesIterFromSeriesMeta(job)
seriesIter := v1.NewPeekingIter[*v1.Series](makeSeriesIterFromSeriesMeta(job))

blockIters, blockPaths, err := makeBlockIterFromBlocks(ctx, logger, c.bloomShipperClient, blocksMatchingJob, c.cfg.WorkingDirectory)
defer func() {
@@ -568,32 +571,41 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
return err
}

resultingBlock, err = mergeCompactChunks(logger, populate, mergeBlockBuilder, blockIters, seriesIter, job)
resultingBlocks, err = mergeCompactChunks(logger, populate, mergeBlockBuilder, blockIters, seriesIter, job)
if err != nil {
level.Error(logger).Log("msg", "failed merging existing blocks with new chunks", "err", err)
return err
}

}

archivePath := filepath.Join(c.cfg.WorkingDirectory, uuid.New().String())
level.Debug(logger).Log("msg", "blocks created", "resultingBlocks", len(resultingBlocks))

blockToUpload, err := bloomshipper.CompressBloomBlock(resultingBlock.BlockRef, archivePath, localDst, logger)
if err != nil {
level.Error(logger).Log("msg", "failed compressing bloom blocks into tar file", "err", err)
return err
blocksToUpload := make([]bloomshipper.Block, 0, len(resultingBlocks))
archivesPaths := make([]string, 0, len(resultingBlocks))
for _, block := range resultingBlocks {
archivePath := filepath.Join(c.cfg.WorkingDirectory, uuid.New().String())
blockToUpload, err := bloomshipper.CompressBloomBlock(block.BlockRef, archivePath, block.BlockRef.BlockPath, logger)
if err != nil {
level.Error(logger).Log("msg", "failed compressing bloom blocks into tar file", "err", err)
return err
}
blocksToUpload = append(blocksToUpload, blockToUpload)
archivesPaths = append(archivesPaths, archivePath)
}

defer func() {
err = os.Remove(archivePath)
if err != nil {
level.Error(logger).Log("msg", "failed removing archive file", "err", err, "file", archivePath)
for _, archivePath := range archivesPaths {
err = os.Remove(archivePath)
if err != nil {
level.Error(logger).Log("msg", "failed removing archive file", "err", err, "file", archivePath)
}
}
}()

// Do not change the signature of PutBlocks yet.
// Once block size is limited potentially, compactNewChunks will return multiple blocks, hence a list is appropriate.
storedBlocks, err := c.bloomShipperClient.PutBlocks(ctx, []bloomshipper.Block{blockToUpload})
storedBlocks, err := c.bloomShipperClient.PutBlocks(ctx, blocksToUpload)
if err != nil {
level.Error(logger).Log("msg", "failed uploading blocks to storage", "err", err)
return err
153 changes: 103 additions & 50 deletions pkg/bloomcompactor/chunkcompactor.go
@@ -31,38 +31,73 @@ type chunkClient interface {
}

type blockBuilder interface {
BuildFrom(itr v1.Iterator[v1.SeriesWithBloom]) (uint32, error)
// BuildFrom builds a block from the given iterator.
// If the data is too large to fit in a single block, the iterator won't be consumed completely. In this case,
// call BuildFrom again with the same iterator to build the next block.
BuildFrom(itr v1.Iterator[v1.SeriesWithBloom]) (string, uint32, v1.SeriesBounds, error)
// Data returns a reader for the last block built by BuildFrom.
Data() (io.ReadSeekCloser, error)
}
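A minimal sketch of how a caller drives this contract, using local stand-in types rather than the real `v1` package (the `peekingIterator`, `blockBuilder`, and `buildAllBlocks` names here are assumptions): because `BuildFrom` may stop before the iterator is drained once a block fills up, the caller keeps invoking it with the same iterator until `Peek` reports nothing left. This mirrors the loop added to `buildBlocksFromBlooms` later in this diff.

```go
package sketch

// Local stand-ins for the v1 types; field and method sets are assumptions
// that only mirror the documented contract.
type seriesWithBloom struct{ fingerprint uint64 }

type seriesBounds struct{ fromFp, throughFp uint64 }

// peekingIterator mirrors the relevant part of v1.PeekingIterator: Peek
// reports whether more items remain without consuming them.
type peekingIterator interface {
	Peek() (seriesWithBloom, bool)
}

// blockBuilder mirrors the interface in this diff: BuildFrom may return
// before the iterator is exhausted once the current block is full.
type blockBuilder interface {
	BuildFrom(itr peekingIterator) (blockPath string, checksum uint32, bounds seriesBounds, err error)
}

// buildAllBlocks calls BuildFrom repeatedly with the *same* iterator until it
// is fully consumed, collecting one block path per call.
func buildAllBlocks(b blockBuilder, itr peekingIterator) ([]string, error) {
	var paths []string
	for {
		if _, hasNext := itr.Peek(); !hasNext {
			return paths, nil // iterator drained, all blocks built
		}
		path, _, _, err := b.BuildFrom(itr)
		if err != nil {
			return nil, err
		}
		paths = append(paths, path)
	}
}
```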

type PersistentBlockBuilder struct {
builder *v1.BlockBuilder
localDst string
blockOptions v1.BlockOptions
lastBlockIdx uint64
baseLocalDst string
currentBlockPath string
}

func NewPersistentBlockBuilder(localDst string, blockOptions v1.BlockOptions) (*PersistentBlockBuilder, error) {
builder := PersistentBlockBuilder{
blockOptions: blockOptions,
baseLocalDst: localDst,
}

return &builder, nil
}

func (p *PersistentBlockBuilder) getNextBuilder() (*v1.BlockBuilder, error) {
// write bloom to a local dir
b, err := v1.NewBlockBuilder(blockOptions, v1.NewDirectoryBlockWriter(localDst))
blockPath := filepath.Join(p.baseLocalDst, fmt.Sprintf("%d", p.lastBlockIdx))
builder, err := v1.NewBlockBuilder(p.blockOptions, v1.NewDirectoryBlockWriter(blockPath))
if err != nil {
return nil, err
}
builder := PersistentBlockBuilder{
builder: b,
localDst: localDst,
}
return &builder, nil
p.currentBlockPath = blockPath
p.lastBlockIdx++
return builder, nil
}

func (p *PersistentBlockBuilder) BuildFrom(itr v1.Iterator[v1.SeriesWithBloom]) (uint32, error) {
return p.builder.BuildFrom(itr)
func (p *PersistentBlockBuilder) BuildFrom(itr v1.Iterator[v1.SeriesWithBloom]) (string, uint32, v1.SeriesBounds, error) {

b, err := p.getNextBuilder()
if err != nil {
return "", 0, v1.SeriesBounds{}, err
}

checksum, bounds, err := b.BuildFrom(itr)
if err != nil {
return "", 0, v1.SeriesBounds{}, err
}

return p.currentBlockPath, checksum, bounds, nil
}

func (p *PersistentBlockBuilder) mergeBuild(builder *v1.MergeBuilder) (uint32, error) {
return builder.Build(p.builder)
func (p *PersistentBlockBuilder) mergeBuild(builder *v1.MergeBuilder) (string, uint32, v1.SeriesBounds, error) {
b, err := p.getNextBuilder()
if err != nil {
return "", 0, v1.SeriesBounds{}, err
}

checksum, bounds, err := builder.Build(b)
if err != nil {
return "", 0, v1.SeriesBounds{}, err
}

return p.currentBlockPath, checksum, bounds, nil
}

func (p *PersistentBlockBuilder) Data() (io.ReadSeekCloser, error) {
blockFile, err := os.Open(filepath.Join(p.localDst, v1.BloomFileName))
blockFile, err := os.Open(filepath.Join(p.currentBlockPath, v1.BloomFileName))
if err != nil {
return nil, err
}
@@ -87,10 +122,13 @@ func makeChunkRefs(chksMetas []tsdbindex.ChunkMeta, tenant string, fp model.Fing
}

func buildBloomFromSeries(seriesMeta seriesMeta, fpRate float64, tokenizer compactorTokenizer, chunks v1.Iterator[[]chunk.Chunk]) (v1.SeriesWithBloom, error) {
chunkRefs := makeChunkRefsFromChunkMetas(seriesMeta.chunkRefs)

// Create a bloom for this series
bloomForChks := v1.SeriesWithBloom{
Series: &v1.Series{
Fingerprint: seriesMeta.seriesFP,
Chunks: chunkRefs,
},
Bloom: &v1.Bloom{
ScalableBloomFilter: *filter.NewDefaultScalableBloomFilter(fpRate),
@@ -106,47 +144,62 @@ func buildBloomFromSeries(seriesMeta seriesMeta, fpRate float64, tokenizer compa
}

// TODO Test this when bloom block size check is implemented
func buildBlockFromBlooms(
func buildBlocksFromBlooms(
ctx context.Context,
logger log.Logger,
builder blockBuilder,
blooms v1.Iterator[v1.SeriesWithBloom],
blooms v1.PeekingIterator[v1.SeriesWithBloom],
job Job,
) (bloomshipper.Block, error) {
) ([]bloomshipper.Block, error) {
// Ensure the context has not been canceled (ie. compactor shutdown has been triggered).
if err := ctx.Err(); err != nil {
return bloomshipper.Block{}, err
}

checksum, err := builder.BuildFrom(blooms)
if err != nil {
level.Error(logger).Log("msg", "failed writing to bloom", "err", err)
return bloomshipper.Block{}, err
return []bloomshipper.Block{}, err
}

data, err := builder.Data()
if err != nil {
level.Error(logger).Log("msg", "failed reading bloom data", "err", err)
return bloomshipper.Block{}, err
}

block := bloomshipper.Block{
BlockRef: bloomshipper.BlockRef{
Ref: bloomshipper.Ref{
TenantID: job.tenantID,
TableName: job.tableName,
MinFingerprint: uint64(job.minFp),
MaxFingerprint: uint64(job.maxFp),
StartTimestamp: job.from,
EndTimestamp: job.through,
Checksum: checksum,
// TODO(salvacorts): Reuse buffer
blocks := make([]bloomshipper.Block, 0)

for {
// Create blocks until the iterator is empty
if _, hasNext := blooms.Peek(); !hasNext {
break
}

if err := ctx.Err(); err != nil {
return []bloomshipper.Block{}, err
}

blockPath, checksum, bounds, err := builder.BuildFrom(blooms)
if err != nil {
level.Error(logger).Log("msg", "failed writing to bloom", "err", err)
return []bloomshipper.Block{}, err
}

data, err := builder.Data()
if err != nil {
level.Error(logger).Log("msg", "failed reading bloom data", "err", err)
return []bloomshipper.Block{}, err
}

blocks = append(blocks, bloomshipper.Block{
BlockRef: bloomshipper.BlockRef{
Ref: bloomshipper.Ref{
TenantID: job.tenantID,
TableName: job.tableName,
MinFingerprint: uint64(bounds.FromFp),
MaxFingerprint: uint64(bounds.ThroughFp),
StartTimestamp: bounds.FromTs,
EndTimestamp: bounds.ThroughTs,
Checksum: checksum,
},
IndexPath: job.indexPath,
BlockPath: blockPath,
},
IndexPath: job.indexPath,
},
Data: data,
Data: data,
})
}

return block, nil
return blocks, nil
}
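Each resulting block's `BlockRef` is now keyed by the bounds returned from the builder rather than by the whole job's fingerprint and time range. `v1.SeriesBounds` itself is not shown in this diff; the sketch below is an assumption about how per-block bounds of this shape are typically accumulated while series are written (the `series`, `bounds`, and `updateBounds` names are hypothetical).

```go
package sketch

// Stand-in types for the fingerprint/timestamp ranges a block would carry.
type fingerprint uint64
type timestamp int64

type series struct {
	Fingerprint   fingerprint
	From, Through timestamp
}

// bounds holds the min/max fingerprint and time range of the series written
// to one block, analogous to what v1.SeriesBounds appears to provide.
type bounds struct {
	FromFp, ThroughFp fingerprint
	FromTs, ThroughTs timestamp
}

// updateBounds widens the running bounds to cover one more series; first
// marks the first series added to the block.
func updateBounds(b bounds, s series, first bool) bounds {
	if first {
		return bounds{FromFp: s.Fingerprint, ThroughFp: s.Fingerprint, FromTs: s.From, ThroughTs: s.Through}
	}
	if s.Fingerprint < b.FromFp {
		b.FromFp = s.Fingerprint
	}
	if s.Fingerprint > b.ThroughFp {
		b.ThroughFp = s.Fingerprint
	}
	if s.From < b.FromTs {
		b.FromTs = s.From
	}
	if s.Through > b.ThroughTs {
		b.ThroughTs = s.Through
	}
	return b
}
```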

func createLocalDirName(workingDir string, job Job) string {
@@ -162,22 +215,22 @@ func compactNewChunks(ctx context.Context,
storeClient chunkClient,
builder blockBuilder,
limits Limits,
) (bloomshipper.Block, error) {
) ([]bloomshipper.Block, error) {
// Ensure the context has not been canceled (ie. compactor shutdown has been triggered).
if err := ctx.Err(); err != nil {
return bloomshipper.Block{}, err
return []bloomshipper.Block{}, err
}

bloomIter := newLazyBloomBuilder(ctx, job, storeClient, bt, logger, limits)
bloomIter := v1.NewPeekingIter[v1.SeriesWithBloom](newLazyBloomBuilder(ctx, job, storeClient, bt, logger, limits))

// Build and upload bloomBlock to storage
block, err := buildBlockFromBlooms(ctx, logger, builder, bloomIter, job)
blocks, err := buildBlocksFromBlooms(ctx, logger, builder, bloomIter, job)
if err != nil {
level.Error(logger).Log("msg", "failed building bloomBlocks", "err", err)
return bloomshipper.Block{}, err
return []bloomshipper.Block{}, err
}

return block, nil
return blocks, nil
}

type lazyBloomBuilder struct {