Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] fix the issue where the historical schemaId could not be found when reading the branch table. #4454

Merged
merged 1 commit into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,7 @@ public void createBranch(String branchName) {

try {
TableSchema latestSchema = schemaManager.latest().get();
fileIO.copyFile(
schemaManager.toSchemaPath(latestSchema.id()),
schemaManager.copyWithBranch(branchName).toSchemaPath(latestSchema.id()),
true);
copySchemasToBranch(branchName, latestSchema.id());
} catch (IOException e) {
throw new RuntimeException(
String.format(
Expand All @@ -123,10 +120,7 @@ public void createBranch(String branchName, String tagName) {
snapshotManager.snapshotPath(snapshot.id()),
snapshotManager.copyWithBranch(branchName).snapshotPath(snapshot.id()),
true);
fileIO.copyFile(
schemaManager.toSchemaPath(snapshot.schemaId()),
schemaManager.copyWithBranch(branchName).toSchemaPath(snapshot.schemaId()),
true);
copySchemasToBranch(branchName, snapshot.schemaId());
} catch (IOException e) {
throw new RuntimeException(
String.format(
Expand Down Expand Up @@ -249,4 +243,15 @@ private void validateBranch(String branchName) {
"Branch name cannot be pure numeric string but is '%s'.",
branchName);
}

private void copySchemasToBranch(String branchName, long schemaId) throws IOException {
for (int i = 0; i <= schemaId; i++) {
if (schemaManager.schemaExists(i)) {
fileIO.copyFile(
schemaManager.toSchemaPath(i),
schemaManager.copyWithBranch(branchName).toSchemaPath(i),
true);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.flink;

import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.SnapshotManager;
Expand Down Expand Up @@ -498,6 +499,43 @@ public void testCannotSetEmptyFallbackBranch() {
.satisfies(anyCauseMatches(IllegalArgumentException.class, errMsg));
}

@Test
public void testReadBranchTableWithMultiSchemaIds() throws Exception {
sql(
"CREATE TABLE T ("
+ " pt INT"
+ ", k INT"
+ ", v STRING"
+ ", PRIMARY KEY (pt, k) NOT ENFORCED"
+ " ) PARTITIONED BY (pt) WITH ("
+ " 'bucket' = '2'"
+ " )");

sql("INSERT INTO T VALUES" + " (1, 10, 'apple')," + " (1, 20, 'banana')");

sql("ALTER TABLE `T` ADD (v2 INT)");

sql("INSERT INTO T VALUES" + " (2, 10, 'cat', 2)," + " (2, 20, 'dog', 2)");

sql("ALTER TABLE `T` ADD (v3 INT)");

sql("CALL sys.create_tag('default.T', 'tag1', 2)");

sql("CALL sys.create_branch('default.T', 'test', 'tag1')");

FileStoreTable table = paimonTable("T");
SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location(), "test");
List<Long> schemaIds = schemaManager.listAllIds();
assertThat(schemaIds.size()).isEqualTo(2);

assertThat(collectResult("SELECT * FROM T$branch_test"))
.containsExactlyInAnyOrder(
"+I[1, 10, apple, null]",
"+I[1, 20, banana, null]",
"+I[2, 10, cat, 2]",
"+I[2, 20, dog, 2]");
}

private List<String> collectResult(String sql) throws Exception {
List<String> result = new ArrayList<>();
try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
Expand Down
Loading