Skip to content

Commit

Permalink
SNOW-1665420 add logic to parse Iceberg schema (#996)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-bzabek authored Nov 15, 2024
1 parent f7652f9 commit 14d43f7
Show file tree
Hide file tree
Showing 11 changed files with 458 additions and 2 deletions.
12 changes: 12 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
<confluent.version>7.7.0</confluent.version>
<!--Compatible protobuf version https://github.com/confluentinc/common/blob/v7.7.0/pom.xml#L91 -->
<protobuf.version>3.25.5</protobuf.version>
<iceberg.version>1.6.1</iceberg.version>
</properties>


Expand Down Expand Up @@ -607,6 +608,17 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-api</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
</dependency>

</dependencies>

<profiles>
Expand Down
13 changes: 13 additions & 0 deletions pom_confluent.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
<confluent.version>7.7.0</confluent.version>
<!--Compatible protobuf version https://github.com/confluentinc/common/blob/v7.7.0/pom.xml#L91 -->
<protobuf.version>3.25.5</protobuf.version>
<iceberg.version>1.6.1</iceberg.version>
</properties>


Expand Down Expand Up @@ -757,6 +758,18 @@
<version>${assertj-core.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-api</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
</dependency>

</dependencies>

<profiles>
Expand Down
7 changes: 6 additions & 1 deletion scripts/process_licenses.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,12 @@
"org.projectnessie.cel:cel-jackson": APACHE_LICENSE,
"org.projectnessie.cel:cel-tools": APACHE_LICENSE,
"org.xerial.snappy:snappy-java": APACHE_LICENSE,
"org.yaml:snakeyaml": APACHE_LICENSE
"org.yaml:snakeyaml": APACHE_LICENSE,
"org.apache.iceberg:iceberg-api": APACHE_LICENSE,
"org.apache.iceberg:iceberg-core": APACHE_LICENSE,
"org.apache.iceberg:iceberg-common": APACHE_LICENSE,
"io.airlift:aircompressor": APACHE_LICENSE,
"org.roaringbitmap:RoaringBitmap": APACHE_LICENSE
}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;

import java.util.Locale;

import org.apache.iceberg.types.Type;

/** Wrapper class for Iceberg schema retrieved from channel. */
/** Wrapper class for Iceberg schema retrieved from channel. */
public class ApacheIcebergColumnSchema {

  // Iceberg type describing the column's data.
  private final Type schema;

  // Column name, normalized to upper case at construction time.
  private final String columnName;

  /**
   * @param schema Iceberg type describing the column
   * @param columnName raw column name; stored upper-cased
   */
  public ApacheIcebergColumnSchema(Type schema, String columnName) {
    this.schema = schema;
    // Locale.ROOT makes the upper-casing locale-independent: the default-locale
    // toUpperCase() corrupts identifiers containing 'i' on JVMs running under
    // Turkish locales (dotless-i problem).
    this.columnName = columnName.toUpperCase(Locale.ROOT);
  }

  public Type getSchema() {
    return schema;
  }

  public String getColumnName() {
    return columnName;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;

/** Class with object types compatible with Snowflake Iceberg table */
/** Class with object types compatible with Snowflake Iceberg table */
public class IcebergColumnTree {

  // Root of the per-column field tree; child nodes are created inside IcebergFieldNode.
  private final IcebergFieldNode rootNode;

  /** Builds the field tree for one column from its parsed Iceberg schema. */
  public IcebergColumnTree(ApacheIcebergColumnSchema columnSchema) {
    rootNode = new IcebergFieldNode(columnSchema.getColumnName(), columnSchema.getSchema());
  }

  /** Renders the column definition starting from the root node. */
  public String buildQuery() {
    return rootNode.buildQuery(new StringBuilder(), "ROOT_NODE").toString();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,61 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.snowflake.kafka.connector.internal.streaming.schemaevolution.ColumnTypeMapper;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;

public class IcebergColumnTypeMapper extends ColumnTypeMapper {

/**
* See <a href="https://docs.snowflake.com/en/user-guide/tables-iceberg-data-types">Data types for
* Apache Iceberg™ tables</a>
*/
/**
 * Maps an Apache Iceberg type id onto the corresponding Snowflake column type name.
 *
 * <p>See <a href="https://docs.snowflake.com/en/user-guide/tables-iceberg-data-types">Data types
 * for Apache Iceberg™ tables</a>
 *
 * @param apacheIcebergType Iceberg type to translate
 * @return Snowflake SQL type name
 * @throws IllegalArgumentException for FIXED and for any unrecognized type id
 */
public static String mapToSnowflakeDataType(Type apacheIcebergType) {
  switch (apacheIcebergType.typeId()) {
    case BOOLEAN: return "BOOLEAN";
    case INTEGER: return "NUMBER(10,0)";
    case LONG: return "NUMBER(19,0)";
    case FLOAT:
    case DOUBLE: return "FLOAT";
    case DATE: return "DATE";
    case TIME: return "TIME(6)";
    case TIMESTAMP:
      // "timestamptz" (UTC-adjusted) becomes TIMESTAMP_LTZ, plain "timestamp" TIMESTAMP.
      return ((Types.TimestampType) apacheIcebergType).shouldAdjustToUTC()
          ? "TIMESTAMP_LTZ"
          : "TIMESTAMP";
    case STRING: return "VARCHAR(16777216)";
    case UUID: return "BINARY(16)";
    case FIXED: throw new IllegalArgumentException("FIXED column type not supported!");
    case BINARY: return "BINARY";
    case DECIMAL:
      // Reuse Iceberg's own rendering (presumably "decimal(p, s)") upper-cased — TODO confirm.
      return ((Types.DecimalType) apacheIcebergType).toString().toUpperCase();
    case STRUCT: return "OBJECT";
    case LIST: return "ARRAY";
    case MAP: return "MAP";
    default:
      throw new IllegalArgumentException(
          "Fail unsupported datatype! - " + apacheIcebergType.typeId());
  }
}

@Override
public String mapToColumnType(Schema.Type kafkaType, String schemaName) {
switch (kafkaType) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nonnull;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.JsonUtil;

/**
* This class is used to Iceberg data type (include primitive types and nested types) serialization
* and deserialization.
*/
/**
 * This class is used to Iceberg data type (include primitive types and nested types) serialization
 * and deserialization.
 */
public class IcebergDataTypeParser {
  // JSON keys used by the Iceberg JSON schema representation.
  public static final String ELEMENT = "element";
  public static final String KEY = "key";
  public static final String VALUE = "value";
  private static final String TYPE = "type";
  private static final String STRUCT = "struct";
  private static final String LIST = "list";
  private static final String MAP = "map";
  private static final String FIELDS = "fields";
  private static final String DOC = "doc";
  private static final String NAME = "name";
  private static final String ID = "id";
  private static final String ELEMENT_ID = "element-id";
  private static final String KEY_ID = "key-id";
  private static final String VALUE_ID = "value-id";
  private static final String REQUIRED = "required";
  private static final String ELEMENT_REQUIRED = "element-required";
  private static final String VALUE_REQUIRED = "value-required";

  // Placeholder for empty field names; see structFromJson for the escaping scheme.
  private static final String EMPTY_FIELD_CHAR = "\\";

  /** Object mapper for this class */
  private static final ObjectMapper MAPPER = new ObjectMapper();

  /**
   * Get Iceberg data type information by deserialization.
   *
   * @param icebergDataType string representation of Iceberg data type
   * @return Iceberg data type
   * @throws IllegalArgumentException if the string is not valid JSON or not a valid Iceberg type
   */
  public static Type deserializeIcebergType(String icebergDataType) {
    try {
      JsonNode json = MAPPER.readTree(icebergDataType);
      return getTypeFromJson(json);
    } catch (JsonProcessingException e) {
      // Keep the Jackson failure as the cause so the original parse error is not swallowed.
      throw new IllegalArgumentException(
          String.format("Failed to deserialize Iceberg data type: %s", icebergDataType), e);
    }
  }

  /**
   * Get corresponding Iceberg data type from JsonNode.
   *
   * <p>Textual nodes are treated as primitive type names; object nodes are dispatched on their
   * "type" key to struct/list/map parsing.
   *
   * @param jsonNode JsonNode parsed from Iceberg type string.
   * @return Iceberg data type
   * @throws IllegalArgumentException if the node is neither a primitive name nor a known
   *     struct/list/map object
   */
  public static Type getTypeFromJson(@Nonnull JsonNode jsonNode) {
    if (jsonNode.isTextual()) {
      return Types.fromPrimitiveString(jsonNode.asText());
    } else if (jsonNode.isObject()) {
      if (!jsonNode.has(TYPE)) {
        throw new IllegalArgumentException(
            String.format("Missing key '%s' in schema: %s", TYPE, jsonNode));
      }
      String type = jsonNode.get(TYPE).asText();
      if (STRUCT.equals(type)) {
        return structFromJson(jsonNode);
      } else if (LIST.equals(type)) {
        return listFromJson(jsonNode);
      } else if (MAP.equals(type)) {
        return mapFromJson(jsonNode);
      }
      throw new IllegalArgumentException(
          String.format("Cannot parse Iceberg type: %s, schema: %s", type, jsonNode));
    }

    throw new IllegalArgumentException("Cannot parse Iceberg type from schema: " + jsonNode);
  }

  /**
   * Get Iceberg struct type information from JsonNode.
   *
   * @param json JsonNode parsed from Iceberg type string.
   * @return struct type
   * @throws IllegalArgumentException if the "fields" key is missing or malformed
   */
  public static @Nonnull Types.StructType structFromJson(@Nonnull JsonNode json) {
    if (!json.has(FIELDS)) {
      throw new IllegalArgumentException(
          String.format("Missing key '%s' in schema: %s", FIELDS, json));
    }
    JsonNode fieldArray = json.get(FIELDS);
    Preconditions.checkArgument(fieldArray != null, "Field array cannot be null");
    Preconditions.checkArgument(
        fieldArray.isArray(), "Cannot parse struct fields from non-array: %s", fieldArray);

    List<Types.NestedField> fields = Lists.newArrayListWithExpectedSize(fieldArray.size());
    Iterator<JsonNode> iterator = fieldArray.elements();
    while (iterator.hasNext()) {
      JsonNode field = iterator.next();
      Preconditions.checkArgument(
          field.isObject(), "Cannot parse struct field from non-object: %s", field);

      int id = JsonUtil.getInt(ID, field);

      /* TypeToMessageType throws on empty field name, use a backslash to represent it and escape remaining backslash. */
      String name =
          JsonUtil.getString(NAME, field)
              .replace(EMPTY_FIELD_CHAR, EMPTY_FIELD_CHAR + EMPTY_FIELD_CHAR);
      if (name.isEmpty()) {
        name = EMPTY_FIELD_CHAR;
      }
      Type type = getTypeFromJson(field.get(TYPE));

      String doc = JsonUtil.getStringOrNull(DOC, field);
      boolean isRequired = JsonUtil.getBool(REQUIRED, field);
      if (isRequired) {
        fields.add(Types.NestedField.required(id, name, type, doc));
      } else {
        fields.add(Types.NestedField.optional(id, name, type, doc));
      }
    }

    return Types.StructType.of(fields);
  }

  /**
   * Get Iceberg list type information from JsonNode.
   *
   * @param json JsonNode parsed from Iceberg type string.
   * @return list type
   */
  public static Types.ListType listFromJson(JsonNode json) {
    int elementId = JsonUtil.getInt(ELEMENT_ID, json);
    Type elementType = getTypeFromJson(json.get(ELEMENT));
    boolean isRequired = JsonUtil.getBool(ELEMENT_REQUIRED, json);

    if (isRequired) {
      return Types.ListType.ofRequired(elementId, elementType);
    } else {
      return Types.ListType.ofOptional(elementId, elementType);
    }
  }

  /**
   * Get Iceberg map type from JsonNode.
   *
   * <p>"value-required" controls whether map values are optional; map keys are always required in
   * Iceberg.
   *
   * @param json JsonNode parsed from Iceberg type string.
   * @return map type
   */
  public static Types.MapType mapFromJson(JsonNode json) {
    int keyId = JsonUtil.getInt(KEY_ID, json);
    Type keyType = getTypeFromJson(json.get(KEY));

    int valueId = JsonUtil.getInt(VALUE_ID, json);
    Type valueType = getTypeFromJson(json.get(VALUE));

    boolean isRequired = JsonUtil.getBool(VALUE_REQUIRED, json);

    if (isRequired) {
      return Types.MapType.ofRequired(keyId, valueId, keyType, valueType);
    } else {
      return Types.MapType.ofOptional(keyId, valueId, keyType, valueType);
    }
  }
}
Loading

0 comments on commit 14d43f7

Please sign in to comment.