Skip to content

Commit

Permalink
feat: allow passing in custom options for OCF decoder. (#460)
Browse files Browse the repository at this point in the history
  • Loading branch information
mfycheng authored Sep 27, 2024
1 parent 45e7071 commit b4fe207
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 3 deletions.
27 changes: 24 additions & 3 deletions ocf/ocf.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var magicBytes = [4]byte{'O', 'b', 'j', 1}

// HeaderSchema is the Avro schema of a container file header.
var HeaderSchema = avro.MustParse(`{
"type": "record",
"type": "record",
"name": "org.apache.avro.file.Header",
"fields": [
{"name": "magic", "type": {"type": "fixed", "name": "Magic", "size": 4}},
Expand All @@ -40,6 +40,20 @@ type Header struct {
Sync [16]byte `avro:"sync"`
}

// decoderConfig holds the settings that NewDecoder collects from its
// DecoderFunc options before it builds a Decoder.
type decoderConfig struct {
// DecoderConfig is the avro.API that NewDecoder uses to create the decoder
// for the header's schema. NewDecoder sets it to avro.DefaultConfig before
// applying any options.
DecoderConfig avro.API
}

// DecoderFunc represents a configuration function for Decoder.
//
// NewDecoder accepts any number of them and applies each one, in the order
// given, to a config that starts out with avro.DefaultConfig.
type DecoderFunc func(cfg *decoderConfig)

// WithDecoderConfig sets the value decoder config on the OCF decoder.
//
// wCfg is the avro.API that NewDecoder uses to build the decoder for the
// schema found in the file header. A nil wCfg is ignored and the config
// already in place is kept: storing it would make NewDecoder panic when it
// calls a method on the nil interface value.
func WithDecoderConfig(wCfg avro.API) DecoderFunc {
	return func(cfg *decoderConfig) {
		if wCfg == nil {
			// Keep whatever NewDecoder (or an earlier option) configured.
			return
		}
		cfg.DecoderConfig = wCfg
	}
}

// Decoder reads and decodes Avro values from a container file.
type Decoder struct {
reader *avro.Reader
Expand All @@ -54,7 +68,14 @@ type Decoder struct {
}

// NewDecoder returns a new decoder that reads from reader r.
func NewDecoder(r io.Reader) (*Decoder, error) {
func NewDecoder(r io.Reader, opts ...DecoderFunc) (*Decoder, error) {
cfg := decoderConfig{
DecoderConfig: avro.DefaultConfig,
}
for _, opt := range opts {
opt(&cfg)
}

reader := avro.NewReader(r, 1024)

h, err := readHeader(reader)
Expand All @@ -67,7 +88,7 @@ func NewDecoder(r io.Reader) (*Decoder, error) {
return &Decoder{
reader: reader,
resetReader: decReader,
decoder: avro.NewDecoderForSchema(h.Schema, decReader),
decoder: cfg.DecoderConfig.NewDecoder(h.Schema, decReader),
meta: h.Meta,
sync: h.Sync,
codec: h.Codec,
Expand Down
63 changes: 63 additions & 0 deletions ocf/ocf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"flag"
"io"
"os"
"strings"
"testing"

"github.com/hamba/avro/v2"
Expand Down Expand Up @@ -453,6 +454,68 @@ func TestDecoder_InvalidBlock(t *testing.T) {
assert.Error(t, dec.Error())
}

// TestDecoder_WithConfig checks that an avro.API handed to
// ocf.WithDecoderConfig is the one used to decode values. It encodes a record
// whose nested string is one byte longer than defaultMax, then expects a
// decoder built without options to reject it and a decoder configured with a
// larger MaxByteSliceSize to round-trip it.
func TestDecoder_WithConfig(t *testing.T) {
	const defaultMax = 1_048_576

	nullable := "union value"
	want := FullRecord{
		Strings: []string{"string1", "string2", "string3", "string4", "string5"},
		Longs:   []int64{1, 2, 3, 4, 5},
		Enum:    "C",
		Map: map[string]int{
			"key1": 1,
			"key2": 2,
			"key3": 3,
			"key4": 4,
			"key5": 5,
		},
		Nullable: &nullable,
		Fixed:    [16]byte{0x01, 0x02, 0x03, 0x04, 0x01, 0x02, 0x03, 0x04, 0x01, 0x02, 0x03, 0x04, 0x01, 0x02, 0x03, 0x04},
		Record: &TestRecord{
			Long: 1925639126735,
			// One byte past defaultMax, so the limit decides the outcome.
			String: strings.Repeat("a", defaultMax+1),
			Int:    666,
			Float:  7171.17,
			Double: 916734926348163.01973408746523,
			Bool:   true,
		},
	}

	// Write the record into an in-memory container file shared by both subtests.
	var encoded bytes.Buffer
	enc, err := ocf.NewEncoder(schema, &encoded)
	require.NoError(t, err)
	require.NoError(t, enc.Encode(want))
	require.NoError(t, enc.Close())
	data := encoded.Bytes()

	t.Run("Default Fails", func(t *testing.T) {
		dec, err := ocf.NewDecoder(bytes.NewReader(data))
		require.NoError(t, err)
		require.True(t, dec.HasNext())

		var got FullRecord
		err = dec.Decode(&got)
		require.ErrorContains(t, err, "size is greater than `Config.MaxByteSliceSize`")
	})

	t.Run("Custom Config Is Used", func(t *testing.T) {
		api := avro.Config{MaxByteSliceSize: defaultMax + 1}.Freeze()
		dec, err := ocf.NewDecoder(bytes.NewReader(data), ocf.WithDecoderConfig(api))
		require.NoError(t, err)
		require.True(t, dec.HasNext())

		var got FullRecord
		require.NoError(t, dec.Decode(&got))
		require.Equal(t, want, got)
	})
}

func TestNewEncoder_InvalidSchema(t *testing.T) {
buf := &bytes.Buffer{}

Expand Down

0 comments on commit b4fe207

Please sign in to comment.