diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index b5a0040d151e..6612012704c2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -64,6 +64,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import static org.apache.paimon.catalog.Catalog.DB_SUFFIX; @@ -588,6 +589,13 @@ public Path toSchemaPath(long schemaId) { return new Path(branchPath() + "/schema/" + SCHEMA_PREFIX + schemaId); } + public List schemaPaths(Predicate predicate) throws IOException { + return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX) + .filter(predicate) + .map(this::toSchemaPath) + .collect(Collectors.toList()); + } + /** * Delete schema with specific id. * diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index dac4a52cbedf..d398ca9c3f42 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -20,7 +20,6 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; @@ -29,12 +28,12 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.paimon.utils.FileUtils.listVersionedDirectories; -import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus; import static org.apache.paimon.utils.Preconditions.checkArgument; /** Manager for {@code Branch}. */ @@ -87,20 +86,7 @@ public Path branchPath(String branchName) { /** Create empty branch. */ public void createBranch(String branchName) { - checkArgument( - !isMainBranch(branchName), - String.format( - "Branch name '%s' is the default branch and cannot be used.", - DEFAULT_MAIN_BRANCH)); - checkArgument( - !StringUtils.isNullOrWhitespaceOnly(branchName), - "Branch name '%s' is blank.", - branchName); - checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName); - checkArgument( - !branchName.chars().allMatch(Character::isDigit), - "Branch name cannot be pure numeric string but is '%s'.", - branchName); + validateBranch(branchName); try { TableSchema latestSchema = schemaManager.latest().get(); @@ -118,21 +104,8 @@ branchName, branchPath(tablePath, branchName)), } public void createBranch(String branchName, String tagName) { - checkArgument( - !isMainBranch(branchName), - String.format( - "Branch name '%s' is the default branch and cannot be created.", - DEFAULT_MAIN_BRANCH)); - checkArgument( - !StringUtils.isNullOrWhitespaceOnly(branchName), - "Branch name '%s' is blank.", - branchName); - checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName); + validateBranch(branchName); checkArgument(tagManager.tagExists(tagName), "Tag name '%s' not exists.", tagName); - checkArgument( - !branchName.chars().allMatch(Character::isDigit), - "Branch name cannot be pure numeric string but is '%s'.", - branchName); Snapshot snapshot = tagManager.taggedSnapshot(tagName); @@ -176,10 +149,7 @@ public void deleteBranch(String branchName) { /** Check if path exists. */ public boolean fileExists(Path path) { try { - if (fileIO.exists(path)) { - return true; - } - return false; + return fileIO.exists(path); } catch (IOException e) { throw new RuntimeException( String.format("Failed to determine if path '%s' exists.", path), e); @@ -206,37 +176,15 @@ public void fastForward(String branchName) { // Delete snapshot, schema, and tag from the main branch which occurs after // earliestSnapshotId List deleteSnapshotPaths = - listVersionedFileStatus( - fileIO, snapshotManager.snapshotDirectory(), "snapshot-") - .map(FileStatus::getPath) - .filter( - path -> - Snapshot.fromPath(fileIO, path).id() - >= earliestSnapshotId) - .collect(Collectors.toList()); - List deleteSchemaPaths = - listVersionedFileStatus(fileIO, schemaManager.schemaDirectory(), "schema-") - .map(FileStatus::getPath) - .filter( - path -> - TableSchema.fromPath(fileIO, path).id() - >= earliestSchemaId) - .collect(Collectors.toList()); + snapshotManager.snapshotPaths(id -> id >= earliestSnapshotId); + List deleteSchemaPaths = schemaManager.schemaPaths(id -> id >= earliestSchemaId); List deleteTagPaths = - listVersionedFileStatus(fileIO, tagManager.tagDirectory(), "tag-") - .map(FileStatus::getPath) - .filter( - path -> - Snapshot.fromPath(fileIO, path).id() - >= earliestSnapshotId) - .collect(Collectors.toList()); + tagManager.tagPaths( + path -> Snapshot.fromPath(fileIO, path).id() >= earliestSnapshotId); List deletePaths = - Stream.concat( - Stream.concat( - deleteSnapshotPaths.stream(), - deleteSchemaPaths.stream()), - deleteTagPaths.stream()) + Stream.of(deleteSnapshotPaths, deleteSchemaPaths, deleteTagPaths) + .flatMap(Collection::stream) .collect(Collectors.toList()); // Delete latest snapshot hint @@ -280,4 +228,21 @@ public List branches() { throw new RuntimeException(e); } } + + private void validateBranch(String branchName) { + checkArgument( + !isMainBranch(branchName), + String.format( + "Branch name '%s' is the default branch and cannot be used.", + DEFAULT_MAIN_BRANCH)); + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(branchName), + "Branch name '%s' is blank.", + branchName); + checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName); + checkArgument( + !branchName.chars().allMatch(Character::isDigit), + "Branch name cannot be pure numeric string but is '%s'.", + branchName); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index c151373e62fe..dc2f087cd459 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -391,11 +391,18 @@ public long snapshotCount() throws IOException { public Iterator snapshots() throws IOException { return listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX) - .map(id -> snapshot(id)) + .map(this::snapshot) .sorted(Comparator.comparingLong(Snapshot::id)) .iterator(); } + public List snapshotPaths(Predicate predicate) throws IOException { + return listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX) + .filter(predicate) + .map(this::snapshotPath) + .collect(Collectors.toList()); + } + public Iterator snapshotsWithinRange( Optional optionalMaxSnapshotId, Optional optionalMinSnapshotId) throws IOException { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index f3a64c195dd7..089e87c86452 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -88,6 +88,13 @@ public Path tagPath(String tagName) { return new Path(branchPath(tablePath, branch) + "/tag/" + TAG_PREFIX + tagName); } + public List tagPaths(Predicate predicate) throws IOException { + return listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX) + .map(FileStatus::getPath) + .filter(predicate) + .collect(Collectors.toList()); + } + /** Create a tag from given snapshot and save it in the storage. */ public void createTag( Snapshot snapshot, @@ -307,10 +314,7 @@ public SortedMap> tags(Predicate filter) { TreeMap> tags = new TreeMap<>(Comparator.comparingLong(Snapshot::id)); try { - List paths = - listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX) - .map(FileStatus::getPath) - .collect(Collectors.toList()); + List paths = tagPaths(path -> true); for (Path path : paths) { String tagName = path.getName().substring(TAG_PREFIX.length()); @@ -335,10 +339,7 @@ public SortedMap> tags(Predicate filter) { /** Get all {@link Tag}s. */ public List> tagObjects() { try { - List paths = - listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX) - .map(FileStatus::getPath) - .collect(Collectors.toList()); + List paths = tagPaths(path -> true); List> tags = new ArrayList<>(); for (Path path : paths) { String tagName = path.getName().substring(TAG_PREFIX.length()); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index fe82e9f07abe..0a1f599033c1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -1168,7 +1168,7 @@ public void testUnsupportedBranchName() throws Exception { .satisfies( anyCauseMatches( IllegalArgumentException.class, - "Branch name 'main' is the default branch and cannot be created.")); + "Branch name 'main' is the default branch and cannot be used.")); assertThatThrownBy(() -> table.createBranch("branch-1", "tag1")) .satisfies(