Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Apr 18, 2024
1 parent 00108d3 commit ea7785e
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,22 @@ public void testDynamicOptions() throws Exception {
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(2));
}

@Test
public void testReadWriteBranch() throws Exception {
// create table
sql("CREATE TABLE T (id INT)");
// insert data
batchSql("INSERT INTO T VALUES (1)");
// create tag
paimonTable("T").createTag("tag1", 1);
// create branch
paimonTable("T").createBranch("branch1", "tag1");
// insert data to branch
batchSql("INSERT INTO T/*+ OPTIONS('branch' = 'branch1') */ VALUES (2)");
List<Row> rows = batchSql("select * from T /*+ OPTIONS('branch' = 'branch1') */");
assertThat(rows).containsExactlyInAnyOrder(Row.of(2), Row.of(1));
}

@Override
protected List<String> ddl() {
return Arrays.asList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,8 @@ public void testTagsTable() throws Exception {

List<Row> result = sql("SELECT tag_name, snapshot_id, schema_id, record_count FROM T$tags");

assertThat(result).containsExactlyInAnyOrder(Row.of("tag1", 1L, 0L, 1L), Row.of("tag2", 2L, 0L, 2L));
assertThat(result)
.containsExactlyInAnyOrder(Row.of("tag1", 1L, 0L, 1L), Row.of("tag2", 2L, 0L, 2L));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,61 @@ public void testLookupChangelog() throws Exception {
innerTestChangelogProducing(Collections.singletonList("'changelog-producer' = 'lookup'"));
}

@Test
public void testAlterTableReadWriteBranch() throws Exception {
TableEnvironment sEnv =
tableEnvironmentBuilder()
.streamingMode()
.checkpointIntervalMs(ThreadLocalRandom.current().nextInt(900) + 100)
.parallelism(1)
.build();

sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
sEnv.executeSql("USE CATALOG testCatalog");
sEnv.executeSql(
"CREATE TABLE T2 ( k INT, v STRING, PRIMARY KEY (k) NOT ENFORCED ) "
+ "WITH ( "
+ "'bucket' = '2'"
+ ")");

CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM T2").collect();

// insert data
sEnv.executeSql("INSERT INTO T2 VALUES (1, 'A')").await();
// read initial data
List<String> actual = new ArrayList<>();
for (int i = 0; i < 1; i++) {
actual.add(it.next().toString());
}

assertThat(actual).containsExactlyInAnyOrder("+I[1, A]");

// create tag
sEnv.executeSql(
String.format("CALL sys.create_tag('%s.%s', 'tag2', 1, '5 d')", "default", "T2"));
// create branch
sEnv.executeSql(
String.format(
"CALL sys.create_branch('%s.%s', 'branch1', 'tag2')", "default", "T2"));
// alter table
sEnv.executeSql("ALTER TABLE T2 SET ('changelog-producer'='full-compaction')");

CloseableIterator<Row> branchIt =
sEnv.executeSql("select * from T2 /*+ OPTIONS('branch' = 'branch1') */").collect();
// insert data to branch
sEnv.executeSql(
"INSERT INTO T2/*+ OPTIONS('branch' = 'branch1') */ VALUES (10, 'v10'),(11, 'v11'),(12, 'v12')")
.await();

// read initial data
List<String> actualBranch = new ArrayList<>();
for (int i = 0; i < 4; i++) {
actualBranch.add(branchIt.next().toString());
}
assertThat(actualBranch)
.containsExactlyInAnyOrder("+I[1, A]", "+I[10, v10]", "+I[11, v11]", "+I[12, v12]");
}

private void innerTestChangelogProducing(List<String> options) throws Exception {
TableEnvironment sEnv =
tableEnvironmentBuilder()
Expand Down

0 comments on commit ea7785e

Please sign in to comment.