From 2754d3cb0d153df23c439b1997234150e6e0591d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bart=C5=82omiej=20Z=C4=85bek?=
Date: Wed, 13 Nov 2024 09:53:53 +0100
Subject: [PATCH] SNOW-1665420 add logic to parse Iceberg schema

---
 pom.xml                                            |  19 +-
 .../iceberg/ApacheIcebergColumnSchema.java         |  24 +++
 .../iceberg/IcebergColumnTree.java                 |  17 ++
 .../iceberg/IcebergColumnTypeMapper.java           |  49 ++++-
 .../iceberg/IcebergDataTypeParser.java             | 185 ++++++++++++++++++
 .../iceberg/IcebergFieldNode.java                  |  86 ++++++++
 .../iceberg/IcebergTableSchema.java                |   9 +
 .../streaming/iceberg/IcebergIngestionIT.java      |   6 +-
 .../IcebergIngestionSchemaEvolutionIT.java         |  44 +++++
 .../iceberg/ParseIcebergColumnTreeTest.java        |  71 +++++++
 10 files changed, 505 insertions(+), 5 deletions(-)
 create mode 100644 src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ApacheIcebergColumnSchema.java
 create mode 100644 src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java
 create mode 100644 src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergDataTypeParser.java
 create mode 100644 src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergFieldNode.java
 create mode 100644 src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchema.java
 create mode 100644 src/test/java/com/snowflake/kafka/connector/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java

diff --git a/pom.xml b/pom.xml
index 09ecf17ad..78d4faacb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,6 +56,7 @@
         <confluent.version>7.7.0</confluent.version>
         <protobuf.version>3.25.5</protobuf.version>
+        <iceberg.version>1.5.2</iceberg.version>
@@ -338,7 +339,7 @@
         <dependency>
             <groupId>net.snowflake</groupId>
             <artifactId>snowflake-ingest-sdk</artifactId>
-            <version>2.3.0</version>
+            <version>3.0.0</version>
             <exclusions>
                 <exclusion>
                     <groupId>net.snowflake</groupId>
@@ -347,6 +348,22 @@
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-api</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-core</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-parquet</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.avro</groupId>
             <artifactId>avro</artifactId>
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ApacheIcebergColumnSchema.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ApacheIcebergColumnSchema.java
new file mode 100644
index 000000000..650ab64d7
--- /dev/null
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ApacheIcebergColumnSchema.java
@@ -0,0 +1,24 @@
+package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;
+
+import org.apache.iceberg.types.Type;
+
+/** Wrapper class for an Iceberg schema retrieved from a channel. */
+public class ApacheIcebergColumnSchema {
+
+  private final Type schema;
+
+  private final String columnName;
+
+  public ApacheIcebergColumnSchema(Type schema, String columnName) {
+    this.schema = schema;
+    this.columnName = columnName.toUpperCase();
+  }
+
+  public Type getSchema() {
+    return schema;
+  }
+
+  public String getColumnName() {
+    return columnName;
+  }
+}
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java
new file mode 100644
index 000000000..71e7b0086
--- /dev/null
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java
@@ -0,0 +1,17 @@
+package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;
+
+
+/** Tree representation of a column, with types compatible with Snowflake Iceberg tables. */
+public class IcebergColumnTree {
+
+  private final IcebergFieldNode rootNode;
+
+  public IcebergColumnTree(ApacheIcebergColumnSchema columnSchema) {
+    this.rootNode = new IcebergFieldNode(columnSchema.getColumnName(), columnSchema.getSchema());
+  }
+
+  public String buildQuery() {
+    StringBuilder sb = new StringBuilder();
+    return rootNode.buildQuery(sb, "ROOT_NODE").toString();
+  }
+}
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java
index 206c21b39..d3fac5583 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java
@@ -11,6 +11,8 @@
 import com.snowflake.kafka.connector.internal.streaming.schemaevolution.ColumnTypeMapper;
 import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 import org.apache.kafka.connect.data.Date;
 import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
@@ -18,6 +20,52 @@
 import org.apache.kafka.connect.data.Timestamp;
 
 public class IcebergColumnTypeMapper extends ColumnTypeMapper {
+
+  /**
+   * See <a href="https://docs.snowflake.com/en/user-guide/tables-iceberg-data-types">Data types
+   * for Apache Iceberg™ tables</a>.
+   */
+  public static String mapToSnowflakeDataType(Type apacheIcebergType) {
+    switch (apacheIcebergType.typeId()) {
+      case BOOLEAN:
+        return "BOOLEAN";
+      case INTEGER:
+        return "NUMBER(10,0)";
+      case LONG:
+        return "NUMBER(19,0)";
+      case FLOAT:
+      case DOUBLE:
+        return "FLOAT";
+      case DATE:
+        return "DATE";
+      case TIME:
+        return "TIME(6)";
+      case TIMESTAMP:
+        Types.TimestampType timestamp = (Types.TimestampType) apacheIcebergType;
+        return timestamp.shouldAdjustToUTC() ? "TIMESTAMP_LTZ" : "TIMESTAMP";
+      case STRING:
+        return "VARCHAR(16777216)";
+      case UUID:
+        return "BINARY(16)";
+      case FIXED:
+        throw new IllegalArgumentException("FIXED column type not supported!");
+      case BINARY:
+        return "BINARY";
+      case DECIMAL:
+        Types.DecimalType decimal = (Types.DecimalType) apacheIcebergType;
+        return decimal.toString().toUpperCase();
+      case STRUCT:
+        return "OBJECT";
+      case LIST:
+        return "ARRAY";
+      case MAP:
+        return "MAP";
+      default:
+        throw new IllegalArgumentException(
- " + apacheIcebergType.typeId()); + } + } + @Override public String mapToColumnType(Schema.Type kafkaType, String schemaName) { switch (kafkaType) { @@ -54,7 +102,6 @@ public String mapToColumnType(Schema.Type kafkaType, String schemaName) { } case ARRAY: default: - // MAP and STRUCT will go here throw new IllegalArgumentException("Arrays, struct and map not supported!"); } } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergDataTypeParser.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergDataTypeParser.java new file mode 100644 index 000000000..ad6b24d02 --- /dev/null +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergDataTypeParser.java @@ -0,0 +1,185 @@ +/* + * Copyright (c) 2024 Snowflake Computing Inc. All rights reserved. + */ + +package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import java.util.Iterator; +import java.util.List; +import javax.annotation.Nonnull; +import org.apache.iceberg.parquet.TypeToMessageType; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.JsonUtil; + +/** + * This class is used to Iceberg data type (include primitive types and nested types) serialization + * and deserialization. + * + *

+ * <p>This code is modified from
+ * GlobalServices/modules/data-lake/datalake-api/src/main/java/com/snowflake/metadata/iceberg
+ * /IcebergDataTypeParser.java
+ */
+public class IcebergDataTypeParser {
+  public static final String ELEMENT = "element";
+  public static final String KEY = "key";
+  public static final String VALUE = "value";
+  private static final String TYPE = "type";
+  private static final String STRUCT = "struct";
+  private static final String LIST = "list";
+  private static final String MAP = "map";
+  private static final String FIELDS = "fields";
+  private static final String DOC = "doc";
+  private static final String NAME = "name";
+  private static final String ID = "id";
+  private static final String ELEMENT_ID = "element-id";
+  private static final String KEY_ID = "key-id";
+  private static final String VALUE_ID = "value-id";
+  private static final String REQUIRED = "required";
+  private static final String ELEMENT_REQUIRED = "element-required";
+  private static final String VALUE_REQUIRED = "value-required";
+
+  private static final String EMPTY_FIELD_CHAR = "\\";
+
+  /** Object mapper for this class */
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  /** Util class that contains the mapping between Iceberg data type and Parquet data type */
+  private static final TypeToMessageType typeToMessageType = new TypeToMessageType();
+
+  /**
+   * Get Iceberg data type information by deserialization.
+   *
+   * @param icebergDataType string representation of Iceberg data type
+   * @return Iceberg data type
+   */
+  public static Type deserializeIcebergType(String icebergDataType) {
+    try {
+      JsonNode json = MAPPER.readTree(icebergDataType);
+      return getTypeFromJson(json);
+    } catch (JsonProcessingException e) {
+      throw new IllegalArgumentException(
+          String.format("Failed to deserialize Iceberg data type: %s", icebergDataType));
+    }
+  }
+
+  /**
+   * Get corresponding Iceberg data type from JsonNode.
+   *
+   * @param jsonNode JsonNode parsed from Iceberg type string.
+   * @return Iceberg data type
+   */
+  public static Type getTypeFromJson(@Nonnull JsonNode jsonNode) {
+    if (jsonNode.isTextual()) {
+      return Types.fromPrimitiveString(jsonNode.asText());
+    } else if (jsonNode.isObject()) {
+      if (!jsonNode.has(TYPE)) {
+        throw new IllegalArgumentException(
+            String.format("Missing key '%s' in schema: %s", TYPE, jsonNode));
+      }
+      String type = jsonNode.get(TYPE).asText();
+      if (STRUCT.equals(type)) {
+        return structFromJson(jsonNode);
+      } else if (LIST.equals(type)) {
+        return listFromJson(jsonNode);
+      } else if (MAP.equals(type)) {
+        return mapFromJson(jsonNode);
+      }
+      throw new IllegalArgumentException(
+          String.format("Cannot parse Iceberg type: %s, schema: %s", type, jsonNode));
+    }
+
+    throw new IllegalArgumentException("Cannot parse Iceberg type from schema: " + jsonNode);
+  }
+
+  /**
+   * Get Iceberg struct type information from JsonNode.
+   *
+   * @param json JsonNode parsed from Iceberg type string.
+   * @return struct type
+   */
+  public static @Nonnull Types.StructType structFromJson(@Nonnull JsonNode json) {
+    if (!json.has(FIELDS)) {
+      throw new IllegalArgumentException(
+          String.format("Missing key '%s' in schema: %s", FIELDS, json));
+    }
+    JsonNode fieldArray = json.get(FIELDS);
+    Preconditions.checkArgument(fieldArray != null, "Field array cannot be null");
+    Preconditions.checkArgument(
+        fieldArray.isArray(), "Cannot parse struct fields from non-array: %s", fieldArray);
+
+    List<Types.NestedField> fields = Lists.newArrayListWithExpectedSize(fieldArray.size());
+    Iterator<JsonNode> iterator = fieldArray.elements();
+    while (iterator.hasNext()) {
+      JsonNode field = iterator.next();
+      Preconditions.checkArgument(
+          field.isObject(), "Cannot parse struct field from non-object: %s", field);
+
+      int id = JsonUtil.getInt(ID, field);
+
+      /* TypeToMessageType throws on an empty field name: escape existing backslashes and
+      represent an empty name with a single backslash. */
+      String name =
+          JsonUtil.getString(NAME, field)
+              .replace(EMPTY_FIELD_CHAR, EMPTY_FIELD_CHAR + EMPTY_FIELD_CHAR);
+      if (name.isEmpty()) {
+        name = EMPTY_FIELD_CHAR;
+      }
+      Type type = getTypeFromJson(field.get(TYPE));
+
+      String doc = JsonUtil.getStringOrNull(DOC, field);
+      boolean isRequired = JsonUtil.getBool(REQUIRED, field);
+      if (isRequired) {
+        fields.add(Types.NestedField.required(id, name, type, doc));
+      } else {
+        fields.add(Types.NestedField.optional(id, name, type, doc));
+      }
+    }
+
+    return Types.StructType.of(fields);
+  }
+
+  /**
+   * Get Iceberg list type information from JsonNode.
+   *
+   * @param json JsonNode parsed from Iceberg type string.
+   * @return list type
+   */
+  public static Types.ListType listFromJson(JsonNode json) {
+    int elementId = JsonUtil.getInt(ELEMENT_ID, json);
+    Type elementType = getTypeFromJson(json.get(ELEMENT));
+    boolean isRequired = JsonUtil.getBool(ELEMENT_REQUIRED, json);
+
+    if (isRequired) {
+      return Types.ListType.ofRequired(elementId, elementType);
+    } else {
+      return Types.ListType.ofOptional(elementId, elementType);
+    }
+  }
+
+  /**
+   * Get Iceberg map type from JsonNode.
+   *
+   * @param json JsonNode parsed from Iceberg type string.
+   * @return map type
+   */
+  public static Types.MapType mapFromJson(JsonNode json) {
+    int keyId = JsonUtil.getInt(KEY_ID, json);
+    Type keyType = getTypeFromJson(json.get(KEY));
+
+    int valueId = JsonUtil.getInt(VALUE_ID, json);
+    Type valueType = getTypeFromJson(json.get(VALUE));
+
+    boolean isRequired = JsonUtil.getBool(VALUE_REQUIRED, json);
+
+    if (isRequired) {
+      return Types.MapType.ofRequired(keyId, valueId, keyType, valueType);
+    } else {
+      return Types.MapType.ofOptional(keyId, valueId, keyType, valueType);
+    }
+  }
+}
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergFieldNode.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergFieldNode.java
new file mode 100644
index 000000000..fe8e96754
--- /dev/null
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergFieldNode.java
@@ -0,0 +1,86 @@
+package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;
+
+import java.util.LinkedHashMap;
+import java.util.stream.Collectors;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+class IcebergFieldNode {
+
+  public final String name;
+
+  public final String snowflakeIcebergType;
+
+  public final LinkedHashMap<String, IcebergFieldNode> children;
+
+  public IcebergFieldNode(String name, Type apacheIcebergSchema) {
+    this.name = name.toUpperCase();
+    this.snowflakeIcebergType = IcebergColumnTypeMapper.mapToSnowflakeDataType(apacheIcebergSchema);
+    this.children = produceChildren(apacheIcebergSchema);
+  }
+
+  private LinkedHashMap<String, IcebergFieldNode> produceChildren(Type apacheIcebergSchema) {
+    // primitives must not have children
+    if (apacheIcebergSchema.isPrimitiveType()) {
+      return new LinkedHashMap<>();
+    }
+    Type.NestedType nestedField = apacheIcebergSchema.asNestedType();
+    return nestedField.fields().stream()
+        .collect(
+            Collectors.toMap(
+                Types.NestedField::name,
+                this::fromNestedField,
+                // a nested type cannot contain two fields with the same name
+                (v1, v2) -> {
+                  throw new IllegalArgumentException("Duplicate key: " + v1);
+                },
+                LinkedHashMap::new));
+  }
+
+  private IcebergFieldNode fromNestedField(Types.NestedField field) {
+    return new IcebergFieldNode(field.name(), field.type());
+  }
+
+  /**
+   * @param sb StringBuilder
+   * @param parentType Snowflake Iceberg table compatible type. "ROOT_NODE" is passed for the root
+   *     node, because the root node's column name is always generated.
+   * @return StringBuilder with appended query elements
+   */
+  StringBuilder buildQuery(StringBuilder sb, String parentType) {
+    appendNameAndType(sb, parentType);
+    if (!children.isEmpty()) {
+      sb.append("(");
+      appendChildren(sb, this.snowflakeIcebergType);
+      sb.append(")");
+    }
+    return sb;
+  }
+
+  private StringBuilder appendNameAndType(StringBuilder sb, String parentType) {
+    if (parentType.equals("ARRAY") || parentType.equals("MAP")) {
+      sb.append(snowflakeIcebergType);
+    } else {
+      sb.append(name);
+      sb.append(" ");
+      sb.append(snowflakeIcebergType);
+    }
+    return sb;
+  }
+
+  private StringBuilder appendChildren(StringBuilder sb, String parentType) {
+    children.forEach(
+        (name, node) -> {
+          node.buildQuery(sb, parentType);
+          sb.append(", ");
+        });
+    removeLastSeparator(sb);
+    return sb;
+  }
+
+  private StringBuilder removeLastSeparator(StringBuilder sb) {
+    sb.deleteCharAt(sb.length() - 1);
+    sb.deleteCharAt(sb.length() - 1);
+    return sb;
+  }
+}
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchema.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchema.java
new file mode 100644
index 000000000..e4ed70f3e
--- /dev/null
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchema.java
@@ -0,0 +1,9 @@
+package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;
+
+import java.util.List;
+
+/** Wrapper for multiple columns, not necessarily all of the columns in the table. */
+public class IcebergTableSchema {
+
+  private List<IcebergColumnTree> columns;
+}
diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java
index 11dd31868..412396af7 100644
--- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java
+++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java
@@ -49,7 +49,6 @@ public void setUp() {
     SnowflakeSinkConnectorConfig.setDefaultValues(config);
     config.put(ICEBERG_ENABLED, "TRUE");
     config.put(ENABLE_SCHEMATIZATION_CONFIG, isSchemaEvolutionEnabled().toString());
-
     createIcebergTable();
     enableSchemaEvolution(tableName);
@@ -117,9 +116,7 @@ protected SinkRecord createKafkaRecord(String jsonString, int offset, boolean wi
               + ") "
               + "SELECT * FROM extracted_data "
               + "ORDER BY offset_extracted asc;";
-
   protected List<Map<String, Object>> selectAllSchematizedRecords() {
-
     return select(tableName, selectAllSortByOffset, PrimitiveJsonRecord::fromSchematizedResult);
   }
diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java
index 08f0bf2f3..8ca9fa625 100644
--- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java
+++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java
@@ -81,6 +81,50 @@ void shouldEvolveSchemaAndInsertRecords(
     assertRecordsInTable();
   }
 
+  @ParameterizedTest(name = "{0}")
+  @MethodSource("prepareData")
+  @Disabled("Schema evolution for structured types is not yet supported")
+  void shouldEvolveSchemaAndInsertRecords_structuredData(
+      String description, String message, DescribeTableRow[] expectedSchema, boolean withSchema)
+      throws Exception {
+    // start off with just one column
+    List<DescribeTableRow> rows = describeTable(tableName);
+    assertThat(rows)
+        .hasSize(1)
+        .extracting(DescribeTableRow::getColumn)
+        .contains(Utils.TABLE_COLUMN_METADATA);
+
+    SinkRecord record = createKafkaRecord(message, 0, withSchema);
+    service.insert(Collections.singletonList(record));
+    // the first insert triggers schema evolution, so no offset is committed yet
+    waitForOffset(-1);
+    rows = describeTable(tableName);
+    assertThat(rows.size()).isEqualTo(9);
+
+    // don't check metadata column schema, we have different tests for that
+    rows =
+        rows.stream()
+            .filter(r -> !r.getColumn().equals(Utils.TABLE_COLUMN_METADATA))
+            .collect(Collectors.toList());
+
+    assertThat(rows).containsExactlyInAnyOrder(expectedSchema);
+
+    // resend and store same record without any issues now
+    service.insert(Collections.singletonList(record));
+    waitForOffset(1);
+
+    // and another record with same schema
+    service.insert(Collections.singletonList(createKafkaRecord(message, 1, withSchema)));
+    waitForOffset(2);
+
+    String testStruct = "{ \"testStruct\": {" + "\"k1\" : 1," + "\"k2\" : 2" + "} " + "}";
+
+    service.insert(Collections.singletonList(createKafkaRecord(testStruct, 2, false)));
+
+    service.insert(Collections.singletonList(createKafkaRecord(testStruct, 2, false)));
+    waitForOffset(3);
+  }
+
   private void assertRecordsInTable() {
     List<Map<String, Object>> recordsWithMetadata = selectAllSchematizedRecords();
diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java b/src/test/java/com/snowflake/kafka/connector/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java
new file mode 100644
index 000000000..eb5baa6f9
--- /dev/null
+++ b/src/test/java/com/snowflake/kafka/connector/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java
@@ -0,0 +1,71 @@
+package com.snowflake.kafka.connector.streaming.schemaevolution.iceberg;
+
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+
+import com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg.ApacheIcebergColumnSchema;
+import com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg.IcebergColumnTree;
+import com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg.IcebergDataTypeParser;
+import java.util.stream.Stream;
+import org.apache.iceberg.types.Type;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+public class ParseIcebergColumnTreeTest {
+
+  @ParameterizedTest
+  @MethodSource("icebergSchemas")
+  void parseFromApacheIcebergSchema(String plainIcebergSchema, String expectedQuery) {
+    // given
+    Type type = IcebergDataTypeParser.deserializeIcebergType(plainIcebergSchema);
+    // when
+    ApacheIcebergColumnSchema apacheSchema =
+        new ApacheIcebergColumnSchema(type, "TEST_COLUMN_NAME");
+    IcebergColumnTree tree = new IcebergColumnTree(apacheSchema);
+    // then
+    Assertions.assertEquals(expectedQuery, tree.buildQuery());
+  }
+
+  static Stream<Arguments> icebergSchemas() {
+    return Stream.of(
+        // primitives
+        arguments("\"boolean\"", "TEST_COLUMN_NAME BOOLEAN"),
+        arguments("\"int\"", "TEST_COLUMN_NAME NUMBER(10,0)"),
+        arguments("\"long\"", "TEST_COLUMN_NAME NUMBER(19,0)"),
+        arguments("\"float\"", "TEST_COLUMN_NAME FLOAT"),
+        arguments("\"double\"", "TEST_COLUMN_NAME FLOAT"),
+        arguments("\"date\"", "TEST_COLUMN_NAME DATE"),
+        arguments("\"time\"", "TEST_COLUMN_NAME TIME(6)"),
+        arguments("\"timestamptz\"", "TEST_COLUMN_NAME TIMESTAMP_LTZ"),
+        arguments("\"timestamp\"", "TEST_COLUMN_NAME TIMESTAMP"),
+        arguments("\"string\"", "TEST_COLUMN_NAME VARCHAR(16777216)"),
+        arguments("\"uuid\"", "TEST_COLUMN_NAME BINARY(16)"),
+        arguments("\"binary\"", "TEST_COLUMN_NAME BINARY"),
+        arguments("\"decimal(10,5)\"", "TEST_COLUMN_NAME DECIMAL(10, 5)"),
+        // simple struct
+        arguments(
+            "{\"type\":\"struct\",\"fields\":[{\"id\":23,\"name\":\"k1\",\"required\":false,\"type\":\"int\"},{\"id\":24,\"name\":\"k2\",\"required\":false,\"type\":\"int\"}]}",
+            "TEST_COLUMN_NAME OBJECT(k1 NUMBER(10,0), k2 NUMBER(10,0))"),
+        // list
+        arguments(
+            "{\"type\":\"list\",\"element-id\":23,\"element\":\"long\",\"element-required\":false}",
+            "TEST_COLUMN_NAME ARRAY(NUMBER(19,0))"),
+        // map
+        arguments(
+            "{\"type\":\"map\",\"key-id\":4,\"key\":\"int\",\"value-id\":5,\"value\":\"string\",\"value-required\":false}",
+            "TEST_COLUMN_NAME MAP(NUMBER(10,0), VARCHAR(16777216))"),
+        // structs with nested objects
+        arguments(
+            "{\"type\":\"struct\",\"fields\":[{\"id\":23,\"name\":\"k1\",\"required\":false,\"type\":\"int\"},{\"id\":24,\"name\":\"k2\",\"required\":false,\"type\":\"int\"},{\"id\":25,\"name\":\"no\",\"required\":false,\"type\":{\"type\":\"struct\",\"fields\":[{\"id\":26,\"name\":\"nk1\",\"required\":false,\"type\":\"string\"},{\"id\":27,\"name\":\"nk2\",\"required\":false,\"type\":\"string\"}]}}]}",
+            "TEST_COLUMN_NAME OBJECT(k1 NUMBER(10,0), k2 NUMBER(10,0), no OBJECT(nk1"
+                + " VARCHAR(16777216), nk2 VARCHAR(16777216)))"),
+        arguments(
+            "{\"type\":\"struct\",\"fields\":[{\"id\":2,\"name\":\"offset\",\"required\":false,\"type\":\"int\"},{\"id\":3,\"name\":\"topic\",\"required\":false,\"type\":\"string\"},{\"id\":4,\"name\":\"partition\",\"required\":false,\"type\":\"int\"},{\"id\":5,\"name\":\"key\",\"required\":false,\"type\":\"string\"},{\"id\":6,\"name\":\"schema_id\",\"required\":false,\"type\":\"int\"},{\"id\":7,\"name\":\"key_schema_id\",\"required\":false,\"type\":\"int\"},{\"id\":8,\"name\":\"CreateTime\",\"required\":false,\"type\":\"long\"},{\"id\":9,\"name\":\"LogAppendTime\",\"required\":false,\"type\":\"long\"},{\"id\":10,\"name\":\"SnowflakeConnectorPushTime\",\"required\":false,\"type\":\"long\"},{\"id\":11,\"name\":\"headers\",\"required\":false,\"type\":{\"type\":\"map\",\"key-id\":12,\"key\":\"string\",\"value-id\":13,\"value\":\"string\",\"value-required\":false}}]}\n",
+            "TEST_COLUMN_NAME OBJECT(offset NUMBER(10,0), topic VARCHAR(16777216), partition"
+                + " NUMBER(10,0), key VARCHAR(16777216), schema_id NUMBER(10,0), key_schema_id"
+                + " NUMBER(10,0), CreateTime NUMBER(19,0), LogAppendTime NUMBER(19,0),"
+                + " SnowflakeConnectorPushTime NUMBER(19,0), headers MAP(VARCHAR(16777216),"
+                + " VARCHAR(16777216)))"));
+  }
+}
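
End-to-end, the classes added in this patch compose into a single pipeline: IcebergDataTypeParser.deserializeIcebergType turns the Iceberg type JSON stored in table metadata into an Apache Iceberg Type, ApacheIcebergColumnSchema pairs that type with an upper-cased column name, and IcebergColumnTree / IcebergFieldNode walk the type recursively to render a Snowflake-compatible column definition fragment. A minimal sketch of that flow, reusing the list-type input from the test data above (the standalone Demo class is illustrative only and not part of the patch):

import com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg.ApacheIcebergColumnSchema;
import com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg.IcebergColumnTree;
import com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg.IcebergDataTypeParser;
import org.apache.iceberg.types.Type;

public class IcebergColumnTreeDemo {

  public static void main(String[] args) {
    // Iceberg type JSON as stored in table metadata (taken from the test cases above)
    String icebergTypeJson =
        "{\"type\":\"list\",\"element-id\":23,\"element\":\"long\",\"element-required\":false}";

    // 1. JSON string -> Apache Iceberg Type
    Type type = IcebergDataTypeParser.deserializeIcebergType(icebergTypeJson);

    // 2. Pair the type with a column name (the wrapper upper-cases it)
    ApacheIcebergColumnSchema columnSchema = new ApacheIcebergColumnSchema(type, "my_column");

    // 3. Recursively map Iceberg types to Snowflake Iceberg types and render the DDL fragment
    IcebergColumnTree tree = new IcebergColumnTree(columnSchema);

    // Prints: MY_COLUMN ARRAY(NUMBER(19,0))
    System.out.println(tree.buildQuery());
  }
}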