Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bloom/controller wiring #11831

Merged
merged 7 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions pkg/bloomcompactor/v2_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type Meta struct {
// is greater than or equal to the range of the actual data in the underlying blocks.
OwnershipRange v1.FingerprintBounds

// Old blocks which can be deleted in the future. These should be from pervious compaction rounds.
// Old blocks which can be deleted in the future. These should be from previous compaction rounds.
Tombstones []BlockRef

// The specific TSDB files used to generate the block.
Expand Down Expand Up @@ -119,17 +119,18 @@ func (m Meta) Checksum() (uint32, error) {
}

type TSDBStore interface {
ResolveTSDBs() ([]*tsdb.TSDBFile, error)
ResolveTSDBs() ([]*tsdb.SingleTenantTSDBIdentifier, error)
LoadTSDB(id tsdb.Identifier, bounds v1.FingerprintBounds) (v1.CloseableIterator[*v1.Series], error)
}

type MetaStore interface {
ResolveMetas(bounds v1.FingerprintBounds) ([]MetaRef, error)
GetMetas([]MetaRef) ([]Meta, error)
PutMeta(Meta) error
ResolveMetas(bounds v1.FingerprintBounds) ([]MetaRef, error)
}

type BlockStore interface {
// TODO(owen-d): flesh out|integrate against bloomshipper.Client
GetBlocks([]BlockRef) ([]interface{}, error)
GetBlocks([]BlockRef) ([]*v1.Block, error)
PutBlock(interface{}) error
}
184 changes: 177 additions & 7 deletions pkg/bloomcompactor/v2controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package bloomcompactor
import (
"context"
"fmt"
"sort"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -17,6 +18,9 @@ type SimpleBloomController struct {
tsdbStore TSDBStore
metaStore MetaStore
blockStore BlockStore
chunkLoader ChunkLoader
rwFn func() (v1.BlockWriter, v1.BlockReader)
metrics *Metrics

// TODO(owen-d): add metrics
logger log.Logger
Expand All @@ -27,18 +31,24 @@ func NewSimpleBloomController(
tsdbStore TSDBStore,
metaStore MetaStore,
blockStore BlockStore,
chunkLoader ChunkLoader,
rwFn func() (v1.BlockWriter, v1.BlockReader),
metrics *Metrics,
logger log.Logger,
) *SimpleBloomController {
return &SimpleBloomController{
ownershipRange: ownershipRange,
tsdbStore: tsdbStore,
metaStore: metaStore,
blockStore: blockStore,
chunkLoader: chunkLoader,
rwFn: rwFn,
metrics: metrics,
logger: log.With(logger, "ownership", ownershipRange),
}
}

func (s *SimpleBloomController) do(_ context.Context) error {
func (s *SimpleBloomController) do(ctx context.Context) error {
// 1. Resolve TSDBs
tsdbs, err := s.tsdbStore.ResolveTSDBs()
if err != nil {
Expand All @@ -61,26 +71,26 @@ func (s *SimpleBloomController) do(_ context.Context) error {
}

ids := make([]tsdb.Identifier, 0, len(tsdbs))
for _, idx := range tsdbs {
ids = append(ids, idx.Identifier)
for _, id := range tsdbs {
ids = append(ids, id)
}

// 4. Determine which TSDBs have gaps in the ownership range and need to
// be processed.
work, err := gapsBetweenTSDBsAndMetas(s.ownershipRange, ids, metas)
tsdbsWithGaps, err := gapsBetweenTSDBsAndMetas(s.ownershipRange, ids, metas)
if err != nil {
level.Error(s.logger).Log("msg", "failed to find gaps", "err", err)
return errors.Wrap(err, "failed to find gaps")
}

if len(work) == 0 {
if len(tsdbsWithGaps) == 0 {
level.Debug(s.logger).Log("msg", "blooms exist for all tsdbs")
return nil
}

// TODO(owen-d): finish
panic("not implemented")
work := blockPlansForGaps(tsdbsWithGaps, metas)

// 5. Generate Blooms
// Now that we have the gaps, we will generate a bloom block for each gap.
// We can accelerate this by using existing blocks which may already contain
// needed chunks in their blooms, for instance after a new TSDB version is generated
Expand All @@ -89,8 +99,168 @@ func (s *SimpleBloomController) do(_ context.Context) error {
// overlapping the ownership ranges we've identified as needing updates.
// With these in hand, we can download the old blocks and use them to
// accelerate bloom generation for the new blocks.

var (
blockCt int
tsdbCt = len(work)
)

for _, plan := range work {

for _, gap := range plan.gaps {
// Fetch blocks that aren't up to date but are in the desired fingerprint range
// to try and accelerate bloom creation
seriesItr, preExistingBlocks, err := s.loadWorkForGap(plan.tsdb, gap)
if err != nil {
level.Error(s.logger).Log("msg", "failed to get series and blocks", "err", err)
return errors.Wrap(err, "failed to get series and blocks")
}

gen := NewSimpleBloomGenerator(
v1.DefaultBlockOptions,
seriesItr,
s.chunkLoader,
preExistingBlocks,
s.rwFn,
s.metrics,
log.With(s.logger, "tsdb", plan.tsdb.Name(), "ownership", gap, "blocks", len(preExistingBlocks)),
)

_, newBlocks, err := gen.Generate(ctx)
if err != nil {
// TODO(owen-d): metrics
level.Error(s.logger).Log("msg", "failed to generate bloom", "err", err)
return errors.Wrap(err, "failed to generate bloom")
}

// TODO(owen-d): dispatch this to a queue for writing, handling retries/backpressure, etc?
for newBlocks.Next() {
blockCt++
blk := newBlocks.At()
if err := s.blockStore.PutBlock(blk); err != nil {
level.Error(s.logger).Log("msg", "failed to write block", "err", err)
return errors.Wrap(err, "failed to write block")
}
}

if err := newBlocks.Err(); err != nil {
// TODO(owen-d): metrics
level.Error(s.logger).Log("msg", "failed to generate bloom", "err", err)
return errors.Wrap(err, "failed to generate bloom")
}

}
}

level.Debug(s.logger).Log("msg", "finished bloom generation", "blocks", blockCt, "tsdbs", tsdbCt)
return nil

}

func (s *SimpleBloomController) loadWorkForGap(id tsdb.Identifier, gap gapWithBlocks) (v1.CloseableIterator[*v1.Series], []*v1.Block, error) {
// load a series iterator for the gap
seriesItr, err := s.tsdbStore.LoadTSDB(id, gap.bounds)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to load tsdb")
}

blocks, err := s.blockStore.GetBlocks(gap.blocks)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to get blocks")
}

return seriesItr, blocks, nil
}

type gapWithBlocks struct {
bounds v1.FingerprintBounds
blocks []BlockRef
}

// blockPlan is a plan for all the work needed to build a meta.json
// It includes:
// - the tsdb (source of truth) which contains all the series+chunks
// we need to ensure are indexed in bloom blocks
// - a list of gaps that are out of date and need to be checked+built
// - within each gap, a list of block refs which overlap the gap are included
// so we can use them to accelerate bloom generation. They likely contain many
// of the same chunks we need to ensure are indexed, just from previous tsdb iterations.
// This is a performance optimization to avoid expensive re-reindexing
type blockPlan struct {
tsdb tsdb.Identifier
gaps []gapWithBlocks
}

// blockPlansForGaps groups tsdb gaps we wish to fill with overlapping but out of date blocks.
// This allows us to expedite bloom generation by using existing blocks to fill in the gaps
// since many will contain the same chunks.
func blockPlansForGaps(tsdbs []tsdbGaps, metas []Meta) []blockPlan {
plans := make([]blockPlan, 0, len(tsdbs))

for _, idx := range tsdbs {
plan := blockPlan{
tsdb: idx.tsdb,
gaps: make([]gapWithBlocks, 0, len(idx.gaps)),
}

for _, gap := range idx.gaps {
planGap := gapWithBlocks{
bounds: gap,
}

for _, meta := range metas {

if meta.OwnershipRange.Intersection(gap) == nil {
// this meta doesn't overlap the gap, skip
continue
}

for _, block := range meta.Blocks {
if block.OwnershipRange.Intersection(gap) == nil {
// this block doesn't overlap the gap, skip
continue
}
// this block overlaps the gap, add it to the plan
// for this gap
planGap.blocks = append(planGap.blocks, block)
}
}

// ensure we sort blocks so deduping iterator works as expected
sort.Slice(planGap.blocks, func(i, j int) bool {
return planGap.blocks[i].OwnershipRange.Less(planGap.blocks[j].OwnershipRange)
})

peekingBlocks := v1.NewPeekingIter[BlockRef](
v1.NewSliceIter[BlockRef](
planGap.blocks,
),
)
// dedupe blocks which could be in multiple metas
itr := v1.NewDedupingIter[BlockRef, BlockRef](
func(a, b BlockRef) bool {
return a == b
},
v1.Identity[BlockRef],
func(a, _ BlockRef) BlockRef {
return a
},
peekingBlocks,
)

deduped := v1.Collect[BlockRef](itr)
planGap.blocks = deduped

plan.gaps = append(plan.gaps, planGap)
}

plans = append(plans, plan)
}

return plans
}

// Used to signal the gaps that need to be populated for a tsdb
type tsdbGaps struct {
tsdb tsdb.Identifier
gaps []v1.FingerprintBounds
Expand Down
Loading
Loading