From 7100dc0649af43cb875b6efdc99e4815671ebd75 Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Wed, 6 Mar 2024 19:46:00 +0800 Subject: [PATCH] fixed --- .../paimon/operation/FileStoreCommitImpl.java | 7 +--- .../apache/paimon/utils/BranchManager.java | 11 ++--- .../apache/paimon/utils/SnapshotManager.java | 42 +++++++------------ .../org/apache/paimon/utils/TagManager.java | 16 ++++--- .../paimon/table/FileStoreTableTestBase.java | 5 +-- 5 files changed, 29 insertions(+), 52 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 0264f1e45a3b..4b9ae9ed3e7a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -71,8 +71,6 @@ import java.util.concurrent.Callable; import java.util.stream.Collectors; -import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; - /** * Default implementation of {@link FileStoreCommit}. * @@ -707,10 +705,7 @@ public boolean tryCommitOnce( @Nullable String newStatsFileName) { long newSnapshotId = latestSnapshot == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshot.id() + 1; - Path newSnapshotPath = - branchName.equals(DEFAULT_MAIN_BRANCH) - ? snapshotManager.snapshotPath(newSnapshotId) - : snapshotManager.branchSnapshotPath(branchName, newSnapshotId); + Path newSnapshotPath = snapshotManager.snapshotPath(branchName, newSnapshotId); if (LOG.isDebugEnabled()) { LOG.debug("Ready to commit table files to snapshot #" + newSnapshotId); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 96e07805d7e0..6ed54372760f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -85,9 +85,6 @@ public Path branchDirectory() { /** Return the path string of a branch. */ public static String getBranchPath(FileIO fileIO, Path tablePath, String branchName) { - if (StringUtils.isBlank(branchName)) { - return tablePath.toString(); - } if (branchName.equals(DEFAULT_MAIN_BRANCH)) { branchName = forwardBranchName(fileIO, tablePath, branchName); } @@ -122,10 +119,10 @@ public void createBranch(String branchName, String tagName) { try { // Copy the corresponding tag, snapshot and schema files into the branch directory fileIO.copyFileUtf8( - tagManager.tagPath(tagName), tagManager.branchTagPath(branchName, tagName)); + tagManager.tagPath(tagName), tagManager.tagPath(branchName, tagName)); fileIO.copyFileUtf8( snapshotManager.snapshotPath(snapshot.id()), - snapshotManager.branchSnapshotPath(branchName, snapshot.id())); + snapshotManager.snapshotPath(branchName, snapshot.id())); fileIO.copyFileUtf8( schemaManager.toSchemaPath(snapshot.schemaId()), schemaManager.branchSchemaPath(branchName, snapshot.schemaId())); @@ -201,7 +198,7 @@ private void calculateCopyMainBranchToTargetBranch(String branchName) throws IOE if (tag.getCreateTime() < fromTag.getCreateTime()) { fileIO.copyFileUtf8( tagManager.tagPath(tag.getTagName()), - tagManager.branchTagPath(branchName, tag.getTagName())); + tagManager.tagPath(branchName, tag.getTagName())); } } // Copy snapshots. @@ -216,7 +213,7 @@ private void calculateCopyMainBranchToTargetBranch(String branchName) throws IOE if (snapshot.id() < fromSnapshot.id()) { fileIO.copyFileUtf8( snapshotManager.snapshotPath(snapshot.id()), - snapshotManager.branchSnapshotPath(branchName, snapshot.id())); + snapshotManager.snapshotPath(branchName, snapshot.id())); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 74b143776c52..0282a7e53c96 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -75,18 +75,18 @@ public Path tablePath() { } public Path snapshotDirectory() { - return branchSnapshotDirectory(DEFAULT_MAIN_BRANCH); + return snapshotDirectory(DEFAULT_MAIN_BRANCH); } - public Path snapshotPath(long snapshotId) { - return branchSnapshotPath(DEFAULT_MAIN_BRANCH, snapshotId); + public Path snapshotDirectory(String branchName) { + return new Path(getBranchPath(fileIO, tablePath, branchName) + "/snapshot"); } - public Path branchSnapshotDirectory(String branchName) { - return new Path(getBranchPath(fileIO, tablePath, branchName) + "/snapshot"); + public Path snapshotPath(long snapshotId) { + return snapshotPath(DEFAULT_MAIN_BRANCH, snapshotId); } - public Path branchSnapshotPath(String branchName, long snapshotId) { + public Path snapshotPath(String branchName, long snapshotId) { return new Path( getBranchPath(fileIO, tablePath, branchName) + "/snapshot/" @@ -94,29 +94,17 @@ public Path branchSnapshotPath(String branchName, long snapshotId) { + snapshotId); } - public Path snapshotPathByBranch(String branchName, long snapshotId) { - return branchName.equals(DEFAULT_MAIN_BRANCH) - ? snapshotPath(snapshotId) - : branchSnapshotPath(branchName, snapshotId); - } - - public Path snapshotDirByBranch(String branchName) { - return branchName.equals(DEFAULT_MAIN_BRANCH) - ? snapshotDirectory() - : branchSnapshotDirectory(branchName); - } - public Snapshot snapshot(long snapshotId) { return snapshot(DEFAULT_MAIN_BRANCH, snapshotId); } public Snapshot snapshot(String branchName, long snapshotId) { - Path snapshotPath = snapshotPathByBranch(branchName, snapshotId); + Path snapshotPath = snapshotPath(branchName, snapshotId); return Snapshot.fromPath(fileIO, snapshotPath); } public boolean branchSnapshotExists(String branchName, long snapshotId) { - Path path = snapshotPathByBranch(branchName, snapshotId); + Path path = snapshotPath(branchName, snapshotId); try { return fileIO.exists(path); } catch (IOException e) { @@ -265,7 +253,7 @@ public Iterator snapshots() throws IOException { } public Iterator snapshotsWithBranch(String branchName) throws IOException { - return listVersionedFiles(fileIO, snapshotDirByBranch(branchName), SNAPSHOT_PREFIX) + return listVersionedFiles(fileIO, snapshotDirectory(branchName), SNAPSHOT_PREFIX) .map(snapshotId -> snapshot(branchName, snapshotId)) .sorted(Comparator.comparingLong(Snapshot::id)) .iterator(); @@ -415,7 +403,7 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) { } private @Nullable Long findLatest(String branchName) throws IOException { - Path snapshotDir = snapshotDirByBranch(branchName); + Path snapshotDir = snapshotDirectory(branchName); if (!fileIO.exists(snapshotDir)) { return null; } @@ -433,7 +421,7 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) { } private @Nullable Long findEarliest(String branchName) throws IOException { - Path snapshotDir = snapshotDirByBranch(branchName); + Path snapshotDir = snapshotDirectory(branchName); if (!fileIO.exists(snapshotDir)) { return null; } @@ -452,7 +440,7 @@ public Long readHint(String fileName) { } public Long readHint(String fileName, String branchName) { - Path snapshotDir = snapshotDirByBranch(branchName); + Path snapshotDir = snapshotDirectory(branchName); Path path = new Path(snapshotDir, fileName); int retryNumber = 0; while (retryNumber++ < READ_HINT_RETRY_NUM) { @@ -472,14 +460,14 @@ public Long readHint(String fileName, String branchName) { private Long findByListFiles(BinaryOperator reducer, String branchName) throws IOException { - Path snapshotDir = snapshotDirByBranch(branchName); + Path snapshotDir = snapshotDirectory(branchName); return listVersionedFiles(fileIO, snapshotDir, SNAPSHOT_PREFIX) .reduce(reducer) .orElse(null); } public void deleteLatestHint(String branchName) throws IOException { - Path snapshotDir = snapshotDirByBranch(branchName); + Path snapshotDir = snapshotDirectory(branchName); Path hintFile = new Path(snapshotDir, LATEST); fileIO.delete(hintFile, false); } @@ -502,7 +490,7 @@ public void commitEarliestHint(long snapshotId, String branchName) throws IOExce private void commitHint(long snapshotId, String fileName, String branchName) throws IOException { - Path snapshotDir = snapshotDirByBranch(branchName); + Path snapshotDir = snapshotDirectory(branchName); Path hintFile = new Path(snapshotDir, fileName); fileIO.overwriteFileUtf8(hintFile, String.valueOf(snapshotId)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 837199440612..6af17ec8444e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -64,20 +64,20 @@ public TagManager(FileIO fileIO, Path tablePath) { /** Return the root Directory of tags. */ public Path tagDirectory() { - return branchTagDirectory(DEFAULT_MAIN_BRANCH); + return tagDirectory(DEFAULT_MAIN_BRANCH); } - public Path branchTagDirectory(String branchName) { + public Path tagDirectory(String branchName) { return new Path(getBranchPath(fileIO, tablePath, branchName) + "/tag"); } /** Return the path of a tag. */ public Path tagPath(String tagName) { - return branchTagPath(DEFAULT_MAIN_BRANCH, tagName); + return tagPath(DEFAULT_MAIN_BRANCH, tagName); } /** Return the path of a tag in branch. */ - public Path branchTagPath(String branchName, String tagName) { + public Path tagPath(String branchName, String tagName) { return new Path( getBranchPath(fileIO, tablePath, branchName) + "/tag/" + TAG_PREFIX + tagName); } @@ -201,7 +201,7 @@ private void doClean( /** Check if a tag exists. */ public boolean branchTagExists(String branchName, String tagName) { - Path path = branchTagPath(branchName, tagName); + Path path = tagPath(branchName, tagName); try { return fileIO.exists(path); } catch (IOException e) { @@ -276,9 +276,7 @@ public SortedMap> tagsWithBranch( try { Path tagDirectory = - StringUtils.isBlank(branchName) - ? tagDirectory() - : branchTagDirectory(branchName); + StringUtils.isBlank(branchName) ? tagDirectory() : tagDirectory(branchName); List paths = listVersionedFileStatus(fileIO, tagDirectory, TAG_PREFIX) .map(FileStatus::getPath) @@ -335,7 +333,7 @@ public List branchTableTags(String branchName) { Path tagDirectory = branchName.equals(DEFAULT_MAIN_BRANCH) ? tagDirectory() - : branchTagDirectory(branchName); + : tagDirectory(branchName); List> paths = listVersionedFileStatus(fileIO, tagDirectory, TAG_PREFIX) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index b9b117182ec8..a07b5f355344 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -949,15 +949,14 @@ public void testCreateBranch() throws Exception { // verify test-tag in test-branch is equal to snapshot 2 Snapshot branchTag = Snapshot.fromPath( - new TraceableFileIO(), tagManager.branchTagPath("test-branch", "test-tag")); + new TraceableFileIO(), tagManager.tagPath("test-branch", "test-tag")); assertThat(branchTag.equals(snapshot2)).isTrue(); // verify snapshot in test-branch is equal to snapshot 2 SnapshotManager snapshotManager = new SnapshotManager(new TraceableFileIO(), tablePath); Snapshot branchSnapshot = Snapshot.fromPath( - new TraceableFileIO(), - snapshotManager.branchSnapshotPath("test-branch", 2)); + new TraceableFileIO(), snapshotManager.snapshotPath("test-branch", 2)); assertThat(branchSnapshot.equals(snapshot2)).isTrue(); // verify schema in test-branch is equal to schema 0