From fc264310ce64fc082965a5d7f036e45a5a399c61 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Thu, 6 Jun 2024 16:41:05 +0200 Subject: [PATCH] perf: Introduce fixed size memory pool for bloom querier (#13039) This PR introduces a fixed size memory pool for bloom pages that are loaded in the block querier. A fixed size pool of `[]byte` buffers reduces the number of allocations and caps the maximum heap usage, preventing the bloom gateways from OOMing. Also, with the previous `sync.Pool` based approach, query parallelism (`bloom-gateway.worker-concurrency` and `bloom-gateway.block-query-concurrency`) was effectively coupled to `bloom.max-query-page-size` (and vice versa), because in the worst case a page of the maximum size could be loaded `workers * concurrency` times at once. In practice, however, most loaded pages are much smaller, so the concurrency settings could not be tuned optimally. The new fixed size memory pool removes this coupling. The pool is divided into slabs of different buffer sizes, each holding a different number of buffers: many small buffers and fewer large ones. Signed-off-by: Christian Haudum --- pkg/bloombuild/builder/spec_test.go | 4 +- pkg/bloomcompactor/spec_test.go | 4 +- pkg/bloomgateway/bloomgateway_test.go | 44 ------ pkg/bloomgateway/util_test.go | 2 +- pkg/loki/modules.go | 15 ++ pkg/storage/bloom/v1/block.go | 8 +- pkg/storage/bloom/v1/bloom.go | 43 ++++-- pkg/storage/bloom/v1/bloom_querier.go | 30 ++-- pkg/storage/bloom/v1/builder_test.go | 12 +- pkg/storage/bloom/v1/fuse_test.go | 6 +- pkg/storage/bloom/v1/index.go | 2 +- pkg/storage/bloom/v1/util.go | 31 +++- .../stores/shipper/bloomshipper/cache.go | 16 ++- .../shipper/bloomshipper/config/config.go | 67 +++++++++ pkg/util/flagext/csv.go | 62 ++++++++ pkg/util/flagext/csv_test.go | 79 ++++++++++ pkg/util/mempool/bucket.go | 51 +++++++ pkg/util/mempool/metrics.go | 32 +++++ pkg/util/mempool/pool.go | 135 ++++++++++++++++++ pkg/util/mempool/pool_test.go | 133 +++++++++++++++++ tools/bloom/inspector/main.go | 2 +- 21 files changed, 680 insertions(+), 98 deletions(-) create mode 100644 pkg/util/flagext/csv.go create mode 100644 pkg/util/flagext/csv_test.go create mode 100644 pkg/util/mempool/bucket.go create mode 100644 pkg/util/mempool/metrics.go create mode 100644 pkg/util/mempool/pool.go create mode 100644 pkg/util/mempool/pool_test.go diff --git a/pkg/bloombuild/builder/spec_test.go b/pkg/bloombuild/builder/spec_test.go index 40225dc45865b..3397a2f60bfe4 100644 --- a/pkg/bloombuild/builder/spec_test.go +++ b/pkg/bloombuild/builder/spec_test.go @@ -74,7 +74,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser for i, b := range blocks { bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{ BlockRef: refs[i], - BlockQuerier: v1.NewBlockQuerier(b, false, v1.DefaultMaxPageSize), + BlockQuerier: v1.NewBlockQuerier(b, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize), }) } @@ -152,7 +152,7 @@ func TestSimpleBloomGenerator(t *testing.T) { expectedRefs := v1.PointerSlice(data) outputRefs := make([]*v1.SeriesWithBloom, 0, len(data)) for _, block := range outputBlocks { - bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize) + bq := v1.NewBlockQuerier(block, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize) for bq.Next() { outputRefs = append(outputRefs, bq.At()) } diff --git a/pkg/bloomcompactor/spec_test.go b/pkg/bloomcompactor/spec_test.go index 7e39b8dec57f0..373b99de68ad3 100644 --- a/pkg/bloomcompactor/spec_test.go +++ 
b/pkg/bloomcompactor/spec_test.go @@ -74,7 +74,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser for i, b := range blocks { bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{ BlockRef: refs[i], - BlockQuerier: v1.NewBlockQuerier(b, false, v1.DefaultMaxPageSize), + BlockQuerier: v1.NewBlockQuerier(b, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize), }) } @@ -152,7 +152,7 @@ func TestSimpleBloomGenerator(t *testing.T) { expectedRefs := v1.PointerSlice(data) outputRefs := make([]*v1.SeriesWithBloom, 0, len(data)) for _, block := range outputBlocks { - bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize) + bq := v1.NewBlockQuerier(block, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize) for bq.Next() { outputRefs = append(outputRefs, bq.At()) } diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index 15c9ca2be2d85..8663bcf079590 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -215,50 +215,6 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { } }) - t.Run("request cancellation does not result in channel locking", func(t *testing.T) { - now := mktime("2024-01-25 10:00") - - // replace store implementation and re-initialize workers and sub-services - refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff) - mockStore := newMockBloomStore(queriers, metas) - mockStore.delay = 2000 * time.Millisecond - - reg := prometheus.NewRegistry() - gw, err := New(cfg, mockStore, logger, reg) - require.NoError(t, err) - - err = services.StartAndAwaitRunning(context.Background(), gw) - require.NoError(t, err) - t.Cleanup(func() { - err = services.StopAndAwaitTerminated(context.Background(), gw) - require.NoError(t, err) - }) - - chunkRefs := createQueryInputFromBlockData(t, tenantID, data, 100) - - // saturate workers - // then send additional request - for i := 0; i < gw.cfg.WorkerConcurrency+1; i++ { - expr, err := syntax.ParseExpr(`{foo="bar"} |= "does not match"`) - require.NoError(t, err) - - req := &logproto.FilterChunkRefRequest{ - From: now.Add(-24 * time.Hour), - Through: now, - Refs: groupRefs(t, chunkRefs), - Plan: plan.QueryPlan{AST: expr}, - Blocks: stringSlice(refs), - } - - ctx, cancelFn := context.WithTimeout(context.Background(), 500*time.Millisecond) - ctx = user.InjectOrgID(ctx, tenantID) - t.Cleanup(cancelFn) - - res, err := gw.FilterChunkRefs(ctx, req) - require.ErrorContainsf(t, err, context.DeadlineExceeded.Error(), "%+v", res) - } - }) - t.Run("returns unfiltered chunk refs if no filters provided", func(t *testing.T) { now := mktime("2023-10-03 10:00") diff --git a/pkg/bloomgateway/util_test.go b/pkg/bloomgateway/util_test.go index a3f219c326efd..4bd9d9609d647 100644 --- a/pkg/bloomgateway/util_test.go +++ b/pkg/bloomgateway/util_test.go @@ -399,7 +399,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time, // } // } querier := &bloomshipper.CloseableBlockQuerier{ - BlockQuerier: v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize), + BlockQuerier: v1.NewBlockQuerier(block, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize), BlockRef: blockRef, } queriers = append(queriers, querier) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 22cd46743ea27..8f91e0d754427 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -35,6 +35,7 @@ import ( "github.com/grafana/loki/v3/pkg/bloomcompactor" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" + v1 
"github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/types" "github.com/grafana/loki/v3/pkg/analytics" @@ -79,6 +80,7 @@ import ( "github.com/grafana/loki/v3/pkg/util/httpreq" "github.com/grafana/loki/v3/pkg/util/limiter" util_log "github.com/grafana/loki/v3/pkg/util/log" + "github.com/grafana/loki/v3/pkg/util/mempool" "github.com/grafana/loki/v3/pkg/util/querylimits" lokiring "github.com/grafana/loki/v3/pkg/util/ring" serverutil "github.com/grafana/loki/v3/pkg/util/server" @@ -730,6 +732,19 @@ func (t *Loki) initBloomStore() (services.Service, error) { reg := prometheus.DefaultRegisterer bsCfg := t.Cfg.StorageConfig.BloomShipperConfig + // Set global BloomPageAllocator variable + switch bsCfg.MemoryManagement.BloomPageAllocationType { + case "simple": + bloomshipper.BloomPageAllocator = &v1.SimpleHeapAllocator{} + case "dynamic": + bloomshipper.BloomPageAllocator = v1.BloomPagePool + case "fixed": + bloomshipper.BloomPageAllocator = mempool.New("bloom-page-pool", bsCfg.MemoryManagement.BloomPageMemPoolBuckets, reg) + default: + // do nothing + bloomshipper.BloomPageAllocator = nil + } + var metasCache cache.Cache if t.Cfg.isTarget(IndexGateway) && cache.IsCacheConfigured(bsCfg.MetasCache) { metasCache, err = cache.New(bsCfg.MetasCache, reg, logger, stats.BloomMetasCache, constants.Loki) diff --git a/pkg/storage/bloom/v1/block.go b/pkg/storage/bloom/v1/block.go index ba661de79c498..042c55a7a0666 100644 --- a/pkg/storage/bloom/v1/block.go +++ b/pkg/storage/bloom/v1/block.go @@ -117,11 +117,11 @@ type BlockQuerier struct { // will be returned to the pool for efficiency. This can only safely be used // when the underlying bloom bytes don't escape the decoder, i.e. // when loading blooms for querying (bloom-gw) but not for writing (bloom-compactor). 
-func NewBlockQuerier(b *Block, noCapture bool, maxPageSize int) *BlockQuerier { +func NewBlockQuerier(b *Block, alloc Allocator, maxPageSize int) *BlockQuerier { return &BlockQuerier{ block: b, series: NewLazySeriesIter(b), - blooms: NewLazyBloomIter(b, noCapture, maxPageSize), + blooms: NewLazyBloomIter(b, alloc, maxPageSize), } } @@ -173,3 +173,7 @@ func (bq *BlockQuerier) Err() error { return bq.blooms.Err() } + +func (bq *BlockQuerier) Close() { + bq.blooms.Close() +} diff --git a/pkg/storage/bloom/v1/bloom.go b/pkg/storage/bloom/v1/bloom.go index aa51762d4e4ec..fc39133e81b05 100644 --- a/pkg/storage/bloom/v1/bloom.go +++ b/pkg/storage/bloom/v1/bloom.go @@ -24,7 +24,7 @@ type Bloom struct { func (b *Bloom) Encode(enc *encoding.Encbuf) error { // divide by 8 b/c bloom capacity is measured in bits, but we want bytes - buf := bytes.NewBuffer(BloomPagePool.Get(int(b.Capacity() / 8))) + buf := bytes.NewBuffer(make([]byte, 0, int(b.Capacity()/8))) // TODO(owen-d): have encoder implement writer directly so we don't need // to indirect via a buffer @@ -36,7 +36,6 @@ func (b *Bloom) Encode(enc *encoding.Encbuf) error { data := buf.Bytes() enc.PutUvarint(len(data)) // length of bloom filter enc.PutBytes(data) - BloomPagePool.Put(data[:0]) // release to pool return nil } @@ -64,11 +63,14 @@ func (b *Bloom) Decode(dec *encoding.Decbuf) error { return nil } -func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) { - data := BloomPagePool.Get(page.Len)[:page.Len] - defer BloomPagePool.Put(data) +func LazyDecodeBloomPage(r io.Reader, alloc Allocator, pool chunkenc.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) { + data, err := alloc.Get(page.Len) + if err != nil { + return nil, errors.Wrap(err, "allocating buffer") + } + defer alloc.Put(data) - _, err := io.ReadFull(r, data) + _, err = io.ReadFull(r, data) if err != nil { return nil, errors.Wrap(err, "reading bloom page") } @@ -84,7 +86,10 @@ func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHe } defer pool.PutReader(decompressor) - b := BloomPagePool.Get(page.DecompressedLen)[:page.DecompressedLen] + b, err := alloc.Get(page.DecompressedLen) + if err != nil { + return nil, errors.Wrap(err, "allocating buffer") + } if _, err = io.ReadFull(decompressor, b); err != nil { return nil, errors.Wrap(err, "decompressing bloom page") @@ -96,14 +101,18 @@ func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHe } // shortcut to skip allocations when we know the page is not compressed -func LazyDecodeBloomPageNoCompression(r io.Reader, page BloomPageHeader) (*BloomPageDecoder, error) { +func LazyDecodeBloomPageNoCompression(r io.Reader, alloc Allocator, page BloomPageHeader) (*BloomPageDecoder, error) { // data + checksum if page.Len != page.DecompressedLen+4 { return nil, errors.New("the Len and DecompressedLen of the page do not match") } - data := BloomPagePool.Get(page.Len)[:page.Len] - _, err := io.ReadFull(r, data) + data, err := alloc.Get(page.Len) + if err != nil { + return nil, errors.Wrap(err, "allocating buffer") + } + + _, err = io.ReadFull(r, data) if err != nil { return nil, errors.Wrap(err, "reading bloom page") } @@ -158,12 +167,16 @@ type BloomPageDecoder struct { // This can only safely be used when the underlying bloom // bytes don't escape the decoder: // on reads in the bloom-gw but not in the bloom-compactor -func (d *BloomPageDecoder) Relinquish() { +func (d *BloomPageDecoder) Relinquish(alloc Allocator) { + if d == 
nil { + return + } + data := d.data d.data = nil if cap(data) > 0 { - BloomPagePool.Put(data) + _ = alloc.Put(data) } } @@ -277,7 +290,7 @@ func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) (uint32, error) { // BloomPageDecoder returns a decoder for the given page index. // It may skip the page if it's too large. // NB(owen-d): if `skip` is true, err _must_ be nil. -func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, skip bool, err error) { +func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, alloc Allocator, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, skip bool, err error) { if pageIdx < 0 || pageIdx >= len(b.pageHeaders) { metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Inc() metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Add(float64(b.pageHeaders[pageIdx].DecompressedLen)) @@ -300,9 +313,9 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize } if b.schema.encoding == chunkenc.EncNone { - res, err = LazyDecodeBloomPageNoCompression(r, page) + res, err = LazyDecodeBloomPageNoCompression(r, alloc, page) } else { - res, err = LazyDecodeBloomPage(r, b.schema.DecompressorPool(), page) + res, err = LazyDecodeBloomPage(r, alloc, b.schema.DecompressorPool(), page) } if err != nil { diff --git a/pkg/storage/bloom/v1/bloom_querier.go b/pkg/storage/bloom/v1/bloom_querier.go index 8de9a33e713f0..b90bae8a046bb 100644 --- a/pkg/storage/bloom/v1/bloom_querier.go +++ b/pkg/storage/bloom/v1/bloom_querier.go @@ -7,11 +7,11 @@ type BloomQuerier interface { } type LazyBloomIter struct { - usePool bool - b *Block m int // max page size in bytes + alloc Allocator + // state initialized bool err error @@ -24,11 +24,11 @@ type LazyBloomIter struct { // will be returned to the pool for efficiency. // This can only safely be used when the underlying bloom // bytes don't escape the decoder. 
-func NewLazyBloomIter(b *Block, pool bool, maxSize int) *LazyBloomIter { +func NewLazyBloomIter(b *Block, alloc Allocator, maxSize int) *LazyBloomIter { return &LazyBloomIter{ - usePool: pool, - b: b, - m: maxSize, + b: b, + m: maxSize, + alloc: alloc, } } @@ -53,16 +53,14 @@ func (it *LazyBloomIter) LoadOffset(offset BloomOffset) (skip bool) { // drop the current page if it exists and // we're using the pool - if it.curPage != nil && it.usePool { - it.curPage.Relinquish() - } + it.curPage.Relinquish(it.alloc) r, err := it.b.reader.Blooms() if err != nil { it.err = errors.Wrap(err, "getting blooms reader") return false } - decoder, skip, err := it.b.blooms.BloomPageDecoder(r, offset.Page, it.m, it.b.metrics) + decoder, skip, err := it.b.blooms.BloomPageDecoder(r, it.alloc, offset.Page, it.m, it.b.metrics) if err != nil { it.err = errors.Wrap(err, "loading bloom page") return false @@ -106,6 +104,7 @@ func (it *LazyBloomIter) next() bool { var skip bool it.curPage, skip, err = it.b.blooms.BloomPageDecoder( r, + it.alloc, it.curPageIndex, it.m, it.b.metrics, @@ -130,11 +129,8 @@ func (it *LazyBloomIter) next() bool { // we've exhausted the current page, progress to next it.curPageIndex++ - // drop the current page if it exists and - // we're using the pool - if it.usePool { - it.curPage.Relinquish() - } + // drop the current page if it exists + it.curPage.Relinquish(it.alloc) it.curPage = nil continue } @@ -161,3 +157,7 @@ func (it *LazyBloomIter) Err() error { return nil } } + +func (it *LazyBloomIter) Close() { + it.curPage.Relinquish(it.alloc) +} diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go index 56d03cbd7c930..45461824970ab 100644 --- a/pkg/storage/bloom/v1/builder_test.go +++ b/pkg/storage/bloom/v1/builder_test.go @@ -117,7 +117,7 @@ func TestBlockBuilder_RoundTrip(t *testing.T) { } block := NewBlock(tc.reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, false, DefaultMaxPageSize) + querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize) err = block.LoadHeaders() require.Nil(t, err) @@ -218,7 +218,7 @@ func TestMergeBuilder(t *testing.T) { itr := NewSliceIter[SeriesWithBloom](data[min:max]) _, err = builder.BuildFrom(itr) require.Nil(t, err) - blocks = append(blocks, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), false, DefaultMaxPageSize))) + blocks = append(blocks, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), &SimpleHeapAllocator{}, DefaultMaxPageSize))) } // We're not testing the ability to extend a bloom in this test @@ -252,7 +252,7 @@ func TestMergeBuilder(t *testing.T) { require.Nil(t, err) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, false, DefaultMaxPageSize) + querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize) EqualIterators[*SeriesWithBloom]( t, @@ -296,7 +296,7 @@ func TestBlockReset(t *testing.T) { _, err = builder.BuildFrom(itr) require.Nil(t, err) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, false, DefaultMaxPageSize) + querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize) rounds := make([][]model.Fingerprint, 2) @@ -362,7 +362,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) { _, err = builder.BuildFrom(itr) require.Nil(t, err) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, false, DefaultMaxPageSize) + querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, 
DefaultMaxPageSize) // rather than use the block querier directly, collect it's data // so we can use it in a few places later @@ -423,7 +423,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) { // ensure the new block contains one copy of all the data // by comparing it against an iterator over the source data - mergedBlockQuerier := NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), false, DefaultMaxPageSize) + mergedBlockQuerier := NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), &SimpleHeapAllocator{}, DefaultMaxPageSize) sourceItr := NewSliceIter[*SeriesWithBloom](PointerSlice[SeriesWithBloom](xs)) EqualIterators[*SeriesWithBloom]( diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go index a0dc23001e939..3df65a8da27c4 100644 --- a/pkg/storage/bloom/v1/fuse_test.go +++ b/pkg/storage/bloom/v1/fuse_test.go @@ -76,7 +76,7 @@ func TestFusedQuerier(t *testing.T) { require.NoError(t, err) require.False(t, itr.Next()) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, true, DefaultMaxPageSize) + querier := NewBlockQuerier(block, BloomPagePool, DefaultMaxPageSize) n := 2 nReqs := numSeries / n @@ -215,7 +215,7 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) { require.False(t, itr.Next()) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, true, 1000) + querier := NewBlockQuerier(block, BloomPagePool, 1000) for fp := model.Fingerprint(0); fp < model.Fingerprint(numSeries); fp++ { err := querier.Seek(fp) @@ -264,7 +264,7 @@ func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Ou _, err = builder.BuildFrom(itr) require.Nil(b, err) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, true, DefaultMaxPageSize) + querier := NewBlockQuerier(block, BloomPagePool, DefaultMaxPageSize) numRequestChains := 100 seriesPerRequest := 100 diff --git a/pkg/storage/bloom/v1/index.go b/pkg/storage/bloom/v1/index.go index caadfa26ddf74..eed6d21ce5c9c 100644 --- a/pkg/storage/bloom/v1/index.go +++ b/pkg/storage/bloom/v1/index.go @@ -166,7 +166,7 @@ func (b *BlockIndex) NewSeriesPageDecoder(r io.ReadSeeker, header SeriesPageHead return nil, errors.Wrap(err, "seeking to series page") } - data := SeriesPagePool.Get(header.Len)[:header.Len] + data, _ := SeriesPagePool.Get(header.Len) defer SeriesPagePool.Put(data) _, err = io.ReadFull(r, data) if err != nil { diff --git a/pkg/storage/bloom/v1/util.go b/pkg/storage/bloom/v1/util.go index 22fb47e43e799..92e64048fa5bd 100644 --- a/pkg/storage/bloom/v1/util.go +++ b/pkg/storage/bloom/v1/util.go @@ -44,7 +44,7 @@ var ( // buffer pool for bloom pages // 128KB 256KB 512KB 1MB 2MB 4MB 8MB 16MB 32MB 64MB 128MB - BloomPagePool = BytePool{ + BloomPagePool = &BytePool{ pool: pool.New( 128<<10, 128<<20, 2, func(size int) interface{} { @@ -53,15 +53,38 @@ var ( } ) +// Allocator handles byte slices for bloom queriers. +// It exists to reduce the cost of allocations and allows to re-use already allocated memory. +type Allocator interface { + Get(size int) ([]byte, error) + Put([]byte) bool +} + +// SimpleHeapAllocator allocates a new byte slice every time and does not re-cycle buffers. +type SimpleHeapAllocator struct{} + +func (a *SimpleHeapAllocator) Get(size int) ([]byte, error) { + return make([]byte, size), nil +} + +func (a *SimpleHeapAllocator) Put([]byte) bool { + return true +} + +// BytePool uses a sync.Pool to re-cycle already allocated buffers. 
type BytePool struct { pool *pool.Pool } -func (p *BytePool) Get(size int) []byte { - return p.pool.Get(size).([]byte)[:0] +// Get implement Allocator +func (p *BytePool) Get(size int) ([]byte, error) { + return p.pool.Get(size).([]byte)[:size], nil } -func (p *BytePool) Put(b []byte) { + +// Put implement Allocator +func (p *BytePool) Put(b []byte) bool { p.pool.Put(b) + return true } func newCRC32() hash.Hash32 { diff --git a/pkg/storage/stores/shipper/bloomshipper/cache.go b/pkg/storage/stores/shipper/bloomshipper/cache.go index 6ff6ef64948e3..c1e3964fe8fc0 100644 --- a/pkg/storage/stores/shipper/bloomshipper/cache.go +++ b/pkg/storage/stores/shipper/bloomshipper/cache.go @@ -15,6 +15,8 @@ import ( "github.com/grafana/loki/v3/pkg/util" ) +var BloomPageAllocator v1.Allocator + type CloseableBlockQuerier struct { BlockRef *v1.BlockQuerier @@ -22,6 +24,7 @@ type CloseableBlockQuerier struct { } func (c *CloseableBlockQuerier) Close() error { + c.BlockQuerier.Close() if c.close != nil { return c.close() } @@ -157,15 +160,24 @@ func (b *BlockDirectory) resolveSize() error { // BlockQuerier returns a new block querier from the directory. // The passed function `close` is called when the the returned querier is closed. - func (b BlockDirectory) BlockQuerier( usePool bool, close func() error, maxPageSize int, metrics *v1.Metrics, ) *CloseableBlockQuerier { + + var alloc v1.Allocator + if usePool && BloomPageAllocator != nil { + alloc = BloomPageAllocator + } else { + alloc = &v1.SimpleHeapAllocator{} + } + + bq := v1.NewBlockQuerier(b.Block(metrics), alloc, maxPageSize) + return &CloseableBlockQuerier{ - BlockQuerier: v1.NewBlockQuerier(b.Block(metrics), usePool, maxPageSize), + BlockQuerier: bq, BlockRef: b.BlockRef, close: close, } diff --git a/pkg/storage/stores/shipper/bloomshipper/config/config.go b/pkg/storage/stores/shipper/bloomshipper/config/config.go index 72d8f8557b095..6de144a3f84bf 100644 --- a/pkg/storage/stores/shipper/bloomshipper/config/config.go +++ b/pkg/storage/stores/shipper/bloomshipper/config/config.go @@ -4,11 +4,16 @@ package config import ( "errors" "flag" + "fmt" + "slices" + "strings" "time" "github.com/grafana/dskit/flagext" "github.com/grafana/loki/v3/pkg/storage/chunk/cache" + lokiflagext "github.com/grafana/loki/v3/pkg/util/flagext" + "github.com/grafana/loki/v3/pkg/util/mempool" ) type Config struct { @@ -18,6 +23,7 @@ type Config struct { BlocksCache BlocksCacheConfig `yaml:"blocks_cache"` MetasCache cache.Config `yaml:"metas_cache"` MetasLRUCache cache.EmbeddedCacheConfig `yaml:"metas_lru_cache"` + MemoryManagement MemoryManagementConfig `yaml:"memory_management" doc:"hidden"` // This will always be set to true when flags are registered. // In tests, where config is created as literal, it can be set manually. @@ -34,6 +40,7 @@ func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { c.BlocksCache.RegisterFlagsWithPrefixAndDefaults(prefix+"blocks-cache.", "Cache for bloom blocks. ", f, 24*time.Hour) c.MetasCache.RegisterFlagsWithPrefix(prefix+"metas-cache.", "Cache for bloom metas. ", f) c.MetasLRUCache.RegisterFlagsWithPrefix(prefix+"metas-lru-cache.", "In-memory LRU cache for bloom metas. 
", f) + c.MemoryManagement.RegisterFlagsWithPrefix(prefix+"memory-management.", f) // always cache LIST operations c.CacheListOps = true @@ -43,6 +50,9 @@ func (c *Config) Validate() error { if len(c.WorkingDirectory) == 0 { return errors.New("at least one working directory must be specified") } + if err := c.MemoryManagement.Validate(); err != nil { + return err + } return nil } @@ -81,3 +91,60 @@ func (cfg *BlocksCacheConfig) Validate() error { } return nil } + +var ( + // the default that describes a 4GiB memory pool + defaultMemPoolBuckets = mempool.Buckets{ + {Size: 128, Capacity: 64 << 10}, // 8MiB -- for tests + {Size: 512, Capacity: 2 << 20}, // 1024MiB + {Size: 128, Capacity: 8 << 20}, // 1024MiB + {Size: 32, Capacity: 32 << 20}, // 1024MiB + {Size: 8, Capacity: 128 << 20}, // 1024MiB + } + types = supportedAllocationTypes{ + "simple", "simple heap allocations using Go's make([]byte, n) and no re-cycling of buffers", + "dynamic", "a buffer pool with variable sized buckets and best effort re-cycling of buffers using Go's sync.Pool", + "fixed", "a fixed size memory pool with configurable slab sizes, see mem-pool-buckets", + } +) + +type MemoryManagementConfig struct { + BloomPageAllocationType string `yaml:"bloom_page_alloc_type"` + BloomPageMemPoolBuckets lokiflagext.CSV[mempool.Bucket] `yaml:"bloom_page_mem_pool_buckets"` +} + +func (cfg *MemoryManagementConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.StringVar(&cfg.BloomPageAllocationType, prefix+"alloc-type", "dynamic", fmt.Sprintf("One of: %s", strings.Join(types.descriptions(), ", "))) + + _ = cfg.BloomPageMemPoolBuckets.Set(defaultMemPoolBuckets.String()) + f.Var(&cfg.BloomPageMemPoolBuckets, prefix+"mem-pool-buckets", "Comma separated list of buckets in the format {size}x{bytes}") +} + +func (cfg *MemoryManagementConfig) Validate() error { + if !slices.Contains(types.names(), cfg.BloomPageAllocationType) { + msg := fmt.Sprintf("bloom_page_alloc_type must be one of: %s", strings.Join(types.descriptions(), ", ")) + return errors.New(msg) + } + if cfg.BloomPageAllocationType == "fixed" && len(cfg.BloomPageMemPoolBuckets) == 0 { + return errors.New("fixed memory pool requires at least one bucket") + } + return nil +} + +type supportedAllocationTypes []string + +func (t supportedAllocationTypes) names() []string { + names := make([]string, 0, len(t)/2) + for i := 0; i < len(t); i += 2 { + names = append(names, t[i]) + } + return names +} + +func (t supportedAllocationTypes) descriptions() []string { + names := make([]string, 0, len(t)/2) + for i := 0; i < len(t); i += 2 { + names = append(names, fmt.Sprintf("%s (%s)", t[i], t[i+1])) + } + return names +} diff --git a/pkg/util/flagext/csv.go b/pkg/util/flagext/csv.go new file mode 100644 index 0000000000000..6ed5f9bad11a0 --- /dev/null +++ b/pkg/util/flagext/csv.go @@ -0,0 +1,62 @@ +package flagext + +import ( + "strings" +) + +type ListValue interface { + String() string + Parse(s string) (any, error) +} + +// StringSliceCSV is a slice of strings that is parsed from a comma-separated string +// It implements flag.Value and yaml Marshalers +type CSV[T ListValue] []T + +// String implements flag.Value +func (v CSV[T]) String() string { + s := make([]string, 0, len(v)) + for i := range v { + s = append(s, v[i].String()) + } + return strings.Join(s, ",") +} + +// Set implements flag.Value +func (v *CSV[T]) Set(s string) error { + if len(s) == 0 { + *v = nil + return nil + } + var zero T + values := strings.Split(s, ",") + *v = make(CSV[T], 0, len(values)) + for 
_, val := range values { + el, err := zero.Parse(val) + if err != nil { + return err + } + *v = append(*v, el.(T)) + } + return nil +} + +// String implements flag.Getter +func (v CSV[T]) Get() []T { + return v +} + +// UnmarshalYAML implements yaml.Unmarshaler. +func (v *CSV[T]) UnmarshalYAML(unmarshal func(interface{}) error) error { + var s string + if err := unmarshal(&s); err != nil { + return err + } + + return v.Set(s) +} + +// MarshalYAML implements yaml.Marshaler. +func (v CSV[T]) MarshalYAML() (interface{}, error) { + return v.String(), nil +} diff --git a/pkg/util/flagext/csv_test.go b/pkg/util/flagext/csv_test.go new file mode 100644 index 0000000000000..aca4ea8a77eef --- /dev/null +++ b/pkg/util/flagext/csv_test.go @@ -0,0 +1,79 @@ +package flagext + +import ( + "strconv" + "testing" + + "github.com/stretchr/testify/require" +) + +type customType int + +// Parse implements ListValue. +func (l customType) Parse(s string) (any, error) { + v, err := strconv.Atoi(s) + if err != nil { + return customType(0), err + } + return customType(v), nil +} + +// String implements ListValue. +func (l customType) String() string { + return strconv.Itoa(int(l)) +} + +var _ ListValue = customType(0) + +func Test_CSV(t *testing.T) { + for _, tc := range []struct { + in string + err bool + out []customType + }{ + { + in: "", + err: false, + out: nil, + }, + { + in: ",", + err: true, + out: []customType{}, + }, + { + in: "1", + err: false, + out: []customType{1}, + }, + { + in: "1,2", + err: false, + out: []customType{1, 2}, + }, + { + in: "1,", + err: true, + out: []customType{}, + }, + { + in: ",1", + err: true, + out: []customType{}, + }, + } { + t.Run(tc.in, func(t *testing.T) { + var v CSV[customType] + + err := v.Set(tc.in) + if tc.err { + require.NotNil(t, err) + } else { + require.Nil(t, err) + require.Equal(t, tc.out, v.Get()) + } + + }) + } + +} diff --git a/pkg/util/mempool/bucket.go b/pkg/util/mempool/bucket.go new file mode 100644 index 0000000000000..a041eb49e3f8f --- /dev/null +++ b/pkg/util/mempool/bucket.go @@ -0,0 +1,51 @@ +package mempool + +import ( + "errors" + "fmt" + "strconv" + "strings" + + "github.com/c2h5oh/datasize" +) + +type Bucket struct { + Size int + Capacity uint64 +} + +func (b Bucket) Parse(s string) (any, error) { + parts := strings.Split(s, "x") + if len(parts) != 2 { + return nil, errors.New("bucket must be in format {count}x{bytes}") + } + + size, err := strconv.Atoi(parts[0]) + if err != nil { + return nil, err + } + + capacity, err := datasize.ParseString(parts[1]) + if err != nil { + panic(err.Error()) + } + + return Bucket{ + Size: size, + Capacity: uint64(capacity), + }, nil +} + +func (b Bucket) String() string { + return fmt.Sprintf("%dx%s", b.Size, datasize.ByteSize(b.Capacity).String()) +} + +type Buckets []Bucket + +func (b Buckets) String() string { + s := make([]string, 0, len(b)) + for i := range b { + s = append(s, b[i].String()) + } + return strings.Join(s, ",") +} diff --git a/pkg/util/mempool/metrics.go b/pkg/util/mempool/metrics.go new file mode 100644 index 0000000000000..f7d5a52eb0d91 --- /dev/null +++ b/pkg/util/mempool/metrics.go @@ -0,0 +1,32 @@ +package mempool + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/grafana/loki/v3/pkg/util/constants" +) + +type metrics struct { + availableBuffersPerSlab *prometheus.CounterVec + errorsCounter *prometheus.CounterVec +} + +func newMetrics(r prometheus.Registerer, name string) *metrics { + return &metrics{ 
+ availableBuffersPerSlab: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: constants.Loki, + Subsystem: "mempool", + Name: "available_buffers_per_slab", + Help: "The amount of available buffers per slab.", + ConstLabels: prometheus.Labels{"pool": name}, + }, []string{"slab"}), + errorsCounter: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: constants.Loki, + Subsystem: "mempool", + Name: "errors_total", + Help: "The total amount of errors returned from the pool.", + ConstLabels: prometheus.Labels{"pool": name}, + }, []string{"slab", "reason"}), + } +} diff --git a/pkg/util/mempool/pool.go b/pkg/util/mempool/pool.go new file mode 100644 index 0000000000000..b42d8d9237677 --- /dev/null +++ b/pkg/util/mempool/pool.go @@ -0,0 +1,135 @@ +package mempool + +import ( + "errors" + "fmt" + "sync" + "unsafe" + + "github.com/dustin/go-humanize" + "github.com/prometheus/client_golang/prometheus" +) + +var ( + errSlabExhausted = errors.New("slab exhausted") + + reasonSizeExceeded = "size-exceeded" + reasonSlabExhausted = "slab-exhausted" +) + +type slab struct { + buffer chan unsafe.Pointer + size, count int + mtx sync.Mutex + metrics *metrics + name string +} + +func newSlab(bufferSize, bufferCount int, m *metrics) *slab { + name := humanize.Bytes(uint64(bufferSize)) + m.availableBuffersPerSlab.WithLabelValues(name).Add(0) // initialize metric with value 0 + + return &slab{ + size: bufferSize, + count: bufferCount, + metrics: m, + name: name, + } +} + +func (s *slab) init() { + s.buffer = make(chan unsafe.Pointer, s.count) + for i := 0; i < s.count; i++ { + buf := make([]byte, 0, s.size) + ptr := unsafe.Pointer(unsafe.SliceData(buf)) + s.buffer <- ptr + } + s.metrics.availableBuffersPerSlab.WithLabelValues(s.name).Add(float64(s.count)) +} + +func (s *slab) get(size int) ([]byte, error) { + s.mtx.Lock() + if s.buffer == nil { + s.init() + } + defer s.mtx.Unlock() + + // wait for available buffer on channel + var buf []byte + select { + case ptr := <-s.buffer: + buf = unsafe.Slice((*byte)(ptr), s.size) + default: + s.metrics.errorsCounter.WithLabelValues(s.name, reasonSlabExhausted).Inc() + return nil, errSlabExhausted + } + + // Taken from https://github.com/ortuman/nuke/blob/main/monotonic_arena.go#L37-L48 + // This piece of code will be translated into a runtime.memclrNoHeapPointers + // invocation by the compiler, which is an assembler optimized implementation. + // Architecture specific code can be found at src/runtime/memclr_$GOARCH.s + // in Go source (since https://codereview.appspot.com/137880043). + for i := range buf { + buf[i] = 0 + } + + return buf[:size], nil +} + +func (s *slab) put(buf []byte) { + if s.buffer == nil { + panic("slab is not initialized") + } + + ptr := unsafe.Pointer(unsafe.SliceData(buf)) + s.buffer <- ptr +} + +// MemPool is an Allocator implementation that uses a fixed size memory pool +// that is split into multiple slabs of different buffer sizes. +// Buffers are re-cycled and need to be returned back to the pool, otherwise +// the pool runs out of available buffers. 
+type MemPool struct { + slabs []*slab + metrics *metrics +} + +func New(name string, buckets []Bucket, r prometheus.Registerer) *MemPool { + a := &MemPool{ + slabs: make([]*slab, 0, len(buckets)), + metrics: newMetrics(r, name), + } + for _, b := range buckets { + a.slabs = append(a.slabs, newSlab(int(b.Capacity), b.Size, a.metrics)) + } + return a +} + +// Get satisfies Allocator interface +// Allocating a buffer from an exhausted pool/slab, or allocating a buffer that +// exceeds the largest slab size will return an error. +func (a *MemPool) Get(size int) ([]byte, error) { + for i := 0; i < len(a.slabs); i++ { + if a.slabs[i].size < size { + continue + } + return a.slabs[i].get(size) + } + a.metrics.errorsCounter.WithLabelValues("pool", reasonSizeExceeded).Inc() + return nil, fmt.Errorf("no slab found for size: %d", size) +} + +// Put satisfies Allocator interface +// Every buffer allocated with Get(size int) needs to be returned to the pool +// using Put(buffer []byte) so it can be re-cycled. +func (a *MemPool) Put(buffer []byte) bool { + size := cap(buffer) + for i := 0; i < len(a.slabs); i++ { + if a.slabs[i].size < size { + continue + } + a.slabs[i].put(buffer) + return true + } + return false +} diff --git a/pkg/util/mempool/pool_test.go b/pkg/util/mempool/pool_test.go new file mode 100644 index 0000000000000..da0fc361dd4a4 --- /dev/null +++ b/pkg/util/mempool/pool_test.go @@ -0,0 +1,133 @@ +package mempool + +import ( + "math/rand" + "sync" + "testing" + "time" + "unsafe" + + "github.com/stretchr/testify/require" +) + +func TestMemPool(t *testing.T) { + + t.Run("empty pool", func(t *testing.T) { + pool := New("test", []Bucket{}, nil) + _, err := pool.Get(256) + require.Error(t, err) + }) + + t.Run("requested size too big", func(t *testing.T) { + pool := New("test", []Bucket{ + {Size: 1, Capacity: 128}, + }, nil) + _, err := pool.Get(256) + require.Error(t, err) + }) + + t.Run("requested size within bucket", func(t *testing.T) { + pool := New("test", []Bucket{ + {Size: 1, Capacity: 128}, + {Size: 1, Capacity: 256}, + {Size: 1, Capacity: 512}, + }, nil) + res, err := pool.Get(200) + require.NoError(t, err) + require.Equal(t, 200, len(res)) + require.Equal(t, 256, cap(res)) + + res, err = pool.Get(300) + require.NoError(t, err) + require.Equal(t, 300, len(res)) + require.Equal(t, 512, cap(res)) + }) + + t.Run("buffer is cleared when returned", func(t *testing.T) { + pool := New("test", []Bucket{ + {Size: 1, Capacity: 64}, + }, nil) + res, err := pool.Get(8) + require.NoError(t, err) + require.Equal(t, 8, len(res)) + source := []byte{0, 1, 2, 3, 4, 5, 6, 7} + copy(res, source) + + pool.Put(res) + + res, err = pool.Get(8) + require.NoError(t, err) + require.Equal(t, 8, len(res)) + require.Equal(t, make([]byte, 8), res) + }) + + t.Run("pool returns error when no buffer is available", func(t *testing.T) { + pool := New("test", []Bucket{ + {Size: 1, Capacity: 64}, + }, nil) + buf1, _ := pool.Get(32) + require.Equal(t, 32, len(buf1)) + + _, err := pool.Get(16) + require.ErrorContains(t, err, errSlabExhausted.Error()) + }) + + t.Run("test ring buffer returns same backing array", func(t *testing.T) { + pool := New("test", []Bucket{ + {Size: 2, Capacity: 128}, + }, nil) + res1, _ := pool.Get(32) + ptr1 := unsafe.Pointer(unsafe.SliceData(res1)) + + res2, _ := pool.Get(64) + ptr2 := unsafe.Pointer(unsafe.SliceData(res2)) + + pool.Put(res2) + pool.Put(res1) + + res3, _ := pool.Get(48) + ptr3 := unsafe.Pointer(unsafe.SliceData(res3)) + + res4, _ := pool.Get(96) + ptr4 := 
unsafe.Pointer(unsafe.SliceData(res4)) + + require.Equal(t, ptr1, ptr4) + require.Equal(t, ptr2, ptr3) + }) + + t.Run("concurrent access", func(t *testing.T) { + pool := New("test", []Bucket{ + {Size: 32, Capacity: 2 << 10}, + {Size: 16, Capacity: 4 << 10}, + {Size: 8, Capacity: 8 << 10}, + {Size: 4, Capacity: 16 << 10}, + {Size: 2, Capacity: 32 << 10}, + }, nil) + + var wg sync.WaitGroup + numWorkers := 256 + n := 10 + + for i := 0; i < numWorkers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < n; i++ { + s := 2 << rand.Intn(5) + buf1, err1 := pool.Get(s) + buf2, err2 := pool.Get(s) + if err2 == nil { + pool.Put(buf2) + } + time.Sleep(time.Millisecond * time.Duration(rand.Intn(10))) + if err1 == nil { + pool.Put(buf1) + } + } + }() + } + + wg.Wait() + t.Log("finished") + }) +} diff --git a/tools/bloom/inspector/main.go b/tools/bloom/inspector/main.go index dfcc7c79cd86d..90aa8c29c2cbb 100644 --- a/tools/bloom/inspector/main.go +++ b/tools/bloom/inspector/main.go @@ -18,7 +18,7 @@ func main() { r := v1.NewDirectoryBlockReader(path) b := v1.NewBlock(r, v1.NewMetrics(nil)) - q := v1.NewBlockQuerier(b, true, v1.DefaultMaxPageSize) + q := v1.NewBlockQuerier(b, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize) md, err := q.Metadata() if err != nil {
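As a usage sketch (not something this patch adds), the new `mempool` allocator can be exercised on its own roughly as shown below. The bucket layout is invented for illustration; Loki's real defaults are `defaultMemPoolBuckets` in the bloomshipper config, and the pool is constructed in `initBloomStore` when `bloom_page_alloc_type` is set to `fixed`.

package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/util/mempool"
)

func main() {
	// Illustrative slab layout: many small buffers, fewer large ones.
	// These numbers are made up for the example; Loki's defaults are the
	// ~4GiB layout in defaultMemPoolBuckets (bloomshipper config).
	buckets := []mempool.Bucket{
		{Size: 64, Capacity: 128 << 10}, // 64 buffers of 128KiB each
		{Size: 16, Capacity: 1 << 20},   // 16 buffers of 1MiB each
		{Size: 4, Capacity: 8 << 20},    // 4 buffers of 8MiB each
	}

	// A nil registerer keeps the example standalone; Loki passes its
	// Prometheus registerer here.
	pool := mempool.New("bloom-page-pool", buckets, nil)

	// Get returns a zeroed slice of the requested length, backed by the
	// first configured slab whose buffer size is large enough.
	buf, err := pool.Get(200 << 10) // 200KiB -> served by the 1MiB slab
	if err != nil {
		// Requests larger than the largest slab, or requests against an
		// exhausted slab, fail instead of growing the heap.
		fmt.Println("allocation failed:", err)
		return
	}

	// ... fill and query the bloom page using buf ...

	// Every buffer obtained via Get must be handed back with Put,
	// otherwise the slab eventually runs dry and later Gets fail.
	pool.Put(buf)
}

Because Get uses a non-blocking channel receive and fails fast rather than falling back to the heap, callers such as the bloom gateway see an exhausted slab as an error instead of risking an OOM.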