Skip to content

Commit

Permalink
[core] Improve fault tolerance for data spill to disk. (#4675)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongyujiang authored Dec 10, 2024
1 parent 84eadce commit 90a37c8
Showing 1 changed file with 16 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -63,24 +65,32 @@ public FileChannelManagerImpl(String[] tempDirs, String prefix) {
}

private static File[] createFiles(String[] tempDirs, String prefix) {
File[] files = new File[tempDirs.length];
List<File> filesList = new ArrayList<>();
for (int i = 0; i < tempDirs.length; i++) {
File baseDir = new File(tempDirs[i]);
String subfolder = String.format("paimon-%s-%s", prefix, UUID.randomUUID());
File storageDir = new File(baseDir, subfolder);

if (!storageDir.exists() && !storageDir.mkdirs()) {
throw new RuntimeException(
"Could not create storage directory for FileChannelManager: "
+ storageDir.getAbsolutePath());
LOG.warn(
"Failed to create directory {}, temp directory {} will not be used",
storageDir.getAbsolutePath(),
tempDirs[i]);
continue;
}
files[i] = storageDir;

filesList.add(storageDir);

LOG.debug(
"FileChannelManager uses directory {} for spill files.",
storageDir.getAbsolutePath());
}
return files;

if (filesList.isEmpty()) {
throw new RuntimeException("No available temporary directories");
}

return filesList.toArray(new File[0]);
}

@Override
Expand Down

0 comments on commit 90a37c8

Please sign in to comment.