Skip to content

Commit

Permalink
cleanups and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
redaLaanait committed Dec 17, 2023
1 parent 62b27ce commit 7db00da
Show file tree
Hide file tree
Showing 5 changed files with 357 additions and 41 deletions.
40 changes: 39 additions & 1 deletion codec_default_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,45 @@ func TestDecoder_InvalidDefault(t *testing.T) {
require.Error(t, err)
}

// TestDecoder_DrainField verifies that a field marked with FieldDrain is read
// from the input and discarded, while a field marked with FieldSetDefault is
// populated from its schema default instead of the input.
func TestDecoder_DrainField(t *testing.T) {
defer ConfigTeardown()

// Writer schema (the encoded data below conforms to it; note it has no "b"):
// `{
//   "type": "record",
//   "name": "test",
//   "fields" : [
//     {"name": "a", "type": "string"}
//   ]
// }`

// Encoded record {"a": "foo"}.
data := []byte{0x6, 0x66, 0x6f, 0x6f}

// Reader schema: adds field "b" with a default value.
schema := MustParse(`{
"type": "record",
"name": "test",
"fields" : [
{"name": "a", "type": "string"},
{"name": "b", "type": "float", "default": 10.45}
]
}`)

// Drain "a" (decode and throw away) and fill "b" from its default.
schema.(*RecordSchema).Fields()[0].action = FieldDrain
schema.(*RecordSchema).Fields()[1].action = FieldSetDefault

type TestRecord struct {
A string `avro:"a"`
B float32 `avro:"b"`
}

var got TestRecord
err := NewDecoderForSchema(schema, bytes.NewReader(data)).Decode(&got)

// "a" was drained, so A stays zero; "b" comes from the default.
require.NoError(t, err)
assert.Equal(t, TestRecord{B: 10.45, A: ""}, got)
}

func TestDecoder_DefaultBool(t *testing.T) {
defer ConfigTeardown()

Expand Down Expand Up @@ -714,5 +753,4 @@ func TestDecoder_DefaultFixed(t *testing.T) {
assert.Equal(t, big.NewRat(1734, 5), &got.B)
assert.Equal(t, "foo", got.A)
})

}
25 changes: 15 additions & 10 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,11 +521,11 @@ func (s *PrimitiveSchema) FingerprintUsing(typ FingerprintType) ([]byte, error)

