Skip to content

Commit

Permalink
fix: Long timestamp default decoding (#443)
Browse files Browse the repository at this point in the history
  • Loading branch information
redaLaanait authored Sep 7, 2024
1 parent 91b9f6f commit 883fb8f
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 13 deletions.
7 changes: 4 additions & 3 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ import (
)

var (
timeType = reflect.TypeOf(time.Time{})
ratType = reflect.TypeOf(big.Rat{})
durType = reflect.TypeOf(LogicalDuration{})
timeType = reflect.TypeOf(time.Time{})
timeDurationType = reflect.TypeOf(time.Duration(0))
ratType = reflect.TypeOf(big.Rat{})
durType = reflect.TypeOf(LogicalDuration{})
)

type null struct{}
Expand Down
22 changes: 12 additions & 10 deletions codec_native.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,17 @@ func createDecoderOfNative(schema *PrimitiveSchema, typ reflect2.Type) ValDecode
convert: createLongConverter(schema.encodedType),
}

case st == Long && lt == "":
case st == Long:
isTimestamp := (lt == TimestampMillis || lt == TimestampMicros)
if isTimestamp && typ.Type1() == timeDurationType {
return &errorDecoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s and logicalType %s",
typ.Type1().String(), schema.Type(), lt)}
}
if resolved {
return &longConvCodec[int64]{convert: createLongConverter(schema.encodedType)}
}
return &longCodec[int64]{}

case lt != "":
return &errorDecoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s and logicalType %s",
typ.String(), schema.Type(), lt)}

default:
break
}
Expand Down Expand Up @@ -245,13 +246,14 @@ func createEncoderOfNative(schema Schema, typ reflect2.Type) ValEncoder {
case st == Long && lt == TimeMicros: // time.Duration
return &timeMicrosCodec{}

case st == Long && lt == "":
case st == Long:
isTimestamp := (lt == TimestampMillis || lt == TimestampMicros)
if isTimestamp && typ.Type1() == timeDurationType {
return &errorEncoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s and logicalType %s",
typ.Type1().String(), schema.Type(), lt)}
}
return &longCodec[int64]{}

case lt != "":
return &errorEncoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s and logicalType %s",
typ.String(), schema.Type(), lt)}

default:
break
}
Expand Down
57 changes: 57 additions & 0 deletions schema_compatibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package avro_test

import (
"math/big"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -815,6 +816,62 @@ func TestSchemaCompatibility_Resolve(t *testing.T) {
"b": map[string]any{"a": int64(20)},
},
},
{
name: "Record Writer Field Missing With Long timestamp-millis Default",
reader: `{
"type":"record", "name":"test", "namespace": "org.hamba.avro",
"fields":[
{"name": "a", "type": "string"},
{
"name": "b",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
},
"default": ` + strconv.FormatInt(1725616800000, 10) + `
}
]
}`,
writer: `{
"type":"record", "name":"test", "namespace": "org.hamba.avro",
"fields":[
{"name": "a", "type": "string"}
]
}`,
value: map[string]any{"a": "foo"},
want: map[string]any{
"a": "foo",
"b": time.UnixMilli(1725616800000).UTC(), // 2024-09-06 10:00:00
},
},
{
name: "Record Writer Field Missing With Long timestamp-micros Default",
reader: `{
"type":"record", "name":"test", "namespace": "org.hamba.avro",
"fields":[
{"name": "a", "type": "string"},
{
"name": "b",
"type": {
"type": "long",
"logicalType": "timestamp-micros"
},
"default": ` + strconv.FormatInt(1725616800000000, 10) + `
}
]
}`,
writer: `{
"type":"record", "name":"test", "namespace": "org.hamba.avro",
"fields":[
{"name": "a", "type": "string"}
]
}`,
value: map[string]any{"a": "foo"},
want: map[string]any{
"a": "foo",
"b": time.UnixMicro(1725616800000000).UTC(), // 2024-09-06 10:00:00
},
},
}

for _, test := range tests {
Expand Down

0 comments on commit 883fb8f

Please sign in to comment.