Skip to content

Commit

Permalink
feat: support local timestamp logic types
Browse files Browse the repository at this point in the history
  • Loading branch information
nrwiersma committed Jan 17, 2024
1 parent d25c1c8 commit a5c78a2
Show file tree
Hide file tree
Showing 9 changed files with 310 additions and 67 deletions.
52 changes: 27 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,31 +68,33 @@ More examples in the [godoc](https://godoc.org/github.com/hamba/avro/v2).

#### Types Conversions

| Avro | Go Struct | Go Interface |
|-------------------------|--------------------------------------------------------|--------------------------|
| `null` | `nil` | `nil` |
| `boolean` | `bool` | `bool` |
| `bytes` | `[]byte` | `[]byte` |
| `float` | `float32` | `float32` |
| `double` | `float64` | `float64` |
| `long` | `int64`, `uint32`\* | `int64`, `uint32` |
| `int` | `int`, `int32`, `int16`, `int8`, `uint8`\*, `uint16`\* | `int`, `uint8`, `uint16` |
| `fixed` | `uint64` | `uint64` |
| `string` | `string` | `string` |
| `array` | `[]T` | `[]any` |
| `enum` | `string` | `string` |
| `fixed` | `[n]byte` | `[n]byte` |
| `map` | `map[string]T{}` | `map[string]any` |
| `record` | `struct` | `map[string]any` |
| `union` | *see below* | *see below* |
| `int.date` | `time.Time` | `time.Time` |
| `int.time-millis` | `time.Duration` | `time.Duration` |
| `long.time-micros` | `time.Duration` | `time.Duration` |
| `long.timestamp-millis` | `time.Time` | `time.Time` |
| `long.timestamp-micros` | `time.Time` | `time.Time` |
| `bytes.decimal` | `*big.Rat` | `*big.Rat` |
| `fixed.decimal` | `*big.Rat` | `*big.Rat` |
| `string.uuid` | `string` | `string` |
| Avro | Go Struct | Go Interface |
|-------------------------------|--------------------------------------------------------|--------------------------|
| `null` | `nil` | `nil` |
| `boolean` | `bool` | `bool` |
| `bytes` | `[]byte` | `[]byte` |
| `float` | `float32` | `float32` |
| `double` | `float64` | `float64` |
| `long` | `int64`, `uint32`\* | `int64`, `uint32` |
| `int` | `int`, `int32`, `int16`, `int8`, `uint8`\*, `uint16`\* | `int`, `uint8`, `uint16` |
| `fixed` | `uint64` | `uint64` |
| `string` | `string` | `string` |
| `array` | `[]T` | `[]any` |
| `enum` | `string` | `string` |
| `fixed` | `[n]byte` | `[n]byte` |
| `map` | `map[string]T{}` | `map[string]any` |
| `record` | `struct` | `map[string]any` |
| `union` | *see below* | *see below* |
| `int.date` | `time.Time` | `time.Time` |
| `int.time-millis` | `time.Duration` | `time.Duration` |
| `long.time-micros` | `time.Duration` | `time.Duration` |
| `long.timestamp-millis` | `time.Time` | `time.Time` |
| `long.timestamp-micros` | `time.Time` | `time.Time` |
| `long.local-timestamp-millis` | `time.Time` | `time.Time` |
| `long.local-timestamp-micros` | `time.Time` | `time.Time` |
| `bytes.decimal` | `*big.Rat` | `*big.Rat` |
| `fixed.decimal` | `*big.Rat` | `*big.Rat` |
| `string.uuid` | `string` | `string` |

\* Please note that when the Go type is an unsigned integer care must be taken to ensure that information is not lost
when converting between the Avro type and Go type. For example, storing a *negative* number in Avro of `int = -100`
Expand Down
11 changes: 8 additions & 3 deletions codec_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ func genericDecode(schema Schema, r *Reader) any {
return nil
}

// seems generic reader is not compatible with codec
// Generic reader returns a different result from the
// codec in the case of a big.Rat. Handle this.
if rTyp.Type1() == ratType {
dec := obj.(big.Rat)
return &dec
Expand Down Expand Up @@ -69,14 +70,18 @@ func genericReceiver(schema Schema) (unsafe.Pointer, reflect2.Type, error) {
case TimeMicros:
var v time.Duration
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil

case TimestampMillis:
var v time.Time
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil

case TimestampMicros:
var v time.Time
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
case LocalTimestampMillis:
var v time.Time
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
case LocalTimestampMicros:
var v time.Time
return unsafe.Pointer(&v), reflect2.TypeOf(v), nil
}
}
var v int64
Expand Down
19 changes: 16 additions & 3 deletions codec_generic_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package avro
import (
"bytes"
"math/big"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -76,6 +75,20 @@ func TestGenericDecode(t *testing.T) {
want: time.Date(2020, 1, 2, 3, 4, 5, 0, time.UTC),
wantErr: require.NoError,
},
{
name: "Long Local-Timestamp-Millis",
data: []byte{0x90, 0xB2, 0xAE, 0xC3, 0xEC, 0x5B},
schema: `{"type":"long","logicalType":"local-timestamp-millis"}`,
want: time.Date(2020, 1, 2, 3, 4, 5, 0, time.Local),
wantErr: require.NoError,
},
{
name: "Long Local-Timestamp-Micros",
data: []byte{0x80, 0xCD, 0xB7, 0xA2, 0xEE, 0xC7, 0xCD, 0x05},
schema: `{"type":"long","logicalType":"local-timestamp-micros"}`,
want: time.Date(2020, 1, 2, 3, 4, 5, 0, time.Local),
wantErr: require.NoError,
},
{
name: "Float",
data: []byte{0x33, 0x33, 0x93, 0x3F},
Expand Down Expand Up @@ -197,9 +210,9 @@ func TestGenericDecode(t *testing.T) {
},
}

for i, test := range tests {
for _, test := range tests {
test := test
t.Run(strconv.Itoa(i), func(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
schema := MustParse(test.schema)
r := NewReader(bytes.NewReader(test.data), 10)

Expand Down
85 changes: 58 additions & 27 deletions codec_native.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,24 +120,29 @@ func createDecoderOfNative(schema Schema, typ reflect2.Type) ValDecoder {
st := schema.Type()
ls := getLogicalSchema(schema)
lt := getLogicalType(schema)
tpy1 := typ.Type1()
Istpy1Time := tpy1.ConvertibleTo(timeType)
Istpy1Rat := tpy1.ConvertibleTo(ratType)
isTime := typ.Type1().ConvertibleTo(timeType)
switch {
case Istpy1Time && st == Int && lt == Date:
case isTime && st == Int && lt == Date:
return &dateCodec{}

case Istpy1Time && st == Long && lt == TimestampMillis:
case isTime && st == Long && lt == TimestampMillis:
return &timestampMillisCodec{
convert: converter.toLong,
}

case Istpy1Time && st == Long && lt == TimestampMicros:
case isTime && st == Long && lt == TimestampMicros:
return &timestampMicrosCodec{
convert: converter.toLong,
}

case Istpy1Rat && st == Bytes && lt == Decimal:
case isTime && st == Long && lt == LocalTimestampMillis:
return &timestampMillisCodec{
local: true,
convert: converter.toLong,
}
case isTime && st == Long && lt == LocalTimestampMicros:
return &timestampMicrosCodec{
local: true,
convert: converter.toLong,
}
case typ.Type1().ConvertibleTo(ratType) && st == Bytes && lt == Decimal:
dec := ls.(*DecimalLogicalSchema)
return &bytesDecimalCodec{
prec: dec.Precision(), scale: dec.Scale(),
Expand Down Expand Up @@ -228,13 +233,10 @@ func createEncoderOfNative(schema Schema, typ reflect2.Type) ValEncoder {
switch {
case st == Int && lt == TimeMillis: // time.Duration
return &timeMillisCodec{}

case st == Long && lt == TimeMicros: // time.Duration
return &timeMicrosCodec{}

case st == Long:
return &longCodec[int64]{}

default:
break
}
Expand All @@ -243,7 +245,6 @@ func createEncoderOfNative(schema Schema, typ reflect2.Type) ValEncoder {
switch schema.Type() {
case Double:
return &float32DoubleCodec{}

case Float:
return &float32Codec{}
}
Expand All @@ -269,24 +270,22 @@ func createEncoderOfNative(schema Schema, typ reflect2.Type) ValEncoder {
case reflect.Struct:
st := schema.Type()
lt := getLogicalType(schema)
tpy1 := typ.Type1()
Istpy1Time := tpy1.ConvertibleTo(timeType)
Istpy1Rat := tpy1.ConvertibleTo(ratType)
isTime := typ.Type1().ConvertibleTo(timeType)
switch {
case Istpy1Time && st == Int && lt == Date:
case isTime && st == Int && lt == Date:
return &dateCodec{}
case Istpy1Time && st == Long && lt == TimestampMillis:
case isTime && st == Long && lt == TimestampMillis:
return &timestampMillisCodec{}

case Istpy1Time && st == Long && lt == TimestampMicros:
case isTime && st == Long && lt == TimestampMicros:
return &timestampMicrosCodec{}

case Istpy1Rat && st != Bytes || lt == Decimal:
case isTime && st == Long && lt == LocalTimestampMillis:
return &timestampMillisCodec{local: true}
case isTime && st == Long && lt == LocalTimestampMicros:
return &timestampMicrosCodec{local: true}
case typ.Type1().ConvertibleTo(ratType) && st != Bytes || lt == Decimal:
ls := getLogicalSchema(schema)
dec := ls.(*DecimalLogicalSchema)

return &bytesDecimalCodec{prec: dec.Precision(), scale: dec.Scale()}

default:
break
}
Expand Down Expand Up @@ -477,6 +476,7 @@ func (c *dateCodec) Encode(ptr unsafe.Pointer, w *Writer) {
}

type timestampMillisCodec struct {
local bool
convert func(*Reader) int64
}

Expand All @@ -489,15 +489,31 @@ func (c *timestampMillisCodec) Decode(ptr unsafe.Pointer, r *Reader) {
}
sec := i / 1e3
nsec := (i - sec*1e3) * 1e6
*((*time.Time)(ptr)) = time.Unix(sec, nsec).UTC()
t := time.Unix(sec, nsec)

if c.local {
// When doing unix time, Go will convert the time from UTC to Local,
// changing the time by the number of seconds in the zone offset.
// Remove those added seconds.
_, offset := t.Zone()
t = t.Add(time.Duration(-1*offset) * time.Second)
*((*time.Time)(ptr)) = t
return
}
*((*time.Time)(ptr)) = t.UTC()
}

func (c *timestampMillisCodec) Encode(ptr unsafe.Pointer, w *Writer) {
t := *((*time.Time)(ptr))
if c.local {
t = t.Local()

Check failure on line 509 in codec_native.go

View workflow job for this annotation

GitHub Actions / test (1.21)

usage of time.Local (gosmopolitan)
t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
}
w.WriteLong(t.Unix()*1e3 + int64(t.Nanosecond()/1e6))
}

type timestampMicrosCodec struct {
local bool
convert func(*Reader) int64
}

Expand All @@ -510,11 +526,26 @@ func (c *timestampMicrosCodec) Decode(ptr unsafe.Pointer, r *Reader) {
}
sec := i / 1e6
nsec := (i - sec*1e6) * 1e3
*((*time.Time)(ptr)) = time.Unix(sec, nsec).UTC()
t := time.Unix(sec, nsec)

if c.local {
// When doing unix time, Go will convert the time from UTC to Local,
// changing the time by the number of seconds in the zone offset.
// Remove those added seconds.
_, offset := t.Zone()
t = t.Add(time.Duration(-1*offset) * time.Second)
*((*time.Time)(ptr)) = t
return
}
*((*time.Time)(ptr)) = t.UTC()
}

func (c *timestampMicrosCodec) Encode(ptr unsafe.Pointer, w *Writer) {
t := *((*time.Time)(ptr))
if c.local {
t = t.Local()

Check failure on line 546 in codec_native.go

View workflow job for this annotation

GitHub Actions / test (1.21)

usage of time.Local (gosmopolitan)
t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
}
w.WriteLong(t.Unix()*1e6 + int64(t.Nanosecond()/1e3))
}

Expand Down
90 changes: 90 additions & 0 deletions decoder_native_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,96 @@ func TestDecoder_Time_TimestampMillisOneMicros(t *testing.T) {
assert.Equal(t, time.Date(1970, 1, 1, 0, 0, 0, 1e3, time.UTC), got)
}

func TestDecoder_Time_LocalTimestampMillis(t *testing.T) {
defer ConfigTeardown()

data := []byte{0x90, 0xB2, 0xAE, 0xC3, 0xEC, 0x5B}
schema := `{"type":"long","logicalType":"local-timestamp-millis"}`
dec, err := avro.NewDecoder(schema, bytes.NewReader(data))
require.NoError(t, err)

var got time.Time
err = dec.Decode(&got)

require.NoError(t, err)
assert.Equal(t, time.Date(2020, 1, 2, 3, 4, 5, 0, time.Local), got)
}

func TestDecoder_Time_LocalTimestampMillisZero(t *testing.T) {
defer ConfigTeardown()

data := []byte{0xff, 0xdf, 0xe6, 0xa2, 0xe2, 0xa0, 0x1c}
schema := `{"type":"long","logicalType":"local-timestamp-millis"}`
dec, err := avro.NewDecoder(schema, bytes.NewReader(data))
require.NoError(t, err)

var got time.Time
err = dec.Decode(&got)

require.NoError(t, err)
assert.Equal(t, time.Date(1, 1, 1, 0, 0, 0, 0, time.Local), got)
}

func TestDecoder_Time_LocalTimestampMillisOneMillis(t *testing.T) {
defer ConfigTeardown()

data := []byte{0x02}
schema := `{"type":"long","logicalType":"local-timestamp-millis"}`
dec, err := avro.NewDecoder(schema, bytes.NewReader(data))
require.NoError(t, err)

var got time.Time
err = dec.Decode(&got)

require.NoError(t, err)
assert.Equal(t, time.Date(1970, 1, 1, 0, 0, 0, 1e6, time.Local), got)
}

func TestDecoder_Time_LocalTimestampMicros(t *testing.T) {
defer ConfigTeardown()

data := []byte{0x80, 0xCD, 0xB7, 0xA2, 0xEE, 0xC7, 0xCD, 0x05}
schema := `{"type":"long","logicalType":"local-timestamp-micros"}`
dec, err := avro.NewDecoder(schema, bytes.NewReader(data))
require.NoError(t, err)

var got time.Time
err = dec.Decode(&got)

require.NoError(t, err)
assert.Equal(t, time.Date(2020, 1, 2, 3, 4, 5, 0, time.Local), got)
}

func TestDecoder_Time_LocalTimestampMicrosZero(t *testing.T) {
defer ConfigTeardown()

data := []byte{0xff, 0xff, 0xdd, 0xf2, 0xdf, 0xff, 0xdf, 0xdc, 0x1}
schema := `{"type":"long","logicalType":"local-timestamp-micros"}`
dec, err := avro.NewDecoder(schema, bytes.NewReader(data))
require.NoError(t, err)

var got time.Time
err = dec.Decode(&got)

require.NoError(t, err)
assert.Equal(t, time.Date(1, 1, 1, 0, 0, 0, 0, time.Local), got)
}

func TestDecoder_Time_LocalTimestampMillisOneMicros(t *testing.T) {
defer ConfigTeardown()

data := []byte{0x02}
schema := `{"type":"long","logicalType":"local-timestamp-micros"}`
dec, err := avro.NewDecoder(schema, bytes.NewReader(data))
require.NoError(t, err)

var got time.Time
err = dec.Decode(&got)

require.NoError(t, err)
assert.Equal(t, time.Date(1970, 1, 1, 0, 0, 0, 1e3, time.Local), got)
}

func TestDecoder_TimeInvalidSchema(t *testing.T) {
defer ConfigTeardown()

Expand Down
Loading

0 comments on commit a5c78a2

Please sign in to comment.