Skip to content

Commit

Permalink
feat(compress): support lz4 compression
Browse files Browse the repository at this point in the history
Signed-off-by: xkx <[email protected]>
  • Loading branch information
xkx9431 committed Dec 6, 2024
1 parent bf6572e commit c4a3a91
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 0 deletions.
50 changes: 50 additions & 0 deletions lib/util/lifted/influx/httpd/handler_compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
"github.com/openGemini/openGemini/lib/cpu"
"github.com/pierrec/lz4/v4"
)

type lazyCompressResponseWriter struct {
Expand All @@ -38,6 +39,7 @@ type lazyCompressResponseWriter struct {
func compressFilter(inner http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var writer io.Writer = w
// todo: consider Content_negotiation: https://developer.mozilla.org/en-US/docs/Web/HTTP/Content_negotiation
acceptEncoding := r.Header.Get("Accept-Encoding")
switch {
case strings.Contains(acceptEncoding, "gzip"):
Expand All @@ -55,6 +57,11 @@ func compressFilter(inner http.Handler) http.Handler {
defer sn.Close()
writer = sn
w.Header().Set("Content-Encoding", "snappy")
case strings.Contains(acceptEncoding, "lz4"):
lz := getLz4Writer(w)
defer lz.Close()
writer = lz
w.Header().Set("Content-Encoding", "lz4")
default:
inner.ServeHTTP(w, r)
return
Expand Down Expand Up @@ -112,6 +119,12 @@ func (w *lazyCompressResponseWriter) Close() error {
if zw, ok := w.Writer.(*zstd.Encoder); ok {
putZstdWriter(zw)
}
if sw, ok := w.Writer.(*snappy.Writer); ok {
putSnappyWriter(sw)
}
if lw, ok := w.Writer.(*lz4.Writer); ok {
putLz4Writer(lw)
}
return nil
}

Expand Down Expand Up @@ -285,3 +298,40 @@ func putSnappyWriter(snappyWriter *snappy.Writer) {
}

// ******************** endregion snappy write pool ***************************

// ******************** region lz4 write pool ***************************
type Lz4WriterPool struct {
pool *FixedCachePool
}

func NewLz4WriterPool() *Lz4WriterPool {
p := &Lz4WriterPool{
pool: NewFixedCachePool(cpu.GetCpuNum()*2, func() interface{} {
return lz4.NewWriter(nil)
}),
}
return p
}

func (p *Lz4WriterPool) Get() *lz4.Writer {
return p.pool.Get().(*lz4.Writer)
}

func (p *Lz4WriterPool) Put(lz4Writer *lz4.Writer) {
p.pool.Put(lz4Writer)
}

var lz4WriterPool = NewLz4WriterPool()

func getLz4Writer(w io.Writer) *lz4.Writer {
lz4Writer := lz4WriterPool.Get()
lz4Writer.Reset(w)
return lz4Writer
}

func putLz4Writer(lz4Writer *lz4.Writer) {
lz4Writer.Close()
lz4WriterPool.Put(lz4Writer)
}

// ******************** endregion lz4 write pool ***************************
31 changes: 31 additions & 0 deletions lib/util/lifted/influx/httpd/handler_compress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
"github.com/pierrec/lz4/v4"
)

func TestCompressFilter_Gzip(t *testing.T) {
Expand Down Expand Up @@ -123,6 +124,36 @@ func TestCompressFilter_Snappy(t *testing.T) {

}

func TestCompressFilter_Lz4(t *testing.T) {
handler := compressFilter(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("test response for lz4"))
}))

req := httptest.NewRequest("GET", "/", nil)
req.Header.Set("Accept-Encoding", "lz4")
w := httptest.NewRecorder()

handler.ServeHTTP(w, req)

resp := w.Result()
defer resp.Body.Close()

if resp.Header.Get("Content-Encoding") != "lz4" {
t.Errorf("expected lz4 encoding, got %s", resp.Header.Get("Content-Encoding"))
}

lz4Reader := lz4.NewReader(resp.Body)

body, err := io.ReadAll(lz4Reader)
if err != nil {
t.Fatalf("failed to read body: %v", err)
}

if string(body) != "test response for lz4" {
t.Errorf("expected body 'test response for lz4', got %s", string(body))
}
}

func TestCompressFilter_NoEncoding(t *testing.T) {
handler := compressFilter(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("test response"))
Expand Down

0 comments on commit c4a3a91

Please sign in to comment.