Skip to content

Commit

Permalink
perf: Introduce fixed size memory pool for bloom querier (#13039)
Browse files Browse the repository at this point in the history
This PR introduces a fixed size memory pool for bloom pages that are loaded in the block querier.

The aim of a fixed size pool of `[]byte` buffers is to reduce the amount of allocations, as well as to control the maximum heap size to prevent OOMing of the bloom gateways.

Also, with the current usage of a `sync.Pool`, the query parallelism (`bloom-gateway.worker-concurrency` and `bloom-gateway.block-query-concurrency`) was defined by the `bloom.max-query-page-size` (and vice versa), because the max page size could be loaded `workers * concurrency` times at the same time. Most of the time, though, smaller pages are loaded, and therefore concurrency is not optimized.

With the new fixed size memory pool, this problem should be solved. The pool is divided into slabs of different sizes holding different amounts of buffers; a larger amount of small sized buffers and a smaller amount of large sized buffers.

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored Jun 6, 2024
1 parent 86b119a commit fc26431
Show file tree
Hide file tree
Showing 21 changed files with 680 additions and 98 deletions.
4 changes: 2 additions & 2 deletions pkg/bloombuild/builder/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser
for i, b := range blocks {
bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{
BlockRef: refs[i],
BlockQuerier: v1.NewBlockQuerier(b, false, v1.DefaultMaxPageSize),
BlockQuerier: v1.NewBlockQuerier(b, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize),
})
}

Expand Down Expand Up @@ -152,7 +152,7 @@ func TestSimpleBloomGenerator(t *testing.T) {
expectedRefs := v1.PointerSlice(data)
outputRefs := make([]*v1.SeriesWithBloom, 0, len(data))
for _, block := range outputBlocks {
bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize)
bq := v1.NewBlockQuerier(block, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize)
for bq.Next() {
outputRefs = append(outputRefs, bq.At())
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser
for i, b := range blocks {
bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{
BlockRef: refs[i],
BlockQuerier: v1.NewBlockQuerier(b, false, v1.DefaultMaxPageSize),
BlockQuerier: v1.NewBlockQuerier(b, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize),
})
}

Expand Down Expand Up @@ -152,7 +152,7 @@ func TestSimpleBloomGenerator(t *testing.T) {
expectedRefs := v1.PointerSlice(data)
outputRefs := make([]*v1.SeriesWithBloom, 0, len(data))
for _, block := range outputBlocks {
bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize)
bq := v1.NewBlockQuerier(block, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize)
for bq.Next() {
outputRefs = append(outputRefs, bq.At())
}
Expand Down
44 changes: 0 additions & 44 deletions pkg/bloomgateway/bloomgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,50 +215,6 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
}
})

t.Run("request cancellation does not result in channel locking", func(t *testing.T) {
now := mktime("2024-01-25 10:00")

// replace store implementation and re-initialize workers and sub-services
refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)
mockStore := newMockBloomStore(queriers, metas)
mockStore.delay = 2000 * time.Millisecond

reg := prometheus.NewRegistry()
gw, err := New(cfg, mockStore, logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)
require.NoError(t, err)
t.Cleanup(func() {
err = services.StopAndAwaitTerminated(context.Background(), gw)
require.NoError(t, err)
})

chunkRefs := createQueryInputFromBlockData(t, tenantID, data, 100)

// saturate workers
// then send additional request
for i := 0; i < gw.cfg.WorkerConcurrency+1; i++ {
expr, err := syntax.ParseExpr(`{foo="bar"} |= "does not match"`)
require.NoError(t, err)

req := &logproto.FilterChunkRefRequest{
From: now.Add(-24 * time.Hour),
Through: now,
Refs: groupRefs(t, chunkRefs),
Plan: plan.QueryPlan{AST: expr},
Blocks: stringSlice(refs),
}

ctx, cancelFn := context.WithTimeout(context.Background(), 500*time.Millisecond)
ctx = user.InjectOrgID(ctx, tenantID)
t.Cleanup(cancelFn)

res, err := gw.FilterChunkRefs(ctx, req)
require.ErrorContainsf(t, err, context.DeadlineExceeded.Error(), "%+v", res)
}
})

