From 478735e742b76ac9078ebf7bda05b9cc9beebdc0 Mon Sep 17 00:00:00 2001
From: dave sinclair
Date: Thu, 4 Apr 2024 15:28:39 -0400
Subject: [PATCH] Error on invalid type/logicalType

- Ran into a nasty issue where I had incorrectly used a `time.Duration` with a
  `long` Avro type and `timestamp-millis`. Hamba was encoding the Duration's
  entire int64 (its raw nanosecond count) for this case instead of erroring on
  an invalid schema. When Spark/SQL tried to
  [convert](https://github.com/apache/spark/blob/v3.5.0/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala#L142C38-L142C52)
  this to microseconds (it wants everything as micros), it eventually ran:

```
def millisToMicros(millis: Long): Long = {
  Math.multiplyExact(millis, MICROS_PER_MILLIS)
}
```

And [Math.multiplyExact](https://docs.oracle.com/en/java/javase/22/docs/api/java.base/java/lang/Math.html#multiplyExact(long,int)) detects an overflow because it multiplies the long by 1000 and the result no longer fits in a long: the encoded value was the Duration's nanosecond count rather than milliseconds, so it was far larger than Spark expected.

This fix gives the user an error if they use the `logicalType` incorrectly. The error will look something like `avro: time.Duration is unsupported for Avro long and logicalType timestamp-micros`.
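For context, here is a minimal sketch (not part of the patch) of the misuse this change now rejects. It assumes the `github.com/hamba/avro/v2` import path and reuses the schema and value from the new encoder test below:

```
package main

import (
	"bytes"
	"fmt"
	"time"

	"github.com/hamba/avro/v2"
)

func main() {
	// A time.Duration is not a valid Go type for a long with a timestamp logical type.
	schema := `{"type":"long","logicalType":"timestamp-micros"}`

	buf := &bytes.Buffer{}
	enc, err := avro.NewEncoder(schema, buf)
	if err != nil {
		panic(err)
	}

	// Before this patch the Duration's raw int64 (nanoseconds) was written out;
	// with the patch, Encode returns an error instead.
	err = enc.Encode(123456789123 * time.Microsecond)
	fmt.Println(err) // e.g. avro: time.Duration is unsupported for Avro long and logicalType timestamp-micros
}
```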
---
 codec_native.go        | 13 +++++++++++--
 decoder_native_test.go | 14 ++++++++++++++
 encoder_native_test.go | 13 +++++++++++++
 3 files changed, 38 insertions(+), 2 deletions(-)

diff --git a/codec_native.go b/codec_native.go
index 62d4239..5448591 100644
--- a/codec_native.go
+++ b/codec_native.go
@@ -77,12 +77,15 @@ func createDecoderOfNative(schema *PrimitiveSchema, typ reflect2.Type) ValDecoder {
 			convert: createLongConverter(schema.encodedType),
 		}
 
-	case st == Long:
+	case st == Long && lt == "":
 		if resolved {
 			return &longConvCodec[int64]{convert: createLongConverter(schema.encodedType)}
 		}
 		return &longCodec[int64]{}
 
+	case lt != "":
+		return &errorDecoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s and logicalType %s", typ.String(), schema.Type(), lt)}
+
 	default:
 		break
 	}
@@ -228,10 +231,16 @@ func createEncoderOfNative(schema Schema, typ reflect2.Type) ValEncoder {
 	switch {
 	case st == Int && lt == TimeMillis: // time.Duration
 		return &timeMillisCodec{}
+
 	case st == Long && lt == TimeMicros: // time.Duration
 		return &timeMicrosCodec{}
-	case st == Long:
+
+	case st == Long && lt == "":
 		return &longCodec[int64]{}
+
+	case lt != "":
+		return &errorEncoder{err: fmt.Errorf("avro: %s is unsupported for Avro %s and logicalType %s", typ.String(), schema.Type(), lt)}
+
 	default:
 		break
 	}
diff --git a/decoder_native_test.go b/decoder_native_test.go
index 0915c15..398bed1 100644
--- a/decoder_native_test.go
+++ b/decoder_native_test.go
@@ -641,6 +641,20 @@ func TestDecoder_Duration_TimeMicros(t *testing.T) {
 	assert.Equal(t, 123456789123*time.Microsecond, got)
 }
 
+func TestDecoder_Duration_InvalidLogicalType(t *testing.T) {
+	defer ConfigTeardown()
+
+	data := []byte{0x86, 0xEA, 0xC8, 0xE9, 0x97, 0x07}
+	schema := `{"type":"long","logicalType":"timestamp-micros"}`
+	dec, err := avro.NewDecoder(schema, bytes.NewReader(data))
+	require.NoError(t, err)
+
+	var got time.Duration
+	err = dec.Decode(&got)
+
+	assert.Error(t, err)
+}
+
 func TestDecoder_DurationInvalidSchema(t *testing.T) {
 	defer ConfigTeardown()
 
diff --git a/encoder_native_test.go b/encoder_native_test.go
index f201e06..739b1fe 100644
--- a/encoder_native_test.go
+++ b/encoder_native_test.go
@@ -626,6 +626,19 @@ func TestEncoder_Duration_TimeMicros(t *testing.T) {
 	assert.Equal(t, []byte{0x86, 0xEA, 0xC8, 0xE9, 0x97, 0x07}, buf.Bytes())
 }
 
+func TestEncoder_Duration_InvalidLogicalType(t *testing.T) {
+	defer ConfigTeardown()
+
+	schema := `{"type":"long","logicalType":"timestamp-micros"}`
+	buf := bytes.NewBuffer([]byte{})
+	enc, err := avro.NewEncoder(schema, buf)
+	require.NoError(t, err)
+
+	err = enc.Encode(123456789123 * time.Microsecond)
+
+	assert.Error(t, err)
+}
+
 func TestEncoder_DurationInvalidSchema(t *testing.T) {
 	defer ConfigTeardown()