Skip to content

Commit

Permalink
Error on invalid type/logicalType
Browse files Browse the repository at this point in the history
- Ran into a nasty issue where I had incorrectly used a `time.Duration` with a `long` Avro type and `timestamp-millis`. Hamba was encoding the entire int64 for this case instead of erroring on an invalid schema. When Spark/SQL tried to [convert](https://github.com/apache/spark/blob/v3.5.0/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala#L142C38-L142C52) this into a microseconds (it wants everything as micros), it eventually tries to do this:
```
def millisToMicros(millis: Long): Long = {
    Math.multiplyExact(millis, MICROS_PER_MILLIS)
}
```
And [Math.multiplyExact](https://docs.oracle.com/en/java/javase/22/docs/api/java.base/java/lang/Math.html#multiplyExact(long,int)) detects an overflow because it is trying to multiply the long by 1000, which doesn't fit.
This fix will give the user an error if they use the `logicalType` incorrectly. The error will look something like `avro: time.Duration is unsupported for Avro long and logicalType timestamp-micros`.
  • Loading branch information
stampy88 committed Apr 4, 2024
1 parent fd90800 commit 478735e
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 2 deletions.
13 changes: 11 additions & 2 deletions codec_native.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,15 @@ func createDecoderOfNative(schema *PrimitiveSchema, typ reflect2.Type) ValDecode
convert: createLongConverter(schema.encodedType),
}

case st == Long:
case st == Long && lt == "":
if resolved {
return &longConvCodec[int64]{convert: createLongConverter(schema.encodedType)}
}
return &longCodec[int64]{}

case lt != "":
return &errorDecoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s and logicalType %s", typ.String(), schema.Type(), lt)}

Check failure on line 87 in codec_native.go

View workflow job for this annotation

GitHub Actions / test (1.21)

line is 131 characters (lll)

Check failure on line 87 in codec_native.go

View workflow job for this annotation

GitHub Actions / test (1.22)

line is 131 characters (lll)

default:
break
}
Expand Down Expand Up @@ -228,10 +231,16 @@ func createEncoderOfNative(schema Schema, typ reflect2.Type) ValEncoder {
switch {
case st == Int && lt == TimeMillis: // time.Duration
return &timeMillisCodec{}

case st == Long && lt == TimeMicros: // time.Duration
return &timeMicrosCodec{}
case st == Long:

case st == Long && lt == "":
return &longCodec[int64]{}

case lt != "":
return &errorEncoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s and logicalType %s", typ.String(), schema.Type(), lt)}

Check failure on line 242 in codec_native.go

View workflow job for this annotation

GitHub Actions / test (1.21)

line is 131 characters (lll)

Check failure on line 242 in codec_native.go

View workflow job for this annotation

GitHub Actions / test (1.22)

line is 131 characters (lll)

default:
break
}
Expand Down
14 changes: 14 additions & 0 deletions decoder_native_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,20 @@ func TestDecoder_Duration_TimeMicros(t *testing.T) {
assert.Equal(t, 123456789123*time.Microsecond, got)
}

func TestDecoder_Duration_InvalidLogicalType(t *testing.T) {
defer ConfigTeardown()

data := []byte{0x86, 0xEA, 0xC8, 0xE9, 0x97, 0x07}
schema := `{"type":"long","logicalType":"timestamp-micros"}`
dec, err := avro.NewDecoder(schema, bytes.NewReader(data))
require.NoError(t, err)

var got time.Duration
err = dec.Decode(&got)

assert.Error(t, err)
}

func TestDecoder_DurationInvalidSchema(t *testing.T) {
defer ConfigTeardown()

Expand Down
13 changes: 13 additions & 0 deletions encoder_native_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,19 @@ func TestEncoder_Duration_TimeMicros(t *testing.T) {
assert.Equal(t, []byte{0x86, 0xEA, 0xC8, 0xE9, 0x97, 0x07}, buf.Bytes())
}

func TestEncoder_Duration_InvalidLogicalType(t *testing.T) {
defer ConfigTeardown()

schema := `{"type":"long","logicalType":"timestamp-micros"}`
buf := bytes.NewBuffer([]byte{})
enc, err := avro.NewEncoder(schema, buf)
require.NoError(t, err)

err = enc.Encode(123456789123 * time.Microsecond)

assert.Error(t, err)
}

func TestEncoder_DurationInvalidSchema(t *testing.T) {
defer ConfigTeardown()

Expand Down

0 comments on commit 478735e

Please sign in to comment.