Skip to content

Commit

Permalink
fix: type promotion POC
Browse files Browse the repository at this point in the history
  • Loading branch information
redaLaanait committed Nov 17, 2023
1 parent d68f799 commit 10b955d
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 21 deletions.
64 changes: 64 additions & 0 deletions codec_promoter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package avro

import (
"reflect"

"github.com/modern-go/reflect2"
)

func getCodecPromoter[T any](actual Type) *codecPromoter[T] {

Check failure on line 9 in codec_promoter.go

View workflow job for this annotation

GitHub Actions / test (1.20)

func `getCodecPromoter` is unused (unused)

Check failure on line 9 in codec_promoter.go

View workflow job for this annotation

GitHub Actions / test (1.21)

func `getCodecPromoter` is unused (unused)
if actual == "" {
return nil
}

return &codecPromoter[T]{actual: actual}
}

type codecPromoter[T any] struct {

Check failure on line 17 in codec_promoter.go

View workflow job for this annotation

GitHub Actions / test (1.20)

type `codecPromoter` is unused (unused)

Check failure on line 17 in codec_promoter.go

View workflow job for this annotation

GitHub Actions / test (1.21)

type `codecPromoter` is unused (unused)
actual Type
}

func (p *codecPromoter[T]) promote(r *Reader) (t T) {

Check failure on line 21 in codec_promoter.go

View workflow job for this annotation

GitHub Actions / test (1.20)

func `(*codecPromoter[T]).promote` is unused (unused)

Check failure on line 21 in codec_promoter.go

View workflow job for this annotation

GitHub Actions / test (1.21)

func `(*codecPromoter[T]).promote` is unused (unused)
tt := reflect2.TypeOf(t)

convert := func(typ reflect2.Type, obj any) (t T) {
if !reflect.TypeOf(obj).ConvertibleTo(typ.Type1()) {
r.ReportError("decode promotable", "unsupported type")
// return zero value
return t
}
return reflect.ValueOf(obj).Convert(typ.Type1()).Interface().(T)
}

switch p.actual {
case Int:
var obj int32
(&intCodec[int32]{}).Decode(reflect2.PtrOf(&obj), r)
t = convert(tt, obj)

case Long:
var obj int64
(&longCodec[int64]{}).Decode(reflect2.PtrOf(&obj), r)
t = convert(tt, obj)

case Float:
var obj float32
(&float32Codec{}).Decode(reflect2.PtrOf(&obj), r)
t = convert(tt, obj)

case String:
var obj string
(&stringCodec{}).Decode(reflect2.PtrOf(&obj), r)
t = convert(tt, obj)

case Bytes:
var obj []byte
(&bytesCodec{}).Decode(reflect2.PtrOf(&obj), r)
t = convert(tt, obj)

default:
r.ReportError("decode promotable", "unsupported actual type")
}

return t
}
102 changes: 102 additions & 0 deletions reader_promoter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package avro

import (
"reflect"
)

type ReaderPromoter interface {

Check warning on line 7 in reader_promoter.go

View workflow job for this annotation

GitHub Actions / test (1.20)

exported: exported type ReaderPromoter should have comment or be unexported (revive)

Check warning on line 7 in reader_promoter.go

View workflow job for this annotation

GitHub Actions / test (1.21)

exported: exported type ReaderPromoter should have comment or be unexported (revive)
ReadLong() int64
ReadFloat() float32
ReadDouble() float64
ReadString() string
ReadBytes() []byte
}

type readerPromoter struct {
actual, current Type
r *Reader
}

var _ ReaderPromoter = &readerPromoter{}

var promotedInvalid = struct{}{}

func (p *readerPromoter) readActual() any {
switch p.actual {
case Int:
return p.r.ReadInt()

case Long:
return p.r.ReadLong()

case Float:
return p.r.ReadFloat()

case String:
return p.r.ReadString()

case Bytes:
return p.r.ReadBytes()

default:
p.r.ReportError("decode promotable", "unsupported actual type")
return promotedInvalid
}
}

func (p *readerPromoter) ReadLong() int64 {
if v := p.readActual(); v != promotedInvalid {
return p.promote(v, p.current).(int64)
}

return 0
}

func (p *readerPromoter) ReadFloat() float32 {
if v := p.readActual(); v != promotedInvalid {
return p.promote(v, p.current).(float32)
}

return 0
}

func (p *readerPromoter) ReadDouble() float64 {
if v := p.readActual(); v != promotedInvalid {
return p.promote(v, p.current).(float64)
}

return 0
}

func (p *readerPromoter) ReadString() string {
if v := p.readActual(); v != promotedInvalid {
return p.promote(v, p.current).(string)
}

return ""
}

func (p *readerPromoter) ReadBytes() []byte {
if v := p.readActual(); v != promotedInvalid {
return p.promote(v, p.current).([]byte)
}

return nil
}

