Skip to content

Commit

Permalink
bloom plans
Browse files Browse the repository at this point in the history
Signed-off-by: Owen Diehl <[email protected]>
  • Loading branch information
owen-d committed Jan 30, 2024
1 parent 2d3314f commit 60b0a85
Show file tree
Hide file tree
Showing 4 changed files with 278 additions and 38 deletions.
3 changes: 2 additions & 1 deletion pkg/bloomcompactor/v2_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ func (m Meta) Checksum() (uint32, error) {
}

type TSDBStore interface {
ResolveTSDBs() ([]*tsdb.TSDBFile, error)
ResolveTSDBs() ([]*tsdb.SingleTenantTSDBIdentifier, error)
LoadTSDB(id tsdb.Identifier, bounds v1.FingerprintBounds) (v1.CloseableIterator[*v1.Series], error)
}

type MetaStore interface {
Expand Down
94 changes: 85 additions & 9 deletions pkg/bloomcompactor/v2controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,23 +70,25 @@ func (s *SimpleBloomController) do(ctx context.Context) error {
}

ids := make([]tsdb.Identifier, 0, len(tsdbs))
for _, idx := range tsdbs {
ids = append(ids, idx.Identifier)
for _, id := range tsdbs {
ids = append(ids, id)
}

// 4. Determine which TSDBs have gaps in the ownership range and need to
// be processed.
work, err := gapsBetweenTSDBsAndMetas(s.ownershipRange, ids, metas)
tsdbsWithGaps, err := gapsBetweenTSDBsAndMetas(s.ownershipRange, ids, metas)
if err != nil {
level.Error(s.logger).Log("msg", "failed to find gaps", "err", err)
return errors.Wrap(err, "failed to find gaps")
}

if len(work) == 0 {
if len(tsdbsWithGaps) == 0 {
level.Debug(s.logger).Log("msg", "blooms exist for all tsdbs")
return nil
}

work := blockPlansForGaps(tsdbsWithGaps, metas)

// 5. Generate Blooms
// Now that we have the gaps, we will generate a bloom block for each gap.
// We can accelerate this by using existing blocks which may already contain
Expand All @@ -102,12 +104,16 @@ func (s *SimpleBloomController) do(ctx context.Context) error {
tsdbCt = len(work)
)

for _, db := range work {
for _, plan := range work {

for _, gap := range db.gaps {
for _, gap := range plan.gaps {
// Fetch blocks that aren't up to date but are in the desired fingerprint range
// to try and accelerate bloom creation
seriesItr, preExistingBlocks := s.get(db.tsdb, gap)
seriesItr, preExistingBlocks, err := s.loadWorkForGap(plan.tsdb, gap)
if err != nil {
level.Error(s.logger).Log("msg", "failed to get series and blocks", "err", err)
return errors.Wrap(err, "failed to get series and blocks")
}

gen := NewSimpleBloomGenerator(
v1.DefaultBlockOptions,
Expand All @@ -116,7 +122,7 @@ func (s *SimpleBloomController) do(ctx context.Context) error {
preExistingBlocks,
s.rwFn,
s.metrics,
log.With(s.logger, "tsdb", db.tsdb.Name(), "ownership", gap, "blocks", len(preExistingBlocks)),
log.With(s.logger, "tsdb", plan.tsdb.Name(), "ownership", gap, "blocks", len(preExistingBlocks)),
)

_, newBlocks, err := gen.Generate(ctx)
Expand Down Expand Up @@ -150,11 +156,81 @@ func (s *SimpleBloomController) do(ctx context.Context) error {

}

func (s *SimpleBloomController) get(tsdb tsdb.Identifier, gap v1.FingerprintBounds) (v1.Iterator[*v1.Series], []*v1.Block) {
func (s *SimpleBloomController) loadWorkForGap(id tsdb.Identifier, gap gapWithBlocks) (v1.CloseableIterator[*v1.Series], []*v1.Block, error) {
// load a series iterator for the gap
_, err := s.tsdbStore.LoadTSDB(id, gap.bounds)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to load tsdb")
}

// TODO(owen-d): finish
panic("not implemented")
}

type gapWithBlocks struct {
bounds v1.FingerprintBounds
blocks []BlockRef
}

// blockPlan is a plan for all the work needed to build a meta.json
// It includes:
// - the tsdb (source of truth) which contains all the series+chunks
// we need to ensure are indexed in bloom blocks
// - a list of gaps that are out of date and need to be checked+built
// - within each gap, a list of block refs which overlap the gap are included
// so we can use them to accelerate bloom generation. They likely contain many
// of the same chunks we need to ensure are indexed, just from previous tsdb iterations.
// This is a performance optimization to avoid expensive re-reindexing
type blockPlan struct {
tsdb tsdb.Identifier
gaps []gapWithBlocks
}

// blockPlansForGaps groups tsdb gaps we wish to fill with overlapping but out of date blocks.
// This allows us to expedite bloom generation by using existing blocks to fill in the gaps
// since many will contain the same chunks.
func blockPlansForGaps(tsdbs []tsdbGaps, metas []Meta) []blockPlan {
plans := make([]blockPlan, 0, len(tsdbs))

for _, idx := range tsdbs {
plan := blockPlan{
tsdb: idx.tsdb,
gaps: make([]gapWithBlocks, 0, len(idx.gaps)),
}

for _, gap := range idx.gaps {
planGap := gapWithBlocks{
bounds: gap,
}

for _, meta := range metas {

if meta.OwnershipRange.Intersection(gap) == nil {
// this meta doesn't overlap the gap, skip
continue
}

for _, block := range meta.Blocks {
if block.OwnershipRange.Intersection(gap) == nil {
// this block doesn't overlap the gap, skip
continue
}
// this block overlaps the gap, add it to the plan
// for this gap
planGap.blocks = append(planGap.blocks, block)
}
}

plan.gaps = append(plan.gaps, planGap)
}

plans = append(plans, plan)
}

return plans
}

// Used to signal the gaps that need to be populated for a tsdb
type tsdbGaps struct {
tsdb tsdb.Identifier
gaps []v1.FingerprintBounds
Expand Down
Loading

0 comments on commit 60b0a85

Please sign in to comment.