Skip to content

Commit

Permalink
Serialize schemas feature hamba#400
Browse files Browse the repository at this point in the history
  • Loading branch information
ludovic-pourrat authored May 17, 2024
1 parent 57422ee commit e7c5004
Showing 1 changed file with 110 additions and 17 deletions.
127 changes: 110 additions & 17 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/sha256"
"errors"
"fmt"
"hash"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -95,6 +96,12 @@ const (
SHA256 FingerprintType = "SHA256"
)

var fingerprinters = map[FingerprintType]hash.Hash{
CRC64Avro: crc64.New(),
MD5: md5.New(),
SHA256: sha256.New(),
}

// SchemaCache is a cache of schemas.
type SchemaCache struct {
cache sync.Map // map[string]Schema
Expand Down Expand Up @@ -136,6 +143,9 @@ type Schema interface {
// String returns the canonical form of the schema.
String() string

// Resolve returns the resolved canonical form of the schema.
Resolve(cache *SchemaCache) string

// Fingerprint returns the SHA256 fingerprint of the schema.
Fingerprint() [32]byte

Expand Down Expand Up @@ -283,23 +293,14 @@ func (f *fingerprinter) FingerprintUsing(typ FingerprintType, stringer fmt.Strin
return v.([]byte), nil
}

data := []byte(stringer.String())

var fingerprint []byte
switch typ {
case CRC64Avro:
h := crc64.Sum(data)
fingerprint = h[:]
case MD5:
h := md5.Sum(data)
fingerprint = h[:]
case SHA256:
h := sha256.Sum256(data)
fingerprint = h[:]
default:
h, ok := fingerprinters[typ]
if !ok {
return nil, fmt.Errorf("avro: unknown fingerprint algorithm %s", typ)
}

h.Reset()
_, _ = h.Write([]byte(stringer.String()))
fingerprint := h.Sum(make([]byte, 0, h.Size()))
f.cache.Store(typ, fingerprint)
return fingerprint, nil
}
Expand Down Expand Up @@ -492,6 +493,11 @@ func (s *PrimitiveSchema) String() string {
return `{"type":"` + string(s.typ) + `",` + s.logical.String() + `}`
}

// Resolve returns the resolved form of the schema.
func (s *PrimitiveSchema) Resolve(_ *SchemaCache) string {
return s.String()
}

// MarshalJSON marshals the schema to json.
func (s *PrimitiveSchema) MarshalJSON() ([]byte, error) {
if s.logical == nil && len(s.props) == 0 {
Expand Down Expand Up @@ -609,8 +615,48 @@ func (s *RecordSchema) String() string {
if len(fields) > 0 {
fields = fields[:len(fields)-1]
}
if len(s.Namespace()) == 0 {
return `{"name":"` + s.FullName() + `","type":"` + typ + `","fields":[` + fields + `]}`
} else {
return `{"namespace":"` + s.Namespace() + `","name":"` + s.Name() + `","type":"` + typ + `","fields":[` + fields + `]}`
}
}

// Resolve returns the resolved form of the schema.
func (s *RecordSchema) Resolve(cache *SchemaCache) string {
typ := "record"
if s.isError {
typ = "error"
}

fields := ""
for _, f := range s.fields {
switch lookup := f.typ.(type) {
case *RefSchema:
switch found := lookup.actual.(type) {
case *RecordSchema:
if strings.Contains(found.full, ".") {
fields += `{"name":"` + f.Name() + `","type":` + found.Resolve(cache) + `},`
} else {
fields += f.String() + ","
}
default:
fields += f.String() + ","
}
default:
fields += f.String() + ","
}
}
if len(fields) > 0 {
fields = fields[:len(fields)-1]
}

if len(s.Namespace()) == 0 {
return `{"name":"` + s.FullName() + `","type":"` + typ + `","fields":[` + fields + `]}`
} else {
return `{"namespace":"` + s.Namespace() + `","name":"` + s.Name() + `","type":"` + typ + `","fields":[` + fields + `]}`
}

return `{"name":"` + s.FullName() + `","type":"` + typ + `","fields":[` + fields + `]}`
}

// MarshalJSON marshals the schema to json.
Expand Down Expand Up @@ -966,8 +1012,17 @@ func (s *EnumSchema) String() string {
if len(symbols) > 0 {
symbols = symbols[:len(symbols)-1]
}
if len(s.Namespace()) == 0 {
return `{"name":"` + s.FullName() + `","type":"enum","symbols":[` + symbols + `]}`
} else {
return `{"namespace":"` + s.Namespace() + `","name":"` + s.Name() + `","type":"enum","symbols":[` + symbols + `]}`
}

return `{"name":"` + s.FullName() + `","type":"enum","symbols":[` + symbols + `]}`
}

// Resolve returns the resolved form of the schema.
func (s *EnumSchema) Resolve(_ *SchemaCache) string {
return s.String()
}

// MarshalJSON marshals the schema to json.
Expand Down Expand Up @@ -1060,6 +1115,11 @@ func (s *ArraySchema) String() string {
return `{"type":"array","items":` + s.items.String() + `}`
}

// Resolve returns the resolved form of the schema.
func (s *ArraySchema) Resolve(_ *SchemaCache) string {
return s.String()
}

// MarshalJSON marshals the schema to json.
func (s *ArraySchema) MarshalJSON() ([]byte, error) {
buf := new(bytes.Buffer)
Expand Down Expand Up @@ -1130,6 +1190,11 @@ func (s *MapSchema) String() string {
return `{"type":"map","values":` + s.values.String() + `}`
}

// Resolve returns the resolved form of the schema.
func (s *MapSchema) Resolve(_ *SchemaCache) string {
return s.String()
}

// MarshalJSON marshals the schema to json.
func (s *MapSchema) MarshalJSON() ([]byte, error) {
buf := new(bytes.Buffer)
Expand Down Expand Up @@ -1242,6 +1307,19 @@ func (s *UnionSchema) String() string {
return `[` + types + `]`
}

// Resolve returns the resolved form of the schema.
func (s *UnionSchema) Resolve(cache *SchemaCache) string {
types := ""
for _, typ := range s.types {
types += typ.Resolve(cache) + ","
}
if len(types) > 0 {
types = types[:len(types)-1]
}

return `[` + types + `]`
}

// MarshalJSON marshals the schema to json.
func (s *UnionSchema) MarshalJSON() ([]byte, error) {
return jsoniter.Marshal(s.types)
Expand Down Expand Up @@ -1323,7 +1401,12 @@ func (s *FixedSchema) String() string {
logical = "," + s.logical.String()
}

return `{"name":"` + s.FullName() + `","type":"fixed","size":` + size + logical + `}`
return `{"namespace":"` + s.Namespace() + `","name":"` + s.Name() + `","type":"fixed","size":` + size + logical + `}`
}

// Resolve returns the resolved form of the schema.
func (s *FixedSchema) Resolve(_ *SchemaCache) string {
return s.String()
}

// MarshalJSON marshals the schema to json.
Expand Down Expand Up @@ -1386,6 +1469,11 @@ func (s *NullSchema) String() string {
return `"null"`
}

// Resolve returns the resolved form of the schema.
func (s *NullSchema) Resolve(_ *SchemaCache) string {
return s.String()
}

// MarshalJSON marshals the schema to json.
func (s *NullSchema) MarshalJSON() ([]byte, error) {
return []byte(`"null"`), nil
Expand Down Expand Up @@ -1433,6 +1521,11 @@ func (s *RefSchema) String() string {
return `"` + s.actual.FullName() + `"`
}

// Resolve returns the resolved form of the schema.
func (s *RefSchema) Resolve(cache *SchemaCache) string {
return s.actual.Resolve(cache)
}

// MarshalJSON marshals the schema to json.
func (s *RefSchema) MarshalJSON() ([]byte, error) {
return []byte(`"` + s.actual.FullName() + `"`), nil
Expand Down

0 comments on commit e7c5004

Please sign in to comment.