diff --git a/pom.xml b/pom.xml
index f7b2dc9f3..ffd12cf21 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,14 +71,6 @@
true
-
- snowflake-nexus-snapshots
- Snowflake Nexus snapshots
- https://nexus.int.snowflakecomputing.com/repository/Snapshots/
-
- true
-
-
diff --git a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java
index 70ef9130e..3b82c2d98 100644
--- a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java
+++ b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java
@@ -28,12 +28,10 @@
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException;
@@ -287,14 +285,7 @@ private Map getMapFromJsonNodeForStreamingIngest(JsonNode node)
String columnName = columnNames.next();
JsonNode columnNode = node.get(columnName);
Object columnValue;
- if (columnNode.isArray()) {
- List itemList = new ArrayList<>();
- ArrayNode arrayNode = (ArrayNode) columnNode;
- for (JsonNode e : arrayNode) {
- itemList.add(e.isTextual() ? e.textValue() : MAPPER.writeValueAsString(e));
- }
- columnValue = itemList;
- } else if (columnNode.isTextual()) {
+ if (columnNode.isTextual()) {
columnValue = columnNode.textValue();
} else if (columnNode.isNull()) {
columnValue = null;
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/SchematizationTestUtils.java b/src/test/java/com/snowflake/kafka/connector/internal/SchematizationTestUtils.java
index 2908378d0..ac33b3126 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/SchematizationTestUtils.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/SchematizationTestUtils.java
@@ -18,7 +18,9 @@ public class SchematizationTestUtils {
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("RATING_FLOAT32", "FLOAT");
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("RATING_FLOAT64", "FLOAT");
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("APPROVAL", "BOOLEAN");
- SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_ARRAY", "ARRAY");
+ SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_ARRAY_STRING", "ARRAY");
+ SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_ARRAY_INT", "ARRAY");
+ SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_ARRAY_JSON", "ARRAY");
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_MAP", "VARIANT");
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("RECORD_METADATA", "VARIANT");
}
@@ -54,7 +56,11 @@ public class SchematizationTestUtils {
CONTENT_FOR_AVRO_TABLE_CREATION.put("RATING_FLOAT32", 0.99);
CONTENT_FOR_AVRO_TABLE_CREATION.put("RATING_FLOAT64", 0.99);
CONTENT_FOR_AVRO_TABLE_CREATION.put("APPROVAL", true);
- CONTENT_FOR_AVRO_TABLE_CREATION.put("INFO_ARRAY", "[\"a\",\"b\"]");
+ CONTENT_FOR_AVRO_TABLE_CREATION.put("INFO_ARRAY_STRING", "[\"a\",\"b\"]");
+ CONTENT_FOR_AVRO_TABLE_CREATION.put("INFO_ARRAY_INT", "[1,2]");
+ CONTENT_FOR_AVRO_TABLE_CREATION.put(
+ "INFO_ARRAY_JSON",
+ "[null,\"{\\\"a\\\":1,\\\"b\\\":null,\\\"c\\\":null,\\\"d\\\":\\\"89asda9s0a\\\"}\"]");
CONTENT_FOR_AVRO_TABLE_CREATION.put("INFO_MAP", "{\"field\":3}");
CONTENT_FOR_AVRO_TABLE_CREATION.put("RECORD_METADATA", "RECORD_METADATA_PLACE_HOLDER");
}
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java
index 64d70f49b..b0680efb1 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java
@@ -1230,7 +1230,9 @@ public void testSchematizationWithTableCreationAndAvroInput() throws Exception {
.field("rating_float32", Schema.FLOAT32_SCHEMA)
.field("rating_float64", Schema.FLOAT64_SCHEMA)
.field("approval", Schema.BOOLEAN_SCHEMA)
- .field("info_array", SchemaBuilder.array(Schema.STRING_SCHEMA).build())
+ .field("info_array_string", SchemaBuilder.array(Schema.STRING_SCHEMA).build())
+ .field("info_array_int", SchemaBuilder.array(Schema.INT32_SCHEMA).build())
+ .field("info_array_json", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).build())
.field(
"info_map", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build());
@@ -1244,7 +1246,11 @@ public void testSchematizationWithTableCreationAndAvroInput() throws Exception {
.put("rating_float32", 0.99f)
.put("rating_float64", 0.99d)
.put("approval", true)
- .put("info_array", Arrays.asList("a", "b"))
+ .put("info_array_string", Arrays.asList("a", "b"))
+ .put("info_array_int", Arrays.asList(1, 2))
+ .put(
+ "info_array_json",
+ Arrays.asList(null, "{\"a\": 1, \"b\": null, \"c\": null, \"d\": \"89asda9s0a\"}"))
.put("info_map", Collections.singletonMap("field", 3));
SchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient();
diff --git a/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java b/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java
index 822463b67..2ab7c1326 100644
--- a/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java
+++ b/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java
@@ -258,6 +258,26 @@ public void testSchematizationStringField() throws JsonProcessingException {
assert got.get("\"ANSWER\"").equals("42");
}
+ @Test
+ public void testSchematizationArrayOfObject() throws JsonProcessingException {
+ RecordService service = new RecordService();
+ SnowflakeJsonConverter jsonConverter = new SnowflakeJsonConverter();
+
+ service.setEnableSchematization(true);
+ String value =
+ "{\"players\":[{\"name\":\"John Doe\",\"age\":30},{\"name\":\"Jane Doe\",\"age\":30}]}";
+ byte[] valueContents = (value).getBytes(StandardCharsets.UTF_8);
+ SchemaAndValue sv = jsonConverter.toConnectData(topic, valueContents);
+
+ SinkRecord record =
+ new SinkRecord(
+ topic, partition, Schema.STRING_SCHEMA, "string", sv.schema(), sv.value(), partition);
+
+ Map got = service.getProcessedRecordForStreamingIngest(record);
+ assert got.get("\"PLAYERS\"")
+ .equals("[{\"name\":\"John Doe\",\"age\":30},{\"name\":\"Jane Doe\",\"age\":30}]");
+ }
+
@Test
public void testColumnNameFormatting() throws JsonProcessingException {
RecordService service = new RecordService();