// CacheFingerprint returns a special fingerprint of the schema for caching purposes.
func (s *PrimitiveSchema) CacheFingerprint() [32]byte {
data := []any{s.Fingerprint()}
if s.actual != "" {
data = append(data, s.actual)
if s.actual == "" {
return s.Fingerprint()
}
return s.cacheFingerprinter.fingerprint(data)

return s.cacheFingerprinter.fingerprint([]any{s.Fingerprint(), s.actual})
}

// RecordSchema is an Avro record type schema.
Expand Down Expand Up @@ -654,12 +654,17 @@ func (s *RecordSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) {

// CacheFingerprint returns a special fingerprint of the schema for caching
// purposes.
//
// Field default values participate in the fingerprint so two otherwise-equal
// records with different defaults are cached separately. When no field has a
// default, the regular fingerprint is returned unchanged.
//
// NOTE(review): the rendered diff kept both the old initializer
// (`data := []any{s.Fingerprint()}`) and the new one on consecutive lines — a
// duplicate declaration that cannot compile; this is the reconstructed
// post-commit version.
func (s *RecordSchema) CacheFingerprint() [32]byte {
	data := make([]any, 0)
	for _, field := range s.fields {
		if field.Default() != nil {
			data = append(data, field.Default())
		}
	}
	if len(data) == 0 {
		return s.Fingerprint()
	}

	data = append(data, s.Fingerprint())
	return s.cacheFingerprinter.fingerprint(data)
}

Expand All @@ -679,7 +684,7 @@ type Field struct {
action Action
// encodedDef mainly used when decoding data that lack the field for schema evolution purposes.
// Its value remains empty unless the field's encodeDefault function is called.
encodedDef []byte
encodedDef atomic.Value
}

type noDef struct{}
Expand Down Expand Up @@ -764,8 +769,8 @@ func (f *Field) Default() any {
}

func (f *Field) encodeDefault(encode func(any) ([]byte, error)) ([]byte, error) {
if f.encodedDef != nil {
return f.encodedDef, nil
if v := f.encodedDef.Load(); v != nil {
return v.([]byte), nil
}
if !f.hasDef {
return nil, fmt.Errorf("avro: '%s' field must have a non-empty default value", f.name)
Expand All @@ -777,9 +782,9 @@ func (f *Field) encodeDefault(encode func(any) ([]byte, error)) ([]byte, error)
if err != nil {
return nil, err
}
f.encodedDef = b
f.encodedDef.Store(b)

return f.encodedDef, nil
return b, nil
}

// Doc returns the documentation of a field.
Expand Down
23 changes: 8 additions & 15 deletions schema_compatibility.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,6 @@ func (c *SchemaCompatibility) checkSchemaName(reader, writer NamedSchema) error
if c.contains(reader.Aliases(), writer.FullName()) {
return nil
}
// for _, alias := range reader.Aliases() {
// if alias == writer.FullName() {
// return nil
// }
// }
return fmt.Errorf("reader schema %s and writer schema %s names do not match", reader.FullName(), writer.FullName())
}

Expand Down Expand Up @@ -248,9 +243,6 @@ type getFieldOptions struct {
func (c *SchemaCompatibility) getField(a []*Field, f *Field, optFns ...func(*getFieldOptions)) (*Field, bool) {
opt := getFieldOptions{}
for _, fn := range optFns {
if fn == nil {
continue
}
fn(&opt)
}
for _, field := range a {
Expand All @@ -277,21 +269,22 @@ func (c *SchemaCompatibility) getField(a []*Field, f *Field, optFns ...func(*get
//
// It fails if the writer and reader schemas are not compatible.
func (c *SchemaCompatibility) Resolve(reader, writer Schema) (Schema, error) {
// Dereference ref schemas up front so the compatibility check below runs
// against the concrete named schemas they point to.
if reader.Type() == Ref {
reader = reader.(*RefSchema).Schema()
}
if writer.Type() == Ref {
writer = writer.(*RefSchema).Schema()
}

// Refuse to build a resolved schema for an incompatible reader/writer pair.
if err := c.compatible(reader, writer); err != nil {
return nil, err
}

return c.resolve(reader, writer)
}

// resolve requires the reader's schema to be already compatible with the writer's.
func (c *SchemaCompatibility) resolve(reader, writer Schema) (Schema, error) {
if reader.Type() == Ref {
reader = reader.(*RefSchema).Schema()
}
if writer.Type() == Ref {
writer = writer.(*RefSchema).Schema()
}

if writer.Type() != reader.Type() {
if reader.Type() == Union {
for _, schema := range reader.(*UnionSchema).Types() {
Expand Down
112 changes: 97 additions & 15 deletions schema_compatibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package avro_test
import (
"math/big"
"testing"
"time"

"github.com/hamba/avro/v2"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -292,6 +293,27 @@ func TestSchemaCompatibility_Resolve(t *testing.T) {
value: 10,
want: int64(10),
},
{
name: "Int Promote Long Time millis",
reader: `{"type":"long","logicalType":"timestamp-millis"}`,
writer: `"int"`,
value: 5000,
want: time.UnixMilli(5000).UTC(),
},
{
name: "Int Promote Long Time micros",
reader: `{"type":"long","logicalType":"timestamp-micros"}`,
writer: `"int"`,
value: 5000,
want: time.UnixMicro(5000).UTC(),
},
{
name: "Int Promote Long Time micros",
reader: `{"type":"long","logicalType":"time-micros"}`,
writer: `"int"`,
value: 5000,
want: 5000 * time.Microsecond,
},
{
name: "Int Promote Float",
reader: `"float"`,
Expand Down Expand Up @@ -334,6 +356,16 @@ func TestSchemaCompatibility_Resolve(t *testing.T) {
value: "foo",
want: []byte("foo"),
},
{
// I'm not sure about this edge case;
// I took the reverse path and tried to find a Decimal that can be encoded to
// bytes that form a valid UTF-8 sequence.
name: "String Promote Bytes With Logical Decimal",
reader: `{"type":"bytes","logicalType":"decimal","precision":4,"scale":2}`,
writer: `"string"`,
value: "d",
want: big.NewRat(1, 1),
},
{
name: "Bytes Promote String",
reader: `"string"`,
Expand Down Expand Up @@ -671,35 +703,49 @@ func TestSchemaCompatibility_Resolve(t *testing.T) {
"type": "record",
"name": "parent",
"namespace": "org.hamba.avro",
"fields": [{
"name": "a",
"type": "int"
},
"fields": [
{
"name": "b",
"name": "a",
"type": {
"type": "record",
"name": "test",
"name": "embed",
"namespace": "org.hamba.avro",
"fields": [{
"name": "a",
"type": "long"
}]
},
"default": {"a": 10}
}
},
{
"name": "c",
"type": "test",
"name": "b",
"type": "embed",
"default": {"a": 20}
}
]
}`,
writer: `{"type":"record", "name":"parent", "namespace": "org.hamba.avro", "fields":[{"name": "a", "type": "int"}]}`,
value: map[string]any{"a": 10},
writer: `{
"type": "record",
"name": "parent",
"namespace": "org.hamba.avro",
"fields": [
{
"name": "a",
"type": {
"type": "record",
"name": "embed",
"namespace": "org.hamba.avro",
"fields": [{
"name": "a",
"type": "long"
}]
}
}
]
}`,
value: map[string]any{"a": map[string]any{"a": int64(10)}},
want: map[string]any{
"a": 10,
"b": map[string]any{"a": int64(10)},
"c": map[string]any{"a": int64(20)},
"a": map[string]any{"a": int64(10)},
"b": map[string]any{"a": int64(20)},
},
},
}
Expand All @@ -725,3 +771,39 @@ func TestSchemaCompatibility_Resolve(t *testing.T) {
})
}
}

// TestSchemaCompatibility_ResolveWithRefs checks that Resolve accepts reader
// and writer schemas wrapped in ref schemas: a record written with a bytes
// field is decoded through the resolved schema as a string field.
func TestSchemaCompatibility_ResolveWithRefs(t *testing.T) {
// Reader side: field "a" is a string.
stringSchema := avro.MustParse(`{
"type": "record",
"name": "test",
"fields" : [
{"name": "a", "type": "string"}
]
}`)
// Writer side: field "a" is bytes.
bytesSchema := avro.MustParse(`{
"type": "record",
"name": "test",
"fields" : [
{"name": "a", "type": "bytes"}
]
}`)

// Wrap both records in refs so Resolve must see through them.
readerRef := avro.NewRefSchema(stringSchema.(*avro.RecordSchema))
writerRef := avro.NewRefSchema(bytesSchema.(*avro.RecordSchema))

compat := avro.NewSchemaCompatibility()

// Encode a record against the writer schema.
record := map[string]any{"a": []byte("foo")}
encoded, err := avro.Marshal(writerRef, record)
assert.NoError(t, err)

resolved, err := compat.Resolve(readerRef, writerRef)
assert.NoError(t, err)

// Decoding with the resolved schema promotes bytes to string.
var got any
err = avro.Unmarshal(resolved, encoded, &got)
assert.NoError(t, err)

assert.Equal(t, map[string]any{"a": "foo"}, got)
}
Loading

0 comments on commit 7db00da

Please sign in to comment.