diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index bc6f8b5c4efc..1f5fb0dc1e82 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -26,6 +26,12 @@
+
+ async-file-write |
+ true |
+ Boolean |
+ Whether to enable asynchronous IO writing when writing files. |
+
auto-create |
false |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 156915e2365d..93ada725fddb 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1283,6 +1283,13 @@ public class CoreOptions implements Serializable {
"When a batch job queries from a table, if a partition does not exist in the current branch, "
+ "the reader will try to get this partition from this fallback branch.");
+ public static final ConfigOption ASYNC_FILE_WRITE =
+ key("async-file-write")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Whether to enable asynchronous IO writing when writing files.");
+
private final Options options;
public CoreOptions(Map options) {
@@ -2020,6 +2027,10 @@ public boolean prepareCommitWaitCompaction() {
return options.get(LOOKUP_WAIT);
}
+ public boolean asyncFileWrite() {
+ return options.get(ASYNC_FILE_WRITE);
+ }
+
public boolean metadataIcebergCompatible() {
return options.get(METADATA_ICEBERG_COMPATIBLE);
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/AsyncPositionOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/AsyncPositionOutputStream.java
new file mode 100644
index 000000000000..0cc043b22c9a
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/AsyncPositionOutputStream.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.utils.FixLenByteArrayOutputStream;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.paimon.utils.ThreadUtils.newDaemonThreadFactory;
+
+/** A {@link PositionOutputStream} which uses a async thread to write data. */
+public class AsyncPositionOutputStream extends PositionOutputStream {
+
+ public static final ExecutorService EXECUTOR_SERVICE =
+ Executors.newCachedThreadPool(newDaemonThreadFactory("AsyncOutputStream"));
+
+ public static final int AWAIT_TIMEOUT_SECONDS = 10;
+ public static final int BUFFER_SIZE = 1024 * 32;
+
+ private final PositionOutputStream out;
+ private final FixLenByteArrayOutputStream buffer;
+ private final LinkedBlockingQueue bufferQueue;
+ private final LinkedBlockingQueue eventQueue;
+ private final AtomicReference exception;
+ private final Future> future;
+
+ private long position;
+
+ public AsyncPositionOutputStream(PositionOutputStream out) {
+ this.out = out;
+ this.bufferQueue = new LinkedBlockingQueue<>();
+ this.eventQueue = new LinkedBlockingQueue<>();
+ this.exception = new AtomicReference<>();
+ this.position = 0;
+ this.future = EXECUTOR_SERVICE.submit(this::execute);
+ this.buffer = new FixLenByteArrayOutputStream();
+ this.buffer.setBuffer(new byte[BUFFER_SIZE]);
+ }
+
+ @VisibleForTesting
+ LinkedBlockingQueue getBufferQueue() {
+ return bufferQueue;
+ }
+
+ private void execute() {
+ try {
+ doWork();
+ } catch (Throwable e) {
+ exception.set(e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void doWork() throws InterruptedException, IOException {
+ try {
+ while (true) {
+ AsyncEvent event = eventQueue.poll(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ if (event == null) {
+ continue;
+ }
+ if (event instanceof EndEvent) {
+ return;
+ }
+ if (event instanceof DataEvent) {
+ DataEvent dataEvent = (DataEvent) event;
+ out.write(dataEvent.data, 0, dataEvent.length);
+ bufferQueue.add(dataEvent.data);
+ }
+ if (event instanceof FlushEvent) {
+ out.flush();
+ ((FlushEvent) event).latch.countDown();
+ }
+ }
+ } finally {
+ out.close();
+ }
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ checkException();
+ return position;
+ }
+
+ private void flushBuffer() {
+ if (buffer.getCount() == 0) {
+ return;
+ }
+ putEvent(new DataEvent(buffer.getBuffer(), buffer.getCount()));
+ byte[] byteArray = bufferQueue.poll();
+ if (byteArray == null) {
+ byteArray = new byte[BUFFER_SIZE];
+ }
+ buffer.setBuffer(byteArray);
+ buffer.setCount(0);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ checkException();
+ position++;
+ while (buffer.write((byte) b) != 1) {
+ flushBuffer();
+ }
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ checkException();
+ position += len;
+ while (true) {
+ int written = buffer.write(b, off, len);
+ off += written;
+ len -= written;
+ if (len == 0) {
+ return;
+ }
+ flushBuffer();
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ checkException();
+ flushBuffer();
+ FlushEvent event = new FlushEvent();
+ putEvent(event);
+ while (true) {
+ try {
+ boolean await = event.latch.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ if (await) {
+ return;
+ }
+ checkException();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ checkException();
+ flushBuffer();
+ putEvent(new EndEvent());
+ try {
+ this.future.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void putEvent(AsyncEvent event) {
+ try {
+ eventQueue.put(event);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void checkException() throws IOException {
+ Throwable throwable = exception.get();
+ if (throwable != null) {
+ if (throwable instanceof IOException) {
+ throw (IOException) throwable;
+ }
+ if (throwable instanceof RuntimeException) {
+ throw (RuntimeException) throwable;
+ }
+ throw new IOException(throwable);
+ }
+ }
+
+ private interface AsyncEvent {}
+
+ private static class DataEvent implements AsyncEvent {
+
+ private final byte[] data;
+ private final int length;
+
+ public DataEvent(byte[] data, int length) {
+ this.data = data;
+ this.length = length;
+ }
+ }
+
+ private static class FlushEvent implements AsyncEvent {
+ private final CountDownLatch latch = new CountDownLatch(1);
+ }
+
+ private static class EndEvent implements AsyncEvent {}
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/FixLenByteArrayOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/utils/FixLenByteArrayOutputStream.java
new file mode 100644
index 000000000000..8926a52ce65e
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/FixLenByteArrayOutputStream.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.io.ByteArrayOutputStream;
+
+/** A {@link ByteArrayOutputStream} which can reuse byte array. */
+public class FixLenByteArrayOutputStream {
+
+ private byte[] buf;
+ private int count;
+
+ public void setBuffer(byte[] buffer) {
+ this.buf = buffer;
+ }
+
+ public byte[] getBuffer() {
+ return buf;
+ }
+
+ public int write(byte[] b, int off, int len) {
+ if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) - b.length > 0)) {
+ throw new IndexOutOfBoundsException();
+ }
+ int writeLen = Math.min(len, buf.length - count);
+ System.arraycopy(b, off, buf, count, writeLen);
+ count += writeLen;
+ return writeLen;
+ }
+
+ public int getCount() {
+ return count;
+ }
+
+ public int write(byte b) {
+ if (count < buf.length) {
+ buf[count] = b;
+ count += 1;
+ return 1;
+ }
+ return 0;
+ }
+
+ public void setCount(int count) {
+ this.count = count;
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ReuseByteArrayOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/utils/ReuseByteArrayOutputStream.java
new file mode 100644
index 000000000000..cca955656c53
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ReuseByteArrayOutputStream.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.io.ByteArrayOutputStream;
+
+/** A {@link ByteArrayOutputStream} which can reuse byte array. */
+public class ReuseByteArrayOutputStream extends ByteArrayOutputStream {
+
+ public ReuseByteArrayOutputStream(int size) {
+ super(size);
+ }
+
+ public void setBuffer(byte[] buffer) {
+ this.buf = buffer;
+ }
+
+ public byte[] getBuffer() {
+ return buf;
+ }
+}
diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/AsyncPositionOutputStreamTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/AsyncPositionOutputStreamTest.java
new file mode 100644
index 000000000000..115a9b3cdc50
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/fs/AsyncPositionOutputStreamTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class AsyncPositionOutputStreamTest {
+
+ @Test
+ public void testHugeWriteByteArray() throws IOException {
+ ByteArrayPositionOutputStream result = new ByteArrayPositionOutputStream();
+ AsyncPositionOutputStream out = new AsyncPositionOutputStream(result);
+ int len = 10 * 1024;
+ ByteArrayOutputStream expected = new ByteArrayOutputStream(len);
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+ for (int i = 0; i < len; i++) {
+ byte[] bytes = new byte[rnd.nextInt(20)];
+ rnd.nextBytes(bytes);
+ expected.write(bytes);
+ out.write(bytes);
+ }
+ out.close();
+
+ assertThat(result.out.toByteArray()).isEqualTo(expected.toByteArray());
+ }
+
+ @Test
+ public void testHugeWriteByte() throws IOException {
+ ByteArrayPositionOutputStream result = new ByteArrayPositionOutputStream();
+ AsyncPositionOutputStream out = new AsyncPositionOutputStream(result);
+ int len = 32 * 1024 + 20;
+ ByteArrayOutputStream expected = new ByteArrayOutputStream(len);
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+ for (int i = 0; i < len; i++) {
+ int b = rnd.nextInt();
+ expected.write(b);
+ out.write(b);
+ }
+ out.close();
+
+ assertThat(result.out.toByteArray()).isEqualTo(expected.toByteArray());
+ }
+
+ @Test
+ public void testNormal() throws IOException {
+ ByteArrayPositionOutputStream byteOut = new ByteArrayPositionOutputStream();
+ AsyncPositionOutputStream out = new AsyncPositionOutputStream(byteOut);
+ out.write(1);
+ out.write(new byte[] {2, 3});
+ out.write(new byte[] {3, 4, 5}, 1, 1);
+ out.close();
+ assertThat(byteOut.out.toByteArray()).isEqualTo(new byte[] {1, 2, 3, 4});
+ }
+
+ @Test
+ public void testGetPos() throws IOException {
+ AsyncPositionOutputStream out =
+ new AsyncPositionOutputStream(new ByteArrayPositionOutputStream());
+ out.write(new byte[] {1, 2, 3});
+ assertThat(out.getPos()).isEqualTo(3);
+ out.write(new byte[] {5, 6, 7});
+ assertThat(out.getPos()).isEqualTo(6);
+ }
+
+ @Test
+ public void testFlush() throws IOException {
+ ByteArrayPositionOutputStream byteOut =
+ new ByteArrayPositionOutputStream() {
+ @Override
+ public void write(byte[] b) throws IOException {
+ try {
+ Thread.sleep(100);
+ super.write(b);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ AsyncPositionOutputStream out = new AsyncPositionOutputStream(byteOut);
+ out.write(new byte[] {1, 2, 3});
+ out.write(new byte[] {5, 6, 7});
+ out.flush();
+ assertThat(byteOut.getPos()).isEqualTo(6);
+
+ out.write(new byte[] {8, 9});
+
+ // test repeat flush
+ out.flush();
+ out.flush();
+ assertThat(byteOut.getPos()).isEqualTo(8);
+
+ assertThat(out.getBufferQueue().size()).isEqualTo(1);
+ }
+
+ @Test
+ public void testFlushWithException() throws IOException {
+ String msg = "your exception!";
+ ByteArrayPositionOutputStream byteOut =
+ new ByteArrayPositionOutputStream() {
+ @Override
+ public void write(byte[] b, int off, int len) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ throw new RuntimeException(msg);
+ }
+ };
+ AsyncPositionOutputStream out = new AsyncPositionOutputStream(byteOut);
+ out.write(new byte[] {5, 6, 7});
+ assertThatThrownBy(out::flush).hasMessage(msg);
+ }
+
+ @Test
+ public void testClose() throws IOException {
+ ByteArrayPositionOutputStream byteOut =
+ new ByteArrayPositionOutputStream() {
+ @Override
+ public void write(byte[] b) throws IOException {
+ try {
+ Thread.sleep(100);
+ super.write(b);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ AsyncPositionOutputStream out = new AsyncPositionOutputStream(byteOut);
+ out.write(new byte[] {1, 2, 3});
+ out.write(new byte[] {5, 6, 7});
+ out.close();
+ assertThat(byteOut.getPos()).isEqualTo(6);
+ }
+
+ @Test
+ public void testThrowException() throws IOException {
+ String msg = "your exception!";
+ ByteArrayPositionOutputStream out =
+ new ByteArrayPositionOutputStream() {
+ @Override
+ public void write(byte[] b, int off, int len) {
+ throw new RuntimeException(msg);
+ }
+ };
+
+ AsyncPositionOutputStream asyncOut = new AsyncPositionOutputStream(out);
+ asyncOut.write(new byte[] {1, 2, 3});
+ assertThatThrownBy(asyncOut::close).hasMessageContaining(msg);
+ assertThat(out.closed).isTrue();
+ }
+
+ private static class ByteArrayPositionOutputStream extends PositionOutputStream {
+
+ private final ByteArrayOutputStream out;
+
+ private boolean closed;
+
+ private ByteArrayPositionOutputStream() {
+ this.out = new ByteArrayOutputStream();
+ }
+
+ @Override
+ public long getPos() {
+ return out.size();
+ }
+
+ @Override
+ public void write(int b) {
+ out.write(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ out.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) {
+ out.write(b, off, len);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ out.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ out.close();
+ closed = true;
+ }
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 897674b3b347..17ebe215ee1a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -35,7 +35,7 @@
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
-import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
+import org.apache.paimon.operation.AppendOnlyFileStoreWrite.BucketFileRead;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.statistics.SimpleColStatsCollector;
@@ -68,8 +68,9 @@ public class AppendOnlyWriter implements RecordWriter, MemoryOwner
private final RowType writeSchema;
private final DataFilePathFactory pathFactory;
private final CompactManager compactManager;
- private final AppendOnlyFileStoreWrite.BucketFileRead bucketFileRead;
+ private final BucketFileRead bucketFileRead;
private final boolean forceCompact;
+ private final boolean asyncFileWrite;
private final List newFiles;
private final List deletedFiles;
private final List compactBefore;
@@ -94,7 +95,7 @@ public AppendOnlyWriter(
RowType writeSchema,
long maxSequenceNumber,
CompactManager compactManager,
- AppendOnlyFileStoreWrite.BucketFileRead bucketFileRead,
+ BucketFileRead bucketFileRead,
boolean forceCompact,
DataFilePathFactory pathFactory,
@Nullable CommitIncrement increment,
@@ -104,7 +105,8 @@ public AppendOnlyWriter(
String spillCompression,
SimpleColStatsCollector.Factory[] statsCollectors,
MemorySize maxDiskSize,
- FileIndexOptions fileIndexOptions) {
+ FileIndexOptions fileIndexOptions,
+ boolean asyncFileWrite) {
this.fileIO = fileIO;
this.schemaId = schemaId;
this.fileFormat = fileFormat;
@@ -114,6 +116,7 @@ public AppendOnlyWriter(
this.compactManager = compactManager;
this.bucketFileRead = bucketFileRead;
this.forceCompact = forceCompact;
+ this.asyncFileWrite = asyncFileWrite;
this.newFiles = new ArrayList<>();
this.deletedFiles = new ArrayList<>();
this.compactBefore = new ArrayList<>();
@@ -261,7 +264,8 @@ private RowDataRollingFileWriter createRollingRowWriter() {
fileCompression,
statsCollectors,
fileIndexOptions,
- FileSource.APPEND);
+ FileSource.APPEND,
+ asyncFileWrite);
}
private void trySyncLatestCompaction(boolean blocking)
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
index 97ab8c5d1798..dfa5f7e5adbf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
@@ -125,7 +125,8 @@ private class IcebergManifestEntryWriter
factory,
path,
serializer::toRow,
- fileCompression);
+ fileCompression,
+ false);
this.partitionStatsCollector = new SimpleStatsCollector(partitionType);
this.sequenceNumber = sequenceNumber;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
index 46db1ddd1b71..064d72829141 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java
@@ -93,7 +93,8 @@ public KeyValueDataFileWriter(
simpleStatsExtractor,
compression,
StatsCollectorFactories.createStatsFactories(
- options, KeyValue.schema(keyType, valueType).getFieldNames()));
+ options, KeyValue.schema(keyType, valueType).getFieldNames()),
+ options.asyncFileWrite());
this.keyType = keyType;
this.valueType = valueType;
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
index ae18768c953a..f9c9b950214f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java
@@ -62,7 +62,8 @@ public RowDataFileWriter(
String fileCompression,
SimpleColStatsCollector.Factory[] statsCollectors,
FileIndexOptions fileIndexOptions,
- FileSource fileSource) {
+ FileSource fileSource,
+ boolean asyncFileWrite) {
super(
fileIO,
factory,
@@ -71,7 +72,8 @@ public RowDataFileWriter(
writeSchema,
simpleStatsExtractor,
fileCompression,
- statsCollectors);
+ statsCollectors,
+ asyncFileWrite);
this.schemaId = schemaId;
this.seqNumCounter = seqNumCounter;
this.statsArraySerializer = new SimpleStatsConverter(writeSchema);
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
index 9827c8f7c803..e60913d25f87 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java
@@ -42,7 +42,8 @@ public RowDataRollingFileWriter(
String fileCompression,
SimpleColStatsCollector.Factory[] statsCollectors,
FileIndexOptions fileIndexOptions,
- FileSource fileSource) {
+ FileSource fileSource,
+ boolean asyncFileWrite) {
super(
() ->
new RowDataFileWriter(
@@ -60,7 +61,8 @@ public RowDataRollingFileWriter(
fileCompression,
statsCollectors,
fileIndexOptions,
- fileSource),
+ fileSource,
+ asyncFileWrite),
targetFileSize);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
index 6d72106d6c56..c9de9e8d5a40 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
@@ -21,6 +21,7 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.fs.AsyncPositionOutputStream;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
@@ -58,13 +59,17 @@ public SingleFileWriter(
FormatWriterFactory factory,
Path path,
Function converter,
- String compression) {
+ String compression,
+ boolean asyncWrite) {
this.fileIO = fileIO;
this.path = path;
this.converter = converter;
try {
out = fileIO.newOutputStream(path, false);
+ if (asyncWrite) {
+ out = new AsyncPositionOutputStream(out);
+ }
writer = factory.create(out, compression);
} catch (IOException e) {
LOG.warn(
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
index e37993144491..705204dfd62e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java
@@ -53,8 +53,9 @@ public StatsCollectingSingleFileWriter(
RowType writeSchema,
@Nullable SimpleStatsExtractor simpleStatsExtractor,
String compression,
- SimpleColStatsCollector.Factory[] statsCollectors) {
- super(fileIO, factory, path, converter, compression);
+ SimpleColStatsCollector.Factory[] statsCollectors,
+ boolean asyncWrite) {
+ super(fileIO, factory, path, converter, compression, asyncWrite);
this.simpleStatsExtractor = simpleStatsExtractor;
if (this.simpleStatsExtractor == null) {
this.simpleStatsCollector = new SimpleStatsCollector(writeSchema, statsCollectors);
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index 4fd3df960ffc..2f5cb4f2369d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -116,7 +116,13 @@ private class ManifestEntryWriter extends SingleFileWriter createWriter(
spillCompression,
statsCollectors,
maxDiskSize,
- fileIndexOptions);
+ fileIndexOptions,
+ options.asyncFileWrite());
}
public AppendOnlyCompactManager.CompactRewriter compactRewriter(
@@ -184,7 +185,8 @@ public AppendOnlyCompactManager.CompactRewriter compactRewriter(
fileCompression,
statsCollectors,
fileIndexOptions,
- FileSource.COMPACT);
+ FileSource.COMPACT,
+ options.asyncFileWrite());
try {
rewriter.write(bucketReader(partition, bucket).read(toCompact));
} finally {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
index 0b2bd719ff96..fc5bd926a295 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
@@ -49,7 +49,7 @@
public abstract class MemoryFileStoreWrite extends AbstractFileStoreWrite {
private static final Logger LOG = LoggerFactory.getLogger(MemoryFileStoreWrite.class);
- private final CoreOptions options;
+ protected final CoreOptions options;
protected final CacheManager cacheManager;
private MemoryPoolFactory writeBufferPool;
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 2fdb4968d9e1..b040cda1b227 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -628,7 +628,8 @@ private Pair> createWriter(
StatsCollectorFactories.createStatsFactories(
options, AppendOnlyWriterTest.SCHEMA.getFieldNames()),
MemorySize.MAX_VALUE,
- new FileIndexOptions());
+ new FileIndexOptions(),
+ true);
writer.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
return Pair.of(writer, compactManager.allFiles());
diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 1a3503f7a789..e2ccd063e878 100644
--- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -91,7 +91,8 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception
StatsCollectorFactories.createStatsFactories(
options, SCHEMA.getFieldNames()),
MemorySize.MAX_VALUE,
- new FileIndexOptions());
+ new FileIndexOptions(),
+ true);
appendOnlyWriter.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
appendOnlyWriter.write(
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index 2ac2a8e4d319..d3039e00ca02 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -90,7 +90,8 @@ public void initialize(String identifier) {
new CoreOptions(new HashMap<>()),
SCHEMA.getFieldNames()),
new FileIndexOptions(),
- FileSource.APPEND),
+ FileSource.APPEND,
+ true),
TARGET_FILE_SIZE);
}