Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Support parallel reading of local orphan clean #4320

Merged
merged 5 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,22 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecute;
import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute;
import static org.apache.paimon.utils.ThreadPoolUtils.sequentialBatchedExecute;

/**
* Local {@link OrphanFilesClean}, it will use thread pool to execute deletion.
Expand All @@ -67,6 +71,8 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {

private Set<String> candidateDeletes;

private final int parallelism;

public LocalOrphanFilesClean(FileStoreTable table) {
this(table, System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1));
}
Expand All @@ -79,9 +85,8 @@ public LocalOrphanFilesClean(
FileStoreTable table, long olderThanMillis, SerializableConsumer<Path> fileCleaner) {
super(table, olderThanMillis, fileCleaner);
this.deleteFiles = new ArrayList<>();
this.executor =
createCachedThreadPool(
table.coreOptions().deleteFileThreadNum(), "ORPHAN_FILES_CLEAN");
this.parallelism = table.coreOptions().deleteFileThreadNum();
this.executor = createCachedThreadPool(parallelism, "ORPHAN_FILES_CLEAN");
}

public List<Path> clean() throws IOException, ExecutionException, InterruptedException {
Expand Down Expand Up @@ -113,31 +118,62 @@ public List<Path> clean() throws IOException, ExecutionException, InterruptedExc
return deleteFiles;
}

private List<String> getUsedFiles(String branch) {
List<String> usedFiles = new ArrayList<>();
private void collectWithoutDataFile(
String branch, Consumer<String> usedFileConsumer, Consumer<String> manifestConsumer)
throws IOException {
randomlyOnlyExecute(
executor,
snapshot -> {
try {
collectWithoutDataFile(
branch, snapshot, usedFileConsumer, manifestConsumer);
} catch (IOException e) {
throw new RuntimeException(e);
}
},
safelyGetAllSnapshots(branch));
}

private Set<String> getUsedFiles(String branch) {
Set<String> usedFiles = ConcurrentHashMap.newKeySet();
ManifestFile manifestFile =
table.switchToBranch(branch).store().manifestFileFactory().create();
try {
Set<String> manifests = new HashSet<>();
Set<String> manifests = ConcurrentHashMap.newKeySet();
collectWithoutDataFile(branch, usedFiles::add, manifests::add);
List<String> dataFiles = new ArrayList<>();
for (String manifestName : manifests) {
retryReadingFiles(
() -> manifestFile.readWithIOException(manifestName),
Collections.<ManifestEntry>emptyList())
.stream()
.map(ManifestEntry::file)
.forEach(
f -> {
if (candidateDeletes.contains(f.fileName())) {
dataFiles.add(f.fileName());
}
f.extraFiles().stream()
.filter(candidateDeletes::contains)
.forEach(dataFiles::add);
});
Iterable<String> dataFiles =
sequentialBatchedExecute(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why using sequentialBatchedExecute? Maybe use randomlyExecute?

Copy link
Contributor Author

@bknbkn bknbkn Oct 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replace sequentialBatchedExecute with randomlyOnlyExecute , detail in #4320 (comment)

executor,
manifestName -> {
try {
List<String> dataFilesInBatch = new ArrayList<>();
retryReadingFiles(
() ->
manifestFile.readWithIOException(
manifestName),
Collections.<ManifestEntry>emptyList())
.stream()
.map(ManifestEntry::file)
.forEach(
f -> {
if (candidateDeletes.contains(
f.fileName())) {
dataFilesInBatch.add(f.fileName());
}
f.extraFiles().stream()
.filter(candidateDeletes::contains)
.forEach(dataFilesInBatch::add);
});
return dataFilesInBatch;
} catch (IOException e) {
throw new RuntimeException(e);
}
},
new ArrayList<>(manifests),
parallelism);
for (String fileName : dataFiles) {
usedFiles.add(fileName);
}
usedFiles.addAll(dataFiles);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,6 @@ protected Set<Snapshot> safelyGetAllSnapshots(String branch) throws IOException
return readSnapshots;
}

protected void collectWithoutDataFile(
String branch, Consumer<String> usedFileConsumer, Consumer<String> manifestConsumer)
throws IOException {
for (Snapshot snapshot : safelyGetAllSnapshots(branch)) {
collectWithoutDataFile(branch, snapshot, usedFileConsumer, manifestConsumer);
}
}

protected void collectWithoutDataFile(
String branch,
Snapshot snapshot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand All @@ -52,6 +54,8 @@
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.BranchManager.branchPath;
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute;

/** Manager for {@link Snapshot}, providing utility methods related to paths and snapshot hints. */
public class SnapshotManager implements Serializable {
Expand Down Expand Up @@ -414,8 +418,7 @@ public List<Path> snapshotPaths(Predicate<Long> predicate) throws IOException {
}

public Iterator<Snapshot> snapshotsWithinRange(
Optional<Long> optionalMaxSnapshotId, Optional<Long> optionalMinSnapshotId)
throws IOException {
Optional<Long> optionalMaxSnapshotId, Optional<Long> optionalMinSnapshotId) {
Long lowerBoundSnapshotId = earliestSnapshotId();
Long upperBoundSnapshotId = latestSnapshotId();
Long lowerId;
Expand Down Expand Up @@ -457,7 +460,7 @@ public Iterator<Snapshot> snapshotsWithinRange(

public Iterator<Changelog> changelogs() throws IOException {
return listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX)
.map(snapshotId -> changelog(snapshotId))
.map(this::changelog)
.sorted(Comparator.comparingLong(Changelog::id))
.iterator();
}
Expand All @@ -469,38 +472,62 @@ public Iterator<Changelog> changelogs() throws IOException {
public List<Snapshot> safelyGetAllSnapshots() throws IOException {
List<Path> paths =
listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX)
.map(id -> snapshotPath(id))
.map(this::snapshotPath)
.collect(Collectors.toList());

List<Snapshot> snapshots = new ArrayList<>();
for (Path path : paths) {
try {
snapshots.add(Snapshot.fromJson(fileIO.readFileUtf8(path)));
} catch (FileNotFoundException ignored) {
}
}
List<Snapshot> snapshots = Collections.synchronizedList(new ArrayList<>(paths.size()));
collectSnapshots(
path -> {
try {
snapshots.add(Snapshot.fromJson(fileIO.readFileUtf8(path)));
} catch (IOException e) {
if (!(e instanceof FileNotFoundException)) {
throw new RuntimeException(e);
}
}
},
paths);

return snapshots;
}

public List<Changelog> safelyGetAllChangelogs() throws IOException {
List<Path> paths =
listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX)
.map(id -> longLivedChangelogPath(id))
.map(this::longLivedChangelogPath)
.collect(Collectors.toList());

List<Changelog> changelogs = new ArrayList<>();
for (Path path : paths) {
try {
String json = fileIO.readFileUtf8(path);
changelogs.add(Changelog.fromJson(json));
} catch (FileNotFoundException ignored) {
}
}
List<Changelog> changelogs = Collections.synchronizedList(new ArrayList<>(paths.size()));
collectSnapshots(
path -> {
try {
changelogs.add(Changelog.fromJson(fileIO.readFileUtf8(path)));
} catch (IOException e) {
if (!(e instanceof FileNotFoundException)) {
throw new RuntimeException(e);
}
}
},
paths);

return changelogs;
}

private void collectSnapshots(Consumer<Path> pathConsumer, List<Path> paths)
throws IOException {
ExecutorService executor =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use ThreadPoolUtils#createCachedThreadPool?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix

createCachedThreadPool(
Runtime.getRuntime().availableProcessors(), "SNAPSHOT_COLLECTOR");

try {
randomlyOnlyExecute(executor, pathConsumer, paths);
} catch (RuntimeException e) {
throw new IOException(e);
} finally {
executor.shutdown();
}
}

/**
* Try to get non snapshot files. If any error occurred, just ignore it and return an empty
* result.
Expand Down
Loading