Skip to content

Commit

Permalink
dedupes block refs in bloom block planning
Browse files Browse the repository at this point in the history
Signed-off-by: Owen Diehl <[email protected]>
  • Loading branch information
owen-d committed Jan 30, 2024
1 parent 60b0a85 commit 8339db0
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 3 deletions.
26 changes: 26 additions & 0 deletions pkg/bloomcompactor/v2controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package bloomcompactor
import (
"context"
"fmt"
"sort"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -221,6 +222,31 @@ func blockPlansForGaps(tsdbs []tsdbGaps, metas []Meta) []blockPlan {
}
}

// ensure we sort blocks so deduping iterator works as expected
sort.Slice(planGap.blocks, func(i, j int) bool {
return planGap.blocks[i].OwnershipRange.Less(planGap.blocks[j].OwnershipRange)
})

peekingBlocks := v1.NewPeekingIter[BlockRef](
v1.NewSliceIter[BlockRef](
planGap.blocks,
),
)
// dedupe blocks which could be in multiple metas
itr := v1.NewDedupingIter[BlockRef, BlockRef](
func(a, b BlockRef) bool {
return a == b
},
v1.Id[BlockRef],
func(a, _ BlockRef) BlockRef {
return a
},
peekingBlocks,
)

deduped := v1.Collect[BlockRef](itr)
planGap.blocks = deduped

plan.gaps = append(plan.gaps, planGap)
}

Expand Down
30 changes: 30 additions & 0 deletions pkg/bloomcompactor/v2controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,36 @@ func Test_blockPlansForGaps(t *testing.T) {
},
},
},
{
desc: "dedupes block refs",
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.Identifier{tsdbId(0)},
metas: []Meta{
genMeta(9, 20, []int{1}, []BlockRef{
genBlockRef(1, 4),
genBlockRef(9, 20),
}), // blocks for first diff tsdb
genMeta(5, 20, []int{2}, []BlockRef{
genBlockRef(5, 10),
genBlockRef(9, 20), // same block references in prior meta (will be deduped)
}), // block for second diff tsdb
},
exp: []blockPlan{
{
tsdb: tsdbId(0),
gaps: []gapWithBlocks{
{
bounds: v1.NewBounds(0, 10),
blocks: []BlockRef{
genBlockRef(1, 4),
genBlockRef(5, 10),
genBlockRef(9, 20),
},
},
},
},
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
// we reuse the gapsBetweenTSDBsAndMetas function to generate the gaps as this function is tested
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/bloom/v1/bounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ func (b FingerprintBounds) String() string {
return b.Min.String() + "-" + b.Max.String()
}

// Less reports whether b orders strictly before other. Bounds are ordered by
// Min first; when the Min values are equal, the one with the smaller Max
// comes first.
//
// The comparison is strict: b.Less(b) is false, and for two equal bounds
// neither is less than the other. A non-strict tie-break (<=) would make
// equal elements each claim to precede the other, which is not a valid
// less function for sort.Slice.
func (b FingerprintBounds) Less(other FingerprintBounds) bool {
	if b.Min != other.Min {
		return b.Min < other.Min
	}
	return b.Max < other.Max
}

// Cmp returns the fingerprint's position relative to the bounds
func (b FingerprintBounds) Cmp(fp model.Fingerprint) BoundsCheck {
if fp < b.Min {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/bloom/v1/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) {
func(a, b *SeriesWithBloom) bool {
return a.Series.Fingerprint == b.Series.Fingerprint
},
id[*SeriesWithBloom],
Id[*SeriesWithBloom],
func(a, b *SeriesWithBloom) *SeriesWithBloom {
if len(a.Series.Chunks) > len(b.Series.Chunks) {
return a
Expand Down
19 changes: 18 additions & 1 deletion pkg/storage/bloom/v1/dedupe.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type DedupeIter[A, B any] struct {
}

// id is the identity function: it hands back its argument unchanged.
// General helper, in this case created for DedupeIter[T,T].
func id[A any](a A) A {
	return a
}
// Id is the identity function: it returns its argument unchanged.
func Id[A any](a A) A {
	return a
}

func NewDedupingIter[A, B any](
eq func(A, B) bool,
Expand Down Expand Up @@ -52,3 +52,20 @@ func (it *DedupeIter[A, B]) Err() error {
// At returns the value currently held in the iterator's tmp field.
func (it *DedupeIter[A, B]) At() B {
	current := it.tmp
	return current
}

// Collect gathers every element of an iterator into a slice. It delegates
// to CollectInto with a nil destination slice.
func Collect[T any](itr Iterator[T]) []T {
	var dst []T // nil destination
	return CollectInto(itr, dst)
}

// CollectInto drains itr into the provided slice and returns the result.
// The destination is truncated to length zero first, so any elements it
// already held are overwritten while its backing array is reused where
// capacity allows. This function does not inspect any error state of the
// iterator.
func CollectInto[T any](itr Iterator[T], into []T) []T {
	out := into[:0]
	for more := itr.Next(); more; more = itr.Next() {
		out = append(out, itr.At())
	}
	return out
}
2 changes: 1 addition & 1 deletion pkg/storage/bloom/v1/dedupe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestMergeDedupeIter(t *testing.T) {
}
deduper := NewDedupingIter[*SeriesWithBloom, *SeriesWithBloom](
eq,
id[*SeriesWithBloom],
Id[*SeriesWithBloom],
merge,
NewPeekingIter[*SeriesWithBloom](mbq),
)
Expand Down

0 comments on commit 8339db0

Please sign in to comment.