Skip to content

Commit

Permalink
[core] fix the issue where the historical schemaId could not be found…
Browse files Browse the repository at this point in the history
… when reading the branch table. (#4454)
  • Loading branch information
liming30 authored Nov 5, 2024
1 parent 5844a89 commit b555fca
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,7 @@ public void createBranch(String branchName) {

try {
TableSchema latestSchema = schemaManager.latest().get();
fileIO.copyFile(
schemaManager.toSchemaPath(latestSchema.id()),
schemaManager.copyWithBranch(branchName).toSchemaPath(latestSchema.id()),
true);
copySchemasToBranch(branchName, latestSchema.id());
} catch (IOException e) {
throw new RuntimeException(
String.format(
Expand All @@ -123,10 +120,7 @@ public void createBranch(String branchName, String tagName) {
snapshotManager.snapshotPath(snapshot.id()),
snapshotManager.copyWithBranch(branchName).snapshotPath(snapshot.id()),
true);
fileIO.copyFile(
schemaManager.toSchemaPath(snapshot.schemaId()),
schemaManager.copyWithBranch(branchName).toSchemaPath(snapshot.schemaId()),
true);
copySchemasToBranch(branchName, snapshot.schemaId());
} catch (IOException e) {
throw new RuntimeException(
String.format(
Expand Down Expand Up @@ -249,4 +243,15 @@ private void validateBranch(String branchName) {
"Branch name cannot be pure numeric string but is '%s'.",
branchName);
}

private void copySchemasToBranch(String branchName, long schemaId) throws IOException {
for (int i = 0; i <= schemaId; i++) {
if (schemaManager.schemaExists(i)) {
fileIO.copyFile(
schemaManager.toSchemaPath(i),
schemaManager.copyWithBranch(branchName).toSchemaPath(i),
true);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.flink;

import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.SnapshotManager;
Expand Down Expand Up @@ -498,6 +499,43 @@ public void testCannotSetEmptyFallbackBranch() {
.satisfies(anyCauseMatches(IllegalArgumentException.class, errMsg));
}

@Test
public void testReadBranchTableWithMultiSchemaIds() throws Exception {
sql(
"CREATE TABLE T ("
+ " pt INT"
+ ", k INT"
+ ", v STRING"
+ ", PRIMARY KEY (pt, k) NOT ENFORCED"
+ " ) PARTITIONED BY (pt) WITH ("
+ " 'bucket' = '2'"
+ " )");

sql("INSERT INTO T VALUES" + " (1, 10, 'apple')," + " (1, 20, 'banana')");

sql("ALTER TABLE `T` ADD (v2 INT)");

sql("INSERT INTO T VALUES" + " (2, 10, 'cat', 2)," + " (2, 20, 'dog', 2)");

sql("ALTER TABLE `T` ADD (v3 INT)");

sql("CALL sys.create_tag('default.T', 'tag1', 2)");

sql("CALL sys.create_branch('default.T', 'test', 'tag1')");

FileStoreTable table = paimonTable("T");
SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location(), "test");
List<Long> schemaIds = schemaManager.listAllIds();
assertThat(schemaIds.size()).isEqualTo(2);

assertThat(collectResult("SELECT * FROM T$branch_test"))
.containsExactlyInAnyOrder(
"+I[1, 10, apple, null]",
"+I[1, 20, banana, null]",
"+I[2, 10, cat, 2]",
"+I[2, 20, dog, 2]");
}

private List<String> collectResult(String sql) throws Exception {
List<String> result = new ArrayList<>();
try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
Expand Down

0 comments on commit b555fca

Please sign in to comment.