From a5de1b2a032df8ef1d86acce1b66987f30cd17a5 Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Thu, 22 Feb 2024 16:44:32 +0800 Subject: [PATCH] Implement merge/replace branch in BranchManager #2153 --- .../apache/paimon/schema/SchemaManager.java | 30 +++- .../paimon/table/AbstractFileStoreTable.java | 5 + .../apache/paimon/table/ReadonlyTable.java | 8 + .../java/org/apache/paimon/table/Table.java | 3 + .../java/org/apache/paimon/tag/TableTag.java | 38 +++++ .../apache/paimon/utils/BranchManager.java | 158 +++++++++++++++++- .../apache/paimon/utils/SnapshotManager.java | 35 +++- .../org/apache/paimon/utils/TagManager.java | 72 +++++++- .../procedure/ReplaceBranchProcedure.java | 54 ++++++ .../org.apache.paimon.factories.Factory | 1 + .../flink/action/BranchActionITCase.java | 93 +++++++++++ 11 files changed, 479 insertions(+), 18 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/tag/TableTag.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceBranchProcedure.java diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index a6d274688aea0..9a841ef7bfd94 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -109,11 +109,28 @@ public Optional latest(String branchName) { } } + /** List all schema with branch. */ + public List listAllWithBranch(String branchName) { + return listAllIdsWithBranch(branchName).stream() + .map(this::schema) + .collect(Collectors.toList()); + } + /** List all schema. */ public List listAll() { return listAllIds().stream().map(this::schema).collect(Collectors.toList()); } + /** List all schema IDs with branch. */ + public List listAllIdsWithBranch(String branchName) { + try { + return listVersionedFiles(fileIO, branchSchemaDirectory(branchName), SCHEMA_PREFIX) + .collect(Collectors.toList()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + /** List all schema IDs. */ public List listAllIds() { try { @@ -482,22 +499,25 @@ public static TableSchema fromPath(FileIO fileIO, Path path) { } } - private Path schemaDirectory() { - return new Path(tableRoot + "/schema"); + public Path schemaDirectory() { + return branchSchemaDirectory(DEFAULT_MAIN_BRANCH); } @VisibleForTesting public Path toSchemaPath(long id) { - return new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + id); + return branchSchemaPath(DEFAULT_MAIN_BRANCH, id); } public Path branchSchemaDirectory(String branchName) { - return new Path(getBranchPath(tableRoot, branchName) + "/schema"); + return new Path(getBranchPath(fileIO, tableRoot, branchName) + "/schema"); } public Path branchSchemaPath(String branchName, long schemaId) { return new Path( - getBranchPath(tableRoot, branchName) + "/schema/" + SCHEMA_PREFIX + schemaId); + getBranchPath(fileIO, tableRoot, branchName) + + "/schema/" + + SCHEMA_PREFIX + + schemaId); } /** diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 6eea9d2aec520..0fcab535a359e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -456,6 +456,11 @@ public void deleteBranch(String branchName) { branchManager().deleteBranch(branchName); } + @Override + public void replaceBranch(String fromBranch) { + branchManager().replaceBranch(fromBranch); + } + @Override public void rollbackTo(String tagName) { TagManager tagManager = tagManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index f0d52b641015e..5c38f35532d32 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -149,6 +149,14 @@ default void deleteBranch(String branchName) { this.getClass().getSimpleName())); } + @Override + default void replaceBranch(String fromBranch) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support replaceBranch.", + this.getClass().getSimpleName())); + } + @Override default ExpireSnapshots newExpireSnapshots() { throw new UnsupportedOperationException( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 64ab6d2ab0a71..8c3b467c85573 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -96,6 +96,9 @@ public interface Table extends Serializable { @Experimental void deleteBranch(String branchName); + @Experimental + void replaceBranch(String fromBranch); + /** Manually expire snapshots, parameters can be controlled independently of table options. */ @Experimental ExpireSnapshots newExpireSnapshots(); diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TableTag.java b/paimon-core/src/main/java/org/apache/paimon/tag/TableTag.java new file mode 100644 index 0000000000000..9d82fbedcec17 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TableTag.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tag; + +/** {@link TableTag} has tag relevant information for table. */ +public class TableTag { + private final String tagName; + private final long createTime; + + public TableTag(String tagName, Long createTime) { + this.tagName = tagName; + this.createTime = createTime; + } + + public String getTagName() { + return tagName; + } + + public long getCreateTime() { + return createTime; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 6564bd4e56dcd..96e07805d7e0d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -23,15 +23,20 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.tag.TableTag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; +import java.util.Set; import java.util.SortedMap; import java.util.stream.Collectors; @@ -45,6 +50,7 @@ public class BranchManager { public static final String BRANCH_PREFIX = "branch-"; public static final String DEFAULT_MAIN_BRANCH = "main"; + public static final String MAIN_BRANCH_FILE = "MAIN-BRANCH"; private final FileIO fileIO; private final Path tablePath; @@ -65,19 +71,36 @@ public BranchManager( this.schemaManager = schemaManager; } + /** Commit specify branch to main. */ + public void commitMainBranch(String branchName) throws IOException { + Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE); + fileIO.delete(mainBranchFile, false); + fileIO.overwriteFileUtf8(mainBranchFile, branchName); + } + /** Return the root Directory of branch. */ public Path branchDirectory() { return new Path(tablePath + "/branch"); } /** Return the path string of a branch. */ - public static String getBranchPath(Path tablePath, String branchName) { + public static String getBranchPath(FileIO fileIO, Path tablePath, String branchName) { + if (StringUtils.isBlank(branchName)) { + return tablePath.toString(); + } + if (branchName.equals(DEFAULT_MAIN_BRANCH)) { + branchName = forwardBranchName(fileIO, tablePath, branchName); + } + // No main branch replacement has occurred. + if (branchName.equals(DEFAULT_MAIN_BRANCH)) { + return tablePath.toString(); + } return tablePath.toString() + "/branch/" + BRANCH_PREFIX + branchName; } /** Return the path of a branch. */ public Path branchPath(String branchName) { - return new Path(getBranchPath(tablePath, branchName)); + return new Path(getBranchPath(fileIO, tablePath, branchName)); } public void createBranch(String branchName, String tagName) { @@ -110,7 +133,7 @@ public void createBranch(String branchName, String tagName) { throw new RuntimeException( String.format( "Exception occurs when create branch '%s' (directory in %s).", - branchName, getBranchPath(tablePath, branchName)), + branchName, getBranchPath(fileIO, tablePath, branchName)), e); } } @@ -124,11 +147,114 @@ public void deleteBranch(String branchName) { LOG.info( String.format( "Deleting the branch failed due to an exception in deleting the directory %s. Please try again.", - getBranchPath(tablePath, branchName)), + getBranchPath(fileIO, tablePath, branchName)), e); } } + /** Replace specify branch to main branch. */ + public void replaceBranch(String branchName) { + checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName); + checkArgument(branchExists(branchName), "Branch name '%s' not exists.", branchName); + try { + // 0. Cache previous tag,snapshot,schema directory. + Path tagDirectory = tagManager.tagDirectory(); + Path snapshotDirectory = snapshotManager.snapshotDirectory(); + Path schemaDirectory = schemaManager.schemaDirectory(); + // 1. Calculate and copy the snapshots, tags and schemas which should be copied from the + // main branch to target branch. + calculateCopyMainBranchToTargetBranch(branchName); + // 2. Update the Main Branch File to the target branch. + updateMainBranchToTargetBranch(branchName); + // 3.Drop the previous main branch, including snapshots, tags and schemas. + dropPreviousMainBranch(tagDirectory, snapshotDirectory, schemaDirectory); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** Calculate copy main branch to target branch. */ + private void calculateCopyMainBranchToTargetBranch(String branchName) throws IOException { + TableBranch fromBranch = + this.branches().stream() + .filter(branch -> branch.getBranchName().equals(branchName)) + .findFirst() + .orElse(null); + if (fromBranch == null) { + throw new RuntimeException(String.format("No branches found %s", branchName)); + } + // Copy tags. + List tags = tagManager.tableTags(); + TableTag fromTag = + tags.stream() + .filter( + tableTag -> + tableTag.getTagName() + .equals(fromBranch.getCreatedFromTag())) + .findFirst() + .get(); + for (TableTag tag : tags) { + if (tagManager.branchTagExists(branchName, tag.getTagName())) { + // If it already exists, skip it directly. + continue; + } + if (tag.getCreateTime() < fromTag.getCreateTime()) { + fileIO.copyFileUtf8( + tagManager.tagPath(tag.getTagName()), + tagManager.branchTagPath(branchName, tag.getTagName())); + } + } + // Copy snapshots. + Iterator snapshots = snapshotManager.snapshots(); + Snapshot fromSnapshot = snapshotManager.snapshot(fromBranch.getCreatedFromSnapshot()); + while (snapshots.hasNext()) { + Snapshot snapshot = snapshots.next(); + if (snapshotManager.branchSnapshotExists(branchName, snapshot.id())) { + // If it already exists, skip it directly. + continue; + } + if (snapshot.id() < fromSnapshot.id()) { + fileIO.copyFileUtf8( + snapshotManager.snapshotPath(snapshot.id()), + snapshotManager.branchSnapshotPath(branchName, snapshot.id())); + } + } + + // Copy schemas. + List schemaIds = schemaManager.listAllIds(); + Set existsSchemas = new HashSet<>(schemaManager.listAllIdsWithBranch(branchName)); + for (Long schemaId : schemaIds) { + TableSchema tableSchema = schemaManager.schema(schemaId); + if (existsSchemas.contains(schemaId)) { + // If it already exists, skip it directly. + continue; + } + if (tableSchema.id() < fromSnapshot.schemaId()) { + fileIO.copyFileUtf8( + schemaManager.toSchemaPath(schemaId), + schemaManager.branchSchemaPath(branchName, schemaId)); + } + } + } + + /** Update main branch to target branch. */ + private void updateMainBranchToTargetBranch(String branchName) throws IOException { + commitMainBranch(branchName); + } + + /** Directly delete snapshot, tag , schema directory. */ + private void dropPreviousMainBranch( + Path tagDirectory, Path snapshotDirectory, Path schemaDirectory) throws IOException { + // Delete tags. + fileIO.delete(tagDirectory, true); + + // Delete snapshots. + fileIO.delete(snapshotDirectory, true); + + // Delete schemas. + fileIO.delete(schemaDirectory, true); + } + /** Check if path exists. */ public boolean fileExists(Path path) { try { @@ -169,7 +295,7 @@ public List branches() { String branchName = path.getLeft().getName().substring(BRANCH_PREFIX.length()); FileStoreTable branchTable = FileStoreTableFactory.create( - fileIO, new Path(getBranchPath(tablePath, branchName))); + fileIO, new Path(getBranchPath(fileIO, tablePath, branchName))); SortedMap> snapshotTags = branchTable.tagManager().tags(); checkArgument(!snapshotTags.isEmpty()); Snapshot snapshot = snapshotTags.firstKey(); @@ -184,4 +310,26 @@ public List branches() { throw new RuntimeException(e); } } + + /** Forward branch name. */ + public static String forwardBranchName(FileIO fileIO, Path tablePath, String branchName) { + if (branchName.equals(DEFAULT_MAIN_BRANCH)) { + Path path = new Path(tablePath, MAIN_BRANCH_FILE); + try { + if (fileIO.exists(path)) { + String data = fileIO.readFileUtf8(path); + if (StringUtils.isBlank(data)) { + return DEFAULT_MAIN_BRANCH; + } else { + return data; + } + } else { + return DEFAULT_MAIN_BRANCH; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return branchName; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index b330fc30389f4..74b143776c52c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -75,20 +75,23 @@ public Path tablePath() { } public Path snapshotDirectory() { - return new Path(tablePath + "/snapshot"); + return branchSnapshotDirectory(DEFAULT_MAIN_BRANCH); } public Path snapshotPath(long snapshotId) { - return new Path(tablePath + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId); + return branchSnapshotPath(DEFAULT_MAIN_BRANCH, snapshotId); } public Path branchSnapshotDirectory(String branchName) { - return new Path(getBranchPath(tablePath, branchName) + "/snapshot"); + return new Path(getBranchPath(fileIO, tablePath, branchName) + "/snapshot"); } public Path branchSnapshotPath(String branchName, long snapshotId) { return new Path( - getBranchPath(tablePath, branchName) + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId); + getBranchPath(fileIO, tablePath, branchName) + + "/snapshot/" + + SNAPSHOT_PREFIX + + snapshotId); } public Path snapshotPathByBranch(String branchName, long snapshotId) { @@ -112,6 +115,17 @@ public Snapshot snapshot(String branchName, long snapshotId) { return Snapshot.fromPath(fileIO, snapshotPath); } + public boolean branchSnapshotExists(String branchName, long snapshotId) { + Path path = snapshotPathByBranch(branchName, snapshotId); + try { + return fileIO.exists(path); + } catch (IOException e) { + throw new RuntimeException( + "Failed to determine if snapshot #" + snapshotId + " exists in path " + path, + e); + } + } + public boolean snapshotExists(long snapshotId) { Path path = snapshotPath(snapshotId); try { @@ -250,6 +264,13 @@ public Iterator snapshots() throws IOException { .iterator(); } + public Iterator snapshotsWithBranch(String branchName) throws IOException { + return listVersionedFiles(fileIO, snapshotDirByBranch(branchName), SNAPSHOT_PREFIX) + .map(snapshotId -> snapshot(branchName, snapshotId)) + .sorted(Comparator.comparingLong(Snapshot::id)) + .iterator(); + } + /** * If {@link FileNotFoundException} is thrown when reading the snapshot file, this snapshot may * be deleted by other processes, so just skip this snapshot. @@ -457,6 +478,12 @@ private Long findByListFiles(BinaryOperator reducer, String branchName) .orElse(null); } + public void deleteLatestHint(String branchName) throws IOException { + Path snapshotDir = snapshotDirByBranch(branchName); + Path hintFile = new Path(snapshotDir, LATEST); + fileIO.delete(hintFile, false); + } + public void commitLatestHint(long snapshotId) throws IOException { commitLatestHint(snapshotId, DEFAULT_MAIN_BRANCH); } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index a29a3e151c766..8371994406125 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -26,6 +26,7 @@ import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.table.sink.TagCallback; +import org.apache.paimon.tag.TableTag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,11 +36,13 @@ import java.util.Collection; import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.BranchManager.getBranchPath; import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -61,17 +64,22 @@ public TagManager(FileIO fileIO, Path tablePath) { /** Return the root Directory of tags. */ public Path tagDirectory() { - return new Path(tablePath + "/tag"); + return branchTagDirectory(DEFAULT_MAIN_BRANCH); + } + + public Path branchTagDirectory(String branchName) { + return new Path(getBranchPath(fileIO, tablePath, branchName) + "/tag"); } /** Return the path of a tag. */ public Path tagPath(String tagName) { - return new Path(tablePath + "/tag/" + TAG_PREFIX + tagName); + return branchTagPath(DEFAULT_MAIN_BRANCH, tagName); } /** Return the path of a tag in branch. */ public Path branchTagPath(String branchName, String tagName) { - return new Path(getBranchPath(tablePath, branchName) + "/tag/" + TAG_PREFIX + tagName); + return new Path( + getBranchPath(fileIO, tablePath, branchName) + "/tag/" + TAG_PREFIX + tagName); } /** Create a tag from given snapshot and save it in the storage. */ @@ -191,6 +199,19 @@ private void doClean( taggedSnapshot, tagDeletion.manifestSkippingSet(skippedSnapshots)); } + /** Check if a tag exists. */ + public boolean branchTagExists(String branchName, String tagName) { + Path path = branchTagPath(branchName, tagName); + try { + return fileIO.exists(path); + } catch (IOException e) { + throw new RuntimeException( + String.format( + "Failed to determine if tag '%s' exists in path %s.", tagName, path), + e); + } + } + /** Check if a tag exists. */ public boolean tagExists(String tagName) { Path path = tagPath(tagName); @@ -228,6 +249,11 @@ public SortedMap> tags() { return tags(tagName -> true); } + /** Get all tagged snapshots with names sorted by snapshot id. */ + public SortedMap> tagsWithBranch(String branchName) { + return tagsWithBranch(tagName -> true, branchName); + } + /** * Retrieves a sorted map of snapshots filtered based on a provided predicate. The predicate * determines which tag names should be included in the result. Only snapshots with tag names @@ -240,11 +266,21 @@ public SortedMap> tags() { * @throws RuntimeException if an IOException occurs during retrieval of snapshots. */ public SortedMap> tags(Predicate filter) { + return tagsWithBranch(filter, null); + } + + public SortedMap> tagsWithBranch( + Predicate filter, String branchName) { TreeMap> tags = new TreeMap<>(Comparator.comparingLong(Snapshot::id)); try { + + Path tagDirectory = + StringUtils.isBlank(branchName) + ? tagDirectory() + : branchTagDirectory(branchName); List paths = - listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX) + listVersionedFileStatus(fileIO, tagDirectory, TAG_PREFIX) .map(FileStatus::getPath) .collect(Collectors.toList()); @@ -288,6 +324,34 @@ public List allTagNames() { return tags().values().stream().flatMap(Collection::stream).collect(Collectors.toList()); } + public List tableTags() { + return branchTableTags(DEFAULT_MAIN_BRANCH); + } + + public List branchTableTags(String branchName) { + List tags = new ArrayList<>(); + try { + + Path tagDirectory = + branchName.equals(DEFAULT_MAIN_BRANCH) + ? tagDirectory() + : branchTagDirectory(branchName); + + List> paths = + listVersionedFileStatus(fileIO, tagDirectory, TAG_PREFIX) + .map(status -> Pair.of(status.getPath(), status.getModificationTime())) + .collect(Collectors.toList()); + + for (Map.Entry path : paths) { + String tagName = path.getKey().getName().substring(TAG_PREFIX.length()); + tags.add(new TableTag(tagName, path.getValue())); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + return tags; + } + private int findIndex(Snapshot taggedSnapshot, List taggedSnapshots) { for (int i = 0; i < taggedSnapshots.size(); i++) { if (taggedSnapshot.id() == taggedSnapshots.get(i).id()) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceBranchProcedure.java new file mode 100644 index 0000000000000..10ef4a67ae875 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceBranchProcedure.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.Table; + +import org.apache.flink.table.procedure.ProcedureContext; + +/** + * Replace branch procedure for given branch. Usage: + * + *

