Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for schema evolution #329

Merged
merged 28 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
b9d7442
feat: add support for schema evolution poc
redaLaanait Nov 13, 2023
d68f799
Merge branch 'main' of https://github.com/hamba/avro into feat/schema…
redaLaanait Nov 13, 2023
4ade51d
fix: type promotion POC
redaLaanait Nov 17, 2023
845d478
fix: Fix and include last remarks
redaLaanait Nov 25, 2023
908868d
reduce default decoder verbosity
redaLaanait Nov 25, 2023
3f02ab4
add tests for schema resolution
redaLaanait Nov 30, 2023
2498c29
attempt replacing readNext by native decoders
redaLaanait Nov 30, 2023
9d6c104
fix(schema compatibility): support named schema aliases
redaLaanait Dec 11, 2023
9b51d57
fix(default decoder): try different implementation
redaLaanait Dec 11, 2023
eaef2ad
fix(defaut decoder): cache field encoded default and borrow reader
redaLaanait Dec 12, 2023
b4224dc
fix: resolver unable to resolve nullDefault type
redaLaanait Dec 12, 2023
0b74a61
clean up generic decode and improve test coverage
redaLaanait Dec 15, 2023
0fc43f2
fix decoder cachekey to consider primitives with promotion and fields…
redaLaanait Dec 15, 2023
62b27ce
fix: codec generic
redaLaanait Dec 15, 2023
7db00da
cleanups and fixes
redaLaanait Dec 16, 2023
04e2ddc
Merge branch 'main' of https://github.com/hamba/avro into feat/schema…
redaLaanait Dec 21, 2023
51224e6
fix: fix resolve record
redaLaanait Dec 21, 2023
ade5d38
improve record cache fingerprint
redaLaanait Dec 21, 2023
5adccb8
Merge branch 'main' of https://github.com/hamba/avro into feat/schema…
redaLaanait Dec 21, 2023
e364790
fix(default encoder): better handling of nullDefault
redaLaanait Jan 9, 2024
bc0e276
fix: record cache fingerprint
redaLaanait Jan 9, 2024
2ec1b60
rename FieldDrain by FieldIgnore
redaLaanait Jan 9, 2024
17926ac
fix: record cache fingerprint
redaLaanait Jan 10, 2024
1dc0276
fix: record cache fingerprint
redaLaanait Jan 10, 2024
85f8d7c
clean up
redaLaanait Jan 10, 2024
b96aefc
clean up
redaLaanait Jan 10, 2024
f2c19a2
fix: codec default reader/writer usage
redaLaanait Jan 12, 2024
35a64d9
fix: bytes to string converter
redaLaanait Jan 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
522 changes: 522 additions & 0 deletions codec_default.go

Large diffs are not rendered by default.

693 changes: 693 additions & 0 deletions codec_default_internal_test.go

Large diffs are not rendered by default.

129 changes: 127 additions & 2 deletions codec_dynamic.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package avro

import (
"fmt"
"math/big"
"reflect"
"time"
"unsafe"

"github.com/modern-go/reflect2"
Expand All @@ -15,13 +18,27 @@ func (d *efaceDecoder) Decode(ptr unsafe.Pointer, r *Reader) {
pObj := (*any)(ptr)
obj := *pObj
if obj == nil {
*pObj = r.ReadNext(d.schema)
rPtr, rtyp, err := dynamicReceiver(d.schema, r.cfg.resolver)
if err != nil {
r.ReportError("Read", err.Error())
return
}
decoderOfType(r.cfg, d.schema, rtyp).Decode(rPtr, r)
*pObj = rtyp.UnsafeIndirect(rPtr)
// *pObj = r.ReadNext(d.schema)
return
}

typ := reflect2.TypeOf(obj)
if typ.Kind() != reflect.Ptr {
*pObj = r.ReadNext(d.schema)
rPtr, rTyp, err := dynamicReceiver(d.schema, r.cfg.resolver)
if err != nil {
r.ReportError("Read", err.Error())
return
}
decoderOfType(r.cfg, d.schema, rTyp).Decode(rPtr, r)
*pObj = rTyp.UnsafeIndirect(rPtr)
// *pObj = r.ReadNext(d.schema)
return
}

Expand All @@ -45,3 +62,111 @@ func (e *interfaceEncoder) Encode(ptr unsafe.Pointer, w *Writer) {
obj := e.typ.UnsafeIndirect(ptr)
w.WriteVal(e.schema, obj)
}

func dynamicReceiver(schema Schema, resolver *TypeResolver) (unsafe.Pointer, reflect2.Type, error) {
var ls LogicalSchema
lts, ok := schema.(LogicalTypeSchema)
if ok {
ls = lts.Logical()
}

name := string(schema.Type())
if ls != nil {
name += "." + string(ls.Type())
}
if resolver != nil {
typ, err := resolver.Type(name)
if err == nil {
return typ.UnsafeNew(), typ, nil
}
}

switch schema.Type() {
case Boolean:
var v bool
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
case Int:
if ls != nil {
switch ls.Type() {
case Date:
var v time.Time
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil

case TimeMillis:
var v time.Duration
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
}
}
var v int
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
case Long:
if ls != nil {
switch ls.Type() {
case TimeMicros:
var v time.Duration
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil

case TimestampMillis:
var v time.Time
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil

case TimestampMicros:
var v time.Time
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
}
}
var v int64
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
case Float:
var v float32
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
case Double:
var v float64
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
case String:
var v string
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
case Bytes:
if ls != nil && ls.Type() == Decimal {
var v *big.Rat
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
}
var v []byte
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
case Record:
var v map[string]any
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
case Ref:
return dynamicReceiver(schema.(*RefSchema).Schema(), resolver)
case Enum:
var v string
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
case Array:
v := make([]any, 0)
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
case Map:
var v map[string]any
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
case Union:
var v map[string]any
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
case Fixed:
fixed := schema.(*FixedSchema)
ls := fixed.Logical()
if ls != nil {
switch ls.Type() {
case Duration:
var v LogicalDuration
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
case Decimal:
var v big.Rat
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
}
}
// note that uint64 case is not supported, due to the lack of indicator at the schema-level (logical type)
var v []byte
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
default:
return nil, nil, fmt.Errorf("dynamic receiver not found for schema: %v", name)
}
}
Loading
Loading