From 9c1fb0a40e31b7a6c504ec6d3230219d09fdc182 Mon Sep 17 00:00:00 2001 From: Wojciech Trefon Date: Tue, 15 Oct 2024 08:53:04 +0200 Subject: [PATCH] Add proper types to mapper --- .../schemaevolution/ColumnTypeMapper.java | 40 +--------- .../iceberg/IcebergColumnTypeMapper.java | 51 +++++++++++- .../snowflake/SnowflakeColumnTypeMapper.java | 41 ++++++++++ .../iceberg/IcebergColumnTypeMapperTest.java | 80 +++++++++++++++++++ 4 files changed, 169 insertions(+), 43 deletions(-) create mode 100644 src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapperTest.java diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/ColumnTypeMapper.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/ColumnTypeMapper.java index 24765a53f..e7d3d196b 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/ColumnTypeMapper.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/ColumnTypeMapper.java @@ -1,16 +1,5 @@ package com.snowflake.kafka.connector.internal.streaming.schemaevolution; -import static org.apache.kafka.connect.data.Schema.Type.ARRAY; -import static org.apache.kafka.connect.data.Schema.Type.BOOLEAN; -import static org.apache.kafka.connect.data.Schema.Type.BYTES; -import static org.apache.kafka.connect.data.Schema.Type.FLOAT32; -import static org.apache.kafka.connect.data.Schema.Type.FLOAT64; -import static org.apache.kafka.connect.data.Schema.Type.INT16; -import static org.apache.kafka.connect.data.Schema.Type.INT32; -import static org.apache.kafka.connect.data.Schema.Type.INT64; -import static org.apache.kafka.connect.data.Schema.Type.STRING; -import static org.apache.kafka.connect.data.Schema.Type.STRUCT; - import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; import org.apache.kafka.connect.data.Schema; @@ -28,32 +17,5 @@ public String mapToColumnType(Schema.Type kafkaType) { * @param value JSON node * @return Kafka type */ - public Schema.Type mapJsonNodeTypeToKafkaType(JsonNode value) { - if (value == null || value.isNull()) { - return STRING; - } else if (value.isNumber()) { - if (value.isShort()) { - return INT16; - } else if (value.isInt()) { - return INT32; - } else if (value.isFloat()) { - return FLOAT32; - } else if (value.isDouble()) { - return FLOAT64; - } - return INT64; - } else if (value.isTextual()) { - return STRING; - } else if (value.isBoolean()) { - return BOOLEAN; - } else if (value.isBinary()) { - return BYTES; - } else if (value.isArray()) { - return ARRAY; - } else if (value.isObject()) { - return STRUCT; - } else { - return null; - } - } + public abstract Schema.Type mapJsonNodeTypeToKafkaType(JsonNode value); } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java index a679c499d..206c21b39 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java @@ -1,6 +1,16 @@ package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg; +import static org.apache.kafka.connect.data.Schema.Type.ARRAY; +import static org.apache.kafka.connect.data.Schema.Type.BOOLEAN; +import static org.apache.kafka.connect.data.Schema.Type.BYTES; +import static org.apache.kafka.connect.data.Schema.Type.FLOAT32; +import static org.apache.kafka.connect.data.Schema.Type.FLOAT64; +import static org.apache.kafka.connect.data.Schema.Type.INT64; +import static org.apache.kafka.connect.data.Schema.Type.STRING; +import static org.apache.kafka.connect.data.Schema.Type.STRUCT; + import com.snowflake.kafka.connector.internal.streaming.schemaevolution.ColumnTypeMapper; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; @@ -13,20 +23,20 @@ public String mapToColumnType(Schema.Type kafkaType, String schemaName) { switch (kafkaType) { case INT8: case INT16: - return "BIGINT"; + return "INT"; case INT32: if (Date.LOGICAL_NAME.equals(schemaName)) { return "DATE"; } else if (Time.LOGICAL_NAME.equals(schemaName)) { return "TIME(6)"; } else { - return "BIGINT"; + return "INT"; } case INT64: if (Timestamp.LOGICAL_NAME.equals(schemaName)) { return "TIMESTAMP(6)"; } else { - return "BIGINT"; + return "LONG"; } case FLOAT32: return "FLOAT"; @@ -45,7 +55,40 @@ public String mapToColumnType(Schema.Type kafkaType, String schemaName) { case ARRAY: default: // MAP and STRUCT will go here - throw new IllegalStateException("Arrays, struct and map not supported!"); + throw new IllegalArgumentException("Arrays, struct and map not supported!"); + } + } + + /** + * Map the JSON node type to Kafka type + * + * @param value JSON node + * @return Kafka type + */ + @Override + public Schema.Type mapJsonNodeTypeToKafkaType(JsonNode value) { + if (value == null || value.isNull()) { + return STRING; + } else if (value.isNumber()) { + if (value.isFloat()) { + return FLOAT32; + } else if (value.isDouble()) { + return FLOAT64; + } + return INT64; // short, int, long we treat as 64-bit numbers as from the value we can't infer + // smaller types + } else if (value.isTextual()) { + return STRING; + } else if (value.isBoolean()) { + return BOOLEAN; + } else if (value.isBinary()) { + return BYTES; + } else if (value.isArray()) { + return ARRAY; + } else if (value.isObject()) { + return STRUCT; + } else { + return null; } } } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/snowflake/SnowflakeColumnTypeMapper.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/snowflake/SnowflakeColumnTypeMapper.java index 8fa7cb189..00a0674aa 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/snowflake/SnowflakeColumnTypeMapper.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/snowflake/SnowflakeColumnTypeMapper.java @@ -1,6 +1,18 @@ package com.snowflake.kafka.connector.internal.streaming.schemaevolution.snowflake; +import static org.apache.kafka.connect.data.Schema.Type.ARRAY; +import static org.apache.kafka.connect.data.Schema.Type.BOOLEAN; +import static org.apache.kafka.connect.data.Schema.Type.BYTES; +import static org.apache.kafka.connect.data.Schema.Type.FLOAT32; +import static org.apache.kafka.connect.data.Schema.Type.FLOAT64; +import static org.apache.kafka.connect.data.Schema.Type.INT16; +import static org.apache.kafka.connect.data.Schema.Type.INT32; +import static org.apache.kafka.connect.data.Schema.Type.INT64; +import static org.apache.kafka.connect.data.Schema.Type.STRING; +import static org.apache.kafka.connect.data.Schema.Type.STRUCT; + import com.snowflake.kafka.connector.internal.streaming.schemaevolution.ColumnTypeMapper; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; @@ -57,4 +69,33 @@ public String mapToColumnType(Schema.Type kafkaType, String schemaName) { return "VARIANT"; } } + + public Schema.Type mapJsonNodeTypeToKafkaType(JsonNode value) { + if (value == null || value.isNull()) { + return STRING; + } else if (value.isNumber()) { + if (value.isShort()) { + return INT16; + } else if (value.isInt()) { + return INT32; + } else if (value.isFloat()) { + return FLOAT32; + } else if (value.isDouble()) { + return FLOAT64; + } + return INT64; + } else if (value.isTextual()) { + return STRING; + } else if (value.isBoolean()) { + return BOOLEAN; + } else if (value.isBinary()) { + return BYTES; + } else if (value.isArray()) { + return ARRAY; + } else if (value.isObject()) { + return STRUCT; + } else { + return null; + } + } } diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapperTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapperTest.java new file mode 100644 index 000000000..6adb0a1f3 --- /dev/null +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapperTest.java @@ -0,0 +1,80 @@ +package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.stream.Stream; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.JsonNodeFactory; +import org.apache.kafka.connect.data.Date; +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Time; +import org.apache.kafka.connect.data.Timestamp; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +class IcebergColumnTypeMapperTest { + + private final IcebergColumnTypeMapper mapper = new IcebergColumnTypeMapper(); + + @ParameterizedTest(name = "should map Kafka type {0} to Snowflake column type {2}") + @MethodSource("kafkaTypesToMap") + void shouldMapKafkaTypeToSnowflakeColumnType( + Schema.Type kafkaType, String schemaName, String expectedSnowflakeType) { + assertThat(mapper.mapToColumnType(kafkaType, schemaName)).isEqualTo(expectedSnowflakeType); + } + + @ParameterizedTest() + @MethodSource("jsonNodeTypesToMap") + void shouldMapJsonNodeTypeToKafkaType(JsonNode value, Schema.Type expectedKafkaType) { + assertThat(mapper.mapJsonNodeTypeToKafkaType(value)).isEqualTo(expectedKafkaType); + } + + @ParameterizedTest() + @MethodSource("kafkaTypesToThrowException") + void shouldThrowExceptionWhenMappingUnsupportedKafkaType(Schema.Type kafkaType) { + assertThatThrownBy(() -> mapper.mapToColumnType(kafkaType, null)) + .isInstanceOf(IllegalArgumentException.class); + } + + private static Stream kafkaTypesToMap() { + return Stream.of( + Arguments.of(Schema.Type.INT8, null, "INT"), + Arguments.of(Schema.Type.INT16, null, "INT"), + Arguments.of(Schema.Type.INT32, Date.LOGICAL_NAME, "DATE"), + Arguments.of(Schema.Type.INT32, Time.LOGICAL_NAME, "TIME(6)"), + Arguments.of(Schema.Type.INT32, null, "INT"), + Arguments.of(Schema.Type.INT64, Timestamp.LOGICAL_NAME, "TIMESTAMP(6)"), + Arguments.of(Schema.Type.INT64, null, "LONG"), + Arguments.of(Schema.Type.FLOAT32, null, "FLOAT"), + Arguments.of(Schema.Type.FLOAT64, null, "DOUBLE"), + Arguments.of(Schema.Type.BOOLEAN, null, "BOOLEAN"), + Arguments.of(Schema.Type.STRING, null, "VARCHAR"), + Arguments.of(Schema.Type.BYTES, Decimal.LOGICAL_NAME, "VARCHAR"), + Arguments.of(Schema.Type.BYTES, null, "BINARY")); + } + + private static Stream kafkaTypesToThrowException() { + return Stream.of( + Arguments.of(Schema.Type.ARRAY), + Arguments.of(Schema.Type.MAP), + Arguments.of(Schema.Type.STRUCT)); + } + + private static Stream jsonNodeTypesToMap() { + return Stream.of( + Arguments.of(JsonNodeFactory.instance.nullNode(), Schema.Type.STRING), + Arguments.of(JsonNodeFactory.instance.numberNode((short) 1), Schema.Type.INT64), + Arguments.of(JsonNodeFactory.instance.numberNode(1), Schema.Type.INT64), + Arguments.of(JsonNodeFactory.instance.numberNode(1L), Schema.Type.INT64), + Arguments.of(JsonNodeFactory.instance.numberNode(1.0), Schema.Type.FLOAT64), + Arguments.of(JsonNodeFactory.instance.numberNode(1.0f), Schema.Type.FLOAT32), + Arguments.of(JsonNodeFactory.instance.booleanNode(true), Schema.Type.BOOLEAN), + Arguments.of(JsonNodeFactory.instance.textNode("text"), Schema.Type.STRING), + Arguments.of(JsonNodeFactory.instance.binaryNode(new byte[] {1, 2, 3}), Schema.Type.BYTES), + Arguments.of(JsonNodeFactory.instance.arrayNode(), Schema.Type.ARRAY), + Arguments.of(JsonNodeFactory.instance.objectNode(), Schema.Type.STRUCT)); + } +}