From aa92afaecfa6dd7dba57c130efe9262f75344b5e Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Fri, 12 Apr 2024 18:09:59 +0800 Subject: [PATCH] [core] Add listDirectories to FileIO (#3205) --- .../java/org/apache/paimon/fs/FileIO.java | 17 +++++++++++++ .../paimon/catalog/FileSystemCatalog.java | 5 ++-- .../apache/paimon/utils/BranchManager.java | 6 ++--- .../org/apache/paimon/utils/FileUtils.java | 24 +++++++++++++++++++ 4 files changed, 47 insertions(+), 5 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index 84c1040ea24d..ae8ed60eb06a 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -37,6 +37,7 @@ import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -102,6 +103,22 @@ public interface FileIO extends Serializable { */ FileStatus[] listStatus(Path path) throws IOException; + /** + * List the statuses of the directories in the given path if the path is a directory. + * + *

{@link FileIO} implementation may have optimization for list directories. + * + * @param path given path + * @return the statuses of the directories in the given path + */ + default FileStatus[] listDirectories(Path path) throws IOException { + FileStatus[] statuses = listStatus(path); + if (statuses != null) { + statuses = Arrays.stream(statuses).filter(FileStatus::isDir).toArray(FileStatus[]::new); + } + return statuses; + } + /** * Check if exists. * diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index 8ffe0f271916..1e4e5b0ebaaa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -59,7 +59,7 @@ public FileSystemCatalog(FileIO fileIO, Path warehouse, Options options) { @Override public List listDatabases() { List databases = new ArrayList<>(); - for (FileStatus status : uncheck(() -> fileIO.listStatus(warehouse))) { + for (FileStatus status : uncheck(() -> fileIO.listDirectories(warehouse))) { Path path = status.getPath(); if (status.isDir() && isDatabase(path)) { databases.add(database(path)); @@ -100,7 +100,8 @@ protected void dropDatabaseImpl(String name) { @Override protected List listTablesImpl(String databaseName) { List tables = new ArrayList<>(); - for (FileStatus status : uncheck(() -> fileIO.listStatus(newDatabasePath(databaseName)))) { + for (FileStatus status : + uncheck(() -> fileIO.listDirectories(newDatabasePath(databaseName)))) { if (status.isDir() && tableExists(status.getPath())) { tables.add(status.getPath().getName()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 95034e05a6f4..4656deb67671 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -37,7 +37,7 @@ import java.util.SortedMap; import java.util.stream.Collectors; -import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus; +import static org.apache.paimon.utils.FileUtils.listVersionedDirectories; import static org.apache.paimon.utils.Preconditions.checkArgument; /** Manager for {@code Branch}. */ @@ -153,7 +153,7 @@ public boolean branchExists(String branchName) { /** Get branch count for the table. */ public long branchCount() { try { - return listVersionedFileStatus(fileIO, branchDirectory(), BRANCH_PREFIX).count(); + return listVersionedDirectories(fileIO, branchDirectory(), BRANCH_PREFIX).count(); } catch (IOException e) { throw new RuntimeException(e); } @@ -163,7 +163,7 @@ public long branchCount() { public List branches() { try { List> paths = - listVersionedFileStatus(fileIO, branchDirectory(), BRANCH_PREFIX) + listVersionedDirectories(fileIO, branchDirectory(), BRANCH_PREFIX) .map(status -> Pair.of(status.getPath(), status.getModificationTime())) .collect(Collectors.toList()); PriorityQueue pq = diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java index eddc7273e76b..17823f34d92a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java @@ -118,6 +118,30 @@ public static Stream listVersionedFileStatus(FileIO fileIO, Path dir .filter(status -> status.getPath().getName().startsWith(prefix)); } + /** + * List versioned directories for the directory. + * + * @return file status stream + */ + public static Stream listVersionedDirectories( + FileIO fileIO, Path dir, String prefix) throws IOException { + if (!fileIO.exists(dir)) { + return Stream.empty(); + } + + FileStatus[] statuses = fileIO.listDirectories(dir); + + if (statuses == null) { + throw new RuntimeException( + String.format( + "The return value is null of the listStatus for the '%s' directory.", + dir)); + } + + return Arrays.stream(statuses) + .filter(status -> status.getPath().getName().startsWith(prefix)); + } + public static void checkExists(FileIO fileIO, Path file) throws IOException { if (!fileIO.exists(file)) { throw new FileNotFoundException(