From 31b9277ae3fa37c4e8e31e66a82f1d933c4171ea Mon Sep 17 00:00:00 2001 From: baokainan Date: Fri, 27 Sep 2024 18:21:15 +0800 Subject: [PATCH 1/5] [core] Support local orphan clean parallel reading --- .../operation/LocalOrphanFilesClean.java | 66 +++++++++++++------ .../paimon/operation/OrphanFilesClean.java | 8 --- .../apache/paimon/utils/SnapshotManager.java | 57 +++++++++++----- 3 files changed, 87 insertions(+), 44 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java index f3f58107ae38..5db8b693c38e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java @@ -40,18 +40,21 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool; import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecute; +import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute; /** * Local {@link OrphanFilesClean}, it will use thread pool to execute deletion. @@ -113,31 +116,52 @@ public List clean() throws IOException, ExecutionException, InterruptedExc return deleteFiles; } - private List getUsedFiles(String branch) { - List usedFiles = new ArrayList<>(); + protected void collectWithoutDataFile( + String branch, Consumer usedFileConsumer, Consumer manifestConsumer) + throws IOException { + randomlyOnlyExecute( + executor, + snapshot -> { + try { + collectWithoutDataFile( + branch, snapshot, usedFileConsumer, manifestConsumer); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, + safelyGetAllSnapshots(branch)); + } + + private Set getUsedFiles(String branch) { + Set usedFiles = ConcurrentHashMap.newKeySet(); ManifestFile manifestFile = table.switchToBranch(branch).store().manifestFileFactory().create(); try { - Set manifests = new HashSet<>(); + Set manifests = ConcurrentHashMap.newKeySet(); collectWithoutDataFile(branch, usedFiles::add, manifests::add); - List dataFiles = new ArrayList<>(); - for (String manifestName : manifests) { - retryReadingFiles( - () -> manifestFile.readWithIOException(manifestName), - Collections.emptyList()) - .stream() - .map(ManifestEntry::file) - .forEach( - f -> { - if (candidateDeletes.contains(f.fileName())) { - dataFiles.add(f.fileName()); - } - f.extraFiles().stream() - .filter(candidateDeletes::contains) - .forEach(dataFiles::add); - }); - } - usedFiles.addAll(dataFiles); + randomlyOnlyExecute( + executor, + manifestName -> { + try { + retryReadingFiles( + () -> manifestFile.readWithIOException(manifestName), + Collections.emptyList()) + .stream() + .map(ManifestEntry::file) + .forEach( + f -> { + if (candidateDeletes.contains(f.fileName())) { + usedFiles.add(f.fileName()); + } + f.extraFiles().stream() + .filter(candidateDeletes::contains) + .forEach(usedFiles::add); + }); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, + manifests); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index 0f2bad27fc9a..5698908cb9b0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -146,14 +146,6 @@ protected Set safelyGetAllSnapshots(String branch) throws IOException return readSnapshots; } - protected void collectWithoutDataFile( - String branch, Consumer usedFileConsumer, Consumer manifestConsumer) - throws IOException { - for (Snapshot snapshot : safelyGetAllSnapshots(branch)) { - collectWithoutDataFile(branch, snapshot, usedFileConsumer, manifestConsumer); - } - } - protected void collectWithoutDataFile( String branch, Snapshot snapshot, diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 4dda63960fdb..c17ade8895db 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -42,8 +42,11 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.BinaryOperator; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -52,6 +55,7 @@ import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.BranchManager.branchPath; import static org.apache.paimon.utils.FileUtils.listVersionedFiles; +import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute; /** Manager for {@link Snapshot}, providing utility methods related to paths and snapshot hints. */ public class SnapshotManager implements Serializable { @@ -472,13 +476,18 @@ public List safelyGetAllSnapshots() throws IOException { .map(id -> snapshotPath(id)) .collect(Collectors.toList()); - List snapshots = new ArrayList<>(); - for (Path path : paths) { - try { - snapshots.add(Snapshot.fromJson(fileIO.readFileUtf8(path))); - } catch (FileNotFoundException ignored) { - } - } + List snapshots = Collections.synchronizedList(new ArrayList<>()); + collectSnapshots( + path -> { + try { + snapshots.add(Snapshot.fromJson(fileIO.readFileUtf8(path))); + } catch (IOException e) { + if (!(e instanceof FileNotFoundException)) { + throw new RuntimeException(e); + } + } + }, + paths); return snapshots; } @@ -489,18 +498,36 @@ public List safelyGetAllChangelogs() throws IOException { .map(id -> longLivedChangelogPath(id)) .collect(Collectors.toList()); - List changelogs = new ArrayList<>(); - for (Path path : paths) { - try { - String json = fileIO.readFileUtf8(path); - changelogs.add(Changelog.fromJson(json)); - } catch (FileNotFoundException ignored) { - } - } + List changelogs = Collections.synchronizedList(new ArrayList<>()); + collectSnapshots( + path -> { + try { + changelogs.add(Changelog.fromJson(fileIO.readFileUtf8(path))); + } catch (IOException e) { + if (!(e instanceof FileNotFoundException)) { + throw new RuntimeException(e); + } + } + }, + paths); return changelogs; } + private void collectSnapshots(Consumer pathConsumer, List paths) + throws IOException { + ExecutorService executor = + Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + + try { + randomlyOnlyExecute(executor, pathConsumer, paths); + } catch (RuntimeException e) { + throw new IOException(e); + } finally { + executor.shutdown(); + } + } + /** * Try to get non snapshot files. If any error occurred, just ignore it and return an empty * result. From 953f39f160308e166fc92e4b3db853571700ee5b Mon Sep 17 00:00:00 2001 From: baokainan Date: Mon, 14 Oct 2024 13:40:12 +0800 Subject: [PATCH 2/5] fix style --- .../java/org/apache/paimon/operation/LocalOrphanFilesClean.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java index 5db8b693c38e..3ee108c10359 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java @@ -116,7 +116,7 @@ public List clean() throws IOException, ExecutionException, InterruptedExc return deleteFiles; } - protected void collectWithoutDataFile( + private void collectWithoutDataFile( String branch, Consumer usedFileConsumer, Consumer manifestConsumer) throws IOException { randomlyOnlyExecute( From 29e56f0550e603a504c6c16db9fc22c93b9ad9c2 Mon Sep 17 00:00:00 2001 From: baokainan Date: Mon, 14 Oct 2024 15:23:15 +0800 Subject: [PATCH 3/5] fix style --- .../apache/paimon/utils/SnapshotManager.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index c17ade8895db..95eccd140beb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -43,7 +43,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.BinaryOperator; import java.util.function.Consumer; @@ -55,6 +54,7 @@ import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.BranchManager.branchPath; import static org.apache.paimon.utils.FileUtils.listVersionedFiles; +import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool; import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute; /** Manager for {@link Snapshot}, providing utility methods related to paths and snapshot hints. */ @@ -418,8 +418,7 @@ public List snapshotPaths(Predicate predicate) throws IOException { } public Iterator snapshotsWithinRange( - Optional optionalMaxSnapshotId, Optional optionalMinSnapshotId) - throws IOException { + Optional optionalMaxSnapshotId, Optional optionalMinSnapshotId) { Long lowerBoundSnapshotId = earliestSnapshotId(); Long upperBoundSnapshotId = latestSnapshotId(); Long lowerId; @@ -461,7 +460,7 @@ public Iterator snapshotsWithinRange( public Iterator changelogs() throws IOException { return listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX) - .map(snapshotId -> changelog(snapshotId)) + .map(this::changelog) .sorted(Comparator.comparingLong(Changelog::id)) .iterator(); } @@ -473,10 +472,10 @@ public Iterator changelogs() throws IOException { public List safelyGetAllSnapshots() throws IOException { List paths = listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX) - .map(id -> snapshotPath(id)) + .map(this::snapshotPath) .collect(Collectors.toList()); - List snapshots = Collections.synchronizedList(new ArrayList<>()); + List snapshots = Collections.synchronizedList(new ArrayList<>(paths.size())); collectSnapshots( path -> { try { @@ -495,10 +494,10 @@ public List safelyGetAllSnapshots() throws IOException { public List safelyGetAllChangelogs() throws IOException { List paths = listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX) - .map(id -> longLivedChangelogPath(id)) + .map(this::longLivedChangelogPath) .collect(Collectors.toList()); - List changelogs = Collections.synchronizedList(new ArrayList<>()); + List changelogs = Collections.synchronizedList(new ArrayList<>(paths.size())); collectSnapshots( path -> { try { @@ -517,7 +516,8 @@ public List safelyGetAllChangelogs() throws IOException { private void collectSnapshots(Consumer pathConsumer, List paths) throws IOException { ExecutorService executor = - Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + createCachedThreadPool( + Runtime.getRuntime().availableProcessors(), "SNAPSHOT_COLLECTOR"); try { randomlyOnlyExecute(executor, pathConsumer, paths); From 43350a67ac39a27b05c92454d9d417a2bd090138 Mon Sep 17 00:00:00 2001 From: baokainan Date: Mon, 14 Oct 2024 20:07:55 +0800 Subject: [PATCH 4/5] use sequentialBatchedExecute --- .../operation/LocalOrphanFilesClean.java | 64 +++++++++++-------- 1 file changed, 38 insertions(+), 26 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java index 3ee108c10359..89fb68e8d6ab 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java @@ -55,6 +55,7 @@ import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool; import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecute; import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute; +import static org.apache.paimon.utils.ThreadPoolUtils.sequentialBatchedExecute; /** * Local {@link OrphanFilesClean}, it will use thread pool to execute deletion. @@ -70,6 +71,8 @@ public class LocalOrphanFilesClean extends OrphanFilesClean { private Set candidateDeletes; + private final int parallelism; + public LocalOrphanFilesClean(FileStoreTable table) { this(table, System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1)); } @@ -82,9 +85,8 @@ public LocalOrphanFilesClean( FileStoreTable table, long olderThanMillis, SerializableConsumer fileCleaner) { super(table, olderThanMillis, fileCleaner); this.deleteFiles = new ArrayList<>(); - this.executor = - createCachedThreadPool( - table.coreOptions().deleteFileThreadNum(), "ORPHAN_FILES_CLEAN"); + this.parallelism = table.coreOptions().deleteFileThreadNum(); + this.executor = createCachedThreadPool(parallelism, "ORPHAN_FILES_CLEAN"); } public List clean() throws IOException, ExecutionException, InterruptedException { @@ -139,29 +141,39 @@ private Set getUsedFiles(String branch) { try { Set manifests = ConcurrentHashMap.newKeySet(); collectWithoutDataFile(branch, usedFiles::add, manifests::add); - randomlyOnlyExecute( - executor, - manifestName -> { - try { - retryReadingFiles( - () -> manifestFile.readWithIOException(manifestName), - Collections.emptyList()) - .stream() - .map(ManifestEntry::file) - .forEach( - f -> { - if (candidateDeletes.contains(f.fileName())) { - usedFiles.add(f.fileName()); - } - f.extraFiles().stream() - .filter(candidateDeletes::contains) - .forEach(usedFiles::add); - }); - } catch (IOException e) { - throw new RuntimeException(e); - } - }, - manifests); + Iterable dataFiles = + sequentialBatchedExecute( + executor, + manifestName -> { + try { + List dataFilesInBatch = new ArrayList<>(); + retryReadingFiles( + () -> + manifestFile.readWithIOException( + manifestName), + Collections.emptyList()) + .stream() + .map(ManifestEntry::file) + .forEach( + f -> { + if (candidateDeletes.contains( + f.fileName())) { + dataFilesInBatch.add(f.fileName()); + } + f.extraFiles().stream() + .filter(candidateDeletes::contains) + .forEach(dataFilesInBatch::add); + }); + return dataFilesInBatch; + } catch (IOException e) { + throw new RuntimeException(e); + } + }, + new ArrayList<>(manifests), + parallelism); + for (String fileName : dataFiles) { + usedFiles.add(fileName); + } } catch (IOException e) { throw new RuntimeException(e); } From 0705afa788634c294f62a96c92574cab0d07b291 Mon Sep 17 00:00:00 2001 From: bknbkn <674631410@qq.com> Date: Wed, 16 Oct 2024 13:33:18 +0800 Subject: [PATCH 5/5] Revert "use sequentialBatchedExecute" This reverts commit 43350a67ac39a27b05c92454d9d417a2bd090138. --- .../operation/LocalOrphanFilesClean.java | 64 ++++++++----------- 1 file changed, 26 insertions(+), 38 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java index 89fb68e8d6ab..3ee108c10359 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java @@ -55,7 +55,6 @@ import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool; import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecute; import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute; -import static org.apache.paimon.utils.ThreadPoolUtils.sequentialBatchedExecute; /** * Local {@link OrphanFilesClean}, it will use thread pool to execute deletion. @@ -71,8 +70,6 @@ public class LocalOrphanFilesClean extends OrphanFilesClean { private Set candidateDeletes; - private final int parallelism; - public LocalOrphanFilesClean(FileStoreTable table) { this(table, System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1)); } @@ -85,8 +82,9 @@ public LocalOrphanFilesClean( FileStoreTable table, long olderThanMillis, SerializableConsumer fileCleaner) { super(table, olderThanMillis, fileCleaner); this.deleteFiles = new ArrayList<>(); - this.parallelism = table.coreOptions().deleteFileThreadNum(); - this.executor = createCachedThreadPool(parallelism, "ORPHAN_FILES_CLEAN"); + this.executor = + createCachedThreadPool( + table.coreOptions().deleteFileThreadNum(), "ORPHAN_FILES_CLEAN"); } public List clean() throws IOException, ExecutionException, InterruptedException { @@ -141,39 +139,29 @@ private Set getUsedFiles(String branch) { try { Set manifests = ConcurrentHashMap.newKeySet(); collectWithoutDataFile(branch, usedFiles::add, manifests::add); - Iterable dataFiles = - sequentialBatchedExecute( - executor, - manifestName -> { - try { - List dataFilesInBatch = new ArrayList<>(); - retryReadingFiles( - () -> - manifestFile.readWithIOException( - manifestName), - Collections.emptyList()) - .stream() - .map(ManifestEntry::file) - .forEach( - f -> { - if (candidateDeletes.contains( - f.fileName())) { - dataFilesInBatch.add(f.fileName()); - } - f.extraFiles().stream() - .filter(candidateDeletes::contains) - .forEach(dataFilesInBatch::add); - }); - return dataFilesInBatch; - } catch (IOException e) { - throw new RuntimeException(e); - } - }, - new ArrayList<>(manifests), - parallelism); - for (String fileName : dataFiles) { - usedFiles.add(fileName); - } + randomlyOnlyExecute( + executor, + manifestName -> { + try { + retryReadingFiles( + () -> manifestFile.readWithIOException(manifestName), + Collections.emptyList()) + .stream() + .map(ManifestEntry::file) + .forEach( + f -> { + if (candidateDeletes.contains(f.fileName())) { + usedFiles.add(f.fileName()); + } + f.extraFiles().stream() + .filter(candidateDeletes::contains) + .forEach(usedFiles::add); + }); + } catch (IOException e) { + throw new RuntimeException(e); + } + }, + manifests); } catch (IOException e) { throw new RuntimeException(e); }