diff --git a/pkg/storage/bloom/v1/bloom.go b/pkg/storage/bloom/v1/bloom.go
index f4339f004482e..ef4f3623feb99 100644
--- a/pkg/storage/bloom/v1/bloom.go
+++ b/pkg/storage/bloom/v1/bloom.go
@@ -20,6 +20,8 @@ func (b *Bloom) Encode(enc *encoding.Encbuf) error {
 	// divide by 8 b/c bloom capacity is measured in bits, but we want bytes
 	buf := bytes.NewBuffer(BlockPool.Get(int(b.Capacity() / 8)))
 
+	// TODO(owen-d): have encoder implement writer directly so we don't need
+	// to indirect via a buffer
 	_, err := b.WriteTo(buf)
 	if err != nil {
 		return errors.Wrap(err, "encoding bloom filter")
@@ -56,7 +58,16 @@ func (b *Bloom) Decode(dec *encoding.Decbuf) error {
 	return nil
 }
 
-func LazyDecodeBloomPage(dec *encoding.Decbuf, pool chunkenc.ReaderPool, decompressedSize int) (*BloomPageDecoder, error) {
+func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) {
+	data := BlockPool.Get(page.Len)[:page.Len]
+	defer BlockPool.Put(data)
+
+	_, err := io.ReadFull(r, data)
+	if err != nil {
+		return nil, errors.Wrap(err, "reading bloom page")
+	}
+	dec := encoding.DecWith(data)
+
 	if err := dec.CheckCrc(castagnoliTable); err != nil {
 		return nil, errors.Wrap(err, "checksumming bloom page")
 	}
@@ -67,7 +78,7 @@
 	}
 	defer pool.PutReader(decompressor)
 
-	b := BlockPool.Get(decompressedSize)[:decompressedSize]
+	b := make([]byte, page.DecompressedLen)
 
 	if _, err = io.ReadFull(decompressor, b); err != nil {
 		return nil, errors.Wrap(err, "decompressing bloom page")
@@ -98,6 +109,13 @@ func NewBloomPageDecoder(data []byte) *BloomPageDecoder {
 }
 
 // Decoder is a seekable, reset-able iterator
+// TODO(owen-d): use buffer pools. The reason we don't currently
+// do this is because the `data` slice currently escapes the decoder
+// via the returned bloom, so we can't know when it's safe to return it to the pool.
+// This happens via `data ([]byte) -> dec (*encoding.Decbuf) -> bloom (Bloom)` where
+// the final Bloom has a reference to the data slice.
+// We could optimize this by encoding the mode (read, write) into our structs
+// and doing copy-on-write shenanigans, but I'm avoiding this for now.
 type BloomPageDecoder struct {
 	data []byte
 	dec  *encoding.Decbuf
@@ -107,15 +125,6 @@ type BloomPageDecoder struct {
 	err error
 }
 
-// Drop returns the underlying byte slice to the pool
-// for efficiency. It's intended to be used as a
-// perf optimization prior to garbage collection.
-func (d *BloomPageDecoder) Drop() {
-	if cap(d.data) > 0 {
-		BlockPool.Put(d.data)
-	}
-}
-
 func (d *BloomPageDecoder) Reset() {
 	d.err = nil
 	d.cur = nil
@@ -234,13 +243,5 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int) (*BloomPageD
 		return nil, errors.Wrap(err, "seeking to bloom page")
 	}
 
-	data := BlockPool.Get(page.Len)[:page.Len]
-	_, err := io.ReadFull(r, data)
-	if err != nil {
-		return nil, errors.Wrap(err, "reading bloom page")
-	}
-
-	dec := encoding.DecWith(data)
-
-	return LazyDecodeBloomPage(&dec, b.schema.DecompressorPool(), page.DecompressedLen)
+	return LazyDecodeBloomPage(r, b.schema.DecompressorPool(), page)
 }
diff --git a/pkg/storage/bloom/v1/bloom_querier.go b/pkg/storage/bloom/v1/bloom_querier.go
index 1292addb7543c..d0dbdc1b3b550 100644
--- a/pkg/storage/bloom/v1/bloom_querier.go
+++ b/pkg/storage/bloom/v1/bloom_querier.go
@@ -39,11 +39,6 @@ func (it *LazyBloomIter) Seek(offset BloomOffset) {
 
 	// load the desired page
 	if it.curPageIndex != offset.Page || it.curPage == nil {
-		// drop the current page if it exists
-		if it.curPage != nil {
-			it.curPage.Drop()
-		}
-
 		r, err := it.b.reader.Blooms()
 		if err != nil {
 			it.err = errors.Wrap(err, "getting blooms reader")
@@ -103,7 +98,6 @@ func (it *LazyBloomIter) next() bool {
 			}
 			// we've exhausted the current page, progress to next
 			it.curPageIndex++
-			it.curPage.Drop()
 			it.curPage = nil
 			continue
 		}
diff --git a/pkg/storage/bloom/v1/index.go b/pkg/storage/bloom/v1/index.go
index eac8276400b97..41854c60b4c9d 100644
--- a/pkg/storage/bloom/v1/index.go
+++ b/pkg/storage/bloom/v1/index.go
@@ -173,19 +173,14 @@ func (b *BlockIndex) NewSeriesPageDecoder(r io.ReadSeeker, header SeriesPageHead
 		return nil, errors.Wrap(err, "getting decompressor")
 	}
 
-	decompressed := BlockPool.Get(header.DecompressedLen)[:header.DecompressedLen]
+	decompressed := make([]byte, header.DecompressedLen)
 	if _, err = io.ReadFull(decompressor, decompressed); err != nil {
 		return nil, errors.Wrap(err, "decompressing series page")
 	}
 
-	// replace decoder's input with the now-decompressed data
-	dec.B = decompressed
-
 	res := &SeriesPageDecoder{
 		data:   decompressed,
 		header: header.SeriesHeader,
-
-		i: -1,
 	}
 
 	res.Reset()
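
The reworked LazyDecodeBloomPage above boils down to: read the compressed page into a pooled scratch buffer, CRC-check it, then decompress into a freshly allocated slice that is allowed to escape. Below is a minimal stdlib-only sketch of that pattern, not the Loki implementation: gzip and a sync.Pool stand in for the schema's chunkenc.ReaderPool and BlockPool, and pageHeader/readPage are hypothetical names.

package bloomsketch

import (
	"bytes"
	"compress/gzip"
	"encoding/binary"
	"fmt"
	"hash/crc32"
	"io"
	"sync"
)

// pageHeader stands in for BloomPageHeader (hypothetical): compressed length
// on disk (payload plus a trailing CRC32) and the decompressed length.
type pageHeader struct {
	Len             int
	DecompressedLen int
}

var castagnoliTable = crc32.MakeTable(crc32.Castagnoli)

// scratch plays the role of BlockPool for the short-lived compressed bytes.
var scratch = sync.Pool{New: func() any { b := make([]byte, 0, 1<<12); return &b }}

// readPage sketches the reworked flow: the compressed page lives in a pooled
// buffer that is safely returned on exit because it never escapes, while the
// decompressed bytes are a plain heap allocation because they do escape.
func readPage(r io.Reader, page pageHeader) ([]byte, error) {
	bufp := scratch.Get().(*[]byte)
	defer scratch.Put(bufp) // safe: the compressed buffer never escapes
	if cap(*bufp) < page.Len {
		*bufp = make([]byte, page.Len)
	}
	data := (*bufp)[:page.Len]

	if _, err := io.ReadFull(r, data); err != nil {
		return nil, fmt.Errorf("reading bloom page: %w", err)
	}

	// Trailing big-endian CRC32 over the payload, analogous to
	// dec.CheckCrc(castagnoliTable) in the patch.
	payload, sum := data[:page.Len-4], data[page.Len-4:]
	if crc32.Checksum(payload, castagnoliTable) != binary.BigEndian.Uint32(sum) {
		return nil, fmt.Errorf("checksumming bloom page: crc mismatch")
	}

	zr, err := gzip.NewReader(bytes.NewReader(payload))
	if err != nil {
		return nil, fmt.Errorf("getting decompressor: %w", err)
	}
	defer zr.Close()

	// make, not pool: this slice escapes via the return value, so there is
	// no point at which it would be safe to put it back.
	decompressed := make([]byte, page.DecompressedLen)
	if _, err := io.ReadFull(zr, decompressed); err != nil {
		return nil, fmt.Errorf("decompressing bloom page: %w", err)
	}
	return decompressed, nil
}

This is the tradeoff the TODO in the patch alludes to: pooling the decompressed pages as well would cut allocations further, but requires knowing when every escaped reference to them is dead.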
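As for the Drop() removal: the Bloom returned by the decoder keeps a reference into the pooled slice, so returning that slice to the pool while a caller still holds the bloom lets a later Get hand the same backing memory to another page. A hypothetical toy illustrating that aliasing hazard (names are illustrative; whether Get returns the same buffer is up to the runtime, hence "may"):

package main

import (
	"fmt"
	"sync"
)

var pool = sync.Pool{New: func() any { b := make([]byte, 8); return &b }}

func main() {
	first := pool.Get().(*[]byte)
	view := (*first)[:4] // the "Bloom": a view into the pooled buffer
	pool.Put(first)      // the old Drop(): returns memory still referenced

	second := pool.Get().(*[]byte) // may be the very same backing array
	copy(*second, []byte{0xde, 0xad, 0xbe, 0xef})

	fmt.Printf("% x\n", view) // may now print "de ad be ef": silent corruption
}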