diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java index 015164191e367..dbf04b85b8428 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BranchesTable.java @@ -40,6 +40,7 @@ import org.apache.paimon.table.source.SingletonSplit; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; import org.apache.paimon.types.TimestampType; @@ -84,7 +85,8 @@ public class BranchesTable implements ReadonlyTable { 0, "branch_name", SerializationUtils.newStringType(false)), new DataField( 1, "created_from_tag", SerializationUtils.newStringType(true)), - new DataField(2, "create_time", new TimestampType(false, 3)))); + new DataField(2, "created_from_snapshot", new BigIntType(true)), + new DataField(3, "create_time", new TimestampType(false, 3)))); private final FileIO fileIO; private final Path location; @@ -232,6 +234,7 @@ private List branches(FileStoreTable table) throws IOException { for (Pair path : paths) { String branchName = path.getLeft().getName().substring(BRANCH_PREFIX.length()); String basedTag = null; + Long basedSnapshotId = null; long creationTime = path.getRight(); Optional tableSchema = @@ -243,6 +246,7 @@ private List branches(FileStoreTable table) throws IOException { SortedMap> snapshotTags = branchTable.tagManager().tags(); Long earliestSnapshotId = branchTable.snapshotManager().earliestSnapshotId(); + if (!snapshotTags.isEmpty()) { Snapshot snapshot = snapshotTags.firstKey(); if (earliestSnapshotId >= snapshot.id()) { @@ -251,6 +255,7 @@ private List branches(FileStoreTable table) throws IOException { .tagManager() .sortTagsOfOneSnapshot(snapshotTags.get(snapshot)); basedTag = tags.get(0); + basedSnapshotId = snapshot.id(); } } } @@ -259,6 +264,7 @@ private List branches(FileStoreTable table) throws IOException { GenericRow.of( BinaryString.fromString(branchName), BinaryString.fromString(basedTag), + basedSnapshotId, Timestamp.fromLocalDateTime( DateTimeUtils.toLocalDateTime(creationTime)))); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index 78aa4175a93e7..5cf9326dab28c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -169,6 +169,37 @@ public void testCreateEmptyBranch() throws Exception { .containsExactlyInAnyOrder("+I[3, 30, banana]"); } + @Test + public void testBranchManagerGetBranchSnapshotsList() throws Exception { + sql( + "CREATE TABLE T (" + + " pt INT" + + ", k INT" + + ", v STRING" + + ", PRIMARY KEY (pt, k) NOT ENFORCED" + + " ) PARTITIONED BY (pt) WITH (" + + " 'bucket' = '2'" + + " )"); + + sql("INSERT INTO T VALUES (1, 10, 'hxh')"); + sql("INSERT INTO T VALUES (1, 20, 'hxh')"); + sql("INSERT INTO T VALUES (1, 30, 'hxh')"); + + FileStoreTable table = paimonTable("T"); + checkSnapshots(table.snapshotManager(), 1, 3); + + sql("CALL sys.create_tag('default.T', 'tag1', 1)"); + sql("CALL sys.create_tag('default.T', 'tag2', 2)"); + sql("CALL sys.create_tag('default.T', 'tag3', 3)"); + + sql("CALL sys.create_branch('default.T', 'test1', 'tag1')"); + sql("CALL sys.create_branch('default.T', 'test2', 'tag2')"); + sql("CALL sys.create_branch('default.T', 'test3', 'tag3')"); + + assertThat(collectResult("SELECT created_from_snapshot FROM `T$branches`")) + .containsExactlyInAnyOrder("+I[1]", "+I[2]", "+I[3]"); + } + @Test public void testDeleteBranchTable() throws Exception { sql( @@ -194,13 +225,17 @@ public void testDeleteBranchTable() throws Exception { sql("CALL sys.create_branch('default.T', 'test', 'tag1')"); sql("CALL sys.create_branch('default.T', 'test2', 'tag2')"); - assertThat(collectResult("SELECT branch_name, created_from_tag FROM `T$branches`")) - .containsExactlyInAnyOrder("+I[test, tag1]", "+I[test2, tag2]"); + assertThat( + collectResult( + "SELECT branch_name, created_from_tag, created_from_snapshot FROM `T$branches`")) + .containsExactlyInAnyOrder("+I[test, tag1, 1]", "+I[test2, tag2, 2]"); sql("CALL sys.delete_branch('default.T', 'test')"); - assertThat(collectResult("SELECT branch_name, created_from_tag FROM `T$branches`")) - .containsExactlyInAnyOrder("+I[test2, tag2]"); + assertThat( + collectResult( + "SELECT branch_name, created_from_tag, created_from_snapshot FROM `T$branches`")) + .containsExactlyInAnyOrder("+I[test2, tag2, 2]"); } @Test diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java index 9eaa1edf34039..9f32320204233 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java @@ -879,13 +879,13 @@ public void testBranchesTable() throws Exception { paimonTable("T$branch_branch3").createTag("tag_branch1", 2); List result = - sql("SELECT branch_name, created_from_tag FROM T$branches ORDER BY branch_name"); - + sql( + "SELECT branch_name, created_from_tag, created_from_snapshot FROM T$branches ORDER BY branch_name"); assertThat(result) .containsExactly( - Row.of("branch1", "tag1"), - Row.of("branch2", "tag1"), - Row.of("branch3", null)); + Row.of("branch1", "tag1", 1L), + Row.of("branch2", "tag1", 1L), + Row.of("branch3", null, null)); } @Test