From c4a3a9199186f0a14bc07f519c8b6f757e35b9f2 Mon Sep 17 00:00:00 2001 From: xkx Date: Fri, 6 Dec 2024 14:38:01 +0000 Subject: [PATCH] feat(compress): support lz4 compression Signed-off-by: xkx --- .../lifted/influx/httpd/handler_compress.go | 50 +++++++++++++++++++ .../influx/httpd/handler_compress_test.go | 31 ++++++++++++ 2 files changed, 81 insertions(+) diff --git a/lib/util/lifted/influx/httpd/handler_compress.go b/lib/util/lifted/influx/httpd/handler_compress.go index 7cfdebce..18c02764 100644 --- a/lib/util/lifted/influx/httpd/handler_compress.go +++ b/lib/util/lifted/influx/httpd/handler_compress.go @@ -24,6 +24,7 @@ import ( "github.com/golang/snappy" "github.com/klauspost/compress/zstd" "github.com/openGemini/openGemini/lib/cpu" + "github.com/pierrec/lz4/v4" ) type lazyCompressResponseWriter struct { @@ -38,6 +39,7 @@ type lazyCompressResponseWriter struct { func compressFilter(inner http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var writer io.Writer = w + // todo: consider Content_negotiation: https://developer.mozilla.org/en-US/docs/Web/HTTP/Content_negotiation acceptEncoding := r.Header.Get("Accept-Encoding") switch { case strings.Contains(acceptEncoding, "gzip"): @@ -55,6 +57,11 @@ func compressFilter(inner http.Handler) http.Handler { defer sn.Close() writer = sn w.Header().Set("Content-Encoding", "snappy") + case strings.Contains(acceptEncoding, "lz4"): + lz := getLz4Writer(w) + defer lz.Close() + writer = lz + w.Header().Set("Content-Encoding", "lz4") default: inner.ServeHTTP(w, r) return @@ -112,6 +119,12 @@ func (w *lazyCompressResponseWriter) Close() error { if zw, ok := w.Writer.(*zstd.Encoder); ok { putZstdWriter(zw) } + if sw, ok := w.Writer.(*snappy.Writer); ok { + putSnappyWriter(sw) + } + if lw, ok := w.Writer.(*lz4.Writer); ok { + putLz4Writer(lw) + } return nil } @@ -285,3 +298,40 @@ func putSnappyWriter(snappyWriter *snappy.Writer) { } // ******************** endregion snappy write pool *************************** + +// ******************** region lz4 write pool *************************** +type Lz4WriterPool struct { + pool *FixedCachePool +} + +func NewLz4WriterPool() *Lz4WriterPool { + p := &Lz4WriterPool{ + pool: NewFixedCachePool(cpu.GetCpuNum()*2, func() interface{} { + return lz4.NewWriter(nil) + }), + } + return p +} + +func (p *Lz4WriterPool) Get() *lz4.Writer { + return p.pool.Get().(*lz4.Writer) +} + +func (p *Lz4WriterPool) Put(lz4Writer *lz4.Writer) { + p.pool.Put(lz4Writer) +} + +var lz4WriterPool = NewLz4WriterPool() + +func getLz4Writer(w io.Writer) *lz4.Writer { + lz4Writer := lz4WriterPool.Get() + lz4Writer.Reset(w) + return lz4Writer +} + +func putLz4Writer(lz4Writer *lz4.Writer) { + lz4Writer.Close() + lz4WriterPool.Put(lz4Writer) +} + +// ******************** endregion lz4 write pool *************************** diff --git a/lib/util/lifted/influx/httpd/handler_compress_test.go b/lib/util/lifted/influx/httpd/handler_compress_test.go index b13cbb19..3fdce0c9 100644 --- a/lib/util/lifted/influx/httpd/handler_compress_test.go +++ b/lib/util/lifted/influx/httpd/handler_compress_test.go @@ -22,6 +22,7 @@ import ( "github.com/golang/snappy" "github.com/klauspost/compress/zstd" + "github.com/pierrec/lz4/v4" ) func TestCompressFilter_Gzip(t *testing.T) { @@ -123,6 +124,36 @@ func TestCompressFilter_Snappy(t *testing.T) { } +func TestCompressFilter_Lz4(t *testing.T) { + handler := compressFilter(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("test response for lz4")) + })) + + req := httptest.NewRequest("GET", "/", nil) + req.Header.Set("Accept-Encoding", "lz4") + w := httptest.NewRecorder() + + handler.ServeHTTP(w, req) + + resp := w.Result() + defer resp.Body.Close() + + if resp.Header.Get("Content-Encoding") != "lz4" { + t.Errorf("expected lz4 encoding, got %s", resp.Header.Get("Content-Encoding")) + } + + lz4Reader := lz4.NewReader(resp.Body) + + body, err := io.ReadAll(lz4Reader) + if err != nil { + t.Fatalf("failed to read body: %v", err) + } + + if string(body) != "test response for lz4" { + t.Errorf("expected body 'test response for lz4', got %s", string(body)) + } +} + func TestCompressFilter_NoEncoding(t *testing.T) { handler := compressFilter(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Write([]byte("test response"))