diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnJsonValuePair.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnJsonValuePair.java index a369fede6..fc138084e 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnJsonValuePair.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnJsonValuePair.java @@ -1,12 +1,10 @@ package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg; import com.fasterxml.jackson.databind.JsonNode; -import com.snowflake.kafka.connector.Utils; import java.util.Map; class IcebergColumnJsonValuePair { private final String columnName; - private final String quotedColumnName; private final JsonNode jsonNode; static IcebergColumnJsonValuePair from(Map.Entry field) { @@ -15,7 +13,6 @@ static IcebergColumnJsonValuePair from(Map.Entry field) { IcebergColumnJsonValuePair(String columnName, JsonNode jsonNode) { this.columnName = columnName; - this.quotedColumnName = Utils.quoteNameIfNeeded(columnName); this.jsonNode = jsonNode; } @@ -23,10 +20,6 @@ String getColumnName() { return columnName; } - String getQuotedColumnName() { - return quotedColumnName; - } - JsonNode getJsonNode() { return jsonNode; } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java index d6518c263..35a0847be 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java @@ -31,7 +31,7 @@ IcebergColumnTree merge(IcebergColumnTree modifiedTree) { return this; } - public String buildType() { + String buildType() { StringBuilder sb = new StringBuilder(); return rootNode.buildQuery(sb, "ROOT_NODE").toString(); } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java index 93daf72f9..28170ffff 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java @@ -10,7 +10,6 @@ import static org.apache.kafka.connect.data.Schema.Type.STRUCT; import com.fasterxml.jackson.databind.JsonNode; -import com.snowflake.kafka.connector.internal.streaming.schemaevolution.snowflake.SnowflakeColumnTypeMapper; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.kafka.connect.data.Date; @@ -21,19 +20,13 @@ class IcebergColumnTypeMapper { - private final SnowflakeColumnTypeMapper snowflakeColumnTypeMapper; - static IcebergColumnTypeMapper INSTANCE = - new IcebergColumnTypeMapper(new SnowflakeColumnTypeMapper()); - - public IcebergColumnTypeMapper(SnowflakeColumnTypeMapper snowflakeColumnTypeMapper) { - this.snowflakeColumnTypeMapper = snowflakeColumnTypeMapper; - } + static IcebergColumnTypeMapper INSTANCE = new IcebergColumnTypeMapper(); /** * See Data types for * Apache Iceberg™ tables */ - public String mapToSnowflakeDataType(Type apacheIcebergType) { + String mapToSnowflakeDataType(Type apacheIcebergType) { switch (apacheIcebergType.typeId()) { case BOOLEAN: return "BOOLEAN"; @@ -85,7 +78,7 @@ String mapToColumnDataTypeFromJson(JsonNode value) { return mapToColumnType(kafkaType, null); } - public String mapToColumnType(Schema.Type kafkaType, String schemaName) { + String mapToColumnType(Schema.Type kafkaType, String schemaName) { switch (kafkaType) { case INT8: case INT16: @@ -137,7 +130,7 @@ public String mapToColumnType(Schema.Type kafkaType, String schemaName) { * @param value JSON node * @return Kafka type */ - public Schema.Type mapJsonNodeTypeToKafkaType(JsonNode value) { + Schema.Type mapJsonNodeTypeToKafkaType(JsonNode value) { if (value == null || value.isNull()) { return STRING; } else if (value.isNumber()) { diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java index fc4a5e74c..8f392fa0a 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java @@ -42,11 +42,8 @@ public void evolveIcebergSchemaIfNeeded( SinkRecord record, Map schemaAlreadyInUse) { String tableName = targetItems.getTableName(); - // don't care about fields, just find which columns are changed. - Set columnsToEvolve = - targetItems.getColumnsToAdd().stream() - .map(targetItem -> targetItem.split("\\.")[0].replaceAll("\"", "")) - .collect(Collectors.toSet()); + + Set columnsToEvolve = getColumnsToEvolve(targetItems); // Add columns if needed, ignore any exceptions since other task might be succeeded if (!columnsToEvolve.isEmpty()) { @@ -110,6 +107,18 @@ public void evolveIcebergSchemaIfNeeded( } } + /** + * Get only column names, ignore nested field names. Remove double quotes. + * + *

example: TEST_STRUCT.field1 -> TEST_STRUCT + */ + private Set getColumnsToEvolve(SchemaEvolutionTargetItems targetItems) { + return targetItems.getColumnsToAdd().stream() + // remove double quotes + .map(targetItem -> targetItem.split("\\.")[0].replaceAll("\"", "")) + .collect(Collectors.toSet()); + } + private String generateAddColumnQuery(List columnsToAdd) { if (columnsToAdd.isEmpty()) { return ""; diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java index b4f32b798..b1883f64a 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java @@ -186,7 +186,7 @@ private void assertRecordsInTable() { private static Stream prepareData() { return Stream.of( - // Reading schema from a record is not yet supported. + // READING SCHEMA FROM A RECORD IS NOT YET SUPPORTED. // Arguments.of( // "Primitive JSON with schema", // primitiveJsonWithSchemaExample,