diff --git a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java index 70ef9130e2..3b82c2d987 100644 --- a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java +++ b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java @@ -28,12 +28,10 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.text.SimpleDateFormat; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.TimeZone; import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException; @@ -287,14 +285,7 @@ private Map getMapFromJsonNodeForStreamingIngest(JsonNode node) String columnName = columnNames.next(); JsonNode columnNode = node.get(columnName); Object columnValue; - if (columnNode.isArray()) { - List itemList = new ArrayList<>(); - ArrayNode arrayNode = (ArrayNode) columnNode; - for (JsonNode e : arrayNode) { - itemList.add(e.isTextual() ? e.textValue() : MAPPER.writeValueAsString(e)); - } - columnValue = itemList; - } else if (columnNode.isTextual()) { + if (columnNode.isTextual()) { columnValue = columnNode.textValue(); } else if (columnNode.isNull()) { columnValue = null; diff --git a/src/test/java/com/snowflake/kafka/connector/internal/SchematizationTestUtils.java b/src/test/java/com/snowflake/kafka/connector/internal/SchematizationTestUtils.java index 2908378d04..ac33b31263 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/SchematizationTestUtils.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/SchematizationTestUtils.java @@ -18,7 +18,9 @@ public class SchematizationTestUtils { SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("RATING_FLOAT32", "FLOAT"); SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("RATING_FLOAT64", "FLOAT"); SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("APPROVAL", "BOOLEAN"); - SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_ARRAY", "ARRAY"); + SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_ARRAY_STRING", "ARRAY"); + SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_ARRAY_INT", "ARRAY"); + SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_ARRAY_JSON", "ARRAY"); SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_MAP", "VARIANT"); SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("RECORD_METADATA", "VARIANT"); } @@ -54,7 +56,11 @@ public class SchematizationTestUtils { CONTENT_FOR_AVRO_TABLE_CREATION.put("RATING_FLOAT32", 0.99); CONTENT_FOR_AVRO_TABLE_CREATION.put("RATING_FLOAT64", 0.99); CONTENT_FOR_AVRO_TABLE_CREATION.put("APPROVAL", true); - CONTENT_FOR_AVRO_TABLE_CREATION.put("INFO_ARRAY", "[\"a\",\"b\"]"); + CONTENT_FOR_AVRO_TABLE_CREATION.put("INFO_ARRAY_STRING", "[\"a\",\"b\"]"); + CONTENT_FOR_AVRO_TABLE_CREATION.put("INFO_ARRAY_INT", "[1,2]"); + CONTENT_FOR_AVRO_TABLE_CREATION.put( + "INFO_ARRAY_JSON", + "[null,\"{\\\"a\\\":1,\\\"b\\\":null,\\\"c\\\":null,\\\"d\\\":\\\"89asda9s0a\\\"}\"]"); CONTENT_FOR_AVRO_TABLE_CREATION.put("INFO_MAP", "{\"field\":3}"); CONTENT_FOR_AVRO_TABLE_CREATION.put("RECORD_METADATA", "RECORD_METADATA_PLACE_HOLDER"); } diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java index 64d70f49b1..b0680efb17 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java @@ -1230,7 +1230,9 @@ public void testSchematizationWithTableCreationAndAvroInput() throws Exception { .field("rating_float32", Schema.FLOAT32_SCHEMA) .field("rating_float64", Schema.FLOAT64_SCHEMA) .field("approval", Schema.BOOLEAN_SCHEMA) - .field("info_array", SchemaBuilder.array(Schema.STRING_SCHEMA).build()) + .field("info_array_string", SchemaBuilder.array(Schema.STRING_SCHEMA).build()) + .field("info_array_int", SchemaBuilder.array(Schema.INT32_SCHEMA).build()) + .field("info_array_json", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).build()) .field( "info_map", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build()); @@ -1244,7 +1246,11 @@ public void testSchematizationWithTableCreationAndAvroInput() throws Exception { .put("rating_float32", 0.99f) .put("rating_float64", 0.99d) .put("approval", true) - .put("info_array", Arrays.asList("a", "b")) + .put("info_array_string", Arrays.asList("a", "b")) + .put("info_array_int", Arrays.asList(1, 2)) + .put( + "info_array_json", + Arrays.asList(null, "{\"a\": 1, \"b\": null, \"c\": null, \"d\": \"89asda9s0a\"}")) .put("info_map", Collections.singletonMap("field", 3)); SchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient(); diff --git a/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java b/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java index 822463b67f..2ab7c13261 100644 --- a/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java +++ b/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java @@ -258,6 +258,26 @@ public void testSchematizationStringField() throws JsonProcessingException { assert got.get("\"ANSWER\"").equals("42"); } + @Test + public void testSchematizationArrayOfObject() throws JsonProcessingException { + RecordService service = new RecordService(); + SnowflakeJsonConverter jsonConverter = new SnowflakeJsonConverter(); + + service.setEnableSchematization(true); + String value = + "{\"players\":[{\"name\":\"John Doe\",\"age\":30},{\"name\":\"Jane Doe\",\"age\":30}]}"; + byte[] valueContents = (value).getBytes(StandardCharsets.UTF_8); + SchemaAndValue sv = jsonConverter.toConnectData(topic, valueContents); + + SinkRecord record = + new SinkRecord( + topic, partition, Schema.STRING_SCHEMA, "string", sv.schema(), sv.value(), partition); + + Map got = service.getProcessedRecordForStreamingIngest(record); + assert got.get("\"PLAYERS\"") + .equals("[{\"name\":\"John Doe\",\"age\":30},{\"name\":\"Jane Doe\",\"age\":30}]"); + } + @Test public void testColumnNameFormatting() throws JsonProcessingException { RecordService service = new RecordService();