diff --git a/lib/compress/compression_pool.go b/lib/compress/compression_pool.go new file mode 100644 index 00000000..08a8eccb --- /dev/null +++ b/lib/compress/compression_pool.go @@ -0,0 +1,219 @@ +package compress + +import ( + "compress/gzip" + "io" + "sync" + + "github.com/golang/snappy" + "github.com/klauspost/compress/zstd" + "github.com/openGemini/openGemini/lib/cpu" + "github.com/pierrec/lz4/v4" +) + +var ( + gzipWriterPool = NewGzipWriterPool() + zstdWriterPool = NewZstdWriterPool() + snappyWriterPool = NewSnappyWriterPool() + lz4WriterPool = NewLz4WriterPool() + + gzipReaderPool sync.Pool +) + +type FixedCachePool struct { + cache chan interface{} + sp sync.Pool +} + +func NewFixedCachePool(size int, newFunc func() interface{}) *FixedCachePool { + return &FixedCachePool{ + cache: make(chan interface{}, size), + sp: sync.Pool{ + New: newFunc, + }, + } +} + +func (p *FixedCachePool) Get() interface{} { + select { + case item := <-p.cache: + return item + default: + item := p.sp.Get() + if item == nil { + item = p.sp.New() + } + return item + } +} + +func (p *FixedCachePool) Put(item interface{}) { + select { + case p.cache <- item: + default: + p.sp.Put(item) + } +} + +// ******************** region gzip write pool *********************** +type GzipWriterPool struct { + pool *FixedCachePool +} + +func NewGzipWriterPool() *GzipWriterPool { + p := &GzipWriterPool{ + pool: NewFixedCachePool(cpu.GetCpuNum()*2, func() interface{} { + return gzip.NewWriter(nil) + }), + } + return p +} + +func (p *GzipWriterPool) Get() *gzip.Writer { + return p.pool.Get().(*gzip.Writer) +} + +func (p *GzipWriterPool) Put(gz *gzip.Writer) { + p.pool.Put(gz) +} + +func GetGzipWriter(w io.Writer) *gzip.Writer { + gz := gzipWriterPool.Get() + gz.Reset(w) + return gz +} + +func PutGzipWriter(gz *gzip.Writer) { + gz.Close() + gzipWriterPool.Put(gz) +} + +// ******************** endregion gzip write pool *********************** + +// ******************** region gzip read pool *************************** + +func GetGzipReader(r io.Reader) (*gzip.Reader, error) { + v := gzipReaderPool.Get() + if v == nil { + return gzip.NewReader(r) + } + zr := v.(*gzip.Reader) + if err := zr.Reset(r); err != nil { + return nil, err + } + return zr, nil +} + +// PutGzipReader returns back gzip reader obtained via GetGzipReader. +func PutGzipReader(zr *gzip.Reader) { + _ = zr.Close() + gzipReaderPool.Put(zr) +} + +// ******************** endregion gzip read pool *************************** + +// ******************** region zstd write pool *********************** +type ZstdWriterPool struct { + pool *FixedCachePool +} + +func NewZstdWriterPool() *ZstdWriterPool { + p := &ZstdWriterPool{ + pool: NewFixedCachePool(cpu.GetCpuNum()*2, func() interface{} { + encoder, _ := zstd.NewWriter(nil) + return encoder + }), + } + return p +} + +func (p *ZstdWriterPool) Get() *zstd.Encoder { + return p.pool.Get().(*zstd.Encoder) +} + +func (p *ZstdWriterPool) Put(zstdEncoder *zstd.Encoder) { + p.pool.Put(zstdEncoder) +} + +func GetZstdWriter(w io.Writer) *zstd.Encoder { + zstdEncoder := zstdWriterPool.Get() + zstdEncoder.Reset(w) + return zstdEncoder +} + +func PutZstdWriter(zstdEncoder *zstd.Encoder) { + zstdEncoder.Close() + zstdWriterPool.Put(zstdEncoder) +} + +// ******************** endregion zstd write pool *********************** + +// ******************** region snappy write pool *************************** +type SnappyWriterPool struct { + pool *FixedCachePool +} + +func NewSnappyWriterPool() *SnappyWriterPool { + p := &SnappyWriterPool{ + pool: NewFixedCachePool(cpu.GetCpuNum()*2, func() interface{} { + return snappy.NewBufferedWriter(nil) + }), + } + return p +} + +func (p *SnappyWriterPool) Get() *snappy.Writer { + return p.pool.Get().(*snappy.Writer) +} + +func (p *SnappyWriterPool) Put(snappyWriter *snappy.Writer) { + p.pool.Put(snappyWriter) +} + +func GetSnappyWriter(w io.Writer) *snappy.Writer { + snappyWriter := snappyWriterPool.Get() + snappyWriter.Reset(w) + return snappyWriter +} + +func PutSnappyWriter(snappyWriter *snappy.Writer) { + snappyWriter.Close() + snappyWriterPool.Put(snappyWriter) +} + +// ******************** endregion snappy write pool *************************** + +// ******************** region lz4 write pool *************************** +type Lz4WriterPool struct { + pool *FixedCachePool +} + +func NewLz4WriterPool() *Lz4WriterPool { + p := &Lz4WriterPool{ + pool: NewFixedCachePool(cpu.GetCpuNum()*2, func() interface{} { + return lz4.NewWriter(nil) + }), + } + return p +} + +func (p *Lz4WriterPool) Get() *lz4.Writer { + return p.pool.Get().(*lz4.Writer) +} + +func (p *Lz4WriterPool) Put(lz4Writer *lz4.Writer) { + p.pool.Put(lz4Writer) +} + +func GetLz4Writer(w io.Writer) *lz4.Writer { + lz4Writer := lz4WriterPool.Get() + lz4Writer.Reset(w) + return lz4Writer +} + +func PutLz4Writer(lz4Writer *lz4.Writer) { + lz4Writer.Close() + lz4WriterPool.Put(lz4Writer) +} + +// ******************** endregion lz4 write pool *************************** diff --git a/lib/compress/compression_pool_test.go b/lib/compress/compression_pool_test.go new file mode 100644 index 00000000..1aebcd66 --- /dev/null +++ b/lib/compress/compression_pool_test.go @@ -0,0 +1,75 @@ +package compress + +import ( + "bytes" + "compress/gzip" + "io" + "testing" + + "github.com/golang/snappy" + "github.com/klauspost/compress/zstd" + "github.com/pierrec/lz4/v4" + "github.com/stretchr/testify/assert" +) + +func TestGzipWriterPool(t *testing.T) { + var buf bytes.Buffer + writer := GetGzipWriter(&buf) + _, err := writer.Write([]byte("test data")) + assert.NoError(t, err) + PutGzipWriter(writer) + + reader, err := gzip.NewReader(&buf) + assert.NoError(t, err) + defer reader.Close() + + result := new(bytes.Buffer) + _, err = io.Copy(result, reader) + assert.NoError(t, err) + assert.Equal(t, "test data", result.String()) +} + +func TestZstdWriterPool(t *testing.T) { + var buf bytes.Buffer + writer := GetZstdWriter(&buf) + _, err := writer.Write([]byte("test data")) + assert.NoError(t, err) + PutZstdWriter(writer) + + reader, err := zstd.NewReader(&buf) + assert.NoError(t, err) + defer reader.Close() + + result := new(bytes.Buffer) + _, err = io.Copy(result, reader) + assert.NoError(t, err) + assert.Equal(t, "test data", result.String()) +} + +func TestSnappyWriterPool(t *testing.T) { + var buf bytes.Buffer + writer := GetSnappyWriter(&buf) + _, err := writer.Write([]byte("test data")) + assert.NoError(t, err) + PutSnappyWriter(writer) + + reader := snappy.NewReader(&buf) + result := new(bytes.Buffer) + _, err = io.Copy(result, reader) + assert.NoError(t, err) + assert.Equal(t, "test data", result.String()) +} + +func TestLz4WriterPool(t *testing.T) { + var buf bytes.Buffer + writer := GetLz4Writer(&buf) + _, err := writer.Write([]byte("test data")) + assert.NoError(t, err) + PutLz4Writer(writer) + + reader := lz4.NewReader(&buf) + result := new(bytes.Buffer) + _, err = io.Copy(result, reader) + assert.NoError(t, err) + assert.Equal(t, "test data", result.String()) +}