From d25c1c8822296452e9d9d929b2eef3a08e9a1d36 Mon Sep 17 00:00:00 2001 From: Nicholas Wiersma Date: Tue, 16 Jan 2024 20:27:56 +0200 Subject: [PATCH] feat: support zstandard in ocf (#343) --- go.mod | 1 + go.sum | 2 + ocf/codec.go | 30 ++++++++- ocf/ocf_test.go | 96 ++++++++++++++++++++++++++++ ocf/testdata/full-zstd.avro | Bin 0 -> 951 bytes ocf/testdata/zstd-invalid-data.avro | Bin 0 -> 952 bytes 6 files changed, 126 insertions(+), 3 deletions(-) create mode 100644 ocf/testdata/full-zstd.avro create mode 100644 ocf/testdata/zstd-invalid-data.avro diff --git a/go.mod b/go.mod index caf519bf..696ef6e9 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/ettle/strcase v0.2.0 github.com/golang/snappy v0.0.4 github.com/json-iterator/go v1.1.12 + github.com/klauspost/compress v1.17.4 github.com/mitchellh/mapstructure v1.5.0 github.com/modern-go/reflect2 v1.0.2 github.com/stretchr/testify v1.7.1 diff --git a/go.sum b/go.sum index c154e11f..76652a96 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,8 @@ github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= +github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= diff --git a/ocf/codec.go b/ocf/codec.go index 2340a6dd..422d2c96 100644 --- a/ocf/codec.go +++ b/ocf/codec.go @@ -10,6 +10,7 @@ import ( "io" "github.com/golang/snappy" + "github.com/klauspost/compress/zstd" ) // CodecName represents a compression codec name. @@ -17,9 +18,10 @@ type CodecName string // Supported compression codecs. const ( - Null CodecName = "null" - Deflate CodecName = "deflate" - Snappy CodecName = "snappy" + Null CodecName = "null" + Deflate CodecName = "deflate" + Snappy CodecName = "snappy" + ZStandard CodecName = "zstandard" ) func resolveCodec(name CodecName, lvl int) (Codec, error) { @@ -33,6 +35,9 @@ func resolveCodec(name CodecName, lvl int) (Codec, error) { case Snappy: return &SnappyCodec{}, nil + case ZStandard: + return &ZStandardCodec{}, nil + default: return nil, fmt.Errorf("unknown codec %s", name) } @@ -120,3 +125,22 @@ func (*SnappyCodec) Encode(b []byte) []byte { return dst } + +// ZStandardCodec is a zstandard compression codec. +type ZStandardCodec struct{} + +// Decode decodes the given bytes. +func (*ZStandardCodec) Decode(b []byte) ([]byte, error) { + dec, _ := zstd.NewReader(nil) + defer dec.Close() + + return dec.DecodeAll(b, nil) +} + +// Encode encodes the given bytes. +func (*ZStandardCodec) Encode(b []byte) []byte { + enc, _ := zstd.NewWriter(nil) + defer func() { _ = enc.Close() }() + + return enc.EncodeAll(b, nil) +} diff --git a/ocf/ocf_test.go b/ocf/ocf_test.go index 3b02a082..102a1da4 100644 --- a/ocf/ocf_test.go +++ b/ocf/ocf_test.go @@ -311,6 +311,65 @@ func TestDecoder_WithSnappyHandlesInvalidCRC(t *testing.T) { assert.Error(t, dec.Error()) } +func TestDecoder_WithZStandard(t *testing.T) { + unionStr := "union value" + want := FullRecord{ + Strings: []string{"string1", "string2", "string3", "string4", "string5"}, + Longs: []int64{1, 2, 3, 4, 5}, + Enum: "C", + Map: map[string]int{ + "key1": 1, + "key2": 2, + "key3": 3, + "key4": 4, + "key5": 5, + }, + Nullable: &unionStr, + Fixed: [16]byte{0x01, 0x02, 0x03, 0x04, 0x01, 0x02, 0x03, 0x04, 0x01, 0x02, 0x03, 0x04, 0x01, 0x02, 0x03, 0x04}, + Record: &TestRecord{ + Long: 1925639126735, + String: "I am a test record", + Int: 666, + Float: 7171.17, + Double: 916734926348163.01973408746523, + Bool: true, + }, + } + + f, err := os.Open("testdata/full-zstd.avro") + require.NoError(t, err) + t.Cleanup(func() { _ = f.Close() }) + + dec, err := ocf.NewDecoder(f) + require.NoError(t, err) + + var count int + for dec.HasNext() { + count++ + var got FullRecord + err = dec.Decode(&got) + + require.NoError(t, err) + assert.Equal(t, want, got) + } + + require.NoError(t, dec.Error()) + assert.Equal(t, 1, count) +} + +func TestDecoder_WithZStandardHandlesInvalidData(t *testing.T) { + f, err := os.Open("testdata/zstd-invalid-data.avro") + require.NoError(t, err) + t.Cleanup(func() { _ = f.Close() }) + + dec, err := ocf.NewDecoder(f) + require.NoError(t, err) + + dec.HasNext() + + assert.Error(t, dec.Error()) +} + func TestDecoder_DecodeAvroError(t *testing.T) { data := []byte{'O', 'b', 'j', 0x01, 0x01, 0x26, 0x16, 'a', 'v', 'r', 'o', '.', 's', 'c', 'h', 'e', 'm', 'a', 0x0c, '"', 'l', 'o', 'n', 'g', '"', 0x00, 0xfb, 0x2b, 0x0f, 0x1a, 0xdd, 0xfd, 0x90, 0x7d, 0x87, 0x12, @@ -681,6 +740,43 @@ func TestEncoder_EncodeCompressesSnappy(t *testing.T) { assert.Equal(t, 938, buf.Len()) } +func TestEncoder_EncodeCompressesZStandard(t *testing.T) { + unionStr := "union value" + record := FullRecord{ + Strings: []string{"string1", "string2", "string3", "string4", "string5"}, + Longs: []int64{1, 2, 3, 4, 5}, + Enum: "C", + Map: map[string]int{ + "key1": 1, + "key2": 2, + "key3": 3, + "key4": 4, + "key5": 5, + }, + Nullable: &unionStr, + Fixed: [16]byte{0x01, 0x02, 0x03, 0x04, 0x01, 0x02, 0x03, 0x04, 0x01, 0x02, 0x03, 0x04, 0x01, 0x02, 0x03, 0x04}, + Record: &TestRecord{ + Long: 1925639126735, + String: "I am a test record", + Int: 666, + Float: 7171.17, + Double: 916734926348163.01973408746523, + Bool: true, + }, + } + + buf := &bytes.Buffer{} + enc, _ := ocf.NewEncoder(schema, buf, ocf.WithCodec(ocf.ZStandard)) + + err := enc.Encode(record) + assert.NoError(t, err) + + err = enc.Close() + + require.NoError(t, err) + assert.Equal(t, 951, buf.Len()) +} + func TestEncoder_EncodeError(t *testing.T) { buf := &bytes.Buffer{} enc, err := ocf.NewEncoder(`"long"`, buf) diff --git a/ocf/testdata/full-zstd.avro b/ocf/testdata/full-zstd.avro new file mode 100644 index 0000000000000000000000000000000000000000..4e663f7104a2c1501f3d514d327d046eea943099 GIT binary patch literal 951 zcmb7C%SyvQ6iw4u+X#Z9qHY=n7Yb5swJyYsRs|7M#9i%Tl1w$4nJIaQRl0HOM*M*O zfV)zcet>(og5U?Z_W{l%=`@OhF6P3Wd(Og@=fbV;}xJN0m~60TIKkW1ZT(f3VDoE9{imSp}7NTNu^^ zQ;ZW{oV^*Us@teHK^N25X*pHu{4^y5}UhfouaO~LXK}nV6!M*109NMoiRi5 z3-Nx1JE2>@y|;&4?;pkns*4Q>J?L5&O(p6bce@C#kSGUob!%0d!dNv-V*;7fk825{ zJciII<>E&B$uk5AXfPiujiF$V4Qe7|@XYlfF`;8h?XBeG=IdKFKV@teZ_4j-Q=Yt? z`B0=`MVjc%?DU5t=79l1F@EQKAK# zk&u&0^X>lXvNo}yL99W|W1?%JL2Y$^>}hTEef%-oua*)$(%rLEDBriPUs@N+>_ckc GdxxK$>^2|( literal 0 HcmV?d00001 diff --git a/ocf/testdata/zstd-invalid-data.avro b/ocf/testdata/zstd-invalid-data.avro new file mode 100644 index 0000000000000000000000000000000000000000..be12de5d91811f1b16bf09bacf42abe2c10376d3 GIT binary patch literal 952 zcmb7C&r9P#6rQHpZW}>R+_HKx3?5vNYOD1idvUA6BIqLCY7diSs?p3$$&X@`UcBx_ z`~!LyPfGW=e}MPB3W9%tcXh#;B%Mat1wG7=_ultS-uu3{Y3ygR4?{)Nc3H_Y_b@@d zp<_r9!EgyOcek{Mh=EE`eJyaDEo?H^f-}$$TLR+t*qfLF_JW6LKvF;a_Z&^eBnBo7FC+$YME`J^7r*gS<7Wjn z(rppcl1w>HoNOZ}z+xJk`Y&3dTs34kSQ6_{qT`9*aBZyU%ufaaW zvyZqT<;8Tr(w*3?|Gl@3J^vlXMyksVh&>osmrNz=eRl^4p^z*`at&%thoMw84pRbI zEJ$k!lQ@Mi7!%?q`za!fDC}?_8=bLek4H)VV*ok=Gq+&&WGA3IG5A literal 0 HcmV?d00001