Skip to content

Commit

Permalink
Add proper types to mapper
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-wtrefon committed Oct 15, 2024
1 parent 98d14ea commit 9c1fb0a
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 43 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,5 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution;

import static org.apache.kafka.connect.data.Schema.Type.ARRAY;
import static org.apache.kafka.connect.data.Schema.Type.BOOLEAN;
import static org.apache.kafka.connect.data.Schema.Type.BYTES;
import static org.apache.kafka.connect.data.Schema.Type.FLOAT32;
import static org.apache.kafka.connect.data.Schema.Type.FLOAT64;
import static org.apache.kafka.connect.data.Schema.Type.INT16;
import static org.apache.kafka.connect.data.Schema.Type.INT32;
import static org.apache.kafka.connect.data.Schema.Type.INT64;
import static org.apache.kafka.connect.data.Schema.Type.STRING;
import static org.apache.kafka.connect.data.Schema.Type.STRUCT;

import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.connect.data.Schema;

Expand All @@ -28,32 +17,5 @@ public String mapToColumnType(Schema.Type kafkaType) {
* @param value JSON node
* @return Kafka type
*/
public Schema.Type mapJsonNodeTypeToKafkaType(JsonNode value) {
if (value == null || value.isNull()) {
return STRING;
} else if (value.isNumber()) {
if (value.isShort()) {
return INT16;
} else if (value.isInt()) {
return INT32;
} else if (value.isFloat()) {
return FLOAT32;
} else if (value.isDouble()) {
return FLOAT64;
}
return INT64;
} else if (value.isTextual()) {
return STRING;
} else if (value.isBoolean()) {
return BOOLEAN;
} else if (value.isBinary()) {
return BYTES;
} else if (value.isArray()) {
return ARRAY;
} else if (value.isObject()) {
return STRUCT;
} else {
return null;
}
}
public abstract Schema.Type mapJsonNodeTypeToKafkaType(JsonNode value);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;

import static org.apache.kafka.connect.data.Schema.Type.ARRAY;
import static org.apache.kafka.connect.data.Schema.Type.BOOLEAN;
import static org.apache.kafka.connect.data.Schema.Type.BYTES;
import static org.apache.kafka.connect.data.Schema.Type.FLOAT32;
import static org.apache.kafka.connect.data.Schema.Type.FLOAT64;
import static org.apache.kafka.connect.data.Schema.Type.INT64;
import static org.apache.kafka.connect.data.Schema.Type.STRING;
import static org.apache.kafka.connect.data.Schema.Type.STRUCT;

import com.snowflake.kafka.connector.internal.streaming.schemaevolution.ColumnTypeMapper;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
Expand All @@ -13,20 +23,20 @@ public String mapToColumnType(Schema.Type kafkaType, String schemaName) {
switch (kafkaType) {
case INT8:
case INT16:
return "BIGINT";
return "INT";
case INT32:
if (Date.LOGICAL_NAME.equals(schemaName)) {
return "DATE";
} else if (Time.LOGICAL_NAME.equals(schemaName)) {
return "TIME(6)";
} else {
return "BIGINT";
return "INT";
}
case INT64:
if (Timestamp.LOGICAL_NAME.equals(schemaName)) {
return "TIMESTAMP(6)";
} else {
return "BIGINT";
return "LONG";
}
case FLOAT32:
return "FLOAT";
Expand All @@ -45,7 +55,40 @@ public String mapToColumnType(Schema.Type kafkaType, String schemaName) {
case ARRAY:
default:
// MAP and STRUCT will go here
throw new IllegalStateException("Arrays, struct and map not supported!");
throw new IllegalArgumentException("Arrays, struct and map not supported!");
}
}

/**
* Map the JSON node type to Kafka type
*
* @param value JSON node
* @return Kafka type
*/
@Override
public Schema.Type mapJsonNodeTypeToKafkaType(JsonNode value) {
if (value == null || value.isNull()) {
return STRING;
} else if (value.isNumber()) {
if (value.isFloat()) {
return FLOAT32;
} else if (value.isDouble()) {
return FLOAT64;
}
return INT64; // short, int, long we treat as 64-bit numbers as from the value we can't infer
// smaller types
} else if (value.isTextual()) {
return STRING;
} else if (value.isBoolean()) {
return BOOLEAN;
} else if (value.isBinary()) {
return BYTES;
} else if (value.isArray()) {
return ARRAY;
} else if (value.isObject()) {
return STRUCT;
} else {
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution.snowflake;

import static org.apache.kafka.connect.data.Schema.Type.ARRAY;
import static org.apache.kafka.connect.data.Schema.Type.BOOLEAN;
import static org.apache.kafka.connect.data.Schema.Type.BYTES;
import static org.apache.kafka.connect.data.Schema.Type.FLOAT32;
import static org.apache.kafka.connect.data.Schema.Type.FLOAT64;
import static org.apache.kafka.connect.data.Schema.Type.INT16;
import static org.apache.kafka.connect.data.Schema.Type.INT32;
import static org.apache.kafka.connect.data.Schema.Type.INT64;
import static org.apache.kafka.connect.data.Schema.Type.STRING;
import static org.apache.kafka.connect.data.Schema.Type.STRUCT;

import com.snowflake.kafka.connector.internal.streaming.schemaevolution.ColumnTypeMapper;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
Expand Down Expand Up @@ -57,4 +69,33 @@ public String mapToColumnType(Schema.Type kafkaType, String schemaName) {
return "VARIANT";
}
}

public Schema.Type mapJsonNodeTypeToKafkaType(JsonNode value) {
if (value == null || value.isNull()) {
return STRING;
} else if (value.isNumber()) {
if (value.isShort()) {
return INT16;
} else if (value.isInt()) {
return INT32;
} else if (value.isFloat()) {
return FLOAT32;
} else if (value.isDouble()) {
return FLOAT64;
}
return INT64;
} else if (value.isTextual()) {
return STRING;
} else if (value.isBoolean()) {
return BOOLEAN;
} else if (value.isBinary()) {
return BYTES;
} else if (value.isArray()) {
return ARRAY;
} else if (value.isObject()) {
return STRUCT;
} else {
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.stream.Stream;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.JsonNodeFactory;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

class IcebergColumnTypeMapperTest {

private final IcebergColumnTypeMapper mapper = new IcebergColumnTypeMapper();

@ParameterizedTest(name = "should map Kafka type {0} to Snowflake column type {2}")
@MethodSource("kafkaTypesToMap")
void shouldMapKafkaTypeToSnowflakeColumnType(
Schema.Type kafkaType, String schemaName, String expectedSnowflakeType) {
assertThat(mapper.mapToColumnType(kafkaType, schemaName)).isEqualTo(expectedSnowflakeType);
}

@ParameterizedTest()
@MethodSource("jsonNodeTypesToMap")
void shouldMapJsonNodeTypeToKafkaType(JsonNode value, Schema.Type expectedKafkaType) {
assertThat(mapper.mapJsonNodeTypeToKafkaType(value)).isEqualTo(expectedKafkaType);
}

@ParameterizedTest()
@MethodSource("kafkaTypesToThrowException")
void shouldThrowExceptionWhenMappingUnsupportedKafkaType(Schema.Type kafkaType) {
assertThatThrownBy(() -> mapper.mapToColumnType(kafkaType, null))
.isInstanceOf(IllegalArgumentException.class);
}

private static Stream<Arguments> kafkaTypesToMap() {
return Stream.of(
Arguments.of(Schema.Type.INT8, null, "INT"),
Arguments.of(Schema.Type.INT16, null, "INT"),
Arguments.of(Schema.Type.INT32, Date.LOGICAL_NAME, "DATE"),
Arguments.of(Schema.Type.INT32, Time.LOGICAL_NAME, "TIME(6)"),
Arguments.of(Schema.Type.INT32, null, "INT"),
Arguments.of(Schema.Type.INT64, Timestamp.LOGICAL_NAME, "TIMESTAMP(6)"),
Arguments.of(Schema.Type.INT64, null, "LONG"),
Arguments.of(Schema.Type.FLOAT32, null, "FLOAT"),
Arguments.of(Schema.Type.FLOAT64, null, "DOUBLE"),
Arguments.of(Schema.Type.BOOLEAN, null, "BOOLEAN"),
Arguments.of(Schema.Type.STRING, null, "VARCHAR"),
Arguments.of(Schema.Type.BYTES, Decimal.LOGICAL_NAME, "VARCHAR"),
Arguments.of(Schema.Type.BYTES, null, "BINARY"));
}

private static Stream<Arguments> kafkaTypesToThrowException() {
return Stream.of(
Arguments.of(Schema.Type.ARRAY),
Arguments.of(Schema.Type.MAP),
Arguments.of(Schema.Type.STRUCT));
}

private static Stream<Arguments> jsonNodeTypesToMap() {
return Stream.of(
Arguments.of(JsonNodeFactory.instance.nullNode(), Schema.Type.STRING),
Arguments.of(JsonNodeFactory.instance.numberNode((short) 1), Schema.Type.INT64),
Arguments.of(JsonNodeFactory.instance.numberNode(1), Schema.Type.INT64),
Arguments.of(JsonNodeFactory.instance.numberNode(1L), Schema.Type.INT64),
Arguments.of(JsonNodeFactory.instance.numberNode(1.0), Schema.Type.FLOAT64),
Arguments.of(JsonNodeFactory.instance.numberNode(1.0f), Schema.Type.FLOAT32),
Arguments.of(JsonNodeFactory.instance.booleanNode(true), Schema.Type.BOOLEAN),
Arguments.of(JsonNodeFactory.instance.textNode("text"), Schema.Type.STRING),
Arguments.of(JsonNodeFactory.instance.binaryNode(new byte[] {1, 2, 3}), Schema.Type.BYTES),
Arguments.of(JsonNodeFactory.instance.arrayNode(), Schema.Type.ARRAY),
Arguments.of(JsonNodeFactory.instance.objectNode(), Schema.Type.STRUCT));
}
}

0 comments on commit 9c1fb0a

Please sign in to comment.