diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index be0a2e8058c2..cddfaf936620 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -18,55 +18,76 @@ package org.apache.paimon.flink; -import org.apache.paimon.flink.util.AbstractTestBase; +import org.apache.paimon.branch.TableBranch; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.SnapshotManager; -import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import static org.assertj.core.api.Assertions.assertThat; /** IT cases for table with branches using SQL. */ -public class BranchSqlITCase extends AbstractTestBase { - - @TempDir java.nio.file.Path tempDir; +public class BranchSqlITCase extends CatalogITCaseBase { @Test - public void testAlterTable() throws Exception { - TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); - tEnv.executeSql( - "CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' = '" + tempDir + "' )"); - tEnv.executeSql("USE CATALOG mycat"); - tEnv.executeSql( - "CREATE TABLE t ( pt INT, k INT, v STRING, PRIMARY KEY (pt, k) NOT ENFORCED ) " - + "PARTITIONED BY (pt) WITH ( 'bucket' = '2' )"); - - tEnv.executeSql( - "INSERT INTO t VALUES (1, 10, 'apple'), (1, 20, 'banana'), (2, 10, 'cat'), (2, 20, 'dog')") - .await(); - tEnv.executeSql("CALL sys.create_branch('default.t', 'test', 1)"); - tEnv.executeSql("INSERT INTO t VALUES (1, 10, 'APPLE'), (2, 20, 'DOG'), (2, 30, 'horse')") - .await(); - - tEnv.executeSql("ALTER TABLE `t$branch_test` ADD (v2 INT)").await(); - tEnv.executeSql( - "INSERT INTO `t$branch_test` VALUES " - + "(1, 10, 'cherry', 100), (2, 20, 'bird', 200), (2, 40, 'wolf', 400)") - .await(); - - assertThat(collectResult(tEnv, "SELECT * FROM t")) + public void testAlterBranchTable() throws Exception { + + sql( + "CREATE TABLE T (" + + " pt INT" + + ", k INT" + + ", v STRING" + + ", PRIMARY KEY (pt, k) NOT ENFORCED" + + " ) PARTITIONED BY (pt) WITH (" + + " 'bucket' = '2'" + + " )"); + + sql( + "INSERT INTO T VALUES" + + " (1, 10, 'apple')," + + " (1, 20, 'banana')," + + " (2, 10, 'cat')," + + " (2, 20, 'dog')"); + + sql("CALL sys.create_branch('default.T', 'test', 1)"); + + FileStoreTable branchTable = paimonTable("T$branch_test"); + assertThat(branchTable.schema().fields().size()).isEqualTo(3); + + sql( + "INSERT INTO T VALUES" + + " (1, 10, 'APPLE')," + + " (2, 20, 'DOG')," + + " (2, 30, 'horse')"); + + // Add v2 column for branch table. + sql("ALTER TABLE `T$branch_test` ADD (v2 INT)"); + + branchTable = paimonTable("T$branch_test"); + assertThat(branchTable.schema().fields().size()).isEqualTo(4); + + sql( + "INSERT INTO `T$branch_test` VALUES " + + "(1, 10, 'cherry', 100)" + + ", (2, 20, 'bird', 200)" + + ", (2, 40, 'wolf', 400)"); + + assertThat(collectResult("SELECT * FROM T")) .containsExactlyInAnyOrder( "+I[1, 10, APPLE]", "+I[1, 20, banana]", "+I[2, 30, horse]", "+I[2, 10, cat]", "+I[2, 20, DOG]"); - assertThat(collectResult(tEnv, "SELECT * FROM t$branch_test")) + + assertThat(collectResult("SELECT * FROM T$branch_test")) .containsExactlyInAnyOrder( "+I[1, 10, cherry, 100]", "+I[1, 20, banana, null]", @@ -75,7 +96,224 @@ public void testAlterTable() throws Exception { "+I[2, 40, wolf, 400]"); } - private List collectResult(TableEnvironment tEnv, String sql) throws Exception { + @Test + public void testCreateBranchFromTag() throws Exception { + sql( + "CREATE TABLE T (" + + " pt INT" + + ", k INT" + + ", v STRING" + + ", PRIMARY KEY (pt, k) NOT ENFORCED" + + " ) PARTITIONED BY (pt) WITH (" + + " 'bucket' = '2'" + + " )"); + + // snapshot 1. + sql("INSERT INTO T VALUES" + " (1, 10, 'apple')," + " (1, 20, 'banana')"); + // snapshot 2. + sql("INSERT INTO T VALUES" + " (2, 10, 'cat')," + " (2, 20, 'dog')"); + + sql("CALL sys.create_tag('default.T', 'tag1', 1)"); + sql("CALL sys.create_tag('default.T', 'tag2', 2)"); + + sql("CALL sys.create_branch('default.T', 'test', 'tag1')"); + sql("CALL sys.create_branch('default.T', 'test2', 'tag2')"); + + FileStoreTable branchTable = paimonTable("T$branch_test"); + assertThat(branchTable.tagManager().tagExists("tag1")).isEqualTo(true); + + assertThat(collectResult("SELECT * FROM T$branch_test")) + .containsExactlyInAnyOrder("+I[1, 10, apple]", "+I[1, 20, banana]"); + + FileStoreTable branchTable2 = paimonTable("T$branch_test2"); + assertThat(branchTable2.tagManager().tagExists("tag2")).isEqualTo(true); + + assertThat(collectResult("SELECT * FROM T$branch_test2")) + .containsExactlyInAnyOrder( + "+I[1, 10, apple]", + "+I[1, 20, banana]", + "+I[2, 10, cat]", + "+I[2, 20, dog]"); + } + + @Test + public void testCreateBranchFromSnapshot() throws Catalog.TableNotExistException { + + sql( + "CREATE TABLE T (" + + " pt INT" + + ", k INT" + + ", v STRING" + + ", PRIMARY KEY (pt, k) NOT ENFORCED" + + " ) PARTITIONED BY (pt) WITH (" + + " 'bucket' = '2'" + + " )"); + + // snapshot 1. + sql("INSERT INTO T VALUES(1, 10, 'apple')"); + + // snapshot 2. + sql("INSERT INTO T VALUES(1, 20, 'dog')"); + + sql("CALL sys.create_branch('default.T', 'test', 1)"); + sql("CALL sys.create_branch('default.T', 'test2', 2)"); + + FileStoreTable table = paimonTable("T"); + + assertThat( + table.branchManager().branches().stream() + .map(TableBranch::getCreatedFromSnapshot)) + .containsExactlyInAnyOrder(1L, 2L); + + assertThat(paimonTable("T$branch_test").snapshotManager().snapshotExists(1)) + .isEqualTo(true); + + assertThat(paimonTable("T$branch_test2").snapshotManager().snapshotExists(2)) + .isEqualTo(true); + } + + @Test + public void testCreateEmptyBranch() throws Exception { + sql( + "CREATE TABLE T (" + + " pt INT" + + ", k INT" + + ", v STRING" + + ", PRIMARY KEY (pt, k) NOT ENFORCED" + + " ) PARTITIONED BY (pt) WITH (" + + " 'bucket' = '2'" + + " )"); + + // snapshot 1. + sql("INSERT INTO T VALUES(1, 10, 'apple')"); + + // snapshot 2. + sql("INSERT INTO T VALUES(1, 20, 'dog')"); + + assertThat(collectResult("SELECT * FROM T")) + .containsExactlyInAnyOrder("+I[1, 10, apple]", "+I[1, 20, dog]"); + + // create en empty branch. + sql("CALL sys.create_branch('default.T', 'empty_branch')"); + + sql("INSERT INTO `T$branch_empty_branch` VALUES (3, 30, 'banana')"); + + assertThat(collectResult("SELECT * FROM T$branch_empty_branch")) + .containsExactlyInAnyOrder("+I[3, 30, banana]"); + } + + @Test + public void testDeleteBranchTable() throws Exception { + sql( + "CREATE TABLE T (" + + " pt INT" + + ", k INT" + + ", v STRING" + + ", PRIMARY KEY (pt, k) NOT ENFORCED" + + " ) PARTITIONED BY (pt) WITH (" + + " 'bucket' = '2'" + + " )"); + + // snapshot 1. + sql("INSERT INTO T VALUES(1, 10, 'apple')"); + + // snapshot 2. + sql("INSERT INTO T VALUES(1, 20, 'dog')"); + + sql("CALL sys.create_branch('default.T', 'test', 1)"); + sql("CALL sys.create_branch('default.T', 'test2', 2)"); + + FileStoreTable table = paimonTable("T"); + + assertThat( + table.branchManager().branches().stream() + .map(TableBranch::getCreatedFromSnapshot)) + .containsExactlyInAnyOrder(1L, 2L); + + assertThat(table.branchManager().branches().stream().map(TableBranch::getBranchName)) + .containsExactlyInAnyOrder("test", "test2"); + + sql("CALL sys.delete_branch('default.T', 'test')"); + + assertThat(table.branchManager().branches().stream().map(TableBranch::getBranchName)) + .containsExactlyInAnyOrder("test2"); + } + + @Test + public void testBranchManagerGetBranchSnapshotsList() + throws Catalog.TableNotExistException, IOException { + sql( + "CREATE TABLE T (" + + " pt INT" + + ", k INT" + + ", v STRING" + + ", PRIMARY KEY (pt, k) NOT ENFORCED" + + " ) PARTITIONED BY (pt) WITH (" + + " 'bucket' = '2'" + + " )"); + + sql("INSERT INTO T VALUES (1, 10, 'hxh')"); + sql("INSERT INTO T VALUES (1, 20, 'hxh')"); + sql("INSERT INTO T VALUES (1, 30, 'hxh')"); + + FileStoreTable table = paimonTable("T"); + checkSnapshots(table.snapshotManager(), 1, 3); + + sql("CALL sys.create_branch('default.T', 'test1', 1)"); + sql("CALL sys.create_branch('default.T', 'test2', 2)"); + sql("CALL sys.create_branch('default.T', 'test3', 3)"); + + assertThat( + table.branchManager().branches().stream() + .map(TableBranch::getCreatedFromSnapshot)) + .containsExactlyInAnyOrder(1L, 2L, 3L); + } + + @Test + public void testBranchFastForward() throws Exception { + sql( + "CREATE TABLE T (" + + " pt INT" + + ", k INT" + + ", v STRING" + + ", PRIMARY KEY (pt, k) NOT ENFORCED" + + " ) PARTITIONED BY (pt) WITH (" + + " 'bucket' = '2'" + + " )"); + + FileStoreTable table = paimonTable("T"); + SnapshotManager snapshotManager = table.snapshotManager(); + + sql("INSERT INTO T VALUES (1, 10, 'hunter')"); + sql("INSERT INTO T VALUES (1, 20, 'hunter')"); + sql("INSERT INTO T VALUES (1, 30, 'hunter')"); + + checkSnapshots(snapshotManager, 1, 3); + + assertThat(collectResult("SELECT * FROM T")) + .containsExactlyInAnyOrder( + "+I[1, 10, hunter]", "+I[1, 20, hunter]", "+I[1, 30, hunter]"); + + sql("CALL sys.create_branch('default.T', 'test', 1)"); + + sql("INSERT INTO `T$branch_test` VALUES (2, 10, 'hunterX')"); + + checkSnapshots(paimonTable("T$branch_test").snapshotManager(), 1, 2); + + // query branch data. + assertThat(collectResult("SELECT * FROM T$branch_test")) + .containsExactlyInAnyOrder("+I[1, 10, hunter]", "+I[2, 10, hunterX]"); + + sql("CALL sys.fast_forward('default.T', 'test')"); + + // Branch `test` replaces the main branch. + assertThat(collectResult("SELECT * FROM T")) + .containsExactlyInAnyOrder("+I[1, 10, hunter]", "+I[2, 10, hunterX]"); + + checkSnapshots(snapshotManager, 1, 2); + } + + private List collectResult(String sql) throws Exception { List result = new ArrayList<>(); try (CloseableIterator it = tEnv.executeSql(sql).collect()) { while (it.hasNext()) { @@ -84,4 +322,10 @@ private List collectResult(TableEnvironment tEnv, String sql) throws Exc } return result; } + + private void checkSnapshots(SnapshotManager sm, int earliest, int latest) throws IOException { + assertThat(sm.snapshotCount()).isEqualTo(latest - earliest + 1); + assertThat(sm.earliestSnapshotId()).isEqualTo(earliest); + assertThat(sm.latestSnapshotId()).isEqualTo(latest); + } }