From b4fe207b701605cc423ae1074f8dd6892f37e9b2 Mon Sep 17 00:00:00 2001 From: Mike Cheng Date: Fri, 27 Sep 2024 12:51:10 -0400 Subject: [PATCH] feat: allow passing in custom options for OCF decoder. (#460) --- ocf/ocf.go | 27 ++++++++++++++++++--- ocf/ocf_test.go | 63 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 3 deletions(-) diff --git a/ocf/ocf.go b/ocf/ocf.go index 137cc03..c446cef 100644 --- a/ocf/ocf.go +++ b/ocf/ocf.go @@ -24,7 +24,7 @@ var magicBytes = [4]byte{'O', 'b', 'j', 1} // HeaderSchema is the Avro schema of a container file header. var HeaderSchema = avro.MustParse(`{ - "type": "record", + "type": "record", "name": "org.apache.avro.file.Header", "fields": [ {"name": "magic", "type": {"type": "fixed", "name": "Magic", "size": 4}}, @@ -40,6 +40,20 @@ type Header struct { Sync [16]byte `avro:"sync"` } +type decoderConfig struct { + DecoderConfig avro.API +} + +// DecoderFunc represents a configuration function for Decoder. +type DecoderFunc func(cfg *decoderConfig) + +// WithDecoderConfig sets the value decoder config on the OCF decoder. +func WithDecoderConfig(wCfg avro.API) DecoderFunc { + return func(cfg *decoderConfig) { + cfg.DecoderConfig = wCfg + } +} + // Decoder reads and decodes Avro values from a container file. type Decoder struct { reader *avro.Reader @@ -54,7 +68,14 @@ type Decoder struct { } // NewDecoder returns a new decoder that reads from reader r. -func NewDecoder(r io.Reader) (*Decoder, error) { +func NewDecoder(r io.Reader, opts ...DecoderFunc) (*Decoder, error) { + cfg := decoderConfig{ + DecoderConfig: avro.DefaultConfig, + } + for _, opt := range opts { + opt(&cfg) + } + reader := avro.NewReader(r, 1024) h, err := readHeader(reader) @@ -67,7 +88,7 @@ func NewDecoder(r io.Reader) (*Decoder, error) { return &Decoder{ reader: reader, resetReader: decReader, - decoder: avro.NewDecoderForSchema(h.Schema, decReader), + decoder: cfg.DecoderConfig.NewDecoder(h.Schema, decReader), meta: h.Meta, sync: h.Sync, codec: h.Codec, diff --git a/ocf/ocf_test.go b/ocf/ocf_test.go index 5eee2bb..f5405c0 100644 --- a/ocf/ocf_test.go +++ b/ocf/ocf_test.go @@ -7,6 +7,7 @@ import ( "flag" "io" "os" + "strings" "testing" "github.com/hamba/avro/v2" @@ -453,6 +454,68 @@ func TestDecoder_InvalidBlock(t *testing.T) { assert.Error(t, dec.Error()) } +func TestDecoder_WithConfig(t *testing.T) { + const defaultMax = 1_048_576 + + unionStr := "union value" + longString := strings.Repeat("a", defaultMax+1) + record := FullRecord{ + Strings: []string{"string1", "string2", "string3", "string4", "string5"}, + Longs: []int64{1, 2, 3, 4, 5}, + Enum: "C", + Map: map[string]int{ + "key1": 1, + "key2": 2, + "key3": 3, + "key4": 4, + "key5": 5, + }, + Nullable: &unionStr, + Fixed: [16]byte{0x01, 0x02, 0x03, 0x04, 0x01, 0x02, 0x03, 0x04, 0x01, 0x02, 0x03, 0x04, 0x01, 0x02, 0x03, 0x04}, + Record: &TestRecord{ + Long: 1925639126735, + String: longString, + Int: 666, + Float: 7171.17, + Double: 916734926348163.01973408746523, + Bool: true, + }, + } + + buf := &bytes.Buffer{} + enc, err := ocf.NewEncoder(schema, buf) + require.NoError(t, err) + + err = enc.Encode(record) + require.NoError(t, err) + + err = enc.Close() + require.NoError(t, err) + + t.Run("Default Fails", func(t *testing.T) { + dec, err := ocf.NewDecoder(bytes.NewReader(buf.Bytes())) + require.NoError(t, err) + + var got FullRecord + require.True(t, dec.HasNext()) + require.ErrorContains(t, dec.Decode(&got), "size is greater than `Config.MaxByteSliceSize`") + }) + + t.Run("Custom Config Is Used", func(t *testing.T) { + cfg := avro.Config{MaxByteSliceSize: defaultMax + 1}.Freeze() + dec, err := ocf.NewDecoder( + bytes.NewReader(buf.Bytes()), + ocf.WithDecoderConfig(cfg), + ) + require.NoError(t, err) + + var got FullRecord + require.True(t, dec.HasNext()) + require.NoError(t, dec.Decode(&got)) + require.Equal(t, record, got) + }) +} + func TestNewEncoder_InvalidSchema(t *testing.T) { buf := &bytes.Buffer{}