Skip to content

Commit

Permalink
Remove re-implementation of BytesBuffer pool
Browse files Browse the repository at this point in the history
There is already an implementation for a bucketed pool
in the Prometheus utilities, which is also used elsewhere in the code.

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Nov 16, 2023
1 parent 8328345 commit 0937c25
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 93 deletions.
22 changes: 15 additions & 7 deletions pkg/ingester/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,26 @@ import (
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/wlog"
prompool "github.com/prometheus/prometheus/util/pool"
"github.com/prometheus/prometheus/util/pool"

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/ingester/wal"
"github.com/grafana/loki/pkg/logproto"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/pool"
)

var (
// todo(ctovena) those pools should be in factor of the actual configuration (blocksize, targetsize).
// Starting with something sane first then we can refine with more experience.

// Buckets [1KB 2KB 4KB 16KB 32KB to 4MB] by 2
chunksBufferPool = pool.NewBuffer(1024, 4*1024*1024, 2)
chunksBufferPool = pool.New(1024, 4*1024*1024, 2, func(i int) interface{} {
return bytes.NewBuffer(make([]byte, 0, i))
})
// Buckets [64B 128B 256B 512B... to 2MB] by 2
headBufferPool = pool.NewBuffer(64, 2*1024*1024, 2)
headBufferPool = pool.New(64, 2*1024*1024, 2, func(i int) interface{} {
return bytes.NewBuffer(make([]byte, 0, i))
})
)

type chunkWithBuffer struct {
Expand Down Expand Up @@ -62,6 +65,11 @@ func toWireChunks(descs []chunkDesc, wireChunks []chunkWithBuffer) ([]chunkWithB
from, to := d.chunk.Bounds()
chunkSize, headSize := d.chunk.CheckpointSize()

blocks := chunksBufferPool.Get(chunkSize).(*bytes.Buffer)
blocks.Reset()
head := headBufferPool.Get(headSize).(*bytes.Buffer)
head.Reset()

wireChunk := chunkWithBuffer{
Chunk: Chunk{
From: from,
Expand All @@ -71,8 +79,8 @@ func toWireChunks(descs []chunkDesc, wireChunks []chunkWithBuffer) ([]chunkWithB
LastUpdated: d.lastUpdated,
Synced: d.synced,
},
blocks: chunksBufferPool.Get(chunkSize),
head: headBufferPool.Get(headSize),
blocks: blocks,
head: head,
}

err := d.chunk.SerializeForCheckpointTo(
Expand Down Expand Up @@ -361,7 +369,7 @@ func (w *WALCheckpointWriter) Advance() (bool, error) {
}

// Buckets [64KB to 256MB] by 2
var recordBufferPool = prompool.New(1<<16, 1<<28, 2, func(size int) interface{} { return make([]byte, 0, size) })
var recordBufferPool = pool.New(1<<16, 1<<28, 2, func(size int) interface{} { return make([]byte, 0, size) })

func (w *WALCheckpointWriter) Write(s *Series) error {
size := s.Size() + 1 // +1 for header
Expand Down
69 changes: 0 additions & 69 deletions pkg/util/pool/bytesbuffer.go

This file was deleted.

17 changes: 0 additions & 17 deletions pkg/util/pool/bytesbuffer_test.go

This file was deleted.

0 comments on commit 0937c25

Please sign in to comment.