diff --git a/pom.xml b/pom.xml
index 0f18b78c3..d7ef48f1a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,6 +56,7 @@
     7.7.0
     3.25.5
+    <iceberg.version>1.6.1</iceberg.version>
@@ -607,6 +608,17 @@
       <scope>test</scope>
+
+    <dependency>
+      <groupId>org.apache.iceberg</groupId>
+      <artifactId>iceberg-api</artifactId>
+      <version>${iceberg.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.iceberg</groupId>
+      <artifactId>iceberg-core</artifactId>
+      <version>${iceberg.version}</version>
+    </dependency>
diff --git a/pom_confluent.xml b/pom_confluent.xml
index 61e7f3685..d5837cf60 100644
--- a/pom_confluent.xml
+++ b/pom_confluent.xml
@@ -66,6 +66,7 @@
     7.7.0
     3.25.5
+    <iceberg.version>1.6.1</iceberg.version>
@@ -757,6 +758,18 @@
       <version>${assertj-core.version}</version>
       <scope>test</scope>
+
+    <dependency>
+      <groupId>org.apache.iceberg</groupId>
+      <artifactId>iceberg-api</artifactId>
+      <version>${iceberg.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.iceberg</groupId>
+      <artifactId>iceberg-core</artifactId>
+      <version>${iceberg.version}</version>
+    </dependency>
+
diff --git a/scripts/process_licenses.py b/scripts/process_licenses.py
index 9b1ab126d..7a3fe07b2 100644
--- a/scripts/process_licenses.py
+++ b/scripts/process_licenses.py
@@ -91,7 +91,12 @@
     "org.projectnessie.cel:cel-jackson": APACHE_LICENSE,
     "org.projectnessie.cel:cel-tools": APACHE_LICENSE,
     "org.xerial.snappy:snappy-java": APACHE_LICENSE,
-    "org.yaml:snakeyaml": APACHE_LICENSE
+    "org.yaml:snakeyaml": APACHE_LICENSE,
+    "org.apache.iceberg:iceberg-api": APACHE_LICENSE,
+    "org.apache.iceberg:iceberg-core": APACHE_LICENSE,
+    "org.apache.iceberg:iceberg-common": APACHE_LICENSE,
+    "io.airlift:aircompressor": APACHE_LICENSE,
+    "org.roaringbitmap:RoaringBitmap": APACHE_LICENSE
 }
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ApacheIcebergColumnSchema.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ApacheIcebergColumnSchema.java
new file mode 100644
index 000000000..650ab64d7
--- /dev/null
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ApacheIcebergColumnSchema.java
@@ -0,0 +1,24 @@
+package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;
+
+import org.apache.iceberg.types.Type;
+
+/** Wrapper class for an Iceberg schema retrieved from the channel. */
+public class ApacheIcebergColumnSchema {
+
+  private final Type schema;
+
+  private final String columnName;
+
+  public ApacheIcebergColumnSchema(Type schema, String columnName) {
+    this.schema = schema;
+    this.columnName = columnName.toUpperCase();
+  }
+
+  public Type getSchema() {
+    return schema;
+  }
+
+  public String getColumnName() {
+    return columnName;
+  }
+}
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java
new file mode 100644
index 000000000..93495cde7
--- /dev/null
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java
@@ -0,0 +1,16 @@
+package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;
+
+/** Tree representation of a column, with types compatible with Snowflake Iceberg tables. */
+public class IcebergColumnTree {
+
+  private final IcebergFieldNode rootNode;
+
+  public IcebergColumnTree(ApacheIcebergColumnSchema columnSchema) {
+    this.rootNode = new IcebergFieldNode(columnSchema.getColumnName(), columnSchema.getSchema());
+  }
+
+  public String buildQuery() {
+    StringBuilder sb = new StringBuilder();
+    return rootNode.buildQuery(sb, "ROOT_NODE").toString();
+  }
+}
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java
index 42c5ee1b9..b0a6c9e74 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java
@@ -11,6 +11,8 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.snowflake.kafka.connector.internal.streaming.schemaevolution.ColumnTypeMapper;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 import org.apache.kafka.connect.data.Date;
 import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
@@ -18,6 +20,52 @@
 import org.apache.kafka.connect.data.Timestamp;
 
 public class IcebergColumnTypeMapper extends ColumnTypeMapper {
+
+  /**
+   * Maps an Apache Iceberg type to a Snowflake type. See the Snowflake documentation: Data types
+   * for Apache Iceberg™ tables.
+   */
+  public static String mapToSnowflakeDataType(Type apacheIcebergType) {
+    switch (apacheIcebergType.typeId()) {
+      case BOOLEAN:
+        return "BOOLEAN";
+      case INTEGER:
+        return "NUMBER(10,0)";
+      case LONG:
+        return "NUMBER(19,0)";
+      case FLOAT:
+      case DOUBLE:
+        return "FLOAT";
+      case DATE:
+        return "DATE";
+      case TIME:
+        return "TIME(6)";
+      case TIMESTAMP:
+        Types.TimestampType timestamp = (Types.TimestampType) apacheIcebergType;
+        return timestamp.shouldAdjustToUTC() ? "TIMESTAMP_LTZ" : "TIMESTAMP";
+      case STRING:
+        return "VARCHAR(16777216)";
+      case UUID:
+        return "BINARY(16)";
+      case FIXED:
+        throw new IllegalArgumentException("FIXED column type not supported!");
+      case BINARY:
+        return "BINARY";
+      case DECIMAL:
+        Types.DecimalType decimal = (Types.DecimalType) apacheIcebergType;
+        return decimal.toString().toUpperCase();
+      case STRUCT:
+        return "OBJECT";
+      case LIST:
+        return "ARRAY";
+      case MAP:
+        return "MAP";
+      default:
+        throw new IllegalArgumentException(
+            "Unsupported Iceberg data type: " + apacheIcebergType.typeId());
- " + apacheIcebergType.typeId()); + } + } + @Override public String mapToColumnType(Schema.Type kafkaType, String schemaName) { switch (kafkaType) { diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergDataTypeParser.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergDataTypeParser.java new file mode 100644 index 000000000..6ffbd89cd --- /dev/null +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergDataTypeParser.java @@ -0,0 +1,173 @@ +package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import java.util.Iterator; +import java.util.List; +import javax.annotation.Nonnull; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.JsonUtil; + +/** + * This class is used to Iceberg data type (include primitive types and nested types) serialization + * and deserialization. + */ +public class IcebergDataTypeParser { + public static final String ELEMENT = "element"; + public static final String KEY = "key"; + public static final String VALUE = "value"; + private static final String TYPE = "type"; + private static final String STRUCT = "struct"; + private static final String LIST = "list"; + private static final String MAP = "map"; + private static final String FIELDS = "fields"; + private static final String DOC = "doc"; + private static final String NAME = "name"; + private static final String ID = "id"; + private static final String ELEMENT_ID = "element-id"; + private static final String KEY_ID = "key-id"; + private static final String VALUE_ID = "value-id"; + private static final String REQUIRED = "required"; + private static final String ELEMENT_REQUIRED = "element-required"; + private static final String VALUE_REQUIRED = "value-required"; + + private static final String EMPTY_FIELD_CHAR = "\\"; + + /** Object mapper for this class */ + private static final ObjectMapper MAPPER = new ObjectMapper(); + + /** + * Get Iceberg data type information by deserialization. + * + * @param icebergDataType string representation of Iceberg data type + * @return Iceberg data type + */ + public static Type deserializeIcebergType(String icebergDataType) { + try { + JsonNode json = MAPPER.readTree(icebergDataType); + return getTypeFromJson(json); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException( + String.format("Failed to deserialize Iceberg data type: %s", icebergDataType)); + } + } + + /** + * Get corresponding Iceberg data type from JsonNode. + * + * @param jsonNode JsonNode parsed from Iceberg type string. 
+   * @return Iceberg data type
+   */
+  public static Type getTypeFromJson(@Nonnull JsonNode jsonNode) {
+    if (jsonNode.isTextual()) {
+      return Types.fromPrimitiveString(jsonNode.asText());
+    } else if (jsonNode.isObject()) {
+      if (!jsonNode.has(TYPE)) {
+        throw new IllegalArgumentException(
+            String.format("Missing key '%s' in schema: %s", TYPE, jsonNode));
+      }
+      String type = jsonNode.get(TYPE).asText();
+      if (STRUCT.equals(type)) {
+        return structFromJson(jsonNode);
+      } else if (LIST.equals(type)) {
+        return listFromJson(jsonNode);
+      } else if (MAP.equals(type)) {
+        return mapFromJson(jsonNode);
+      }
+      throw new IllegalArgumentException(
+          String.format("Cannot parse Iceberg type: %s, schema: %s", type, jsonNode));
+    }
+
+    throw new IllegalArgumentException("Cannot parse Iceberg type from schema: " + jsonNode);
+  }
+
+  /**
+   * Get Iceberg struct type information from JsonNode.
+   *
+   * @param json JsonNode parsed from Iceberg type string.
+   * @return struct type
+   */
+  public static @Nonnull Types.StructType structFromJson(@Nonnull JsonNode json) {
+    if (!json.has(FIELDS)) {
+      throw new IllegalArgumentException(
+          String.format("Missing key '%s' in schema: %s", FIELDS, json));
+    }
+    JsonNode fieldArray = json.get(FIELDS);
+    Preconditions.checkArgument(fieldArray != null, "Field array cannot be null");
+    Preconditions.checkArgument(
+        fieldArray.isArray(), "Cannot parse struct fields from non-array: %s", fieldArray);
+
+    List<Types.NestedField> fields = Lists.newArrayListWithExpectedSize(fieldArray.size());
+    Iterator<JsonNode> iterator = fieldArray.elements();
+    while (iterator.hasNext()) {
+      JsonNode field = iterator.next();
+      Preconditions.checkArgument(
+          field.isObject(), "Cannot parse struct field from non-object: %s", field);
+
+      int id = JsonUtil.getInt(ID, field);
+
+      /* TypeToMessageType throws on an empty field name: escape existing backslashes, then
+      represent an empty name with a single backslash. */
+      String name =
+          JsonUtil.getString(NAME, field)
+              .replace(EMPTY_FIELD_CHAR, EMPTY_FIELD_CHAR + EMPTY_FIELD_CHAR);
+      if (name.isEmpty()) {
+        name = EMPTY_FIELD_CHAR;
+      }
+      Type type = getTypeFromJson(field.get(TYPE));
+
+      String doc = JsonUtil.getStringOrNull(DOC, field);
+      boolean isRequired = JsonUtil.getBool(REQUIRED, field);
+      if (isRequired) {
+        fields.add(Types.NestedField.required(id, name, type, doc));
+      } else {
+        fields.add(Types.NestedField.optional(id, name, type, doc));
+      }
+    }
+
+    return Types.StructType.of(fields);
+  }
+
+  /**
+   * Get Iceberg list type information from JsonNode.
+   *
+   * @param json JsonNode parsed from Iceberg type string.
+   * @return list type
+   */
+  public static Types.ListType listFromJson(JsonNode json) {
+    int elementId = JsonUtil.getInt(ELEMENT_ID, json);
+    Type elementType = getTypeFromJson(json.get(ELEMENT));
+    boolean isRequired = JsonUtil.getBool(ELEMENT_REQUIRED, json);
+
+    if (isRequired) {
+      return Types.ListType.ofRequired(elementId, elementType);
+    } else {
+      return Types.ListType.ofOptional(elementId, elementType);
+    }
+  }
+
+  /**
+   * Get Iceberg map type from JsonNode.
+   *
+   * @param json JsonNode parsed from Iceberg type string.
+   * @return map type
+   */
+  public static Types.MapType mapFromJson(JsonNode json) {
+    int keyId = JsonUtil.getInt(KEY_ID, json);
+    Type keyType = getTypeFromJson(json.get(KEY));
+
+    int valueId = JsonUtil.getInt(VALUE_ID, json);
+    Type valueType = getTypeFromJson(json.get(VALUE));
+
+    boolean isRequired = JsonUtil.getBool(VALUE_REQUIRED, json);
+
+    if (isRequired) {
+      return Types.MapType.ofRequired(keyId, valueId, keyType, valueType);
+    } else {
+      return Types.MapType.ofOptional(keyId, valueId, keyType, valueType);
+    }
+  }
+}
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergFieldNode.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergFieldNode.java
new file mode 100644
index 000000000..fdca41d22
--- /dev/null
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergFieldNode.java
@@ -0,0 +1,86 @@
+package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;
+
+import java.util.LinkedHashMap;
+import java.util.stream.Collectors;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+class IcebergFieldNode {
+
+  public final String name;
+
+  public final String snowflakeIcebergType;
+
+  public final LinkedHashMap<String, IcebergFieldNode> children;
+
+  IcebergFieldNode(String name, Type apacheIcebergSchema) {
+    this.name = name;
+    this.snowflakeIcebergType = IcebergColumnTypeMapper.mapToSnowflakeDataType(apacheIcebergSchema);
+    this.children = produceChildren(apacheIcebergSchema);
+  }
+
+  private LinkedHashMap<String, IcebergFieldNode> produceChildren(Type apacheIcebergSchema) {
+    // primitives must not have children
+    if (apacheIcebergSchema.isPrimitiveType()) {
+      return new LinkedHashMap<>();
+    }
+    Type.NestedType nestedField = apacheIcebergSchema.asNestedType();
+    return nestedField.fields().stream()
+        .collect(
+            Collectors.toMap(
+                Types.NestedField::name,
+                this::fromNestedField,
+                // field names are unique within an Iceberg struct, so the merge function never runs
+                (v1, v2) -> {
+                  throw new IllegalArgumentException("Duplicate field name: " + v1.name);
+                },
+                LinkedHashMap::new));
+  }
+
+  private IcebergFieldNode fromNestedField(Types.NestedField field) {
+    return new IcebergFieldNode(field.name(), field.type());
+  }
+
+  /**
+   * @param sb StringBuilder
+   * @param parentType Snowflake Iceberg table compatible type of the parent node. For the root
+   *     node "ROOT_NODE" is passed, because the root node's column name is always generated.
+   * @return StringBuilder with appended query elements
+   */
+  StringBuilder buildQuery(StringBuilder sb, String parentType) {
+    if (parentType.equals("ARRAY") || parentType.equals("MAP")) {
+      sb.append(snowflakeIcebergType);
+    } else {
+      appendNameAndType(sb);
+    }
+    if (!children.isEmpty()) {
+      sb.append("(");
+      appendChildren(sb, this.snowflakeIcebergType);
+      sb.append(")");
+    }
+    return sb;
+  }
+
+  private StringBuilder appendNameAndType(StringBuilder sb) {
+    sb.append(name);
+    sb.append(" ");
+    sb.append(snowflakeIcebergType);
+    return sb;
+  }
+
+  private StringBuilder appendChildren(StringBuilder sb, String parentType) {
+    children.forEach(
+        (name, node) -> {
+          node.buildQuery(sb, parentType);
+          sb.append(", ");
+        });
+    removeLastSeparator(sb);
+    return sb;
+  }
+
+  private StringBuilder removeLastSeparator(StringBuilder sb) {
+    // drop the trailing ", " appended after the last child
+    sb.setLength(sb.length() - 2);
+    return sb;
+  }
+}
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchema.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchema.java
new file mode 100644
index 000000000..e4ed70f3e
--- /dev/null
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchema.java
@@ -0,0 +1,9 @@
+package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;
+
+import java.util.List;
+
+/** Wrapper for multiple columns, not necessarily all of the columns in the table. */
+public class IcebergTableSchema {
+
+  private List<IcebergColumnTree> columns;
+}
diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java
index 2bd6f666c..682baef01 100644
--- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java
+++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionIT.java
@@ -121,7 +121,6 @@ protected SinkRecord createKafkaRecord(String jsonString, int offset, boolean wi
           + "ORDER BY offset_extracted asc;";
 
   protected List<Map<String, Object>> selectAllSchematizedRecords() {
-
     return select(tableName, selectAllSortByOffset, PrimitiveJsonRecord::fromSchematizedResult);
   }
diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java b/src/test/java/com/snowflake/kafka/connector/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java
new file mode 100644
index 000000000..010a73d21
--- /dev/null
+++ b/src/test/java/com/snowflake/kafka/connector/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java
@@ -0,0 +1,71 @@
+package com.snowflake.kafka.connector.streaming.schemaevolution.iceberg;
+
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+
+import com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg.ApacheIcebergColumnSchema;
+import com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg.IcebergColumnTree;
+import com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg.IcebergDataTypeParser;
+import java.util.stream.Stream;
+import org.apache.iceberg.types.Type;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+public class ParseIcebergColumnTreeTest {
+
+  @ParameterizedTest
@MethodSource("icebergSchemas") + void parseFromApacheIcebergSchema(String plainIcebergSchema, String expectedQuery) { + // given + Type type = IcebergDataTypeParser.deserializeIcebergType(plainIcebergSchema); + // when + ApacheIcebergColumnSchema apacheSchema = + new ApacheIcebergColumnSchema(type, "TEST_COLUMN_NAME"); + IcebergColumnTree tree = new IcebergColumnTree(apacheSchema); + // then + Assertions.assertEquals(expectedQuery, tree.buildQuery()); + } + + static Stream icebergSchemas() { + return Stream.of( + // primitives + arguments("\"boolean\"", "TEST_COLUMN_NAME BOOLEAN"), + arguments("\"int\"", "TEST_COLUMN_NAME NUMBER(10,0)"), + arguments("\"long\"", "TEST_COLUMN_NAME NUMBER(19,0)"), + arguments("\"float\"", "TEST_COLUMN_NAME FLOAT"), + arguments("\"double\"", "TEST_COLUMN_NAME FLOAT"), + arguments("\"date\"", "TEST_COLUMN_NAME DATE"), + arguments("\"time\"", "TEST_COLUMN_NAME TIME(6)"), + arguments("\"timestamptz\"", "TEST_COLUMN_NAME TIMESTAMP_LTZ"), + arguments("\"timestamp\"", "TEST_COLUMN_NAME TIMESTAMP"), + arguments("\"string\"", "TEST_COLUMN_NAME VARCHAR(16777216)"), + arguments("\"uuid\"", "TEST_COLUMN_NAME BINARY(16)"), + arguments("\"binary\"", "TEST_COLUMN_NAME BINARY"), + arguments("\"decimal(10,5)\"", "TEST_COLUMN_NAME DECIMAL(10, 5)"), + // simple struct + arguments( + "{\"type\":\"struct\",\"fields\":[{\"id\":23,\"name\":\"k1\",\"required\":false,\"type\":\"int\"},{\"id\":24,\"name\":\"k2\",\"required\":false,\"type\":\"int\"}]}", + "TEST_COLUMN_NAME OBJECT(k1 NUMBER(10,0), k2 NUMBER(10,0))"), + // list + arguments( + "{\"type\":\"list\",\"element-id\":23,\"element\":\"long\",\"element-required\":false}", + "TEST_COLUMN_NAME ARRAY(NUMBER(19,0))"), + // map + arguments( + "{\"type\":\"map\",\"key-id\":4,\"key\":\"int\",\"value-id\":5,\"value\":\"string\",\"value-required\":false}", + "TEST_COLUMN_NAME MAP(NUMBER(10,0), VARCHAR(16777216))"), + // structs with nested objects + arguments( + "{\"type\":\"struct\",\"fields\":[{\"id\":23,\"name\":\"k1\",\"required\":false,\"type\":\"int\"},{\"id\":24,\"name\":\"k2\",\"required\":false,\"type\":\"int\"},{\"id\":25,\"name\":\"nested_object\",\"required\":false,\"type\":{\"type\":\"struct\",\"fields\":[{\"id\":26,\"name\":\"nested_key1\",\"required\":false,\"type\":\"string\"},{\"id\":27,\"name\":\"nested_key2\",\"required\":false,\"type\":\"string\"}]}}]}", + "TEST_COLUMN_NAME OBJECT(k1 NUMBER(10,0), k2 NUMBER(10,0), nested_object" + + " OBJECT(nested_key1 VARCHAR(16777216), nested_key2 VARCHAR(16777216)))"), + arguments( + "{\"type\":\"struct\",\"fields\":[{\"id\":2,\"name\":\"offset\",\"required\":false,\"type\":\"int\"},{\"id\":3,\"name\":\"topic\",\"required\":false,\"type\":\"string\"},{\"id\":4,\"name\":\"partition\",\"required\":false,\"type\":\"int\"},{\"id\":5,\"name\":\"key\",\"required\":false,\"type\":\"string\"},{\"id\":6,\"name\":\"schema_id\",\"required\":false,\"type\":\"int\"},{\"id\":7,\"name\":\"key_schema_id\",\"required\":false,\"type\":\"int\"},{\"id\":8,\"name\":\"CreateTime\",\"required\":false,\"type\":\"long\"},{\"id\":9,\"name\":\"LogAppendTime\",\"required\":false,\"type\":\"long\"},{\"id\":10,\"name\":\"SnowflakeConnectorPushTime\",\"required\":false,\"type\":\"long\"},{\"id\":11,\"name\":\"headers\",\"required\":false,\"type\":{\"type\":\"map\",\"key-id\":12,\"key\":\"string\",\"value-id\":13,\"value\":\"string\",\"value-required\":false}}]}\n", + "TEST_COLUMN_NAME OBJECT(offset NUMBER(10,0), topic VARCHAR(16777216), partition" + + " NUMBER(10,0), key VARCHAR(16777216), schema_id 
+                + " NUMBER(10,0), CreateTime NUMBER(19,0), LogAppendTime NUMBER(19,0),"
+                + " SnowflakeConnectorPushTime NUMBER(19,0), headers MAP(VARCHAR(16777216),"
+                + " VARCHAR(16777216)))"));
+  }
+}
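
A minimal composition sketch (illustrative only, not part of the diff): it wires the three public classes added above together the same way `ParseIcebergColumnTreeTest` does. The demo class name and the `main` harness are hypothetical; the type JSON and the printed output come from the `list` test case.

```java
import com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg.ApacheIcebergColumnSchema;
import com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg.IcebergColumnTree;
import com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg.IcebergDataTypeParser;
import org.apache.iceberg.types.Type;

// Hypothetical demo class, not part of the PR.
public class IcebergColumnTreeDemo {
  public static void main(String[] args) {
    // 1. Deserialize the Iceberg type JSON (as retrieved from the channel) into an Apache Iceberg Type.
    Type type =
        IcebergDataTypeParser.deserializeIcebergType(
            "{\"type\":\"list\",\"element-id\":23,\"element\":\"long\",\"element-required\":false}");

    // 2. Pair the type with a column name; the constructor upper-cases the name.
    ApacheIcebergColumnSchema columnSchema = new ApacheIcebergColumnSchema(type, "my_column");

    // 3. Build the field-node tree and render the Snowflake-compatible column definition.
    IcebergColumnTree tree = new IcebergColumnTree(columnSchema);
    System.out.println(tree.buildQuery()); // prints: MY_COLUMN ARRAY(NUMBER(19,0))
  }
}
```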