diff --git a/src/main/java/com/snowflake/kafka/connector/internal/DescribeTableRow.java b/src/main/java/com/snowflake/kafka/connector/internal/DescribeTableRow.java index 5a385819e..40db5623e 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/DescribeTableRow.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/DescribeTableRow.java @@ -7,9 +7,18 @@ public class DescribeTableRow { private final String column; private final String type; + private final String comment; + + public DescribeTableRow(String column, String type, String comment) { + this.column = column; + this.type = type; + this.comment = comment; + } + public DescribeTableRow(String column, String type) { this.column = column; this.type = type; + this.comment = null; } public String getColumn() { @@ -20,6 +29,10 @@ public String getType() { return type; } + public String getComment() { + return comment; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java index 51171e928..3cb5bd7c2 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java @@ -1179,7 +1179,8 @@ public Optional> describeTable(String tableName) { while (result.next()) { String columnName = result.getString("name"); String type = result.getString("type"); - rows.add(new DescribeTableRow(columnName, type)); + String comment = result.getString("comment"); + rows.add(new DescribeTableRow(columnName, type, comment)); } return Optional.of(rows); } catch (Exception e) { diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java index 03eb2d36b..d6e9a3c50 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTree.java @@ -4,6 +4,7 @@ class IcebergColumnTree { private final IcebergFieldNode rootNode; + private final String comment; String getColumnName() { return rootNode.name; @@ -13,7 +14,17 @@ IcebergFieldNode getRootNode() { return rootNode; } + String getComment() { + return comment; + } + + public IcebergColumnTree(IcebergFieldNode rootNode, String comment) { + this.rootNode = rootNode; + this.comment = comment; + } + IcebergColumnTree(IcebergFieldNode rootNode) { this.rootNode = rootNode; + this.comment = null; } } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeFactory.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeFactory.java index 4cd2a72bc..17e3d4cc8 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeFactory.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTreeFactory.java @@ -44,7 +44,7 @@ IcebergColumnTree fromConnectSchema(Field kafkaConnectField) { + kafkaConnectField.name()); IcebergFieldNode rootNode = createNode(kafkaConnectField.name().toUpperCase(), kafkaConnectField.schema()); - return new IcebergColumnTree(rootNode); + return new IcebergColumnTree(rootNode, kafkaConnectField.schema().doc()); } // -- parse tree from Iceberg schema logic -- diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java index 99aa4fe1d..d2a59dfba 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergSchemaEvolutionService.java @@ -170,7 +170,8 @@ private Map toColumnInfos(List columnTre .map( columnTree -> Maps.immutableEntry( - columnTree.getColumnName(), new ColumnInfos(typeBuilder.buildType(columnTree)))) + columnTree.getColumnName(), + new ColumnInfos(typeBuilder.buildType(columnTree), columnTree.getComment()))) .collect( Collectors.toMap( Map.Entry::getKey, Map.Entry::getValue, (oldValue, newValue) -> newValue)); diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java index 338bc4130..0ead14d20 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/IcebergIngestionSchemaEvolutionIT.java @@ -384,7 +384,7 @@ public void testEvolutionOfPrimitives_withSchema( private static Stream primitiveEvolutionDataSource() { return Stream.of( Arguments.of( - singleBooleanField(), + singleBooleanFieldWithSchema(), booleanAndIntWithSchema(), booleanAndAllKindsOfIntWithSchema(), allPrimitivesWithSchema(), @@ -483,4 +483,24 @@ private static Stream testEvolutionOfComplexTypes_dataSource() { twoObjectsExtendedWithMapAndArrayPayload(), false)); } + + @Test + @Disabled + void shouldAppendCommentTest() throws Exception { + // when + // insert record with a comment + insertWithRetry(schemaAndPayloadWithComment(), 0, true); + // insert record without a comment + insertWithRetry(singleBooleanFieldWithSchema(), 1, true); + waitForOffset(2); + + // then + // comment is read from schema and set into first column + List columns = describeTable(tableName); + assertEquals("Test comment", columns.get(1).getComment()); + // default comment is set into second column + assertEquals( + "column created by schema evolution from Snowflake Kafka Connector", + columns.get(2).getComment()); + } } diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/TestJsons.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/TestJsons.java index 449e1187e..59ca9e89f 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/TestJsons.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/TestJsons.java @@ -142,7 +142,7 @@ static String complexPayloadWithSchema() { + " }" + " }"; - static String singleBooleanField() { + static String singleBooleanFieldWithSchema() { return SCHEMA_BEGINNING + BOOL_SCHEMA + SCHEMA_END @@ -476,6 +476,15 @@ static String twoObjectsExtendedWithMapAndArrayPayload() { + " }"; } + static String schemaAndPayloadWithComment() { + return SCHEMA_BEGINNING + + COMMENTED_SCHEMA + + SCHEMA_END + + "\"payload\": {" + + STRING_PAYLOAD + + "}}"; + } + static String BOOL_SCHEMA = " { \"field\" : \"test_boolean\", \"type\" : \"boolean\"} "; static String INT64_SCHEMA = "{ \"field\" : \"test_int64\", \"type\" : \"int64\" }"; @@ -489,6 +498,9 @@ static String twoObjectsExtendedWithMapAndArrayPayload() { static String STRING_SCHEMA = "{ \"field\" : \"test_string\", \"type\" : \"string\" }"; + static String COMMENTED_SCHEMA = + "{ \"field\" : \"test_string\", \"type\" : \"string\", \"doc\": \"Test comment\" }"; + static final String BOOL_PAYLOAD = "\"test_boolean\" : true "; static final String INT64_PAYLOAD = "\"test_int64\" : 2137324241343241 "; static final String INT32_PAYLOAD = "\"test_int32\" : 2137 ";