Skip to content

Commit

Permalink
[test] Complement the ITCase for flink branchSql. (apache#3811)
Browse files Browse the repository at this point in the history
  • Loading branch information
LinMingQiang authored Jul 28, 2024
1 parent 59e0ae1 commit 6dbeff8
Showing 1 changed file with 275 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,55 +18,76 @@

package org.apache.paimon.flink;

import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.branch.TableBranch;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.SnapshotManager;

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

/** IT cases for table with branches using SQL. */
public class BranchSqlITCase extends AbstractTestBase {

@TempDir java.nio.file.Path tempDir;
public class BranchSqlITCase extends CatalogITCaseBase {

@Test
public void testAlterTable() throws Exception {
TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
tEnv.executeSql(
"CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' = '" + tempDir + "' )");
tEnv.executeSql("USE CATALOG mycat");
tEnv.executeSql(
"CREATE TABLE t ( pt INT, k INT, v STRING, PRIMARY KEY (pt, k) NOT ENFORCED ) "
+ "PARTITIONED BY (pt) WITH ( 'bucket' = '2' )");

tEnv.executeSql(
"INSERT INTO t VALUES (1, 10, 'apple'), (1, 20, 'banana'), (2, 10, 'cat'), (2, 20, 'dog')")
.await();
tEnv.executeSql("CALL sys.create_branch('default.t', 'test', 1)");
tEnv.executeSql("INSERT INTO t VALUES (1, 10, 'APPLE'), (2, 20, 'DOG'), (2, 30, 'horse')")
.await();

tEnv.executeSql("ALTER TABLE `t$branch_test` ADD (v2 INT)").await();
tEnv.executeSql(
"INSERT INTO `t$branch_test` VALUES "
+ "(1, 10, 'cherry', 100), (2, 20, 'bird', 200), (2, 40, 'wolf', 400)")
.await();

assertThat(collectResult(tEnv, "SELECT * FROM t"))
public void testAlterBranchTable() throws Exception {

sql(
"CREATE TABLE T ("
+ " pt INT"
+ ", k INT"
+ ", v STRING"
+ ", PRIMARY KEY (pt, k) NOT ENFORCED"
+ " ) PARTITIONED BY (pt) WITH ("
+ " 'bucket' = '2'"
+ " )");

sql(
"INSERT INTO T VALUES"
+ " (1, 10, 'apple'),"
+ " (1, 20, 'banana'),"
+ " (2, 10, 'cat'),"
+ " (2, 20, 'dog')");

sql("CALL sys.create_branch('default.T', 'test', 1)");

FileStoreTable branchTable = paimonTable("T$branch_test");
assertThat(branchTable.schema().fields().size()).isEqualTo(3);

sql(
"INSERT INTO T VALUES"
+ " (1, 10, 'APPLE'),"
+ " (2, 20, 'DOG'),"
+ " (2, 30, 'horse')");

// Add v2 column for branch table.
sql("ALTER TABLE `T$branch_test` ADD (v2 INT)");

branchTable = paimonTable("T$branch_test");
assertThat(branchTable.schema().fields().size()).isEqualTo(4);

sql(
"INSERT INTO `T$branch_test` VALUES "
+ "(1, 10, 'cherry', 100)"
+ ", (2, 20, 'bird', 200)"
+ ", (2, 40, 'wolf', 400)");

assertThat(collectResult("SELECT * FROM T"))
.containsExactlyInAnyOrder(
"+I[1, 10, APPLE]",
"+I[1, 20, banana]",
"+I[2, 30, horse]",
"+I[2, 10, cat]",
"+I[2, 20, DOG]");
assertThat(collectResult(tEnv, "SELECT * FROM t$branch_test"))

assertThat(collectResult("SELECT * FROM T$branch_test"))
.containsExactlyInAnyOrder(
"+I[1, 10, cherry, 100]",
"+I[1, 20, banana, null]",
Expand All @@ -75,7 +96,224 @@ public void testAlterTable() throws Exception {
"+I[2, 40, wolf, 400]");
}

private List<String> collectResult(TableEnvironment tEnv, String sql) throws Exception {
@Test
public void testCreateBranchFromTag() throws Exception {
sql(
"CREATE TABLE T ("
+ " pt INT"
+ ", k INT"
+ ", v STRING"
+ ", PRIMARY KEY (pt, k) NOT ENFORCED"
+ " ) PARTITIONED BY (pt) WITH ("
+ " 'bucket' = '2'"
+ " )");

// snapshot 1.
sql("INSERT INTO T VALUES" + " (1, 10, 'apple')," + " (1, 20, 'banana')");
// snapshot 2.
sql("INSERT INTO T VALUES" + " (2, 10, 'cat')," + " (2, 20, 'dog')");

sql("CALL sys.create_tag('default.T', 'tag1', 1)");
sql("CALL sys.create_tag('default.T', 'tag2', 2)");

sql("CALL sys.create_branch('default.T', 'test', 'tag1')");
sql("CALL sys.create_branch('default.T', 'test2', 'tag2')");

FileStoreTable branchTable = paimonTable("T$branch_test");
assertThat(branchTable.tagManager().tagExists("tag1")).isEqualTo(true);

assertThat(collectResult("SELECT * FROM T$branch_test"))
.containsExactlyInAnyOrder("+I[1, 10, apple]", "+I[1, 20, banana]");

FileStoreTable branchTable2 = paimonTable("T$branch_test2");
assertThat(branchTable2.tagManager().tagExists("tag2")).isEqualTo(true);

assertThat(collectResult("SELECT * FROM T$branch_test2"))
.containsExactlyInAnyOrder(
"+I[1, 10, apple]",
"+I[1, 20, banana]",
"+I[2, 10, cat]",
"+I[2, 20, dog]");
}

@Test
public void testCreateBranchFromSnapshot() throws Catalog.TableNotExistException {

sql(
"CREATE TABLE T ("
+ " pt INT"
+ ", k INT"
+ ", v STRING"
+ ", PRIMARY KEY (pt, k) NOT ENFORCED"
+ " ) PARTITIONED BY (pt) WITH ("
+ " 'bucket' = '2'"
+ " )");

// snapshot 1.
sql("INSERT INTO T VALUES(1, 10, 'apple')");

// snapshot 2.
sql("INSERT INTO T VALUES(1, 20, 'dog')");

sql("CALL sys.create_branch('default.T', 'test', 1)");
sql("CALL sys.create_branch('default.T', 'test2', 2)");

FileStoreTable table = paimonTable("T");

assertThat(
table.branchManager().branches().stream()
.map(TableBranch::getCreatedFromSnapshot))
.containsExactlyInAnyOrder(1L, 2L);

assertThat(paimonTable("T$branch_test").snapshotManager().snapshotExists(1))
.isEqualTo(true);

assertThat(paimonTable("T$branch_test2").snapshotManager().snapshotExists(2))
.isEqualTo(true);
}

@Test
public void testCreateEmptyBranch() throws Exception {
sql(
"CREATE TABLE T ("
+ " pt INT"
+ ", k INT"
+ ", v STRING"
+ ", PRIMARY KEY (pt, k) NOT ENFORCED"
+ " ) PARTITIONED BY (pt) WITH ("
+ " 'bucket' = '2'"
+ " )");

// snapshot 1.
sql("INSERT INTO T VALUES(1, 10, 'apple')");

// snapshot 2.
sql("INSERT INTO T VALUES(1, 20, 'dog')");

assertThat(collectResult("SELECT * FROM T"))
.containsExactlyInAnyOrder("+I[1, 10, apple]", "+I[1, 20, dog]");

// create en empty branch.
sql("CALL sys.create_branch('default.T', 'empty_branch')");

sql("INSERT INTO `T$branch_empty_branch` VALUES (3, 30, 'banana')");

assertThat(collectResult("SELECT * FROM T$branch_empty_branch"))
.containsExactlyInAnyOrder("+I[3, 30, banana]");
}

@Test
public void testDeleteBranchTable() throws Exception {
sql(
"CREATE TABLE T ("
+ " pt INT"
+ ", k INT"
+ ", v STRING"
+ ", PRIMARY KEY (pt, k) NOT ENFORCED"
+ " ) PARTITIONED BY (pt) WITH ("
+ " 'bucket' = '2'"
+ " )");

// snapshot 1.
sql("INSERT INTO T VALUES(1, 10, 'apple')");

// snapshot 2.
sql("INSERT INTO T VALUES(1, 20, 'dog')");

sql("CALL sys.create_branch('default.T', 'test', 1)");
sql("CALL sys.create_branch('default.T', 'test2', 2)");

FileStoreTable table = paimonTable("T");

assertThat(
table.branchManager().branches().stream()
.map(TableBranch::getCreatedFromSnapshot))
.containsExactlyInAnyOrder(1L, 2L);

assertThat(table.branchManager().branches().stream().map(TableBranch::getBranchName))
.containsExactlyInAnyOrder("test", "test2");

sql("CALL sys.delete_branch('default.T', 'test')");

assertThat(table.branchManager().branches().stream().map(TableBranch::getBranchName))
.containsExactlyInAnyOrder("test2");
}

@Test
public void testBranchManagerGetBranchSnapshotsList()
throws Catalog.TableNotExistException, IOException {
sql(
"CREATE TABLE T ("
+ " pt INT"
+ ", k INT"
+ ", v STRING"
+ ", PRIMARY KEY (pt, k) NOT ENFORCED"
+ " ) PARTITIONED BY (pt) WITH ("
+ " 'bucket' = '2'"
+ " )");

sql("INSERT INTO T VALUES (1, 10, 'hxh')");
sql("INSERT INTO T VALUES (1, 20, 'hxh')");
sql("INSERT INTO T VALUES (1, 30, 'hxh')");

FileStoreTable table = paimonTable("T");
checkSnapshots(table.snapshotManager(), 1, 3);

sql("CALL sys.create_branch('default.T', 'test1', 1)");
sql("CALL sys.create_branch('default.T', 'test2', 2)");
sql("CALL sys.create_branch('default.T', 'test3', 3)");

assertThat(
table.branchManager().branches().stream()
.map(TableBranch::getCreatedFromSnapshot))
.containsExactlyInAnyOrder(1L, 2L, 3L);
}

@Test
public void testBranchFastForward() throws Exception {
sql(
"CREATE TABLE T ("
+ " pt INT"
+ ", k INT"
+ ", v STRING"
+ ", PRIMARY KEY (pt, k) NOT ENFORCED"
+ " ) PARTITIONED BY (pt) WITH ("
+ " 'bucket' = '2'"
+ " )");

FileStoreTable table = paimonTable("T");
SnapshotManager snapshotManager = table.snapshotManager();

sql("INSERT INTO T VALUES (1, 10, 'hunter')");
sql("INSERT INTO T VALUES (1, 20, 'hunter')");
sql("INSERT INTO T VALUES (1, 30, 'hunter')");

checkSnapshots(snapshotManager, 1, 3);

assertThat(collectResult("SELECT * FROM T"))
.containsExactlyInAnyOrder(
"+I[1, 10, hunter]", "+I[1, 20, hunter]", "+I[1, 30, hunter]");

sql("CALL sys.create_branch('default.T', 'test', 1)");

sql("INSERT INTO `T$branch_test` VALUES (2, 10, 'hunterX')");

checkSnapshots(paimonTable("T$branch_test").snapshotManager(), 1, 2);

// query branch data.
assertThat(collectResult("SELECT * FROM T$branch_test"))
.containsExactlyInAnyOrder("+I[1, 10, hunter]", "+I[2, 10, hunterX]");

sql("CALL sys.fast_forward('default.T', 'test')");

// Branch `test` replaces the main branch.
assertThat(collectResult("SELECT * FROM T"))
.containsExactlyInAnyOrder("+I[1, 10, hunter]", "+I[2, 10, hunterX]");

checkSnapshots(snapshotManager, 1, 2);
}

private List<String> collectResult(String sql) throws Exception {
List<String> result = new ArrayList<>();
try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
while (it.hasNext()) {
Expand All @@ -84,4 +322,10 @@ private List<String> collectResult(TableEnvironment tEnv, String sql) throws Exc
}
return result;
}

private void checkSnapshots(SnapshotManager sm, int earliest, int latest) throws IOException {
assertThat(sm.snapshotCount()).isEqualTo(latest - earliest + 1);
assertThat(sm.earliestSnapshotId()).isEqualTo(earliest);
assertThat(sm.latestSnapshotId()).isEqualTo(latest);
}
}

0 comments on commit 6dbeff8

Please sign in to comment.