From 0673db67a378fa6aa3518051df611d05aef37c20 Mon Sep 17 00:00:00 2001 From: Nicholas Wiersma Date: Wed, 7 Feb 2024 19:04:45 +0200 Subject: [PATCH] fix: schema compatibility fingerprint (#354) --- codec.go | 8 +- codec_native.go | 28 ++--- config_internal_test.go | 48 +++----- schema.go | 233 +++++++++++++++++++++-------------- schema_compatibility.go | 124 +++++++++++++------ schema_compatibility_test.go | 4 +- schema_internal_test.go | 133 ++++++++------------ 7 files changed, 309 insertions(+), 269 deletions(-) diff --git a/codec.go b/codec.go index c209c61..7657f86 100644 --- a/codec.go +++ b/codec.go @@ -34,8 +34,7 @@ type ValEncoder interface { // ReadVal parses Avro value and stores the result in the value pointed to by obj. func (r *Reader) ReadVal(schema Schema, obj any) { - key := cacheFingerprintOf(schema) - decoder := r.cfg.getDecoderFromCache(key, reflect2.RTypeOf(obj)) + decoder := r.cfg.getDecoderFromCache(schema.CacheFingerprint(), reflect2.RTypeOf(obj)) if decoder == nil { typ := reflect2.TypeOf(obj) if typ.Kind() != reflect.Ptr { @@ -66,15 +65,14 @@ func (w *Writer) WriteVal(schema Schema, val any) { func (c *frozenConfig) DecoderOf(schema Schema, typ reflect2.Type) ValDecoder { rtype := typ.RType() - key := cacheFingerprintOf(schema) - decoder := c.getDecoderFromCache(key, rtype) + decoder := c.getDecoderFromCache(schema.CacheFingerprint(), rtype) if decoder != nil { return decoder } ptrType := typ.(*reflect2.UnsafePtrType) decoder = decoderOfType(c, schema, ptrType.Elem()) - c.addDecoderToCache(key, rtype, decoder) + c.addDecoderToCache(schema.CacheFingerprint(), rtype, decoder) return decoder } diff --git a/codec_native.go b/codec_native.go index 3129e29..62d4239 100644 --- a/codec_native.go +++ b/codec_native.go @@ -12,7 +12,7 @@ import ( //nolint:maintidx // Splitting this would not make it simpler. func createDecoderOfNative(schema *PrimitiveSchema, typ reflect2.Type) ValDecoder { - isConv := schema.actual != "" + resolved := schema.encodedType != "" switch typ.Kind() { case reflect.Bool: if schema.Type() != Boolean { @@ -60,8 +60,8 @@ func createDecoderOfNative(schema *PrimitiveSchema, typ reflect2.Type) ValDecode if schema.Type() != Long { break } - if isConv { - return &longConvCodec[uint32]{convert: createLongConverter(schema.actual)} + if resolved { + return &longConvCodec[uint32]{convert: createLongConverter(schema.encodedType)} } return &longCodec[uint32]{} @@ -74,12 +74,12 @@ func createDecoderOfNative(schema *PrimitiveSchema, typ reflect2.Type) ValDecode case st == Long && lt == TimeMicros: // time.Duration return &timeMicrosCodec{ - convert: createLongConverter(schema.actual), + convert: createLongConverter(schema.encodedType), } case st == Long: - if isConv { - return &longConvCodec[int64]{convert: createLongConverter(schema.actual)} + if resolved { + return &longConvCodec[int64]{convert: createLongConverter(schema.encodedType)} } return &longCodec[int64]{} @@ -91,8 +91,8 @@ func createDecoderOfNative(schema *PrimitiveSchema, typ reflect2.Type) ValDecode if schema.Type() != Float { break } - if isConv { - return &float32ConvCodec{convert: createFloatConverter(schema.actual)} + if resolved { + return &float32ConvCodec{convert: createFloatConverter(schema.encodedType)} } return &float32Codec{} @@ -100,8 +100,8 @@ func createDecoderOfNative(schema *PrimitiveSchema, typ reflect2.Type) ValDecode if schema.Type() != Double { break } - if isConv { - return &float64ConvCodec{convert: createDoubleConverter(schema.actual)} + if resolved { + return &float64ConvCodec{convert: createDoubleConverter(schema.encodedType)} } return &float64Codec{} @@ -127,21 +127,21 @@ func createDecoderOfNative(schema *PrimitiveSchema, typ reflect2.Type) ValDecode return &dateCodec{} case isTime && st == Long && lt == TimestampMillis: return ×tampMillisCodec{ - convert: createLongConverter(schema.actual), + convert: createLongConverter(schema.encodedType), } case isTime && st == Long && lt == TimestampMicros: return ×tampMicrosCodec{ - convert: createLongConverter(schema.actual), + convert: createLongConverter(schema.encodedType), } case isTime && st == Long && lt == LocalTimestampMillis: return ×tampMillisCodec{ local: true, - convert: createLongConverter(schema.actual), + convert: createLongConverter(schema.encodedType), } case isTime && st == Long && lt == LocalTimestampMicros: return ×tampMicrosCodec{ local: true, - convert: createLongConverter(schema.actual), + convert: createLongConverter(schema.encodedType), } case typ.Type1().ConvertibleTo(ratType) && st == Bytes && lt == Decimal: dec := ls.(*DecimalLogicalSchema) diff --git a/config_internal_test.go b/config_internal_test.go index d256c47..4d14625 100644 --- a/config_internal_test.go +++ b/config_internal_test.go @@ -44,7 +44,7 @@ func TestConfig_ReusesDecoders(t *testing.T) { assert.Same(t, dec1, dec2) } -func TestConfig_ReusesDecoders_WithRecordFieldActions(t *testing.T) { +func TestConfig_ReusesDecoders_WithWriterFingerprint(t *testing.T) { type testObj struct { A int64 `avro:"a"` B string `avro:"b"` @@ -59,39 +59,21 @@ func TestConfig_ReusesDecoders_WithRecordFieldActions(t *testing.T) { }` typ := reflect2.TypeOfPtr(&testObj{}) - t.Run("set default", func(t *testing.T) { - api := Config{ - TagKey: "test", - BlockLength: 2, - }.Freeze() - cfg := api.(*frozenConfig) - - schema1 := MustParse(sch) - schema2 := MustParse(sch) - schema2.(*RecordSchema).Fields()[1].action = FieldSetDefault - - dec1 := cfg.DecoderOf(schema1, typ) - dec2 := cfg.DecoderOf(schema2, typ) - - assert.NotSame(t, dec1, dec2) - }) - - t.Run("ignore", func(t *testing.T) { - api := Config{ - TagKey: "test", - BlockLength: 2, - }.Freeze() - cfg := api.(*frozenConfig) + api := Config{ + TagKey: "test", + BlockLength: 2, + }.Freeze() + cfg := api.(*frozenConfig) - schema1 := MustParse(sch) - schema1.(*RecordSchema).Fields()[0].action = FieldIgnore - schema2 := MustParse(sch) + schema1 := MustParse(sch) + schema2 := MustParse(sch) + fp := [32]byte{1, 2, 3} + schema2.(*RecordSchema).writerFingerprint = &fp - dec1 := cfg.DecoderOf(schema1, typ) - dec2 := cfg.DecoderOf(schema2, typ) + dec1 := cfg.DecoderOf(schema1, typ) + dec2 := cfg.DecoderOf(schema2, typ) - assert.NotSame(t, dec1, dec2) - }) + assert.NotSame(t, dec1, dec2) } func TestConfig_ReusesDecoders_WithEnum(t *testing.T) { @@ -111,7 +93,9 @@ func TestConfig_ReusesDecoders_WithEnum(t *testing.T) { schema1 := MustParse(sch) schema2 := MustParse(sch) - schema2.(*EnumSchema).actual = []string{"foo", "bar"} + schema2.(*EnumSchema).encodedSymbols = []string{"foo", "bar"} + fp := schema1.Fingerprint() + schema2.(*EnumSchema).writerFingerprint = &fp dec1 := cfg.DecoderOf(schema1, typ) dec2 := cfg.DecoderOf(schema2, typ) diff --git a/schema.go b/schema.go index 4f88fec..78d7b35 100644 --- a/schema.go +++ b/schema.go @@ -77,24 +77,6 @@ const ( Duration LogicalType = "duration" ) -func isNative(typ Type) bool { - switch typ { - case Null, Boolean, Int, Long, Float, Double, Bytes, String: - return true - default: - return false - } -} - -func isPromotable(typ Type) bool { - switch typ { - case Int, Long, Float, String, Bytes: - return true - default: - return false - } -} - // Action is a field action used during decoding process. type Action string @@ -166,6 +148,11 @@ type Schema interface { // FingerprintUsing returns the fingerprint of the schema using the given algorithm or an error. FingerprintUsing(FingerprintType) ([]byte, error) + + // CacheFingerprint returns the unique identity of the schema. + // This returns a unique identity for schemas resolved from a writer schema, otherwise it returns + // the schemas Fingerprint. + CacheFingerprint() [32]byte } // LogicalSchema represents an Avro schema with a logical type. @@ -315,30 +302,33 @@ func (f *fingerprinter) FingerprintUsing(typ FingerprintType, stringer fmt.Strin return fingerprint, nil } -// cacheFingerprintOf returns a special fingerprint mainly used by decoders cache. -func cacheFingerprintOf(schema Schema) [32]byte { - if s, ok := schema.(interface{ CacheFingerprint() [32]byte }); ok { - return s.CacheFingerprint() - } - return schema.Fingerprint() -} - type cacheFingerprinter struct { - key atomic.Value // [32]byte + writerFingerprint *[32]byte + + cache atomic.Value // [32]byte } -func (sf *cacheFingerprinter) fingerprint(data []any) [32]byte { - if v := sf.key.Load(); v != nil { +// CacheFingerprint returns the SHA256 identity of the schema. +func (i *cacheFingerprinter) CacheFingerprint(schema Schema, fn func() []byte) [32]byte { + if v := i.cache.Load(); v != nil { return v.([32]byte) } - b, err := jsoniter.Marshal(data) - if err != nil { - panic("cache fingerprint: couldn't json marshal receipt data: " + err.Error()) + if i.writerFingerprint == nil { + fp := schema.Fingerprint() + i.cache.Store(fp) + return fp } - key := sha256.Sum256(b) - sf.key.Store(key) - return key + + fp := schema.Fingerprint() + d := append([]byte{}, fp[:]...) + d = append(d, (*i.writerFingerprint)[:]...) + if fn != nil { + d = append(d, fn()...) + } + ident := sha256.Sum256(d) + i.cache.Store(ident) + return ident } type properties struct { @@ -397,6 +387,7 @@ type schemaConfig struct { def any order Order props map[string]any + wfp *[32]byte } // SchemaOption is a function that sets a schema option. @@ -437,6 +428,20 @@ func WithProps(props map[string]any) SchemaOption { } } +func withWriterFingerprint(fp [32]byte) SchemaOption { + return func(opts *schemaConfig) { + opts.wfp = &fp + } +} + +func withWriterFingerprintIfResolved(fp [32]byte, resolved bool) SchemaOption { + return func(opts *schemaConfig) { + if resolved { + opts.wfp = &fp + } + } +} + // PrimitiveSchema is an Avro primitive type schema. type PrimitiveSchema struct { properties @@ -446,10 +451,9 @@ type PrimitiveSchema struct { typ Type logical LogicalSchema - // actual presents the actual type of the encoded value - // which can be promoted to schema current type. + // encodedType is the type of the encoded value, if it is different from the typ. // It's only used in the context of write-read schema resolution. - actual Type + encodedType Type } // NewPrimitiveSchema creates a new PrimitiveSchema. @@ -460,9 +464,10 @@ func NewPrimitiveSchema(t Type, l LogicalSchema, opts ...SchemaOption) *Primitiv } return &PrimitiveSchema{ - properties: newProperties(cfg.props, schemaReserved), - typ: t, - logical: l, + properties: newProperties(cfg.props, schemaReserved), + cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp}, + typ: t, + logical: l, } } @@ -519,13 +524,9 @@ func (s *PrimitiveSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) return s.fingerprinter.FingerprintUsing(typ, s) } -// CacheFingerprint returns a special fingerprint of the schema for caching purposes. +// CacheFingerprint returns unique identity of the schema. func (s *PrimitiveSchema) CacheFingerprint() [32]byte { - if s.actual == "" { - return s.Fingerprint() - } - - return s.cacheFingerprinter.fingerprint([]any{s.Fingerprint(), s.actual}) + return s.cacheFingerprinter.CacheFingerprint(s, nil) } // RecordSchema is an Avro record type schema. @@ -552,10 +553,11 @@ func NewRecordSchema(name, namespace string, fields []*Field, opts ...SchemaOpti } return &RecordSchema{ - name: n, - properties: newProperties(cfg.props, schemaReserved), - fields: fields, - doc: cfg.doc, + name: n, + properties: newProperties(cfg.props, schemaReserved), + cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp}, + fields: fields, + doc: cfg.doc, }, nil } @@ -652,22 +654,19 @@ func (s *RecordSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) { return s.fingerprinter.FingerprintUsing(typ, s) } -// CacheFingerprint returns a special fingerprint of the schema for caching purposes. +// CacheFingerprint returns unique identity of the schema. func (s *RecordSchema) CacheFingerprint() [32]byte { - data := make([]any, 0) - for _, field := range s.fields { - if field.HasDefault() && field.action == FieldSetDefault { - data = append(data, field.Name(), field.Default()) - } - if field.action == FieldIgnore { - data = append(data, field.Name()+string(FieldIgnore)) + return s.cacheFingerprinter.CacheFingerprint(s, func() []byte { + var defs []any + for _, field := range s.fields { + if !field.HasDefault() { + continue + } + defs = append(defs, field.Default()) } - } - if len(data) == 0 { - return s.Fingerprint() - } - data = append(data, s.Fingerprint()) - return s.cacheFingerprinter.fingerprint(data) + b, _ := jsoniter.Marshal(defs) + return b + }) } // Field is an Avro record type field. @@ -853,9 +852,10 @@ type EnumSchema struct { symbols []string def string doc string - // actual presents the actual symbols of the encoded value. + + // encodedSymbols is the symbols of the encoded value. // It's only used in the context of write-read schema resolution. - actual []string + encodedSymbols []string } // NewEnumSchema creates a new enum schema instance. @@ -888,11 +888,12 @@ func NewEnumSchema(name, namespace string, symbols []string, opts ...SchemaOptio } return &EnumSchema{ - name: n, - properties: newProperties(cfg.props, schemaReserved), - symbols: symbols, - def: def, - doc: cfg.doc, + name: n, + properties: newProperties(cfg.props, schemaReserved), + cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp}, + symbols: symbols, + def: def, + doc: cfg.doc, }, nil } @@ -923,11 +924,11 @@ func (s *EnumSchema) Symbols() []string { // Symbol returns the symbol for the given index. // It might return the default value in the context of write-read schema resolution. func (s *EnumSchema) Symbol(i int) (string, bool) { + resolv := len(s.encodedSymbols) > 0 symbols := s.symbols - // has actual symbols - hasActual := len(s.actual) > 0 - if hasActual { - symbols = s.actual + if resolv { + // A different set of symbols is encoded. + symbols = s.encodedSymbols } if i < 0 || i >= len(symbols) { @@ -935,14 +936,12 @@ func (s *EnumSchema) Symbol(i int) (string, bool) { } symbol := symbols[i] - - if hasActual && !hasSymbol(s.symbols, symbol) { + if resolv && !hasSymbol(s.symbols, symbol) { if !s.HasDefault() { return "", false } return s.Default(), true } - return symbol, true } @@ -1011,19 +1010,21 @@ func (s *EnumSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) { return s.fingerprinter.FingerprintUsing(typ, s) } -// CacheFingerprint returns a special fingerprint of the schema for caching purposes. +// CacheFingerprint returns unique identity of the schema. func (s *EnumSchema) CacheFingerprint() [32]byte { - if len(s.actual) == 0 || !s.HasDefault() { - return s.Fingerprint() - } - - return s.cacheFingerprinter.fingerprint([]any{s.Fingerprint(), s.actual, s.Default()}) + return s.cacheFingerprinter.CacheFingerprint(s, func() []byte { + if !s.HasDefault() { + return []byte{} + } + return []byte(s.Default()) + }) } // ArraySchema is an Avro array type schema. type ArraySchema struct { properties fingerprinter + cacheFingerprinter items Schema } @@ -1036,8 +1037,9 @@ func NewArraySchema(items Schema, opts ...SchemaOption) *ArraySchema { } return &ArraySchema{ - properties: newProperties(cfg.props, schemaReserved), - items: items, + properties: newProperties(cfg.props, schemaReserved), + cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp}, + items: items, } } @@ -1083,10 +1085,16 @@ func (s *ArraySchema) FingerprintUsing(typ FingerprintType) ([]byte, error) { return s.fingerprinter.FingerprintUsing(typ, s) } +// CacheFingerprint returns unique identity of the schema. +func (s *ArraySchema) CacheFingerprint() [32]byte { + return s.cacheFingerprinter.CacheFingerprint(s, nil) +} + // MapSchema is an Avro map type schema. type MapSchema struct { properties fingerprinter + cacheFingerprinter values Schema } @@ -1099,8 +1107,9 @@ func NewMapSchema(values Schema, opts ...SchemaOption) *MapSchema { } return &MapSchema{ - properties: newProperties(cfg.props, schemaReserved), - values: values, + properties: newProperties(cfg.props, schemaReserved), + cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp}, + values: values, } } @@ -1146,15 +1155,26 @@ func (s *MapSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) { return s.fingerprinter.FingerprintUsing(typ, s) } +// CacheFingerprint returns unique identity of the schema. +func (s *MapSchema) CacheFingerprint() [32]byte { + return s.cacheFingerprinter.CacheFingerprint(s, nil) +} + // UnionSchema is an Avro union type schema. type UnionSchema struct { fingerprinter + cacheFingerprinter types Schemas } // NewUnionSchema creates a union schema instance. -func NewUnionSchema(types []Schema) (*UnionSchema, error) { +func NewUnionSchema(types []Schema, opts ...SchemaOption) (*UnionSchema, error) { + var cfg schemaConfig + for _, opt := range opts { + opt(&cfg) + } + seen := map[string]bool{} for _, schema := range types { if schema.Type() == Union { @@ -1170,7 +1190,8 @@ func NewUnionSchema(types []Schema) (*UnionSchema, error) { } return &UnionSchema{ - types: types, + cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp}, + types: types, }, nil } @@ -1234,11 +1255,17 @@ func (s *UnionSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) { return s.fingerprinter.FingerprintUsing(typ, s) } +// CacheFingerprint returns unique identity of the schema. +func (s *UnionSchema) CacheFingerprint() [32]byte { + return s.cacheFingerprinter.CacheFingerprint(s, nil) +} + // FixedSchema is an Avro fixed type schema. type FixedSchema struct { name properties fingerprinter + cacheFingerprinter size int logical LogicalSchema @@ -1262,10 +1289,11 @@ func NewFixedSchema( } return &FixedSchema{ - name: n, - properties: newProperties(cfg.props, schemaReserved), - size: size, - logical: logical, + name: n, + properties: newProperties(cfg.props, schemaReserved), + cacheFingerprinter: cacheFingerprinter{writerFingerprint: cfg.wfp}, + size: size, + logical: logical, }, nil } @@ -1336,6 +1364,11 @@ func (s *FixedSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) { return s.fingerprinter.FingerprintUsing(typ, s) } +// CacheFingerprint returns unique identity of the schema. +func (s *FixedSchema) CacheFingerprint() [32]byte { + return s.cacheFingerprinter.CacheFingerprint(s, nil) +} + // NullSchema is an Avro null type schema. type NullSchema struct { fingerprinter @@ -1366,6 +1399,11 @@ func (s *NullSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) { return s.fingerprinter.FingerprintUsing(typ, s) } +// CacheFingerprint returns unique identity of the schema. +func (s *NullSchema) CacheFingerprint() [32]byte { + return s.Fingerprint() +} + // RefSchema is a reference to a named Avro schema. type RefSchema struct { actual NamedSchema @@ -1408,6 +1446,11 @@ func (s *RefSchema) FingerprintUsing(typ FingerprintType) ([]byte, error) { return s.actual.FingerprintUsing(typ) } +// CacheFingerprint returns unique identity of the schema. +func (s *RefSchema) CacheFingerprint() [32]byte { + return s.actual.CacheFingerprint() +} + // PrimitiveLogicalSchema is a logical type with no properties. type PrimitiveLogicalSchema struct { typ LogicalType diff --git a/schema_compatibility.go b/schema_compatibility.go index efb1eef..3ba7948 100644 --- a/schema_compatibility.go +++ b/schema_compatibility.go @@ -276,11 +276,12 @@ func (c *SchemaCompatibility) Resolve(reader, writer Schema) (Schema, error) { return nil, err } - return c.resolve(reader, writer) + schema, _, err := c.resolve(reader, writer) + return schema, err } // resolve requires the reader's schema to be already compatible with the writer's. -func (c *SchemaCompatibility) resolve(reader, writer Schema) (Schema, error) { +func (c *SchemaCompatibility) resolve(reader, writer Schema) (schema Schema, resolved bool, err error) { if reader.Type() == Ref { reader = reader.(*RefSchema).Schema() } @@ -291,144 +292,191 @@ func (c *SchemaCompatibility) resolve(reader, writer Schema) (Schema, error) { if writer.Type() != reader.Type() { if reader.Type() == Union { for _, schema := range reader.(*UnionSchema).Types() { - sch, err := c.Resolve(schema, writer) + sch, _, err := c.resolve(schema, writer) if err != nil { continue } - return sch, nil + return sch, true, nil } - return nil, fmt.Errorf("reader union lacking writer schema %s", writer.Type()) + return nil, false, fmt.Errorf("reader union lacking writer schema %s", writer.Type()) } if writer.Type() == Union { schemas := make([]Schema, 0) for _, schema := range writer.(*UnionSchema).Types() { - sch, err := c.Resolve(reader, schema) + sch, _, err := c.resolve(reader, schema) if err != nil { - return nil, err + return nil, false, err } schemas = append(schemas, sch) } - return NewUnionSchema(schemas) + s, err := NewUnionSchema(schemas, withWriterFingerprint(writer.Fingerprint())) + return s, true, err } - if isPromotable(writer.Type()) { - r := NewPrimitiveSchema(reader.Type(), reader.(*PrimitiveSchema).Logical()) - r.actual = writer.Type() - return r, nil + if isPromotable(writer.Type(), reader.Type()) { + r := NewPrimitiveSchema(reader.Type(), reader.(*PrimitiveSchema).Logical(), + withWriterFingerprint(writer.Fingerprint()), + ) + r.encodedType = writer.Type() + return r, true, nil } - return nil, fmt.Errorf("failed to resolve composite schema for %s and %s", reader.Type(), writer.Type()) + return nil, false, fmt.Errorf("failed to resolve composite schema for %s and %s", reader.Type(), writer.Type()) } if isNative(writer.Type()) { - return reader, nil + return reader, false, nil } if writer.Type() == Enum { r := reader.(*EnumSchema) w := writer.(*EnumSchema) - if err := c.checkEnumSymbols(r, w); err != nil { + if err = c.checkEnumSymbols(r, w); err != nil { if r.HasDefault() { enum, _ := NewEnumSchema(r.Name(), r.Namespace(), r.Symbols(), WithAliases(r.Aliases()), WithDefault(r.Default()), + withWriterFingerprint(w.Fingerprint()), ) - enum.actual = w.Symbols() - return enum, nil + enum.encodedSymbols = w.Symbols() + return enum, true, nil } - return nil, err + return nil, false, err } - return reader, nil + return reader, false, nil } if writer.Type() == Fixed { - return reader, nil + return reader, false, nil } if writer.Type() == Union { schemas := make([]Schema, 0) - for _, schema := range writer.(*UnionSchema).Types() { - sch, err := c.Resolve(reader, schema) + for _, s := range writer.(*UnionSchema).Types() { + sch, resolv, err := c.resolve(reader, s) if err != nil { - return nil, err + return nil, false, err } schemas = append(schemas, sch) + resolved = resolv || resolved + } + s, err := NewUnionSchema(schemas, withWriterFingerprintIfResolved(writer.Fingerprint(), resolved)) + if err != nil { + return nil, false, err } - return NewUnionSchema(schemas) + return s, resolved, nil } if writer.Type() == Array { - schema, err := c.resolve(reader.(*ArraySchema).Items(), writer.(*ArraySchema).Items()) + schema, resolved, err = c.resolve(reader.(*ArraySchema).Items(), writer.(*ArraySchema).Items()) if err != nil { - return nil, err + return nil, false, err } - return NewArraySchema(schema), nil + return NewArraySchema(schema, withWriterFingerprintIfResolved(writer.Fingerprint(), resolved)), resolved, nil } if writer.Type() == Map { - schema, err := c.resolve(reader.(*MapSchema).Values(), writer.(*MapSchema).Values()) + schema, resolved, err = c.resolve(reader.(*MapSchema).Values(), writer.(*MapSchema).Values()) if err != nil { - return nil, err + return nil, false, err } - return NewMapSchema(schema), nil + return NewMapSchema(schema, withWriterFingerprintIfResolved(writer.Fingerprint(), resolved)), resolved, nil } if writer.Type() == Record { return c.resolveRecord(reader, writer) } - return nil, fmt.Errorf("failed to resolve composite schema for %s and %s", reader.Type(), writer.Type()) + return nil, false, fmt.Errorf("failed to resolve composite schema for %s and %s", reader.Type(), writer.Type()) } -func (c *SchemaCompatibility) resolveRecord(reader, writer Schema) (Schema, error) { +func (c *SchemaCompatibility) resolveRecord(reader, writer Schema) (Schema, bool, error) { w := writer.(*RecordSchema) r := reader.(*RecordSchema) fields := make([]*Field, 0) seen := make(map[string]struct{}) + var resolved bool for _, wf := range w.Fields() { rf, ok := c.getField(r.Fields(), wf, func(gfo *getFieldOptions) { gfo.elemAlias = true }) if !ok { + // The field was not found in the reader schema, it should be ignored. f, _ := NewField(wf.Name(), wf.Type(), WithAliases(wf.aliases), WithOrder(wf.order)) - // I believe def is read only it can be copied even if it's a like-pointer type; - // data race should not occur. f.def = wf.def f.hasDef = wf.hasDef f.action = FieldIgnore fields = append(fields, f) + + resolved = true continue } - ft, err := c.resolve(rf.Type(), wf.Type()) + ft, resolv, err := c.resolve(rf.Type(), wf.Type()) if err != nil { - return nil, err + return nil, false, err } f, _ := NewField(rf.Name(), ft, WithAliases(rf.aliases), WithOrder(rf.order)) f.def = rf.def f.hasDef = rf.hasDef fields = append(fields, f) + resolved = resolv || resolved seen[rf.Name()] = struct{}{} } for _, rf := range r.Fields() { - // check if seen in writer's record if _, ok := seen[rf.Name()]; ok { + // This field has already been seen. continue } + // The schemas are already known to be compatible, so there must be a default on + // the field in the writer. Use the default. + f, _ := NewField(rf.Name(), rf.Type(), WithAliases(rf.aliases), WithOrder(rf.order)) f.def = rf.def f.hasDef = rf.hasDef f.action = FieldSetDefault fields = append(fields, f) + + resolved = true } - return NewRecordSchema(r.Name(), r.Namespace(), fields, WithAliases(r.Aliases())) + schema, err := NewRecordSchema(r.Name(), r.Namespace(), fields, + WithAliases(r.Aliases()), + withWriterFingerprintIfResolved(writer.Fingerprint(), resolved), + ) + return schema, resolved, err +} + +func isNative(typ Type) bool { + switch typ { + case Null, Boolean, Int, Long, Float, Double, Bytes, String: + return true + default: + return false + } +} + +func isPromotable(writerTyp, readerType Type) bool { + switch writerTyp { + case Int: + return readerType == Long || readerType == Float || readerType == Double + case Long: + return readerType == Float || readerType == Double + case Float: + return readerType == Double + case String: + return readerType == Bytes + case Bytes: + return readerType == String + default: + return false + } } diff --git a/schema_compatibility_test.go b/schema_compatibility_test.go index 500667b..3a9b93e 100644 --- a/schema_compatibility_test.go +++ b/schema_compatibility_test.go @@ -577,7 +577,9 @@ func TestSchemaCompatibility_Resolve(t *testing.T) { // decoder cache must be aware of fields defaults. name: "Record Writer Field Missing With Record Default 2", reader: `{ - "type":"record", "name":"test", "namespace": "org.hamba.avro", + "type":"record", + "name":"test", + "namespace": "org.hamba.avro", "fields":[ {"name": "a", "type": "int"}, { diff --git a/schema_internal_test.go b/schema_internal_test.go index 575a608..50a5651 100644 --- a/schema_internal_test.go +++ b/schema_internal_test.go @@ -379,7 +379,7 @@ func TestSchema_FingerprintUsingCaches(t *testing.T) { got, _ := schema.FingerprintUsing(CRC64Avro) - value, ok := schema.cache.Load(CRC64Avro) + value, ok := schema.fingerprinter.cache.Load(CRC64Avro) require.True(t, ok) assert.Equal(t, want, value) assert.Equal(t, want, got) @@ -387,48 +387,73 @@ func TestSchema_FingerprintUsingCaches(t *testing.T) { func TestSchema_IsPromotable(t *testing.T) { tests := []struct { - typ Type - wantOk bool + writerTyp Type + readerType Type + want bool }{ { - typ: Int, - wantOk: true, + writerTyp: Int, + readerType: Long, + want: true, }, { - typ: Long, - wantOk: true, + writerTyp: Int, + readerType: Float, + want: true, }, { - typ: Float, - wantOk: true, + writerTyp: Int, + readerType: Double, + want: true, }, { - typ: String, - wantOk: true, + writerTyp: Long, + readerType: Float, + want: true, }, { - typ: Bytes, - wantOk: true, + writerTyp: Long, + readerType: Double, + want: true, }, { - typ: Double, - wantOk: false, + writerTyp: Float, + readerType: Double, + want: true, }, { - typ: Boolean, - wantOk: false, + writerTyp: String, + readerType: Bytes, + want: true, }, { - typ: Null, - wantOk: false, + writerTyp: Bytes, + readerType: String, + want: true, + }, + { + writerTyp: Double, + readerType: Int, + want: false, + }, + { + writerTyp: Boolean, + readerType: Int, + want: false, + }, + { + writerTyp: Null, + readerType: Null, + want: false, }, } for i, test := range tests { test := test t.Run(strconv.Itoa(i), func(t *testing.T) { - ok := isPromotable(test.typ) - assert.Equal(t, test.wantOk, ok) + ok := isPromotable(test.writerTyp, test.readerType) + + assert.Equal(t, test.want, ok) }) } } @@ -541,66 +566,6 @@ func TestSchema_FieldEncodeDefault(t *testing.T) { assert.Equal(t, []byte("foo"), def) } -func TestSchema_CacheFingerprint(t *testing.T) { - t.Run("invalid", func(t *testing.T) { - cacheFingerprint := cacheFingerprinter{} - assert.Panics(t, func() { - cacheFingerprint.fingerprint([]any{func() {}}) - }) - }) - - t.Run("promoted", func(t *testing.T) { - schema := NewPrimitiveSchema(Long, nil) - assert.Equal(t, schema.Fingerprint(), schema.CacheFingerprint()) - - schema = NewPrimitiveSchema(Long, nil) - schema.actual = Int - assert.NotEqual(t, schema.Fingerprint(), schema.CacheFingerprint()) - }) - - t.Run("enum", func(t *testing.T) { - schema1 := MustParse(`{ - "type": "enum", - "name": "test.enum", - "symbols": ["foo"] - }`).(*EnumSchema) - - schema2 := MustParse(`{ - "type": "enum", - "name": "test.enum", - "symbols": ["foo"], - "default": "foo" - }`).(*EnumSchema) - schema2.actual = []string{"boo"} - - assert.Equal(t, schema1.Fingerprint(), schema1.CacheFingerprint()) - assert.NotEqual(t, schema1.CacheFingerprint(), schema2.CacheFingerprint()) - }) - - t.Run("record", func(t *testing.T) { - schema1 := MustParse(`{ - "type": "record", - "name": "test", - "fields" : [ - {"name": "a", "type": "string"}, - {"name": "b", "type": "boolean"} - ] - }`).(*RecordSchema) - - schema2 := MustParse(`{ - "type": "record", - "name": "test2", - "fields" : [ - {"name": "a", "type": "string", "default": "bar"}, - {"name": "b", "type": "boolean", "default": false} - ] - }`).(*RecordSchema) - - assert.Equal(t, schema1.Fingerprint(), schema1.CacheFingerprint()) - assert.NotEqual(t, schema1.CacheFingerprint(), schema2.CacheFingerprint()) - }) -} - func TestEnumSchema_GetSymbol(t *testing.T) { tests := []struct { schemaFn func() *EnumSchema @@ -636,7 +601,7 @@ func TestEnumSchema_GetSymbol(t *testing.T) { { schemaFn: func() *EnumSchema { enum, _ := NewEnumSchema("foo", "", []string{"FOO"}) - enum.actual = []string{"FOO", "BAR"} + enum.encodedSymbols = []string{"FOO", "BAR"} return enum }, idx: 1, @@ -645,7 +610,7 @@ func TestEnumSchema_GetSymbol(t *testing.T) { { schemaFn: func() *EnumSchema { enum, _ := NewEnumSchema("foo", "", []string{"FOO"}, WithDefault("FOO")) - enum.actual = []string{"FOO", "BAR"} + enum.encodedSymbols = []string{"FOO", "BAR"} return enum }, idx: 1, @@ -655,7 +620,7 @@ func TestEnumSchema_GetSymbol(t *testing.T) { { schemaFn: func() *EnumSchema { enum, _ := NewEnumSchema("foo", "", []string{"FOO", "BAR"}) - enum.actual = []string{"FOO"} + enum.encodedSymbols = []string{"FOO"} return enum }, idx: 0,