t.Run("returns unfiltered chunk refs if no filters provided", func(t *testing.T) {
now := mktime("2023-10-03 10:00")

Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time,
// }
// }
querier := &bloomshipper.CloseableBlockQuerier{
BlockQuerier: v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize),
BlockQuerier: v1.NewBlockQuerier(block, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize),
BlockRef: blockRef,
}
queriers = append(queriers, querier)
Expand Down
15 changes: 15 additions & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"github.com/grafana/loki/v3/pkg/bloomcompactor"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/types"

"github.com/grafana/loki/v3/pkg/analytics"
Expand Down Expand Up @@ -79,6 +80,7 @@ import (
"github.com/grafana/loki/v3/pkg/util/httpreq"
"github.com/grafana/loki/v3/pkg/util/limiter"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/mempool"
"github.com/grafana/loki/v3/pkg/util/querylimits"
lokiring "github.com/grafana/loki/v3/pkg/util/ring"
serverutil "github.com/grafana/loki/v3/pkg/util/server"
Expand Down Expand Up @@ -730,6 +732,19 @@ func (t *Loki) initBloomStore() (services.Service, error) {
reg := prometheus.DefaultRegisterer
bsCfg := t.Cfg.StorageConfig.BloomShipperConfig

// Set global BloomPageAllocator variable
switch bsCfg.MemoryManagement.BloomPageAllocationType {
case "simple":
bloomshipper.BloomPageAllocator = &v1.SimpleHeapAllocator{}
case "dynamic":
bloomshipper.BloomPageAllocator = v1.BloomPagePool
case "fixed":
bloomshipper.BloomPageAllocator = mempool.New("bloom-page-pool", bsCfg.MemoryManagement.BloomPageMemPoolBuckets, reg)
default:
// do nothing
bloomshipper.BloomPageAllocator = nil
}

var metasCache cache.Cache
if t.Cfg.isTarget(IndexGateway) && cache.IsCacheConfigured(bsCfg.MetasCache) {
metasCache, err = cache.New(bsCfg.MetasCache, reg, logger, stats.BloomMetasCache, constants.Loki)
Expand Down
8 changes: 6 additions & 2 deletions pkg/storage/bloom/v1/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ type BlockQuerier struct {
// will be returned to the pool for efficiency. This can only safely be used
// when the underlying bloom bytes don't escape the decoder, i.e.
// when loading blooms for querying (bloom-gw) but not for writing (bloom-compactor).
func NewBlockQuerier(b *Block, noCapture bool, maxPageSize int) *BlockQuerier {
func NewBlockQuerier(b *Block, alloc Allocator, maxPageSize int) *BlockQuerier {
return &BlockQuerier{
block: b,
series: NewLazySeriesIter(b),
blooms: NewLazyBloomIter(b, noCapture, maxPageSize),
blooms: NewLazyBloomIter(b, alloc, maxPageSize),
}
}

Expand Down Expand Up @@ -173,3 +173,7 @@ func (bq *BlockQuerier) Err() error {

return bq.blooms.Err()
}

func (bq *BlockQuerier) Close() {
bq.blooms.Close()
}
43 changes: 28 additions & 15 deletions pkg/storage/bloom/v1/bloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Bloom struct {

func (b *Bloom) Encode(enc *encoding.Encbuf) error {
// divide by 8 b/c bloom capacity is measured in bits, but we want bytes
buf := bytes.NewBuffer(BloomPagePool.Get(int(b.Capacity() / 8)))
buf := bytes.NewBuffer(make([]byte, 0, int(b.Capacity()/8)))

// TODO(owen-d): have encoder implement writer directly so we don't need
// to indirect via a buffer
Expand All @@ -36,7 +36,6 @@ func (b *Bloom) Encode(enc *encoding.Encbuf) error {
data := buf.Bytes()
enc.PutUvarint(len(data)) // length of bloom filter
enc.PutBytes(data)
BloomPagePool.Put(data[:0]) // release to pool
return nil
}

Expand Down Expand Up @@ -64,11 +63,14 @@ func (b *Bloom) Decode(dec *encoding.Decbuf) error {
return nil
}

func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) {
data := BloomPagePool.Get(page.Len)[:page.Len]
defer BloomPagePool.Put(data)
func LazyDecodeBloomPage(r io.Reader, alloc Allocator, pool chunkenc.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) {
data, err := alloc.Get(page.Len)
if err != nil {
return nil, errors.Wrap(err, "allocating buffer")
}
defer alloc.Put(data)

_, err := io.ReadFull(r, data)
_, err = io.ReadFull(r, data)
if err != nil {
return nil, errors.Wrap(err, "reading bloom page")
}
Expand All @@ -84,7 +86,10 @@ func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHe
}
defer pool.PutReader(decompressor)

b := BloomPagePool.Get(page.DecompressedLen)[:page.DecompressedLen]
b, err := alloc.Get(page.DecompressedLen)
if err != nil {
return nil, errors.Wrap(err, "allocating buffer")
}

if _, err = io.ReadFull(decompressor, b); err != nil {
return nil, errors.Wrap(err, "decompressing bloom page")
Expand All @@ -96,14 +101,18 @@ func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHe
}

// shortcut to skip allocations when we know the page is not compressed
func LazyDecodeBloomPageNoCompression(r io.Reader, page BloomPageHeader) (*BloomPageDecoder, error) {
func LazyDecodeBloomPageNoCompression(r io.Reader, alloc Allocator, page BloomPageHeader) (*BloomPageDecoder, error) {
// data + checksum
if page.Len != page.DecompressedLen+4 {
return nil, errors.New("the Len and DecompressedLen of the page do not match")
}
data := BloomPagePool.Get(page.Len)[:page.Len]

_, err := io.ReadFull(r, data)
data, err := alloc.Get(page.Len)
if err != nil {
return nil, errors.Wrap(err, "allocating buffer")
}

_, err = io.ReadFull(r, data)
if err != nil {
return nil, errors.Wrap(err, "reading bloom page")
}
Expand Down Expand Up @@ -158,12 +167,16 @@ type BloomPageDecoder struct {
// This can only safely be used when the underlying bloom
// bytes don't escape the decoder:
// on reads in the bloom-gw but not in the bloom-compactor
func (d *BloomPageDecoder) Relinquish() {
func (d *BloomPageDecoder) Relinquish(alloc Allocator) {
if d == nil {
return
}

data := d.data
d.data = nil

if cap(data) > 0 {
BloomPagePool.Put(data)
_ = alloc.Put(data)
}
}

Expand Down Expand Up @@ -277,7 +290,7 @@ func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) (uint32, error) {
// BloomPageDecoder returns a decoder for the given page index.
// It may skip the page if it's too large.
// NB(owen-d): if `skip` is true, err _must_ be nil.
func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, skip bool, err error) {
func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, alloc Allocator, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, skip bool, err error) {
if pageIdx < 0 || pageIdx >= len(b.pageHeaders) {
metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Inc()
metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Add(float64(b.pageHeaders[pageIdx].DecompressedLen))
Expand All @@ -300,9 +313,9 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize
}

if b.schema.encoding == chunkenc.EncNone {
res, err = LazyDecodeBloomPageNoCompression(r, page)
res, err = LazyDecodeBloomPageNoCompression(r, alloc, page)
} else {
res, err = LazyDecodeBloomPage(r, b.schema.DecompressorPool(), page)
res, err = LazyDecodeBloomPage(r, alloc, b.schema.DecompressorPool(), page)
}

if err != nil {
Expand Down
30 changes: 15 additions & 15 deletions pkg/storage/bloom/v1/bloom_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ type BloomQuerier interface {
}

type LazyBloomIter struct {
usePool bool

b *Block
m int // max page size in bytes

alloc Allocator

// state
initialized bool
err error
Expand All @@ -24,11 +24,11 @@ type LazyBloomIter struct {
// will be returned to the pool for efficiency.
// This can only safely be used when the underlying bloom
// bytes don't escape the decoder.
func NewLazyBloomIter(b *Block, pool bool, maxSize int) *LazyBloomIter {
func NewLazyBloomIter(b *Block, alloc Allocator, maxSize int) *LazyBloomIter {
return &LazyBloomIter{
usePool: pool,
b: b,
m: maxSize,
b: b,
m: maxSize,
alloc: alloc,
}
}

Expand All @@ -53,16 +53,14 @@ func (it *LazyBloomIter) LoadOffset(offset BloomOffset) (skip bool) {

// drop the current page if it exists and
// we're using the pool
if it.curPage != nil && it.usePool {
it.curPage.Relinquish()
}
it.curPage.Relinquish(it.alloc)

r, err := it.b.reader.Blooms()
if err != nil {
it.err = errors.Wrap(err, "getting blooms reader")
return false
}
decoder, skip, err := it.b.blooms.BloomPageDecoder(r, offset.Page, it.m, it.b.metrics)
decoder, skip, err := it.b.blooms.BloomPageDecoder(r, it.alloc, offset.Page, it.m, it.b.metrics)
if err != nil {
it.err = errors.Wrap(err, "loading bloom page")
return false
Expand Down Expand Up @@ -106,6 +104,7 @@ func (it *LazyBloomIter) next() bool {
var skip bool
it.curPage, skip, err = it.b.blooms.BloomPageDecoder(
r,
it.alloc,
it.curPageIndex,
it.m,
it.b.metrics,
Expand All @@ -130,11 +129,8 @@ func (it *LazyBloomIter) next() bool {

// we've exhausted the current page, progress to next
it.curPageIndex++
// drop the current page if it exists and
// we're using the pool
if it.usePool {
it.curPage.Relinquish()
}
// drop the current page if it exists
it.curPage.Relinquish(it.alloc)
it.curPage = nil
continue
}
Expand All @@ -161,3 +157,7 @@ func (it *LazyBloomIter) Err() error {
return nil
}
}

func (it *LazyBloomIter) Close() {
it.curPage.Relinquish(it.alloc)
}
12 changes: 6 additions & 6 deletions pkg/storage/bloom/v1/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestBlockBuilder_RoundTrip(t *testing.T) {
}

block := NewBlock(tc.reader, NewMetrics(nil))
querier := NewBlockQuerier(block, false, DefaultMaxPageSize)
querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize)

err = block.LoadHeaders()
require.Nil(t, err)
Expand Down Expand Up @@ -218,7 +218,7 @@ func TestMergeBuilder(t *testing.T) {
itr := NewSliceIter[SeriesWithBloom](data[min:max])
_, err = builder.BuildFrom(itr)
require.Nil(t, err)
blocks = append(blocks, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), false, DefaultMaxPageSize)))
blocks = append(blocks, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), &SimpleHeapAllocator{}, DefaultMaxPageSize)))
}

// We're not testing the ability to extend a bloom in this test
Expand Down Expand Up @@ -252,7 +252,7 @@ func TestMergeBuilder(t *testing.T) {
require.Nil(t, err)

block := NewBlock(reader, NewMetrics(nil))
querier := NewBlockQuerier(block, false, DefaultMaxPageSize)
querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize)

EqualIterators[*SeriesWithBloom](
t,
Expand Down Expand Up @@ -296,7 +296,7 @@ func TestBlockReset(t *testing.T) {
_, err = builder.BuildFrom(itr)
require.Nil(t, err)
block := NewBlock(reader, NewMetrics(nil))
querier := NewBlockQuerier(block, false, DefaultMaxPageSize)
querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize)

rounds := make([][]model.Fingerprint, 2)

Expand Down Expand Up @@ -362,7 +362,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
_, err = builder.BuildFrom(itr)
require.Nil(t, err)
block := NewBlock(reader, NewMetrics(nil))
querier := NewBlockQuerier(block, false, DefaultMaxPageSize)
querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize)

// rather than use the block querier directly, collect it's data
// so we can use it in a few places later
Expand Down Expand Up @@ -423,7 +423,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {

// ensure the new block contains one copy of all the data
// by comparing it against an iterator over the source data
mergedBlockQuerier := NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), false, DefaultMaxPageSize)
mergedBlockQuerier := NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), &SimpleHeapAllocator{}, DefaultMaxPageSize)
sourceItr := NewSliceIter[*SeriesWithBloom](PointerSlice[SeriesWithBloom](xs))

EqualIterators[*SeriesWithBloom](
Expand Down
Loading

0 comments on commit fc26431

Please sign in to comment.