Skip to content

Commit

Permalink
[core] Use parallelismBatchIterable to reduce memory cost (apache#3598)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Jun 25, 2024
1 parent 4cf2330 commit 6e2deb5
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ public FileStoreCommitImpl newCommit(String commitUser) {
newKeyComparator(),
options.branch(),
newStatsFileHandler(),
bucketMode());
bucketMode(),
options.scanManifestParallelism());
}

@Override
Expand Down
59 changes: 20 additions & 39 deletions paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,17 @@

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.ScanParallelExecutor;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/** Entry representing a file. */
public interface FileEntry {
Expand Down Expand Up @@ -118,12 +116,10 @@ static <T extends FileEntry> Collection<T> mergeEntries(Iterable<T> entries) {
static void mergeEntries(
ManifestFile manifestFile,
List<ManifestFileMeta> manifestFiles,
Map<Identifier, ManifestEntry> map) {
List<Supplier<List<ManifestEntry>>> manifestReadFutures =
readManifestEntries(manifestFile, manifestFiles);
for (Supplier<List<ManifestEntry>> taskResult : manifestReadFutures) {
mergeEntries(taskResult.get(), map);
}
Map<Identifier, ManifestEntry> map,
@Nullable Integer manifestReadParallelism) {
mergeEntries(
readManifestEntries(manifestFile, manifestFiles, manifestReadParallelism), map);
}

static <T extends FileEntry> void mergeEntries(Iterable<T> entries, Map<Identifier, T> map) {
Expand Down Expand Up @@ -156,33 +152,18 @@ static <T extends FileEntry> void mergeEntries(Iterable<T> entries, Map<Identifi
}
}

static List<Supplier<List<ManifestEntry>>> readManifestEntries(
ManifestFile manifestFile, List<ManifestFileMeta> manifestFiles) {
List<Supplier<List<ManifestEntry>>> result = new ArrayList<>();
for (ManifestFileMeta file : manifestFiles) {
Future<List<ManifestEntry>> future =
CompletableFuture.supplyAsync(
() -> manifestFile.read(file.fileName(), file.fileSize()),
FileUtils.COMMON_IO_FORK_JOIN_POOL);
result.add(
() -> {
try {
return future.get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException("Failed to read manifest file.", e);
}
});
}
return result;
}

static Future<List<ManifestEntry>> readManifestEntry(
ManifestFile manifestFile, ManifestFileMeta file) {
Future<List<ManifestEntry>> future =
CompletableFuture.supplyAsync(
() -> manifestFile.read(file.fileName(), file.fileSize()),
FileUtils.COMMON_IO_FORK_JOIN_POOL);
return future;
static Iterable<ManifestEntry> readManifestEntries(
ManifestFile manifestFile,
List<ManifestFileMeta> manifestFiles,
@Nullable Integer manifestReadParallelism) {
return ScanParallelExecutor.parallelismBatchIterable(
files ->
files.parallelStream()
.flatMap(
m -> manifestFile.read(m.fileName(), m.fileSize()).stream())
.collect(Collectors.toList()),
manifestFiles,
manifestReadParallelism);
}

static <T extends FileEntry> void assertNoDelete(Collection<T> entries) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
Expand All @@ -43,7 +45,6 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
Expand Down Expand Up @@ -150,7 +151,8 @@ public static List<ManifestFileMeta> merge(
long suggestedMetaSize,
int suggestedMinMetaCount,
long manifestFullCompactionSize,
RowType partitionType) {
RowType partitionType,
@Nullable Integer manifestReadParallelism) {
// these are the newly created manifest files, clean them up if exception occurs
List<ManifestFileMeta> newMetas = new ArrayList<>();

Expand All @@ -162,15 +164,17 @@ public static List<ManifestFileMeta> merge(
manifestFile,
suggestedMetaSize,
manifestFullCompactionSize,
partitionType);
partitionType,
manifestReadParallelism);
return fullCompacted.orElseGet(
() ->
tryMinorCompaction(
input,
newMetas,
manifestFile,
suggestedMetaSize,
suggestedMinMetaCount));
suggestedMinMetaCount,
manifestReadParallelism));
} catch (Throwable e) {
// exception occurs, clean up and rethrow
for (ManifestFileMeta manifest : newMetas) {
Expand All @@ -185,7 +189,8 @@ private static List<ManifestFileMeta> tryMinorCompaction(
List<ManifestFileMeta> newMetas,
ManifestFile manifestFile,
long suggestedMetaSize,
int suggestedMinMetaCount) {
int suggestedMinMetaCount,
@Nullable Integer manifestReadParallelism) {
List<ManifestFileMeta> result = new ArrayList<>();
List<ManifestFileMeta> candidates = new ArrayList<>();
long totalSize = 0;
Expand All @@ -195,15 +200,16 @@ private static List<ManifestFileMeta> tryMinorCompaction(
candidates.add(manifest);
if (totalSize >= suggestedMetaSize) {
// reach suggested file size, perform merging and produce new file
mergeCandidates(candidates, manifestFile, result, newMetas);
mergeCandidates(
candidates, manifestFile, result, newMetas, manifestReadParallelism);
candidates.clear();
totalSize = 0;
}
}

// merge the last bit of manifests if there are too many
if (candidates.size() >= suggestedMinMetaCount) {
mergeCandidates(candidates, manifestFile, result, newMetas);
mergeCandidates(candidates, manifestFile, result, newMetas, manifestReadParallelism);
} else {
result.addAll(candidates);
}
Expand All @@ -214,14 +220,15 @@ private static void mergeCandidates(
List<ManifestFileMeta> candidates,
ManifestFile manifestFile,
List<ManifestFileMeta> result,
List<ManifestFileMeta> newMetas) {
List<ManifestFileMeta> newMetas,
@Nullable Integer manifestReadParallelism) {
if (candidates.size() == 1) {
result.add(candidates.get(0));
return;
}

Map<Identifier, ManifestEntry> map = new LinkedHashMap<>();
FileEntry.mergeEntries(manifestFile, candidates, map);
FileEntry.mergeEntries(manifestFile, candidates, map, manifestReadParallelism);
if (!map.isEmpty()) {
List<ManifestFileMeta> merged = manifestFile.write(new ArrayList<>(map.values()));
result.addAll(merged);
Expand All @@ -235,12 +242,13 @@ public static Optional<List<ManifestFileMeta>> tryFullCompaction(
ManifestFile manifestFile,
long suggestedMetaSize,
long sizeTrigger,
RowType partitionType)
RowType partitionType,
@Nullable Integer manifestReadParallelism)
throws Exception {
// 1. should trigger full compaction

List<ManifestFileMeta> base = new ArrayList<>();
int totalManifestSize = 0;
long totalManifestSize = 0;
int i = 0;
for (; i < inputs.size(); i++) {
ManifestFileMeta file = inputs.get(i);
Expand Down Expand Up @@ -277,7 +285,7 @@ public static Optional<List<ManifestFileMeta>> tryFullCompaction(
// 2.1. try to skip base files by partition filter

Map<Identifier, ManifestEntry> deltaMerged = new LinkedHashMap<>();
FileEntry.mergeEntries(manifestFile, delta, deltaMerged);
FileEntry.mergeEntries(manifestFile, delta, deltaMerged, manifestReadParallelism);

List<ManifestFileMeta> result = new ArrayList<>();
int j = 0;
Expand Down Expand Up @@ -354,29 +362,13 @@ public static Optional<List<ManifestFileMeta>> tryFullCompaction(
mergedEntries.clear();

// 2.3.2 merge base files
List<ManifestEntry> asyncManifestEntries = null;
for (; j < base.size(); j++) {
Future<List<ManifestEntry>> reader =
FileEntry.readManifestEntry(manifestFile, base.get(j));
if (asyncManifestEntries != null) {
for (ManifestEntry entry : asyncManifestEntries) {
checkArgument(entry.kind() == FileKind.ADD);
if (!deleteEntries.contains(entry.identifier())) {
writer.write(entry);
}
}
}
asyncManifestEntries = reader.get();
}

if (asyncManifestEntries != null) {
for (ManifestEntry entry : asyncManifestEntries) {
checkArgument(entry.kind() == FileKind.ADD);
if (!deleteEntries.contains(entry.identifier())) {
writer.write(entry);
}
for (ManifestEntry entry :
FileEntry.readManifestEntries(
manifestFile, base.subList(j, base.size()), manifestReadParallelism)) {
checkArgument(entry.kind() == FileKind.ADD);
if (!deleteEntries.contains(entry.identifier())) {
writer.write(entry);
}
asyncManifestEntries.clear();
}

// 2.3.3 merge deltaMerged
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private final boolean dynamicPartitionOverwrite;
@Nullable private final Comparator<InternalRow> keyComparator;
private final String branchName;
@Nullable private final Integer manifestReadParallelism;

@Nullable private Lock lock;
private boolean ignoreEmptyCommit;
Expand Down Expand Up @@ -150,7 +151,8 @@ public FileStoreCommitImpl(
@Nullable Comparator<InternalRow> keyComparator,
String branchName,
StatsFileHandler statsFileHandler,
BucketMode bucketMode) {
BucketMode bucketMode,
@Nullable Integer manifestReadParallelism) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.commitUser = commitUser;
Expand All @@ -169,6 +171,7 @@ public FileStoreCommitImpl(
this.dynamicPartitionOverwrite = dynamicPartitionOverwrite;
this.keyComparator = keyComparator;
this.branchName = branchName;
this.manifestReadParallelism = manifestReadParallelism;

this.lock = null;
this.ignoreEmptyCommit = true;
Expand Down Expand Up @@ -845,7 +848,8 @@ public boolean tryCommitOnce(
manifestTargetSize.getBytes(),
manifestMergeMinCount,
manifestFullCompactionSize.getBytes(),
partitionType));
partitionType,
manifestReadParallelism));
previousChangesListName = manifestList.write(newMetas);

// the added records subtract the deleted records from
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ public class ScanParallelExecutor {
// reduce memory usage by batch iterable process, the cached result in memory will be queueSize
public static <T, U> Iterable<T> parallelismBatchIterable(
Function<List<U>, List<T>> processor, List<U> input, @Nullable Integer queueSize) {
ForkJoinPool poolCandidate = COMMON_IO_FORK_JOIN_POOL;
if (queueSize == null) {
queueSize = poolCandidate.getParallelism();
queueSize = COMMON_IO_FORK_JOIN_POOL.getParallelism();
} else if (queueSize <= 0) {
throw new NegativeArraySizeException("queue size should not be negetive");
}
Expand Down Expand Up @@ -73,7 +72,7 @@ public T next() {

private void advanceIfNeeded() {
while ((activeList == null || index >= activeList.size())
&& stack.size() > 0) {
&& !stack.isEmpty()) {
// reset index
index = 0;
try {
Expand Down
Loading

0 comments on commit 6e2deb5

Please sign in to comment.