func (p *readerPromoter) promote(obj any, st Type) (t any) {
switch st {
case Long:
return int64(reflect.ValueOf(obj).Int())

Check failure on line 90 in reader_promoter.go

View workflow job for this annotation

GitHub Actions / test (1.20)

unnecessary conversion (unconvert)

Check failure on line 90 in reader_promoter.go

View workflow job for this annotation

GitHub Actions / test (1.21)

unnecessary conversion (unconvert)
case Float:
return float32(reflect.ValueOf(obj).Int())
case Double:
return float64(reflect.ValueOf(obj).Float())

Check failure on line 94 in reader_promoter.go

View workflow job for this annotation

GitHub Actions / test (1.20)

unnecessary conversion (unconvert)

Check failure on line 94 in reader_promoter.go

View workflow job for this annotation

GitHub Actions / test (1.21)

unnecessary conversion (unconvert)
case String:
return string(reflect.ValueOf(obj).Bytes())
case Bytes:
return []byte(reflect.ValueOf(obj).String())
}

return obj
}
5 changes: 5 additions & 0 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,11 @@ type PrimitiveSchema struct {

typ Type
logical LogicalSchema

// actual presents the actual type of the encoded value
// which can be promoted to schema current type.
// This field is only used in the context of write read schema resolution.
actual Type
}

// NewPrimitiveSchema creates a new PrimitiveSchema.
Expand Down
6 changes: 5 additions & 1 deletion schema_compatibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,11 @@ func (c *SchemaCompatibility) Resolve(reader, writer Schema) (Schema, error) {

if writer.Type() != reader.Type() {
if isPromotable(writer.Type()) {
return reader, nil
// TODO clean up

Check failure on line 305 in schema_compatibility.go

View workflow job for this annotation

GitHub Actions / test (1.20)

schema_compatibility.go:305: Line contains TODO/BUG/FIXME: "TODO clean up" (godox)

Check failure on line 305 in schema_compatibility.go

View workflow job for this annotation

GitHub Actions / test (1.21)

schema_compatibility.go:305: Line contains TODO/BUG/FIXME: "TODO clean up" (godox)
r := *reader.(*PrimitiveSchema)

Check failure on line 306 in schema_compatibility.go

View workflow job for this annotation

GitHub Actions / test (1.20)

copylocks: assignment copies lock value to r: github.com/hamba/avro/v2.PrimitiveSchema contains github.com/hamba/avro/v2.fingerprinter contains sync.Map contains sync.Mutex (govet)

Check failure on line 306 in schema_compatibility.go

View workflow job for this annotation

GitHub Actions / test (1.21)

copylocks: assignment copies lock value to r: github.com/hamba/avro/v2.PrimitiveSchema contains github.com/hamba/avro/v2.fingerprinter contains sync.Map contains sync.Mutex (govet)
r.actual = writer.Type()

return &r, nil
}

if reader.Type() == Union {
Expand Down
56 changes: 36 additions & 20 deletions schema_compatibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package avro_test
import (
"log"
"testing"
"time"

"github.com/hamba/avro/v2"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -287,17 +288,23 @@ func TestSchemaCompatibility_Resolve(t *testing.T) {
},{
"name": "a",
"type": "int"
},
{
"name": "k",
"type": "string"
}]
}`)

type A1 struct {
A int32 `avro:"a"`
C int32 `avro:"c"`
A int32 `avro:"a"`
C int32 `avro:"c"`
K string `avro:"k"`
}

a1 := A1{
A: 10,
C: 1000000,
K: "K value",
}

b, err := avro.Marshal(sch1, a1)
Expand All @@ -309,21 +316,29 @@ func TestSchemaCompatibility_Resolve(t *testing.T) {
"name": "A",
"type": "record",
"fields": [
{
"name": "b",
"type": "string",
"default": "boo"
},{
"name": "aa",
"aliases": ["a"],
"type": "long"
},{
"name": "d",
"type": {
"type": "array", "items": "int"
{
"name": "k",
"type": "bytes"
},
"default":[1, 2, 3, 4]
}]
{
"name": "b",
"type": "string",
"default": "boo"
},{
"name": "aa",
"aliases": ["a"],
"type": {
"type": "long",
"logicalType":"time-micros"
}
},{
"name": "d",
"type": {
"type": "array", "items": "int"
},
"default":[1, 2, 3, 4]
}
]
}`)

sc := avro.NewSchemaCompatibility()
Expand All @@ -335,9 +350,10 @@ func TestSchemaCompatibility_Resolve(t *testing.T) {
}

type A2 struct {
A int64 `avro:"aa"`
B string `avro:"b"`
D []int32 `avro:"d"`
A time.Duration `avro:"aa"`
B string `avro:"b"`
D []int32 `avro:"d"`
K []byte `avro:"k"`
}

a2 := A2{}
Expand All @@ -347,5 +363,5 @@ func TestSchemaCompatibility_Resolve(t *testing.T) {
t.Fatalf("unmarshal error %v", err)
}

log.Printf("result: %+v", a2)
log.Printf("result: %+v %+v %T %+v", a2, a2.A, a2.A, string(a2.K))
}

0 comments on commit 10b955d

Please sign in to comment.