Skip to content

Commit

Permalink
[core] Support parallel reading of local orphan clean (#4320)
Browse files Browse the repository at this point in the history
  • Loading branch information
bknbkn authored Oct 17, 2024
1 parent 82a0117 commit 3dcc5ae
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,21 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecute;
import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute;

/**
* Local {@link OrphanFilesClean} that uses a thread pool to execute deletion.
Expand Down Expand Up @@ -113,31 +116,52 @@ public List<Path> clean() throws IOException, ExecutionException, InterruptedExc
return deleteFiles;
}

private List<String> getUsedFiles(String branch) {
List<String> usedFiles = new ArrayList<>();
/**
 * Runs the per-snapshot {@code collectWithoutDataFile} overload for every snapshot returned by
 * {@code safelyGetAllSnapshots(branch)}, handing the work to {@code randomlyOnlyExecute} together
 * with this cleaner's {@code executor}.
 *
 * <p>NOTE(review): the two consumers are presumably invoked from executor threads, possibly
 * concurrently, so callers should pass thread-safe sinks; confirm against {@code
 * ThreadPoolUtils.randomlyOnlyExecute}.
 *
 * @param branch branch whose snapshots are visited
 * @param usedFileConsumer sink forwarded unchanged to the per-snapshot overload
 * @param manifestConsumer sink forwarded unchanged to the per-snapshot overload
 * @throws IOException declared for the snapshot listing; an {@link IOException} raised inside a
 *     task is rethrown from that task wrapped in a {@link RuntimeException}
 */
private void collectWithoutDataFile(
        String branch, Consumer<String> usedFileConsumer, Consumer<String> manifestConsumer)
        throws IOException {
    randomlyOnlyExecute(
            executor,
            current -> {
                // A Consumer cannot throw checked exceptions, so IOException is tunnelled
                // out of the task as an unchecked exception.
                try {
                    collectWithoutDataFile(branch, current, usedFileConsumer, manifestConsumer);
                } catch (IOException ioFailure) {
                    throw new RuntimeException(ioFailure);
                }
            },
            safelyGetAllSnapshots(branch));
}

private Set<String> getUsedFiles(String branch) {
Set<String> usedFiles = ConcurrentHashMap.newKeySet();
ManifestFile manifestFile =
table.switchToBranch(branch).store().manifestFileFactory().create();
try {
Set<String> manifests = new HashSet<>();
Set<String> manifests = ConcurrentHashMap.newKeySet();
collectWithoutDataFile(branch, usedFiles::add, manifests::add);
List<String> dataFiles = new ArrayList<>();
for (String manifestName : manifests) {
retryReadingFiles(
() -> manifestFile.readWithIOException(manifestName),
Collections.<ManifestEntry>emptyList())
.stream()
.map(ManifestEntry::file)
.forEach(
f -> {
if (candidateDeletes.contains(f.fileName())) {
dataFiles.add(f.fileName());
}
f.extraFiles().stream()
.filter(candidateDeletes::contains)
.forEach(dataFiles::add);
});
}
usedFiles.addAll(dataFiles);
randomlyOnlyExecute(
executor,
manifestName -> {
try {
retryReadingFiles(
() -> manifestFile.readWithIOException(manifestName),
Collections.<ManifestEntry>emptyList())
.stream()
.map(ManifestEntry::file)
.forEach(
f -> {
if (candidateDeletes.contains(f.fileName())) {
usedFiles.add(f.fileName());
}
f.extraFiles().stream()
.filter(candidateDeletes::contains)
.forEach(usedFiles::add);
});
} catch (IOException e) {
throw new RuntimeException(e);
}
},
manifests);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,6 @@ protected Set<Snapshot> safelyGetAllSnapshots(String branch) throws IOException
return readSnapshots;
}

/**
 * Visits the snapshots returned by {@code safelyGetAllSnapshots(branch)} one after another on
 * the calling thread and delegates each of them to the per-snapshot overload.
 *
 * @param branch branch whose snapshots are visited
 * @param usedFileConsumer sink forwarded unchanged to the per-snapshot overload
 * @param manifestConsumer sink forwarded unchanged to the per-snapshot overload
 * @throws IOException if listing the snapshots or processing any single snapshot fails
 */
protected void collectWithoutDataFile(
        String branch, Consumer<String> usedFileConsumer, Consumer<String> manifestConsumer)
        throws IOException {
    Set<Snapshot> branchSnapshots = safelyGetAllSnapshots(branch);
    for (Snapshot current : branchSnapshots) {
        collectWithoutDataFile(branch, current, usedFileConsumer, manifestConsumer);
    }
}

protected void collectWithoutDataFile(
String branch,
Snapshot snapshot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand All @@ -52,6 +54,8 @@
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.BranchManager.branchPath;
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute;

/** Manager for {@link Snapshot}, providing utility methods related to paths and snapshot hints. */
public class SnapshotManager implements Serializable {
Expand Down Expand Up @@ -414,8 +418,7 @@ public List<Path> snapshotPaths(Predicate<Long> predicate) throws IOException {
}

public Iterator<Snapshot> snapshotsWithinRange(
Optional<Long> optionalMaxSnapshotId, Optional<Long> optionalMinSnapshotId)
throws IOException {
Optional<Long> optionalMaxSnapshotId, Optional<Long> optionalMinSnapshotId) {
Long lowerBoundSnapshotId = earliestSnapshotId();
Long upperBoundSnapshotId = latestSnapshotId();
Long lowerId;
Expand Down Expand Up @@ -457,7 +460,7 @@ public Iterator<Snapshot> snapshotsWithinRange(

/**
 * Returns the changelogs listed under {@code changelogDirectory()} with the {@code
 * CHANGELOG_PREFIX} prefix, ordered by ascending {@code Changelog::id}.
 *
 * @throws IOException declared by this method; presumably raised while listing the versioned
 *     files (confirm against {@code listVersionedFiles})
 */
public Iterator<Changelog> changelogs() throws IOException {
return listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX)
// NOTE(review): the lambda and the method reference below express the same id -> changelog
// mapping; this listing shows both spellings, only one of them belongs in the chain.
.map(snapshotId -> changelog(snapshotId))
.map(this::changelog)
.sorted(Comparator.comparingLong(Changelog::id))
.iterator();
}
Expand All @@ -469,38 +472,62 @@ public Iterator<Changelog> changelogs() throws IOException {
public List<Snapshot> safelyGetAllSnapshots() throws IOException {
// Resolve every versioned file found under snapshotDirectory() with SNAPSHOT_PREFIX to its path.
// NOTE(review): the two .map lines are the lambda and method-reference spellings of the same
// mapping; only one of them belongs in the chain.
List<Path> paths =
listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX)
.map(id -> snapshotPath(id))
.map(this::snapshotPath)
.collect(Collectors.toList());

// Sequential variant: read each path on the calling thread; a file that has disappeared
// (FileNotFoundException) is skipped, any other IOException propagates to the caller.
List<Snapshot> snapshots = new ArrayList<>();
for (Path path : paths) {
try {
snapshots.add(Snapshot.fromJson(fileIO.readFileUtf8(path)));
} catch (FileNotFoundException ignored) {
}
}
// Parallel variant: the reads are delegated to collectSnapshots. The result list is wrapped
// with Collections.synchronizedList, presumably because the consumer below appends to it from
// several executor threads. The resulting element order is presumably not guaranteed to follow
// the order of paths (confirm against randomlyOnlyExecute).
List<Snapshot> snapshots = Collections.synchronizedList(new ArrayList<>(paths.size()));
collectSnapshots(
path -> {
try {
snapshots.add(Snapshot.fromJson(fileIO.readFileUtf8(path)));
} catch (IOException e) {
// A missing file is still tolerated; every other IOException is rethrown unchecked
// because the Consumer signature cannot carry a checked exception.
if (!(e instanceof FileNotFoundException)) {
throw new RuntimeException(e);
}
}
},
paths);

return snapshots;
}

public List<Changelog> safelyGetAllChangelogs() throws IOException {
// Resolve every versioned file found under changelogDirectory() with CHANGELOG_PREFIX to its
// long-lived changelog path.
// NOTE(review): the two .map lines are the lambda and method-reference spellings of the same
// mapping; only one of them belongs in the chain.
List<Path> paths =
listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX)
.map(id -> longLivedChangelogPath(id))
.map(this::longLivedChangelogPath)
.collect(Collectors.toList());

// Sequential variant: read each path on the calling thread; a file that has disappeared
// (FileNotFoundException) is skipped, any other IOException propagates to the caller.
List<Changelog> changelogs = new ArrayList<>();
for (Path path : paths) {
try {
String json = fileIO.readFileUtf8(path);
changelogs.add(Changelog.fromJson(json));
} catch (FileNotFoundException ignored) {
}
}
// Parallel variant: the reads are delegated to collectSnapshots. The result list is wrapped
// with Collections.synchronizedList, presumably because the consumer below appends to it from
// several executor threads. The resulting element order is presumably not guaranteed to follow
// the order of paths (confirm against randomlyOnlyExecute).
List<Changelog> changelogs = Collections.synchronizedList(new ArrayList<>(paths.size()));
collectSnapshots(
path -> {
try {
changelogs.add(Changelog.fromJson(fileIO.readFileUtf8(path)));
} catch (IOException e) {
// A missing file is still tolerated; every other IOException is rethrown unchecked
// because the Consumer signature cannot carry a checked exception.
if (!(e instanceof FileNotFoundException)) {
throw new RuntimeException(e);
}
}
},
paths);

return changelogs;
}

/**
 * Hands every entry of {@code paths} to {@code pathConsumer} via {@code randomlyOnlyExecute},
 * using a thread pool that is created for this call and shut down before returning.
 *
 * <p>The pool is created by {@code createCachedThreadPool} with the number of available
 * processors and the name {@code "SNAPSHOT_COLLECTOR"}.
 *
 * @param pathConsumer action applied to each path; presumably invoked on pool threads, so it
 *     should be thread-safe (confirm against {@code ThreadPoolUtils.randomlyOnlyExecute})
 * @param paths paths to process
 * @throws IOException wrapping any {@link RuntimeException} raised while executing the tasks
 */
private void collectSnapshots(Consumer<Path> pathConsumer, List<Path> paths)
        throws IOException {
    int parallelism = Runtime.getRuntime().availableProcessors();
    ExecutorService pool = createCachedThreadPool(parallelism, "SNAPSHOT_COLLECTOR");

    try {
        randomlyOnlyExecute(pool, pathConsumer, paths);
    } catch (RuntimeException failure) {
        // Callers declare IOException, so unchecked task failures are surfaced as one.
        throw new IOException(failure);
    } finally {
        // Release the per-call pool whether or not the tasks succeeded.
        pool.shutdown();
    }
}

/**
* Try to get non-snapshot files. If any error occurs, just ignore it and return an empty
* result.
Expand Down

0 comments on commit 3dcc5ae

Please sign in to comment.