diff --git a/pkg/bloomcompactor/TODO.md b/pkg/bloomcompactor/TODO.md
index b34fc24aa967a..479f5399a350d 100644
--- a/pkg/bloomcompactor/TODO.md
+++ b/pkg/bloomcompactor/TODO.md
@@ -1,5 +1,4 @@
-* Should we consider configuring falsePosRate of sbf at runtime?
+* Add falsePosRate of sbf to the config
+* Add a per-tenant bool to enable compaction
 * Use tarGz, untarGz before uploading blocks to storage
-* Return checksum from `BuildFrom`
-* Move meta creation to an outer layer, ensure one meta.json per compaction cycle.
 * Introduce back `maxLookBackPeriod` as `RejectOldSamplesMaxAge` limit in distributors
diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go
index b517957833a94..c41d4bdfd7c91 100644
--- a/pkg/bloomcompactor/bloomcompactor.go
+++ b/pkg/bloomcompactor/bloomcompactor.go
@@ -503,10 +503,13 @@ func createLocalDirName(workingDir string, job Job) string {
 	return filepath.Join(workingDir, dir)
 }
 
-func CompactNewChunks(ctx context.Context, logger log.Logger, job Job, chunks []chunk.Chunk, bt *v1.BloomTokenizer, bloomShipperClient bloomshipper.Client, dst string) (err error) {
+// CompactNewChunks compacts the given chunks, uploads them to storage, and returns the resulting bloom blocks.
+func CompactNewChunks(ctx context.Context, logger log.Logger, job Job,
+	chunks []chunk.Chunk, bt *v1.BloomTokenizer,
+	bloomShipperClient bloomshipper.Client, dst string) ([]bloomshipper.Block, error) {
 	// Ensure the context has not been canceled (i.e. compactor shutdown has been triggered).
 	if err := ctx.Err(); err != nil {
-		return err
+		return nil, err
 	}
 
 	// Create a bloom for this series
@@ -526,31 +529,14 @@ func CompactNewChunks(ctx context.Context, logger log.Logger, job Job, chunks []
 	blocks, err := buildBloomBlock(ctx, logger, bloomForChks, job, dst)
 	if err != nil {
 		level.Error(logger).Log("building bloomBlocks", err)
-		return
+		return nil, err
 	}
-
 	storedBlocks, err := bloomShipperClient.PutBlocks(ctx, []bloomshipper.Block{blocks})
 	if err != nil {
 		level.Error(logger).Log("putting blocks to storage", err)
-		return
-	}
-
-	storedBlockRefs := make([]bloomshipper.BlockRef, len(storedBlocks))
-	// Build and upload meta.json to storage
-	meta := bloomshipper.Meta{
-		// After successful compaction there should be no tombstones
-		Tombstones: make([]bloomshipper.BlockRef, 0),
-		Blocks:     storedBlockRefs,
-	}
-
-	// TODO move this to an outer layer, otherwise creates a meta per block
-	err = bloomShipperClient.PutMeta(ctx, meta)
-	if err != nil {
-		level.Error(logger).Log("putting meta.json to storage", err)
-		return
+		return nil, err
 	}
-
-	return nil
+	return storedBlocks, nil
 }
 
 func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job, bloomShipperClient bloomshipper.Client, bt *v1.BloomTokenizer, storeClient storeClient) error {
@@ -559,23 +545,44 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
 		return err
 	}
 
-	// TODO call bloomShipperClient.GetMetas to get existing meta.json
+	metaSearchParams := bloomshipper.MetaSearchParams{
+		TenantID:       job.tenantID,
+		MinFingerprint: uint64(job.seriesFP),
+		MaxFingerprint: uint64(job.seriesFP),
+		StartTimestamp: int64(job.from),
+		EndTimestamp:   int64(job.through),
+	}
 	var metas []bloomshipper.Meta
+	// TODO: configure a pool for these to avoid allocations
+	var bloomBlocksRefs []bloomshipper.BlockRef
+	var tombstonedBlockRefs []bloomshipper.BlockRef
+
+	metas, err := bloomShipperClient.GetMetas(ctx, metaSearchParams)
+	if err != nil {
+		return err
+	}
 
 	if len(metas) == 0 {
 		// Get chunks data from the list of chunkRefs
-		chks, err := storeClient.chunk.GetChunks(
-			ctx,
-			makeChunkRefs(job.Chunks(), job.Tenant(), job.Fingerprint()),
-		)
+		chks, err := storeClient.chunk.GetChunks(ctx, makeChunkRefs(job.Chunks(), job.Tenant(), job.Fingerprint()))
 		if err != nil {
 			return err
 		}
 
-		err = CompactNewChunks(ctx, logger, job, chks, bt, bloomShipperClient, c.cfg.WorkingDirectory)
+		storedBlocks, err := CompactNewChunks(ctx, logger, job, chks, bt, bloomShipperClient, c.cfg.WorkingDirectory)
 		if err != nil {
+			level.Error(logger).Log("compacting new chunks", err)
 			return err
 		}
+
+		storedBlockRefs := make([]bloomshipper.BlockRef, len(storedBlocks))
+
+		for i, block := range storedBlocks {
+			storedBlockRefs[i] = block.BlockRef
+		}
+
+		// All blocks are new, so all of them are active
+		bloomBlocksRefs = storedBlockRefs
 	} else {
 		// TODO complete part 2 - periodic compaction for the delta from the previous period
 		// When already-compacted metas exist
@@ -586,11 +593,24 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
 			for _, blockRef := range meta.Blocks {
 				uniqueIndexPaths[blockRef.IndexPath] = struct{}{}
 				// ...
+
+				// The result should be a list of active
+				// blocks and tombstoned bloom blocks.
 			}
 		}
 	}
 
+	// After all is done, create a single meta file and upload it to storage
+	meta := bloomshipper.Meta{
+		Tombstones: tombstonedBlockRefs,
+		Blocks:     bloomBlocksRefs,
+	}
+	err = bloomShipperClient.PutMeta(ctx, meta)
+	if err != nil {
+		level.Error(logger).Log("putting meta.json to storage", err)
+		return err
+	}
 	return nil
 }
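
For reviewers, below is a minimal, self-contained Go sketch of the flow this diff establishes: `CompactNewChunks` now only builds and uploads blocks, and `runCompact` assembles exactly one meta per compaction cycle from active and tombstoned block refs. The `BlockRef`/`Block`/`Meta` stand-ins and helper names are simplified assumptions for illustration, not the real `bloomshipper` types or APIs.

```go
package main

import (
	"context"
	"fmt"
)

// Simplified stand-ins for the bloomshipper types used in the diff
// (hypothetical shapes, for illustration only).
type BlockRef struct{ IndexPath string }
type Block struct{ BlockRef BlockRef }
type Meta struct {
	Tombstones []BlockRef
	Blocks     []BlockRef
}

// compactNewChunks mimics the new CompactNewChunks contract: it returns
// the stored blocks instead of writing a meta itself.
func compactNewChunks(ctx context.Context) ([]Block, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	return []Block{{BlockRef{IndexPath: "bloom/block-0/index"}}}, nil
}

// runCompact mirrors the refactored flow: block creation happens inside
// the branch, but the single meta is assembled once at the end.
func runCompact(ctx context.Context, existingMetas []Meta) (Meta, error) {
	var bloomBlocksRefs, tombstonedBlockRefs []BlockRef

	if len(existingMetas) == 0 {
		storedBlocks, err := compactNewChunks(ctx)
		if err != nil {
			return Meta{}, err
		}
		for _, b := range storedBlocks {
			// All blocks are new, so all of them are active.
			bloomBlocksRefs = append(bloomBlocksRefs, b.BlockRef)
		}
	}
	// else: delta compaction (part 2 in the diff's TODOs) would fill
	// tombstonedBlockRefs from the existing metas.

	// Exactly one meta per compaction cycle.
	return Meta{Tombstones: tombstonedBlockRefs, Blocks: bloomBlocksRefs}, nil
}

func main() {
	meta, err := runCompact(context.Background(), nil)
	if err != nil {
		panic(err)
	}
	fmt.Printf("meta: %+v\n", meta)
}
```

Returning the blocks from `CompactNewChunks` instead of writing a meta inside it is what lets the caller guarantee a single meta.json per cycle, which resolves the "move meta creation to an outer layer" TODO this diff removes.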