diff --git a/src/test/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapperTest.java b/src/test/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapperTest.java new file mode 100644 index 000000000..9131c6245 --- /dev/null +++ b/src/test/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapperTest.java @@ -0,0 +1,377 @@ +package com.snowflake.kafka.connector.records; + +import static com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord.primitiveJsonExample; +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.snowflake.kafka.connector.Utils; +import com.snowflake.kafka.connector.records.RecordService.SnowflakeTableRow; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Stream; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +class IcebergTableStreamingRecordMapperTest { + private final IcebergTableStreamingRecordMapper mapper = + new IcebergTableStreamingRecordMapper(objectMapper); + private static final ObjectMapper objectMapper = new ObjectMapper(); + + private static final ImmutableMap primitiveJsonAsMap = + ImmutableMap.of( + "id_int8", + 8, + "id_int16", + 16, + "id_int32", + 32, + "id_int64", + 64, + "description", + "dogs are the best", + "rating_float32", + 0.5, + "rating_float64", + 0.25, + "approval", + true); + + private static final String fullMetadataJsonExample = + "{" + + "\"offset\": 10," + + "\"topic\": \"topic\"," + + "\"partition\": 0," + + "\"key\": \"key\"," + + "\"schema_id\": 1," + + "\"key_schema_id\": 2," + + "\"CreateTime\": 3," + + "\"LogAppendTime\": 4," + + "\"SnowflakeConnectorPushTime\": 5," + + "\"headers\": {\"objectAsJsonStringHeader\": {" + + "\"key1\": \"value1\"," + + "\"key2\": \"value2\"" + + "}," + + "\"header2\": \"testheaderstring\"," + + "\"header3\": 3.5}" + + "}"; + + private static final Map fullMetadataJsonAsMap = + ImmutableMap.of( + "offset", + 10, + "topic", + "topic", + "partition", + 0, + "key", + "key", + "schema_id", + 1, + "key_schema_id", + 2, + "CreateTime", + 3, + "LogAppendTime", + 4, + "SnowflakeConnectorPushTime", + 5, + "headers", + ImmutableMap.of( + "objectAsJsonStringHeader", + "{\"key1\":\"value1\",\"key2\":\"value2\"}", + "header2", + "testheaderstring", + "header3", + "3.5")); + + @ParameterizedTest(name = "{0}") + @MethodSource("prepareSchematizationData") + void shouldMapRecord_schematizationEnabled( + String description, SnowflakeTableRow row, Map expected) + throws JsonProcessingException { + // When + Map result = mapper.processSnowflakeRecord(row, true, true); + + // Then + assertThat(result).isEqualTo(expected); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("prepareMetadataData") + void shouldMapMetadata(String description, SnowflakeTableRow row, Map expected) + throws JsonProcessingException { + // When + Map result = mapper.processSnowflakeRecord(row, false, true); + Map resultSchematized = mapper.processSnowflakeRecord(row, true, true); + + // Then + assertThat(result.get(Utils.TABLE_COLUMN_METADATA)).isEqualTo(expected); + assertThat(resultSchematized.get(Utils.TABLE_COLUMN_METADATA)).isEqualTo(expected); + } + + @ParameterizedTest(name = "{0}") + @MethodSource("prepareNoSchematizationData") + void shouldMapRecord_schematizationDisabled( + String description, SnowflakeTableRow row, Map expected) + throws JsonProcessingException { + // When + Map result = mapper.processSnowflakeRecord(row, false, true); + + // Then + assertThat(result).isEqualTo(expected); + } + + private static Stream prepareSchematizationData() throws JsonProcessingException { + return Stream.of( + Arguments.of( + "Empty JSON", + buildRow("{}"), + ImmutableMap.of(Utils.TABLE_COLUMN_METADATA, fullMetadataJsonAsMap)), + Arguments.of( + "Simple JSON", + buildRow("{\"key\": \"value\"}"), + ImmutableMap.of( + "\"KEY\"", "value", Utils.TABLE_COLUMN_METADATA, fullMetadataJsonAsMap)), + Arguments.of( + "Already quoted key JSON", + buildRow("{\"\\\"key\\\"\": \"value\"}"), + ImmutableMap.of( + "\"key\"", "value", Utils.TABLE_COLUMN_METADATA, fullMetadataJsonAsMap)), + Arguments.of( + "Already quoted UPPERCASE key JSON", + buildRow("{\"\\\"KEY\\\"\": \"value\"}"), + ImmutableMap.of( + "\"KEY\"", "value", Utils.TABLE_COLUMN_METADATA, fullMetadataJsonAsMap)), + Arguments.of( + "JSON with different primitive types", + buildRow(primitiveJsonExample), + ImmutableMap.of( + "\"ID_INT8\"", + 8, + "\"ID_INT16\"", + 16, + "\"ID_INT32\"", + 32, + "\"ID_INT64\"", + 64, + "\"DESCRIPTION\"", + "dogs are the best", + "\"RATING_FLOAT32\"", + 0.5, + "\"RATING_FLOAT64\"", + 0.25, + "\"APPROVAL\"", + true, + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap)), + Arguments.of( + "Nested array", + buildRow("{\"key\": [" + primitiveJsonExample + ", " + primitiveJsonExample + "]}"), + ImmutableMap.of( + "\"KEY\"", + ImmutableList.of(primitiveJsonAsMap, primitiveJsonAsMap), + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap)), + Arguments.of( + "Empty nested array", + buildRow("{\"key\": []}"), + ImmutableMap.of( + "\"KEY\"", ImmutableList.of(), Utils.TABLE_COLUMN_METADATA, fullMetadataJsonAsMap)), + Arguments.of( + "Empty object", + buildRow("{\"key\": {}}"), + ImmutableMap.of( + "\"KEY\"", ImmutableMap.of(), Utils.TABLE_COLUMN_METADATA, fullMetadataJsonAsMap)), + Arguments.of( + "Nested object", + buildRow("{\"key\":" + primitiveJsonExample + "}"), + ImmutableMap.of( + "\"KEY\"", primitiveJsonAsMap, Utils.TABLE_COLUMN_METADATA, fullMetadataJsonAsMap)), + Arguments.of( + "Double nested object", + buildRow("{\"key\":{\"key2\":" + primitiveJsonExample + "}}"), + ImmutableMap.of( + "\"KEY\"", + ImmutableMap.of("key2", primitiveJsonAsMap), + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap)), + Arguments.of( + "Nested objects and primitive types", + buildRow( + primitiveJsonExample.replaceFirst("}", "") + + ",\"key\":" + + primitiveJsonExample + + "}"), + ImmutableMap.of( + "\"ID_INT8\"", + 8, + "\"ID_INT16\"", + 16, + "\"ID_INT32\"", + 32, + "\"ID_INT64\"", + 64, + "\"DESCRIPTION\"", + "dogs are the best", + "\"RATING_FLOAT32\"", + 0.5, + "\"RATING_FLOAT64\"", + 0.25, + "\"APPROVAL\"", + true, + "\"KEY\"", + primitiveJsonAsMap, + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap))); + } + + private static Stream prepareMetadataData() throws JsonProcessingException { + return Stream.of( + Arguments.of("Full metadata", buildRow("{}"), fullMetadataJsonAsMap), + Arguments.of( + "Empty metadata", buildRow("{}", "{}"), ImmutableMap.of("headers", ImmutableMap.of())), + Arguments.of( + "Metadata with null headers", + buildRow("{}", "{\"headers\": null}"), + ImmutableMap.of("headers", ImmutableMap.of())), + Arguments.of( + "Metadata with empty headers", + buildRow("{}", "{\"headers\": {}}"), + ImmutableMap.of("headers", ImmutableMap.of())), + Arguments.of( + "Metadata with headers with null keys", + buildRow("{}", "{\"headers\": {\"key\": null}}"), + ImmutableMap.of("headers", mapWithNullableValuesOf("key", null))), + Arguments.of( + "Metadata with headers with nested null keys", + buildRow("{}", "{\"headers\": {\"key\": {\"key2\": null }}}"), + ImmutableMap.of("headers", ImmutableMap.of("key", "{\"key2\":null}"))), + Arguments.of( + "Metadata with null field value", + buildRow("{}", "{\"offset\": null}"), + mapWithNullableValuesOf("offset", null, "headers", ImmutableMap.of()))); + } + + private static Stream prepareNoSchematizationData() throws JsonProcessingException { + return Stream.of( + Arguments.of( + "Empty JSON", + buildRow("{}"), + ImmutableMap.of( + Utils.TABLE_COLUMN_CONTENT, + ImmutableMap.of(), + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap)), + Arguments.of( + "Simple JSON", + buildRow("{\"key\": \"value\"}"), + ImmutableMap.of( + Utils.TABLE_COLUMN_CONTENT, + ImmutableMap.of("key", "value"), + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap)), + Arguments.of( + "UPPERCASE key JSON", + buildRow("{\"KEY\": \"value\"}"), + ImmutableMap.of( + Utils.TABLE_COLUMN_CONTENT, + ImmutableMap.of("KEY", "value"), + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap)), + Arguments.of( + "JSON with different primitive types", + buildRow(primitiveJsonExample), + ImmutableMap.of( + Utils.TABLE_COLUMN_CONTENT, + primitiveJsonAsMap, + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap)), + Arguments.of( + "Nested array", + buildRow("{\"key\": [" + primitiveJsonExample + ", " + primitiveJsonExample + "]}"), + ImmutableMap.of( + Utils.TABLE_COLUMN_CONTENT, + ImmutableMap.of("key", ImmutableList.of(primitiveJsonAsMap, primitiveJsonAsMap)), + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap)), + Arguments.of( + "Empty nested array", + buildRow("{\"key\": []}"), + ImmutableMap.of( + Utils.TABLE_COLUMN_CONTENT, + ImmutableMap.of("key", ImmutableList.of()), + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap)), + Arguments.of( + "Empty object", + buildRow("{\"key\": {}}"), + ImmutableMap.of( + Utils.TABLE_COLUMN_CONTENT, + ImmutableMap.of("key", ImmutableMap.of()), + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap)), + Arguments.of( + "Nested object", + buildRow("{\"key\":" + primitiveJsonExample + "}"), + ImmutableMap.of( + Utils.TABLE_COLUMN_CONTENT, + ImmutableMap.of("key", primitiveJsonAsMap), + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap)), + Arguments.of( + "Double nested object", + buildRow("{\"key\":{\"key2\":" + primitiveJsonExample + "}}"), + ImmutableMap.of( + Utils.TABLE_COLUMN_CONTENT, + ImmutableMap.of("key", ImmutableMap.of("key2", primitiveJsonAsMap)), + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap)), + Arguments.of( + "Nested objects and primitive types", + buildRow( + primitiveJsonExample.replaceFirst("}", "") + + ",\"key\":" + + primitiveJsonExample + + "}"), + ImmutableMap.of( + Utils.TABLE_COLUMN_CONTENT, + addToMap(primitiveJsonAsMap, "key", primitiveJsonAsMap), + Utils.TABLE_COLUMN_METADATA, + fullMetadataJsonAsMap))); + } + + private static Map addToMap(Map map, String key, T value) { + HashMap newMap = new HashMap<>(map); + newMap.put(key, value); + return newMap; + } + + private static Map mapWithNullableValuesOf(String key, T value) { + Map map = new HashMap<>(); + map.put(key, value); + return map; + } + + private static Map mapWithNullableValuesOf( + String key, T value, String key2, T value2) { + Map map = new HashMap<>(); + map.put(key, value); + map.put(key2, value2); + return map; + } + + private static SnowflakeTableRow buildRow(String content) throws JsonProcessingException { + return buildRow(content, fullMetadataJsonExample); + } + + private static SnowflakeTableRow buildRow(String content, String metadata) + throws JsonProcessingException { + return new SnowflakeTableRow( + new SnowflakeRecordContent(objectMapper.readTree(content)), + objectMapper.readTree(metadata)); + } +} diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java index 26b1d0fa7..56dae9200 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java @@ -42,62 +42,6 @@ public abstract class IcebergIngestionIT extends BaseIcebergIT { protected static final PrimitiveJsonRecord emptyPrimitiveJsonRecordValue = // FIXME: there is currently some bug in Iceberg when storing int64 values new PrimitiveJsonRecord(0L, 0L, 0L, /*0L, */ null, 0.0, 0.0, false); - protected static final String primitiveJson = - "{" - + " \"id_int8\": 8," - + " \"id_int16\": 16," - + " \"id_int32\": 32," - + " \"id_int64\": 64," - + " \"description\": \"dogs are the best\"," - + " \"rating_float32\": 0.5," - + " \"rating_float64\": 0.25," - + " \"approval\": true" - + "}"; - - protected static final String primitiveJsonWithSchema = - "{" - + " \"schema\": {" - + " \"type\": \"struct\"," - + " \"fields\": [" - + " {" - + " \"field\": \"id_int8\"," - + " \"type\": \"int8\"" - + " }," - + " {" - + " \"field\": \"id_int16\"," - + " \"type\": \"int16\"" - + " }," - + " {" - + " \"field\": \"id_int32\"," - + " \"type\": \"int32\"" - + " }," - + " {" - + " \"field\": \"id_int64\"," - + " \"type\": \"int64\"" - + " }," - + " {" - + " \"field\": \"description\"," - + " \"type\": \"string\"" - + " }," - + " {" - + " \"field\": \"rating_float32\"," - + " \"type\": \"float\"" - + " }," - + " {" - + " \"field\": \"rating_float64\"," - + " \"type\": \"double\"" - + " }," - + " {" - + " \"field\": \"approval\"," - + " \"type\": \"boolean\"" - + " }" - + " ]," - + " \"optional\": false," - + " \"name\": \"sf.kc.test\"" - + " }," - + " \"payload\": " - + primitiveJson - + "}"; @BeforeEach public void setUp() { diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java index 8e8c63c6f..5cc9c83eb 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionNoSchemaEvolutionIT.java @@ -1,5 +1,7 @@ package com.snowflake.kafka.connector.streaming.iceberg; +import static com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord.primitiveJsonExample; +import static com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord.primitiveJsonWithSchemaExample; import static org.assertj.core.api.Assertions.assertThat; import com.snowflake.kafka.connector.Utils; @@ -50,8 +52,8 @@ protected void createIcebergTable() { private static Stream prepareData() { return Stream.of( - Arguments.of("Primitive JSON with schema", primitiveJsonWithSchema, true), - Arguments.of("Primitive JSON without schema", primitiveJson, false)); + Arguments.of("Primitive JSON with schema", primitiveJsonWithSchemaExample, true), + Arguments.of("Primitive JSON without schema", primitiveJsonExample, false)); } @ParameterizedTest(name = "{0}") diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java index 27c35fb09..e9ac9f1cd 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java @@ -1,5 +1,7 @@ package com.snowflake.kafka.connector.streaming.iceberg; +import static com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord.primitiveJsonExample; +import static com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord.primitiveJsonWithSchemaExample; import static org.assertj.core.api.Assertions.assertThat; import com.snowflake.kafka.connector.Utils; @@ -105,7 +107,7 @@ private static Stream prepareData() { return Stream.of( Arguments.of( "Primitive JSON with schema", - primitiveJsonWithSchema, + primitiveJsonWithSchemaExample, new DescribeTableRow[] { new DescribeTableRow("ID_INT8", "NUMBER(10,0)"), new DescribeTableRow("ID_INT16", "NUMBER(10,0)"), @@ -119,7 +121,7 @@ private static Stream prepareData() { true), Arguments.of( "Primitive JSON without schema", - primitiveJson, + primitiveJsonExample, new DescribeTableRow[] { new DescribeTableRow("ID_INT8", "NUMBER(19,0)"), new DescribeTableRow("ID_INT16", "NUMBER(19,0)"), diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/PrimitiveJsonRecord.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/PrimitiveJsonRecord.java index 3b4a9983a..fdb74375a 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/PrimitiveJsonRecord.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/PrimitiveJsonRecord.java @@ -17,6 +17,63 @@ public class PrimitiveJsonRecord { + public static final String primitiveJsonExample = + "{" + + " \"id_int8\": 8," + + " \"id_int16\": 16," + + " \"id_int32\": 32," + + " \"id_int64\": 64," + + " \"description\": \"dogs are the best\"," + + " \"rating_float32\": 0.5," + + " \"rating_float64\": 0.25," + + " \"approval\": true" + + "}"; + + public static final String primitiveJsonWithSchemaExample = + "{" + + " \"schema\": {" + + " \"type\": \"struct\"," + + " \"fields\": [" + + " {" + + " \"field\": \"id_int8\"," + + " \"type\": \"int8\"" + + " }," + + " {" + + " \"field\": \"id_int16\"," + + " \"type\": \"int16\"" + + " }," + + " {" + + " \"field\": \"id_int32\"," + + " \"type\": \"int32\"" + + " }," + + " {" + + " \"field\": \"id_int64\"," + + " \"type\": \"int64\"" + + " }," + + " {" + + " \"field\": \"description\"," + + " \"type\": \"string\"" + + " }," + + " {" + + " \"field\": \"rating_float32\"," + + " \"type\": \"float\"" + + " }," + + " {" + + " \"field\": \"rating_float64\"," + + " \"type\": \"double\"" + + " }," + + " {" + + " \"field\": \"approval\"," + + " \"type\": \"boolean\"" + + " }" + + " ]," + + " \"optional\": false," + + " \"name\": \"sf.kc.test\"" + + " }," + + " \"payload\": " + + primitiveJsonExample + + "}"; + private static final ObjectMapper MAPPER = new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false);