Skip to content

Commit

Permalink
fix: type promotion POC
Browse files Browse the repository at this point in the history
  • Loading branch information
redaLaanait committed Nov 17, 2023
1 parent d68f799 commit 4ade51d
Show file tree
Hide file tree
Showing 7 changed files with 371 additions and 68 deletions.
178 changes: 141 additions & 37 deletions codec_native.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
)

func createDecoderOfNative(schema Schema, typ reflect2.Type) ValDecoder {
actual := schema.(*PrimitiveSchema).actual

switch typ.Kind() {
case reflect.Bool:
if schema.Type() != Boolean {
Expand Down Expand Up @@ -58,7 +60,9 @@ func createDecoderOfNative(schema Schema, typ reflect2.Type) ValDecoder {
if schema.Type() != Long {
break
}
return &longCodec[uint32]{}
return &longCodec[uint32]{
promoter: getCodecPromoter[uint32](actual),
}

case reflect.Int64:
st := schema.Type()
Expand All @@ -68,10 +72,14 @@ func createDecoderOfNative(schema Schema, typ reflect2.Type) ValDecoder {
return &timeMillisCodec{}

case st == Long && lt == TimeMicros: // time.Duration
return &timeMicrosCodec{}
return &timeMicrosCodec{
promoter: getCodecPromoter[int64](actual),
}

case st == Long:
return &longCodec[int64]{}
return &longCodec[int64]{
promoter: getCodecPromoter[int64](actual),
}

default:
break
Expand All @@ -81,25 +89,34 @@ func createDecoderOfNative(schema Schema, typ reflect2.Type) ValDecoder {
if schema.Type() != Float {
break
}
return &float32Codec{}
return &float32Codec{
promoter: getCodecPromoter[float32](actual),
}

case reflect.Float64:
if schema.Type() != Double {
break
}
return &float64Codec{}
return &float64Codec{
promoter: getCodecPromoter[float64](actual),
}

case reflect.String:
if schema.Type() != String {
break
}
return &stringCodec{}
return &stringCodec{
promoter: getCodecPromoter[string](actual),
}

case reflect.Slice:
if typ.(reflect2.SliceType).Elem().Kind() != reflect.Uint8 || schema.Type() != Bytes {
break
}
return &bytesCodec{sliceType: typ.(*reflect2.UnsafeSliceType)}
return &bytesCodec{
sliceType: typ.(*reflect2.UnsafeSliceType),
promoter: getCodecPromoter[[]byte](actual),
}

case reflect.Struct:
st := schema.Type()
Expand All @@ -113,15 +130,22 @@ func createDecoderOfNative(schema Schema, typ reflect2.Type) ValDecoder {
return &dateCodec{}

case Istpy1Time && st == Long && lt == TimestampMillis:
return &timestampMillisCodec{}
return &timestampMillisCodec{
promoter: getCodecPromoter[int64](actual),
}

case Istpy1Time && st == Long && lt == TimestampMicros:
return &timestampMicrosCodec{}
return &timestampMicrosCodec{
promoter: getCodecPromoter[int64](actual),
}

case Istpy1Rat && st == Bytes && lt == Decimal:
dec := ls.(*DecimalLogicalSchema)

return &bytesDecimalCodec{prec: dec.Precision(), scale: dec.Scale()}
return &bytesDecimalCodec{
prec: dec.Precision(), scale: dec.Scale(),
promoter: getCodecPromoter[[]byte](actual),
}

default:
break
Expand All @@ -139,7 +163,10 @@ func createDecoderOfNative(schema Schema, typ reflect2.Type) ValDecoder {
}
dec := ls.(*DecimalLogicalSchema)

return &bytesDecimalPtrCodec{prec: dec.Precision(), scale: dec.Scale()}
return &bytesDecimalPtrCodec{
prec: dec.Precision(), scale: dec.Scale(),
promoter: getCodecPromoter[[]byte](actual),
}
}

return &errorDecoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s", typ.String(), schema.Type())}
Expand Down Expand Up @@ -206,6 +233,7 @@ func createEncoderOfNative(schema Schema, typ reflect2.Type) ValEncoder {
return &timeMillisCodec{}

case st == Long && lt == TimeMicros: // time.Duration

return &timeMicrosCodec{}

case st == Long:
Expand Down Expand Up @@ -340,20 +368,37 @@ type largeInt interface {
~int32 | ~uint32 | int64
}

type longCodec[T largeInt] struct{}
type longCodec[T largeInt] struct {
promoter *codecPromoter[T]
}

func (*longCodec[T]) Decode(ptr unsafe.Pointer, r *Reader) {
*((*T)(ptr)) = T(r.ReadLong())
func (c *longCodec[T]) Decode(ptr unsafe.Pointer, r *Reader) {
var v T
if c.promoter != nil {
v = c.promoter.promote(r)
} else {
v = T(r.ReadLong())
}
*((*T)(ptr)) = v
}

func (*longCodec[T]) Encode(ptr unsafe.Pointer, w *Writer) {
w.WriteLong(int64(*((*T)(ptr))))
}

type float32Codec struct{}
type float32Codec struct {
promoter *codecPromoter[float32]
}

func (c *float32Codec) Decode(ptr unsafe.Pointer, r *Reader) {
var v float32
if c.promoter != nil {
v = c.promoter.promote(r)
} else {
v = r.ReadFloat()
}

func (*float32Codec) Decode(ptr unsafe.Pointer, r *Reader) {
*((*float32)(ptr)) = r.ReadFloat()
*((*float32)(ptr)) = v
}

func (*float32Codec) Encode(ptr unsafe.Pointer, w *Writer) {
Expand All @@ -366,20 +411,36 @@ func (*float32DoubleCodec) Encode(ptr unsafe.Pointer, w *Writer) {
w.WriteDouble(float64(*((*float32)(ptr))))
}

type float64Codec struct{}
type float64Codec struct {
promoter *codecPromoter[float64]
}

func (*float64Codec) Decode(ptr unsafe.Pointer, r *Reader) {
*((*float64)(ptr)) = r.ReadDouble()
func (c *float64Codec) Decode(ptr unsafe.Pointer, r *Reader) {
var v float64
if c.promoter != nil {
v = c.promoter.promote(r)
} else {
v = r.ReadDouble()
}
*((*float64)(ptr)) = v
}

func (*float64Codec) Encode(ptr unsafe.Pointer, w *Writer) {
w.WriteDouble(*((*float64)(ptr)))
}

type stringCodec struct{}
type stringCodec struct {
promoter *codecPromoter[string]
}

func (*stringCodec) Decode(ptr unsafe.Pointer, r *Reader) {
*((*string)(ptr)) = r.ReadString()
func (c *stringCodec) Decode(ptr unsafe.Pointer, r *Reader) {
var v string
if c.promoter != nil {
v = c.promoter.promote(r)
} else {
v = r.ReadString()
}
*((*string)(ptr)) = v
}

func (*stringCodec) Encode(ptr unsafe.Pointer, w *Writer) {
Expand All @@ -388,10 +449,17 @@ func (*stringCodec) Encode(ptr unsafe.Pointer, w *Writer) {

type bytesCodec struct {
sliceType *reflect2.UnsafeSliceType
promoter *codecPromoter[[]byte]
}

func (c *bytesCodec) Decode(ptr unsafe.Pointer, r *Reader) {
b := r.ReadBytes()
var b []byte
if c.promoter != nil {
b = c.promoter.promote(r)
} else {
b = r.ReadBytes()
}
// b := r.ReadBytes()
c.sliceType.UnsafeSet(ptr, reflect2.PtrOf(b))
}

Expand All @@ -413,10 +481,17 @@ func (c *dateCodec) Encode(ptr unsafe.Pointer, w *Writer) {
w.WriteInt(int32(days))
}

type timestampMillisCodec struct{}
type timestampMillisCodec struct {
promoter *codecPromoter[int64]
}

func (c *timestampMillisCodec) Decode(ptr unsafe.Pointer, r *Reader) {
i := r.ReadLong()
var i int64
if c.promoter != nil {
i = c.promoter.promote(r)
} else {
i = r.ReadLong()
}
sec := i / 1e3
nsec := (i - sec*1e3) * 1e6
*((*time.Time)(ptr)) = time.Unix(sec, nsec).UTC()
Expand All @@ -427,10 +502,17 @@ func (c *timestampMillisCodec) Encode(ptr unsafe.Pointer, w *Writer) {
w.WriteLong(t.Unix()*1e3 + int64(t.Nanosecond()/1e6))
}

type timestampMicrosCodec struct{}
type timestampMicrosCodec struct {
promoter *codecPromoter[int64]
}

func (c *timestampMicrosCodec) Decode(ptr unsafe.Pointer, r *Reader) {
i := r.ReadLong()
var i int64
if c.promoter != nil {
i = c.promoter.promote(r)
} else {
i = r.ReadLong()
}
sec := i / 1e6
nsec := (i - sec*1e6) * 1e3
*((*time.Time)(ptr)) = time.Unix(sec, nsec).UTC()
Expand All @@ -441,7 +523,8 @@ func (c *timestampMicrosCodec) Encode(ptr unsafe.Pointer, w *Writer) {
w.WriteLong(t.Unix()*1e6 + int64(t.Nanosecond()/1e3))
}

type timeMillisCodec struct{}
type timeMillisCodec struct {

Check failure on line 526 in codec_native.go

View workflow job for this annotation

GitHub Actions / test (1.20)

File is not `gofumpt`-ed with `-extra` (gofumpt)
}

func (c *timeMillisCodec) Decode(ptr unsafe.Pointer, r *Reader) {
i := r.ReadInt()
Expand All @@ -453,10 +536,17 @@ func (c *timeMillisCodec) Encode(ptr unsafe.Pointer, w *Writer) {
w.WriteInt(int32(d.Nanoseconds() / int64(time.Millisecond)))
}

type timeMicrosCodec struct{}
type timeMicrosCodec struct {
promoter *codecPromoter[int64]
}

func (c *timeMicrosCodec) Decode(ptr unsafe.Pointer, r *Reader) {
i := r.ReadLong()
var i int64
if c.promoter != nil {
i = c.promoter.promote(r)
} else {
i = r.ReadLong()
}
*((*time.Duration)(ptr)) = time.Duration(i) * time.Microsecond
}

Expand All @@ -468,12 +558,19 @@ func (c *timeMicrosCodec) Encode(ptr unsafe.Pointer, w *Writer) {
var one = big.NewInt(1)

type bytesDecimalCodec struct {
prec int
scale int
prec int
scale int
promoter *codecPromoter[[]byte]
}

func (c *bytesDecimalCodec) Decode(ptr unsafe.Pointer, r *Reader) {
b := r.ReadBytes()
var b []byte
if c.promoter != nil {
b = c.promoter.promote(r)
} else {
b = r.ReadBytes()
}
// b := r.ReadBytes()
if i := (&big.Int{}).SetBytes(b); len(b) > 0 && b[0]&0x80 > 0 {
i.Sub(i, new(big.Int).Lsh(one, uint(len(b))*8))
}
Expand Down Expand Up @@ -514,12 +611,19 @@ func (c *bytesDecimalCodec) Encode(ptr unsafe.Pointer, w *Writer) {
}

type bytesDecimalPtrCodec struct {
prec int
scale int
prec int
scale int
promoter *codecPromoter[[]byte]
}

func (c *bytesDecimalPtrCodec) Decode(ptr unsafe.Pointer, r *Reader) {
b := r.ReadBytes()
var b []byte
if c.promoter != nil {
b = c.promoter.promote(r)
} else {
b = r.ReadBytes()
}
// b := r.ReadBytes()
if i := (&big.Int{}).SetBytes(b); len(b) > 0 && b[0]&0x80 > 0 {
i.Sub(i, new(big.Int).Lsh(one, uint(len(b))*8))
}
Expand Down
Loading

0 comments on commit 4ade51d

Please sign in to comment.