Use lazy bloom filter (grafana#10896)
**What this PR does / why we need it**:

When querying bloom filters, use lazy decoders from
owen-d/BoomFilters#1 so we don't make a copy of
bloom filters.
salvacorts authored Oct 18, 2023
1 parent d84026c commit 8982906
Showing 13 changed files with 267 additions and 252 deletions.
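For context, the diffs below replace the copying decode path with a lazy one. A minimal sketch of that trade-off, using only the standard library and illustrative names (this is not Loki's actual code): a copying decoder allocates and copies the serialized filter out of the block buffer, while a lazy decoder keeps a read-only sub-slice of it.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// copyDecode reads a length-prefixed payload and copies it into a fresh
// allocation, so the result is independent of buf but costs an extra copy.
func copyDecode(buf []byte) []byte {
	n := binary.BigEndian.Uint64(buf)
	out := make([]byte, n)
	copy(out, buf[8:8+n])
	return out
}

// lazyDecode returns a sub-slice that aliases buf: no allocation and no copy,
// but the caller must keep buf alive and must not mutate the result.
func lazyDecode(buf []byte) []byte {
	n := binary.BigEndian.Uint64(buf)
	return buf[8 : 8+n]
}

func main() {
	payload := []byte("serialized bloom filter bytes")
	buf := make([]byte, 8+len(payload))
	binary.BigEndian.PutUint64(buf, uint64(len(payload)))
	copy(buf[8:], payload)

	fmt.Println(string(copyDecode(buf))) // independent copy
	fmt.Println(string(lazyDecode(buf))) // view into buf, zero-copy
}
```

The cost of the lazy approach is a lifetime constraint: the decoded filter is only valid while the backing buffer is, and it must be treated as read-only, which is presumably why the new Decode in bloom.go below is paired with a DecodeCopy variant.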
20 changes: 10 additions & 10 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -121,13 +121,13 @@ func NoopGetChunks() []byte { return nil }
// part1: Create a compact method that assumes no block/meta files exists (eg first compaction)
// part2: Write logic to check first for existing block/meta files and does above.
func (c *Compactor) compactNewChunks(ctx context.Context, dst string) (err error) {
//part1
// part1
series := NoopGetSeries()
data := NoopGetChunks()

bloom := v1.Bloom{Sbf: *filter.NewDefaultScalableBloomFilter(0.01)}
bloom := v1.Bloom{ScalableBloomFilter: *filter.NewDefaultScalableBloomFilter(0.01)}
// create bloom filters from that.
bloom.Sbf.Add([]byte(fmt.Sprint(data)))
bloom.Add([]byte(fmt.Sprint(data)))

// block and seriesList
seriesList := []v1.SeriesWithBloom{
@@ -190,7 +190,7 @@ func (c *Compactor) compactNewChunks(ctx context.Context, dst string) (err error
}

func (c *Compactor) runCompact(ctx context.Context) error {
//TODO set MaxLookBackPeriod to Max ingester accepts
// TODO set MaxLookBackPeriod to Max ingester accepts
maxLookBackPeriod := c.cfg.MaxLookBackPeriod

stFp, endFp := NoopGetFingerprintRange()
@@ -213,7 +213,7 @@ func (c *Compactor) runCompact(ctx context.Context) error {
}

if len(metas) == 0 {
//run compaction from scratch
// run compaction from scratch
tempDst := os.TempDir()
err = c.compactNewChunks(ctx, tempDst)
if err != nil {
@@ -232,15 +232,15 @@ func (c *Compactor) runCompact(ctx context.Context) error {
}

// TODO complete part 2 - discuss with Owen - add part to compare chunks and blocks.
//1. for each period at hand, get TSDB table indexes for given fp range
//2. Check blocks for given uniqueIndexPaths and TSDBindexes
// 1. for each period at hand, get TSDB table indexes for given fp range
// 2. Check blocks for given uniqueIndexPaths and TSDBindexes
// if bloomBlock refs are a superset (covers TSDBIndexes plus more outside of range)
// create a new meta.json file, tombstone unused index/block paths.

//else if: there are TSDBindexes that are not covered in bloomBlocks (a subset)
//then call compactNewChunks on them and create a new meta.json
// else if: there are TSDBindexes that are not covered in bloomBlocks (a subset)
// then call compactNewChunks on them and create a new meta.json

//else: all good, no compaction
// else: all good, no compaction
}
return nil
}
3 changes: 2 additions & 1 deletion pkg/storage/bloom/v1/archive_test.go
@@ -16,7 +16,8 @@ func TestArchive(t *testing.T) {
dir2 := t.TempDir()

numSeries := 100
data := mkBasicSeriesWithBlooms(numSeries, 0, 0xffff, 0, 10000)
numKeysPerSeries := 10000
data, _ := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000)

builder, err := NewBlockBuilder(
BlockOptions{
4 changes: 2 additions & 2 deletions pkg/storage/bloom/v1/block.go
@@ -141,7 +141,7 @@ func (bq *BlockQuerier) CheckChunksForSeries(fp model.Fingerprint, chks ChunkRef

// First, see if the search passes the series level bloom before checking for chunks individually
for _, search := range searches {
if !bloom.Sbf.Test(search) {
if !bloom.Test(search) {
// the entire series bloom didn't pass one of the searches,
// so we can skip checking chunks individually.
// We still return all chunks that are not included in the bloom
@@ -161,7 +161,7 @@
outer:
// TODO(owen-d): meld chunk + search into a single byte slice from the block schema
var combined = search

if !bloom.Sbf.Test(combined) {
if !bloom.Test(combined) {
continue outer
}
}
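A side note on why call sites like the ones above change from bloom.Sbf.Test(search) to bloom.Test(search): in bloom.go (next file) the Sbf field becomes an embedded filter.ScalableBloomFilter, so its methods are promoted onto Bloom. A toy sketch of that Go mechanic, with purely illustrative types rather than Loki's:

```go
package main

import "fmt"

// toyFilter stands in for filter.ScalableBloomFilter; it is purely illustrative.
type toyFilter struct{ members map[string]bool }

func (f *toyFilter) Add(key []byte)       { f.members[string(key)] = true }
func (f *toyFilter) Test(key []byte) bool { return f.members[string(key)] }

// Before this change: a named field, so callers write bloom.Sbf.Test(...).
type bloomWithField struct {
	Sbf toyFilter
}

// After this change: an embedded field, so Add and Test are promoted and
// callers write bloom.Add(...) and bloom.Test(...) directly.
type bloomEmbedded struct {
	toyFilter
}

func main() {
	withField := bloomWithField{Sbf: toyFilter{members: map[string]bool{}}}
	withField.Sbf.Add([]byte("trace"))

	embedded := bloomEmbedded{toyFilter{members: map[string]bool{}}}
	embedded.Add([]byte("trace"))

	fmt.Println(withField.Sbf.Test([]byte("trace")), embedded.Test([]byte("trace"))) // true true
}
```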
20 changes: 16 additions & 4 deletions pkg/storage/bloom/v1/bloom.go
@@ -13,14 +13,14 @@ import (
)

type Bloom struct {
Sbf filter.ScalableBloomFilter
filter.ScalableBloomFilter
}

func (b *Bloom) Encode(enc *encoding.Encbuf) error {
// divide by 8 b/c bloom capacity is measured in bits, but we want bytes
buf := bytes.NewBuffer(BlockPool.Get(int(b.Sbf.Capacity() / 8)))
buf := bytes.NewBuffer(BlockPool.Get(int(b.Capacity() / 8)))

_, err := b.Sbf.WriteTo(buf)
_, err := b.WriteTo(buf)
if err != nil {
return errors.Wrap(err, "encoding bloom filter")
}
@@ -32,11 +32,23 @@ func (b *Bloom) Encode(enc *encoding.Encbuf) error {
return nil
}

func (b *Bloom) DecodeCopy(dec *encoding.Decbuf) error {
ln := dec.Uvarint()
data := dec.Bytes(ln)

_, err := b.ReadFrom(bytes.NewReader(data))
if err != nil {
return errors.Wrap(err, "decoding copy of bloom filter")
}

return nil
}

func (b *Bloom) Decode(dec *encoding.Decbuf) error {
ln := dec.Uvarint()
data := dec.Bytes(ln)

_, err := b.Sbf.ReadFrom(bytes.NewReader(data))
_, err := b.DecodeFrom(data)
if err != nil {
return errors.Wrap(err, "decoding bloom filter")
}
50 changes: 32 additions & 18 deletions pkg/storage/bloom/v1/builder_test.go
@@ -12,12 +12,14 @@ import (
"github.com/grafana/loki/pkg/storage/bloom/v1/filter"
)

func mkBasicSeriesWithBlooms(n int, fromFp, throughFp model.Fingerprint, fromTs, throughTs model.Time) (seriesList []SeriesWithBloom) {
for i := 0; i < n; i++ {
func mkBasicSeriesWithBlooms(nSeries, keysPerSeries int, fromFp, throughFp model.Fingerprint, fromTs, throughTs model.Time) (seriesList []SeriesWithBloom, keysList [][][]byte) {
seriesList = make([]SeriesWithBloom, 0, nSeries)
keysList = make([][][]byte, 0, nSeries)
for i := 0; i < nSeries; i++ {
var series Series
step := (throughFp - fromFp) / (model.Fingerprint(n))
step := (throughFp - fromFp) / (model.Fingerprint(nSeries))
series.Fingerprint = fromFp + model.Fingerprint(i)*step
timeDelta := fromTs + (throughTs-fromTs)/model.Time(n)*model.Time(i)
timeDelta := fromTs + (throughTs-fromTs)/model.Time(nSeries)*model.Time(i)
series.Chunks = []ChunkRef{
{
Start: fromTs + timeDelta*model.Time(i),
@@ -27,20 +29,28 @@ func mkBasicSeriesWithBlooms(n int, fromFp, throughFp model.Fingerprint, fromTs,
}

var bloom Bloom
bloom.Sbf = *filter.NewScalableBloomFilter(1024, 0.01, 0.8)
bloom.Sbf.Add([]byte(fmt.Sprint(i)))
bloom.ScalableBloomFilter = *filter.NewScalableBloomFilter(1024, 0.01, 0.8)

keys := make([][]byte, 0, keysPerSeries)
for j := 0; j < keysPerSeries; j++ {
key := []byte(fmt.Sprint(j))
bloom.Add(key)
keys = append(keys, key)
}

seriesList = append(seriesList, SeriesWithBloom{
Series: &series,
Bloom: &bloom,
})
keysList = append(keysList, keys)
}
return
}

func TestBlockBuilderRoundTrip(t *testing.T) {
numSeries := 100
data := mkBasicSeriesWithBlooms(numSeries, 0, 0xffff, 0, 10000)
numKeysPerSeries := 10000
data, keys := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000)

// references for linking in memory reader+writer
indexBuf := bytes.NewBuffer(nil)
@@ -87,25 +97,29 @@ func TestBlockBuilderRoundTrip(t *testing.T) {
require.Equal(t, true, querier.Next(), "on iteration %d with error %v", i, querier.Err())
got := querier.At()
require.Equal(t, data[i].Series, got.Series)
require.Equal(t, data[i].Bloom, got.Bloom)
for _, key := range keys[i] {
require.True(t, got.Bloom.Test(key))
}
require.NoError(t, querier.Err())
}
// ensure no error
require.Nil(t, querier.Err())
// ensure it's exhausted
require.Equal(t, false, querier.Next())
require.False(t, querier.Next())

// test seek
i := numSeries / 2
half := data[i:]
require.Nil(t, querier.Seek(half[0].Series.Fingerprint))
for j := 0; j < len(half); j++ {
halfData := data[i:]
halfKeys := keys[i:]
require.Nil(t, querier.Seek(halfData[0].Series.Fingerprint))
for j := 0; j < len(halfData); j++ {
require.Equal(t, true, querier.Next(), "on iteration %d", j)
got := querier.At()
require.Equal(t, half[j].Series, got.Series)
require.Equal(t, half[j].Bloom, got.Bloom)
require.Nil(t, querier.Err())
require.Equal(t, halfData[j].Series, got.Series)
for _, key := range halfKeys[j] {
require.True(t, got.Bloom.Test(key))
}
require.NoError(t, querier.Err())
}
require.Equal(t, false, querier.Next())
require.False(t, querier.Next())

})
}
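The round-trip test above stops comparing Bloom structs (require.Equal(t, data[i].Bloom, got.Bloom)) and instead probes the queried bloom with Test(): once decoding is lazy, the querier's filter is backed by the block's bytes rather than by the structures built in memory, so struct equality is no longer a meaningful check while membership still is. Below is a hedged sketch of that round trip against the filter package as it appears in this diff; the ScalableBloomFilter.DecodeFrom call is an assumption inferred from the new Bloom.Decode, not an API confirmed here.

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/grafana/loki/pkg/storage/bloom/v1/filter"
)

func main() {
	// Build a filter in memory and add a few keys, mirroring the test helper.
	orig := filter.NewScalableBloomFilter(1024, 0.01, 0.8)
	keys := [][]byte{[]byte("1"), []byte("2"), []byte("3")}
	for _, k := range keys {
		orig.Add(k)
	}

	// Serialize it the same way Bloom.Encode does.
	var buf bytes.Buffer
	if _, err := orig.WriteTo(&buf); err != nil {
		panic(err)
	}

	// Lazily decode from the raw bytes. DecodeFrom on ScalableBloomFilter is
	// assumed from this PR (Bloom.Decode calls it on the embedded filter) and
	// is assumed to fully initialize the receiver, mirroring ReadFrom.
	var decoded filter.ScalableBloomFilter
	if _, err := decoded.DecodeFrom(buf.Bytes()); err != nil {
		panic(err)
	}

	// The decoded filter aliases buf's bytes, so don't compare structs;
	// check behavior instead, as the updated test does.
	for _, k := range keys {
		fmt.Println(decoded.Test(k)) // expected: true
	}
}
```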
11 changes: 6 additions & 5 deletions pkg/storage/bloom/v1/dedupe_test.go
@@ -8,14 +8,15 @@ import (

func TestMergeDedupeIter(t *testing.T) {
var (
data = PointerSlice(
mkBasicSeriesWithBlooms(100, 0, 0xffff, 0, 10000),
)
queriers = make([]PeekingIterator[*SeriesWithBloom], 4)
numSeries = 100
numKeysPerSeries = 10000
data, _ = mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000)
dataPtr = PointerSlice(data)
queriers = make([]PeekingIterator[*SeriesWithBloom], 4)
)

for i := 0; i < len(queriers); i++ {
queriers[i] = NewPeekingIter[*SeriesWithBloom](NewSliceIter[*SeriesWithBloom](data))
queriers[i] = NewPeekingIter[*SeriesWithBloom](NewSliceIter[*SeriesWithBloom](dataPtr))
}

mbq := NewMergeBlockQuerier(queriers...)
90 changes: 47 additions & 43 deletions pkg/storage/bloom/v1/filter/buckets.go
@@ -12,6 +12,7 @@ package filter
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"math/bits"
)
@@ -149,24 +150,41 @@ func (b *Buckets) WriteTo(stream io.Writer) (int64, error) {
return int64(len(b.data) + 2*binary.Size(uint8(0)) + 2*binary.Size(uint64(0))), err
}

// ReadFrom reads a binary representation of Buckets (such as might
// have been written by WriteTo()) from an i/o stream. It returns the number
// of bytes read.
func (b *Buckets) ReadFrom(stream io.Reader) (int64, error) {
var bucketSize, max uint8
var count, len uint64
func (b *Buckets) readParams(stream io.Reader) (int64, error) {
var bucketSize, maximum uint8
var count uint64

err := binary.Read(stream, binary.BigEndian, &bucketSize)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.BigEndian, &max)
err = binary.Read(stream, binary.BigEndian, &maximum)
if err != nil {
return 0, err
}
err = binary.Read(stream, binary.BigEndian, &count)
if err != nil {
return 0, err
}

b.bucketSize = bucketSize
b.max = maximum
b.count = uint(count)

// Bytes read: bucketSize (uint8), max (uint8), count (uint64)
return int64(2*binary.Size(uint8(0)) + binary.Size(uint64(0))), nil
}

// ReadFrom reads a binary representation of Buckets (such as might
// have been written by WriteTo()) from an i/o stream. It returns the number
// of bytes read.
func (b *Buckets) ReadFrom(stream io.Reader) (int64, error) {
bytesParams, err := b.readParams(stream)
if err != nil {
return 0, err
}

var len uint64
err = binary.Read(stream, binary.BigEndian, &len)
if err != nil {
return 0, err
@@ -176,11 +194,29 @@ func (b *Buckets) ReadFrom(stream io.Reader) (int64, error) {
if err != nil {
return 0, err
}
b.bucketSize = bucketSize
b.max = max
b.count = uint(count)
b.data = data
return int64(int(len) + 2*binary.Size(uint8(0)) + 2*binary.Size(uint64(0))), nil

// Bytes read: bytesParams + dataLen (uint64) + data ([]byte)
return bytesParams + int64(binary.Size(uint64(0))) + int64(len), nil
}

// DecodeFrom reads a binary representation of Buckets (such as might
// have been written by WriteTo()) from a buffer.
// Whereas ReadFrom() reads the entire data into memory and
// makes a copy of the data buffer, DecodeFrom keeps a reference
// to the original data buffer and can only be used to Test.
func (b *Buckets) DecodeFrom(data []byte) (int64, error) {
bytesParams, err := b.readParams(bytes.NewReader(data))
if err != nil {
return 0, fmt.Errorf("failed to read Buckets params from buffer: %w", err)
}

dataLen := int64(binary.BigEndian.Uint64(data[bytesParams:]))
dataStart := bytesParams + int64(binary.Size(uint64(0)))
dataEnd := dataStart + dataLen
b.data = data[dataStart:dataEnd]

return dataEnd, nil
}

// GobEncode implements gob.GobEncoder interface.
@@ -201,35 +237,3 @@ func (b *Buckets) GobDecode(data []byte) error {

return err
}

type BucketsLazyReader struct {
Buckets
}

// NewBucketsLazyReader creates a new BucketsLazyReader from the provided data
// and returns the number of bytes used by the Buckets
// The data is expected to be in the format written by Buckets.WriteTo().
// Whereas Buckets.ReadFrom() reads the entire data into memory and
// makes a copy of the data buffer, BucketsLazyReader keeps a reference
// to the original data buffer and only reads from it when needed.
func NewBucketsLazyReader(data []byte) (BucketsLazyReader, int) {
bucketSize := data[0]

// Skip bucketSize (uint8), max (uint8), count (uint64)
lenDataOffset := 2*binary.Size(uint8(0)) + binary.Size(uint64(0))
// Add len field (uint64(0)) to the above offset
dataStart := lenDataOffset + binary.Size(uint64(0))
dataEnd := dataStart + int(binary.BigEndian.Uint64(data[lenDataOffset:]))

return BucketsLazyReader{
Buckets: Buckets{
data: data[dataStart:dataEnd],
bucketSize: bucketSize,
},
}, dataEnd
}

// Get returns the value in the specified bucket.
func (s BucketsLazyReader) Get(bucket uint) uint32 {
return s.Buckets.Get(bucket)
}
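To make the offset arithmetic in readParams and DecodeFrom above concrete, here is a small standard-library-only sketch of the layout that Buckets.WriteTo produces (bucketSize uint8, max uint8, count uint64, data length uint64, then the raw bucket bytes), decoded by slicing rather than copying the data section. The lazyBuckets type and decodeFrom function are illustrative stand-ins, not the real Buckets API.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// lazyBuckets mirrors the fields DecodeFrom fills in; it is only illustrative.
type lazyBuckets struct {
	bucketSize uint8
	max        uint8
	count      uint
	data       []byte // aliases the encoded buffer, no copy
}

func decodeFrom(data []byte) (lazyBuckets, int64, error) {
	var b lazyBuckets
	r := bytes.NewReader(data)

	// Header: bucketSize (uint8), max (uint8), count (uint64) = 10 bytes.
	if err := binary.Read(r, binary.BigEndian, &b.bucketSize); err != nil {
		return b, 0, err
	}
	if err := binary.Read(r, binary.BigEndian, &b.max); err != nil {
		return b, 0, err
	}
	var count uint64
	if err := binary.Read(r, binary.BigEndian, &count); err != nil {
		return b, 0, err
	}
	b.count = uint(count)
	headerLen := int64(2*binary.Size(uint8(0)) + binary.Size(uint64(0)))

	// Data length (uint64) followed by the bucket bytes themselves.
	dataLen := int64(binary.BigEndian.Uint64(data[headerLen:]))
	dataStart := headerLen + int64(binary.Size(uint64(0)))
	dataEnd := dataStart + dataLen
	b.data = data[dataStart:dataEnd] // keep a reference instead of copying

	return b, dataEnd, nil
}

func main() {
	// Encode a tiny payload in the same order as Buckets.WriteTo.
	payload := []byte{0xde, 0xad, 0xbe, 0xef}
	var buf bytes.Buffer
	binary.Write(&buf, binary.BigEndian, uint8(4))             // bucketSize
	binary.Write(&buf, binary.BigEndian, uint8(15))            // max
	binary.Write(&buf, binary.BigEndian, uint64(8))            // count
	binary.Write(&buf, binary.BigEndian, uint64(len(payload))) // data length
	buf.Write(payload)                                         // data

	b, n, err := decodeFrom(buf.Bytes())
	fmt.Println(b.bucketSize, b.max, b.count, b.data, n, err)
}
```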
