From 0937c259ea937425231886b1d2e6a05d5b5893dd Mon Sep 17 00:00:00 2001
From: Christian Haudum
Date: Thu, 16 Nov 2023 14:20:17 +0100
Subject: [PATCH] Remove re-implementation of BytesBuffer pool

There is already an implementation of a bucketed pool in the Prometheus
utilities, which is also used elsewhere in the code.

Signed-off-by: Christian Haudum
---
 pkg/ingester/checkpoint.go        | 22 ++++++----
 pkg/util/pool/bytesbuffer.go      | 69 ------------------------------
 pkg/util/pool/bytesbuffer_test.go | 17 --------
 3 files changed, 15 insertions(+), 93 deletions(-)
 delete mode 100644 pkg/util/pool/bytesbuffer.go
 delete mode 100644 pkg/util/pool/bytesbuffer_test.go

diff --git a/pkg/ingester/checkpoint.go b/pkg/ingester/checkpoint.go
index e2c8ef2c18681..d7686c58e3f6e 100644
--- a/pkg/ingester/checkpoint.go
+++ b/pkg/ingester/checkpoint.go
@@ -18,13 +18,12 @@ import (
 	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
 	"github.com/prometheus/prometheus/tsdb/fileutil"
 	"github.com/prometheus/prometheus/tsdb/wlog"
-	prompool "github.com/prometheus/prometheus/util/pool"
+	"github.com/prometheus/prometheus/util/pool"
 
 	"github.com/grafana/loki/pkg/chunkenc"
 	"github.com/grafana/loki/pkg/ingester/wal"
 	"github.com/grafana/loki/pkg/logproto"
 	util_log "github.com/grafana/loki/pkg/util/log"
-	"github.com/grafana/loki/pkg/util/pool"
 )
 
 var (
@@ -32,9 +31,13 @@ var (
 	// Starting with something sane first then we can refine with more experience.
 
 	// Buckets [1KB 2KB 4KB 16KB 32KB to 4MB] by 2
-	chunksBufferPool = pool.NewBuffer(1024, 4*1024*1024, 2)
+	chunksBufferPool = pool.New(1024, 4*1024*1024, 2, func(i int) interface{} {
+		return bytes.NewBuffer(make([]byte, 0, i))
+	})
 	// Buckets [64B 128B 256B 512B... to 2MB] by 2
-	headBufferPool = pool.NewBuffer(64, 2*1024*1024, 2)
+	headBufferPool = pool.New(64, 2*1024*1024, 2, func(i int) interface{} {
+		return bytes.NewBuffer(make([]byte, 0, i))
+	})
 )
 
 type chunkWithBuffer struct {
@@ -62,6 +65,11 @@ func toWireChunks(descs []chunkDesc, wireChunks []chunkWithBuffer) ([]chunkWithB
 		from, to := d.chunk.Bounds()
 		chunkSize, headSize := d.chunk.CheckpointSize()
 
+		blocks := chunksBufferPool.Get(chunkSize).(*bytes.Buffer)
+		blocks.Reset()
+		head := headBufferPool.Get(headSize).(*bytes.Buffer)
+		head.Reset()
+
 		wireChunk := chunkWithBuffer{
 			Chunk: Chunk{
 				From:        from,
@@ -71,8 +79,8 @@ func toWireChunks(descs []chunkDesc, wireChunks []chunkWithBuffer) ([]chunkWithB
 				LastUpdated: d.lastUpdated,
 				Synced:      d.synced,
 			},
-			blocks: chunksBufferPool.Get(chunkSize),
-			head:   headBufferPool.Get(headSize),
+			blocks: blocks,
+			head:   head,
 		}
 
 		err := d.chunk.SerializeForCheckpointTo(
@@ -361,7 +369,7 @@ func (w *WALCheckpointWriter) Advance() (bool, error) {
 }
 
 // Buckets [64KB to 256MB] by 2
-var recordBufferPool = prompool.New(1<<16, 1<<28, 2, func(size int) interface{} { return make([]byte, 0, size) })
+var recordBufferPool = pool.New(1<<16, 1<<28, 2, func(size int) interface{} { return make([]byte, 0, size) })
 
 func (w *WALCheckpointWriter) Write(s *Series) error {
 	size := s.Size() + 1 // +1 for header
diff --git a/pkg/util/pool/bytesbuffer.go b/pkg/util/pool/bytesbuffer.go
deleted file mode 100644
index f357e5be9a2f5..0000000000000
--- a/pkg/util/pool/bytesbuffer.go
+++ /dev/null
@@ -1,69 +0,0 @@
-package pool
-
-import (
-	"bytes"
-	"sync"
-)
-
-// BufferPool is a bucketed pool for variably bytes buffers.
-type BufferPool struct {
-	buckets []sync.Pool
-	sizes   []int
-}
-
-// NewBuffer a new Pool with size buckets for minSize to maxSize
-// increasing by the given factor.
-func NewBuffer(minSize, maxSize int, factor float64) *BufferPool {
-	if minSize < 1 {
-		panic("invalid minimum pool size")
-	}
-	if maxSize < 1 {
-		panic("invalid maximum pool size")
-	}
-	if factor < 1 {
-		panic("invalid factor")
-	}
-
-	var sizes []int
-
-	for s := minSize; s <= maxSize; s = int(float64(s) * factor) {
-		sizes = append(sizes, s)
-	}
-
-	return &BufferPool{
-		buckets: make([]sync.Pool, len(sizes)),
-		sizes:   sizes,
-	}
-}
-
-// Get returns a byte buffer that fits the given size.
-func (p *BufferPool) Get(sz int) *bytes.Buffer {
-	for i, bktSize := range p.sizes {
-		if sz > bktSize {
-			continue
-		}
-		b := p.buckets[i].Get()
-		if b == nil {
-			b = bytes.NewBuffer(make([]byte, 0, bktSize))
-		}
-		buf := b.(*bytes.Buffer)
-		buf.Reset()
-		return b.(*bytes.Buffer)
-	}
-	return bytes.NewBuffer(make([]byte, 0, sz))
-}
-
-// Put adds a byte buffer to the right bucket in the pool.
-func (p *BufferPool) Put(s *bytes.Buffer) {
-	if s == nil {
-		return
-	}
-	capt := s.Cap()
-	for i, size := range p.sizes {
-		if capt > size {
-			continue
-		}
-		p.buckets[i].Put(s)
-		return
-	}
-}
diff --git a/pkg/util/pool/bytesbuffer_test.go b/pkg/util/pool/bytesbuffer_test.go
deleted file mode 100644
index 0fa955d01bcd2..0000000000000
--- a/pkg/util/pool/bytesbuffer_test.go
+++ /dev/null
@@ -1,17 +0,0 @@
-package pool
-
-import (
-	"testing"
-
-	"github.com/stretchr/testify/require"
-)
-
-func Test_ZeroBuffer(t *testing.T) {
-	p := NewBuffer(2, 10, 2)
-	require.Equal(t, 0, p.Get(1).Len())
-	require.Equal(t, 0, p.Get(1).Len())
-	require.Equal(t, 0, p.Get(2).Len())
-	require.Equal(t, 0, p.Get(2).Len())
-	require.Equal(t, 0, p.Get(20).Len())
-	require.Equal(t, 0, p.Get(20).Len())
-}
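
Note: the Prometheus bucketed pool this patch switches to (github.com/prometheus/prometheus/util/pool, the same package backing recordBufferPool above) is constructed with bucket bounds, a growth factor and a factory function; Get returns an interface{} that the caller type-asserts, and Put recycles the object into the bucket matching its capacity. Below is a minimal standalone Go sketch of that pattern, not part of the patch: the names (bufPool), bucket sizes and payload string are purely illustrative.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/util/pool"
)

// bufPool buckets zero-length byte slices with capacities 1KB, 2KB and 4KB
// (doubling per bucket). Sizes here are illustrative only; the factory
// function decides what each bucket hands out, mirroring recordBufferPool.
var bufPool = pool.New(1024, 4096, 2, func(size int) interface{} {
	return make([]byte, 0, size)
})

func main() {
	// Get picks the smallest bucket that fits the requested size and returns
	// an interface{}, so the caller asserts the concrete type produced by the
	// factory passed to pool.New.
	buf := bufPool.Get(1500).([]byte)
	buf = append(buf, "checkpoint record"...)
	fmt.Println(len(buf), cap(buf)) // 17 2048

	// Put hands the slice back to the bucket matching its capacity so a later
	// Get can reuse the allocation instead of allocating a fresh slice.
	bufPool.Put(buf)
}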