Skip to content

Commit

Permalink
SNOW-1842220 Add comments to columns created by schema evolution (#1014)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-bzabek authored Dec 3, 2024
1 parent c16d13d commit 90670e4
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,18 @@ public class DescribeTableRow {
private final String column;
private final String type;

private final String comment;

public DescribeTableRow(String column, String type, String comment) {
this.column = column;
this.type = type;
this.comment = comment;
}

public DescribeTableRow(String column, String type) {
this.column = column;
this.type = type;
this.comment = null;
}

public String getColumn() {
Expand All @@ -20,6 +29,10 @@ public String getType() {
return type;
}

public String getComment() {
return comment;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1179,7 +1179,8 @@ public Optional<List<DescribeTableRow>> describeTable(String tableName) {
while (result.next()) {
String columnName = result.getString("name");
String type = result.getString("type");
rows.add(new DescribeTableRow(columnName, type));
String comment = result.getString("comment");
rows.add(new DescribeTableRow(columnName, type, comment));
}
return Optional.of(rows);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
class IcebergColumnTree {

private final IcebergFieldNode rootNode;
private final String comment;

String getColumnName() {
return rootNode.name;
Expand All @@ -13,7 +14,17 @@ IcebergFieldNode getRootNode() {
return rootNode;
}

String getComment() {
return comment;
}

public IcebergColumnTree(IcebergFieldNode rootNode, String comment) {
this.rootNode = rootNode;
this.comment = comment;
}

IcebergColumnTree(IcebergFieldNode rootNode) {
this.rootNode = rootNode;
this.comment = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ IcebergColumnTree fromConnectSchema(Field kafkaConnectField) {
+ kafkaConnectField.name());
IcebergFieldNode rootNode =
createNode(kafkaConnectField.name().toUpperCase(), kafkaConnectField.schema());
return new IcebergColumnTree(rootNode);
return new IcebergColumnTree(rootNode, kafkaConnectField.schema().doc());
}

// -- parse tree from Iceberg schema logic --
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ private Map<String, ColumnInfos> toColumnInfos(List<IcebergColumnTree> columnTre
.map(
columnTree ->
Maps.immutableEntry(
columnTree.getColumnName(), new ColumnInfos(typeBuilder.buildType(columnTree))))
columnTree.getColumnName(),
new ColumnInfos(typeBuilder.buildType(columnTree), columnTree.getComment())))
.collect(
Collectors.toMap(
Map.Entry::getKey, Map.Entry::getValue, (oldValue, newValue) -> newValue));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ public void testEvolutionOfPrimitives_withSchema(
private static Stream<Arguments> primitiveEvolutionDataSource() {
return Stream.of(
Arguments.of(
singleBooleanField(),
singleBooleanFieldWithSchema(),
booleanAndIntWithSchema(),
booleanAndAllKindsOfIntWithSchema(),
allPrimitivesWithSchema(),
Expand Down Expand Up @@ -483,4 +483,24 @@ private static Stream<Arguments> testEvolutionOfComplexTypes_dataSource() {
twoObjectsExtendedWithMapAndArrayPayload(),
false));
}

@Test
@Disabled
void shouldAppendCommentTest() throws Exception {
// when
// insert record with a comment
insertWithRetry(schemaAndPayloadWithComment(), 0, true);
// insert record without a comment
insertWithRetry(singleBooleanFieldWithSchema(), 1, true);
waitForOffset(2);

// then
// comment is read from schema and set into first column
List<DescribeTableRow> columns = describeTable(tableName);
assertEquals("Test comment", columns.get(1).getComment());
// default comment is set into second column
assertEquals(
"column created by schema evolution from Snowflake Kafka Connector",
columns.get(2).getComment());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ static String complexPayloadWithSchema() {
+ " }"
+ " }";

static String singleBooleanField() {
static String singleBooleanFieldWithSchema() {
return SCHEMA_BEGINNING
+ BOOL_SCHEMA
+ SCHEMA_END
Expand Down Expand Up @@ -476,6 +476,15 @@ static String twoObjectsExtendedWithMapAndArrayPayload() {
+ " }";
}

static String schemaAndPayloadWithComment() {
return SCHEMA_BEGINNING
+ COMMENTED_SCHEMA
+ SCHEMA_END
+ "\"payload\": {"
+ STRING_PAYLOAD
+ "}}";
}

static String BOOL_SCHEMA = " { \"field\" : \"test_boolean\", \"type\" : \"boolean\"} ";

static String INT64_SCHEMA = "{ \"field\" : \"test_int64\", \"type\" : \"int64\" }";
Expand All @@ -489,6 +498,9 @@ static String twoObjectsExtendedWithMapAndArrayPayload() {

static String STRING_SCHEMA = "{ \"field\" : \"test_string\", \"type\" : \"string\" }";

static String COMMENTED_SCHEMA =
"{ \"field\" : \"test_string\", \"type\" : \"string\", \"doc\": \"Test comment\" }";

static final String BOOL_PAYLOAD = "\"test_boolean\" : true ";
static final String INT64_PAYLOAD = "\"test_int64\" : 2137324241343241 ";
static final String INT32_PAYLOAD = "\"test_int32\" : 2137 ";
Expand Down

0 comments on commit 90670e4

Please sign in to comment.