Skip to content

Commit

Permalink
support parallel close writers
Browse files Browse the repository at this point in the history
  • Loading branch information
neuyilan committed Oct 9, 2024
1 parent b2641ad commit 1b18ea8
Show file tree
Hide file tree
Showing 5 changed files with 333 additions and 68 deletions.
10 changes: 10 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1442,6 +1442,12 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription("The serialized refresh handler of materialized table.");

public static final ConfigOption<Integer> TABLE_CLOSE_WRITERS_THREAD_NUMBER =
key("table.close-writers-thread-number")
.intType()
.defaultValue(4)
.withDescription("The thread number for closing one table's writers");

private final Options options;

public CoreOptions(Map<String, String> options) {
Expand Down Expand Up @@ -1817,6 +1823,10 @@ public int numSortedRunCompactionTrigger() {
return options.get(NUM_SORTED_RUNS_COMPACTION_TRIGGER);
}

public int tableCloseWritersThreadNumber() {
return options.get(TABLE_CLOSE_WRITERS_THREAD_NUMBER);
}

@Nullable
public Duration optimizedCompactionInterval() {
return options.get(COMPACTION_OPTIMIZATION_INTERVAL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ public AbstractMemorySegmentPool(long maxMemory, int pageSize) {
}

@Override
public MemorySegment nextSegment() {
if (this.segments.size() > 0) {
public synchronized MemorySegment nextSegment() {
if (!this.segments.isEmpty()) {
return this.segments.poll();
} else if (numPage < maxPages) {
numPage++;
Expand All @@ -56,7 +56,7 @@ public int pageSize() {
}

@Override
public void returnAll(List<MemorySegment> memory) {
public synchronized void returnAll(List<MemorySegment> memory) {
segments.addAll(memory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.ExecutorUtils;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SnapshotManager;

Expand All @@ -48,12 +49,17 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber;
Expand Down Expand Up @@ -88,6 +94,7 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
protected CompactionMetrics compactionMetrics = null;
protected final String tableName;
private boolean isInsertOnly;
private final ExecutorService closeWritersExecutor;

protected AbstractFileStoreWrite(
SnapshotManager snapshotManager,
Expand All @@ -97,7 +104,8 @@ protected AbstractFileStoreWrite(
String tableName,
int totalBuckets,
RowType partitionType,
int writerNumberMax) {
int writerNumberMax,
int tableCloseWritersThreadNumber) {
this.snapshotManager = snapshotManager;
this.scan = scan;
this.indexFactory = indexFactory;
Expand All @@ -107,6 +115,10 @@ protected AbstractFileStoreWrite(
this.writers = new HashMap<>();
this.tableName = tableName;
this.writerNumberMax = writerNumberMax;
this.closeWritersExecutor =
Executors.newFixedThreadPool(
tableCloseWritersThreadNumber,
new ExecutorThreadFactory("table-close-writers-thread-" + tableName));
}

@Override
Expand Down Expand Up @@ -190,68 +202,108 @@ public List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIden
writerCleanChecker = createWriterCleanChecker();
}

List<CommitMessage> result = new ArrayList<>();

Iterator<Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>>> partIter =
writers.entrySet().iterator();
while (partIter.hasNext()) {
Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>> partEntry = partIter.next();
BinaryRow partition = partEntry.getKey();
Iterator<Map.Entry<Integer, WriterContainer<T>>> bucketIter =
partEntry.getValue().entrySet().iterator();
while (bucketIter.hasNext()) {
Map.Entry<Integer, WriterContainer<T>> entry = bucketIter.next();
int bucket = entry.getKey();
WriterContainer<T> writerContainer = entry.getValue();

CommitIncrement increment = writerContainer.writer.prepareCommit(waitCompaction);
List<IndexFileMeta> newIndexFiles = new ArrayList<>();
if (writerContainer.indexMaintainer != null) {
newIndexFiles.addAll(writerContainer.indexMaintainer.prepareCommit());
}
CompactDeletionFile compactDeletionFile = increment.compactDeletionFile();
if (compactDeletionFile != null) {
compactDeletionFile.getOrCompute().ifPresent(newIndexFiles::add);
}
CommitMessageImpl committable =
new CommitMessageImpl(
partition,
bucket,
increment.newFilesIncrement(),
increment.compactIncrement(),
new IndexIncrement(newIndexFiles));
result.add(committable);

if (committable.isEmpty()) {
if (writerCleanChecker.apply(writerContainer)) {
// Clear writer if no update, and if its latest modification has committed.
//
// We need a mechanism to clear writers, otherwise there will be more and
// more such as yesterday's partition that no longer needs to be written.
if (LOG.isDebugEnabled()) {
LOG.debug(
"Closing writer for partition {}, bucket {}. "
+ "Writer's last modified identifier is {}, "
+ "while current commit identifier is {}.",
partition,
bucket,
writerContainer.lastModifiedCommitIdentifier,
commitIdentifier);
List<CompletableFuture<CommitMessage>> futures = new ArrayList<>();
Set<BinaryRow> partitionsToRemove = new HashSet<>();
Map<BinaryRow, Set<Integer>> bucketsToRemovePerPartition = new HashMap<>();

writers.forEach(
(partition, bucketMap) -> {
Set<Integer> bucketsToRemove = new ConcurrentSkipListSet<>();
bucketMap.forEach(
(bucket, writerContainer) -> {
CompletableFuture<CommitMessage> future =
CompletableFuture.supplyAsync(
() -> {
try {
return closeWriterContainer(
partition,
bucket,
writerContainer,
waitCompaction,
writerCleanChecker,
commitIdentifier,
bucketsToRemove);
} catch (Exception e) {
throw new RuntimeException(e);
}
},
closeWritersExecutor);
futures.add(future);
});
bucketsToRemovePerPartition.put(partition, bucketsToRemove);
});

CompletableFuture<Void> allTasksDone =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allTasksDone.get();

List<CommitMessage> results =
futures.stream().map(CompletableFuture::join).collect(Collectors.toList());

bucketsToRemovePerPartition.forEach(
(partition, buckets) -> {
if (!buckets.isEmpty()) {
buckets.forEach(bucket -> writers.get(partition).remove(bucket));
if (writers.get(partition).isEmpty()) {
partitionsToRemove.add(partition);
}
writerContainer.writer.close();
bucketIter.remove();
}
} else {
writerContainer.lastModifiedCommitIdentifier = commitIdentifier;
}
}
});

if (partEntry.getValue().isEmpty()) {
partIter.remove();
partitionsToRemove.forEach(writers::remove);
return results;
}

@VisibleForTesting
public CommitMessage closeWriterContainer(
BinaryRow partition,
int bucket,
WriterContainer<T> writerContainer,
boolean waitCompaction,
Function<WriterContainer<T>, Boolean> writerCleanChecker,
long commitIdentifier,
Set<Integer> bucketsToRemove)
throws Exception {
CommitIncrement increment = writerContainer.writer.prepareCommit(waitCompaction);
List<IndexFileMeta> newIndexFiles = new ArrayList<>();
if (writerContainer.indexMaintainer != null) {
newIndexFiles.addAll(writerContainer.indexMaintainer.prepareCommit());
}
CompactDeletionFile compactDeletionFile = increment.compactDeletionFile();
if (compactDeletionFile != null) {
compactDeletionFile.getOrCompute().ifPresent(newIndexFiles::add);
}
CommitMessageImpl committable =
new CommitMessageImpl(
partition,
bucket,
increment.newFilesIncrement(),
increment.compactIncrement(),
new IndexIncrement(newIndexFiles));

if (committable.isEmpty()) {
if (writerCleanChecker.apply(writerContainer)) {
// Clear writer if no update, and if its latest modification has committed.
//
// We need a mechanism to clear writers, otherwise there will be more and
// more such as yesterday's partition that no longer needs to be written.
if (LOG.isDebugEnabled()) {
LOG.debug(
"Closing writer for partition {}, bucket {}. "
+ "Writer's last modified identifier is {}, "
+ "while current commit identifier is {}.",
partition,
bucket,
writerContainer.lastModifiedCommitIdentifier,
commitIdentifier);
}
writerContainer.writer.close();
bucketsToRemove.add(bucket);
}
} else {
writerContainer.lastModifiedCommitIdentifier = commitIdentifier;
}

return result;
return committable;
}

// This abstract function returns a whole function (instead of just a boolean value),
Expand Down Expand Up @@ -291,11 +343,33 @@ Function<WriterContainer<T>, Boolean> createNoConflictAwareWriterCleanChecker()

@Override
public void close() throws Exception {
for (Map<Integer, WriterContainer<T>> bucketWriters : writers.values()) {
for (WriterContainer<T> writerContainer : bucketWriters.values()) {
writerContainer.writer.close();
}
}
List<CompletableFuture<Void>> futures =
writers.values().stream()
.flatMap(
bucketWriters ->
bucketWriters.values().stream()
.map(
writerContainer ->
CompletableFuture.runAsync(
() -> {
try {
writerContainer
.writer
.close();
} catch (Exception e) {
LOG.error(
"Failed to close writer: ",
e);
}
},
closeWritersExecutor)))
.collect(Collectors.toList());

CompletableFuture<Void> allTasksDone =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allTasksDone.get();
ExecutorUtils.gracefulShutdown(1, TimeUnit.MINUTES, closeWritersExecutor);

writers.clear();
if (lazyCompactExecutor != null && closeCompactExecutorWhenLeaving) {
lazyCompactExecutor.shutdownNow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public MemoryFileStoreWrite(
tableName,
options.bucket(),
partitionType,
options.writeMaxWritersToSpill());
options.writeMaxWritersToSpill(),
options.tableCloseWritersThreadNumber());
this.options = options;
this.cacheManager = new CacheManager(options.lookupCacheMaxMemory());
}
Expand Down
Loading

0 comments on commit 1b18ea8

Please sign in to comment.