Skip to content

Commit

Permalink
fix: schema compatibility fingerprint (#354)
Browse files Browse the repository at this point in the history
  • Loading branch information
nrwiersma authored Feb 7, 2024
1 parent c51e954 commit 0673db6
Show file tree
Hide file tree
Showing 7 changed files with 309 additions and 269 deletions.
8 changes: 3 additions & 5 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ type ValEncoder interface {

// ReadVal parses Avro value and stores the result in the value pointed to by obj.
func (r *Reader) ReadVal(schema Schema, obj any) {
key := cacheFingerprintOf(schema)
decoder := r.cfg.getDecoderFromCache(key, reflect2.RTypeOf(obj))
decoder := r.cfg.getDecoderFromCache(schema.CacheFingerprint(), reflect2.RTypeOf(obj))
if decoder == nil {
typ := reflect2.TypeOf(obj)
if typ.Kind() != reflect.Ptr {
Expand Down Expand Up @@ -66,15 +65,14 @@ func (w *Writer) WriteVal(schema Schema, val any) {

func (c *frozenConfig) DecoderOf(schema Schema, typ reflect2.Type) ValDecoder {
rtype := typ.RType()
key := cacheFingerprintOf(schema)
decoder := c.getDecoderFromCache(key, rtype)
decoder := c.getDecoderFromCache(schema.CacheFingerprint(), rtype)
if decoder != nil {
return decoder
}

ptrType := typ.(*reflect2.UnsafePtrType)
decoder = decoderOfType(c, schema, ptrType.Elem())
c.addDecoderToCache(key, rtype, decoder)
c.addDecoderToCache(schema.CacheFingerprint(), rtype, decoder)
return decoder
}

Expand Down
28 changes: 14 additions & 14 deletions codec_native.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

//nolint:maintidx // Splitting this would not make it simpler.
func createDecoderOfNative(schema *PrimitiveSchema, typ reflect2.Type) ValDecoder {
isConv := schema.actual != ""
resolved := schema.encodedType != ""
switch typ.Kind() {
case reflect.Bool:
if schema.Type() != Boolean {
Expand Down Expand Up @@ -60,8 +60,8 @@ func createDecoderOfNative(schema *PrimitiveSchema, typ reflect2.Type) ValDecode
if schema.Type() != Long {
break
}
if isConv {
return &longConvCodec[uint32]{convert: createLongConverter(schema.actual)}
if resolved {
return &longConvCodec[uint32]{convert: createLongConverter(schema.encodedType)}
}
return &longCodec[uint32]{}

Expand All @@ -74,12 +74,12 @@ func createDecoderOfNative(schema *PrimitiveSchema, typ reflect2.Type) ValDecode

case st == Long && lt == TimeMicros: // time.Duration
return &timeMicrosCodec{
convert: createLongConverter(schema.actual),
convert: createLongConverter(schema.encodedType),
}

case st == Long:
if isConv {
return &longConvCodec[int64]{convert: createLongConverter(schema.actual)}
if resolved {
return &longConvCodec[int64]{convert: createLongConverter(schema.encodedType)}
}
return &longCodec[int64]{}

Expand All @@ -91,17 +91,17 @@ func createDecoderOfNative(schema *PrimitiveSchema, typ reflect2.Type) ValDecode
if schema.Type() != Float {
break
}
if isConv {
return &float32ConvCodec{convert: createFloatConverter(schema.actual)}
if resolved {
return &float32ConvCodec{convert: createFloatConverter(schema.encodedType)}
}
return &float32Codec{}

case reflect.Float64:
if schema.Type() != Double {
break
}
if isConv {
return &float64ConvCodec{convert: createDoubleConverter(schema.actual)}
if resolved {
return &float64ConvCodec{convert: createDoubleConverter(schema.encodedType)}
}
return &float64Codec{}

Expand All @@ -127,21 +127,21 @@ func createDecoderOfNative(schema *PrimitiveSchema, typ reflect2.Type) ValDecode
return &dateCodec{}
case isTime && st == Long && lt == TimestampMillis:
return &timestampMillisCodec{
convert: createLongConverter(schema.actual),
convert: createLongConverter(schema.encodedType),
}
case isTime && st == Long && lt == TimestampMicros:
return &timestampMicrosCodec{
convert: createLongConverter(schema.actual),
convert: createLongConverter(schema.encodedType),
}
case isTime && st == Long && lt == LocalTimestampMillis:
return &timestampMillisCodec{
local: true,
convert: createLongConverter(schema.actual),
convert: createLongConverter(schema.encodedType),
}
case isTime && st == Long && lt == LocalTimestampMicros:
return &timestampMicrosCodec{
local: true,
convert: createLongConverter(schema.actual),
convert: createLongConverter(schema.encodedType),
}
case typ.Type1().ConvertibleTo(ratType) && st == Bytes && lt == Decimal:
dec := ls.(*DecimalLogicalSchema)
Expand Down
48 changes: 16 additions & 32 deletions config_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestConfig_ReusesDecoders(t *testing.T) {
assert.Same(t, dec1, dec2)
}

func TestConfig_ReusesDecoders_WithRecordFieldActions(t *testing.T) {
func TestConfig_ReusesDecoders_WithWriterFingerprint(t *testing.T) {
type testObj struct {
A int64 `avro:"a"`
B string `avro:"b"`
Expand All @@ -59,39 +59,21 @@ func TestConfig_ReusesDecoders_WithRecordFieldActions(t *testing.T) {
}`
typ := reflect2.TypeOfPtr(&testObj{})

t.Run("set default", func(t *testing.T) {
api := Config{
TagKey: "test",
BlockLength: 2,
}.Freeze()
cfg := api.(*frozenConfig)

schema1 := MustParse(sch)
schema2 := MustParse(sch)
schema2.(*RecordSchema).Fields()[1].action = FieldSetDefault

dec1 := cfg.DecoderOf(schema1, typ)
dec2 := cfg.DecoderOf(schema2, typ)

assert.NotSame(t, dec1, dec2)
})

t.Run("ignore", func(t *testing.T) {
api := Config{
TagKey: "test",
BlockLength: 2,
}.Freeze()
cfg := api.(*frozenConfig)
api := Config{
TagKey: "test",
BlockLength: 2,
}.Freeze()
cfg := api.(*frozenConfig)

schema1 := MustParse(sch)
schema1.(*RecordSchema).Fields()[0].action = FieldIgnore
schema2 := MustParse(sch)
schema1 := MustParse(sch)
schema2 := MustParse(sch)
fp := [32]byte{1, 2, 3}
schema2.(*RecordSchema).writerFingerprint = &fp

dec1 := cfg.DecoderOf(schema1, typ)
dec2 := cfg.DecoderOf(schema2, typ)
dec1 := cfg.DecoderOf(schema1, typ)
dec2 := cfg.DecoderOf(schema2, typ)

assert.NotSame(t, dec1, dec2)
})
assert.NotSame(t, dec1, dec2)
}

func TestConfig_ReusesDecoders_WithEnum(t *testing.T) {
Expand All @@ -111,7 +93,9 @@ func TestConfig_ReusesDecoders_WithEnum(t *testing.T) {

schema1 := MustParse(sch)
schema2 := MustParse(sch)
schema2.(*EnumSchema).actual = []string{"foo", "bar"}
schema2.(*EnumSchema).encodedSymbols = []string{"foo", "bar"}
fp := schema1.Fingerprint()
schema2.(*EnumSchema).writerFingerprint = &fp

dec1 := cfg.DecoderOf(schema1, typ)
dec2 := cfg.DecoderOf(schema2, typ)
Expand Down
Loading

0 comments on commit 0673db6

Please sign in to comment.