diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md index 411f6a58e483..13c74349b6a4 100644 --- a/docs/content/flink/procedures.md +++ b/docs/content/flink/procedures.md @@ -346,19 +346,5 @@ All available procedures are listed below. CALL sys.merge_branch('default.T', 'branch1') - - replace_branch - - CALL [catalog.]sys.replace_branch('identifier', 'branchName') - - - To replace main branch with specified branch. Arguments: -
  • identifier: the target table identifier. Cannot be empty.
  • -
  • branchName: name of the branch to be replaced.
  • - - - CALL sys.replace_branch('default.T', 'branch1') - - diff --git a/docs/content/maintenance/manage-branches.md b/docs/content/maintenance/manage-branches.md index b9671153ca59..264ea506c932 100644 --- a/docs/content/maintenance/manage-branches.md +++ b/docs/content/maintenance/manage-branches.md @@ -108,24 +108,3 @@ CALL sys.merge_branch('default.T', 'branch1'); {{< /tab >}} {{< /tabs >}} - -## Replace Branch - -Replacing main branch with custom branch will copy the snapshots, tags and schemas which should be copied from the main branch to target branch, and then the custom branch will be the new 'main' branch. - -Suppose there is a job read and write the specified branch, by replacing main branch with the created branch, we don't need to do -anything with this job, for the new branch is totally complete now, and the 'main' branch is pointed to the new branch now. - -We cannot merge branch to main here because the new job will still read and write the specified branch which will be completely independent of main. - -{{< tabs "replace-branch" >}} - -{{< tab "Flink" >}} - -```sql -CALL sys.replace_branch('default.T', 'branch1'); -``` - -{{< /tab >}} - -{{< /tabs >}} \ No newline at end of file diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md index 1f3ce2f12952..b14ba9b4fec8 100644 --- a/docs/content/spark/procedures.md +++ b/docs/content/spark/procedures.md @@ -156,17 +156,6 @@ This section introduce all available spark procedures about paimon. CALL sys.merge_branch(table => 'test_db.T', branch => 'test_branch') - - replace_branch - - To replace main branch with specified branch. Arguments: -
  • table: the target table identifier. Cannot be empty.
  • -
  • branch: name of the branch to be replaced.
  • - - - CALL sys.replace_branch(table => 'test_db.T', branch => 'test_branch') - - reset_consumer diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java index 548ae69ee5cc..30f2288f829e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java @@ -210,12 +210,6 @@ public void mergeBranch(String branchName) { wrapped.mergeBranch(branchName); } - @Override - public void replaceBranch(String fromBranch) { - privilegeChecker.assertCanInsert(identifier); - wrapped.replaceBranch(fromBranch); - } - @Override public ExpireSnapshots newExpireSnapshots() { privilegeChecker.assertCanInsert(identifier); diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index cb478d7bad28..8fa3b0789c8f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -67,7 +67,7 @@ import static org.apache.paimon.catalog.AbstractCatalog.DB_SUFFIX; import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; -import static org.apache.paimon.utils.BranchManager.getBranchPath; +import static org.apache.paimon.utils.BranchManager.branchPath; import static org.apache.paimon.utils.FileUtils.listVersionedFiles; import static org.apache.paimon.utils.Preconditions.checkState; @@ -499,13 +499,12 @@ public static TableSchema fromPath(FileIO fileIO, Path path) { } public Path schemaDirectory() { - return new Path(getBranchPath(fileIO, tableRoot, branch) + "/schema"); + return new Path(branchPath(tableRoot, branch) + "/schema"); } @VisibleForTesting public Path toSchemaPath(long schemaId) { - return new Path( - getBranchPath(fileIO, tableRoot, branch) + "/schema/" + SCHEMA_PREFIX + schemaId); + return new Path(branchPath(tableRoot, branch) + "/schema/" + SCHEMA_PREFIX + schemaId); } /** diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index cddc1d7bfa1f..dd8e5e17f47d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -529,11 +529,6 @@ public void mergeBranch(String branchName) { branchManager().mergeBranch(branchName); } - @Override - public void replaceBranch(String fromBranch) { - branchManager().replaceBranch(fromBranch); - } - @Override public void rollbackTo(String tagName) { TagManager tagManager = tagManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index 70023d5551aa..c92f4ce5f61a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -190,14 +190,6 @@ default void mergeBranch(String branchName) { this.getClass().getSimpleName())); } - @Override - default void replaceBranch(String fromBranch) { - throw new UnsupportedOperationException( - String.format( - "Readonly Table %s does not support replaceBranch.", - this.getClass().getSimpleName())); - } - @Override default ExpireSnapshots newExpireSnapshots() { throw new UnsupportedOperationException( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 227f2836d931..aeca29d63cbb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -115,9 +115,6 @@ public interface Table extends Serializable { @Experimental void mergeBranch(String branchName); - @Experimental - void replaceBranch(String fromBranch); - /** Manually expire snapshots, parameters can be controlled independently of table options. */ @Experimental ExpireSnapshots newExpireSnapshots(); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 63c33e5932cb..ce708b78532c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -34,12 +34,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; -import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.PriorityQueue; -import java.util.Set; import java.util.SortedMap; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -55,7 +52,6 @@ public class BranchManager { public static final String BRANCH_PREFIX = "branch-"; public static final String DEFAULT_MAIN_BRANCH = "main"; - public static final String MAIN_BRANCH_FILE = "MAIN-BRANCH"; private final FileIO fileIO; private final Path tablePath; @@ -76,12 +72,6 @@ public BranchManager( this.schemaManager = schemaManager; } - /** Commit specify branch to main. */ - public void commitMainBranch(String branchName) throws IOException { - Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE); - fileIO.overwriteFileUtf8(mainBranchFile, branchName); - } - /** Return the root Directory of branch. */ public Path branchDirectory() { return new Path(tablePath + "/branch"); @@ -92,45 +82,15 @@ public static boolean isMainBranch(String branch) { } /** Return the path string of a branch. */ - public static String getBranchPath(FileIO fileIO, Path tablePath, String branch) { - if (isMainBranch(branch)) { - Path path = new Path(tablePath, MAIN_BRANCH_FILE); - try { - if (fileIO.exists(path)) { - String data = fileIO.readFileUtf8(path); - if (StringUtils.isBlank(data)) { - return tablePath.toString(); - } else { - return tablePath.toString() + "/branch/" + BRANCH_PREFIX + data; - } - } else { - return tablePath.toString(); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } - return tablePath.toString() + "/branch/" + BRANCH_PREFIX + branch; - } - - public String defaultMainBranch() { - Path path = new Path(tablePath, MAIN_BRANCH_FILE); - try { - if (fileIO.exists(path)) { - String data = fileIO.readFileUtf8(path); - if (!StringUtils.isBlank(data)) { - return data; - } - } - return DEFAULT_MAIN_BRANCH; - } catch (IOException e) { - throw new RuntimeException(e); - } + public static String branchPath(Path tablePath, String branch) { + return isMainBranch(branch) + ? tablePath.toString() + : tablePath.toString() + "/branch/" + BRANCH_PREFIX + branch; } /** Return the path of a branch. */ public Path branchPath(String branchName) { - return new Path(getBranchPath(fileIO, tablePath, branchName)); + return new Path(branchPath(tablePath, branchName)); } /** Create empty branch. */ @@ -156,7 +116,7 @@ public void createBranch(String branchName) { throw new RuntimeException( String.format( "Exception occurs when create branch '%s' (directory in %s).", - branchName, getBranchPath(fileIO, tablePath, branchName)), + branchName, branchPath(tablePath, branchName)), e); } } @@ -188,17 +148,17 @@ public void createBranch(String branchName, long snapshotId) { throw new RuntimeException( String.format( "Exception occurs when create branch '%s' (directory in %s).", - branchName, getBranchPath(fileIO, tablePath, branchName)), + branchName, branchPath(tablePath, branchName)), e); } } public void createBranch(String branchName, String tagName) { - String mainBranch = defaultMainBranch(); checkArgument( !isMainBranch(branchName), String.format( - "Branch name '%s' is the default branch and cannot be used.", mainBranch)); + "Branch name '%s' is the default branch and cannot be created.", + DEFAULT_MAIN_BRANCH)); checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName); checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName); checkArgument(tagManager.tagExists(tagName), "Tag name '%s' not exists.", tagName); @@ -224,7 +184,7 @@ public void createBranch(String branchName, String tagName) { throw new RuntimeException( String.format( "Exception occurs when create branch '%s' (directory in %s).", - branchName, getBranchPath(fileIO, tablePath, branchName)), + branchName, branchPath(tablePath, branchName)), e); } } @@ -238,111 +198,11 @@ public void deleteBranch(String branchName) { LOG.info( String.format( "Deleting the branch failed due to an exception in deleting the directory %s. Please try again.", - getBranchPath(fileIO, tablePath, branchName)), + branchPath(tablePath, branchName)), e); } } - /** Replace specify branch to main branch. */ - public void replaceBranch(String branchName) { - String mainBranch = defaultMainBranch(); - checkArgument( - !isMainBranch(branchName), - String.format( - "Branch name '%s' is the default main branch and cannot be replaced repeatedly.", - mainBranch)); - checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName); - checkArgument(branchExists(branchName), "Branch name '%s' not exists.", branchName); - try { - // 0. Cache previous tag,snapshot,schema directory. - Path tagDirectory = tagManager.tagDirectory(); - Path snapshotDirectory = snapshotManager.snapshotDirectory(); - Path schemaDirectory = schemaManager.schemaDirectory(); - // 1. Calculate and copy the snapshots, tags and schemas which should be copied from the - // main to branch. - calculateCopyMainToBranch(branchName); - // 2. Update the Main Branch File to the target branch. - commitMainBranch(branchName); - // 3.Drop the previous main branch, including snapshots, tags and schemas. - dropPreviousMainBranch(tagDirectory, snapshotDirectory, schemaDirectory); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - /** Calculate copy main branch to target branch. */ - private void calculateCopyMainToBranch(String branchName) throws IOException { - TableBranch fromBranch = - this.branches().stream() - .filter(branch -> branch.getBranchName().equals(branchName)) - .findFirst() - .orElse(null); - if (fromBranch == null) { - throw new RuntimeException(String.format("No branches found %s", branchName)); - } - Snapshot fromSnapshot = snapshotManager.snapshot(fromBranch.getCreatedFromSnapshot()); - // Copy tags. - List tags = tagManager.allTagNames(); - TagManager branchTagManager = tagManager.copyWithBranch(branchName); - for (String tagName : tags) { - if (branchTagManager.tagExists(tagName)) { - // If it already exists, skip it directly. - continue; - } - Snapshot snapshot = tagManager.taggedSnapshot(tagName); - if (snapshot.id() < fromSnapshot.id()) { - fileIO.copyFileUtf8(tagManager.tagPath(tagName), branchTagManager.tagPath(tagName)); - } - } - // Copy snapshots. - Iterator snapshots = snapshotManager.snapshots(); - SnapshotManager branchSnapshotManager = snapshotManager.copyWithBranch(branchName); - while (snapshots.hasNext()) { - Snapshot snapshot = snapshots.next(); - if (snapshot.id() >= fromSnapshot.id()) { - continue; - } - if (branchSnapshotManager.snapshotExists(snapshot.id())) { - // If it already exists, skip it directly. - continue; - } - fileIO.copyFileUtf8( - snapshotManager.snapshotPath(snapshot.id()), - branchSnapshotManager.snapshotPath(snapshot.id())); - } - - // Copy schemas. - List schemaIds = schemaManager.listAllIds(); - SchemaManager branchSchemaManager = schemaManager.copyWithBranch(branchName); - Set existsSchemas = new HashSet<>(branchSchemaManager.listAllIds()); - - for (Long schemaId : schemaIds) { - if (existsSchemas.contains(schemaId)) { - // If it already exists, skip it directly. - continue; - } - TableSchema tableSchema = schemaManager.schema(schemaId); - if (tableSchema.id() < fromSnapshot.schemaId()) { - fileIO.copyFileUtf8( - schemaManager.toSchemaPath(schemaId), - branchSchemaManager.toSchemaPath(schemaId)); - } - } - } - - /** Directly delete snapshot, tag , schema directory. */ - private void dropPreviousMainBranch( - Path tagDirectory, Path snapshotDirectory, Path schemaDirectory) throws IOException { - // Delete tags. - fileIO.delete(tagDirectory, true); - - // Delete snapshots. - fileIO.delete(snapshotDirectory, true); - - // Delete schemas. - fileIO.delete(schemaDirectory, true); - } - /** Check if path exists. */ public boolean fileExists(Path path) { try { @@ -423,7 +283,7 @@ public void mergeBranch(String branchName) { throw new RuntimeException( String.format( "Exception occurs when merge branch '%s' (directory in %s).", - branchName, getBranchPath(fileIO, tablePath, branchName)), + branchName, branchPath(tablePath, branchName)), e); } } @@ -434,15 +294,6 @@ public boolean branchExists(String branchName) { return fileExists(branchPath); } - /** Get branch count for the table. */ - public long branchCount() { - try { - return listVersionedDirectories(fileIO, branchDirectory(), BRANCH_PREFIX).count(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - /** Get all branches for the table. */ public List branches() { try { @@ -463,7 +314,7 @@ public List branches() { } FileStoreTable branchTable = FileStoreTableFactory.create( - fileIO, new Path(getBranchPath(fileIO, tablePath, branchName))); + fileIO, new Path(branchPath(tablePath, branchName))); SortedMap> snapshotTags = branchTable.tagManager().tags(); Long earliestSnapshotId = branchTable.snapshotManager().earliestSnapshotId(); if (snapshotTags.isEmpty()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index af1d450db1d6..868f92f76df4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -47,7 +47,7 @@ import java.util.stream.LongStream; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; -import static org.apache.paimon.utils.BranchManager.getBranchPath; +import static org.apache.paimon.utils.BranchManager.branchPath; import static org.apache.paimon.utils.FileUtils.listVersionedFiles; /** Manager for {@link Snapshot}, providing utility methods related to paths and snapshot hints. */ @@ -90,27 +90,21 @@ public Path tablePath() { } public Path changelogDirectory() { - return new Path(getBranchPath(fileIO, tablePath, branch) + "/changelog"); + return new Path(branchPath(tablePath, branch) + "/changelog"); } public Path longLivedChangelogPath(long snapshotId) { return new Path( - getBranchPath(fileIO, tablePath, branch) - + "/changelog/" - + CHANGELOG_PREFIX - + snapshotId); + branchPath(tablePath, branch) + "/changelog/" + CHANGELOG_PREFIX + snapshotId); } public Path snapshotPath(long snapshotId) { return new Path( - getBranchPath(fileIO, tablePath, branch) - + "/snapshot/" - + SNAPSHOT_PREFIX - + snapshotId); + branchPath(tablePath, branch) + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId); } public Path snapshotDirectory() { - return new Path(getBranchPath(fileIO, tablePath, branch) + "/snapshot"); + return new Path(branchPath(tablePath, branch) + "/snapshot"); } public Snapshot snapshot(long snapshotId) { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index b35b55cf4b28..eb2273f3c888 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -46,7 +46,7 @@ import java.util.stream.Collectors; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; -import static org.apache.paimon.utils.BranchManager.getBranchPath; +import static org.apache.paimon.utils.BranchManager.branchPath; import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -78,12 +78,12 @@ public TagManager copyWithBranch(String branchName) { /** Return the root Directory of tags. */ public Path tagDirectory() { - return new Path(getBranchPath(fileIO, tablePath, branch) + "/tag"); + return new Path(branchPath(tablePath, branch) + "/tag"); } /** Return the path of a tag. */ public Path tagPath(String tagName) { - return new Path(getBranchPath(fileIO, tablePath, branch) + "/tag/" + TAG_PREFIX + tagName); + return new Path(branchPath(tablePath, branch) + "/tag/" + TAG_PREFIX + tagName); } /** Create a tag from given snapshot and save it in the storage. */ diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index 6ff0bd4412d8..9e0a7c224132 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -1114,7 +1114,7 @@ public void testUnsupportedBranchName() throws Exception { .satisfies( anyCauseMatches( IllegalArgumentException.class, - "Branch name 'main' is the default branch and cannot be used.")); + "Branch name 'main' is the default branch and cannot be created.")); assertThatThrownBy(() -> table.createBranch("branch-1", "tag1")) .satisfies( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceBranchProcedure.java deleted file mode 100644 index 10ef4a67ae87..000000000000 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceBranchProcedure.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.procedure; - -import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.table.Table; - -import org.apache.flink.table.procedure.ProcedureContext; - -/** - * Replace branch procedure for given branch. Usage: - * - *
    
    - *  CALL sys.replace_branch('tableId', 'branchName')
    - * 
    - */ -public class ReplaceBranchProcedure extends ProcedureBase { - - public static final String IDENTIFIER = "replace_branch"; - - @Override - public String identifier() { - return IDENTIFIER; - } - - public String[] call(ProcedureContext procedureContext, String tableId, String branchName) - throws Catalog.TableNotExistException { - return innerCall(tableId, branchName); - } - - private String[] innerCall(String tableId, String branchName) - throws Catalog.TableNotExistException { - Table table = catalog.getTable(Identifier.fromString(tableId)); - table.replaceBranch(branchName); - return new String[] {"Success"}; - } -} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 91ede6b5a8eb..f61df27a6acb 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -56,6 +56,5 @@ org.apache.paimon.flink.procedure.privilege.DropPrivilegedUserProcedure org.apache.paimon.flink.procedure.privilege.GrantPrivilegeToUserProcedure org.apache.paimon.flink.procedure.privilege.RevokePrivilegeFromUserProcedure org.apache.paimon.flink.procedure.RepairProcedure -org.apache.paimon.flink.procedure.ReplaceBranchProcedure org.apache.paimon.flink.procedure.MergeBranchProcedure org.apache.paimon.flink.procedure.MarkPartitionDoneProcedure diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java index a41d46e38abc..f3d8d391b169 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java @@ -170,86 +170,6 @@ void testCreateAndDeleteEmptyBranch() throws Exception { assertThat(branchManager.branchExists("empty_branch_name")).isFalse(); } - @Test - void testReplaceBranch() throws Exception { - init(warehouse); - RowType rowType = - RowType.of( - new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()}, - new String[] {"k", "v"}); - FileStoreTable table = - createFileStoreTable( - rowType, - Collections.emptyList(), - Collections.singletonList("k"), - Collections.emptyList(), - Collections.emptyMap()); - - StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); - write = writeBuilder.newWrite(); - commit = writeBuilder.newCommit(); - - // 3 snapshots - writeData(rowData(1L, BinaryString.fromString("Hi"))); - writeData(rowData(2L, BinaryString.fromString("Hello"))); - writeData(rowData(3L, BinaryString.fromString("Paimon"))); - - // Create tag2 - TagManager tagManager = new TagManager(table.fileIO(), table.location()); - callProcedure( - String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); - assertThat(tagManager.tagExists("tag2")).isTrue(); - - // Create replace_branch_name branch - BranchManager branchManager = table.branchManager(); - callProcedure( - String.format( - "CALL sys.create_branch('%s.%s', 'replace_branch_name', 'tag2')", - database, tableName)); - assertThat(branchManager.branchExists("replace_branch_name")).isTrue(); - - // Replace branch - callProcedure( - String.format( - "CALL sys.replace_branch('%s.%s', 'replace_branch_name')", - database, tableName)); - - // Check snapshot - SnapshotManager snapshotManager = table.snapshotManager(); - assertThat(snapshotManager.snapshotExists(3)).isFalse(); - - // Renew write - write = writeBuilder.newWrite(); - commit = writeBuilder.newCommit(); - - // Add data, forward to replace branch - for (long i = 4; i < 14; i++) { - writeData(rowData(i, BinaryString.fromString(String.format("new.data_%s", i)))); - } - - List result = readTableData(table); - List sortedActual = new ArrayList<>(result); - List expected = - Arrays.asList( - "+I[1, Hi]", - "+I[2, Hello]", - "+I[4, new.data_4]", - "+I[5, new.data_5]", - "+I[6, new.data_6]", - "+I[7, new.data_7]", - "+I[8, new.data_8]", - "+I[9, new.data_9]", - "+I[10, new.data_10]", - "+I[11, new.data_11]", - "+I[12, new.data_12]", - "+I[13, new.data_13]"); - Assert.assertEquals(expected, sortedActual); - - callProcedure( - String.format("CALL sys.create_tag('%s.%s', 'tag3', 3)", database, tableName)); - assertThat(tagManager.tagExists("tag3")).isTrue(); - } - @Test void testMergeBranch() throws Exception { init(warehouse); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java index 749894cb4db0..d423e2045098 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java @@ -32,7 +32,6 @@ import org.apache.paimon.spark.procedure.ProcedureBuilder; import org.apache.paimon.spark.procedure.RemoveOrphanFilesProcedure; import org.apache.paimon.spark.procedure.RepairProcedure; -import org.apache.paimon.spark.procedure.ReplaceBranchProcedure; import org.apache.paimon.spark.procedure.ResetConsumerProcedure; import org.apache.paimon.spark.procedure.RollbackProcedure; @@ -70,7 +69,6 @@ private static Map> initProcedureBuilders() { procedureBuilders.put("expire_partitions", ExpirePartitionsProcedure::builder); procedureBuilders.put("repair", RepairProcedure::builder); procedureBuilders.put("merge_branch", MergeBranchProcedure::builder); - procedureBuilders.put("replace_branch", ReplaceBranchProcedure::builder); procedureBuilders.put("reset_consumer", ResetConsumerProcedure::builder); return procedureBuilders.build(); } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ReplaceBranchProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ReplaceBranchProcedure.java deleted file mode 100644 index b87952e3ef6b..000000000000 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ReplaceBranchProcedure.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.spark.procedure; - -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.connector.catalog.Identifier; -import org.apache.spark.sql.connector.catalog.TableCatalog; -import org.apache.spark.sql.types.DataTypes; -import org.apache.spark.sql.types.Metadata; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; - -import static org.apache.spark.sql.types.DataTypes.StringType; - -/** - * Replace branch procedure for given branch. Usage: - * - *
    
    - *  CALL sys.replace_branch('tableId', 'branchName')
    - * 
    - */ -public class ReplaceBranchProcedure extends BaseProcedure { - - private static final ProcedureParameter[] PARAMETERS = - new ProcedureParameter[] { - ProcedureParameter.required("table", StringType), - ProcedureParameter.required("branch", StringType) - }; - - private static final StructType OUTPUT_TYPE = - new StructType( - new StructField[] { - new StructField("result", DataTypes.BooleanType, true, Metadata.empty()) - }); - - protected ReplaceBranchProcedure(TableCatalog tableCatalog) { - super(tableCatalog); - } - - @Override - public ProcedureParameter[] parameters() { - return PARAMETERS; - } - - @Override - public StructType outputType() { - return OUTPUT_TYPE; - } - - @Override - public InternalRow[] call(InternalRow args) { - Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); - String branch = args.getString(1); - return modifyPaimonTable( - tableIdent, - table -> { - table.replaceBranch(branch); - InternalRow outputRow = newInternalRow(true); - return new InternalRow[] {outputRow}; - }); - } - - public static ProcedureBuilder builder() { - return new Builder() { - @Override - public ReplaceBranchProcedure doBuild() { - return new ReplaceBranchProcedure(tableCatalog()); - } - }; - } - - @Override - public String description() { - return "ReplaceBranchProcedure"; - } -} diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ReplaceBranchProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ReplaceBranchProcedureTest.scala deleted file mode 100644 index 48b6caa19b76..000000000000 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ReplaceBranchProcedureTest.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.spark.procedure - -import org.apache.paimon.spark.PaimonSparkTestBase - -import org.apache.spark.sql.Row - -class ReplaceBranchProcedureTest extends PaimonSparkTestBase { - - test("Paimon procedure: replace branch test") { - spark.sql(s""" - |CREATE TABLE T (id STRING, name STRING) - |USING PAIMON - |TBLPROPERTIES ('primary-key'='id') - |""".stripMargin) - - spark.sql(s"INSERT INTO T VALUES ('1', 'a')") - spark.sql(s"INSERT INTO T VALUES ('2', 'b')") - spark.sql(s"INSERT INTO T VALUES ('3', 'c')") - spark.sql(s"INSERT INTO T VALUES ('4', 'd')") - - checkAnswer( - spark.sql( - "CALL paimon.sys.create_tag(" + - "table => 'test.T', tag => 'test_tag', snapshot => 2)"), - Row(true) :: Nil) - - checkAnswer( - spark.sql( - "CALL paimon.sys.create_branch(table => 'test.T', branch => 'test_branch', tag => 'test_tag')"), - Row(true) :: Nil) - - checkAnswer( - spark.sql("CALL paimon.sys.replace_branch(table => 'test.T', branch => 'test_branch')"), - Row(true) :: Nil) - - spark.sql(s"INSERT INTO T VALUES ('5', 'e')") - spark.sql(s"INSERT INTO T VALUES ('6', 'f')") - - val query = () => spark.sql("SELECT * FROM T") - checkAnswer(query(), Row("1", "a") :: Row("2", "b") :: Row("5", "e") :: Row("6", "f") :: Nil) - - } - -}