From 1b18ea84cdae9a2ed08d9bd2a088595ba67dba72 Mon Sep 17 00:00:00 2001
From: HouliangQi
Date: Wed, 9 Oct 2024 19:09:10 +0800
Subject: [PATCH 1/7] support parallel close writers

---
 .../java/org/apache/paimon/CoreOptions.java   |  10 +
 .../memory/AbstractMemorySegmentPool.java     |   6 +-
 .../operation/AbstractFileStoreWrite.java     | 202 ++++++++++++------
 .../operation/MemoryFileStoreWrite.java       |   3 +-
 .../operation/AbstractFileStoreWriteTest.java | 180 ++++++++++++++++
 5 files changed, 333 insertions(+), 68 deletions(-)
 create mode 100644 paimon-core/src/test/java/org/apache/paimon/operation/AbstractFileStoreWriteTest.java

diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 81422ba681e8..0e7a99e608ba 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1442,6 +1442,12 @@ public class CoreOptions implements Serializable {
                     .noDefaultValue()
                     .withDescription("The serialized refresh handler of materialized table.");
 
+    public static final ConfigOption<Integer> TABLE_CLOSE_WRITERS_THREAD_NUMBER =
+            key("table.close-writers-thread-number")
+                    .intType()
+                    .defaultValue(4)
+                    .withDescription("The thread number for closing one table's writers");
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
@@ -1817,6 +1823,10 @@ public int numSortedRunCompactionTrigger() {
         return options.get(NUM_SORTED_RUNS_COMPACTION_TRIGGER);
     }
 
+    public int tableCloseWritersThreadNumber() {
+        return options.get(TABLE_CLOSE_WRITERS_THREAD_NUMBER);
+    }
+
     @Nullable
     public Duration optimizedCompactionInterval() {
         return options.get(COMPACTION_OPTIMIZATION_INTERVAL);
diff --git a/paimon-common/src/main/java/org/apache/paimon/memory/AbstractMemorySegmentPool.java b/paimon-common/src/main/java/org/apache/paimon/memory/AbstractMemorySegmentPool.java
index b6214850df10..c07746c31371 100644
--- a/paimon-common/src/main/java/org/apache/paimon/memory/AbstractMemorySegmentPool.java
+++ b/paimon-common/src/main/java/org/apache/paimon/memory/AbstractMemorySegmentPool.java
@@ -37,8 +37,8 @@ public AbstractMemorySegmentPool(long maxMemory, int pageSize) {
     }
 
     @Override
-    public MemorySegment nextSegment() {
-        if (this.segments.size() > 0) {
+    public synchronized MemorySegment nextSegment() {
+        if (!this.segments.isEmpty()) {
             return this.segments.poll();
         } else if (numPage < maxPages) {
             numPage++;
@@ -56,7 +56,7 @@ public int pageSize() {
     }
 
     @Override
-    public void returnAll(List<MemorySegment> memory) {
+    public synchronized void returnAll(List<MemorySegment> memory) {
         segments.addAll(memory);
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index dab20d642cb9..f5526b4aaed8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -37,6 +37,7 @@
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommitIncrement;
 import org.apache.paimon.utils.ExecutorThreadFactory;
+import org.apache.paimon.utils.ExecutorUtils;
 import org.apache.paimon.utils.RecordWriter;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -48,12 +49,17 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
 import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber;
@@ -88,6 +94,7 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
     protected CompactionMetrics compactionMetrics = null;
     protected final String tableName;
     private boolean isInsertOnly;
+    private final ExecutorService closeWritersExecutor;
 
     protected AbstractFileStoreWrite(
             SnapshotManager snapshotManager,
@@ -97,7 +104,8 @@ protected AbstractFileStoreWrite(
             String tableName,
             int totalBuckets,
             RowType partitionType,
-            int writerNumberMax) {
+            int writerNumberMax,
+            int tableCloseWritersThreadNumber) {
         this.snapshotManager = snapshotManager;
         this.scan = scan;
         this.indexFactory = indexFactory;
@@ -107,6 +115,10 @@ protected AbstractFileStoreWrite(
         this.writers = new HashMap<>();
         this.tableName = tableName;
         this.writerNumberMax = writerNumberMax;
+        this.closeWritersExecutor =
+                Executors.newFixedThreadPool(
+                        tableCloseWritersThreadNumber,
+                        new ExecutorThreadFactory("table-close-writers-thread-" + tableName));
     }
 
     @Override
@@ -190,68 +202,108 @@ public List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIden
             writerCleanChecker = createWriterCleanChecker();
         }
 
-        List<CommitMessage> result = new ArrayList<>();
-
-        Iterator<Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>>> partIter =
-                writers.entrySet().iterator();
-        while (partIter.hasNext()) {
-            Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>> partEntry = partIter.next();
-            BinaryRow partition = partEntry.getKey();
-            Iterator<Map.Entry<Integer, WriterContainer<T>>> bucketIter =
-                    partEntry.getValue().entrySet().iterator();
-            while (bucketIter.hasNext()) {
-                Map.Entry<Integer, WriterContainer<T>> entry = bucketIter.next();
-                int bucket = entry.getKey();
-                WriterContainer<T> writerContainer = entry.getValue();
-
-                CommitIncrement increment = writerContainer.writer.prepareCommit(waitCompaction);
-                List<IndexFileMeta> newIndexFiles = new ArrayList<>();
-                if (writerContainer.indexMaintainer != null) {
-                    newIndexFiles.addAll(writerContainer.indexMaintainer.prepareCommit());
-                }
-                CompactDeletionFile compactDeletionFile = increment.compactDeletionFile();
-                if (compactDeletionFile != null) {
-                    compactDeletionFile.getOrCompute().ifPresent(newIndexFiles::add);
-                }
-                CommitMessageImpl committable =
-                        new CommitMessageImpl(
-                                partition,
-                                bucket,
-                                increment.newFilesIncrement(),
-                                increment.compactIncrement(),
-                                new IndexIncrement(newIndexFiles));
-                result.add(committable);
-
-                if (committable.isEmpty()) {
-                    if (writerCleanChecker.apply(writerContainer)) {
-                        // Clear writer if no update, and if its latest modification has committed.
-                        //
-                        // We need a mechanism to clear writers, otherwise there will be more and
-                        // more such as yesterday's partition that no longer needs to be written.
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug(
-                                    "Closing writer for partition {}, bucket {}. "
-                                            + "Writer's last modified identifier is {}, "
-                                            + "while current commit identifier is {}.",
-                                    partition,
-                                    bucket,
-                                    writerContainer.lastModifiedCommitIdentifier,
-                                    commitIdentifier);
-                        }
-                        writerContainer.writer.close();
-                        bucketIter.remove();
-                    }
-                } else {
-                    writerContainer.lastModifiedCommitIdentifier = commitIdentifier;
-                }
-            }
-
-            if (partEntry.getValue().isEmpty()) {
-                partIter.remove();
-            }
-        }
-
-        return result;
+        List<CompletableFuture<CommitMessage>> futures = new ArrayList<>();
+        Set<BinaryRow> partitionsToRemove = new HashSet<>();
+        Map<BinaryRow, Set<Integer>> bucketsToRemovePerPartition = new HashMap<>();
+
+        writers.forEach(
+                (partition, bucketMap) -> {
+                    Set<Integer> bucketsToRemove = new ConcurrentSkipListSet<>();
+                    bucketMap.forEach(
+                            (bucket, writerContainer) -> {
+                                CompletableFuture<CommitMessage> future =
+                                        CompletableFuture.supplyAsync(
+                                                () -> {
+                                                    try {
+                                                        return closeWriterContainer(
+                                                                partition,
+                                                                bucket,
+                                                                writerContainer,
+                                                                waitCompaction,
+                                                                writerCleanChecker,
+                                                                commitIdentifier,
+                                                                bucketsToRemove);
+                                                    } catch (Exception e) {
+                                                        throw new RuntimeException(e);
+                                                    }
+                                                },
+                                                closeWritersExecutor);
+                                futures.add(future);
+                            });
+                    bucketsToRemovePerPartition.put(partition, bucketsToRemove);
+                });
+
+        CompletableFuture<Void> allTasksDone =
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+        allTasksDone.get();
+
+        List<CommitMessage> results =
+                futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
+
+        bucketsToRemovePerPartition.forEach(
+                (partition, buckets) -> {
+                    if (!buckets.isEmpty()) {
+                        buckets.forEach(bucket -> writers.get(partition).remove(bucket));
+                        if (writers.get(partition).isEmpty()) {
+                            partitionsToRemove.add(partition);
+                        }
+                    }
+                });
+
+        partitionsToRemove.forEach(writers::remove);
+        return results;
+    }
+
+    @VisibleForTesting
+    public CommitMessage closeWriterContainer(
+            BinaryRow partition,
+            int bucket,
+            WriterContainer<T> writerContainer,
+            boolean waitCompaction,
+            Function<WriterContainer<T>, Boolean> writerCleanChecker,
+            long commitIdentifier,
+            Set<Integer> bucketsToRemove)
+            throws Exception {
+        CommitIncrement increment = writerContainer.writer.prepareCommit(waitCompaction);
+        List<IndexFileMeta> newIndexFiles = new ArrayList<>();
+        if (writerContainer.indexMaintainer != null) {
+            newIndexFiles.addAll(writerContainer.indexMaintainer.prepareCommit());
+        }
+        CompactDeletionFile compactDeletionFile = increment.compactDeletionFile();
+        if (compactDeletionFile != null) {
+            compactDeletionFile.getOrCompute().ifPresent(newIndexFiles::add);
+        }
+        CommitMessageImpl committable =
+                new CommitMessageImpl(
+                        partition,
+                        bucket,
+                        increment.newFilesIncrement(),
+                        increment.compactIncrement(),
+                        new IndexIncrement(newIndexFiles));
+
+        if (committable.isEmpty()) {
+            if (writerCleanChecker.apply(writerContainer)) {
+                // Clear writer if no update, and if its latest modification has committed.
+                //
+                // We need a mechanism to clear writers, otherwise there will be more and
+                // more such as yesterday's partition that no longer needs to be written.
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(
+                            "Closing writer for partition {}, bucket {}. "
+                                    + "Writer's last modified identifier is {}, "
+                                    + "while current commit identifier is {}.",
+                            partition,
+                            bucket,
+                            writerContainer.lastModifiedCommitIdentifier,
+                            commitIdentifier);
+                }
+                writerContainer.writer.close();
+                bucketsToRemove.add(bucket);
+            }
+        } else {
+            writerContainer.lastModifiedCommitIdentifier = commitIdentifier;
+        }
+        return committable;
     }
 
     // This abstract function returns a whole function (instead of just a boolean value),
@@ -291,11 +343,33 @@ Function<WriterContainer<T>, Boolean> createNoConflictAwareWriterCleanChecker()
 
     @Override
     public void close() throws Exception {
-        for (Map<Integer, WriterContainer<T>> bucketWriters : writers.values()) {
-            for (WriterContainer<T> writerContainer : bucketWriters.values()) {
-                writerContainer.writer.close();
-            }
-        }
+        List<CompletableFuture<Void>> futures =
+                writers.values().stream()
+                        .flatMap(
+                                bucketWriters ->
+                                        bucketWriters.values().stream()
+                                                .map(
+                                                        writerContainer ->
+                                                                CompletableFuture.runAsync(
+                                                                        () -> {
+                                                                            try {
+                                                                                writerContainer
+                                                                                        .writer
+                                                                                        .close();
+                                                                            } catch (Exception e) {
+                                                                                LOG.error(
+                                                                                        "Failed to close writer: ",
+                                                                                        e);
+                                                                            }
+                                                                        },
+                                                                        closeWritersExecutor)))
+                        .collect(Collectors.toList());
+
+        CompletableFuture<Void> allTasksDone =
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+        allTasksDone.get();
+        ExecutorUtils.gracefulShutdown(1, TimeUnit.MINUTES, closeWritersExecutor);
+
         writers.clear();
         if (lazyCompactExecutor != null && closeCompactExecutorWhenLeaving) {
             lazyCompactExecutor.shutdownNow();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
index b7feeead4bbb..cd22c3b06efb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
@@ -75,7 +75,8 @@ public MemoryFileStoreWrite(
                 tableName,
                 options.bucket(),
                 partitionType,
-                options.writeMaxWritersToSpill());
+                options.writeMaxWritersToSpill(),
+                options.tableCloseWritersThreadNumber());
         this.options = options;
         this.cacheManager = new CacheManager(options.lookupCacheMaxMemory());
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/AbstractFileStoreWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/AbstractFileStoreWriteTest.java
new file mode 100644
index 000000000000..7c31ea3f2019
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/AbstractFileStoreWriteTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.operation.AbstractFileStoreWrite.WriterContainer;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.function.Function;
+
+/** Tests for {@link AbstractFileStoreWrite}. */
+public class AbstractFileStoreWriteTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @Test
+    public void testCloseWriterContainer() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        KeyValueFileStoreWrite write =
+                (KeyValueFileStoreWrite)
+                        table.store()
+                                .newWrite("ss")
+                                .withIOManager(new IOManagerImpl(tempDir.toString()));
+        write.withExecutionMode(false);
+
+        write.write(
+                partition(0),
+                0,
+                new KeyValue().replace(getBinaryRow(0), RowKind.INSERT, GenericRow.of(0, 0)));
+
+        Assertions.assertThat(write.writers.size()).isEqualTo(1);
+        Assertions.assertThat(write.writers.get(partition(0)).size()).isEqualTo(1);
+
+        AbstractFileStoreWrite.WriterContainer<KeyValue> writerContainer =
+                write.writers.get(partition(0)).get(0);
+
+        Assertions.assertThat(writerContainer.indexMaintainer).isNotNull();
+        Assertions.assertThat(writerContainer.deletionVectorsMaintainer).isNotNull();
+
+        Set<Integer> bucketsToRemove = new ConcurrentSkipListSet<>();
+        Function<WriterContainer<KeyValue>, Boolean> writerCleanChecker =
+                tmpWriterContainer -> false;
+        CommitMessageImpl commitMessage =
+                (CommitMessageImpl)
+                        write.closeWriterContainer(
+                                partition(0),
+                                0,
+                                writerContainer,
+                                false,
+                                writerCleanChecker,
+                                100,
+                                bucketsToRemove);
+
+        long records =
+                commitMessage.newFilesIncrement().newFiles().stream()
+                        .mapToLong(DataFileMeta::rowCount)
+                        .sum();
+        Assertions.assertThat(records).isEqualTo(1);
+
+        // add new records
+        write.write(
+                partition(0),
+                0,
+                new KeyValue().replace(getBinaryRow(1), RowKind.INSERT, GenericRow.of(1, 1)));
+        write.write(
+                partition(0),
+                0,
+                new KeyValue().replace(getBinaryRow(2), RowKind.INSERT, GenericRow.of(2, 2)));
+        commitMessage =
+                (CommitMessageImpl)
+                        write.closeWriterContainer(
+                                partition(0),
+                                0,
+                                writerContainer,
+                                false,
+                                writerCleanChecker,
+                                200,
+                                bucketsToRemove);
+        records =
+                commitMessage.newFilesIncrement().newFiles().stream()
+                        .mapToLong(DataFileMeta::rowCount)
+                        .sum();
+        Assertions.assertThat(records).isEqualTo(2);
+
+        // add one deletion
+        write.write(
+                partition(0),
+                0,
+                new KeyValue().replace(getBinaryRow(3), RowKind.INSERT, GenericRow.of(3, 3)));
+        write.write(
+                partition(0),
+                0,
+                new KeyValue().replace(getBinaryRow(4), RowKind.INSERT, GenericRow.of(4, 4)));
+        writerContainer.deletionVectorsMaintainer.notifyNewDeletion("file_0", 0);
+
+        commitMessage =
+                (CommitMessageImpl)
+                        write.closeWriterContainer(
+                                partition(0),
+                                0,
+                                writerContainer,
+                                false,
+                                writerCleanChecker,
+                                300,
+                                bucketsToRemove);
+        records =
+                commitMessage.newFilesIncrement().newFiles().stream()
+                        .mapToLong(DataFileMeta::rowCount)
+                        .sum();
+        Assertions.assertThat(records).isEqualTo(2);
+    }
+
+    private BinaryRow getBinaryRow(int i) {
+        BinaryRow binaryRow = new BinaryRow(1);
+        BinaryRowWriter writer = new BinaryRowWriter(binaryRow);
+        writer.writeInt(0, i);
+        writer.complete();
+        return binaryRow;
+    }
+
+    private BinaryRow partition(int i) {
+        BinaryRow binaryRow = new BinaryRow(1);
+        BinaryRowWriter writer = new BinaryRowWriter(binaryRow);
+        writer.writeInt(0, i);
+        writer.complete();
+        return binaryRow;
+    }
+
+    protected FileStoreTable createFileStoreTable() throws Exception {
+        Catalog catalog = new FileSystemCatalog(LocalFileIO.create(), new Path(tempDir.toString()));
+        Schema schema =
+                Schema.newBuilder()
+                        .column("k", DataTypes.INT())
+                        .column("v", DataTypes.INT())
+                        .primaryKey("k")
+                        .option("bucket", "-1")
+                        .option("deletion-vectors.enabled", "true")
+                        .build();
+        Identifier identifier = Identifier.create("default", "test");
+        catalog.createDatabase("default", false);
+        catalog.createTable(identifier, schema, false);
+        return (FileStoreTable) catalog.getTable(identifier);
+    }
+}
From 9239267abb8a1574ef4a3e1887f7953bfc460b10 Mon Sep 17 00:00:00 2001
From: HouliangQi
Date: Wed, 16 Oct 2024 16:48:41 +0800
Subject: [PATCH 2/7] add doc description

---
 docs/layouts/shortcodes/generated/core_configuration.html |  6 ++++++
 .../main/java/org/apache/paimon/CoreOptions.java          | 13 +++++++------
 2 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 4375347785bc..d3471459a536 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -942,5 +942,11 @@
             <td>Integer</td>
             <td>The bytes of types (CHAR, VARCHAR, BINARY, VARBINARY) devote to the zorder sort.</td>
         </tr>
+        <tr>
+            <td><h5>table.close-writers-thread-number</h5></td>
+            <td style="word-wrap: break-word;">4</td>
+            <td>Integer</td>
+            <td>The number of threads used to flush data and close files during checkpoint.</td>
+        </tr>
     </tbody>
 </table>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 0e7a99e608ba..9bc185bf7aac 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1373,6 +1373,13 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Whether to enable asynchronous IO writing when writing files.");
 
+    public static final ConfigOption<Integer> TABLE_CLOSE_WRITERS_THREAD_NUMBER =
+            key("table.close-writers-thread-number")
+                    .intType()
+                    .defaultValue(4)
+                    .withDescription(
+                            "The number of threads used to flush data and close files during checkpoint.");
+
     @ExcludeFromDocumentation("Only used internally to support materialized table")
     public static final ConfigOption<Long> MATERIALIZED_TABLE_SNAPSHOT =
             key("materialized-table.snapshot")
@@ -1442,12 +1449,6 @@ public class CoreOptions implements Serializable {
                     .noDefaultValue()
                     .withDescription("The serialized refresh handler of materialized table.");
 
-    public static final ConfigOption<Integer> TABLE_CLOSE_WRITERS_THREAD_NUMBER =
-            key("table.close-writers-thread-number")
-                    .intType()
-                    .defaultValue(4)
-                    .withDescription("The thread number for closing one table's writers");
-
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {

From 899926a6c3a81df1f81e06a41599c773f4d4cfb1 Mon Sep 17 00:00:00 2001
From: HouliangQi
Date: Fri, 18 Oct 2024 15:14:34 +0800
Subject: [PATCH 3/7] shutdown the closeWritersExecutor

---
 .../org/apache/paimon/operation/AbstractFileStoreWrite.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index f5526b4aaed8..b90509b1e902 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -368,7 +368,9 @@ public void close() throws Exception {
         CompletableFuture<Void> allTasksDone =
                 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
         allTasksDone.get();
-        ExecutorUtils.gracefulShutdown(1, TimeUnit.MINUTES, closeWritersExecutor);
+        if (closeWritersExecutor != null) {
+            closeWritersExecutor.shutdownNow();
+        }
 
         writers.clear();
         if (lazyCompactExecutor != null && closeCompactExecutorWhenLeaving) {
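Patch 3 above replaces the ExecutorUtils.gracefulShutdown(...) call with a null-guarded shutdownNow(); patch 5 below then settles on the standard two-phase idiom: shutdown(), a bounded awaitTermination(), and shutdownNow() only as the fallback. For reference, a self-contained sketch of that idiom, with illustrative names not taken from the patch:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class ShutdownSketch {
        // Two-phase shutdown: stop accepting new tasks, give in-flight closes up to
        // one minute to finish, then interrupt whatever is still running.
        static void shutdownGracefully(ExecutorService executor) throws InterruptedException {
            executor.shutdown();
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                executor.shutdownNow();
            }
        }

        public static void main(String[] args) throws InterruptedException {
            ExecutorService closePool = Executors.newFixedThreadPool(4);
            closePool.submit(() -> System.out.println("closing writer"));
            shutdownGracefully(closePool);
        }
    }

The difference matters here: a bare shutdownNow() can interrupt writer closes that are still flushing, while the two-phase form only forces termination after the bounded wait expires.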
From 3d3459bc2f20ace65a426f8c646ca38f40819f33 Mon Sep 17 00:00:00 2001
From: HouliangQi
Date: Fri, 18 Oct 2024 15:31:07 +0800
Subject: [PATCH 4/7] remove useless codes

---
 .../org/apache/paimon/operation/AbstractFileStoreWrite.java | 2 --
 1 file changed, 2 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index b90509b1e902..a5c43d6a1ae0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -37,7 +37,6 @@
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommitIncrement;
 import org.apache.paimon.utils.ExecutorThreadFactory;
-import org.apache.paimon.utils.ExecutorUtils;
 import org.apache.paimon.utils.RecordWriter;
 import org.apache.paimon.utils.SnapshotManager;
@@ -57,7 +56,6 @@
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;

From a65d79522fc6459de0ee2fec944256fe6afc2a2c Mon Sep 17 00:00:00 2001
From: HouliangQi
Date: Fri, 18 Oct 2024 17:17:26 +0800
Subject: [PATCH 5/7] refine the code

---
 .../operation/AbstractFileStoreWrite.java | 50 +++++++++----------
 1 file changed, 24 insertions(+), 26 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index a5c43d6a1ae0..e52ddf37fe6d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -48,14 +48,15 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -201,8 +202,8 @@ public List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIden
         }
 
         List<CompletableFuture<CommitMessage>> futures = new ArrayList<>();
-        Set<BinaryRow> partitionsToRemove = new HashSet<>();
-        Map<BinaryRow, Set<Integer>> bucketsToRemovePerPartition = new HashMap<>();
+        Set<BinaryRow> partitionsToRemove = ConcurrentHashMap.newKeySet();
+        Map<BinaryRow, Set<Integer>> bucketsToRemovePerPartition = new ConcurrentHashMap<>();
 
         writers.forEach(
                 (partition, bucketMap) -> {
@@ -341,33 +342,30 @@ Function<WriterContainer<T>, Boolean> createNoConflictAwareWriterCleanChecker()
 
     @Override
     public void close() throws Exception {
-        List<CompletableFuture<Void>> futures =
-                writers.values().stream()
-                        .flatMap(
-                                bucketWriters ->
-                                        bucketWriters.values().stream()
-                                                .map(
-                                                        writerContainer ->
-                                                                CompletableFuture.runAsync(
-                                                                        () -> {
-                                                                            try {
-                                                                                writerContainer
-                                                                                        .writer
-                                                                                        .close();
-                                                                            } catch (Exception e) {
-                                                                                LOG.error(
-                                                                                        "Failed to close writer: ",
-                                                                                        e);
-                                                                            }
-                                                                        },
-                                                                        closeWritersExecutor)))
-                        .collect(Collectors.toList());
-
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        // Close each writer in parallel
+        for (Map<Integer, WriterContainer<T>> bucketWriter : writers.values()) {
+            for (WriterContainer<T> writerContainer : bucketWriter.values()) {
+                futures.add(
+                        CompletableFuture.runAsync(
+                                () -> {
+                                    try {
+                                        writerContainer.writer.close();
+                                    } catch (Exception e) {
+                                        LOG.error("Failed to close writer: ", e);
+                                    }
+                                },
+                                closeWritersExecutor));
+            }
+        }
         CompletableFuture<Void> allTasksDone =
                 CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        allTasksDone.get();
         if (closeWritersExecutor != null) {
-            closeWritersExecutor.shutdownNow();
+            closeWritersExecutor.shutdown();
+            if (!closeWritersExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
+                closeWritersExecutor.shutdownNow();
+            }
         }
 
         writers.clear();

From 3a932eae0b27253309ee2d72c8cc41894cb82775 Mon Sep 17 00:00:00 2001
From: HouliangQi
Date: Fri, 18 Oct 2024 19:56:39 +0800
Subject: [PATCH 6/7] refine the code

---
 paimon-common/src/main/java/org/apache/paimon/CoreOptions.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 97534dccbcf1..6d95e146b19a 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1384,7 +1384,7 @@ public class CoreOptions implements Serializable {
     public static final ConfigOption<Integer> TABLE_CLOSE_WRITERS_THREAD_NUMBER =
             key("table.close-writers-thread-number")
                     .intType()
-                    .defaultValue(4)
+                    .defaultValue(1)
                     .withDescription(
                             "The number of threads used to flush data and close files during checkpoint.");
 

From 8a0b507994e635ea7a9b4854e0f83967c38bc01f Mon Sep 17 00:00:00 2001
From: HouliangQi
Date: Mon, 21 Oct 2024 09:13:40 +0800
Subject: [PATCH 7/7] change the default thread number

---
 paimon-common/src/main/java/org/apache/paimon/CoreOptions.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 6d95e146b19a..97534dccbcf1 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1384,7 +1384,7 @@ public class CoreOptions implements Serializable {
     public static final ConfigOption<Integer> TABLE_CLOSE_WRITERS_THREAD_NUMBER =
             key("table.close-writers-thread-number")
                     .intType()
-                    .defaultValue(1)
+                    .defaultValue(4)
                     .withDescription(
                             "The number of threads used to flush data and close files during checkpoint.");
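Patches 6 and 7 flip the default of table.close-writers-thread-number from 4 to 1 and back to 4, so the series ends where patch 2 left it. The accessor added in patch 1 simply reads the typed option, falling back to the default. A simplified, self-contained sketch of that read-with-default pattern follows; this is not Paimon's actual Options class, and all names below are illustrative.

    import java.util.HashMap;
    import java.util.Map;

    public class OptionSketch {
        static final String KEY = "table.close-writers-thread-number";
        static final int DEFAULT = 4; // the default the series finally settles on

        private final Map<String, String> options;

        OptionSketch(Map<String, String> options) {
            this.options = options;
        }

        // Mirrors CoreOptions.tableCloseWritersThreadNumber(): explicit value wins,
        // otherwise the option's declared default applies.
        int tableCloseWritersThreadNumber() {
            return Integer.parseInt(options.getOrDefault(KEY, String.valueOf(DEFAULT)));
        }

        public static void main(String[] args) {
            Map<String, String> conf = new HashMap<>();
            conf.put(KEY, "8"); // user override
            System.out.println(new OptionSketch(conf).tableCloseWritersThreadNumber()); // 8
            System.out.println(new OptionSketch(new HashMap<>()).tableCloseWritersThreadNumber()); // 4
        }
    }

With the final default of 4, one table's writers are flushed and closed by up to four threads during a checkpoint; configuring the key to 1 effectively serializes the closes again.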