Skip to content

Commit

Permalink
[core] Ensure enough tasks for scan.manifest.parallelism (apache#2722)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Jan 17, 2024
1 parent 97d1314 commit dbc06f7
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.ParallellyExecuteUtils;
import org.apache.paimon.utils.ScanParallelExecutor;
import org.apache.paimon.utils.SnapshotManager;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -261,7 +261,7 @@ private Pair<Snapshot, List<ManifestEntry>> doPlan(

AtomicLong cntEntries = new AtomicLong(0);
Iterable<ManifestEntry> entries =
ParallellyExecuteUtils.parallelismBatchIterable(
ScanParallelExecutor.parallelismBatchIterable(
files -> {
List<ManifestEntry> entryList =
files.parallelStream()
Expand Down
27 changes: 22 additions & 5 deletions paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,35 @@ public class FileUtils {

public static final ForkJoinPool COMMON_IO_FORK_JOIN_POOL;

// if we want to name threads in the fork join pool we need all these
// see https://stackoverflow.com/questions/34303094/
private static volatile ForkJoinPool scanIoForkJoinPool;

static {
    // Size the shared common I/O pool to the number of processors visible to the JVM.
    int processors = Runtime.getRuntime().availableProcessors();
    COMMON_IO_FORK_JOIN_POOL = createForkJoinPool("file-store-common-io", processors);
}

/**
 * Creates a {@link ForkJoinPool} whose worker threads are renamed using their pool index.
 *
 * @param namePrefix prefix placed in front of the worker's pool index in the thread name
 * @param parallelism parallelism level handed to the {@link ForkJoinPool} constructor
 * @return the newly created pool
 */
private static ForkJoinPool createForkJoinPool(String namePrefix, int parallelism) {
// if we want to name threads in the fork join pool we need all these
// see https://stackoverflow.com/questions/34303094/
ForkJoinPool.ForkJoinWorkerThreadFactory factory =
pool -> {
// Delegate thread creation to the JDK default factory, then only adjust the name.
ForkJoinWorkerThread worker =
ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
// NOTE(review): this name is overwritten by the very next statement; it looks like the
// pre-change line of the diff shown alongside its replacement — confirm.
worker.setName("file-store-common-io-" + worker.getPoolIndex());
worker.setName(namePrefix + "-" + worker.getPoolIndex());
return worker;
};
// NOTE(review): this assignment duplicates what the static initializer now does through this
// method; presumably the pre-change statement from the diff view — confirm.
COMMON_IO_FORK_JOIN_POOL =
new ForkJoinPool(Runtime.getRuntime().availableProcessors(), factory, null, false);
// Third argument: no custom uncaught-exception handler; fourth: asyncMode disabled.
return new ForkJoinPool(parallelism, factory, null, false);
}

/**
 * Returns the shared scan I/O pool, growing it when more parallelism is requested.
 *
 * <p>The pool is created lazily on first use and replaced by a larger one whenever {@code
 * parallelism} exceeds the parallelism of the cached pool; it never shrinks.
 *
 * <p>A superseded pool is deliberately NOT shut down. Callers that fetched it earlier may not
 * have submitted their task yet, and a pool that has been shut down rejects new submissions
 * with {@link java.util.concurrent.RejectedExecutionException}. The old pool is simply
 * dropped from the cache; it becomes unreachable once its last user is done with it.
 *
 * @param parallelism requested parallelism, forwarded to the {@link ForkJoinPool} constructor
 *     when a new pool has to be created
 * @return a pool whose parallelism is at least {@code parallelism}
 */
public static synchronized ForkJoinPool getScanIoForkJoinPool(int parallelism) {
    ForkJoinPool current = scanIoForkJoinPool;
    if (current == null || parallelism > current.getParallelism()) {
        // Replace without shutdown(): concurrent holders of the previous pool must still be
        // able to submit to it.
        current = createForkJoinPool("paimon-scan-io", parallelism);
        scanIoForkJoinPool = current;
    }
    return current;
}

public static <T> List<T> readListFromFile(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,27 @@
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;

/**
* This class is a parallel execution util class, which mainly aims to process tasks in parallel
* with memory control.
*/
public class ParallellyExecuteUtils {
public class ScanParallelExecutor {

// reduce memory usage by processing the input in batches of queueSize elements, so that only the results of one batch are cached in memory at a time
public static <T, U> Iterable<T> parallelismBatchIterable(
Function<List<U>, List<T>> processor, List<U> input, @Nullable Integer queueSize) {
ForkJoinPool poolCandidate = FileUtils.COMMON_IO_FORK_JOIN_POOL;
if (queueSize == null) {
queueSize = FileUtils.COMMON_IO_FORK_JOIN_POOL.getParallelism();
queueSize = poolCandidate.getParallelism();
} else if (queueSize <= 0) {
throw new NegativeArraySizeException("queue size should not be negetive");
}

final Queue<List<U>> stack = new ArrayDeque<>(Lists.partition(input, queueSize));

final int settledQueueSize = queueSize;
return () ->
new Iterator<T>() {
List<T> activeList = null;
Expand Down Expand Up @@ -76,7 +78,7 @@ private void advanceIfNeeded() {
activeList =
CompletableFuture.supplyAsync(
() -> processor.apply(stack.poll()),
FileUtils.COMMON_IO_FORK_JOIN_POOL)
getExecutePool(settledQueueSize))
.get();
} catch (Exception e) {
throw new RuntimeException(e);
Expand All @@ -85,4 +87,10 @@ private void advanceIfNeeded() {
}
};
}

/**
 * Chooses the pool a batch is executed on.
 *
 * <p>The common I/O pool is used as long as its parallelism covers {@code queueSize};
 * otherwise the scan I/O pool is requested with {@code queueSize} as its parallelism.
 */
private static ForkJoinPool getExecutePool(int queueSize) {
    ForkJoinPool commonPool = FileUtils.COMMON_IO_FORK_JOIN_POOL;
    if (queueSize <= commonPool.getParallelism()) {
        return commonPool;
    }
    return FileUtils.getScanIoForkJoinPool(queueSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/** This test mainly tests the methods in {@link ParallellyExecuteUtils}. */
public class ParallellyExecuteUtilsTest {
/** This test mainly tests the methods in {@link ScanParallelExecutor}. */
public class ScanParallelExecutorTest {

@Test
public void testParallelismBatchIterable() {
Expand All @@ -41,7 +41,7 @@ public void testParallelismBatchIterable() {
}

Iterable<Integer> re =
ParallellyExecuteUtils.parallelismBatchIterable(
ScanParallelExecutor.parallelismBatchIterable(
l -> l.parallelStream().map(i -> i + 1).collect(Collectors.toList()),
nums,
null);
Expand All @@ -62,7 +62,7 @@ public void testParallelismBatchIterable2() {
}

Iterable<Integer> re =
ParallellyExecuteUtils.parallelismBatchIterable(
ScanParallelExecutor.parallelismBatchIterable(
l -> l.parallelStream().map(i -> i + 1).collect(Collectors.toList()),
nums,
null);
Expand All @@ -83,7 +83,7 @@ public void testParallelismBatchIterable3() {
}

Iterable<Integer> re =
ParallellyExecuteUtils.parallelismBatchIterable(
ScanParallelExecutor.parallelismBatchIterable(
l -> l.parallelStream().map(i -> i + 1).collect(Collectors.toList()),
nums,
null);
Expand All @@ -109,7 +109,7 @@ public void testParallelismBatchIterable4() {
}

Iterable<Integer> re =
ParallellyExecuteUtils.parallelismBatchIterable(
ScanParallelExecutor.parallelismBatchIterable(
l -> l.parallelStream().map(i -> i + 1).collect(Collectors.toList()),
nums,
null);
Expand All @@ -129,7 +129,7 @@ public void testParallelismBatchIterable4() {
@Test
public void testForEmptyInput() {
Iterable<Integer> re =
ParallellyExecuteUtils.parallelismBatchIterable(
ScanParallelExecutor.parallelismBatchIterable(
l -> l.parallelStream().map(i -> i + 1).collect(Collectors.toList()),
(List<Integer>) Collections.EMPTY_LIST,
null);
Expand All @@ -139,7 +139,7 @@ public void testForEmptyInput() {
@Test
public void testForSingletonInput() {
Iterable<Integer> re =
ParallellyExecuteUtils.parallelismBatchIterable(
ScanParallelExecutor.parallelismBatchIterable(
l -> l.parallelStream().map(i -> i + 1).collect(Collectors.toList()),
Collections.singletonList(1),
null);
Expand All @@ -150,7 +150,7 @@ public void testForSingletonInput() {
public void testDifferentQueueSizeWithFilterElement() {
for (int queueSize = 1; queueSize < 20; queueSize++) {
Iterable<Integer> re =
ParallellyExecuteUtils.parallelismBatchIterable(
ScanParallelExecutor.parallelismBatchIterable(
l -> l.parallelStream().filter(i -> i > 5).collect(Collectors.toList()),
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
queueSize);
Expand Down

0 comments on commit dbc06f7

Please sign in to comment.