+ *  CALL sys.replace_branch('tableId', 'branchName')
+ * 
+ */ +public class ReplaceBranchProcedure extends ProcedureBase { + + public static final String IDENTIFIER = "replace_branch"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + public String[] call(ProcedureContext procedureContext, String tableId, String branchName) + throws Catalog.TableNotExistException { + return innerCall(tableId, branchName); + } + + private String[] innerCall(String tableId, String branchName) + throws Catalog.TableNotExistException { + Table table = catalog.getTable(Identifier.fromString(tableId)); + table.replaceBranch(branchName); + return new String[] {"Success"}; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 04f2b7933b274..50c913a33c822 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -45,3 +45,4 @@ org.apache.paimon.flink.procedure.MigrateFileProcedure org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure org.apache.paimon.flink.procedure.QueryServiceProcedure org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure +org.apache.paimon.flink.procedure.ReplaceBranchProcedure diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java index 8d445ab95b07e..f661d91e07c4b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java @@ -21,21 +21,29 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.StreamWriteBuilder; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.TableScan; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BranchManager; +import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import org.junit.Assert; import org.junit.jupiter.api.Test; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.List; import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init; import static org.assertj.core.api.Assertions.assertThat; /** IT cases for branch management actions. */ class BranchActionITCase extends ActionITCaseBase { + @Test void testCreateAndDeleteBranch() throws Exception { @@ -78,4 +86,89 @@ void testCreateAndDeleteBranch() throws Exception { "CALL sys.delete_branch('%s.%s', 'branch_name')", database, tableName)); assertThat(branchManager.branchExists("branch_name")).isFalse(); } + + @Test + void testReplaceBranchToTargetBranch() throws Exception { + init(warehouse); + RowType rowType = + RowType.of( + new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()}, + new String[] {"k", "v"}); + FileStoreTable table = + createFileStoreTable( + rowType, + Collections.emptyList(), + Collections.singletonList("k"), + Collections.emptyMap()); + + StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); + write = writeBuilder.newWrite(); + commit = writeBuilder.newCommit(); + + // 3 snapshots + writeData(rowData(1L, BinaryString.fromString("Hi"))); + writeData(rowData(2L, BinaryString.fromString("Hello"))); + writeData(rowData(3L, BinaryString.fromString("Paimon"))); + + // Create tag2 + TagManager tagManager = new TagManager(table.fileIO(), table.location()); + callProcedure( + String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); + assertThat(tagManager.tagExists("tag2")).isTrue(); + + // Create replace_branch_name branch + BranchManager branchManager = table.branchManager(); + callProcedure( + String.format( + "CALL sys.create_branch('%s.%s', 'replace_branch_name', 'tag2')", + database, tableName)); + assertThat(branchManager.branchExists("replace_branch_name")).isTrue(); + + // Replace branch + callProcedure( + String.format( + "CALL sys.replace_branch('%s.%s', 'replace_branch_name')", + database, tableName)); + + // Check snapshot + SnapshotManager snapshotManager = table.snapshotManager(); + assertThat(snapshotManager.snapshotExists(3)).isFalse(); + + // Renew write + write = writeBuilder.newWrite(); + commit = writeBuilder.newCommit(); + + // Add data, forward to replace branch + for (long i = 4; i < 14; i++) { + writeData(rowData(i, BinaryString.fromString(String.format("new.data_%s", i)))); + } + + ReadBuilder readBuilder = table.newReadBuilder(); + TableScan.Plan plan = readBuilder.newScan().plan(); + List result = + getResult( + readBuilder.newRead(), + plan == null ? Collections.emptyList() : plan.splits(), + rowType); + List sortedActual = new ArrayList<>(result); + List expected = + Arrays.asList( + "+I[1, Hi]", + "+I[2, Hello]", + "+I[4, new.data_4]", + "+I[5, new.data_5]", + "+I[6, new.data_6]", + "+I[7, new.data_7]", + "+I[8, new.data_8]", + "+I[9, new.data_9]", + "+I[10, new.data_10]", + "+I[11, new.data_11]", + "+I[12, new.data_12]", + "+I[13, new.data_13]"); + Assert.assertEquals(expected, sortedActual); + + callProcedure( + String.format("CALL sys.create_tag('%s.%s', 'tag3', 3)", database, tableName)); + assertThat(tagManager.tagExists("tag3")).isTrue(); + } }