Skip to content

Commit

Permalink
fix(blooms): race condition in BloomPageDecoder (#12027)
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d authored Feb 21, 2024
1 parent e835b61 commit 1dce7b5
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 1 deletion.
3 changes: 3 additions & 0 deletions pkg/chunkenc/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,9 @@ func (pool *SnappyPool) GetReader(src io.Reader) (io.Reader, error) {

// PutReader places back in the pool a CompressionReader
func (pool *SnappyPool) PutReader(reader io.Reader) {
r := reader.(*snappy.Reader)
// Reset to free reference to the underlying reader
r.Reset(nil)
pool.readers.Put(reader)
}

Expand Down
11 changes: 10 additions & 1 deletion pkg/storage/bloom/v1/bloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ func LazyDecodeBloomPage(dec *encoding.Decbuf, pool chunkenc.ReaderPool, decompr
if err != nil {
return nil, errors.Wrap(err, "getting decompressor")
}
defer pool.PutReader(decompressor)

b := BlockPool.Get(decompressedSize)[:decompressedSize]
defer BlockPool.Put(b)

if _, err = io.ReadFull(decompressor, b); err != nil {
return nil, errors.Wrap(err, "decompressing bloom page")
Expand Down Expand Up @@ -107,6 +107,15 @@ type BloomPageDecoder struct {
err error
}

// Drop returns the underlying byte slice to the pool
// for efficiency. It's intended to be used as a
// perf optimization prior to garbage collection.
func (d *BloomPageDecoder) Drop() {
if cap(d.data) > 0 {
BlockPool.Put(d.data)
}
}

func (d *BloomPageDecoder) Reset() {
d.err = nil
d.cur = nil
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/bloom/v1/bloom_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ func (it *LazyBloomIter) Seek(offset BloomOffset) {
// if we need a different page or the current page hasn't been loaded,
// load the desired page
if it.curPageIndex != offset.Page || it.curPage == nil {

// drop the current page if it exists
if it.curPage != nil {
it.curPage.Drop()
}

r, err := it.b.reader.Blooms()
if err != nil {
it.err = errors.Wrap(err, "getting blooms reader")
Expand Down Expand Up @@ -97,6 +103,7 @@ func (it *LazyBloomIter) next() bool {
}
// we've exhausted the current page, progress to next
it.curPageIndex++
it.curPage.Drop()
it.curPage = nil
continue
}
Expand Down

0 comments on commit 1dce7b5

Please sign in to comment.