diff --git a/pom.xml b/pom.xml
(truncated hunk: adds a dependency on org.apache.avro:avro)
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ApacheIcebergColumnSchema.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ApacheIcebergColumnSchema.java
new file mode 100644
index 000000000..650ab64d7
--- /dev/null
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ApacheIcebergColumnSchema.java
@@ -0,0 +1,24 @@
+package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;
+
+import org.apache.iceberg.types.Type;
+
+/** Wrapper class for an Iceberg column schema retrieved from the channel. */
+public class ApacheIcebergColumnSchema {
+
+ private final Type schema;
+
+ private final String columnName;
+
+ public ApacheIcebergColumnSchema(Type schema, String columnName) {
+ this.schema = schema;
+ this.columnName = columnName.toUpperCase();
+ }
+
+ public Type getSchema() {
+ return schema;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+}
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java
new file mode 100644
index 000000000..93495cde7
--- /dev/null
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java
@@ -0,0 +1,16 @@
+package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;
+
+/** Tree representation of a single column, with types compatible with Snowflake Iceberg tables. */
+public class IcebergColumnTree {
+
+ private final IcebergFieldNode rootNode;
+
+ public IcebergColumnTree(ApacheIcebergColumnSchema columnSchema) {
+ this.rootNode = new IcebergFieldNode(columnSchema.getColumnName(), columnSchema.getSchema());
+ }
+
+ public String buildQuery() {
+ StringBuilder sb = new StringBuilder();
+ return rootNode.buildQuery(sb, "ROOT_NODE").toString();
+ }
+}
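
For orientation, a minimal sketch (hypothetical demo code, not part of this diff) of how the new classes compose: a serialized Iceberg type is parsed, wrapped together with a column name, and rendered as a DDL fragment.

    // assumes the classes introduced in this diff are on the classpath
    Type type = IcebergDataTypeParser.deserializeIcebergType("\"int\"");
    ApacheIcebergColumnSchema schema = new ApacheIcebergColumnSchema(type, "my_column");
    IcebergColumnTree tree = new IcebergColumnTree(schema);
    tree.buildQuery(); // "MY_COLUMN NUMBER(10,0)" - the wrapper upper-cases the name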
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java
index 206c21b39..1bfbed5ce 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java
@@ -11,6 +11,8 @@
import com.snowflake.kafka.connector.internal.streaming.schemaevolution.ColumnTypeMapper;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
@@ -18,6 +20,52 @@
import org.apache.kafka.connect.data.Timestamp;
public class IcebergColumnTypeMapper extends ColumnTypeMapper {
+
+  /**
+   * See <a href="https://docs.snowflake.com/en/user-guide/tables-iceberg-data-types">Data types
+   * for Apache Iceberg™ tables</a>.
+   */
+ public static String mapToSnowflakeDataType(Type apacheIcebergType) {
+ switch (apacheIcebergType.typeId()) {
+ case BOOLEAN:
+ return "BOOLEAN";
+ case INTEGER:
+ return "NUMBER(10,0)";
+ case LONG:
+ return "NUMBER(19,0)";
+ case FLOAT:
+ case DOUBLE:
+ return "FLOAT";
+ case DATE:
+ return "DATE";
+ case TIME:
+ return "TIME(6)";
+ case TIMESTAMP:
+ Types.TimestampType timestamp = (Types.TimestampType) apacheIcebergType;
+ return timestamp.shouldAdjustToUTC() ? "TIMESTAMP_LTZ" : "TIMESTAMP";
+ case STRING:
+ return "VARCHAR(16777216)";
+ case UUID:
+ return "BINARY(16)";
+ case FIXED:
+ throw new IllegalArgumentException("FIXED column type not supported!");
+ case BINARY:
+ return "BINARY";
+ case DECIMAL:
+ Types.DecimalType decimal = (Types.DecimalType) apacheIcebergType;
+ return decimal.toString().toUpperCase();
+ case STRUCT:
+ return "OBJECT";
+ case LIST:
+ return "ARRAY";
+ case MAP:
+ return "MAP";
+      default:
+        throw new IllegalArgumentException(
+            "Unsupported Iceberg data type: " + apacheIcebergType.typeId());
+ }
+ }
+
@Override
public String mapToColumnType(Schema.Type kafkaType, String schemaName) {
switch (kafkaType) {
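
A hedged usage sketch of mapToSnowflakeDataType (the Types factory methods are part of the org.apache.iceberg.types API; expected outputs follow the switch above):

    IcebergColumnTypeMapper.mapToSnowflakeDataType(Types.TimestampType.withZone());    // "TIMESTAMP_LTZ"
    IcebergColumnTypeMapper.mapToSnowflakeDataType(Types.TimestampType.withoutZone()); // "TIMESTAMP"
    IcebergColumnTypeMapper.mapToSnowflakeDataType(Types.DecimalType.of(10, 5));       // "DECIMAL(10, 5)"
    IcebergColumnTypeMapper.mapToSnowflakeDataType(Types.StringType.get());            // "VARCHAR(16777216)"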
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergDataTypeParser.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergDataTypeParser.java
new file mode 100644
index 000000000..ad6b24d02
--- /dev/null
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergDataTypeParser.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved.
+ */
+
+package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.util.Iterator;
+import java.util.List;
+import javax.annotation.Nonnull;
+import org.apache.iceberg.parquet.TypeToMessageType;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.JsonUtil;
+
+/**
+ * This class handles serialization and deserialization of Iceberg data types, both primitive and
+ * nested.
+ *
+ * <p>This code is modified from
+ * GlobalServices/modules/data-lake/datalake-api/src/main/java/com/snowflake/metadata/iceberg
+ * /IcebergDataTypeParser.java
+ */
+public class IcebergDataTypeParser {
+ public static final String ELEMENT = "element";
+ public static final String KEY = "key";
+ public static final String VALUE = "value";
+ private static final String TYPE = "type";
+ private static final String STRUCT = "struct";
+ private static final String LIST = "list";
+ private static final String MAP = "map";
+ private static final String FIELDS = "fields";
+ private static final String DOC = "doc";
+ private static final String NAME = "name";
+ private static final String ID = "id";
+ private static final String ELEMENT_ID = "element-id";
+ private static final String KEY_ID = "key-id";
+ private static final String VALUE_ID = "value-id";
+ private static final String REQUIRED = "required";
+ private static final String ELEMENT_REQUIRED = "element-required";
+ private static final String VALUE_REQUIRED = "value-required";
+
+ private static final String EMPTY_FIELD_CHAR = "\\";
+
+ /** Object mapper for this class */
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ /** Util class that contains the mapping between Iceberg data type and Parquet data type */
+ private static final TypeToMessageType typeToMessageType = new TypeToMessageType();
+
+ /**
+ * Get Iceberg data type information by deserialization.
+ *
+ * @param icebergDataType string representation of Iceberg data type
+ * @return Iceberg data type
+ */
+ public static Type deserializeIcebergType(String icebergDataType) {
+ try {
+ JsonNode json = MAPPER.readTree(icebergDataType);
+ return getTypeFromJson(json);
+ } catch (JsonProcessingException e) {
+ throw new IllegalArgumentException(
+ String.format("Failed to deserialize Iceberg data type: %s", icebergDataType));
+ }
+ }
+
+ /**
+ * Get corresponding Iceberg data type from JsonNode.
+ *
+ * @param jsonNode JsonNode parsed from Iceberg type string.
+ * @return Iceberg data type
+ */
+ public static Type getTypeFromJson(@Nonnull JsonNode jsonNode) {
+ if (jsonNode.isTextual()) {
+ return Types.fromPrimitiveString(jsonNode.asText());
+ } else if (jsonNode.isObject()) {
+ if (!jsonNode.has(TYPE)) {
+ throw new IllegalArgumentException(
+ String.format("Missing key '%s' in schema: %s", TYPE, jsonNode));
+ }
+ String type = jsonNode.get(TYPE).asText();
+ if (STRUCT.equals(type)) {
+ return structFromJson(jsonNode);
+ } else if (LIST.equals(type)) {
+ return listFromJson(jsonNode);
+ } else if (MAP.equals(type)) {
+ return mapFromJson(jsonNode);
+ }
+ throw new IllegalArgumentException(
+ String.format("Cannot parse Iceberg type: %s, schema: %s", type, jsonNode));
+ }
+
+ throw new IllegalArgumentException("Cannot parse Iceberg type from schema: " + jsonNode);
+ }
+
+ /**
+ * Get Iceberg struct type information from JsonNode.
+ *
+ * @param json JsonNode parsed from Iceberg type string.
+ * @return struct type
+ */
+ public static @Nonnull Types.StructType structFromJson(@Nonnull JsonNode json) {
+ if (!json.has(FIELDS)) {
+ throw new IllegalArgumentException(
+ String.format("Missing key '%s' in schema: %s", FIELDS, json));
+ }
+ JsonNode fieldArray = json.get(FIELDS);
+ Preconditions.checkArgument(fieldArray != null, "Field array cannot be null");
+ Preconditions.checkArgument(
+ fieldArray.isArray(), "Cannot parse struct fields from non-array: %s", fieldArray);
+
+    List<Types.NestedField> fields = Lists.newArrayListWithExpectedSize(fieldArray.size());
+    Iterator<JsonNode> iterator = fieldArray.elements();
+ while (iterator.hasNext()) {
+ JsonNode field = iterator.next();
+ Preconditions.checkArgument(
+ field.isObject(), "Cannot parse struct field from non-object: %s", field);
+
+ int id = JsonUtil.getInt(ID, field);
+
+      /* TypeToMessageType throws on an empty field name: represent it with a backslash and escape any pre-existing backslashes. */
+ String name =
+ JsonUtil.getString(NAME, field)
+ .replace(EMPTY_FIELD_CHAR, EMPTY_FIELD_CHAR + EMPTY_FIELD_CHAR);
+ if (name.isEmpty()) {
+ name = EMPTY_FIELD_CHAR;
+ }
+ Type type = getTypeFromJson(field.get(TYPE));
+
+ String doc = JsonUtil.getStringOrNull(DOC, field);
+ boolean isRequired = JsonUtil.getBool(REQUIRED, field);
+ if (isRequired) {
+ fields.add(Types.NestedField.required(id, name, type, doc));
+ } else {
+ fields.add(Types.NestedField.optional(id, name, type, doc));
+ }
+ }
+
+ return Types.StructType.of(fields);
+ }
+
+ /**
+ * Get Iceberg list type information from JsonNode.
+ *
+ * @param json JsonNode parsed from Iceberg type string.
+ * @return list type
+ */
+ public static Types.ListType listFromJson(JsonNode json) {
+ int elementId = JsonUtil.getInt(ELEMENT_ID, json);
+ Type elementType = getTypeFromJson(json.get(ELEMENT));
+ boolean isRequired = JsonUtil.getBool(ELEMENT_REQUIRED, json);
+
+ if (isRequired) {
+ return Types.ListType.ofRequired(elementId, elementType);
+ } else {
+ return Types.ListType.ofOptional(elementId, elementType);
+ }
+ }
+
+ /**
+ * Get Iceberg map type from JsonNode.
+ *
+ * @param json JsonNode parsed from Iceberg type string.
+ * @return map type
+ */
+ public static Types.MapType mapFromJson(JsonNode json) {
+ int keyId = JsonUtil.getInt(KEY_ID, json);
+ Type keyType = getTypeFromJson(json.get(KEY));
+
+ int valueId = JsonUtil.getInt(VALUE_ID, json);
+ Type valueType = getTypeFromJson(json.get(VALUE));
+
+ boolean isRequired = JsonUtil.getBool(VALUE_REQUIRED, json);
+
+ if (isRequired) {
+ return Types.MapType.ofRequired(keyId, valueId, keyType, valueType);
+ } else {
+ return Types.MapType.ofOptional(keyId, valueId, keyType, valueType);
+ }
+ }
+}
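
The parser accepts Iceberg's JSON schema representation: a quoted string for primitives, an object for nested types. A short sketch of both inputs (values mirror the unit tests added below):

    Type intType = IcebergDataTypeParser.deserializeIcebergType("\"int\"");
    // intType equals Types.IntegerType.get()

    Type listType = IcebergDataTypeParser.deserializeIcebergType(
        "{\"type\":\"list\",\"element-id\":23,\"element\":\"long\",\"element-required\":false}");
    // listType equals Types.ListType.ofOptional(23, Types.LongType.get())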
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergFieldNode.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergFieldNode.java
new file mode 100644
index 000000000..fe8e96754
--- /dev/null
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergFieldNode.java
@@ -0,0 +1,86 @@
+package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;
+
+import java.util.LinkedHashMap;
+import java.util.stream.Collectors;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+class IcebergFieldNode {
+
+ public final String name;
+
+ public final String snowflakeIcebergType;
+
+  public final LinkedHashMap<String, IcebergFieldNode> children;
+
+ public IcebergFieldNode(String name, Type apacheIcebergSchema) {
+ this.name = name.toUpperCase();
+ this.snowflakeIcebergType = IcebergColumnTypeMapper.mapToSnowflakeDataType(apacheIcebergSchema);
+ this.children = produceChildren(apacheIcebergSchema);
+ }
+
+  private LinkedHashMap<String, IcebergFieldNode> produceChildren(Type apacheIcebergSchema) {
+ // primitives must not have children
+ if (apacheIcebergSchema.isPrimitiveType()) {
+ return new LinkedHashMap<>();
+ }
+ Type.NestedType nestedField = apacheIcebergSchema.asNestedType();
+ return nestedField.fields().stream()
+ .collect(
+ Collectors.toMap(
+ Types.NestedField::name,
+ this::fromNestedField,
+                // field names within a nested type are unique, so this merge function never fires
+                (v1, v2) -> {
+                  throw new IllegalArgumentException("Duplicate field name: " + v1.name);
+                },
+ LinkedHashMap::new));
+ }
+
+ private IcebergFieldNode fromNestedField(Types.NestedField field) {
+ return new IcebergFieldNode(field.name(), field.type());
+ }
+
+ /**
+   * @param sb StringBuilder to append to
+   * @param parentType Snowflake Iceberg compatible type of the parent node. For the root node,
+   *     "ROOT_NODE" is passed, because the root node's column name is always generated.
+ * @return StringBuilder with appended query elements
+ */
+ StringBuilder buildQuery(StringBuilder sb, String parentType) {
+ appendNameAndType(sb, parentType);
+ if (!children.isEmpty()) {
+ sb.append("(");
+ appendChildren(sb, this.snowflakeIcebergType);
+ sb.append(")");
+ }
+ return sb;
+ }
+
+ private StringBuilder appendNameAndType(StringBuilder sb, String parentType) {
+ if (parentType.equals("ARRAY") || parentType.equals("MAP")) {
+ sb.append(snowflakeIcebergType);
+ } else {
+ sb.append(name);
+ sb.append(" ");
+ sb.append(snowflakeIcebergType);
+ }
+ return sb;
+ }
+
+ private StringBuilder appendChildren(StringBuilder sb, String parentType) {
+ children.forEach(
+ (name, node) -> {
+ node.buildQuery(sb, parentType);
+ sb.append(", ");
+ });
+ removeLastSeparator(sb);
+ return sb;
+ }
+
+  private StringBuilder removeLastSeparator(StringBuilder sb) {
+    // strip the trailing ", " appended after the last child
+    sb.setLength(sb.length() - 2);
+    return sb;
+  }
+}
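
To illustrate the recursion: every node renders as "name type", except children of ARRAY and MAP parents, which contribute only their type, because Snowflake's ARRAY(...) and MAP(...) syntax takes unnamed type parameters. A hypothetical sketch (outputs match the parameterized tests below):

    Type structType = IcebergDataTypeParser.deserializeIcebergType(
        "{\"type\":\"struct\",\"fields\":[{\"id\":1,\"name\":\"k1\",\"required\":false,\"type\":\"int\"}]}");
    new IcebergColumnTree(new ApacheIcebergColumnSchema(structType, "col")).buildQuery();
    // "COL OBJECT(K1 NUMBER(10,0))" - struct children keep their names

    Type listType = IcebergDataTypeParser.deserializeIcebergType(
        "{\"type\":\"list\",\"element-id\":1,\"element\":\"long\",\"element-required\":false}");
    new IcebergColumnTree(new ApacheIcebergColumnSchema(listType, "col")).buildQuery();
    // "COL ARRAY(NUMBER(19,0))" - the ARRAY child is unnamed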
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchema.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchema.java
new file mode 100644
index 000000000..e4ed70f3e
--- /dev/null
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchema.java
@@ -0,0 +1,9 @@
+package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;
+
+import java.util.List;
+
+/** Wrapper for a set of columns, not necessarily all of the columns in the table. */
+public class IcebergTableSchema {
+
+  private List<IcebergColumnTree> columns;
+}
diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java
index 11dd31868..e35f2a15d 100644
--- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java
+++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java
@@ -49,7 +49,6 @@ public void setUp() {
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(ICEBERG_ENABLED, "TRUE");
config.put(ENABLE_SCHEMATIZATION_CONFIG, isSchemaEvolutionEnabled().toString());
-
createIcebergTable();
enableSchemaEvolution(tableName);
@@ -119,7 +118,6 @@ protected SinkRecord createKafkaRecord(String jsonString, int offset, boolean wi
+ "ORDER BY offset_extracted asc;";
protected List<RecordWithMetadata<PrimitiveJsonRecord>> selectAllSchematizedRecords() {
-
return select(tableName, selectAllSortByOffset, PrimitiveJsonRecord::fromSchematizedResult);
}
diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java
index 08f0bf2f3..8ca9fa625 100644
--- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java
+++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java
@@ -81,6 +81,50 @@ void shouldEvolveSchemaAndInsertRecords(
assertRecordsInTable();
}
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("prepareData")
+  @Disabled("Schema evolution for structured types is not yet supported")
+ void shouldEvolveSchemaAndInsertRecords_structuredData(
+ String description, String message, DescribeTableRow[] expectedSchema, boolean withSchema)
+ throws Exception {
+ // start off with just one column
+    List<DescribeTableRow> rows = describeTable(tableName);
+ assertThat(rows)
+ .hasSize(1)
+ .extracting(DescribeTableRow::getColumn)
+ .contains(Utils.TABLE_COLUMN_METADATA);
+
+ SinkRecord record = createKafkaRecord(message, 0, withSchema);
+ service.insert(Collections.singletonList(record));
+ waitForOffset(-1);
+ rows = describeTable(tableName);
+    assertThat(rows).hasSize(9);
+
+ // don't check metadata column schema, we have different tests for that
+ rows =
+ rows.stream()
+ .filter(r -> !r.getColumn().equals(Utils.TABLE_COLUMN_METADATA))
+ .collect(Collectors.toList());
+
+ assertThat(rows).containsExactlyInAnyOrder(expectedSchema);
+
+ // resend and store same record without any issues now
+ service.insert(Collections.singletonList(record));
+ waitForOffset(1);
+
+ // and another record with same schema
+ service.insert(Collections.singletonList(createKafkaRecord(message, 1, withSchema)));
+ waitForOffset(2);
+
+    String testStruct = "{ \"testStruct\": { \"k1\" : 1, \"k2\" : 2 } }";
+
+    // the first insert triggers schema evolution, so the same record is sent again below
+    service.insert(Collections.singletonList(createKafkaRecord(testStruct, 2, false)));
+
+ service.insert(Collections.singletonList(createKafkaRecord(testStruct, 2, false)));
+ waitForOffset(3);
+ }
+
private void assertRecordsInTable() {
List<RecordWithMetadata<PrimitiveJsonRecord>> recordsWithMetadata =
selectAllSchematizedRecords();
diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java b/src/test/java/com/snowflake/kafka/connector/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java
new file mode 100644
index 000000000..6d5d9a52d
--- /dev/null
+++ b/src/test/java/com/snowflake/kafka/connector/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java
@@ -0,0 +1,71 @@
+package com.snowflake.kafka.connector.streaming.schemaevolution.iceberg;
+
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+
+import com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg.ApacheIcebergColumnSchema;
+import com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg.IcebergColumnTree;
+import com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg.IcebergDataTypeParser;
+import java.util.stream.Stream;
+import org.apache.iceberg.types.Type;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+public class ParseIcebergColumnTreeTest {
+
+ @ParameterizedTest
+ @MethodSource("icebergSchemas")
+ void parseFromApacheIcebergSchema(String plainIcebergSchema, String expectedQuery) {
+ // given
+ Type type = IcebergDataTypeParser.deserializeIcebergType(plainIcebergSchema);
+ // when
+ ApacheIcebergColumnSchema apacheSchema =
+ new ApacheIcebergColumnSchema(type, "TEST_COLUMN_NAME");
+ IcebergColumnTree tree = new IcebergColumnTree(apacheSchema);
+ // then
+ Assertions.assertEquals(expectedQuery, tree.buildQuery());
+ }
+
+ static Stream icebergSchemas() {
+ return Stream.of(
+ // primitives
+ arguments("\"boolean\"", "TEST_COLUMN_NAME BOOLEAN"),
+ arguments("\"int\"", "TEST_COLUMN_NAME NUMBER(10,0)"),
+ arguments("\"long\"", "TEST_COLUMN_NAME NUMBER(19,0)"),
+ arguments("\"float\"", "TEST_COLUMN_NAME FLOAT"),
+ arguments("\"double\"", "TEST_COLUMN_NAME FLOAT"),
+ arguments("\"date\"", "TEST_COLUMN_NAME DATE"),
+ arguments("\"time\"", "TEST_COLUMN_NAME TIME(6)"),
+ arguments("\"timestamptz\"", "TEST_COLUMN_NAME TIMESTAMP_LTZ"),
+ arguments("\"timestamp\"", "TEST_COLUMN_NAME TIMESTAMP"),
+ arguments("\"string\"", "TEST_COLUMN_NAME VARCHAR(16777216)"),
+ arguments("\"uuid\"", "TEST_COLUMN_NAME BINARY(16)"),
+ arguments("\"binary\"", "TEST_COLUMN_NAME BINARY"),
+ arguments("\"decimal(10,5)\"", "TEST_COLUMN_NAME DECIMAL(10, 5)"),
+ // simple struct
+ arguments(
+ "{\"type\":\"struct\",\"fields\":[{\"id\":23,\"name\":\"k1\",\"required\":false,\"type\":\"int\"},{\"id\":24,\"name\":\"k2\",\"required\":false,\"type\":\"int\"}]}",
+ "TEST_COLUMN_NAME OBJECT(K1 NUMBER(10,0), K2 NUMBER(10,0))"),
+ // list
+ arguments(
+ "{\"type\":\"list\",\"element-id\":23,\"element\":\"long\",\"element-required\":false}",
+ "TEST_COLUMN_NAME ARRAY(NUMBER(19,0))"),
+ // map
+ arguments(
+ "{\"type\":\"map\",\"key-id\":4,\"key\":\"int\",\"value-id\":5,\"value\":\"string\",\"value-required\":false}",
+ "TEST_COLUMN_NAME MAP(NUMBER(10,0), VARCHAR(16777216))"),
+ // structs with nested objects
+ arguments(
+ "{\"type\":\"struct\",\"fields\":[{\"id\":23,\"name\":\"k1\",\"required\":false,\"type\":\"int\"},{\"id\":24,\"name\":\"k2\",\"required\":false,\"type\":\"int\"},{\"id\":25,\"name\":\"nested_object\",\"required\":false,\"type\":{\"type\":\"struct\",\"fields\":[{\"id\":26,\"name\":\"nested_key1\",\"required\":false,\"type\":\"string\"},{\"id\":27,\"name\":\"nested_key2\",\"required\":false,\"type\":\"string\"}]}}]}",
+ "TEST_COLUMN_NAME OBJECT(K1 NUMBER(10,0), K2 NUMBER(10,0), NESTED_OBJECT"
+ + " OBJECT(NESTED_KEY1 VARCHAR(16777216), NESTED_KEY2 VARCHAR(16777216)))"),
+ arguments(
+ "{\"type\":\"struct\",\"fields\":[{\"id\":2,\"name\":\"offset\",\"required\":false,\"type\":\"int\"},{\"id\":3,\"name\":\"topic\",\"required\":false,\"type\":\"string\"},{\"id\":4,\"name\":\"partition\",\"required\":false,\"type\":\"int\"},{\"id\":5,\"name\":\"key\",\"required\":false,\"type\":\"string\"},{\"id\":6,\"name\":\"schema_id\",\"required\":false,\"type\":\"int\"},{\"id\":7,\"name\":\"key_schema_id\",\"required\":false,\"type\":\"int\"},{\"id\":8,\"name\":\"CreateTime\",\"required\":false,\"type\":\"long\"},{\"id\":9,\"name\":\"LogAppendTime\",\"required\":false,\"type\":\"long\"},{\"id\":10,\"name\":\"SnowflakeConnectorPushTime\",\"required\":false,\"type\":\"long\"},{\"id\":11,\"name\":\"headers\",\"required\":false,\"type\":{\"type\":\"map\",\"key-id\":12,\"key\":\"string\",\"value-id\":13,\"value\":\"string\",\"value-required\":false}}]}\n",
+ "TEST_COLUMN_NAME OBJECT(OFFSET NUMBER(10,0), TOPIC VARCHAR(16777216), PARTITION"
+ + " NUMBER(10,0), KEY VARCHAR(16777216), SCHEMA_ID NUMBER(10,0), KEY_SCHEMA_ID"
+ + " NUMBER(10,0), CREATETIME NUMBER(19,0), LOGAPPENDTIME NUMBER(19,0),"
+ + " SNOWFLAKECONNECTORPUSHTIME NUMBER(19,0), HEADERS MAP(VARCHAR(16777216),"
+ + " VARCHAR(16777216)))"));
+ }
+}