
Commit

[bloom-compactor] Move meta.json creation at the end of compaction cycle (#11234)

**What this PR does / why we need it**:
This is a follow-up to #11115. Instead of creating a meta file per bloom block, create one meta file per compaction cycle.

**Which issue(s) this PR fixes**:
Fixes #<issue number>

**Special notes for your reviewer**:
poyzannur authored Nov 15, 2023
1 parent c6f4212 commit 3f0f8fa
Showing 2 changed files with 50 additions and 32 deletions.
pkg/bloomcompactor/TODO.md (5 changes: 2 additions & 3 deletions)
@@ -1,5 +1,4 @@
* Should we consider configuring falsePosRate of sbf at runtime?
* Adding falsePosRate of sbf into config
* Add per-tenant bool to enable compaction
* Use tarGz, untarGz before uploading blocks to storage
* Return checksum from `BuildFrom`
* Move meta creation to an outer layer, ensure one meta.json per compaction cycle.
* Introduce back `maxLookBackPeriod` as `RejectOldSamplesMaxAge` limit in distributors
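
On the `tarGz, untarGz` item kept in the list above: a standard-library sketch of the compress-before-upload half, under the assumption that a block is a local directory (`archiveBlockDir` is a hypothetical name, not part of this PR):

```go
package bloomcompactor

import (
	"archive/tar"
	"compress/gzip"
	"io"
	"os"
	"path/filepath"
)

// archiveBlockDir writes srcDir (regular files only) as a .tar.gz at dstPath.
func archiveBlockDir(srcDir, dstPath string) error {
	out, err := os.Create(dstPath)
	if err != nil {
		return err
	}
	defer out.Close()

	gw := gzip.NewWriter(out)
	defer gw.Close()
	tw := tar.NewWriter(gw) // closed before gw so the archive footer is flushed
	defer tw.Close()

	return filepath.Walk(srcDir, func(path string, info os.FileInfo, walkErr error) error {
		if walkErr != nil || info.IsDir() {
			return walkErr
		}
		rel, err := filepath.Rel(srcDir, path)
		if err != nil {
			return err
		}
		hdr, err := tar.FileInfoHeader(info, "")
		if err != nil {
			return err
		}
		hdr.Name = filepath.ToSlash(rel) // tar paths use forward slashes
		if err := tw.WriteHeader(hdr); err != nil {
			return err
		}
		f, err := os.Open(path)
		if err != nil {
			return err
		}
		defer f.Close()
		_, err = io.Copy(tw, f)
		return err
	})
}
```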
pkg/bloomcompactor/bloomcompactor.go (77 changes: 48 additions & 29 deletions)
@@ -503,10 +503,13 @@ func createLocalDirName(workingDir string, job Job) string {
return filepath.Join(workingDir, dir)
}

func CompactNewChunks(ctx context.Context, logger log.Logger, job Job, chunks []chunk.Chunk, bt *v1.BloomTokenizer, bloomShipperClient bloomshipper.Client, dst string) (err error) {
// Compacts given list of chunks, uploads them to storage and returns a list of bloomBlocks
func CompactNewChunks(ctx context.Context, logger log.Logger, job Job,
chunks []chunk.Chunk, bt *v1.BloomTokenizer,
bloomShipperClient bloomshipper.Client, dst string) ([]bloomshipper.Block, error) {
// Ensure the context has not been canceled (ie. compactor shutdown has been triggered).
if err := ctx.Err(); err != nil {
return err
return nil, err
}

// Create a bloom for this series
@@ -526,31 +529,14 @@ func CompactNewChunks(ctx context.Context, logger log.Logger, job Job, chunks []chunk.Chunk, bt *v1.BloomTokenizer, bloomShipperClient bloomshipper.Client, dst string) (err error) {
blocks, err := buildBloomBlock(ctx, logger, bloomForChks, job, dst)
if err != nil {
level.Error(logger).Log("building bloomBlocks", err)
return
return nil, err
}

storedBlocks, err := bloomShipperClient.PutBlocks(ctx, []bloomshipper.Block{blocks})
if err != nil {
level.Error(logger).Log("putting blocks to storage", err)
return
}

storedBlockRefs := make([]bloomshipper.BlockRef, len(storedBlocks))
// Build and upload meta.json to storage
meta := bloomshipper.Meta{
// After successful compaction there should be no tombstones
Tombstones: make([]bloomshipper.BlockRef, 0),
Blocks: storedBlockRefs,
}

// TODO move this to an outer layer, otherwise creates a meta per block
err = bloomShipperClient.PutMeta(ctx, meta)
if err != nil {
level.Error(logger).Log("putting meta.json to storage", err)
return
return nil, err
}

return nil
return storedBlocks, nil
}

@@ -559,23 +545,43 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job, bloomShipperClient bloomshipper.Client, bt *v1.BloomTokenizer, storeClient storeClient) error {
return err
}

// TODO call bloomShipperClient.GetMetas to get existing meta.json
metaSearchParams := bloomshipper.MetaSearchParams{
TenantID: job.tenantID,
MinFingerprint: uint64(job.seriesFP),
MaxFingerprint: uint64(job.seriesFP),
StartTimestamp: int64(job.from),
EndTimestamp: int64(job.through),
}
var metas []bloomshipper.Meta
//TODO Configure pool for these to avoid allocations
var bloomBlocksRefs []bloomshipper.BlockRef
var tombstonedBlockRefs []bloomshipper.BlockRef

metas, err := bloomShipperClient.GetMetas(ctx, metaSearchParams)
if err != nil {
return err
}

if len(metas) == 0 {
// Get chunks data from list of chunkRefs
chks, err := storeClient.chunk.GetChunks(
ctx,
makeChunkRefs(job.Chunks(), job.Tenant(), job.Fingerprint()),
)
chks, err := storeClient.chunk.GetChunks(ctx, makeChunkRefs(job.Chunks(), job.Tenant(), job.Fingerprint()))
if err != nil {
return err
}

err = CompactNewChunks(ctx, logger, job, chks, bt, bloomShipperClient, c.cfg.WorkingDirectory)
storedBlocks, err := CompactNewChunks(ctx, logger, job, chks, bt, bloomShipperClient, c.cfg.WorkingDirectory)
if err != nil {
return err
return level.Error(logger).Log("compacting new chunks", err)
}

storedBlockRefs := make([]bloomshipper.BlockRef, len(storedBlocks))

for i, block := range storedBlocks {
storedBlockRefs[i] = block.BlockRef
}

// all blocks are new and active blocks
bloomBlocksRefs = storedBlockRefs
} else {
// TODO complete part 2 - periodic compaction for delta from previous period
// When already compacted metas exists
@@ -586,11 +592,24 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job, bloomShipperClient bloomshipper.Client, bt *v1.BloomTokenizer, storeClient storeClient) error {
for _, blockRef := range meta.Blocks {
uniqueIndexPaths[blockRef.IndexPath] = struct{}{}
// ...

// the result should return a list of active
// blocks and tombstoned bloom blocks.
}
}

}

// After all is done, create one meta file and upload to storage
meta := bloomshipper.Meta{
Tombstones: tombstonedBlockRefs,
Blocks: bloomBlocksRefs,
}
err = bloomShipperClient.PutMeta(ctx, meta)
if err != nil {
level.Error(logger).Log("putting meta.json to storage", err)
return err
}
return nil
}
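
The `else` branch above is deliberately left for part 2 (periodic compaction of the delta). Purely as a sketch of where the `uniqueIndexPaths` hint seems to point (nothing below is in this PR; `splitActiveAndTombstoned` and the policy of tombstoning old blocks whose index path was rewritten are assumptions):

```go
package bloomcompactor

import (
	"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
)

// splitActiveAndTombstoned is a hypothetical sketch of the deferred part 2:
// given previously written metas and this cycle's new blocks, decide which
// old blocks stay active and which should be tombstoned in the final meta.
func splitActiveAndTombstoned(metas []bloomshipper.Meta, newBlocks []bloomshipper.BlockRef) (active, tombstoned []bloomshipper.BlockRef) {
	// Assumed policy: an index path rewritten this cycle supersedes
	// every old block that shared it (mirrors uniqueIndexPaths above).
	rewritten := make(map[string]struct{}, len(newBlocks))
	for _, ref := range newBlocks {
		rewritten[ref.IndexPath] = struct{}{}
	}
	for _, meta := range metas {
		for _, ref := range meta.Blocks {
			if _, ok := rewritten[ref.IndexPath]; ok {
				tombstoned = append(tombstoned, ref)
			} else {
				active = append(active, ref)
			}
		}
	}
	// Blocks produced this cycle are always active.
	return append(active, newBlocks...), tombstoned
}
```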

