diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java
index 5186597bf..7d4f37591 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java
@@ -1,8 +1,5 @@
 package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;
 
-import com.google.common.base.Preconditions;
-import org.apache.kafka.connect.data.Field;
-
 /** Class with object types compatible with Snowflake Iceberg table */
 class IcebergColumnTree {
 
@@ -12,30 +9,12 @@ String getColumnName() {
     return rootNode.name;
   }
 
-  IcebergColumnTree(IcebergColumnSchema columnSchema) {
-    // rootNodes name serve as a name of the column, hence it is uppercase
-    String columnName = columnSchema.getColumnName().toUpperCase();
-    this.rootNode = new IcebergFieldNode(columnName, columnSchema.getSchema());
-  }
-
-  IcebergColumnTree(IcebergColumnJsonValuePair pair) {
-    // rootNodes name serve as a name of the column, hence it is uppercase
-    String columnName = pair.getColumnName().toUpperCase();
-    this.rootNode = new IcebergFieldNode(columnName, pair.getJsonNode());
-  }
-
-  IcebergColumnTree(Field field) {
-    String columnName = field.name().toUpperCase();
-    this.rootNode = new IcebergFieldNode(columnName, field.schema());
+  public IcebergFieldNode getRootNode() {
+    return rootNode;
   }
 
-  /** Add fields from other tree. Do not override nor modify any already existing nodes. */
-  IcebergColumnTree merge(IcebergColumnTree modifiedTree) {
-    Preconditions.checkArgument(
-        this.getColumnName().equals(modifiedTree.getColumnName()),
-        "Error merging column schemas. Tried to merge schemas for two different columns");
-    this.rootNode.merge(modifiedTree.rootNode);
-    return this;
+  IcebergColumnTree(IcebergFieldNode rootNode) {
+    this.rootNode = rootNode;
   }
 
   /** Returns data type of the column */
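With parsing and merging moved out, `IcebergColumnTree` becomes a thin holder around its root `IcebergFieldNode`; parsing moves to the factory below and merging to `IcebergColumnTreeMerger`. A minimal sketch of the resulting surface (illustrative only, not part of the patch; assumes same-package access, and the type string is made up):

```java
import java.util.LinkedHashMap;

class ColumnTreeHolderSketch {
  public static void main(String[] args) {
    // The tree no longer parses anything itself; it just wraps a pre-built root node.
    IcebergFieldNode root =
        new IcebergFieldNode("MY_COLUMN", "VARCHAR(16777216)", new LinkedHashMap<>());
    IcebergColumnTree tree = new IcebergColumnTree(root);

    System.out.println(tree.getColumnName()); // MY_COLUMN (the root node's name is the column name)
    System.out.println(tree.getRootNode().snowflakeIcebergType); // VARCHAR(16777216)
  }
}
```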
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeFactory.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeFactory.java
new file mode 100644
index 000000000..78b61639c
--- /dev/null
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeFactory.java
@@ -0,0 +1,157 @@
+package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+
+public class IcebergColumnTreeFactory {
+
+  private final IcebergColumnTypeMapper mapper;
+
+  public IcebergColumnTreeFactory() {
+    this.mapper = new IcebergColumnTypeMapper();
+  }
+
+  IcebergColumnTree fromIcebergSchema(IcebergColumnSchema columnSchema) {
+    IcebergFieldNode rootNode =
+        createNode(columnSchema.getColumnName().toUpperCase(), columnSchema.getSchema());
+    return new IcebergColumnTree(rootNode);
+  }
+
+  IcebergColumnTree fromJson(IcebergColumnJsonValuePair pair) {
+    IcebergFieldNode rootNode = createNode(pair.getColumnName().toUpperCase(), pair.getJsonNode());
+    return new IcebergColumnTree(rootNode);
+  }
+
+  IcebergColumnTree fromConnectSchema(Field kafkaConnectField) {
+    IcebergFieldNode rootNode =
+        createNode(kafkaConnectField.name().toUpperCase(), kafkaConnectField.schema());
+    return new IcebergColumnTree(rootNode);
+  }
+
+  // -- parse tree from Iceberg schema logic --
+  private IcebergFieldNode createNode(String name, Type apacheIcebergSchema) {
+    String snowflakeType = mapper.mapToColumnTypeFromIcebergSchema(apacheIcebergSchema);
+    return new IcebergFieldNode(name, snowflakeType, produceChildren(apacheIcebergSchema));
+  }
+
+  private LinkedHashMap<String, IcebergFieldNode> produceChildren(Type apacheIcebergSchema) {
+    // primitives must not have children
+    if (apacheIcebergSchema.isPrimitiveType()) {
+      return new LinkedHashMap<>();
+    }
+    Type.NestedType nestedField = apacheIcebergSchema.asNestedType();
+    return nestedField.fields().stream()
+        .collect(
+            Collectors.toMap(
+                Types.NestedField::name,
+                field -> createNode(field.name(), field.type()),
+                // It's impossible to have two same keys
+                (v1, v2) -> {
+                  throw new IllegalArgumentException("Two same keys: " + v1);
+                },
+                LinkedHashMap::new));
+  }
+
+  // -- parse tree from kafka record payload logic --
+  private IcebergFieldNode createNode(String name, JsonNode jsonNode) {
+    String snowflakeType = mapper.mapToColumnTypeFromJson(jsonNode);
+    return new IcebergFieldNode(name, snowflakeType, produceChildren(jsonNode));
+  }
+
+  private LinkedHashMap<String, IcebergFieldNode> produceChildren(JsonNode recordNode) {
+    if (recordNode.isNull()) {
+      return new LinkedHashMap<>();
+    }
+    if (recordNode.isArray()) {
+      ArrayNode arrayNode = (ArrayNode) recordNode;
+      return produceChildrenFromArray(arrayNode);
+    }
+    if (recordNode.isObject()) {
+      ObjectNode objectNode = (ObjectNode) recordNode;
+      return produceChildrenFromObject(objectNode);
+    }
+    return new LinkedHashMap<>();
+  }
+
+  private LinkedHashMap<String, IcebergFieldNode> produceChildrenFromArray(ArrayNode arrayNode) {
+    JsonNode arrayElement = arrayNode.get(0);
+    // VARCHAR is set for an empty array: [] -> ARRAY(VARCHAR)
+    if (arrayElement == null) {
+      LinkedHashMap<String, IcebergFieldNode> child = new LinkedHashMap<>();
+      child.put(
+          "element", new IcebergFieldNode("element", "VARCHAR(16777216)", new LinkedHashMap<>()));
+      return child;
+    }
+    LinkedHashMap<String, IcebergFieldNode> child = new LinkedHashMap<>();
+    child.put("element", createNode("element", arrayElement));
+    return child;
+  }
+
+  private LinkedHashMap<String, IcebergFieldNode> produceChildrenFromObject(
+      ObjectNode objectNode) {
+    return objectNode.properties().stream()
+        .collect(
+            Collectors.toMap(
+                Map.Entry::getKey,
+                stringJsonNodeEntry ->
+                    createNode(stringJsonNodeEntry.getKey(), stringJsonNodeEntry.getValue()),
+                (v1, v2) -> {
+                  throw new IllegalArgumentException("Two same keys: " + v1);
+                },
+                LinkedHashMap::new));
+  }
+
+  // -- parse tree from kafka record schema logic --
+  private IcebergFieldNode createNode(String name, Schema schema) {
+    String snowflakeType =
+        mapper.mapToColumnTypeFromKafkaSchema(schema.schema().type(), schema.schema().name());
+    return new IcebergFieldNode(name, snowflakeType, produceChildren(schema.schema()));
+  }
+
+  private LinkedHashMap<String, IcebergFieldNode> produceChildren(Schema connectSchema) {
+    if (connectSchema.type() == Schema.Type.STRUCT) {
+      return produceChildrenFromStruct(connectSchema);
+    }
+    if (connectSchema.type() == Schema.Type.MAP) {
+      return produceChildrenFromMap(connectSchema);
+    }
+    if (connectSchema.type() == Schema.Type.ARRAY) {
+      return produceChildrenForArray(connectSchema);
+    } else { // isPrimitive == true
+      return new LinkedHashMap<>();
+    }
+  }
+
+  private LinkedHashMap<String, IcebergFieldNode> produceChildrenForArray(
+      Schema connectSchemaForArray) {
+    LinkedHashMap<String, IcebergFieldNode> child = new LinkedHashMap<>();
+    child.put("element", createNode("element", connectSchemaForArray.valueSchema()));
+    return child;
+  }
+
+  private LinkedHashMap<String, IcebergFieldNode> produceChildrenFromStruct(Schema connectSchema) {
+    return connectSchema.fields().stream()
+        .collect(
+            Collectors.toMap(
+                Field::name,
+                f -> createNode(f.name(), f.schema()),
+                (v1, v2) -> {
+                  throw new IllegalArgumentException("Two same keys: " + v1);
+                },
+                LinkedHashMap::new));
+  }
+
+  private LinkedHashMap<String, IcebergFieldNode> produceChildrenFromMap(Schema connectSchema) {
+    LinkedHashMap<String, IcebergFieldNode> keyValue = new LinkedHashMap<>();
+    // these names will not be used when creating a query
+    keyValue.put("key", createNode("key", connectSchema.keySchema()));
+    keyValue.put("value", createNode("value", connectSchema.valueSchema()));
+    return keyValue;
+  }
+}
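A usage sketch for the factory's three entry points, exercising the JSON path (illustrative, not part of the patch; assumes same-package access, and the exact type strings come from `IcebergColumnTypeMapper`):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

class FactoryUsageSketch {
  public static void main(String[] args) throws Exception {
    IcebergColumnTreeFactory factory = new IcebergColumnTreeFactory();

    // One record field becomes one column tree; nested objects become child nodes.
    JsonNode recordNode =
        new ObjectMapper().readTree("{\"testColumn\": {\"k1\": 1, \"k2\": \"text\"}}");
    IcebergColumnTree tree =
        factory.fromJson(IcebergColumnJsonValuePair.from(recordNode.fields().next()));

    // Root node names are uppercased, so the column name is TESTCOLUMN; buildType()
    // renders the derived Snowflake type, an OBJECT(...) for the nested payload above.
    System.out.println(tree.getColumnName() + " -> " + tree.buildType());
  }
}
```

`fromIcebergSchema` and `fromConnectSchema` follow the same pattern for channel-provided Iceberg types and Kafka Connect schemas.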
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeMerger.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeMerger.java
new file mode 100644
index 000000000..be2396f2d
--- /dev/null
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeMerger.java
@@ -0,0 +1,39 @@
+package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;
+
+import com.google.common.base.Preconditions;
+import com.snowflake.kafka.connector.internal.KCLogger;
+
+public class IcebergColumnTreeMerger {
+
+  private final KCLogger LOGGER = new KCLogger(IcebergColumnTreeMerger.class.getName());
+
+  /**
+   * Method designed for unstructured data types. Enhances already existing unstructured columns
+   * with new subfields.
+   */
+  void merge(IcebergColumnTree currentTree, IcebergColumnTree treeWithNewType) {
+    validate(currentTree, treeWithNewType);
+    LOGGER.debug("Attempting to apply changes for column: " + currentTree.getColumnName());
+
+    merge(currentTree.getRootNode(), treeWithNewType.getRootNode());
+  }
+
+  /** Method adds new children to a node. It does not change anything else. */
+  private void merge(IcebergFieldNode currentNode, IcebergFieldNode nodeToMerge) {
+    nodeToMerge.children.forEach(
+        (key, node) -> {
+          IcebergFieldNode currentNodesChild = currentNode.children.get(key);
+          if (currentNodesChild == null) {
+            currentNode.children.put(key, node);
+          } else {
+            merge(currentNodesChild, node);
+          }
+        });
+  }
+
+  private void validate(IcebergColumnTree currentTree, IcebergColumnTree treeWithNewType) {
+    Preconditions.checkArgument(
+        currentTree.getColumnName().equals(treeWithNewType.getColumnName()),
+        "Error merging column schemas. Tried to merge schemas for two different columns");
+  }
+}
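A sketch of the merge semantics under the same assumptions as above: subfields first seen in a newer record are added, while nodes that already exist, including their types, stay untouched:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

class MergerUsageSketch {
  public static void main(String[] args) throws Exception {
    IcebergColumnTreeFactory factory = new IcebergColumnTreeFactory();
    IcebergColumnTreeMerger merger = new IcebergColumnTreeMerger();
    ObjectMapper om = new ObjectMapper();

    // The same column seen in two records; the second record carries a new subfield k2.
    JsonNode first = om.readTree("{\"testStruct\": {\"k1\": 1}}");
    JsonNode second = om.readTree("{\"testStruct\": {\"k1\": 2, \"k2\": true}}");

    IcebergColumnTree existing =
        factory.fromJson(IcebergColumnJsonValuePair.from(first.fields().next()));
    IcebergColumnTree incoming =
        factory.fromJson(IcebergColumnJsonValuePair.from(second.fields().next()));

    // k2 is added under TESTSTRUCT; k1 keeps the type it was first registered with.
    merger.merge(existing, incoming);
    System.out.println(existing.buildType()); // e.g. OBJECT(k1 LONG, k2 BOOLEAN)
  }
}
```

Merging trees whose root names differ fails the `Preconditions` check in `validate`.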
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergFieldNode.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergFieldNode.java
index bd63a7e21..e2d41b631 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergFieldNode.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergFieldNode.java
@@ -1,15 +1,6 @@
 package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.iceberg.types.Type;
-import org.apache.iceberg.types.Types;
-import org.apache.kafka.connect.data.Field;
-import org.apache.kafka.connect.data.Schema;
 
 class IcebergFieldNode {
 
@@ -22,30 +13,11 @@ class IcebergFieldNode {
 
   final LinkedHashMap<String, IcebergFieldNode> children;
 
-  IcebergFieldNode(String name, Type apacheIcebergSchema) {
-    this.name = name;
-    this.snowflakeIcebergType = mapper.mapToColumnTypeFromIcebergSchema(apacheIcebergSchema);
-    this.children = produceChildren(apacheIcebergSchema);
-  }
-
-  IcebergFieldNode(String name, JsonNode jsonNode) {
-    this.name = name;
-    this.snowflakeIcebergType = mapper.mapToColumnTypeFromJson(jsonNode);
-    this.children = produceChildren(jsonNode);
-  }
-
-  IcebergFieldNode(String name, Schema kafkaConnectField) {
-    this.name = name;
-    this.snowflakeIcebergType =
-        mapper.mapToColumnTypeFromKafkaSchema(
-            kafkaConnectField.schema().type(), kafkaConnectField.schema().name());
-    this.children = produceChildren(kafkaConnectField);
-  }
-
-  private IcebergFieldNode(String name, String snowflakeIcebergType) {
+  public IcebergFieldNode(
+      String name, String snowflakeIcebergType, LinkedHashMap<String, IcebergFieldNode> children) {
     this.name = name;
     this.snowflakeIcebergType = snowflakeIcebergType;
-    this.children = new LinkedHashMap<>();
+    this.children = children;
   }
 
   /**
@@ -68,128 +40,6 @@ StringBuilder buildQuery(StringBuilder sb, String parentType) {
     return sb;
   }
 
-  /**
-   * Method does not modify, delete any existing nodes and its types, names. It is meant only to add
-   * new children.
-   */
-  void merge(IcebergFieldNode nodeToMerge) {
-    nodeToMerge.children.forEach(
-        (key, node) -> {
-          IcebergFieldNode thisChild = this.children.get(key);
-          if (thisChild == null) {
-            this.children.put(key, node);
-          } else {
-            thisChild.merge(node);
-          }
-        });
-  }
-
-  private LinkedHashMap<String, IcebergFieldNode> produceChildren(JsonNode recordNode) {
-    if (recordNode.isNull()) {
-      return new LinkedHashMap<>();
-    }
-    if (recordNode.isArray()) {
-      ArrayNode arrayNode = (ArrayNode) recordNode;
-      return produceChildrenFromArray(arrayNode);
-    }
-    if (recordNode.isObject()) {
-      ObjectNode objectNode = (ObjectNode) recordNode;
-      return produceChildrenFromObject(objectNode);
-    }
-    return new LinkedHashMap<>();
-  }
-
-  private LinkedHashMap<String, IcebergFieldNode> produceChildrenFromArray(ArrayNode arrayNode) {
-    JsonNode arrayElement = arrayNode.get(0);
-    // VARCHAR is set for an empty array: [] -> ARRAY(VARCHAR)
-    if (arrayElement == null) {
-      LinkedHashMap<String, IcebergFieldNode> child = new LinkedHashMap<>();
-      child.put("element", new IcebergFieldNode("element", "VARCHAR(16777216)"));
-      return child;
-    }
-    LinkedHashMap<String, IcebergFieldNode> child = new LinkedHashMap<>();
-    child.put("element", new IcebergFieldNode("element", arrayElement));
-    return child;
-  }
-
-  private LinkedHashMap<String, IcebergFieldNode> produceChildrenFromObject(
-      ObjectNode objectNode) {
-    return objectNode.properties().stream()
-        .collect(
-            Collectors.toMap(
-                Map.Entry::getKey,
-                stringJsonNodeEntry ->
-                    new IcebergFieldNode(
-                        stringJsonNodeEntry.getKey(), stringJsonNodeEntry.getValue()),
-                (v1, v2) -> {
-                  throw new IllegalArgumentException("Two same keys: " + v1);
-                },
-                LinkedHashMap::new));
-  }
-
-  private LinkedHashMap<String, IcebergFieldNode> produceChildren(Schema connectSchema) {
-    Schema.Type type = connectSchema.type();
-    if (connectSchema.type() == Schema.Type.STRUCT) {
-      return produceChildrenFromStruct(connectSchema);
-    }
-    if (connectSchema.type() == Schema.Type.MAP) {
-      return produceChildrenFromMap(connectSchema);
-    }
-    if (connectSchema.type() == Schema.Type.ARRAY) {
-      return produceChildrenForArray(connectSchema);
-    } else { // isPrimitive == true
-      return new LinkedHashMap<>();
-    }
-  }
-
-  private LinkedHashMap<String, IcebergFieldNode> produceChildrenForArray(
-      Schema connectSchemaForArray) {
-    LinkedHashMap<String, IcebergFieldNode> child = new LinkedHashMap<>();
-    child.put("element", new IcebergFieldNode("element", connectSchemaForArray.valueSchema()));
-    return child;
-  }
-
-  private LinkedHashMap<String, IcebergFieldNode> produceChildrenFromStruct(Schema connectSchema) {
-    return connectSchema.fields().stream()
-        .collect(
-            Collectors.toMap(
-                Field::name,
-                f -> new IcebergFieldNode(f.name(), f.schema()),
-                (v1, v2) -> {
-                  throw new IllegalArgumentException("Two same keys: " + v1);
-                },
-                LinkedHashMap::new));
-  }
-
-  private LinkedHashMap<String, IcebergFieldNode> produceChildrenFromMap(Schema connectSchema) {
-    LinkedHashMap<String, IcebergFieldNode> keyValue = new LinkedHashMap<>();
-    // these names will not be used when creating a query
-    keyValue.put("key", new IcebergFieldNode("key", connectSchema.keySchema()));
-    keyValue.put("value", new IcebergFieldNode("value", connectSchema.valueSchema()));
-    return keyValue;
-  }
-
-  private LinkedHashMap<String, IcebergFieldNode> produceChildren(Type apacheIcebergSchema) {
-    // primitives must not have children
-    if (apacheIcebergSchema.isPrimitiveType()) {
-      return new LinkedHashMap<>();
-    }
-    Type.NestedType nestedField = apacheIcebergSchema.asNestedType();
-    return nestedField.fields().stream()
-        .collect(
-            Collectors.toMap(
-                Types.NestedField::name,
-                this::fromNestedField,
-                // It's impossible to have two same keys
-                (v1, v2) -> {
-                  throw new IllegalArgumentException("Two same keys: " + v1);
-                },
-                LinkedHashMap::new));
-  }
-
-  private IcebergFieldNode fromNestedField(Types.NestedField field) {
-    return new IcebergFieldNode(field.name(), field.type());
-  }
-
   private void appendNameAndType(StringBuilder sb) {
     sb.append(name);
     sb.append(" ");
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java
index 63e38ff85..bace2efde 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java
@@ -21,10 +21,12 @@ public class IcebergSchemaEvolutionService implements SchemaEvolutionService {
 
   private final SnowflakeConnectionService conn;
   private final IcebergTableSchemaResolver icebergTableSchemaResolver;
+  private final IcebergColumnTreeMerger mergeTreeService;
 
   public IcebergSchemaEvolutionService(SnowflakeConnectionService conn) {
     this.conn = conn;
     this.icebergTableSchemaResolver = new IcebergTableSchemaResolver();
+    this.mergeTreeService = new IcebergColumnTreeMerger();
   }
 
   /**
@@ -115,6 +117,9 @@ private List<IcebergColumnTree> distinguishColumnsToModify(
   }
 
   private void alterAddColumns(String tableName, List<IcebergColumnTree> addedColumns) {
+    if (addedColumns.isEmpty()) {
+      return;
+    }
     Map<String, ColumnInfos> columnInfosMap = toColumnInfos(addedColumns);
     try {
       conn.appendColumnsToIcebergTable(tableName, columnInfosMap);
@@ -148,7 +153,7 @@ private void mergeChangesIntoExistingColumns(
                   .filter(c -> c.getColumnName().equals(existingColumn.getColumnName()))
                   .collect(Collectors.toList())
                   .get(0);
-          existingColumn.merge(mewVersion);
+          mergeTreeService.merge(existingColumn, mewVersion);
         });
   }
 
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchemaResolver.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchemaResolver.java
index e71cfe8b3..18b881b09 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchemaResolver.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergTableSchemaResolver.java
@@ -17,6 +17,7 @@
 
 class IcebergTableSchemaResolver {
 
+  private final IcebergColumnTreeFactory treeFactory = new IcebergColumnTreeFactory();
   private final IcebergColumnTypeMapper mapper = IcebergColumnTypeMapper.INSTANCE;
 
   private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableSchemaResolver.class);
@@ -39,7 +40,7 @@ public List<IcebergColumnTree> resolveIcebergSchemaFromChannel(
             .collect(Collectors.toList());
 
     return apacheIcebergColumnSchemas.stream()
-        .map(IcebergColumnTree::new)
+        .map(treeFactory::fromIcebergSchema)
         .collect(Collectors.toList());
   }
 
@@ -92,7 +93,7 @@ private List<IcebergColumnTree> getTableSchemaFromJson(
     return Streams.stream(recordNode.fields())
         .map(IcebergColumnJsonValuePair::from)
         .filter(pair -> columnsToEvolve.contains(pair.getColumnName().toUpperCase()))
-        .map(IcebergColumnTree::new)
+        .map(treeFactory::fromJson)
         .collect(Collectors.toList());
   }
 
@@ -131,7 +132,7 @@ private List<IcebergColumnTree> getTableSchemaFromRecordSchema(
               + ", schemaColumns: "
               + schema.fields().stream().map(Field::name).collect(Collectors.toList()));
     }
-      return foundColumns.stream().map(IcebergColumnTree::new).collect(Collectors.toList());
+      return foundColumns.stream().map(treeFactory::fromConnectSchema).collect(Collectors.toList());
     }
     return ImmutableList.of();
   }
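One note on the `mergeChangesIntoExistingColumns` hunk above: the retained `collect(Collectors.toList()).get(0)` lookup throws a bare `IndexOutOfBoundsException` when no new version matches the existing column. A sketch of an equivalent `findFirst()` formulation with an explicit failure (not part of the patch):

```java
import java.util.List;

class MergeLookupSketch {
  // Equivalent to the service's filter/collect/get(0) lookup, but fails with a
  // descriptive message when no tree in newVersions matches the existing column.
  static IcebergColumnTree findNewVersion(
      IcebergColumnTree existingColumn, List<IcebergColumnTree> newVersions) {
    return newVersions.stream()
        .filter(c -> c.getColumnName().equals(existingColumn.getColumnName()))
        .findFirst()
        .orElseThrow(
            () ->
                new IllegalStateException(
                    "No schema changes found for column: " + existingColumn.getColumnName()));
  }
}
```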
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java
index b96b35c52..2f0b3074e 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/ParseIcebergColumnTreeTest.java
@@ -22,6 +22,9 @@
 
 public class ParseIcebergColumnTreeTest {
 
+  private final IcebergColumnTreeFactory treeFactory = new IcebergColumnTreeFactory();
+  private final IcebergColumnTreeMerger mergeTreeService = new IcebergColumnTreeMerger();
+
   @ParameterizedTest
   @MethodSource("icebergSchemas")
   void parseFromApacheIcebergSchema(String plainIcebergSchema, String expectedType) {
@@ -29,7 +32,7 @@ void parseFromApacheIcebergSchema(String plainIcebergSchema, String expectedType
     Type type = IcebergDataTypeParser.deserializeIcebergType(plainIcebergSchema);
     // when
     IcebergColumnSchema apacheSchema = new IcebergColumnSchema(type, "TEST_COLUMN_NAME");
-    IcebergColumnTree tree = new IcebergColumnTree(apacheSchema);
+    IcebergColumnTree tree = treeFactory.fromIcebergSchema(apacheSchema);
     // then
     Assertions.assertEquals(expectedType, tree.buildType());
     Assertions.assertEquals("TEST_COLUMN_NAME", tree.getColumnName());
@@ -94,7 +97,7 @@ void parseFromJsonRecordSchema(String jsonString, String expectedType) {
     IcebergColumnJsonValuePair columnValuePair =
         IcebergColumnJsonValuePair.from(recordNode.fields().next());
     // when
-    IcebergColumnTree tree = new IcebergColumnTree(columnValuePair);
+    IcebergColumnTree tree = treeFactory.fromJson(columnValuePair);
     // then
     Assertions.assertEquals(expectedType, tree.buildType());
     Assertions.assertEquals("TESTCOLUMNNAME", tree.getColumnName());
@@ -155,7 +158,7 @@ void mergeTwoTreesTest(String plainIcebergSchema, String recordJson, String expe
     // given tree parsed from channel
     Type type = IcebergDataTypeParser.deserializeIcebergType(plainIcebergSchema);
     IcebergColumnSchema apacheSchema = new IcebergColumnSchema(type, "TESTSTRUCT");
-    IcebergColumnTree alreadyExistingTree = new IcebergColumnTree(apacheSchema);
+    IcebergColumnTree alreadyExistingTree = treeFactory.fromIcebergSchema(apacheSchema);
 
     // tree parsed from a record
     SinkRecord record = createKafkaRecord(recordJson, false);
@@ -163,9 +166,9 @@ void mergeTwoTreesTest(String plainIcebergSchema, String recordJson, String expe
     IcebergColumnJsonValuePair columnValuePair =
         IcebergColumnJsonValuePair.from(recordNode.fields().next());
 
-    IcebergColumnTree modifiedTree = new IcebergColumnTree(columnValuePair);
+    IcebergColumnTree modifiedTree = treeFactory.fromJson(columnValuePair);
     // then
-    alreadyExistingTree.merge(modifiedTree);
+    mergeTreeService.merge(alreadyExistingTree, modifiedTree);
 
     String expected = expectedResult.replaceAll("/ +/g", " ");
     Assertions.assertEquals(expected, alreadyExistingTree.buildType());