Skip to content

Commit

Permalink
[core] Introduce delete-file.thread-num (#3751)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Jul 16, 2024
1 parent ba2d420 commit ae77340
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 17 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@
<td>Duration</td>
<td>The TTL in rocksdb index for cross partition upsert (primary keys not contain all partition fields), this can avoid maintaining too many indexes and lead to worse and worse performance, but please note that this may also cause data duplication.</td>
</tr>
<tr>
<td><h5>delete-file.thread-num</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>The maximum number of concurrent deleting files. By default is the number of processors available to the Java virtual machine.</td>
</tr>
<tr>
<td><h5>delete.force-produce-changelog</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
13 changes: 13 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1284,6 +1284,14 @@ public class CoreOptions implements Serializable {
"When set to true, produce Iceberg metadata after a snapshot is committed, "
+ "so that Iceberg readers can read Paimon's raw files.");

/**
 * Option {@code delete-file.thread-num}: the maximum number of files deleted concurrently.
 *
 * <p>Declared without a default value; {@link #deleteFileThreadNum()} falls back to the number
 * of processors available to the JVM when the option is not set.
 */
public static final ConfigOption<Integer> DELETE_FILE_THREAD_NUM =
key("delete-file.thread-num")
.intType()
.noDefaultValue()
.withDescription(
"The maximum number of concurrent deleting files. "
+ "By default is the number of processors available to the Java virtual machine.");

private final Options options;

public CoreOptions(Map<String, String> options) {
Expand Down Expand Up @@ -1508,6 +1516,11 @@ public boolean cleanEmptyDirectories() {
return options.get(SNAPSHOT_CLEAN_EMPTY_DIRECTORIES);
}

/**
 * Returns the configured {@code delete-file.thread-num}, or the number of processors
 * available to the JVM when the option is not set.
 */
public int deleteFileThreadNum() {
    // The fallback is only looked up when no explicit value was configured.
    return options.getOptional(DELETE_FILE_THREAD_NUM)
            .orElseGet(Runtime.getRuntime()::availableProcessors);
}

public ExpireConfig expireConfig() {
return ExpireConfig.builder()
.snapshotRetainMax(snapshotNumRetainMax())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.utils;

import org.apache.paimon.fs.FileIO;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static org.apache.paimon.utils.ThreadUtils.newDaemonThreadFactory;

/**
 * Thread pool to delete files using {@link FileIO}.
 *
 * <p>A single pool is shared by all callers. It is replaced by a larger pool only when a caller
 * asks for more threads than the current pool allows; it is never shrunk.
 */
public class FileDeletionThreadPool {

    private static ThreadPoolExecutor executorService =
            createCachedThreadPool(Runtime.getRuntime().availableProcessors());

    /**
     * Returns the shared deletion pool, first replacing it with a larger one if {@code threadNum}
     * exceeds the current maximum pool size.
     *
     * @param threadNum requested maximum number of concurrent deletions; any value not greater
     *     than the current maximum (including non-positive values) returns the existing pool
     * @return the shared executor, whose maximum pool size is at least {@code threadNum}
     */
    public static synchronized ThreadPoolExecutor getExecutorService(int threadNum) {
        if (threadNum <= executorService.getMaximumPoolSize()) {
            return executorService;
        }
        // we don't need to close previous pool:
        // its idle threads time out on their own after the keep-alive period
        executorService = createCachedThreadPool(threadNum);

        return executorService;
    }

    /**
     * Creates a pool that runs up to {@code threadNum} tasks in parallel and releases all of its
     * threads after one minute of idleness.
     */
    private static ThreadPoolExecutor createCachedThreadPool(int threadNum) {
        // With an unbounded queue, ThreadPoolExecutor never grows past corePoolSize: extra
        // threads are only started when the queue rejects a task, which never happens here.
        // A core size of 0 therefore results in a single worker thread regardless of the
        // maximum, so the core size must equal the desired parallelism.
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(
                        threadNum,
                        threadNum,
                        1,
                        TimeUnit.MINUTES,
                        new LinkedBlockingQueue<>(),
                        newDaemonThreadFactory("DELETE-FILE-THREAD-POOL"));
        // Keep the "cached" behaviour: core threads are also reclaimed when idle.
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ public SnapshotDeletion newSnapshotDeletion() {
newIndexFileHandler(),
newStatsFileHandler(),
options.changelogProducer() != CoreOptions.ChangelogProducer.NONE,
options.cleanEmptyDirectories());
options.cleanEmptyDirectories(),
options.deleteFileThreadNum());
}

@Override
Expand All @@ -230,7 +231,8 @@ public ChangelogDeletion newChangelogDeletion() {
manifestListFactory().create(),
newIndexFileHandler(),
newStatsFileHandler(),
options.cleanEmptyDirectories());
options.cleanEmptyDirectories(),
options.deleteFileThreadNum());
}

@Override
Expand All @@ -247,7 +249,8 @@ public TagDeletion newTagDeletion() {
manifestListFactory().create(),
newIndexFileHandler(),
newStatsFileHandler(),
options.cleanEmptyDirectories());
options.cleanEmptyDirectories(),
options.deleteFileThreadNum());
}

