Bloom/fuse queries #11088

Merged 11 commits on Nov 1, 2023
10 changes: 8 additions & 2 deletions pkg/storage/bloom/v1/TODO.md
@@ -1,10 +1,16 @@
* Should be able to read bloom as a []byte without copying it during decoding
  * It's immutable + partition offsets are calculable, etc
* Less copying! I've taken some shortcuts we'll need to refactor to avoid copying []byte around in a few places
* more sophisticated querying methods
  * queue access to blooms
  * multiplex reads across blooms
* Queueing system for bloom access
* bloom hierarchies (bloom per block, etc). Test a tree of blooms down to the individual series/chunk
* memoize hashing & bucket lookups during queries
* versioning
  * so we can change implementations
  * encode bloom parameters in block: sbf params, hashing strategy, tokenizer
* caching
  * ability to download indices without chunks


# merge querier for different blocks
* how to merge two block queriers with the same fp (see the sketch after this section)
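The iterators added in this PR answer that last TODO item directly. A hedged sketch of the composition (not itself part of the diff; `blocks` is an assumed `[]*Block`): heap-merge the per-block queriers into one fingerprint-ordered stream, then dedupe, preferring the entry with more chunks indexed, just as `MergeBuilder.Build` does below.

```go
queriers := make([]PeekingIterator[*SeriesWithBloom], 0, len(blocks))
for _, b := range blocks {
	queriers = append(queriers, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(b)))
}
merged := NewHeapIterForSeriesWithBloom(queriers...) // fingerprint-ordered merge
deduped := NewDedupingIter[*SeriesWithBloom, *SeriesWithBloom](
	func(a, b *SeriesWithBloom) bool { return a.Series.Fingerprint == b.Series.Fingerprint },
	id[*SeriesWithBloom], // first occurrence passes through unchanged
	func(a, b *SeriesWithBloom) *SeriesWithBloom {
		// prefer the copy with more chunks already indexed
		if len(a.Series.Chunks) > len(b.Series.Chunks) {
			return a
		}
		return b
	},
	NewPeekingIter[*SeriesWithBloom](merged),
)
```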
23 changes: 15 additions & 8 deletions pkg/storage/bloom/v1/builder.go
@@ -473,14 +473,18 @@ func SortBlocksIntoOverlappingGroups(xs []*Block) (groups [][]*Block)
// from a list of blocks and a store of series.
type MergeBuilder struct {
// existing blocks
blocks []*Block
blocks []PeekingIterator[*SeriesWithBloom]
// store
store Iterator[*Series]
// Add chunks to a bloom
populate func(*Series, *Bloom) error
}

func NewMergeBuilder(blocks []*Block, store Iterator[*Series], populate func(*Series, *Bloom) error) *MergeBuilder {
// NewMergeBuilder is a specific builder which does the following:
// 1. merges multiple blocks into a single ordered querier,
// i) When two blocks have the same series, it will prefer the one with the most chunks already indexed
// 2. iterates through the store, adding chunks to the relevant blooms via the `populate` argument
func NewMergeBuilder(blocks []PeekingIterator[*SeriesWithBloom], store Iterator[*Series], populate func(*Series, *Bloom) error) *MergeBuilder {
return &MergeBuilder{
blocks: blocks,
store: store,
@@ -492,23 +496,19 @@ func NewMergeBuilder(blocks []*Block, store Iterator[*Series], populate func(*Se
// but this gives us a good starting point.
func (mb *MergeBuilder) Build(builder *BlockBuilder) error {
var (
xs = make([]PeekingIterator[*SeriesWithBloom], 0, len(mb.blocks))
nextInBlocks *SeriesWithBloom
)

for _, block := range mb.blocks {
xs = append(xs, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(block)))
}

// Turn the list of blocks into a single iterator that returns the next series
mergedBlocks := NewPeekingIter[*SeriesWithBloom](NewMergeBlockQuerier(xs...))
mergedBlocks := NewPeekingIter[*SeriesWithBloom](NewHeapIterForSeriesWithBloom(mb.blocks...))
// two overlapping blocks can conceivably have the same series, so we need to dedupe,
// preferring the one with the most chunks already indexed since we'll have
// to add fewer chunks to the bloom
deduped := NewDedupingIter[*SeriesWithBloom](
func(a, b *SeriesWithBloom) bool {
return a.Series.Fingerprint == b.Series.Fingerprint
},
id[*SeriesWithBloom],
func(a, b *SeriesWithBloom) *SeriesWithBloom {
if len(a.Series.Chunks) > len(b.Series.Chunks) {
return a
@@ -567,5 +567,12 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) error {
return errors.Wrap(err, "adding series to block")
}
}

if err := builder.blooms.Close(); err != nil {
return errors.Wrap(err, "closing bloom file")
}
if err := builder.index.Close(); err != nil {
return errors.Wrap(err, "closing series file")
}
return nil
}
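For orientation, a condensed sketch of driving the new `MergeBuilder` (mirroring `TestMergeBuilder` below; `reader`, `storeItr`, `populate`, and `builder` are assumed to be set up as in that test):

```go
// One peeking querier per existing block.
blocks := []PeekingIterator[*SeriesWithBloom]{
	NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(NewBlock(reader))),
}
// storeItr yields the authoritative *Series list; populate back-fills
// chunks missing from a bloom. Build dedupes overlapping series and,
// per this PR, now closes the builder's bloom and index files itself.
mb := NewMergeBuilder(blocks, storeItr, populate)
if err := mb.Build(builder); err != nil {
	// handle error
}
```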
95 changes: 95 additions & 0 deletions pkg/storage/bloom/v1/builder_test.go
@@ -2,6 +2,7 @@ package v1

import (
"bytes"
"errors"
"fmt"
"testing"

@@ -47,6 +48,17 @@ func mkBasicSeriesWithBlooms(nSeries, keysPerSeries int, fromFp, throughFp model
return
}

func EqualIterators[T any](t *testing.T, test func(a, b T), expected, actual Iterator[T]) {
for expected.Next() {
require.True(t, actual.Next())
a, b := expected.At(), actual.At()
test(a, b)
}
require.False(t, actual.Next())
require.Nil(t, expected.Err())
require.Nil(t, actual.Err())
}

func TestBlockBuilderRoundTrip(t *testing.T) {
numSeries := 100
numKeysPerSeries := 10000
@@ -124,3 +136,86 @@ func TestBlockBuilderRoundTrip(t *testing.T) {
})
}
}

func TestMergeBuilder(t *testing.T) {

nBlocks := 10
numSeries := 100
numKeysPerSeries := 100
blocks := make([]PeekingIterator[*SeriesWithBloom], 0, nBlocks)
data, _ := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000)
blockOpts := BlockOptions{
schema: Schema{
version: DefaultSchemaVersion,
encoding: chunkenc.EncSnappy,
},
SeriesPageSize: 100,
BloomPageSize: 10 << 10,
}

// Build a list of blocks containing overlapping & duplicated parts of the dataset
for i := 0; i < nBlocks; i++ {
// references for linking in memory reader+writer
indexBuf := bytes.NewBuffer(nil)
bloomsBuf := bytes.NewBuffer(nil)

min := i * numSeries / nBlocks
max := (i + 2) * numSeries / nBlocks // allow some overlap
if max > len(data) {
max = len(data)
}

writer := NewMemoryBlockWriter(indexBuf, bloomsBuf)
reader := NewByteReader(indexBuf, bloomsBuf)

builder, err := NewBlockBuilder(
blockOpts,
writer,
)

require.Nil(t, err)
itr := NewSliceIter[SeriesWithBloom](data[min:max])
require.Nil(t, builder.BuildFrom(itr))
blocks = append(blocks, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(NewBlock(reader))))
}

// We're not testing the ability to extend a bloom in this test
pop := func(_ *Series, _ *Bloom) error {
return errors.New("not implemented")
}

// storage should contain references to all the series we ingested,
// regardless of block allocation/overlap.
storeItr := NewMapIter[SeriesWithBloom, *Series](
NewSliceIter[SeriesWithBloom](data),
func(swb SeriesWithBloom) *Series {
return swb.Series
},
)

// Ensure that the merge builder combines all the blocks correctly
mergeBuilder := NewMergeBuilder(blocks, storeItr, pop)
indexBuf := bytes.NewBuffer(nil)
bloomsBuf := bytes.NewBuffer(nil)
writer := NewMemoryBlockWriter(indexBuf, bloomsBuf)
reader := NewByteReader(indexBuf, bloomsBuf)

builder, err := NewBlockBuilder(
blockOpts,
writer,
)
require.Nil(t, err)

require.Nil(t, mergeBuilder.Build(builder))
block := NewBlock(reader)
querier := NewBlockQuerier(block)

EqualIterators[*SeriesWithBloom](
t,
func(a, b *SeriesWithBloom) {
require.Equal(t, a.Series, b.Series, "expected %+v, got %+v", a, b)
},
NewSliceIter[*SeriesWithBloom](PointerSlice(data)),
querier,
)
}
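`PointerSlice` isn't shown in this diff; presumably it lifts a `[]T` into a `[]*T` so the slice iterator's element type matches the querier's. A minimal sketch of that assumption:

```go
// Hypothetical reconstruction; the real helper lives elsewhere in the package.
func PointerSlice[T any](xs []T) []*T {
	out := make([]*T, len(xs))
	for i := range xs {
		out[i] = &xs[i]
	}
	return out
}
```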
55 changes: 27 additions & 28 deletions pkg/storage/bloom/v1/dedupe.go
@@ -1,55 +1,54 @@
package v1

// DedupeIter is a deduplicating iterator that merges adjacent elements
// It's intended to be used when merging multiple blocks,
// each of which may contain the same fingerprints
type DedupeIter[T any] struct {
eq func(T, T) bool
merge func(T, T) T
itr PeekingIterator[T]

tmp []T
// DedupeIter is a deduplicating iterator which creates an Iterator[B]
// from an Iterator[A].
type DedupeIter[A, B any] struct {
eq func(A, B) bool // equality check
from func(A) B // convert A to B, used on first element
merge func(A, B) B // merge A into B
itr PeekingIterator[A]

tmp B
}

func NewDedupingIter[T any](
eq func(T, T) bool,
merge func(T, T) T,
itr PeekingIterator[T],
) *DedupeIter[T] {
return &DedupeIter[T]{
// general helper, in this case created for DedupeIter[T,T]
func id[A any](a A) A { return a }

func NewDedupingIter[A, B any](
eq func(A, B) bool,
from func(A) B,
merge func(A, B) B,
itr PeekingIterator[A],
) *DedupeIter[A, B] {
return &DedupeIter[A, B]{
eq: eq,
from: from,
merge: merge,
itr: itr,
}
}

func (it *DedupeIter[T]) Next() bool {
it.tmp = it.tmp[:0]
func (it *DedupeIter[A, B]) Next() bool {
if !it.itr.Next() {
return false
}
it.tmp = append(it.tmp, it.itr.At())
it.tmp = it.from(it.itr.At())
for {
next, ok := it.itr.Peek()
if !ok || !it.eq(it.tmp[0], next) {
if !ok || !it.eq(next, it.tmp) {
break
}

it.itr.Next() // ensured via peek
it.tmp = append(it.tmp, it.itr.At())
}

// merge all the elements in tmp
for i := len(it.tmp) - 1; i > 0; i-- {
it.tmp[i-1] = it.merge(it.tmp[i-1], it.tmp[i])
it.tmp = it.merge(next, it.tmp)
}
return true
}

func (it *DedupeIter[T]) Err() error {
func (it *DedupeIter[A, B]) Err() error {
return it.itr.Err()
}

func (it *DedupeIter[T]) At() T {
return it.tmp[0]
func (it *DedupeIter[A, B]) At() B {
return it.tmp
}
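The second type parameter is what lets the deduper change element type while it collapses a run. A small illustrative sketch using the package's generic iterators, grouping equal adjacent ints into slices:

```go
itr := NewPeekingIter[int](NewSliceIter[int]([]int{1, 1, 2, 3, 3, 3}))
groups := NewDedupingIter[int, []int](
	func(a int, b []int) bool { return a == b[0] },     // does a extend the current group?
	func(a int) []int { return []int{a} },              // seed a group from its first element
	func(a int, b []int) []int { return append(b, a) }, // fold a duplicate into the group
	itr,
)
// groups yields [1 1], then [2], then [3 3 3]
```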
5 changes: 3 additions & 2 deletions pkg/storage/bloom/v1/dedupe_test.go
@@ -19,15 +19,16 @@ func TestMergeDedupeIter(t *testing.T) {
queriers[i] = NewPeekingIter[*SeriesWithBloom](NewSliceIter[*SeriesWithBloom](dataPtr))
}

mbq := NewMergeBlockQuerier(queriers...)
mbq := NewHeapIterForSeriesWithBloom(queriers...)
eq := func(a, b *SeriesWithBloom) bool {
return a.Series.Fingerprint == b.Series.Fingerprint
}
merge := func(a, _ *SeriesWithBloom) *SeriesWithBloom {
return a
}
deduper := NewDedupingIter[*SeriesWithBloom](
deduper := NewDedupingIter[*SeriesWithBloom, *SeriesWithBloom](
eq,
id[*SeriesWithBloom],
merge,
NewPeekingIter[*SeriesWithBloom](mbq),
)