From 40a92d49ec3430bf17af239ea70c2b5b733dee79 Mon Sep 17 00:00:00 2001
From: Owen Diehl
Date: Mon, 23 Oct 2023 14:47:42 -0700
Subject: [PATCH 01/11] [wip] fusing queries

Signed-off-by: Owen Diehl
---
 pkg/storage/bloom/v1/TODO.md |   2 -
 pkg/storage/bloom/v1/fuse.go | 125 +++++++++++++++++++++++++++++++++++
 2 files changed, 125 insertions(+), 2 deletions(-)
 create mode 100644 pkg/storage/bloom/v1/fuse.go

diff --git a/pkg/storage/bloom/v1/TODO.md b/pkg/storage/bloom/v1/TODO.md
index a6b0885167453..3b7ffd900be96 100644
--- a/pkg/storage/bloom/v1/TODO.md
+++ b/pkg/storage/bloom/v1/TODO.md
@@ -1,5 +1,3 @@
-* Should be able to read bloom as a []byte without copying it during decoding
-  * It's immutable + partition offsets are calculable, etc
 * Less copying! I've taken some shortcuts we'll need to refactor to avoid copying []byte around in a few places
 * more sophisticated querying methods
 * queue access to blooms
diff --git a/pkg/storage/bloom/v1/fuse.go b/pkg/storage/bloom/v1/fuse.go
new file mode 100644
index 0000000000000..742b2b41cc78e
--- /dev/null
+++ b/pkg/storage/bloom/v1/fuse.go
@@ -0,0 +1,125 @@
+package v1
+
+import (
+	"context"
+
+	"github.com/efficientgo/core/errors"
+	"github.com/prometheus/common/model"
+)
+
+type request struct {
+	fp       model.Fingerprint
+	chks     ChunkRefs
+	searches [][]byte
+	response chan<- output
+}
+
+type inputs Iterator[request]
+
+type CancellableInputsIter struct {
+	ctx context.Context
+	Iterator[request]
+}
+
+func (cii *CancellableInputsIter) Next() bool {
+	select {
+	case <-cii.ctx.Done():
+		return false
+	default:
+		return cii.Iterator.Next()
+	}
+}
+
+// output represents the chunks of a series which could not be ruled out
+// by the searches and must therefore be downloaded
+type output struct {
+	fp   model.Fingerprint
+	chks ChunkRefs
+}
+
+// Fuse combines multiple requests into a single loop iteration
+// over the data set and returns the corresponding outputs
+func (bq *BlockQuerier) Fuse(inputs []inputs) *FusedQuerier {
+	return NewFusedQuerier(bq, inputs)
+}
+
+type FusedQuerier struct {
+	bq     *BlockQuerier
+	inputs []inputs
+}
+
+func NewFusedQuerier(bq *BlockQuerier, inputs []inputs) *FusedQuerier {
+	return &FusedQuerier{
+		bq:     bq,
+		inputs: inputs,
+	}
+}
+
+// returns a batch of inputs for the next fingerprint
+func (fq *FusedQuerier) nextFP() ([]request, error) {
+	return nil, nil
+}
+
+func (fq *FusedQuerier) Run() error {
+	for {
+		// find all queries for the next relevant fingerprint
+		nextBatch, err := fq.nextFP()
+		if err != nil {
+			return errors.Wrap(err, "getting next fingerprint")
+		}
+
+		if len(nextBatch) == 0 {
+			return nil
+		}
+
+		fp := nextBatch[0].fp
+
+		// advance the series iterator to the next fingerprint
+		if err := fq.bq.Seek(fp); err != nil {
+			return errors.Wrap(err, "seeking to fingerprint")
+		}
+
+		if !fq.bq.series.Next() {
+			// TODO(owen-d): fingerprint not found, can't remove chunks
+		}
+
+		series := fq.bq.series.At()
+		if series.Fingerprint != fp {
+			// TODO(owen-d): fingerprint not found, can't remove chunks
+		}
+
+		// Now that we've found the series, we need to unpack the bloom
+		fq.bq.blooms.Seek(series.Offset)
+		if !fq.bq.blooms.Next() {
+			// TODO(owen-d): fingerprint not found, can't remove chunks
+		}
+
+		bloom := fq.bq.blooms.At()
+		// test every input against this bloom
+		for _, input := range nextBatch {
+			mustCheck, inBlooms := input.chks.Compare(series.Chunks, true)
+
+		outer:
+			for _, chk := range inBlooms {
+				for _, search := range input.searches {
+					// TODO(owen-d): meld chunk + search into a single byte slice from the block schema
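+					// (a sketch of the eventual intent: prefix the token with a stable
+					// per-chunk key, e.g. `combined := append(chunkPrefix(chk), search...)`,
+					// so each chunk+token pair hashes distinctly; `chunkPrefix` is
+					// hypothetical and not provided by the block schema yet)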
+ var combined = search + + if !bloom.ScalableBloomFilter.Test(combined) { + continue outer + } + } + // chunk passed all searches, add to the list of chunks to download + mustCheck = append(mustCheck, chk) + + } + + input.response <- output{ + fp: fp, + chks: mustCheck, + } + } + + } + +} From 3920e006686172d9f2f06096667d8dc3ed122aff Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Mon, 23 Oct 2023 15:19:23 -0700 Subject: [PATCH 02/11] generic heap iter Signed-off-by: Owen Diehl --- pkg/storage/bloom/v1/builder.go | 2 +- pkg/storage/bloom/v1/dedupe_test.go | 2 +- pkg/storage/bloom/v1/merge.go | 78 +++++++++++++++++------------ pkg/storage/bloom/v1/merge_test.go | 4 +- 4 files changed, 49 insertions(+), 37 deletions(-) diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go index 76ff08f3860e8..03715ca61e0bf 100644 --- a/pkg/storage/bloom/v1/builder.go +++ b/pkg/storage/bloom/v1/builder.go @@ -501,7 +501,7 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) error { } // Turn the list of blocks into a single iterator that returns the next series - mergedBlocks := NewPeekingIter[*SeriesWithBloom](NewMergeBlockQuerier(xs...)) + mergedBlocks := NewPeekingIter[*SeriesWithBloom](NewHeapIterForSeriesWithBloom(xs...)) // two overlapping blocks can conceivably have the same series, so we need to dedupe, // preferring the one with the most chunks already indexed since we'll have // to add fewer chunks to the bloom diff --git a/pkg/storage/bloom/v1/dedupe_test.go b/pkg/storage/bloom/v1/dedupe_test.go index 08e0ea2f85a19..b6d43333ce701 100644 --- a/pkg/storage/bloom/v1/dedupe_test.go +++ b/pkg/storage/bloom/v1/dedupe_test.go @@ -19,7 +19,7 @@ func TestMergeDedupeIter(t *testing.T) { queriers[i] = NewPeekingIter[*SeriesWithBloom](NewSliceIter[*SeriesWithBloom](dataPtr)) } - mbq := NewMergeBlockQuerier(queriers...) + mbq := NewHeapIterForSeriesWithBloom(queriers...) 
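+	// the heap yields entries from all queriers in fingerprint order, so
+	// duplicates arrive adjacent to one another for the deduper defined below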
eq := func(a, b *SeriesWithBloom) bool { return a.Series.Fingerprint == b.Series.Fingerprint } diff --git a/pkg/storage/bloom/v1/merge.go b/pkg/storage/bloom/v1/merge.go index 597d824db9b0d..981582f1234cd 100644 --- a/pkg/storage/bloom/v1/merge.go +++ b/pkg/storage/bloom/v1/merge.go @@ -1,70 +1,82 @@ package v1 -// MergeBlockQuerier is a heap implementation of BlockQuerier backed by multiple blocks +// HeapIterator is a heap implementation of BlockQuerier backed by multiple blocks // It is used to merge multiple blocks into a single ordered querier // NB(owen-d): it uses a custom heap implementation because Pop() only returns a single // value of the top-most iterator, rather than the iterator itself -type MergeBlockQuerier struct { - qs []PeekingIterator[*SeriesWithBloom] +type HeapIterator[T any] struct { + itrs []PeekingIterator[T] + less func(T, T) bool - cache *SeriesWithBloom + zero T // zero value of T + cache T ok bool } -func NewMergeBlockQuerier(queriers ...PeekingIterator[*SeriesWithBloom]) *MergeBlockQuerier { - res := &MergeBlockQuerier{ - qs: queriers, +func NewHeapIterForSeriesWithBloom(queriers ...PeekingIterator[*SeriesWithBloom]) *HeapIterator[*SeriesWithBloom] { + return NewHeapIterator( + func(a, b *SeriesWithBloom) bool { + return a.Series.Fingerprint < b.Series.Fingerprint + }, + queriers..., + ) +} + +func NewHeapIterator[T any](less func(T, T) bool, itrs ...PeekingIterator[T]) *HeapIterator[T] { + res := &HeapIterator[T]{ + itrs: itrs, + less: less, } res.init() return res } -func (mbq MergeBlockQuerier) Len() int { - return len(mbq.qs) +func (mbq HeapIterator[T]) Len() int { + return len(mbq.itrs) } -func (mbq *MergeBlockQuerier) Less(i, j int) bool { - a, aOk := mbq.qs[i].Peek() - b, bOk := mbq.qs[j].Peek() +func (mbq *HeapIterator[T]) Less(i, j int) bool { + a, aOk := mbq.itrs[i].Peek() + b, bOk := mbq.itrs[j].Peek() if !bOk { return true } if !aOk { return false } - return a.Series.Fingerprint < b.Series.Fingerprint + return mbq.less(a, b) } -func (mbq *MergeBlockQuerier) Swap(a, b int) { - mbq.qs[a], mbq.qs[b] = mbq.qs[b], mbq.qs[a] +func (mbq *HeapIterator[T]) Swap(a, b int) { + mbq.itrs[a], mbq.itrs[b] = mbq.itrs[b], mbq.itrs[a] } -func (mbq *MergeBlockQuerier) Next() bool { - mbq.cache = mbq.pop() - return mbq.cache != nil +func (mbq *HeapIterator[T]) Next() (ok bool) { + mbq.cache, ok = mbq.pop() + return } // TODO(owen-d): don't swallow this error -func (mbq *MergeBlockQuerier) Err() error { +func (mbq *HeapIterator[T]) Err() error { return nil } -func (mbq *MergeBlockQuerier) At() *SeriesWithBloom { +func (mbq *HeapIterator[T]) At() T { return mbq.cache } -func (mbq *MergeBlockQuerier) push(x PeekingIterator[*SeriesWithBloom]) { - mbq.qs = append(mbq.qs, x) +func (mbq *HeapIterator[T]) push(x PeekingIterator[T]) { + mbq.itrs = append(mbq.itrs, x) mbq.up(mbq.Len() - 1) } -func (mbq *MergeBlockQuerier) pop() *SeriesWithBloom { +func (mbq *HeapIterator[T]) pop() (T, bool) { for { if mbq.Len() == 0 { - return nil + return mbq.zero, false } - cur := mbq.qs[0] + cur := mbq.itrs[0] if ok := cur.Next(); !ok { mbq.remove(0) continue @@ -80,14 +92,14 @@ func (mbq *MergeBlockQuerier) pop() *SeriesWithBloom { _ = mbq.down(0) } - return result + return result, true } } -func (mbq *MergeBlockQuerier) remove(idx int) { +func (mbq *HeapIterator[T]) remove(idx int) { mbq.Swap(idx, mbq.Len()-1) - mbq.qs[len(mbq.qs)-1] = nil // don't leak reference - mbq.qs = mbq.qs[:mbq.Len()-1] + mbq.itrs[len(mbq.itrs)-1] = nil // don't leak reference + mbq.itrs = mbq.itrs[:mbq.Len()-1] 
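+	// the iterator swapped into idx may now be out of order relative to its
+	// new parent and children, so re-establish the heap invariant at idx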
 	mbq.fix(idx)
 }
 
@@ -95,13 +107,13 @@
 // Changing the value of the element at index i and then calling fix is equivalent to,
 // but less expensive than, calling Remove(h, i) followed by a Push of the new value.
 // The complexity is O(log n) where n = h.Len().
-func (mbq *MergeBlockQuerier) fix(i int) {
+func (mbq *HeapIterator[T]) fix(i int) {
 	if !mbq.down(i) {
 		mbq.up(i)
 	}
 }
 
-func (mbq *MergeBlockQuerier) up(j int) {
+func (mbq *HeapIterator[T]) up(j int) {
 	for {
 		i := (j - 1) / 2 // parent
 		if i == j || !mbq.Less(j, i) {
@@ -112,7 +124,7 @@
 	}
 }
 
-func (mbq *MergeBlockQuerier) down(i0 int) (moved bool) {
+func (mbq *HeapIterator[T]) down(i0 int) (moved bool) {
 	i := i0
 	n := mbq.Len()
 	for {
@@ -135,7 +147,7 @@
 }
 
 // establish heap invariants. O(n)
-func (mbq *MergeBlockQuerier) init() {
+func (mbq *HeapIterator[T]) init() {
 	n := mbq.Len()
 	for i := n/2 - 1; i >= 0; i-- {
 		_ = mbq.down(i)
diff --git a/pkg/storage/bloom/v1/merge_test.go b/pkg/storage/bloom/v1/merge_test.go
index 3ca089a0db62f..c1600f00dcbe1 100644
--- a/pkg/storage/bloom/v1/merge_test.go
+++ b/pkg/storage/bloom/v1/merge_test.go
@@ -22,7 +22,7 @@ func TestMergeBlockQuerier_NonOverlapping(t *testing.T) {
 		queriers = append(queriers, NewPeekingIter[*SeriesWithBloom](NewSliceIter[*SeriesWithBloom](ptrs)))
 	}
 
-	mbq := NewMergeBlockQuerier(queriers...)
+	mbq := NewHeapIterForSeriesWithBloom(queriers...)
 
 	for i := 0; i < numSeries; i++ {
 		require.True(t, mbq.Next())
@@ -49,7 +49,7 @@ func TestMergeBlockQuerier_Overlapping(t *testing.T) {
 		queriers = append(queriers, NewPeekingIter[*SeriesWithBloom](NewSliceIter[*SeriesWithBloom](slices[i])))
 	}
 
-	mbq := NewMergeBlockQuerier(queriers...)
+	mbq := NewHeapIterForSeriesWithBloom(queriers...)
 
 	for i := 0; i < numSeries; i++ {
 		require.True(t, mbq.Next())

From 2ef4452a2cecc6ea65c03c35bca77671b7801fbb Mon Sep 17 00:00:00 2001
From: Owen Diehl
Date: Mon, 23 Oct 2023 15:46:58 -0700
Subject: [PATCH 03/11] multi-type DedupeIter

Signed-off-by: Owen Diehl
---
 pkg/storage/bloom/v1/builder.go     |  1 +
 pkg/storage/bloom/v1/dedupe.go      | 55 ++++++++++++++---------------
 pkg/storage/bloom/v1/dedupe_test.go |  3 ++-
 pkg/storage/bloom/v1/fuse.go        | 11 +++---
 4 files changed, 35 insertions(+), 35 deletions(-)

diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go
index 03715ca61e0bf..9b6070493b662 100644
--- a/pkg/storage/bloom/v1/builder.go
+++ b/pkg/storage/bloom/v1/builder.go
@@ -509,6 +509,7 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) error {
 		func(a, b *SeriesWithBloom) bool {
 			return a.Series.Fingerprint == b.Series.Fingerprint
 		},
+		id[*SeriesWithBloom],
 		func(a, b *SeriesWithBloom) *SeriesWithBloom {
 			if len(a.Series.Chunks) > len(b.Series.Chunks) {
 				return a
diff --git a/pkg/storage/bloom/v1/dedupe.go b/pkg/storage/bloom/v1/dedupe.go
index 759de7e686637..a322d8b4b2ef2 100644
--- a/pkg/storage/bloom/v1/dedupe.go
+++ b/pkg/storage/bloom/v1/dedupe.go
@@ -1,55 +1,54 @@
 package v1
 
-// DedupeIter is a deduplicating iterator that merges adjacent elements
-// It's intended to be used when merging multiple blocks,
-// each of which may contain the same fingerprints
-type DedupeIter[T any] struct {
-	eq    func(T, T) bool
-	merge func(T, T) T
-	itr   PeekingIterator[T]
-
-	tmp []T
+// DedupeIter is a deduplicating iterator which creates an Iterator[B]
+// from a sequence of Iterator[A].
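+//
+// For example, the fused querier uses it to batch adjacent requests which
+// share a fingerprint into a single []request:
+//
+//	merging := NewDedupingIter[request, []request](
+//		func(a request, b []request) bool { return a.fp == b[0].fp },
+//		func(a request) []request { return []request{a} },
+//		func(a request, b []request) []request { return append(b, a) },
+//		NewPeekingIter[request](heap),
+//	)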
+type DedupeIter[A, B any] struct { + eq func(A, B) bool // equality check + from func(A) B // convert A to B, used on first element + merge func(A, B) B // merge A into B + itr PeekingIterator[A] + + tmp B } -func NewDedupingIter[T any]( - eq func(T, T) bool, - merge func(T, T) T, - itr PeekingIterator[T], -) *DedupeIter[T] { - return &DedupeIter[T]{ +// general helper, in this case created for DedupeIter[T,T] +func id[A any](a A) A { return a } + +func NewDedupingIter[A, B any]( + eq func(A, B) bool, + from func(A) B, + merge func(A, B) B, + itr PeekingIterator[A], +) *DedupeIter[A, B] { + return &DedupeIter[A, B]{ eq: eq, + from: from, merge: merge, itr: itr, } } -func (it *DedupeIter[T]) Next() bool { - it.tmp = it.tmp[:0] +func (it *DedupeIter[A, B]) Next() bool { if !it.itr.Next() { return false } - it.tmp = append(it.tmp, it.itr.At()) + it.tmp = it.from(it.itr.At()) for { next, ok := it.itr.Peek() - if !ok || !it.eq(it.tmp[0], next) { + if !ok || !it.eq(next, it.tmp) { break } it.itr.Next() // ensured via peek - it.tmp = append(it.tmp, it.itr.At()) - } - - // merge all the elements in tmp - for i := len(it.tmp) - 1; i > 0; i-- { - it.tmp[i-1] = it.merge(it.tmp[i-1], it.tmp[i]) + it.tmp = it.merge(next, it.tmp) } return true } -func (it *DedupeIter[T]) Err() error { +func (it *DedupeIter[A, B]) Err() error { return it.itr.Err() } -func (it *DedupeIter[T]) At() T { - return it.tmp[0] +func (it *DedupeIter[A, B]) At() B { + return it.tmp } diff --git a/pkg/storage/bloom/v1/dedupe_test.go b/pkg/storage/bloom/v1/dedupe_test.go index b6d43333ce701..f30578c13cd1b 100644 --- a/pkg/storage/bloom/v1/dedupe_test.go +++ b/pkg/storage/bloom/v1/dedupe_test.go @@ -26,8 +26,9 @@ func TestMergeDedupeIter(t *testing.T) { merge := func(a, _ *SeriesWithBloom) *SeriesWithBloom { return a } - deduper := NewDedupingIter[*SeriesWithBloom]( + deduper := NewDedupingIter[*SeriesWithBloom, *SeriesWithBloom]( eq, + id[*SeriesWithBloom], merge, NewPeekingIter[*SeriesWithBloom](mbq), ) diff --git a/pkg/storage/bloom/v1/fuse.go b/pkg/storage/bloom/v1/fuse.go index 742b2b41cc78e..4c4c50a882d64 100644 --- a/pkg/storage/bloom/v1/fuse.go +++ b/pkg/storage/bloom/v1/fuse.go @@ -14,8 +14,6 @@ type request struct { response chan<- output } -type inputs Iterator[request] - type CancellableInputsIter struct { ctx context.Context Iterator[request] @@ -39,19 +37,20 @@ type output struct { // Fuse combines multiple requests into a single loop iteration // over the data set and returns the corresponding outputs -func (bq *BlockQuerier) Fuse(inputs []inputs) *FusedQuerier { +func (bq *BlockQuerier) Fuse(inputs []Iterator[request]) *FusedQuerier { return NewFusedQuerier(bq, inputs) } type FusedQuerier struct { bq *BlockQuerier - inputs []inputs + inputs Iterator[[]request] } -func NewFusedQuerier(bq *BlockQuerier, inputs []inputs) *FusedQuerier { +func NewFusedQuerier(bq *BlockQuerier, inputs []Iterator[request]) *FusedQuerier { + // heap := NewHeapIterator[request]() return &FusedQuerier{ bq: bq, - inputs: inputs, + inputs: nil, } } From 0e2bd5adfdf57c95012ab6c06b28d46119b7ff20 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 25 Oct 2023 10:19:36 -0700 Subject: [PATCH 04/11] [wip] fused querier logic Signed-off-by: Owen Diehl --- pkg/storage/bloom/v1/TODO.md | 4 +++ pkg/storage/bloom/v1/fuse.go | 51 ++++++++++++++++++++++-------------- 2 files changed, 35 insertions(+), 20 deletions(-) diff --git a/pkg/storage/bloom/v1/TODO.md b/pkg/storage/bloom/v1/TODO.md index 3b7ffd900be96..a6c9dc6f19c90 100644 --- 
a/pkg/storage/bloom/v1/TODO.md
+++ b/pkg/storage/bloom/v1/TODO.md
@@ -3,6 +3,10 @@
 * queue access to blooms
 * multiplex reads across blooms
 * Queueing system for bloom access
+* bloom hierarchies (bloom per block, etc). Test a tree of blooms down to individual series/chunk
+* memoize hashing & bucket lookups during queries
+* encode bloom parameters in block
+
 # merge querier for different blocks
 
 * how to merge two block queriers with the same fp
diff --git a/pkg/storage/bloom/v1/fuse.go b/pkg/storage/bloom/v1/fuse.go
index 4c4c50a882d64..1f32e95969c58 100644
--- a/pkg/storage/bloom/v1/fuse.go
+++ b/pkg/storage/bloom/v1/fuse.go
@@ -37,7 +37,7 @@ type output struct {
 
 // Fuse combines multiple requests into a single loop iteration
 // over the data set and returns the corresponding outputs
-func (bq *BlockQuerier) Fuse(inputs []Iterator[request]) *FusedQuerier {
+func (bq *BlockQuerier) Fuse(inputs []PeekingIterator[request]) *FusedQuerier {
 	return NewFusedQuerier(bq, inputs)
 }
 
@@ -46,30 +46,34 @@ type FusedQuerier struct {
 	inputs Iterator[[]request]
 }
 
-func NewFusedQuerier(bq *BlockQuerier, inputs []Iterator[request]) *FusedQuerier {
-	// heap := NewHeapIterator[request]()
+func NewFusedQuerier(bq *BlockQuerier, inputs []PeekingIterator[request]) *FusedQuerier {
+	heap := NewHeapIterator[request](
+		func(a, b request) bool {
+			return a.fp < b.fp
+		},
+		inputs...,
+	)
+
+	merging := NewDedupingIter[request, []request](
+		func(a request, b []request) bool {
+			return a.fp == b[0].fp
+		},
+		func(a request) []request { return []request{a} },
+		func(a request, b []request) []request {
+			return append(b, a)
+		},
+		NewPeekingIter[request](heap),
+	)
 	return &FusedQuerier{
 		bq:     bq,
-		inputs: nil,
+		inputs: merging,
 	}
 }
 
-// returns a batch of inputs for the next fingerprint
-func (fq *FusedQuerier) nextFP() ([]request, error) {
-	return nil, nil
-}
-
 func (fq *FusedQuerier) Run() error {
-	for {
+	for fq.inputs.Next() {
 		// find all queries for the next relevant fingerprint
-		nextBatch, err := fq.nextFP()
-		if err != nil {
-			return errors.Wrap(err, "getting next fingerprint")
-		}
-
-		if len(nextBatch) == 0 {
-			return nil
-		}
+		nextBatch := fq.inputs.At()
 
 		fp := nextBatch[0].fp
 
@@ -79,12 +83,19 @@ func (fq *FusedQuerier) Run() error {
 		}
 
 		if !fq.bq.series.Next() {
-			// TODO(owen-d): fingerprint not found, can't remove chunks
+			// no more series, we're done since we're iterating desired fingerprints in order
+			return nil
 		}
 
 		series := fq.bq.series.At()
 		if series.Fingerprint != fp {
-			// TODO(owen-d): fingerprint not found, can't remove chunks
+			// fingerprint not found, can't remove chunks
+			for _, input := range nextBatch {
+				input.response <- output{
+					fp:   fp,
+					chks: input.chks,
+				}
+			}
 		}
 
 		// Now that we've found the series, we need to unpack the bloom

From 26378aaf48483738618ee62839989a20d7a4d0e6 Mon Sep 17 00:00:00 2001
From: Owen Diehl
Date: Mon, 30 Oct 2023 16:19:19 -0700
Subject: [PATCH 05/11] tooling & testware for merge builder

Signed-off-by: Owen Diehl
---
 pkg/storage/bloom/v1/TODO.md         |  5 +-
 pkg/storage/bloom/v1/builder.go      | 22 ++++---
 pkg/storage/bloom/v1/builder_test.go | 95 ++++++++++++++++++++++++++++
 pkg/storage/bloom/v1/fuse.go         |  1 +
 pkg/storage/bloom/v1/util.go         | 13 ++++
 5 files changed, 127 insertions(+), 9 deletions(-)

diff --git a/pkg/storage/bloom/v1/TODO.md b/pkg/storage/bloom/v1/TODO.md
index a6c9dc6f19c90..8a6845d47f734 100644
--- a/pkg/storage/bloom/v1/TODO.md
+++ b/pkg/storage/bloom/v1/TODO.md
@@ -5,7 +5,10 @@
 * Queueing system for bloom access
 * bloom hierarchies (bloom per block, etc). Test a tree of blooms down to individual series/chunk
 * memoize hashing & bucket lookups during queries
-* encode bloom parameters in block
+* versioning
+  * so we can change implementations
+  * encode bloom parameters in block: sbf params, hashing strategy, tokenizer
+* caching
 
 # merge querier for different blocks
 
diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go
index 9b6070493b662..bfd6d0a25483e 100644
--- a/pkg/storage/bloom/v1/builder.go
+++ b/pkg/storage/bloom/v1/builder.go
@@ -473,14 +473,18 @@ func SortBlocksIntoOverlappingGroups(xs []*Block) (groups [][]*Block) {
 // from a list of blocks and a store of series.
 type MergeBuilder struct {
 	// existing blocks
-	blocks []*Block
+	blocks []PeekingIterator[*SeriesWithBloom]
 	// store
 	store Iterator[*Series]
 	// Add chunks to a bloom
 	populate func(*Series, *Bloom) error
 }
 
-func NewMergeBuilder(blocks []*Block, store Iterator[*Series], populate func(*Series, *Bloom) error) *MergeBuilder {
+// NewMergeBuilder is a specific builder which does the following:
+//  1. merges multiple blocks into a single ordered querier,
+//     i) When two blocks have the same series, it will prefer the one with the most chunks already indexed
+//  2. iterates through the store, adding chunks to the relevant blooms via the `populate` argument
+func NewMergeBuilder(blocks []PeekingIterator[*SeriesWithBloom], store Iterator[*Series], populate func(*Series, *Bloom) error) *MergeBuilder {
 	return &MergeBuilder{
 		blocks:   blocks,
 		store:    store,
@@ -492,16 +496,11 @@ func NewMergeBuilder(blocks []*Block, store Iterator[*Series], populate func(*Se
 // but this gives us a good starting point.
 func (mb *MergeBuilder) Build(builder *BlockBuilder) error {
 	var (
-		xs           = make([]PeekingIterator[*SeriesWithBloom], 0, len(mb.blocks))
 		nextInBlocks *SeriesWithBloom
 	)
 
-	for _, block := range mb.blocks {
-		xs = append(xs, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(block)))
-	}
-
 	// Turn the list of blocks into a single iterator that returns the next series
-	mergedBlocks := NewPeekingIter[*SeriesWithBloom](NewHeapIterForSeriesWithBloom(xs...))
+	mergedBlocks := NewPeekingIter[*SeriesWithBloom](NewHeapIterForSeriesWithBloom(mb.blocks...))
 	// two overlapping blocks can conceivably have the same series, so we need to dedupe,
 	// preferring the one with the most chunks already indexed since we'll have
 	// to add fewer chunks to the bloom
@@ -568,5 +567,12 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) error {
 			return errors.Wrap(err, "adding series to block")
 		}
 	}
+
+	if err := builder.blooms.Close(); err != nil {
+		return errors.Wrap(err, "closing bloom file")
+	}
+	if err := builder.index.Close(); err != nil {
+		return errors.Wrap(err, "closing series file")
+	}
 	return nil
 }
diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go
index ede515125780e..78f4e98b9972f 100644
--- a/pkg/storage/bloom/v1/builder_test.go
+++ b/pkg/storage/bloom/v1/builder_test.go
@@ -2,6 +2,7 @@ package v1
 
 import (
 	"bytes"
+	"errors"
 	"fmt"
 	"testing"
 
@@ -47,6 +48,17 @@ func mkBasicSeriesWithBlooms(nSeries, keysPerSeries int, fromFp, throughFp model
 	return
 }
 
+func EqualIterators[T any](t *testing.T, test func(a, b T), expected, actual Iterator[T]) {
+	for expected.Next() {
+		require.True(t, actual.Next())
+		a, b := expected.At(), actual.At()
+		test(a, b)
+	}
+	require.False(t, actual.Next())
+	require.Nil(t, expected.Err())
+	require.Nil(t, actual.Err())
+}
+
 func TestBlockBuilderRoundTrip(t *testing.T) {
 	numSeries := 100
numKeysPerSeries := 10000 @@ -124,3 +136,86 @@ func TestBlockBuilderRoundTrip(t *testing.T) { }) } } + +func TestMergeBuilder(t *testing.T) { + + nBlocks := 10 + numSeries := 100 + numKeysPerSeries := 100 + blocks := make([]PeekingIterator[*SeriesWithBloom], 0, nBlocks) + data, _ := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000) + blockOpts := BlockOptions{ + schema: Schema{ + version: DefaultSchemaVersion, + encoding: chunkenc.EncSnappy, + }, + SeriesPageSize: 100, + BloomPageSize: 10 << 10, + } + + // Build a list of blocks containing overlapping & duplicated parts of the dataset + for i := 0; i < nBlocks; i++ { + // references for linking in memory reader+writer + indexBuf := bytes.NewBuffer(nil) + bloomsBuf := bytes.NewBuffer(nil) + + min := i * numSeries / nBlocks + max := (i + 2) * numSeries / nBlocks // allow some overlap + if max > len(data) { + max = len(data) + } + + writer := NewMemoryBlockWriter(indexBuf, bloomsBuf) + reader := NewByteReader(indexBuf, bloomsBuf) + + builder, err := NewBlockBuilder( + blockOpts, + writer, + ) + + require.Nil(t, err) + itr := NewSliceIter[SeriesWithBloom](data[min:max]) + require.Nil(t, builder.BuildFrom(itr)) + blocks = append(blocks, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(NewBlock(reader)))) + } + + // We're not testing the ability to extend a bloom in this test + pop := func(_ *Series, _ *Bloom) error { + return errors.New("not implemented") + } + + // storage should contain references to all the series we ingested, + // regardless of block allocation/overlap. + storeItr := NewMapIter[SeriesWithBloom, *Series]( + NewSliceIter[SeriesWithBloom](data), + func(swb SeriesWithBloom) *Series { + return swb.Series + }, + ) + + // Ensure that the merge builder combines all the blocks correctly + mergeBuilder := NewMergeBuilder(blocks, storeItr, pop) + indexBuf := bytes.NewBuffer(nil) + bloomsBuf := bytes.NewBuffer(nil) + writer := NewMemoryBlockWriter(indexBuf, bloomsBuf) + reader := NewByteReader(indexBuf, bloomsBuf) + + builder, err := NewBlockBuilder( + blockOpts, + writer, + ) + require.Nil(t, err) + + require.Nil(t, mergeBuilder.Build(builder)) + block := NewBlock(reader) + querier := NewBlockQuerier(block) + + EqualIterators[*SeriesWithBloom]( + t, + func(a, b *SeriesWithBloom) { + require.Equal(t, a.Series, b.Series, "expected %+v, got %+v", a, b) + }, + NewSliceIter[*SeriesWithBloom](PointerSlice(data)), + querier, + ) +} diff --git a/pkg/storage/bloom/v1/fuse.go b/pkg/storage/bloom/v1/fuse.go index 1f32e95969c58..db4631f28264f 100644 --- a/pkg/storage/bloom/v1/fuse.go +++ b/pkg/storage/bloom/v1/fuse.go @@ -132,4 +132,5 @@ func (fq *FusedQuerier) Run() error { } + return nil } diff --git a/pkg/storage/bloom/v1/util.go b/pkg/storage/bloom/v1/util.go index b27de6f6e7884..6120fc6eecb00 100644 --- a/pkg/storage/bloom/v1/util.go +++ b/pkg/storage/bloom/v1/util.go @@ -167,6 +167,19 @@ func (it *SliceIter[T]) At() T { return it.xs[it.cur] } +type MapIter[A any, B any] struct { + Iterator[A] + f func(A) B +} + +func NewMapIter[A any, B any](src Iterator[A], f func(A) B) *MapIter[A, B] { + return &MapIter[A, B]{Iterator: src, f: f} +} + +func (it *MapIter[A, B]) At() B { + return it.f(it.Iterator.At()) +} + type EmptyIter[T any] struct { zero T } From 2d348c593c0cd3f207434bf1a20023ba50812646 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Mon, 30 Oct 2023 17:04:00 -0700 Subject: [PATCH 06/11] fuse testing Signed-off-by: Owen Diehl --- pkg/storage/bloom/v1/fuse.go | 19 +------ pkg/storage/bloom/v1/fuse_test.go | 93 
+++++++++++++++++++++++++++++++ pkg/storage/bloom/v1/util.go | 19 +++++++ 3 files changed, 114 insertions(+), 17 deletions(-) create mode 100644 pkg/storage/bloom/v1/fuse_test.go diff --git a/pkg/storage/bloom/v1/fuse.go b/pkg/storage/bloom/v1/fuse.go index db4631f28264f..266104940c6ce 100644 --- a/pkg/storage/bloom/v1/fuse.go +++ b/pkg/storage/bloom/v1/fuse.go @@ -1,8 +1,6 @@ package v1 import ( - "context" - "github.com/efficientgo/core/errors" "github.com/prometheus/common/model" ) @@ -11,21 +9,7 @@ type request struct { fp model.Fingerprint chks ChunkRefs searches [][]byte - response chan<- output -} - -type CancellableInputsIter struct { - ctx context.Context - Iterator[request] -} - -func (cii *CancellableInputsIter) Next() bool { - select { - case <-cii.ctx.Done(): - return false - default: - return cii.Iterator.Next() - } + response chan output } // output represents a chunk that failed to pass all searches @@ -37,6 +21,7 @@ type output struct { // Fuse combines multiple requests into a single loop iteration // over the data set and returns the corresponding outputs +// TODO(owen-d): better async control func (bq *BlockQuerier) Fuse(inputs []PeekingIterator[request]) *FusedQuerier { return NewFusedQuerier(bq, inputs) } diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go new file mode 100644 index 0000000000000..1cdcd457aa81b --- /dev/null +++ b/pkg/storage/bloom/v1/fuse_test.go @@ -0,0 +1,93 @@ +package v1 + +import ( + "bytes" + "context" + "testing" + + "github.com/grafana/dskit/concurrency" + "github.com/grafana/loki/pkg/chunkenc" + "github.com/stretchr/testify/require" +) + +func TestFusedQuerier(t *testing.T) { + // references for linking in memory reader+writer + indexBuf := bytes.NewBuffer(nil) + bloomsBuf := bytes.NewBuffer(nil) + writer := NewMemoryBlockWriter(indexBuf, bloomsBuf) + reader := NewByteReader(indexBuf, bloomsBuf) + numSeries := 100 + numKeysPerSeries := 10000 + data, _ := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000) + + builder, err := NewBlockBuilder( + BlockOptions{ + schema: Schema{ + version: DefaultSchemaVersion, + encoding: chunkenc.EncSnappy, + }, + SeriesPageSize: 100, + BloomPageSize: 10 << 10, + }, + writer, + ) + require.Nil(t, err) + itr := NewSliceIter[SeriesWithBloom](data) + require.Nil(t, builder.BuildFrom(itr)) + block := NewBlock(reader) + querier := NewBlockQuerier(block) + + nReqs := 10 + var inputs [][]request + for i := 0; i < nReqs; i++ { + ch := make(chan output) + var reqs []request + // find 2 series for each + for j := 0; j < 2; j++ { + idx := numSeries/nReqs*i + j + reqs = append(reqs, request{ + fp: data[idx].Series.Fingerprint, + chks: data[idx].Series.Chunks, + response: ch, + }) + } + inputs = append(inputs, reqs) + } + + var itrs []PeekingIterator[request] + for _, reqs := range inputs { + itrs = append(itrs, NewPeekingIter[request](NewSliceIter[request](reqs))) + } + + resps := make([][]output, nReqs) + go func() { + concurrency.ForEachJob( + context.Background(), + len(resps), + len(resps), + func(_ context.Context, i int) error { + for v := range inputs[i][0].response { + resps[i] = append(resps[i], v) + } + return nil + }, + ) + }() + + fused := querier.Fuse(itrs) + require.Nil(t, fused.Run()) + + for i, input := range inputs { + for j, req := range input { + resp := resps[i][j] + require.Equal( + t, + output{ + fp: req.fp, + chks: req.chks, + }, + resp, + ) + } + } +} diff --git a/pkg/storage/bloom/v1/util.go b/pkg/storage/bloom/v1/util.go index 
 6120fc6eecb00..e764cba5c6197 100644
--- a/pkg/storage/bloom/v1/util.go
+++ b/pkg/storage/bloom/v1/util.go
@@ -1,6 +1,7 @@
 package v1
 
 import (
+	"context"
 	"hash"
 	"hash/crc32"
 	"io"
@@ -203,6 +204,24 @@ func NewEmptyIter[T any](zero T) *EmptyIter[T] {
 	return &EmptyIter[T]{zero: zero}
 }
 
+type CancellableIter[T any] struct {
+	ctx context.Context
+	Iterator[T]
+}
+
+func (cii *CancellableIter[T]) Next() bool {
+	select {
+	case <-cii.ctx.Done():
+		return false
+	default:
+		return cii.Iterator.Next()
+	}
+}
+
+func NewCancelableIter[T any](ctx context.Context, itr Iterator[T]) *CancellableIter[T] {
+	return &CancellableIter[T]{ctx: ctx, Iterator: itr}
+}
+
 type NoopCloser struct {
 	io.Writer
 }

From c4cbe12c141f6edf5199d3d4ab9159b00f48e853 Mon Sep 17 00:00:00 2001
From: Owen Diehl
Date: Mon, 30 Oct 2023 18:12:14 -0700
Subject: [PATCH 07/11] short circuit fuse when bloom doesn't have keys before
 checking chunks

Signed-off-by: Owen Diehl
---
 pkg/storage/bloom/v1/fuse.go | 20 ++++++++++++++++++--
 1 file changed, 18 insertions(+), 2 deletions(-)

diff --git a/pkg/storage/bloom/v1/fuse.go b/pkg/storage/bloom/v1/fuse.go
index 266104940c6ce..363834731eb22 100644
--- a/pkg/storage/bloom/v1/fuse.go
+++ b/pkg/storage/bloom/v1/fuse.go
@@ -91,17 +91,33 @@ func (fq *FusedQuerier) Run() error {
 
 		bloom := fq.bq.blooms.At()
 		// test every input against this bloom
+	inputLoop:
 		for _, input := range nextBatch {
 			mustCheck, inBlooms := input.chks.Compare(series.Chunks, true)
 
-		outer:
+			// First, see if the search passes the series level bloom before checking for chunks individually
+			for _, search := range input.searches {
+				if !bloom.Test(search) {
+					// the entire series bloom didn't pass one of the searches,
+					// so we can skip checking chunks individually.
+					// We still return all chunks that are not included in the bloom
+					// as they may still have the data
+					input.response <- output{
+						fp:   fp,
+						chks: mustCheck,
+					}
+					continue inputLoop
+				}
+			}
+
+		chunkLoop:
 			for _, chk := range inBlooms {
 				for _, search := range input.searches {
 					// TODO(owen-d): meld chunk + search into a single byte slice from the block schema
 					var combined = search
 
 					if !bloom.ScalableBloomFilter.Test(combined) {
-						continue outer
+						continue chunkLoop
 					}
 				}
 				// chunk passed all searches, add to the list of chunks to download

From f741031d15eb580c16a9806200b93d64a8c90b1d Mon Sep 17 00:00:00 2001
From: Owen Diehl
Date: Mon, 30 Oct 2023 18:12:35 -0700
Subject: [PATCH 08/11] benchmark for fuse vs single-pass bloom querying

Signed-off-by: Owen Diehl
---
 pkg/storage/bloom/v1/fuse_test.go | 97 +++++++++++++++++++++++++++++++
 1 file changed, 97 insertions(+)

diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go
index 1cdcd457aa81b..697ce5a6afa3f 100644
--- a/pkg/storage/bloom/v1/fuse_test.go
+++ b/pkg/storage/bloom/v1/fuse_test.go
@@ -91,3 +91,100 @@ func TestFusedQuerier(t *testing.T) {
 		}
 	}
 }
+
+func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]request) {
+	indexBuf := bytes.NewBuffer(nil)
+	bloomsBuf := bytes.NewBuffer(nil)
+	writer := NewMemoryBlockWriter(indexBuf, bloomsBuf)
+	reader := NewByteReader(indexBuf, bloomsBuf)
+	numSeries := 10000
+	numKeysPerSeries := 100
+	data, _ := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffffff, 0, 10000)
+
+	builder, err := NewBlockBuilder(
+		BlockOptions{
+			schema: Schema{
+				version:  DefaultSchemaVersion,
+				encoding: chunkenc.EncSnappy,
+			},
+			SeriesPageSize: 256 << 10, // 256k
+			BloomPageSize:  1 << 20,   // 1MB
+		},
+		writer,
+	)
+	require.Nil(b, err)
+	itr := NewSliceIter[SeriesWithBloom](data)
+	require.Nil(b, builder.BuildFrom(itr))
+	block := NewBlock(reader)
+	querier := NewBlockQuerier(block)
+
+	numRequestChains := 100
+	seriesPerRequest := 100
+	var requestChains [][]request
+	for i := 0; i < numRequestChains; i++ {
+		var reqs []request
+		// ensure they use the same channel
+		ch := make(chan output)
+		// evenly spread out the series queried within a single request chain
+		// to mimic series distribution across keyspace
+		for j := 0; j < seriesPerRequest; j++ {
+			// add the chain index (i) for a little jitter
+			idx := numSeries*j/seriesPerRequest + i
+			if idx >= numSeries {
+				idx = numSeries - 1
+			}
+			reqs = append(reqs, request{
+				fp:       data[idx].Series.Fingerprint,
+				chks:     data[idx].Series.Chunks,
+				response: ch,
+			})
+		}
+		requestChains = append(requestChains, reqs)
+	}
+
+	return querier, requestChains
+}
+
+func BenchmarkBlockQuerying(b *testing.B) {
+	b.StopTimer()
+	querier, requestChains := setupBlockForBenchmark(b)
+	// benchmark
+	b.StartTimer()
+
+	b.Run("single-pass", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			for _, chain := range requestChains {
+				for _, req := range chain {
+					_, _ = querier.CheckChunksForSeries(req.fp, req.chks, nil)
+				}
+			}
+		}
+
+	})
+	b.Run("fused", func(b *testing.B) {
+		// spin up some goroutines to consume the responses so they don't block
+		go func() {
+			concurrency.ForEachJob(
+				context.Background(),
+				len(requestChains), len(requestChains),
+				func(_ context.Context, idx int) error {
+					for range requestChains[idx][0].response {
+					}
+					return nil
+				},
+			)
+		}()
+
+		var itrs []PeekingIterator[request]
+
+		for i := 0; i < b.N; i++ {
+			itrs = itrs[:0]
+			for _, reqs := range requestChains {
+				itrs = append(itrs, NewPeekingIter[request](NewSliceIter[request](reqs)))
+			}
+			fused := querier.Fuse(itrs)
+			_ = fused.Run()
+		}
+	})
+
+}

From 96b46c98f5ce1304d8cb3739ff33d8eddbf419f7 Mon Sep 17 00:00:00 2001
From: Owen Diehl
Date: Tue, 31 Oct 2023 09:23:04 -0700
Subject: [PATCH 09/11] lint fixes

Signed-off-by: Owen Diehl
---
 pkg/storage/bloom/v1/TODO.md      | 1 +
 pkg/storage/bloom/v1/fuse.go      | 9 ++++++++-
 pkg/storage/bloom/v1/fuse_test.go | 8 ++++----
 3 files changed, 13 insertions(+), 5 deletions(-)

diff --git a/pkg/storage/bloom/v1/TODO.md b/pkg/storage/bloom/v1/TODO.md
index 8a6845d47f734..1ff1c6a8ad06e 100644
--- a/pkg/storage/bloom/v1/TODO.md
+++ b/pkg/storage/bloom/v1/TODO.md
@@ -9,6 +9,7 @@
   * so we can change implementations
   * encode bloom parameters in block: sbf params, hashing strategy, tokenizer
 * caching
+* ability to download indices without chunks
 
 # merge querier for different blocks
 
diff --git a/pkg/storage/bloom/v1/fuse.go b/pkg/storage/bloom/v1/fuse.go
index 363834731eb22..150c656aca04d 100644
--- a/pkg/storage/bloom/v1/fuse.go
+++ b/pkg/storage/bloom/v1/fuse.go
@@ -86,7 +86,14 @@ func (fq *FusedQuerier) Run() error {
 		// Now that we've found the series, we need to unpack the bloom
 		fq.bq.blooms.Seek(series.Offset)
 		if !fq.bq.blooms.Next() {
-			// TODO(owen-d): fingerprint not found, can't remove chunks
+			// fingerprint not found, can't remove chunks
+			for _, input := range nextBatch {
+				input.response <- output{
+					fp:   fp,
+					chks: input.chks,
+				}
+			}
+			continue
 		}
 
 		bloom := fq.bq.blooms.At()
diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go
index 697ce5a6afa3f..8f976715196d7 100644
--- a/pkg/storage/bloom/v1/fuse_test.go
+++ b/pkg/storage/bloom/v1/fuse_test.go
@@ -61,7 +61,7 @@ func TestFusedQuerier(t *testing.T) {
 
 	resps := make([][]output, nReqs)
 	go func() {
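+		// NB: a failed require inside this goroutine marks the test failed but
+		// cannot abort it; FailNow is only meant for the test goroutine itself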
- concurrency.ForEachJob( + require.Nil(t, concurrency.ForEachJob( context.Background(), len(resps), len(resps), @@ -71,7 +71,7 @@ func TestFusedQuerier(t *testing.T) { } return nil }, - ) + )) }() fused := querier.Fuse(itrs) @@ -164,7 +164,7 @@ func BenchmarkBlockQuerying(b *testing.B) { b.Run("fused", func(b *testing.B) { // spin up some goroutines to consume the responses so they don't block go func() { - concurrency.ForEachJob( + require.Nil(b, concurrency.ForEachJob( context.Background(), len(requestChains), len(requestChains), func(_ context.Context, idx int) error { @@ -172,7 +172,7 @@ func BenchmarkBlockQuerying(b *testing.B) { } return nil }, - ) + )) }() var itrs []PeekingIterator[request] From 6e8c8f29fea28f12f35b525f09c4be84f88f85f2 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 31 Oct 2023 09:31:37 -0700 Subject: [PATCH 10/11] make format Signed-off-by: Owen Diehl --- pkg/storage/bloom/v1/fuse_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go index 8f976715196d7..f24722af6343e 100644 --- a/pkg/storage/bloom/v1/fuse_test.go +++ b/pkg/storage/bloom/v1/fuse_test.go @@ -6,8 +6,9 @@ import ( "testing" "github.com/grafana/dskit/concurrency" - "github.com/grafana/loki/pkg/chunkenc" "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/chunkenc" ) func TestFusedQuerier(t *testing.T) { From c27710c4ba18faaf07df1c441542ce805417aafd Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 31 Oct 2023 10:46:14 -0700 Subject: [PATCH 11/11] waitgroup for test timing Signed-off-by: Owen Diehl --- pkg/storage/bloom/v1/fuse_test.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go index f24722af6343e..38554463e9214 100644 --- a/pkg/storage/bloom/v1/fuse_test.go +++ b/pkg/storage/bloom/v1/fuse_test.go @@ -3,6 +3,7 @@ package v1 import ( "bytes" "context" + "sync" "testing" "github.com/grafana/dskit/concurrency" @@ -61,6 +62,8 @@ func TestFusedQuerier(t *testing.T) { } resps := make([][]output, nReqs) + var g sync.WaitGroup + g.Add(1) go func() { require.Nil(t, concurrency.ForEachJob( context.Background(), @@ -73,10 +76,16 @@ func TestFusedQuerier(t *testing.T) { return nil }, )) + g.Done() }() fused := querier.Fuse(itrs) + require.Nil(t, fused.Run()) + for _, input := range inputs { + close(input[0].response) + } + g.Wait() for i, input := range inputs { for j, req := range input {