From b555fca25fd9ab5ae642756add5628beb08167d6 Mon Sep 17 00:00:00 2001 From: "liming.1018" Date: Tue, 5 Nov 2024 15:08:08 +0800 Subject: [PATCH] [core] fix the issue where the historical schemaId could not be found when reading the branch table. (#4454) --- .../apache/paimon/utils/BranchManager.java | 21 ++++++---- .../apache/paimon/flink/BranchSqlITCase.java | 38 +++++++++++++++++++ 2 files changed, 51 insertions(+), 8 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index c2793de37799..bc353bb10d16 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -94,10 +94,7 @@ public void createBranch(String branchName) { try { TableSchema latestSchema = schemaManager.latest().get(); - fileIO.copyFile( - schemaManager.toSchemaPath(latestSchema.id()), - schemaManager.copyWithBranch(branchName).toSchemaPath(latestSchema.id()), - true); + copySchemasToBranch(branchName, latestSchema.id()); } catch (IOException e) { throw new RuntimeException( String.format( @@ -123,10 +120,7 @@ public void createBranch(String branchName, String tagName) { snapshotManager.snapshotPath(snapshot.id()), snapshotManager.copyWithBranch(branchName).snapshotPath(snapshot.id()), true); - fileIO.copyFile( - schemaManager.toSchemaPath(snapshot.schemaId()), - schemaManager.copyWithBranch(branchName).toSchemaPath(snapshot.schemaId()), - true); + copySchemasToBranch(branchName, snapshot.schemaId()); } catch (IOException e) { throw new RuntimeException( String.format( @@ -249,4 +243,15 @@ private void validateBranch(String branchName) { "Branch name cannot be pure numeric string but is '%s'.", branchName); } + + private void copySchemasToBranch(String branchName, long schemaId) throws IOException { + for (int i = 0; i <= schemaId; i++) { + if (schemaManager.schemaExists(i)) { + fileIO.copyFile( + schemaManager.toSchemaPath(i), + schemaManager.copyWithBranch(branchName).toSchemaPath(i), + true); + } + } + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index 6cf82131f0d9..1d33a9e8a6f2 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.BlockingIterator; import org.apache.paimon.utils.SnapshotManager; @@ -498,6 +499,43 @@ public void testCannotSetEmptyFallbackBranch() { .satisfies(anyCauseMatches(IllegalArgumentException.class, errMsg)); } + @Test + public void testReadBranchTableWithMultiSchemaIds() throws Exception { + sql( + "CREATE TABLE T (" + + " pt INT" + + ", k INT" + + ", v STRING" + + ", PRIMARY KEY (pt, k) NOT ENFORCED" + + " ) PARTITIONED BY (pt) WITH (" + + " 'bucket' = '2'" + + " )"); + + sql("INSERT INTO T VALUES" + " (1, 10, 'apple')," + " (1, 20, 'banana')"); + + sql("ALTER TABLE `T` ADD (v2 INT)"); + + sql("INSERT INTO T VALUES" + " (2, 10, 'cat', 2)," + " (2, 20, 'dog', 2)"); + + sql("ALTER TABLE `T` ADD (v3 INT)"); + + sql("CALL sys.create_tag('default.T', 'tag1', 2)"); + + sql("CALL sys.create_branch('default.T', 'test', 'tag1')"); + + FileStoreTable table = paimonTable("T"); + SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location(), "test"); + List schemaIds = schemaManager.listAllIds(); + assertThat(schemaIds.size()).isEqualTo(2); + + assertThat(collectResult("SELECT * FROM T$branch_test")) + .containsExactlyInAnyOrder( + "+I[1, 10, apple, null]", + "+I[1, 20, banana, null]", + "+I[2, 10, cat, 2]", + "+I[2, 20, dog, 2]"); + } + private List collectResult(String sql) throws Exception { List result = new ArrayList<>(); try (CloseableIterator it = tEnv.executeSql(sql).collect()) {