Skip to content

Commit

Permalink
feat(compress): support snappy compression
Browse files Browse the repository at this point in the history
Signed-off-by: xkx <[email protected]>
  • Loading branch information
xkx9431 committed Dec 6, 2024
1 parent 6be5081 commit bf6572e
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 0 deletions.
43 changes: 43 additions & 0 deletions lib/util/lifted/influx/httpd/handler_compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"
"sync"

"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
"github.com/openGemini/openGemini/lib/cpu"
)
Expand Down Expand Up @@ -49,6 +50,11 @@ func compressFilter(inner http.Handler) http.Handler {
defer enc.Close()
writer = enc
w.Header().Set("Content-Encoding", "zstd")
case strings.Contains(acceptEncoding, "snappy"):
sn := getSnappyWriter(w)
defer sn.Close()
writer = sn
w.Header().Set("Content-Encoding", "snappy")
default:
inner.ServeHTTP(w, r)
return
Expand Down Expand Up @@ -242,3 +248,40 @@ func putZstdWriter(zstdEncoder *zstd.Encoder) {
}

// ******************** endregion zstd write pool ***********************

// ******************** region snappy write pool ***************************
type SnappyWriterPool struct {
pool *FixedCachePool
}

func NewSnappyWriterPool() *SnappyWriterPool {
p := &SnappyWriterPool{
pool: NewFixedCachePool(cpu.GetCpuNum()*2, func() interface{} {
return snappy.NewBufferedWriter(nil)
}),
}
return p
}

func (p *SnappyWriterPool) Get() *snappy.Writer {
return p.pool.Get().(*snappy.Writer)
}

func (p *SnappyWriterPool) Put(snappyWriter *snappy.Writer) {
p.pool.Put(snappyWriter)
}

var snappyWriterPool = NewSnappyWriterPool()

func getSnappyWriter(w io.Writer) *snappy.Writer {
snappyWriter := snappyWriterPool.Get()
snappyWriter.Reset(w)
return snappyWriter
}

func putSnappyWriter(snappyWriter *snappy.Writer) {
snappyWriter.Close()
snappyWriterPool.Put(snappyWriter)
}

// ******************** endregion snappy write pool ***************************
32 changes: 32 additions & 0 deletions lib/util/lifted/influx/httpd/handler_compress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net/http/httptest"
"testing"

"github.com/golang/snappy"
"github.com/klauspost/compress/zstd"
)

Expand Down Expand Up @@ -91,6 +92,37 @@ func TestCompressFilter_Zstd(t *testing.T) {
}
}

func TestCompressFilter_Snappy(t *testing.T) {
handler := compressFilter(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("test response"))
}))

req := httptest.NewRequest("GET", "/", nil)
req.Header.Set("Accept-Encoding", "snappy")
w := httptest.NewRecorder()

handler.ServeHTTP(w, req)

resp := w.Result()
defer resp.Body.Close()

if resp.Header.Get("Content-Encoding") != "snappy" {
t.Errorf("expected snappy encoding, got %s", resp.Header.Get("Content-Encoding"))
}

snappyReader := snappy.NewReader(resp.Body)

body, err := io.ReadAll(snappyReader)
if err != nil {
t.Fatalf("failed to read snappy body: %v", err)
}

if string(body) != "test response" {
t.Errorf("expected body 'test response', got %s", string(body))
}

}

func TestCompressFilter_NoEncoding(t *testing.T) {
handler := compressFilter(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("test response"))
Expand Down

0 comments on commit bf6572e

Please sign in to comment.