diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 062e935328fce..a1b079e92cc5a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -38,11 +38,13 @@ import org.apache.paimon.table.Table; import org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.table.system.SystemTableLoader; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.StringUtils; import javax.annotation.Nullable; +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -294,7 +296,7 @@ public void alterTable( if (ignoreIfNotExists) { return; } - throw new TableNotExistException(identifier); + throw new TableNotExistException(identifier, branchName); } alterTableImpl(identifier, branchName, changes); @@ -399,7 +401,21 @@ protected abstract TableSchema getDataTableSchema(Identifier identifier, String @VisibleForTesting public Path getDataTableLocation(Identifier identifier) { - return new Path(newDatabasePath(identifier.getDatabaseName()), identifier.getObjectName()); + Optional> optionalBranchName = + getOriginalIdentifierAndBranch(identifier); + String branch = DEFAULT_MAIN_BRANCH; + if (optionalBranchName.isPresent()) { + identifier = optionalBranchName.get().getLeft(); + branch = optionalBranchName.get().getRight(); + } + Path databasePath = newDatabasePath(identifier.getDatabaseName()); + if (branch.equals(DEFAULT_MAIN_BRANCH)) { + return new Path(databasePath, identifier.getObjectName()); + } else { + return new Path( + databasePath + File.separator + identifier.getObjectName(), + "branch/" + BranchManager.BRANCH_PREFIX + branch); + } } private static Optional> getOriginalIdentifierAndBranch( @@ -477,9 +493,21 @@ public static Path newTableLocation(String warehouse, Identifier identifier) { "Table name[%s] cannot contain '%s' separator", identifier.getObjectName(), SYSTEM_TABLE_SPLITTER)); } - return new Path( - newDatabasePath(warehouse, identifier.getDatabaseName()), - identifier.getObjectName()); + Optional> optionalBranchName = + getOriginalIdentifierAndBranch(identifier); + String branch = DEFAULT_MAIN_BRANCH; + if (optionalBranchName.isPresent()) { + identifier = optionalBranchName.get().getLeft(); + branch = optionalBranchName.get().getRight(); + } + Path databasePath = newDatabasePath(warehouse, identifier.getDatabaseName()); + if (branch.equals(DEFAULT_MAIN_BRANCH)) { + return new Path(databasePath, identifier.getObjectName()); + } else { + return new Path( + databasePath + File.separator + identifier.getObjectName(), + "branch/" + BranchManager.BRANCH_PREFIX + branch); + } } public static Path newDatabasePath(String warehouse, String database) { diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index ac229464b008c..a7f740563bfc6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -36,6 +36,8 @@ import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey; import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; + /** * This interface is responsible for reading and writing metadata such as database/table from a * paimon catalog. @@ -411,16 +413,26 @@ public Identifier identifier() { /** Exception for trying to operate on a table that doesn't exist. */ class TableNotExistException extends Exception { - private static final String MSG = "Table %s does not exist."; - + private static final String MSG_TABLE = "Table %s does not exist."; + private static final String MSG_BRANCH = "Branch %s does not exist."; private final Identifier identifier; public TableNotExistException(Identifier identifier) { - this(identifier, null); + this(identifier, null, DEFAULT_MAIN_BRANCH); } - public TableNotExistException(Identifier identifier, Throwable cause) { - super(String.format(MSG, identifier.getFullName()), cause); + public TableNotExistException(Identifier identifier, String branchName) { + this(identifier, null, branchName); + } + + public TableNotExistException(Identifier identifier, Throwable cause, String branchName) { + super( + branchName.equals(DEFAULT_MAIN_BRANCH) + ? String.format(MSG_TABLE, identifier.getFullName()) + : String.format( + MSG_BRANCH, + identifier.getFullName() + BRANCH_PREFIX + branchName), + cause); this.identifier = identifier; } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index 64f38a106c93e..8648196eddde8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -118,7 +118,7 @@ public TableSchema getDataTableSchema(Identifier identifier, String branchName) return s; } }) - .orElseThrow(() -> new TableNotExistException(identifier)); + .orElseThrow(() -> new TableNotExistException(identifier, branchName)); } @Override diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java index 19c9e83c45bc9..018a306bb4457 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.jdbc.JdbcCatalog; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; @@ -29,6 +30,7 @@ import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; @@ -38,6 +40,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import java.io.File; +import java.util.Arrays; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -349,6 +353,49 @@ public void testGetTable() throws Exception { .withMessage("Table non_existing_db.test_table does not exist."); } + @Test + public void testGetDataTableLocation() { + Path path = + ((JdbcCatalog) catalog) + .getDataTableLocation(Identifier.create("test_db", "test_table$branch_a")); + assertThat(path.toString()) + .isEqualTo( + new File( + "file:/" + tempFile, + "test_db" + + ".db" + + File.separator + + "test_table" + + File.separator + + "branch" + + File.separator + + BranchManager.BRANCH_PREFIX + + "a") + .toString()); + } + + @Test + public void testNewTableLocation() { + Path path = + AbstractCatalog.newTableLocation( + String.valueOf(tempFile), + Identifier.create("test_db", "test_table$branch_a")); + assertThat(path.toString()) + .isEqualTo( + new File( + String.valueOf(tempFile), + "test_db" + + ".db" + + File.separator + + "test_table" + + File.separator + + "branch" + + File.separator + + BranchManager.BRANCH_PREFIX + + "a") + .toString()); + } + @Test public void testDropTable() throws Exception { catalog.createDatabase("test_db", false); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java index 9a5b9d901448f..380adb8105ca2 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.Catalog; import org.apache.paimon.utils.BlockingIterator; import org.apache.flink.table.planner.factories.TestValuesTableFactory; @@ -278,6 +279,22 @@ public void testReadWriteBranch() throws Exception { assertThat(rows).containsExactlyInAnyOrder(Row.of(2), Row.of(1)); } + @Test + public void testBranchNotExist() throws Exception { + // create table + sql("CREATE TABLE T (id INT)"); + // insert data + batchSql("INSERT INTO T VALUES (1)"); + // create tag + paimonTable("T").createTag("tag1", 1); + // create branch + paimonTable("T").createBranch("branch1", "tag1"); + // call the FileSystemCatalog.getDataTableSchema() function + assertThatThrownBy(() -> paimonTable("T$branch_branch2")) + .isInstanceOf(Catalog.TableNotExistException.class) + .hasMessage("Branch %s does not exist.", "default.T$branch_branch2"); + } + @Override protected List ddl() { return Arrays.asList( diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/AlterBranchProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/AlterBranchProcedureTest.scala new file mode 100644 index 0000000000000..15418ab8a680f --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/AlterBranchProcedureTest.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.procedure + +import org.apache.paimon.spark.PaimonSparkTestBase + +import org.apache.spark.sql.{Dataset, Row} +import org.apache.spark.sql.execution.streaming.MemoryStream +import org.apache.spark.sql.streaming.StreamTest + +class AlterBranchProcedureTest extends PaimonSparkTestBase with StreamTest { + + import testImplicits._ + test("Paimon Procedure: alter schema structure and test $branch syntax.") { + withTempDir { + checkpointDir => + // define a change-log table and test `forEachBatch` api + spark.sql(s""" + |CREATE TABLE T (a INT, b STRING) + |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') + |""".stripMargin) + val location = loadTable("T").location().toString + + val inputData = MemoryStream[(Int, String)] + val stream = inputData + .toDS() + .toDF("a", "b") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { + (batch: Dataset[Row], _: Long) => + batch.write.format("paimon").mode("append").save(location) + } + .start() + + val query = () => spark.sql("SELECT * FROM T ORDER BY a") + try { + // snapshot-1 + inputData.addData((1, "a")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Nil) + + // snapshot-2 + inputData.addData((2, "b")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil) + + // snapshot-3 + inputData.addData((2, "b2")) + stream.processAllAvailable() + checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil) + + val table = loadTable("T") + val branchManager = table.branchManager() + + // create branch with snapshot + checkAnswer( + spark.sql( + "CALL paimon.sys.create_branch(table => 'test.T', branch => 'snapshot_branch', snapshot => 2)"), + Row(true) :: Nil) + assert(branchManager.branchExists("snapshot_branch")) + + spark.sql("INSERT INTO T VALUES (1, 'APPLE'), (2,'DOG'), (2, 'horse')") + spark.sql("ALTER TABLE `T$branch_snapshot_branch` ADD COLUMNS(c INT)") + spark.sql( + "INSERT INTO `T$branch_snapshot_branch` VALUES " + "(1,'cherry', 100), (2,'bird', 200), (3, 'wolf', 400)") + + checkAnswer( + spark.sql("SELECT * FROM T ORDER BY a, b"), + Row(1, "APPLE") :: Row(2, "horse") :: Nil) + checkAnswer( + spark.sql("SELECT * FROM `T$branch_snapshot_branch` ORDER BY a, b,c"), + Row(1, "cherry", 100) :: Row(2, "bird", 200) :: Row(3, "wolf", 400) :: Nil) + assert(branchManager.branchExists("snapshot_branch")) + } + } + } +}