Skip to content

Commit

Permalink
[core] Optimized iterative list implementations for FileIO (apache#4952)
Browse files Browse the repository at this point in the history
  • Loading branch information
smdsbz authored Jan 21, 2025
1 parent 878b99f commit cfb0075
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.RemoteIterator;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.hadoop.SerializableConfiguration;
import org.apache.paimon.utils.FunctionWithException;
Expand Down Expand Up @@ -104,6 +105,29 @@ public FileStatus[] listStatus(Path path) throws IOException {
return statuses;
}

/**
 * Lists the entries under {@code path} lazily by delegating to the Hadoop file system's
 * {@code listFiles} call and converting each entry only when it is requested.
 *
 * @param path the location to list
 * @param recursive passed through unchanged to Hadoop's {@code listFiles}
 * @return an iterator that wraps every Hadoop status in a {@link HadoopFileStatus}
 * @throws IOException if the underlying file system fails to start the listing
 */
@Override
public RemoteIterator<FileStatus> listFilesIterative(Path path, boolean recursive)
        throws IOException {
    org.apache.hadoop.fs.Path hadoopPath = path(path);
    org.apache.hadoop.fs.RemoteIterator<org.apache.hadoop.fs.LocatedFileStatus> hadoopIter =
            getFileSystem(hadoopPath).listFiles(hadoopPath, recursive);
    return new RemoteIterator<FileStatus>() {
        @Override
        public boolean hasNext() throws IOException {
            return hadoopIter.hasNext();
        }

        @Override
        public FileStatus next() throws IOException {
            org.apache.hadoop.fs.FileStatus hadoopStatus = hadoopIter.next();
            return new HadoopFileStatus(hadoopStatus);
        }

        @Override
        public void close() throws IOException {
            // Hadoop's RemoteIterator interface has no close() of its own, but a concrete
            // iterator may additionally implement Closeable. Forward the call in that case
            // so closing this wrapper also releases whatever the delegate holds.
            if (hadoopIter instanceof java.io.Closeable) {
                ((java.io.Closeable) hadoopIter).close();
            }
        }
    };
}

@Override
public boolean exists(Path path) throws IOException {
org.apache.hadoop.fs.Path hadoopPath = path(path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -85,6 +87,43 @@ void testFileDoesNotExist() throws IOException {
assertThat(fs.exists(new Path(basePath, randomName()))).isFalse();
}

// --- list files

/** A non-recursive listing must return only the file located directly under the base path. */
@Test
void testListFilesIterativeNonRecursive() throws IOException {
    Path fileA = createRandomFileInDirectory(basePath);
    Path dirB = new Path(basePath, randomName());
    fs.mkdirs(dirB);
    // Created only so that a nested file exists; it must not appear in the listing below.
    createRandomFileInDirectory(dirB);

    List<FileStatus> allFiles = new ArrayList<>();
    try (RemoteIterator<FileStatus> iter = fs.listFilesIterative(basePath, false)) {
        while (iter.hasNext()) {
            allFiles.add(iter.next());
        }
    }
    // hasSize reports the actual list contents on failure, unlike comparing size() to 1.
    assertThat(allFiles).hasSize(1);
    assertThat(allFiles.get(0).getPath()).isEqualTo(fileA);
}

/** A recursive listing must return the top-level file and the nested file exactly once each. */
@Test
void testListFilesIterativeRecursive() throws IOException {
    Path topLevelFile = createRandomFileInDirectory(basePath);
    Path subDir = new Path(basePath, randomName());
    fs.mkdirs(subDir);
    Path nestedFile = createRandomFileInDirectory(subDir);

    // Only the paths are inspected, so collect them directly instead of whole statuses.
    List<Path> listedPaths = new ArrayList<>();
    try (RemoteIterator<FileStatus> it = fs.listFilesIterative(basePath, true)) {
        while (it.hasNext()) {
            FileStatus status = it.next();
            listedPaths.add(status.getPath());
        }
    }

    long topLevelHits = listedPaths.stream().filter(p -> p.equals(topLevelFile)).count();
    long nestedHits = listedPaths.stream().filter(p -> p.equals(nestedFile)).count();

    assertThat(listedPaths.size()).isEqualTo(2);
    assertThat(topLevelHits).isEqualTo(1);
    assertThat(nestedHits).isEqualTo(1);
}

// --- delete

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.RemoteIterator;
import org.apache.paimon.fs.SeekableInputStream;

import org.apache.hadoop.fs.FSDataInputStream;
Expand Down Expand Up @@ -77,6 +78,29 @@ public FileStatus[] listStatus(Path path) throws IOException {
return statuses;
}

/**
 * Lists the entries under {@code path} lazily by delegating to the Hadoop file system's
 * {@code listFiles} call and converting each entry only when it is requested.
 *
 * @param path the location to list
 * @param recursive passed through unchanged to Hadoop's {@code listFiles}
 * @return an iterator that wraps every Hadoop status in a {@link HadoopFileStatus}
 * @throws IOException if the underlying file system fails to start the listing
 */
@Override
public RemoteIterator<FileStatus> listFilesIterative(Path path, boolean recursive)
        throws IOException {
    org.apache.hadoop.fs.Path hadoopPath = path(path);
    org.apache.hadoop.fs.RemoteIterator<org.apache.hadoop.fs.LocatedFileStatus> hadoopIter =
            getFileSystem(hadoopPath).listFiles(hadoopPath, recursive);
    return new RemoteIterator<FileStatus>() {
        @Override
        public boolean hasNext() throws IOException {
            return hadoopIter.hasNext();
        }

        @Override
        public FileStatus next() throws IOException {
            org.apache.hadoop.fs.FileStatus hadoopStatus = hadoopIter.next();
            return new HadoopFileStatus(hadoopStatus);
        }

        @Override
        public void close() throws IOException {
            // Hadoop's RemoteIterator interface has no close() of its own, but a concrete
            // iterator may additionally implement Closeable. Forward the call in that case
            // so closing this wrapper also releases whatever the delegate holds.
            if (hadoopIter instanceof java.io.Closeable) {
                ((java.io.Closeable) hadoopIter).close();
            }
        }
    };
}

@Override
public boolean exists(Path path) throws IOException {
org.apache.hadoop.fs.Path hadoopPath = path(path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.RemoteIterator;
import org.apache.paimon.fs.SeekableInputStream;

import org.apache.hadoop.fs.FSDataInputStream;
Expand Down Expand Up @@ -77,6 +78,29 @@ public FileStatus[] listStatus(Path path) throws IOException {
return statuses;
}

/**
 * Lists the entries under {@code path} lazily by delegating to the Hadoop file system's
 * {@code listFiles} call and converting each entry only when it is requested.
 *
 * @param path the location to list
 * @param recursive passed through unchanged to Hadoop's {@code listFiles}
 * @return an iterator that wraps every Hadoop status in a {@link HadoopFileStatus}
 * @throws IOException if the underlying file system fails to start the listing
 */
@Override
public RemoteIterator<FileStatus> listFilesIterative(Path path, boolean recursive)
        throws IOException {
    org.apache.hadoop.fs.Path hadoopPath = path(path);
    org.apache.hadoop.fs.RemoteIterator<org.apache.hadoop.fs.LocatedFileStatus> hadoopIter =
            getFileSystem(hadoopPath).listFiles(hadoopPath, recursive);
    return new RemoteIterator<FileStatus>() {
        @Override
        public boolean hasNext() throws IOException {
            return hadoopIter.hasNext();
        }

        @Override
        public FileStatus next() throws IOException {
            org.apache.hadoop.fs.FileStatus hadoopStatus = hadoopIter.next();
            return new HadoopFileStatus(hadoopStatus);
        }

        @Override
        public void close() throws IOException {
            // Hadoop's RemoteIterator interface has no close() of its own, but a concrete
            // iterator may additionally implement Closeable. Forward the call in that case
            // so closing this wrapper also releases whatever the delegate holds.
            if (hadoopIter instanceof java.io.Closeable) {
                ((java.io.Closeable) hadoopIter).close();
            }
        }
    };
}

@Override
public boolean exists(Path path) throws IOException {
org.apache.hadoop.fs.Path hadoopPath = path(path);
Expand Down

0 comments on commit cfb0075

Please sign in to comment.