detach buildType logic into a service
sfc-gh-bzabek committed Nov 29, 2024
1 parent 665dbb8 commit 5d5df77
Showing 6 changed files with 73 additions and 54 deletions.
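In short: rendering a column's Snowflake data type moves out of the tree classes themselves (IcebergColumnTree.buildType() and IcebergFieldNode.buildQuery()) into a new stateless service, IcebergColumnTreeTypeBuilder. Callers now hand the tree to the builder instead of asking the tree to render itself, as the IcebergSchemaEvolutionService and test changes below show:

  // before this commit: the tree rendered its own type
  // String columnType = tree.buildType();

  // after this commit: a dedicated service renders it
  IcebergColumnTreeTypeBuilder typeBuilder = new IcebergColumnTreeTypeBuilder();
  String columnType = typeBuilder.buildType(tree);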
IcebergColumnTree.java
@@ -9,17 +9,11 @@ String getColumnName() {
     return rootNode.name;
   }
 
-  public IcebergFieldNode getRootNode() {
+  IcebergFieldNode getRootNode() {
     return rootNode;
   }
 
   IcebergColumnTree(IcebergFieldNode rootNode) {
     this.rootNode = rootNode;
   }
-
-  /** Returns data type of the column */
-  String buildType() {
-    StringBuilder sb = new StringBuilder();
-    return rootNode.buildQuery(sb, "ROOT_NODE").toString();
-  }
 }
IcebergColumnTreeFactory.java
@@ -3,6 +3,7 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.snowflake.kafka.connector.internal.KCLogger;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -12,24 +13,30 @@
 import org.apache.kafka.connect.data.Schema;
 
 public class IcebergColumnTreeFactory {
 
+  private final KCLogger LOGGER = new KCLogger(IcebergColumnTreeFactory.class.getName());
+
   private final IcebergColumnTypeMapper mapper;
 
   public IcebergColumnTreeFactory() {
     this.mapper = new IcebergColumnTypeMapper();
   }
 
   IcebergColumnTree fromIcebergSchema(IcebergColumnSchema columnSchema) {
+    LOGGER.debug("Attempting to resolve schema from schema stored in a channel");
     IcebergFieldNode rootNode =
         createNode(columnSchema.getColumnName().toUpperCase(), columnSchema.getSchema());
     return new IcebergColumnTree(rootNode);
   }
 
   IcebergColumnTree fromJson(IcebergColumnJsonValuePair pair) {
+    LOGGER.debug("Attempting to resolve schema from records payload");
     IcebergFieldNode rootNode = createNode(pair.getColumnName().toUpperCase(), pair.getJsonNode());
     return new IcebergColumnTree(rootNode);
   }
 
   IcebergColumnTree fromConnectSchema(Field kafkaConnectField) {
+    LOGGER.debug("Attempting to resolve schema from schema attached to a record");
     IcebergFieldNode rootNode =
         createNode(kafkaConnectField.name().toUpperCase(), kafkaConnectField.schema());
     return new IcebergColumnTree(rootNode);
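An illustrative sketch (not part of this commit) of how the factory above and the IcebergColumnTreeTypeBuilder introduced in the next file fit together, assuming a caller in the same package (the factory and builder methods are package-private); variable names are made up, and the exact field casing and primitive type names in the result come from createNode and IcebergColumnTypeMapper:

  // Kafka Connect schema: a struct column with a primitive field and an array field
  Schema personSchema =
      SchemaBuilder.struct()
          .field("id", Schema.INT64_SCHEMA)
          .field("tags", SchemaBuilder.array(Schema.STRING_SCHEMA).build())
          .build();

  IcebergColumnTreeFactory treeFactory = new IcebergColumnTreeFactory();
  IcebergColumnTree tree = treeFactory.fromConnectSchema(new Field("person", 0, personSchema));

  // the new service renders the Snowflake Iceberg column type,
  // e.g. something like OBJECT(id LONG, tags ARRAY(VARCHAR))
  String columnType = new IcebergColumnTreeTypeBuilder().buildType(tree);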
IcebergColumnTreeTypeBuilder.java (new file)
@@ -0,0 +1,58 @@
+package com.snowflake.kafka.connector.internal.streaming.schemaevolution.iceberg;
+
+public class IcebergColumnTreeTypeBuilder {
+
+  /** Returns data type of the column */
+  String buildType(IcebergColumnTree columnTree) {
+    StringBuilder sb = new StringBuilder();
+    IcebergFieldNode rootNode = columnTree.getRootNode();
+    return buildType(sb, rootNode, ROOT_NODE_TYPE).toString();
+  }
+
+  /**
+   * Generate Snow SQL type for the column.
+   *
+   * @param sb StringBuilder
+   * @param parentType Snowflake Iceberg table compatible type. ROOT_NODE_TYPE is a special case,
+   *     here we never generate column name for it.
+   * @return field name + data type
+   */
+  private StringBuilder buildType(StringBuilder sb, IcebergFieldNode fieldNode, String parentType) {
+    if (parentType.equals("ARRAY")
+        || parentType.equals("MAP")
+        || parentType.equals(ROOT_NODE_TYPE)) {
+      sb.append(fieldNode.snowflakeIcebergType);
+    } else {
+      appendNameAndType(sb, fieldNode);
+    }
+    if (!fieldNode.children.isEmpty()) {
+      sb.append("(");
+      appendChildren(sb, fieldNode);
+      sb.append(")");
+    }
+    return sb;
+  }
+
+  private void appendNameAndType(StringBuilder sb, IcebergFieldNode fieldNode) {
+    sb.append(fieldNode.name);
+    sb.append(" ");
+    sb.append(fieldNode.snowflakeIcebergType);
+  }
+
+  private void appendChildren(StringBuilder sb, IcebergFieldNode parentNode) {
+    String parentType = parentNode.snowflakeIcebergType;
+    parentNode.children.forEach(
+        (name, childNode) -> {
+          buildType(sb, childNode, parentType);
+          sb.append(", ");
+        });
+    removeLastSeparator(sb);
+  }
+
+  private void removeLastSeparator(StringBuilder sb) {
+    sb.deleteCharAt(sb.length() - 1);
+    sb.deleteCharAt(sb.length() - 1);
+  }
+
+  private static final String ROOT_NODE_TYPE = "ROOT_NODE";
+}
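To make the recursion above concrete, here is an illustrative walk-through (not part of this commit) with a hand-built tree, from within the same package; it assumes a (name, snowflakeIcebergType, children) constructor for IcebergFieldNode, consistent with the assignments visible in the next file, and the type strings are chosen by hand:

  // root column of type OBJECT with a primitive child and a nested ARRAY child
  LinkedHashMap<String, IcebergFieldNode> arrayChildren = new LinkedHashMap<>();
  arrayChildren.put("element", new IcebergFieldNode("element", "VARCHAR", new LinkedHashMap<>()));

  LinkedHashMap<String, IcebergFieldNode> rootChildren = new LinkedHashMap<>();
  rootChildren.put("id", new IcebergFieldNode("id", "LONG", new LinkedHashMap<>()));
  rootChildren.put("tags", new IcebergFieldNode("tags", "ARRAY", arrayChildren));

  IcebergColumnTree tree = new IcebergColumnTree(new IcebergFieldNode("PERSON", "OBJECT", rootChildren));

  // ROOT_NODE case: the root contributes only "OBJECT", never "PERSON OBJECT";
  // children of the OBJECT are rendered as "name TYPE", while the ARRAY's element
  // child is rendered as a bare type, so the result is:
  // OBJECT(id LONG, tags ARRAY(VARCHAR))
  String type = new IcebergColumnTreeTypeBuilder().buildType(tree);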
IcebergFieldNode.java
@@ -4,9 +4,6 @@
 
 class IcebergFieldNode {
 
-  // todo consider refactoring into some more classes
-  private final IcebergColumnTypeMapper mapper = IcebergColumnTypeMapper.INSTANCE;
-
   final String name;
 
   final String snowflakeIcebergType;
@@ -19,44 +16,4 @@ public IcebergFieldNode(
     this.snowflakeIcebergType = snowflakeIcebergType;
     this.children = children;
   }
-
-  /**
-   * @param sb StringBuilder
-   * @param parentType Snowflake Iceberg table compatible type. If a root node is a parent then
-   *     "ROOT_NODE" is passed, because we always generate root nodes column name.
-   * @return field name + data type
-   */
-  StringBuilder buildQuery(StringBuilder sb, String parentType) {
-    if (parentType.equals("ARRAY") || parentType.equals("MAP") || parentType.equals("ROOT_NODE")) {
-      sb.append(snowflakeIcebergType);
-    } else {
-      appendNameAndType(sb);
-    }
-    if (!children.isEmpty()) {
-      sb.append("(");
-      appendChildren(sb, this.snowflakeIcebergType);
-      sb.append(")");
-    }
-    return sb;
-  }
-
-  private void appendNameAndType(StringBuilder sb) {
-    sb.append(name);
-    sb.append(" ");
-    sb.append(snowflakeIcebergType);
-  }
-
-  private void appendChildren(StringBuilder sb, String parentType) {
-    children.forEach(
-        (name, node) -> {
-          node.buildQuery(sb, parentType);
-          sb.append(", ");
-        });
-    removeLastSeparator(sb);
-  }
-
-  private void removeLastSeparator(StringBuilder sb) {
-    sb.deleteCharAt(sb.length() - 1);
-    sb.deleteCharAt(sb.length() - 1);
-  }
 }
IcebergSchemaEvolutionService.java
@@ -22,11 +22,13 @@ public class IcebergSchemaEvolutionService implements SchemaEvolutionService {
   private final SnowflakeConnectionService conn;
   private final IcebergTableSchemaResolver icebergTableSchemaResolver;
   private final IcebergColumnTreeMerger mergeTreeService;
+  private final IcebergColumnTreeTypeBuilder typeBuilder;
 
   public IcebergSchemaEvolutionService(SnowflakeConnectionService conn) {
     this.conn = conn;
     this.icebergTableSchemaResolver = new IcebergTableSchemaResolver();
     this.mergeTreeService = new IcebergColumnTreeMerger();
+    this.typeBuilder = new IcebergColumnTreeTypeBuilder();
   }
 
   /**
@@ -162,7 +164,7 @@ private Map<String, ColumnInfos> toColumnInfos(List<IcebergColumnTree> columnTre
         .map(
             columnTree ->
                 Maps.immutableEntry(
-                    columnTree.getColumnName(), new ColumnInfos(columnTree.buildType())))
+                    columnTree.getColumnName(), new ColumnInfos(typeBuilder.buildType(columnTree))))
         .collect(
             Collectors.toMap(
                 Map.Entry::getKey, Map.Entry::getValue, (oldValue, newValue) -> newValue));
ParseIcebergColumnTreeTest.java
@@ -24,6 +24,7 @@ public class ParseIcebergColumnTreeTest {
 
   private final IcebergColumnTreeFactory treeFactory = new IcebergColumnTreeFactory();
   private final IcebergColumnTreeMerger mergeTreeService = new IcebergColumnTreeMerger();
+  private final IcebergColumnTreeTypeBuilder typeBuilder = new IcebergColumnTreeTypeBuilder();
 
   @ParameterizedTest
   @MethodSource("icebergSchemas")
@@ -34,7 +35,7 @@ void parseFromApacheIcebergSchema(String plainIcebergSchema, String expectedType
     IcebergColumnSchema apacheSchema = new IcebergColumnSchema(type, "TEST_COLUMN_NAME");
     IcebergColumnTree tree = treeFactory.fromIcebergSchema(apacheSchema);
     // then
-    Assertions.assertEquals(expectedType, tree.buildType());
+    Assertions.assertEquals(expectedType, typeBuilder.buildType(tree));
     Assertions.assertEquals("TEST_COLUMN_NAME", tree.getColumnName());
   }
 
@@ -99,7 +100,7 @@ void parseFromJsonRecordSchema(String jsonString, String expectedType) {
     // when
     IcebergColumnTree tree = treeFactory.fromJson(columnValuePair);
     // then
-    Assertions.assertEquals(expectedType, tree.buildType());
+    Assertions.assertEquals(expectedType, typeBuilder.buildType(tree));
     Assertions.assertEquals("TESTCOLUMNNAME", tree.getColumnName());
   }
 
@@ -171,7 +172,7 @@ void mergeTwoTreesTest(String plainIcebergSchema, String recordJson, String expe
     mergeTreeService.merge(alreadyExistingTree, modifiedTree);
 
     String expected = expectedResult.replaceAll("/ +/g", " ");
-    Assertions.assertEquals(expected, alreadyExistingTree.buildType());
+    Assertions.assertEquals(expected, typeBuilder.buildType(alreadyExistingTree));
     Assertions.assertEquals("TESTSTRUCT", alreadyExistingTree.getColumnName());
   }
 
