Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Support parallel reading of local orphan clean #4320

Merged
merged 5 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,21 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecute;
import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute;

/**
* Local {@link OrphanFilesClean}, it will use thread pool to execute deletion.
Expand Down Expand Up @@ -113,31 +116,52 @@ public List<Path> clean() throws IOException, ExecutionException, InterruptedExc
return deleteFiles;
}

private List<String> getUsedFiles(String branch) {
List<String> usedFiles = new ArrayList<>();
private void collectWithoutDataFile(
String branch, Consumer<String> usedFileConsumer, Consumer<String> manifestConsumer)
throws IOException {
randomlyOnlyExecute(
executor,
snapshot -> {
try {
collectWithoutDataFile(
branch, snapshot, usedFileConsumer, manifestConsumer);
} catch (IOException e) {
throw new RuntimeException(e);
}
},
safelyGetAllSnapshots(branch));
}

private Set<String> getUsedFiles(String branch) {
Set<String> usedFiles = ConcurrentHashMap.newKeySet();
ManifestFile manifestFile =
table.switchToBranch(branch).store().manifestFileFactory().create();
try {
Set<String> manifests = new HashSet<>();
Set<String> manifests = ConcurrentHashMap.newKeySet();
collectWithoutDataFile(branch, usedFiles::add, manifests::add);
List<String> dataFiles = new ArrayList<>();
for (String manifestName : manifests) {
retryReadingFiles(
() -> manifestFile.readWithIOException(manifestName),
Collections.<ManifestEntry>emptyList())
.stream()
.map(ManifestEntry::file)
.forEach(
f -> {
if (candidateDeletes.contains(f.fileName())) {
dataFiles.add(f.fileName());
}
f.extraFiles().stream()
.filter(candidateDeletes::contains)
.forEach(dataFiles::add);
});
}
usedFiles.addAll(dataFiles);
randomlyOnlyExecute(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if manifests size is very big, use randomlyOnlyExecute may oom?
Do you think so?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is indeed a risk of OOM.

But this can be avoided by reducing the parallelism (if it's equal to 1, then it's similar to serial execution before), and here it's just providing an ability to read in parallel.

If parallelism 1 is still OOM, then distributed mode is considered.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about change randomlyOnlyExecute to sequentialBatchedExecute?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I repleaced it with sequentialBatchedExecute since It has better control over memory

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the end, you still need to put it in a set, so as long as there is no inflation during the reading process, there should be no risk of oom here.

Copy link
Contributor Author

@bknbkn bknbkn Oct 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I test sequentialBatchedExecute randomlyOnlyExecute and randomlyExecute in 16 parallelism on table with 2PB of data , None of them happened OOM.

Besides sequentialBatchedExecute procedure cost 30 min , randomlyOnlyExecute and randomlyExecute cost about 20 min

So I prefer to use randomlyOnlyExecute because this method is more concise and no other variables are introduced, it can also reduce the gc process of temporary data files variable. The disadvantage of this method is that there will be waiting lock overhead while inserting useFiles, but this has no impact on the overall time consumption, because the overall time consumption mainly comes from IO.

What do you think? @JingsongLi @wwj6591812

executor,
manifestName -> {
try {
retryReadingFiles(
() -> manifestFile.readWithIOException(manifestName),
Collections.<ManifestEntry>emptyList())
.stream()
.map(ManifestEntry::file)
.forEach(
f -> {
if (candidateDeletes.contains(f.fileName())) {
usedFiles.add(f.fileName());
}
f.extraFiles().stream()
.filter(candidateDeletes::contains)
.forEach(usedFiles::add);
});
} catch (IOException e) {
throw new RuntimeException(e);
}
},
manifests);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,6 @@ protected Set<Snapshot> safelyGetAllSnapshots(String branch) throws IOException
return readSnapshots;
}

protected void collectWithoutDataFile(
String branch, Consumer<String> usedFileConsumer, Consumer<String> manifestConsumer)
throws IOException {
for (Snapshot snapshot : safelyGetAllSnapshots(branch)) {
collectWithoutDataFile(branch, snapshot, usedFileConsumer, manifestConsumer);
}
}

protected void collectWithoutDataFile(
String branch,
Snapshot snapshot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand All @@ -52,6 +54,8 @@
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.BranchManager.branchPath;
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute;

/** Manager for {@link Snapshot}, providing utility methods related to paths and snapshot hints. */
public class SnapshotManager implements Serializable {
Expand Down Expand Up @@ -414,8 +418,7 @@ public List<Path> snapshotPaths(Predicate<Long> predicate) throws IOException {
}

public Iterator<Snapshot> snapshotsWithinRange(
Optional<Long> optionalMaxSnapshotId, Optional<Long> optionalMinSnapshotId)
throws IOException {
Optional<Long> optionalMaxSnapshotId, Optional<Long> optionalMinSnapshotId) {
Long lowerBoundSnapshotId = earliestSnapshotId();
Long upperBoundSnapshotId = latestSnapshotId();
Long lowerId;
Expand Down Expand Up @@ -457,7 +460,7 @@ public Iterator<Snapshot> snapshotsWithinRange(

public Iterator<Changelog> changelogs() throws IOException {
return listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX)
.map(snapshotId -> changelog(snapshotId))
.map(this::changelog)
.sorted(Comparator.comparingLong(Changelog::id))
.iterator();
}
Expand All @@ -469,38 +472,62 @@ public Iterator<Changelog> changelogs() throws IOException {
public List<Snapshot> safelyGetAllSnapshots() throws IOException {
List<Path> paths =
listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX)
.map(id -> snapshotPath(id))
.map(this::snapshotPath)
.collect(Collectors.toList());

List<Snapshot> snapshots = new ArrayList<>();
for (Path path : paths) {
try {
snapshots.add(Snapshot.fromJson(fileIO.readFileUtf8(path)));
} catch (FileNotFoundException ignored) {
}
}
List<Snapshot> snapshots = Collections.synchronizedList(new ArrayList<>(paths.size()));
collectSnapshots(
path -> {
try {
snapshots.add(Snapshot.fromJson(fileIO.readFileUtf8(path)));
} catch (IOException e) {
if (!(e instanceof FileNotFoundException)) {
throw new RuntimeException(e);
}
}
},
paths);

return snapshots;
}

public List<Changelog> safelyGetAllChangelogs() throws IOException {
List<Path> paths =
listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX)
.map(id -> longLivedChangelogPath(id))
.map(this::longLivedChangelogPath)
.collect(Collectors.toList());

List<Changelog> changelogs = new ArrayList<>();
for (Path path : paths) {
try {
String json = fileIO.readFileUtf8(path);
changelogs.add(Changelog.fromJson(json));
} catch (FileNotFoundException ignored) {
}
}
List<Changelog> changelogs = Collections.synchronizedList(new ArrayList<>(paths.size()));
collectSnapshots(
path -> {
try {
changelogs.add(Changelog.fromJson(fileIO.readFileUtf8(path)));
} catch (IOException e) {
if (!(e instanceof FileNotFoundException)) {
throw new RuntimeException(e);
}
}
},
paths);

return changelogs;
}

private void collectSnapshots(Consumer<Path> pathConsumer, List<Path> paths)
throws IOException {
ExecutorService executor =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use ThreadPoolUtils#createCachedThreadPool?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix

createCachedThreadPool(
Runtime.getRuntime().availableProcessors(), "SNAPSHOT_COLLECTOR");

try {
randomlyOnlyExecute(executor, pathConsumer, paths);
} catch (RuntimeException e) {
throw new IOException(e);
} finally {
executor.shutdown();
}
}

/**
* Try to get non snapshot files. If any error occurred, just ignore it and return an empty
* result.
Expand Down
Loading