From c84bd0061c198067c6b6e4f164072a872a9924b0 Mon Sep 17 00:00:00 2001 From: zhuangchong Date: Thu, 5 Sep 2024 21:05:27 +0800 Subject: [PATCH 1/2] hotfix: branch manager gets schema, tag and snapshot paths via manager methods to reduce constant string usage. --- .../apache/paimon/schema/SchemaManager.java | 8 ++ .../apache/paimon/utils/BranchManager.java | 80 ++++++------------- .../apache/paimon/utils/SnapshotManager.java | 9 ++- .../org/apache/paimon/utils/TagManager.java | 17 ++-- 4 files changed, 49 insertions(+), 65 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index e164213e2bf8..a506c88281e2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -64,6 +64,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import static org.apache.paimon.catalog.Catalog.DB_SUFFIX; @@ -588,6 +589,13 @@ public Path toSchemaPath(long schemaId) { return new Path(branchPath() + "/schema/" + SCHEMA_PREFIX + schemaId); } + public List schemaPaths(Predicate predicate) throws IOException { + return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX) + .filter(predicate) + .map(this::toSchemaPath) + .collect(Collectors.toList()); + } + /** * Delete schema with specific id. 
* diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 5a439a8ec715..905998a39cb3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -20,7 +20,6 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; @@ -29,12 +28,12 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.paimon.utils.FileUtils.listVersionedDirectories; -import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus; import static org.apache.paimon.utils.Preconditions.checkArgument; /** Manager for {@code Branch}. */ @@ -87,17 +86,7 @@ public Path branchPath(String branchName) { /** Create empty branch. 
*/ public void createBranch(String branchName) { - checkArgument( - !isMainBranch(branchName), - String.format( - "Branch name '%s' is the default branch and cannot be used.", - DEFAULT_MAIN_BRANCH)); - checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName); - checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName); - checkArgument( - !branchName.chars().allMatch(Character::isDigit), - "Branch name cannot be pure numeric string but is '%s'.", - branchName); + validateBranch(branchName); try { TableSchema latestSchema = schemaManager.latest().get(); @@ -115,18 +104,8 @@ branchName, branchPath(tablePath, branchName)), } public void createBranch(String branchName, String tagName) { - checkArgument( - !isMainBranch(branchName), - String.format( - "Branch name '%s' is the default branch and cannot be created.", - DEFAULT_MAIN_BRANCH)); - checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName); - checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName); + validateBranch(branchName); checkArgument(tagManager.tagExists(tagName), "Tag name '%s' not exists.", tagName); - checkArgument( - !branchName.chars().allMatch(Character::isDigit), - "Branch name cannot be pure numeric string but is '%s'.", - branchName); Snapshot snapshot = tagManager.taggedSnapshot(tagName); @@ -170,10 +149,7 @@ public void deleteBranch(String branchName) { /** Check if path exists. 
*/ public boolean fileExists(Path path) { try { - if (fileIO.exists(path)) { - return true; - } - return false; + return fileIO.exists(path); } catch (IOException e) { throw new RuntimeException( String.format("Failed to determine if path '%s' exists.", path), e); @@ -197,37 +173,15 @@ public void fastForward(String branchName) { // Delete snapshot, schema, and tag from the main branch which occurs after // earliestSnapshotId List deleteSnapshotPaths = - listVersionedFileStatus( - fileIO, snapshotManager.snapshotDirectory(), "snapshot-") - .map(FileStatus::getPath) - .filter( - path -> - Snapshot.fromPath(fileIO, path).id() - >= earliestSnapshotId) - .collect(Collectors.toList()); - List deleteSchemaPaths = - listVersionedFileStatus(fileIO, schemaManager.schemaDirectory(), "schema-") - .map(FileStatus::getPath) - .filter( - path -> - TableSchema.fromPath(fileIO, path).id() - >= earliestSchemaId) - .collect(Collectors.toList()); + snapshotManager.snapshotPaths(id -> id >= earliestSnapshotId); + List deleteSchemaPaths = schemaManager.schemaPaths(id -> id >= earliestSchemaId); List deleteTagPaths = - listVersionedFileStatus(fileIO, tagManager.tagDirectory(), "tag-") - .map(FileStatus::getPath) - .filter( - path -> - Snapshot.fromPath(fileIO, path).id() - >= earliestSnapshotId) - .collect(Collectors.toList()); + tagManager.tagPaths( + path -> Snapshot.fromPath(fileIO, path).id() >= earliestSnapshotId); List deletePaths = - Stream.concat( - Stream.concat( - deleteSnapshotPaths.stream(), - deleteSchemaPaths.stream()), - deleteTagPaths.stream()) + Stream.of(deleteSnapshotPaths, deleteSchemaPaths, deleteTagPaths) + .flatMap(Collection::stream) .collect(Collectors.toList()); // Delete latest snapshot hint @@ -271,4 +225,18 @@ public List branches() { throw new RuntimeException(e); } } + + private void validateBranch(String branchName) { + checkArgument( + !isMainBranch(branchName), + String.format( + "Branch name '%s' is the default branch and cannot be used.", + 
DEFAULT_MAIN_BRANCH)); + checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName); + checkArgument( + !branchName.chars().allMatch(Character::isDigit), + "Branch name cannot be pure numeric string but is '%s'.", + branchName); + checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 7a150854403a..65696e438255 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -390,11 +390,18 @@ public long snapshotCount() throws IOException { public Iterator snapshots() throws IOException { return listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX) - .map(id -> snapshot(id)) + .map(this::snapshot) .sorted(Comparator.comparingLong(Snapshot::id)) .iterator(); } + public List snapshotPaths(Predicate predicate) throws IOException { + return listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX) + .filter(predicate) + .map(this::snapshotPath) + .collect(Collectors.toList()); + } + public Iterator snapshotsWithinRange( Optional optionalMaxSnapshotId, Optional optionalMinSnapshotId) throws IOException { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 84b298a0eff8..f12bf5ec2e11 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -88,6 +88,13 @@ public Path tagPath(String tagName) { return new Path(branchPath(tablePath, branch) + "/tag/" + TAG_PREFIX + tagName); } + public List tagPaths(Predicate predicate) throws IOException { + return listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX) + 
.map(FileStatus::getPath) + .filter(predicate) + .collect(Collectors.toList()); + } + /** Create a tag from given snapshot and save it in the storage. */ public void createTag( Snapshot snapshot, @@ -305,10 +312,7 @@ public SortedMap> tags(Predicate filter) { TreeMap> tags = new TreeMap<>(Comparator.comparingLong(Snapshot::id)); try { - List paths = - listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX) - .map(FileStatus::getPath) - .collect(Collectors.toList()); + List paths = tagPaths(path -> true); for (Path path : paths) { String tagName = path.getName().substring(TAG_PREFIX.length()); @@ -333,10 +337,7 @@ public SortedMap> tags(Predicate filter) { /** Get all {@link Tag}s. */ public List> tagObjects() { try { - List paths = - listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX) - .map(FileStatus::getPath) - .collect(Collectors.toList()); + List paths = tagPaths(path -> true); List> tags = new ArrayList<>(); for (Path path : paths) { String tagName = path.getName().substring(TAG_PREFIX.length()); From 58eff1d62a49e832617dce27cd7559483d62d0fb Mon Sep 17 00:00:00 2001 From: zhuangchong Date: Fri, 6 Sep 2024 09:20:39 +0800 Subject: [PATCH 2/2] fix test error --- .../java/org/apache/paimon/table/FileStoreTableTestBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index fe82e9f07abe..0a1f599033c1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -1168,7 +1168,7 @@ public void testUnsupportedBranchName() throws Exception { .satisfies( anyCauseMatches( IllegalArgumentException.class, - "Branch name 'main' is the default branch and cannot be created.")); + "Branch name 'main' is the default branch and cannot be used.")); assertThatThrownBy(() -> 
table.createBranch("branch-1", "tag1")) .satisfies(