Skip to content

Commit

Permalink
[core] Add listDirectories to FileIO (apache#3205)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Apr 12, 2024
1 parent 0d40e68 commit aa92afa
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 5 deletions.
17 changes: 17 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -102,6 +103,22 @@ public interface FileIO extends Serializable {
*/
FileStatus[] listStatus(Path path) throws IOException;

/**
* List the statuses of the directories in the given path if the path is a directory.
*
* <p>{@link FileIO} implementation may have optimization for list directories.
*
* @param path given path
* @return the statuses of the directories in the given path
*/
default FileStatus[] listDirectories(Path path) throws IOException {
FileStatus[] statuses = listStatus(path);
if (statuses != null) {
statuses = Arrays.stream(statuses).filter(FileStatus::isDir).toArray(FileStatus[]::new);
}
return statuses;
}

/**
* Check if exists.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public FileSystemCatalog(FileIO fileIO, Path warehouse, Options options) {
@Override
public List<String> listDatabases() {
List<String> databases = new ArrayList<>();
for (FileStatus status : uncheck(() -> fileIO.listStatus(warehouse))) {
for (FileStatus status : uncheck(() -> fileIO.listDirectories(warehouse))) {
Path path = status.getPath();
if (status.isDir() && isDatabase(path)) {
databases.add(database(path));
Expand Down Expand Up @@ -100,7 +100,8 @@ protected void dropDatabaseImpl(String name) {
@Override
protected List<String> listTablesImpl(String databaseName) {
List<String> tables = new ArrayList<>();
for (FileStatus status : uncheck(() -> fileIO.listStatus(newDatabasePath(databaseName)))) {
for (FileStatus status :
uncheck(() -> fileIO.listDirectories(newDatabasePath(databaseName)))) {
if (status.isDir() && tableExists(status.getPath())) {
tables.add(status.getPath().getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import java.util.SortedMap;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
import static org.apache.paimon.utils.FileUtils.listVersionedDirectories;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Manager for {@code Branch}. */
Expand Down Expand Up @@ -153,7 +153,7 @@ public boolean branchExists(String branchName) {
/** Get branch count for the table. */
public long branchCount() {
try {
return listVersionedFileStatus(fileIO, branchDirectory(), BRANCH_PREFIX).count();
return listVersionedDirectories(fileIO, branchDirectory(), BRANCH_PREFIX).count();
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand All @@ -163,7 +163,7 @@ public long branchCount() {
public List<TableBranch> branches() {
try {
List<Pair<Path, Long>> paths =
listVersionedFileStatus(fileIO, branchDirectory(), BRANCH_PREFIX)
listVersionedDirectories(fileIO, branchDirectory(), BRANCH_PREFIX)
.map(status -> Pair.of(status.getPath(), status.getModificationTime()))
.collect(Collectors.toList());
PriorityQueue<TableBranch> pq =
Expand Down
24 changes: 24 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,30 @@ public static Stream<FileStatus> listVersionedFileStatus(FileIO fileIO, Path dir
.filter(status -> status.getPath().getName().startsWith(prefix));
}

/**
* List versioned directories for the directory.
*
* @return file status stream
*/
public static Stream<FileStatus> listVersionedDirectories(
FileIO fileIO, Path dir, String prefix) throws IOException {
if (!fileIO.exists(dir)) {
return Stream.empty();
}

FileStatus[] statuses = fileIO.listDirectories(dir);

if (statuses == null) {
throw new RuntimeException(
String.format(
"The return value is null of the listStatus for the '%s' directory.",
dir));
}

return Arrays.stream(statuses)
.filter(status -> status.getPath().getName().startsWith(prefix));
}

public static void checkExists(FileIO fileIO, Path file) throws IOException {
if (!fileIO.exists(file)) {
throw new FileNotFoundException(
Expand Down

0 comments on commit aa92afa

Please sign in to comment.