public abstract Comparator<InternalRow> newKeyComparator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,17 @@ public ChangelogDeletion(
ManifestList manifestList,
IndexFileHandler indexFileHandler,
StatsFileHandler statsFileHandler,
boolean cleanEmptyDirectories) {
boolean cleanEmptyDirectories,
int deleteFileThreadNum) {
super(
fileIO,
pathFactory,
manifestFile,
manifestList,
indexFileHandler,
statsFileHandler,
cleanEmptyDirectories);
cleanEmptyDirectories,
deleteFileThreadNum);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.utils.FileDeletionThreadPool;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.TagManager;

Expand Down Expand Up @@ -71,9 +71,10 @@ public abstract class FileDeletionBase<T extends Snapshot> {
protected final IndexFileHandler indexFileHandler;
protected final StatsFileHandler statsFileHandler;
private final boolean cleanEmptyDirectories;

protected final Map<BinaryRow, Set<Integer>> deletionBuckets;
protected final Executor ioExecutor;

private final Executor deleteFileExecutor;

protected boolean changelogDecoupled;

/** Used to record which tag is cached. */
Expand All @@ -89,7 +90,8 @@ public FileDeletionBase(
ManifestList manifestList,
IndexFileHandler indexFileHandler,
StatsFileHandler statsFileHandler,
boolean cleanEmptyDirectories) {
boolean cleanEmptyDirectories,
int deleteFileThreadNum) {
this.fileIO = fileIO;
this.pathFactory = pathFactory;
this.manifestFile = manifestFile;
Expand All @@ -98,7 +100,7 @@ public FileDeletionBase(
this.statsFileHandler = statsFileHandler;
this.cleanEmptyDirectories = cleanEmptyDirectories;
this.deletionBuckets = new HashMap<>();
this.ioExecutor = FileUtils.COMMON_IO_FORK_JOIN_POOL;
this.deleteFileExecutor = FileDeletionThreadPool.getExecutorService(deleteFileThreadNum);
}

/**
Expand Down Expand Up @@ -455,15 +457,15 @@ private boolean tryDeleteEmptyDirectory(Path path) {
}
}

protected <T> void deleteFiles(Collection<T> files, Consumer<T> deletion) {
protected <F> void deleteFiles(Collection<F> files, Consumer<F> deletion) {
if (files.isEmpty()) {
return;
}

List<CompletableFuture<Void>> deletionFutures = new ArrayList<>(files.size());
for (T file : files) {
for (F file : files) {
deletionFutures.add(
CompletableFuture.runAsync(() -> deletion.accept(file), ioExecutor));
CompletableFuture.runAsync(() -> deletion.accept(file), deleteFileExecutor));
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,17 @@ public SnapshotDeletion(
IndexFileHandler indexFileHandler,
StatsFileHandler statsFileHandler,
boolean produceChangelog,
boolean cleanEmptyDirectories) {
boolean cleanEmptyDirectories,
int deleteFileThreadNum) {
super(
fileIO,
pathFactory,
manifestFile,
manifestList,
indexFileHandler,
statsFileHandler,
cleanEmptyDirectories);
cleanEmptyDirectories,
deleteFileThreadNum);
this.produceChangelog = produceChangelog;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,17 @@ public TagDeletion(
ManifestList manifestList,
IndexFileHandler indexFileHandler,
StatsFileHandler statsFileHandler,
boolean cleanEmptyDirectories) {
boolean cleanEmptyDirectories,
int deleteFileThreadNum) {
super(
fileIO,
pathFactory,
manifestFile,
manifestList,
indexFileHandler,
statsFileHandler,
cleanEmptyDirectories);
cleanEmptyDirectories,
deleteFileThreadNum);
}

@Override
Expand Down

0 comments on commit ae77340

Please sign in to comment.