Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1842220 Add comments to columns created by schema evolution #1014

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,18 @@ public class DescribeTableRow {
private final String column;
private final String type;

private final String comment;

public DescribeTableRow(String column, String type, String comment) {
this.column = column;
this.type = type;
this.comment = comment;
}

public DescribeTableRow(String column, String type) {
this.column = column;
this.type = type;
this.comment = null;
}

public String getColumn() {
Expand All @@ -20,6 +29,10 @@ public String getType() {
return type;
}

public String getComment() {
return comment;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1179,7 +1179,8 @@ public Optional<List<DescribeTableRow>> describeTable(String tableName) {
while (result.next()) {
String columnName = result.getString("name");
String type = result.getString("type");
rows.add(new DescribeTableRow(columnName, type));
String comment = result.getString("comment");
rows.add(new DescribeTableRow(columnName, type, comment));
}
return Optional.of(rows);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
class IcebergColumnTree {

private final IcebergFieldNode rootNode;
private final String comment;

String getColumnName() {
return rootNode.name;
Expand All @@ -13,7 +14,17 @@ IcebergFieldNode getRootNode() {
return rootNode;
}

String getComment() {
return comment;
}

public IcebergColumnTree(IcebergFieldNode rootNode, String comment) {
this.rootNode = rootNode;
this.comment = comment;
}

IcebergColumnTree(IcebergFieldNode rootNode) {
this.rootNode = rootNode;
this.comment = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ IcebergColumnTree fromConnectSchema(Field kafkaConnectField) {
+ kafkaConnectField.name());
IcebergFieldNode rootNode =
createNode(kafkaConnectField.name().toUpperCase(), kafkaConnectField.schema());
return new IcebergColumnTree(rootNode);
return new IcebergColumnTree(rootNode, kafkaConnectField.schema().doc());
}

// -- parse tree from Iceberg schema logic --
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ private Map<String, ColumnInfos> toColumnInfos(List<IcebergColumnTree> columnTre
.map(
columnTree ->
Maps.immutableEntry(
columnTree.getColumnName(), new ColumnInfos(typeBuilder.buildType(columnTree))))
columnTree.getColumnName(),
new ColumnInfos(typeBuilder.buildType(columnTree), columnTree.getComment())))
.collect(
Collectors.toMap(
Map.Entry::getKey, Map.Entry::getValue, (oldValue, newValue) -> newValue));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ public void testEvolutionOfPrimitives_withSchema(
private static Stream<Arguments> primitiveEvolutionDataSource() {
return Stream.of(
Arguments.of(
singleBooleanField(),
singleBooleanFieldWithSchema(),
booleanAndIntWithSchema(),
booleanAndAllKindsOfIntWithSchema(),
allPrimitivesWithSchema(),
Expand Down Expand Up @@ -483,4 +483,24 @@ private static Stream<Arguments> testEvolutionOfComplexTypes_dataSource() {
twoObjectsExtendedWithMapAndArrayPayload(),
false));
}

@Test
@Disabled
void shouldAppendCommentTest() throws Exception {
// when
// insert record with a comment
insertWithRetry(schemaAndPayloadWithComment(), 0, true);
// insert record without a comment
insertWithRetry(singleBooleanFieldWithSchema(), 1, true);
waitForOffset(2);

// then
// comment is read from schema and set into first column
List<DescribeTableRow> columns = describeTable(tableName);
assertEquals("Test comment", columns.get(1).getComment());
// default comment is set into second column
assertEquals(
"column created by schema evolution from Snowflake Kafka Connector",
columns.get(2).getComment());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ static String complexPayloadWithSchema() {
+ " }"
+ " }";

static String singleBooleanField() {
static String singleBooleanFieldWithSchema() {
return SCHEMA_BEGINNING
+ BOOL_SCHEMA
+ SCHEMA_END
Expand Down Expand Up @@ -476,6 +476,15 @@ static String twoObjectsExtendedWithMapAndArrayPayload() {
+ " }";
}

static String schemaAndPayloadWithComment() {
return SCHEMA_BEGINNING
+ COMMENTED_SCHEMA
+ SCHEMA_END
+ "\"payload\": {"
+ STRING_PAYLOAD
+ "}}";
}

static String BOOL_SCHEMA = " { \"field\" : \"test_boolean\", \"type\" : \"boolean\"} ";

static String INT64_SCHEMA = "{ \"field\" : \"test_int64\", \"type\" : \"int64\" }";
Expand All @@ -489,6 +498,9 @@ static String twoObjectsExtendedWithMapAndArrayPayload() {

static String STRING_SCHEMA = "{ \"field\" : \"test_string\", \"type\" : \"string\" }";

static String COMMENTED_SCHEMA =
"{ \"field\" : \"test_string\", \"type\" : \"string\", \"doc\": \"Test comment\" }";

static final String BOOL_PAYLOAD = "\"test_boolean\" : true ";
static final String INT64_PAYLOAD = "\"test_int64\" : 2137324241343241 ";
static final String INT32_PAYLOAD = "\"test_int32\" : 2137 ";
Expand Down
Loading