diff --git a/schema.go b/schema.go index 6c0018f..02950ca 100644 --- a/schema.go +++ b/schema.go @@ -6,6 +6,7 @@ import ( "crypto/sha256" "errors" "fmt" + "hash" "sort" "strconv" "strings" @@ -95,6 +96,12 @@ const ( SHA256 FingerprintType = "SHA256" ) +var fingerprinters = map[FingerprintType]hash.Hash{ + CRC64Avro: crc64.New(), + MD5: md5.New(), + SHA256: sha256.New(), +} + // SchemaCache is a cache of schemas. type SchemaCache struct { cache sync.Map // map[string]Schema @@ -136,6 +143,9 @@ type Schema interface { // String returns the canonical form of the schema. String() string + // Resolve returns the resolved canonical form of the schema. + Resolve(cache *SchemaCache) string + // Fingerprint returns the SHA256 fingerprint of the schema. Fingerprint() [32]byte @@ -283,23 +293,14 @@ func (f *fingerprinter) FingerprintUsing(typ FingerprintType, stringer fmt.Strin return v.([]byte), nil } - data := []byte(stringer.String()) - - var fingerprint []byte - switch typ { - case CRC64Avro: - h := crc64.Sum(data) - fingerprint = h[:] - case MD5: - h := md5.Sum(data) - fingerprint = h[:] - case SHA256: - h := sha256.Sum256(data) - fingerprint = h[:] - default: + h, ok := fingerprinters[typ] + if !ok { return nil, fmt.Errorf("avro: unknown fingerprint algorithm %s", typ) } + h.Reset() + _, _ = h.Write([]byte(stringer.String())) + fingerprint := h.Sum(make([]byte, 0, h.Size())) f.cache.Store(typ, fingerprint) return fingerprint, nil } @@ -492,6 +493,11 @@ func (s *PrimitiveSchema) String() string { return `{"type":"` + string(s.typ) + `",` + s.logical.String() + `}` } +// Resolve returns the resolved form of the schema. +func (s *PrimitiveSchema) Resolve(_ *SchemaCache) string { + return s.String() +} + // MarshalJSON marshals the schema to json. func (s *PrimitiveSchema) MarshalJSON() ([]byte, error) { if s.logical == nil && len(s.props) == 0 { @@ -609,8 +615,48 @@ func (s *RecordSchema) String() string { if len(fields) > 0 { fields = fields[:len(fields)-1] } + if len(s.Namespace()) == 0 { + return `{"name":"` + s.FullName() + `","type":"` + typ + `","fields":[` + fields + `]}` + } else { + return `{"namespace":"` + s.Namespace() + `","name":"` + s.Name() + `","type":"` + typ + `","fields":[` + fields + `]}` + } +} + +// Resolve returns the resolved form of the schema. +func (s *RecordSchema) Resolve(cache *SchemaCache) string { + typ := "record" + if s.isError { + typ = "error" + } + + fields := "" + for _, f := range s.fields { + switch lookup := f.typ.(type) { + case *RefSchema: + switch found := lookup.actual.(type) { + case *RecordSchema: + if strings.Contains(found.full, ".") { + fields += `{"name":"` + f.Name() + `","type":` + found.Resolve(cache) + `},` + } else { + fields += f.String() + "," + } + default: + fields += f.String() + "," + } + default: + fields += f.String() + "," + } + } + if len(fields) > 0 { + fields = fields[:len(fields)-1] + } + + if len(s.Namespace()) == 0 { + return `{"name":"` + s.FullName() + `","type":"` + typ + `","fields":[` + fields + `]}` + } else { + return `{"namespace":"` + s.Namespace() + `","name":"` + s.Name() + `","type":"` + typ + `","fields":[` + fields + `]}` + } - return `{"name":"` + s.FullName() + `","type":"` + typ + `","fields":[` + fields + `]}` } // MarshalJSON marshals the schema to json. @@ -966,8 +1012,17 @@ func (s *EnumSchema) String() string { if len(symbols) > 0 { symbols = symbols[:len(symbols)-1] } + if len(s.Namespace()) == 0 { + return `{"name":"` + s.FullName() + `","type":"enum","symbols":[` + symbols + `]}` + } else { + return `{"namespace":"` + s.Namespace() + `","name":"` + s.Name() + `","type":"enum","symbols":[` + symbols + `]}` + } - return `{"name":"` + s.FullName() + `","type":"enum","symbols":[` + symbols + `]}` +} + +// Resolve returns the resolved form of the schema. +func (s *EnumSchema) Resolve(_ *SchemaCache) string { + return s.String() } // MarshalJSON marshals the schema to json. @@ -1060,6 +1115,11 @@ func (s *ArraySchema) String() string { return `{"type":"array","items":` + s.items.String() + `}` } +// Resolve returns the resolved form of the schema. +func (s *ArraySchema) Resolve(_ *SchemaCache) string { + return s.String() +} + // MarshalJSON marshals the schema to json. func (s *ArraySchema) MarshalJSON() ([]byte, error) { buf := new(bytes.Buffer) @@ -1130,6 +1190,11 @@ func (s *MapSchema) String() string { return `{"type":"map","values":` + s.values.String() + `}` } +// Resolve returns the resolved form of the schema. +func (s *MapSchema) Resolve(_ *SchemaCache) string { + return s.String() +} + // MarshalJSON marshals the schema to json. func (s *MapSchema) MarshalJSON() ([]byte, error) { buf := new(bytes.Buffer) @@ -1242,6 +1307,19 @@ func (s *UnionSchema) String() string { return `[` + types + `]` } +// Resolve returns the resolved form of the schema. +func (s *UnionSchema) Resolve(cache *SchemaCache) string { + types := "" + for _, typ := range s.types { + types += typ.Resolve(cache) + "," + } + if len(types) > 0 { + types = types[:len(types)-1] + } + + return `[` + types + `]` +} + // MarshalJSON marshals the schema to json. func (s *UnionSchema) MarshalJSON() ([]byte, error) { return jsoniter.Marshal(s.types) @@ -1323,7 +1401,12 @@ func (s *FixedSchema) String() string { logical = "," + s.logical.String() } - return `{"name":"` + s.FullName() + `","type":"fixed","size":` + size + logical + `}` + return `{"namespace":"` + s.Namespace() + `","name":"` + s.Name() + `","type":"fixed","size":` + size + logical + `}` +} + +// Resolve returns the resolved form of the schema. +func (s *FixedSchema) Resolve(_ *SchemaCache) string { + return s.String() } // MarshalJSON marshals the schema to json. @@ -1386,6 +1469,11 @@ func (s *NullSchema) String() string { return `"null"` } +// Resolve returns the resolved form of the schema. +func (s *NullSchema) Resolve(_ *SchemaCache) string { + return s.String() +} + // MarshalJSON marshals the schema to json. func (s *NullSchema) MarshalJSON() ([]byte, error) { return []byte(`"null"`), nil @@ -1433,6 +1521,11 @@ func (s *RefSchema) String() string { return `"` + s.actual.FullName() + `"` } +// Resolve returns the resolved form of the schema. +func (s *RefSchema) Resolve(cache *SchemaCache) string { + return s.actual.Resolve(cache) +} + // MarshalJSON marshals the schema to json. func (s *RefSchema) MarshalJSON() ([]byte, error) { return []byte(`"` + s.actual.FullName() + `"`), nil