diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index aeab93eda81f..1ce613b04963 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -942,5 +942,11 @@
             <td>Integer</td>
             <td>The bytes of types (CHAR, VARCHAR, BINARY, VARBINARY) devote to the zorder sort.</td>
         </tr>
+        <tr>
+            <td><h5>table.close-writers-thread-number</h5></td>
+            <td style="word-wrap: break-word;">4</td>
+            <td>Integer</td>
+            <td>The number of threads used to flush data and close files during checkpoint.</td>
+        </tr>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 1256c7ba0d87..97534dccbcf1 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1381,6 +1381,13 @@ public class CoreOptions implements Serializable {
.withDescription(
"Whether to enable asynchronous IO writing when writing files.");
+    public static final ConfigOption<Integer> TABLE_CLOSE_WRITERS_THREAD_NUMBER =
+ key("table.close-writers-thread-number")
+ .intType()
+ .defaultValue(4)
+ .withDescription(
+ "The number of threads used to flush data and close files during checkpoint.");
+
@ExcludeFromDocumentation("Only used internally to support materialized table")
     public static final ConfigOption<Long> MATERIALIZED_TABLE_SNAPSHOT =
key("materialized-table.snapshot")
@@ -1829,6 +1836,10 @@ public int numSortedRunCompactionTrigger() {
return options.get(NUM_SORTED_RUNS_COMPACTION_TRIGGER);
}
+ public int tableCloseWritersThreadNumber() {
+ return options.get(TABLE_CLOSE_WRITERS_THREAD_NUMBER);
+ }
+
@Nullable
public Duration optimizedCompactionInterval() {
return options.get(COMPACTION_OPTIMIZATION_INTERVAL);
diff --git a/paimon-common/src/main/java/org/apache/paimon/memory/AbstractMemorySegmentPool.java b/paimon-common/src/main/java/org/apache/paimon/memory/AbstractMemorySegmentPool.java
index b6214850df10..c07746c31371 100644
--- a/paimon-common/src/main/java/org/apache/paimon/memory/AbstractMemorySegmentPool.java
+++ b/paimon-common/src/main/java/org/apache/paimon/memory/AbstractMemorySegmentPool.java
@@ -37,8 +37,8 @@ public AbstractMemorySegmentPool(long maxMemory, int pageSize) {
}
@Override
- public MemorySegment nextSegment() {
- if (this.segments.size() > 0) {
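+    // The pool may now be accessed by multiple close-writer threads concurrently, so
+    // mutation of the shared segment queue is synchronized (here and in returnAll below).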
+ public synchronized MemorySegment nextSegment() {
+ if (!this.segments.isEmpty()) {
return this.segments.poll();
} else if (numPage < maxPages) {
numPage++;
@@ -56,7 +56,7 @@ public int pageSize() {
}
@Override
-    public void returnAll(List<MemorySegment> memory) {
+    public synchronized void returnAll(List<MemorySegment> memory) {
segments.addAll(memory);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index dab20d642cb9..e52ddf37fe6d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -48,12 +48,17 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber;
@@ -88,6 +93,7 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
protected CompactionMetrics compactionMetrics = null;
protected final String tableName;
private boolean isInsertOnly;
+ private final ExecutorService closeWritersExecutor;
protected AbstractFileStoreWrite(
SnapshotManager snapshotManager,
@@ -97,7 +103,8 @@ protected AbstractFileStoreWrite(
String tableName,
int totalBuckets,
RowType partitionType,
- int writerNumberMax) {
+ int writerNumberMax,
+ int tableCloseWritersThreadNumber) {
this.snapshotManager = snapshotManager;
this.scan = scan;
this.indexFactory = indexFactory;
@@ -107,6 +114,10 @@ protected AbstractFileStoreWrite(
this.writers = new HashMap<>();
this.tableName = tableName;
this.writerNumberMax = writerNumberMax;
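+        // Shared thread pool used by prepareCommit() and close() to flush and close writers in parallel.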
+ this.closeWritersExecutor =
+ Executors.newFixedThreadPool(
+ tableCloseWritersThreadNumber,
+ new ExecutorThreadFactory("table-close-writers-thread-" + tableName));
}
@Override
@@ -190,68 +201,108 @@ public List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIden
writerCleanChecker = createWriterCleanChecker();
}
-        List<CommitMessage> result = new ArrayList<>();
-
-        Iterator<Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>>> partIter =
-                writers.entrySet().iterator();
- while (partIter.hasNext()) {
-            Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>> partEntry = partIter.next();
- BinaryRow partition = partEntry.getKey();
-            Iterator<Map.Entry<Integer, WriterContainer<T>>> bucketIter =
-                    partEntry.getValue().entrySet().iterator();
- while (bucketIter.hasNext()) {
-                Map.Entry<Integer, WriterContainer<T>> entry = bucketIter.next();
- int bucket = entry.getKey();
-                WriterContainer<T> writerContainer = entry.getValue();
-
- CommitIncrement increment = writerContainer.writer.prepareCommit(waitCompaction);
-                List<IndexFileMeta> newIndexFiles = new ArrayList<>();
- if (writerContainer.indexMaintainer != null) {
- newIndexFiles.addAll(writerContainer.indexMaintainer.prepareCommit());
- }
- CompactDeletionFile compactDeletionFile = increment.compactDeletionFile();
- if (compactDeletionFile != null) {
- compactDeletionFile.getOrCompute().ifPresent(newIndexFiles::add);
- }
- CommitMessageImpl committable =
- new CommitMessageImpl(
- partition,
- bucket,
- increment.newFilesIncrement(),
- increment.compactIncrement(),
- new IndexIncrement(newIndexFiles));
- result.add(committable);
-
- if (committable.isEmpty()) {
- if (writerCleanChecker.apply(writerContainer)) {
- // Clear writer if no update, and if its latest modification has committed.
- //
- // We need a mechanism to clear writers, otherwise there will be more and
- // more such as yesterday's partition that no longer needs to be written.
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Closing writer for partition {}, bucket {}. "
- + "Writer's last modified identifier is {}, "
- + "while current commit identifier is {}.",
- partition,
- bucket,
- writerContainer.lastModifiedCommitIdentifier,
- commitIdentifier);
+        List<CompletableFuture<CommitMessage>> futures = new ArrayList<>();
+        Set<BinaryRow> partitionsToRemove = ConcurrentHashMap.newKeySet();
+        Map<BinaryRow, Set<Integer>> bucketsToRemovePerPartition = new ConcurrentHashMap<>();
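+        // Writers are closed concurrently; buckets and partitions to drop are collected in
+        // thread-safe sets and removed from the writers map after all futures complete.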
+
+ writers.forEach(
+ (partition, bucketMap) -> {
+                    Set<Integer> bucketsToRemove = new ConcurrentSkipListSet<>();
+ bucketMap.forEach(
+ (bucket, writerContainer) -> {
+                                CompletableFuture<CommitMessage> future =
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return closeWriterContainer(
+ partition,
+ bucket,
+ writerContainer,
+ waitCompaction,
+ writerCleanChecker,
+ commitIdentifier,
+ bucketsToRemove);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ },
+ closeWritersExecutor);
+ futures.add(future);
+ });
+ bucketsToRemovePerPartition.put(partition, bucketsToRemove);
+ });
+
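+        // Wait for every close task; get() surfaces the first failure as an ExecutionException.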
+        CompletableFuture<Void> allTasksDone =
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+ allTasksDone.get();
+
+        List<CommitMessage> results =
+ futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
+
+ bucketsToRemovePerPartition.forEach(
+ (partition, buckets) -> {
+ if (!buckets.isEmpty()) {
+ buckets.forEach(bucket -> writers.get(partition).remove(bucket));
+ if (writers.get(partition).isEmpty()) {
+ partitionsToRemove.add(partition);
}
- writerContainer.writer.close();
- bucketIter.remove();
}
- } else {
- writerContainer.lastModifiedCommitIdentifier = commitIdentifier;
- }
- }
+ });
- if (partEntry.getValue().isEmpty()) {
- partIter.remove();
+ partitionsToRemove.forEach(writers::remove);
+ return results;
+ }
+
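+    /**
+     * Flushes and closes the writer for a single (partition, bucket), returning its commit
+     * message and recording the bucket in {@code bucketsToRemove} if the writer can be dropped.
+     */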
+ @VisibleForTesting
+ public CommitMessage closeWriterContainer(
+ BinaryRow partition,
+ int bucket,
+            WriterContainer<T> writerContainer,
+            boolean waitCompaction,
+            Function<WriterContainer<T>, Boolean> writerCleanChecker,
+            long commitIdentifier,
+            Set<Integer> bucketsToRemove)
+ throws Exception {
+ CommitIncrement increment = writerContainer.writer.prepareCommit(waitCompaction);
+        List<IndexFileMeta> newIndexFiles = new ArrayList<>();
+ if (writerContainer.indexMaintainer != null) {
+ newIndexFiles.addAll(writerContainer.indexMaintainer.prepareCommit());
+ }
+ CompactDeletionFile compactDeletionFile = increment.compactDeletionFile();
+ if (compactDeletionFile != null) {
+ compactDeletionFile.getOrCompute().ifPresent(newIndexFiles::add);
+ }
+ CommitMessageImpl committable =
+ new CommitMessageImpl(
+ partition,
+ bucket,
+ increment.newFilesIncrement(),
+ increment.compactIncrement(),
+ new IndexIncrement(newIndexFiles));
+
+ if (committable.isEmpty()) {
+ if (writerCleanChecker.apply(writerContainer)) {
+ // Clear writer if no update, and if its latest modification has committed.
+ //
+ // We need a mechanism to clear writers, otherwise there will be more and
+ // more such as yesterday's partition that no longer needs to be written.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Closing writer for partition {}, bucket {}. "
+ + "Writer's last modified identifier is {}, "
+ + "while current commit identifier is {}.",
+ partition,
+ bucket,
+ writerContainer.lastModifiedCommitIdentifier,
+ commitIdentifier);
+ }
+ writerContainer.writer.close();
+ bucketsToRemove.add(bucket);
}
+ } else {
+ writerContainer.lastModifiedCommitIdentifier = commitIdentifier;
}
-
- return result;
+ return committable;
}
// This abstract function returns a whole function (instead of just a boolean value),
@@ -291,11 +342,32 @@ Function<WriterContainer<T>, Boolean> createNoConflictAwareWriterCleanChecker()
@Override
public void close() throws Exception {
-        for (Map<Integer, WriterContainer<T>> bucketWriters : writers.values()) {
-            for (WriterContainer<T> writerContainer : bucketWriters.values()) {
- writerContainer.writer.close();
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+ // Close each writer in parallel
+        for (Map<Integer, WriterContainer<T>> bucketWriter : writers.values()) {
+            for (WriterContainer<T> writerContainer : bucketWriter.values()) {
+ futures.add(
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ writerContainer.writer.close();
+ } catch (Exception e) {
+ LOG.error("Failed to close writer: ", e);
+ }
+ },
+ closeWritersExecutor));
}
}
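+        // Wait for all close tasks to finish before shutting down the shared executor.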
+        CompletableFuture<Void> allTasksDone =
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+ allTasksDone.get();
+ if (closeWritersExecutor != null) {
+ closeWritersExecutor.shutdown();
+ if (!closeWritersExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
+ closeWritersExecutor.shutdownNow();
+ }
+ }
+
writers.clear();
if (lazyCompactExecutor != null && closeCompactExecutorWhenLeaving) {
lazyCompactExecutor.shutdownNow();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
index b7feeead4bbb..cd22c3b06efb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
@@ -75,7 +75,8 @@ public MemoryFileStoreWrite(
tableName,
options.bucket(),
partitionType,
- options.writeMaxWritersToSpill());
+ options.writeMaxWritersToSpill(),
+ options.tableCloseWritersThreadNumber());
this.options = options;
this.cacheManager = new CacheManager(options.lookupCacheMaxMemory());
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/AbstractFileStoreWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/AbstractFileStoreWriteTest.java
new file mode 100644
index 000000000000..7c31ea3f2019
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/AbstractFileStoreWriteTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.operation.AbstractFileStoreWrite.WriterContainer;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.function.Function;
+
+/** Tests for {@link AbstractFileStoreWrite}. */
+public class AbstractFileStoreWriteTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ @Test
+ public void testCloseWriterContainer() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ KeyValueFileStoreWrite write =
+ (KeyValueFileStoreWrite)
+ table.store()
+ .newWrite("ss")
+ .withIOManager(new IOManagerImpl(tempDir.toString()));
+ write.withExecutionMode(false);
+
+ write.write(
+ partition(0),
+ 0,
+ new KeyValue().replace(getBinaryRow(0), RowKind.INSERT, GenericRow.of(0, 0)));
+
+ Assertions.assertThat(write.writers.size()).isEqualTo(1);
+ Assertions.assertThat(write.writers.get(partition(0)).size()).isEqualTo(1);
+
+        AbstractFileStoreWrite.WriterContainer<KeyValue> writerContainer =
+                write.writers.get(partition(0)).get(0);
+
+ Assertions.assertThat(writerContainer.indexMaintainer).isNotNull();
+ Assertions.assertThat(writerContainer.deletionVectorsMaintainer).isNotNull();
+
+        Set<Integer> bucketsToRemove = new ConcurrentSkipListSet<>();
+        Function<WriterContainer<KeyValue>, Boolean> writerCleanChecker =
+                tmpWriterContainer -> false;
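+        // Closing the writer container flushes the buffered record and yields a commit message.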
+ CommitMessageImpl commitMessage =
+ (CommitMessageImpl)
+ write.closeWriterContainer(
+ partition(0),
+ 0,
+ writerContainer,
+ false,
+ writerCleanChecker,
+ 100,
+ bucketsToRemove);
+
+ long records =
+ commitMessage.newFilesIncrement().newFiles().stream()
+ .mapToLong(DataFileMeta::rowCount)
+ .sum();
+ Assertions.assertThat(records).isEqualTo(1);
+
+ // add new records
+ write.write(
+ partition(0),
+ 0,
+ new KeyValue().replace(getBinaryRow(1), RowKind.INSERT, GenericRow.of(1, 1)));
+ write.write(
+ partition(0),
+ 0,
+ new KeyValue().replace(getBinaryRow(2), RowKind.INSERT, GenericRow.of(2, 2)));
+ commitMessage =
+ (CommitMessageImpl)
+ write.closeWriterContainer(
+ partition(0),
+ 0,
+ writerContainer,
+ false,
+ writerCleanChecker,
+ 200,
+ bucketsToRemove);
+ records =
+ commitMessage.newFilesIncrement().newFiles().stream()
+ .mapToLong(DataFileMeta::rowCount)
+ .sum();
+ Assertions.assertThat(records).isEqualTo(2);
+
+ // add one deletion
+ write.write(
+ partition(0),
+ 0,
+ new KeyValue().replace(getBinaryRow(3), RowKind.INSERT, GenericRow.of(3, 3)));
+ write.write(
+ partition(0),
+ 0,
+ new KeyValue().replace(getBinaryRow(4), RowKind.INSERT, GenericRow.of(4, 4)));
+ writerContainer.deletionVectorsMaintainer.notifyNewDeletion("file_0", 0);
+
+ commitMessage =
+ (CommitMessageImpl)
+ write.closeWriterContainer(
+ partition(0),
+ 0,
+ writerContainer,
+ false,
+ writerCleanChecker,
+ 300,
+ bucketsToRemove);
+ records =
+ commitMessage.newFilesIncrement().newFiles().stream()
+ .mapToLong(DataFileMeta::rowCount)
+ .sum();
+ Assertions.assertThat(records).isEqualTo(2);
+ }
+
+ private BinaryRow getBinaryRow(int i) {
+ BinaryRow binaryRow = new BinaryRow(1);
+ BinaryRowWriter writer = new BinaryRowWriter(binaryRow);
+ writer.writeInt(0, i);
+ writer.complete();
+ return binaryRow;
+ }
+
+ private BinaryRow partition(int i) {
+ BinaryRow binaryRow = new BinaryRow(1);
+ BinaryRowWriter writer = new BinaryRowWriter(binaryRow);
+ writer.writeInt(0, i);
+ writer.complete();
+ return binaryRow;
+ }
+
+ protected FileStoreTable createFileStoreTable() throws Exception {
+ Catalog catalog = new FileSystemCatalog(LocalFileIO.create(), new Path(tempDir.toString()));
+ Schema schema =
+ Schema.newBuilder()
+ .column("k", DataTypes.INT())
+ .column("v", DataTypes.INT())
+ .primaryKey("k")
+ .option("bucket", "-1")
+ .option("deletion-vectors.enabled", "true")
+ .build();
+ Identifier identifier = Identifier.create("default", "test");
+ catalog.createDatabase("default", false);
+ catalog.createTable(identifier, schema, false);
+ return (FileStoreTable) catalog.getTable(identifier);
+ }
+}