Skip to content

Commit

Permalink
feat: improve OCF encoder/decoder handling of dynamic types (#467)
Browse files Browse the repository at this point in the history
  • Loading branch information
jhump authored Oct 11, 2024
1 parent 99d2dbc commit 4966106
Show file tree
Hide file tree
Showing 3 changed files with 318 additions and 18 deletions.
113 changes: 95 additions & 18 deletions ocf/ocf.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package ocf
import (
"bytes"
"crypto/rand"
"encoding/json"
"errors"
"fmt"
"io"
Expand All @@ -20,10 +21,11 @@ const (
codecKey = "avro.codec"
)

var magicBytes = [4]byte{'O', 'b', 'j', 1}
var (
magicBytes = [4]byte{'O', 'b', 'j', 1}

// HeaderSchema is the Avro schema of a container file header.
var HeaderSchema = avro.MustParse(`{
// HeaderSchema is the Avro schema of a container file header.
HeaderSchema = avro.MustParse(`{
"type": "record",
"name": "org.apache.avro.file.Header",
"fields": [
Expand All @@ -33,6 +35,15 @@ var HeaderSchema = avro.MustParse(`{
]
}`)

// DefaultSchemaMarshaler calls the schema's String() method, to produce
// a "canonical" schema.
DefaultSchemaMarshaler = defaultMarshalSchema
// FullSchemaMarshaler calls the schema's MarshalJSON() method, to produce
// a schema with all details preserved. The "canonical" schema returned by
// the default marshaler does not preserve a type's extra properties.
FullSchemaMarshaler = fullMarshalSchema
)

// Header represents an Avro container file header.
type Header struct {
Magic [4]byte `avro:"magic"`
Expand All @@ -42,6 +53,7 @@ type Header struct {

// decoderConfig holds the settings assembled from DecoderFunc options
// before a Decoder is constructed.
type decoderConfig struct {
	// DecoderConfig is the avro API used to decode values.
	DecoderConfig avro.API
	// SchemaCache resolves named types when parsing the file's schema.
	SchemaCache *avro.SchemaCache
}

// DecoderFunc represents a configuration function for Decoder.
Expand All @@ -54,13 +66,22 @@ func WithDecoderConfig(wCfg avro.API) DecoderFunc {
}
}

// WithDecoderSchemaCache sets the schema cache for the decoder.
// If not specified, defaults to avro.DefaultSchemaCache.
func WithDecoderSchemaCache(cache *avro.SchemaCache) DecoderFunc {
	return func(c *decoderConfig) { c.SchemaCache = cache }
}

// Decoder reads and decodes Avro values from a container file.
type Decoder struct {
reader *avro.Reader
resetReader *bytesx.ResetReader
decoder *avro.Decoder
meta map[string][]byte
sync [16]byte
schema avro.Schema

codec Codec

Expand All @@ -71,14 +92,15 @@ type Decoder struct {
func NewDecoder(r io.Reader, opts ...DecoderFunc) (*Decoder, error) {
cfg := decoderConfig{
DecoderConfig: avro.DefaultConfig,
SchemaCache: avro.DefaultSchemaCache,
}
for _, opt := range opts {
opt(&cfg)
}

reader := avro.NewReader(r, 1024)

h, err := readHeader(reader)
h, err := readHeader(reader, cfg.SchemaCache)
if err != nil {
return nil, fmt.Errorf("decoder: %w", err)
}
Expand All @@ -92,6 +114,7 @@ func NewDecoder(r io.Reader, opts ...DecoderFunc) (*Decoder, error) {
meta: h.Meta,
sync: h.Sync,
codec: h.Codec,
schema: h.Schema,
}, nil
}

Expand All @@ -100,6 +123,12 @@ func (d *Decoder) Metadata() map[string][]byte {
return d.meta
}

// Schema returns the schema that was parsed from the file's metadata
// and that is used to interpret the file's contents.
func (d *Decoder) Schema() avro.Schema {
	parsed := d.schema
	return parsed
}

// HasNext determines if there is another value to read.
func (d *Decoder) HasNext() bool {
if d.count <= 0 {
Expand Down Expand Up @@ -174,6 +203,8 @@ type encoderConfig struct {
Metadata map[string][]byte
Sync [16]byte
EncodingConfig avro.API
SchemaCache *avro.SchemaCache
SchemaMarshaler func(avro.Schema) ([]byte, error)
}

// EncoderFunc represents a configuration function for Encoder.
Expand Down Expand Up @@ -209,6 +240,22 @@ func WithMetadata(meta map[string][]byte) EncoderFunc {
}
}

// WithEncoderSchemaCache sets the schema cache for the encoder.
// If not specified, defaults to avro.DefaultSchemaCache.
func WithEncoderSchemaCache(cache *avro.SchemaCache) EncoderFunc {
	return func(c *encoderConfig) { c.SchemaCache = cache }
}

// WithSchemaMarshaler sets the schema marshaler for the encoder.
// If not specified, defaults to DefaultSchemaMarshaler.
func WithSchemaMarshaler(m func(avro.Schema) ([]byte, error)) EncoderFunc {
	return func(c *encoderConfig) { c.SchemaMarshaler = m }
}

// WithSyncBlock sets the sync block.
func WithSyncBlock(sync [16]byte) EncoderFunc {
return func(cfg *encoderConfig) {
Expand Down Expand Up @@ -241,17 +288,23 @@ type Encoder struct {
// If the writer is an existing ocf file, it will append data using the
// existing schema.
func NewEncoder(s string, w io.Writer, opts ...EncoderFunc) (*Encoder, error) {
cfg := encoderConfig{
BlockLength: 100,
CodecName: Null,
CodecCompression: -1,
Metadata: map[string][]byte{},
EncodingConfig: avro.DefaultConfig,
}
for _, opt := range opts {
opt(&cfg)
cfg := computeEncoderConfig(opts)
schema, err := avro.ParseWithCache(s, "", cfg.SchemaCache)
if err != nil {
return nil, err
}
return newEncoder(schema, w, cfg)
}

// NewEncoderWithSchema returns a new encoder that writes to w using schema s.
//
// If the writer is an existing ocf file, it will append data using the
// existing schema.
func NewEncoderWithSchema(schema avro.Schema, w io.Writer, opts ...EncoderFunc) (*Encoder, error) {
	cfg := computeEncoderConfig(opts)
	return newEncoder(schema, w, cfg)
}

func newEncoder(schema avro.Schema, w io.Writer, cfg encoderConfig) (*Encoder, error) {
switch file := w.(type) {
case nil:
return nil, errors.New("writer cannot be nil")
Expand All @@ -263,7 +316,7 @@ func NewEncoder(s string, w io.Writer, opts ...EncoderFunc) (*Encoder, error) {

if info.Size() > 0 {
reader := avro.NewReader(file, 1024)
h, err := readHeader(reader)
h, err := readHeader(reader, cfg.SchemaCache)
if err != nil {
return nil, err
}
Expand All @@ -285,12 +338,12 @@ func NewEncoder(s string, w io.Writer, opts ...EncoderFunc) (*Encoder, error) {
}
}

schema, err := avro.Parse(s)
schemaJSON, err := cfg.SchemaMarshaler(schema)
if err != nil {
return nil, err
}

cfg.Metadata[schemaKey] = []byte(schema.String())
cfg.Metadata[schemaKey] = schemaJSON
cfg.Metadata[codecKey] = []byte(cfg.CodecName)
header := Header{
Magic: magicBytes,
Expand Down Expand Up @@ -324,6 +377,22 @@ func NewEncoder(s string, w io.Writer, opts ...EncoderFunc) (*Encoder, error) {
return e, nil
}

// computeEncoderConfig builds an encoderConfig populated with defaults,
// then applies each option in order; later options override earlier ones.
func computeEncoderConfig(opts []EncoderFunc) encoderConfig {
	cfg := encoderConfig{
		BlockLength:      100,
		CodecName:        Null,
		CodecCompression: -1,
		Metadata:         map[string][]byte{},
		EncodingConfig:   avro.DefaultConfig,
		SchemaCache:      avro.DefaultSchemaCache,
		SchemaMarshaler:  DefaultSchemaMarshaler,
	}
	for _, apply := range opts {
		apply(&cfg)
	}
	return cfg
}

// Write v to the internal buffer. This method skips the internal encoder and
// therefore the caller is responsible for encoding the bytes. No error will be
// thrown if the bytes does not conform to the schema given to NewEncoder, but
Expand Down Expand Up @@ -400,7 +469,7 @@ type ocfHeader struct {
Sync [16]byte
}

func readHeader(reader *avro.Reader) (*ocfHeader, error) {
func readHeader(reader *avro.Reader, schemaCache *avro.SchemaCache) (*ocfHeader, error) {
var h Header
reader.ReadVal(HeaderSchema, &h)
if reader.Error != nil {
Expand All @@ -410,7 +479,7 @@ func readHeader(reader *avro.Reader) (*ocfHeader, error) {
if h.Magic != magicBytes {
return nil, errors.New("invalid avro file")
}
schema, err := avro.Parse(string(h.Meta[schemaKey]))
schema, err := avro.ParseBytesWithCache(h.Meta[schemaKey], "", schemaCache)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -447,3 +516,11 @@ func skipToEnd(reader *avro.Reader, sync [16]byte) error {
}
}
}

// defaultMarshalSchema renders the schema in its "canonical" textual form
// via Schema.String. It never returns an error.
func defaultMarshalSchema(schema avro.Schema) ([]byte, error) {
	canonical := schema.String()
	return []byte(canonical), nil
}

// fullMarshalSchema serializes the schema as JSON via its MarshalJSON
// method, preserving details (such as a type's extra properties) that
// the canonical form produced by defaultMarshalSchema drops.
func fullMarshalSchema(schema avro.Schema) ([]byte, error) {
	data, err := json.Marshal(schema)
	if err != nil {
		return nil, err
	}
	return data, nil
}
Loading

0 comments on commit 4966106

Please sign in to comment.