From 84732cde8846fb97d7aa5f0d6765d6f552b8884c Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Tue, 6 Aug 2024 14:24:14 +0800 Subject: [PATCH] [core] Introduce AsyncPositionOutputStream (#3875) --- .../generated/core_configuration.html | 6 + .../java/org/apache/paimon/CoreOptions.java | 11 + .../paimon/fs/AsyncPositionOutputStream.java | 227 ++++++++++++++++++ .../utils/FixLenByteArrayOutputStream.java | 63 +++++ .../utils/ReuseByteArrayOutputStream.java | 37 +++ .../fs/AsyncPositionOutputStreamTest.java | 217 +++++++++++++++++ .../paimon/append/AppendOnlyWriter.java | 14 +- .../iceberg/manifest/IcebergManifestFile.java | 3 +- .../paimon/io/KeyValueDataFileWriter.java | 3 +- .../apache/paimon/io/RowDataFileWriter.java | 6 +- .../paimon/io/RowDataRollingFileWriter.java | 6 +- .../apache/paimon/io/SingleFileWriter.java | 7 +- .../io/StatsCollectingSingleFileWriter.java | 5 +- .../apache/paimon/manifest/ManifestFile.java | 8 +- .../operation/AppendOnlyFileStoreWrite.java | 6 +- .../operation/MemoryFileStoreWrite.java | 2 +- .../paimon/append/AppendOnlyWriterTest.java | 3 +- .../paimon/format/FileFormatSuffixTest.java | 3 +- .../paimon/io/RollingFileWriterTest.java | 3 +- 19 files changed, 609 insertions(+), 21 deletions(-) create mode 100644 paimon-common/src/main/java/org/apache/paimon/fs/AsyncPositionOutputStream.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/utils/FixLenByteArrayOutputStream.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/utils/ReuseByteArrayOutputStream.java create mode 100644 paimon-common/src/test/java/org/apache/paimon/fs/AsyncPositionOutputStreamTest.java diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index bc6f8b5c4efc..1f5fb0dc1e82 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -26,6 +26,12 @@ + +
async-file-write
+ true + Boolean + Whether to enable asynchronous IO writing when writing files. +
auto-create
false diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 156915e2365d..93ada725fddb 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1283,6 +1283,13 @@ public class CoreOptions implements Serializable { "When a batch job queries from a table, if a partition does not exist in the current branch, " + "the reader will try to get this partition from this fallback branch."); + public static final ConfigOption ASYNC_FILE_WRITE = + key("async-file-write") + .booleanType() + .defaultValue(true) + .withDescription( + "Whether to enable asynchronous IO writing when writing files."); + private final Options options; public CoreOptions(Map options) { @@ -2020,6 +2027,10 @@ public boolean prepareCommitWaitCompaction() { return options.get(LOOKUP_WAIT); } + public boolean asyncFileWrite() { + return options.get(ASYNC_FILE_WRITE); + } + public boolean metadataIcebergCompatible() { return options.get(METADATA_ICEBERG_COMPATIBLE); } diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/AsyncPositionOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/AsyncPositionOutputStream.java new file mode 100644 index 000000000000..0cc043b22c9a --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fs/AsyncPositionOutputStream.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fs; + +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.utils.FixLenByteArrayOutputStream; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.paimon.utils.ThreadUtils.newDaemonThreadFactory; + +/** A {@link PositionOutputStream} which uses a async thread to write data. */ +public class AsyncPositionOutputStream extends PositionOutputStream { + + public static final ExecutorService EXECUTOR_SERVICE = + Executors.newCachedThreadPool(newDaemonThreadFactory("AsyncOutputStream")); + + public static final int AWAIT_TIMEOUT_SECONDS = 10; + public static final int BUFFER_SIZE = 1024 * 32; + + private final PositionOutputStream out; + private final FixLenByteArrayOutputStream buffer; + private final LinkedBlockingQueue bufferQueue; + private final LinkedBlockingQueue eventQueue; + private final AtomicReference exception; + private final Future future; + + private long position; + + public AsyncPositionOutputStream(PositionOutputStream out) { + this.out = out; + this.bufferQueue = new LinkedBlockingQueue<>(); + this.eventQueue = new LinkedBlockingQueue<>(); + this.exception = new AtomicReference<>(); + this.position = 0; + this.future = EXECUTOR_SERVICE.submit(this::execute); + this.buffer = new FixLenByteArrayOutputStream(); + this.buffer.setBuffer(new byte[BUFFER_SIZE]); + } + + @VisibleForTesting + LinkedBlockingQueue getBufferQueue() { + return bufferQueue; + } + + private void execute() { + try { + doWork(); + } catch (Throwable e) { + exception.set(e); + throw new RuntimeException(e); + } + } + + private void doWork() throws InterruptedException, IOException { + try { + while (true) { + AsyncEvent event = eventQueue.poll(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (event == null) { + continue; + } + if (event instanceof EndEvent) { + return; + } + if (event instanceof DataEvent) { + DataEvent dataEvent = (DataEvent) event; + out.write(dataEvent.data, 0, dataEvent.length); + bufferQueue.add(dataEvent.data); + } + if (event instanceof FlushEvent) { + out.flush(); + ((FlushEvent) event).latch.countDown(); + } + } + } finally { + out.close(); + } + } + + @Override + public long getPos() throws IOException { + checkException(); + return position; + } + + private void flushBuffer() { + if (buffer.getCount() == 0) { + return; + } + putEvent(new DataEvent(buffer.getBuffer(), buffer.getCount())); + byte[] byteArray = bufferQueue.poll(); + if (byteArray == null) { + byteArray = new byte[BUFFER_SIZE]; + } + buffer.setBuffer(byteArray); + buffer.setCount(0); + } + + @Override + public void write(int b) throws IOException { + checkException(); + position++; + while (buffer.write((byte) b) != 1) { + flushBuffer(); + } + } + + @Override + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + checkException(); + position += len; + while (true) { + int written = buffer.write(b, off, len); + off += written; + len -= written; + if (len == 0) { + return; + } + flushBuffer(); + } + } + + @Override + public void flush() throws IOException { + checkException(); + flushBuffer(); + FlushEvent event = new FlushEvent(); + putEvent(event); + while (true) { + try { + boolean await = event.latch.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (await) { + return; + } + checkException(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + } + + @Override + public void close() throws IOException { + checkException(); + flushBuffer(); + putEvent(new EndEvent()); + try { + this.future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + private void putEvent(AsyncEvent event) { + try { + eventQueue.put(event); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + private void checkException() throws IOException { + Throwable throwable = exception.get(); + if (throwable != null) { + if (throwable instanceof IOException) { + throw (IOException) throwable; + } + if (throwable instanceof RuntimeException) { + throw (RuntimeException) throwable; + } + throw new IOException(throwable); + } + } + + private interface AsyncEvent {} + + private static class DataEvent implements AsyncEvent { + + private final byte[] data; + private final int length; + + public DataEvent(byte[] data, int length) { + this.data = data; + this.length = length; + } + } + + private static class FlushEvent implements AsyncEvent { + private final CountDownLatch latch = new CountDownLatch(1); + } + + private static class EndEvent implements AsyncEvent {} +} diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/FixLenByteArrayOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/utils/FixLenByteArrayOutputStream.java new file mode 100644 index 000000000000..8926a52ce65e --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/utils/FixLenByteArrayOutputStream.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import java.io.ByteArrayOutputStream; + +/** A {@link ByteArrayOutputStream} which can reuse byte array. */ +public class FixLenByteArrayOutputStream { + + private byte[] buf; + private int count; + + public void setBuffer(byte[] buffer) { + this.buf = buffer; + } + + public byte[] getBuffer() { + return buf; + } + + public int write(byte[] b, int off, int len) { + if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) - b.length > 0)) { + throw new IndexOutOfBoundsException(); + } + int writeLen = Math.min(len, buf.length - count); + System.arraycopy(b, off, buf, count, writeLen); + count += writeLen; + return writeLen; + } + + public int getCount() { + return count; + } + + public int write(byte b) { + if (count < buf.length) { + buf[count] = b; + count += 1; + return 1; + } + return 0; + } + + public void setCount(int count) { + this.count = count; + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ReuseByteArrayOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/utils/ReuseByteArrayOutputStream.java new file mode 100644 index 000000000000..cca955656c53 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/utils/ReuseByteArrayOutputStream.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import java.io.ByteArrayOutputStream; + +/** A {@link ByteArrayOutputStream} which can reuse byte array. */ +public class ReuseByteArrayOutputStream extends ByteArrayOutputStream { + + public ReuseByteArrayOutputStream(int size) { + super(size); + } + + public void setBuffer(byte[] buffer) { + this.buf = buffer; + } + + public byte[] getBuffer() { + return buf; + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/AsyncPositionOutputStreamTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/AsyncPositionOutputStreamTest.java new file mode 100644 index 000000000000..115a9b3cdc50 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/fs/AsyncPositionOutputStreamTest.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fs; + +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.concurrent.ThreadLocalRandom; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class AsyncPositionOutputStreamTest { + + @Test + public void testHugeWriteByteArray() throws IOException { + ByteArrayPositionOutputStream result = new ByteArrayPositionOutputStream(); + AsyncPositionOutputStream out = new AsyncPositionOutputStream(result); + int len = 10 * 1024; + ByteArrayOutputStream expected = new ByteArrayOutputStream(len); + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + for (int i = 0; i < len; i++) { + byte[] bytes = new byte[rnd.nextInt(20)]; + rnd.nextBytes(bytes); + expected.write(bytes); + out.write(bytes); + } + out.close(); + + assertThat(result.out.toByteArray()).isEqualTo(expected.toByteArray()); + } + + @Test + public void testHugeWriteByte() throws IOException { + ByteArrayPositionOutputStream result = new ByteArrayPositionOutputStream(); + AsyncPositionOutputStream out = new AsyncPositionOutputStream(result); + int len = 32 * 1024 + 20; + ByteArrayOutputStream expected = new ByteArrayOutputStream(len); + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + for (int i = 0; i < len; i++) { + int b = rnd.nextInt(); + expected.write(b); + out.write(b); + } + out.close(); + + assertThat(result.out.toByteArray()).isEqualTo(expected.toByteArray()); + } + + @Test + public void testNormal() throws IOException { + ByteArrayPositionOutputStream byteOut = new ByteArrayPositionOutputStream(); + AsyncPositionOutputStream out = new AsyncPositionOutputStream(byteOut); + out.write(1); + out.write(new byte[] {2, 3}); + out.write(new byte[] {3, 4, 5}, 1, 1); + out.close(); + assertThat(byteOut.out.toByteArray()).isEqualTo(new byte[] {1, 2, 3, 4}); + } + + @Test + public void testGetPos() throws IOException { + AsyncPositionOutputStream out = + new AsyncPositionOutputStream(new ByteArrayPositionOutputStream()); + out.write(new byte[] {1, 2, 3}); + assertThat(out.getPos()).isEqualTo(3); + out.write(new byte[] {5, 6, 7}); + assertThat(out.getPos()).isEqualTo(6); + } + + @Test + public void testFlush() throws IOException { + ByteArrayPositionOutputStream byteOut = + new ByteArrayPositionOutputStream() { + @Override + public void write(byte[] b) throws IOException { + try { + Thread.sleep(100); + super.write(b); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }; + AsyncPositionOutputStream out = new AsyncPositionOutputStream(byteOut); + out.write(new byte[] {1, 2, 3}); + out.write(new byte[] {5, 6, 7}); + out.flush(); + assertThat(byteOut.getPos()).isEqualTo(6); + + out.write(new byte[] {8, 9}); + + // test repeat flush + out.flush(); + out.flush(); + assertThat(byteOut.getPos()).isEqualTo(8); + + assertThat(out.getBufferQueue().size()).isEqualTo(1); + } + + @Test + public void testFlushWithException() throws IOException { + String msg = "your exception!"; + ByteArrayPositionOutputStream byteOut = + new ByteArrayPositionOutputStream() { + @Override + public void write(byte[] b, int off, int len) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + throw new RuntimeException(msg); + } + }; + AsyncPositionOutputStream out = new AsyncPositionOutputStream(byteOut); + out.write(new byte[] {5, 6, 7}); + assertThatThrownBy(out::flush).hasMessage(msg); + } + + @Test + public void testClose() throws IOException { + ByteArrayPositionOutputStream byteOut = + new ByteArrayPositionOutputStream() { + @Override + public void write(byte[] b) throws IOException { + try { + Thread.sleep(100); + super.write(b); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }; + AsyncPositionOutputStream out = new AsyncPositionOutputStream(byteOut); + out.write(new byte[] {1, 2, 3}); + out.write(new byte[] {5, 6, 7}); + out.close(); + assertThat(byteOut.getPos()).isEqualTo(6); + } + + @Test + public void testThrowException() throws IOException { + String msg = "your exception!"; + ByteArrayPositionOutputStream out = + new ByteArrayPositionOutputStream() { + @Override + public void write(byte[] b, int off, int len) { + throw new RuntimeException(msg); + } + }; + + AsyncPositionOutputStream asyncOut = new AsyncPositionOutputStream(out); + asyncOut.write(new byte[] {1, 2, 3}); + assertThatThrownBy(asyncOut::close).hasMessageContaining(msg); + assertThat(out.closed).isTrue(); + } + + private static class ByteArrayPositionOutputStream extends PositionOutputStream { + + private final ByteArrayOutputStream out; + + private boolean closed; + + private ByteArrayPositionOutputStream() { + this.out = new ByteArrayOutputStream(); + } + + @Override + public long getPos() { + return out.size(); + } + + @Override + public void write(int b) { + out.write(b); + } + + @Override + public void write(byte[] b) throws IOException { + out.write(b); + } + + @Override + public void write(byte[] b, int off, int len) { + out.write(b, off, len); + } + + @Override + public void flush() throws IOException { + out.flush(); + } + + @Override + public void close() throws IOException { + out.close(); + closed = true; + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 897674b3b347..17ebe215ee1a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -35,7 +35,7 @@ import org.apache.paimon.manifest.FileSource; import org.apache.paimon.memory.MemoryOwner; import org.apache.paimon.memory.MemorySegmentPool; -import org.apache.paimon.operation.AppendOnlyFileStoreWrite; +import org.apache.paimon.operation.AppendOnlyFileStoreWrite.BucketFileRead; import org.apache.paimon.options.MemorySize; import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.statistics.SimpleColStatsCollector; @@ -68,8 +68,9 @@ public class AppendOnlyWriter implements RecordWriter, MemoryOwner private final RowType writeSchema; private final DataFilePathFactory pathFactory; private final CompactManager compactManager; - private final AppendOnlyFileStoreWrite.BucketFileRead bucketFileRead; + private final BucketFileRead bucketFileRead; private final boolean forceCompact; + private final boolean asyncFileWrite; private final List newFiles; private final List deletedFiles; private final List compactBefore; @@ -94,7 +95,7 @@ public AppendOnlyWriter( RowType writeSchema, long maxSequenceNumber, CompactManager compactManager, - AppendOnlyFileStoreWrite.BucketFileRead bucketFileRead, + BucketFileRead bucketFileRead, boolean forceCompact, DataFilePathFactory pathFactory, @Nullable CommitIncrement increment, @@ -104,7 +105,8 @@ public AppendOnlyWriter( String spillCompression, SimpleColStatsCollector.Factory[] statsCollectors, MemorySize maxDiskSize, - FileIndexOptions fileIndexOptions) { + FileIndexOptions fileIndexOptions, + boolean asyncFileWrite) { this.fileIO = fileIO; this.schemaId = schemaId; this.fileFormat = fileFormat; @@ -114,6 +116,7 @@ public AppendOnlyWriter( this.compactManager = compactManager; this.bucketFileRead = bucketFileRead; this.forceCompact = forceCompact; + this.asyncFileWrite = asyncFileWrite; this.newFiles = new ArrayList<>(); this.deletedFiles = new ArrayList<>(); this.compactBefore = new ArrayList<>(); @@ -261,7 +264,8 @@ private RowDataRollingFileWriter createRollingRowWriter() { fileCompression, statsCollectors, fileIndexOptions, - FileSource.APPEND); + FileSource.APPEND, + asyncFileWrite); } private void trySyncLatestCompaction(boolean blocking) diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java index 97ab8c5d1798..dfa5f7e5adbf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java @@ -125,7 +125,8 @@ private class IcebergManifestEntryWriter factory, path, serializer::toRow, - fileCompression); + fileCompression, + false); this.partitionStatsCollector = new SimpleStatsCollector(partitionType); this.sequenceNumber = sequenceNumber; } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java index 46db1ddd1b71..064d72829141 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java @@ -93,7 +93,8 @@ public KeyValueDataFileWriter( simpleStatsExtractor, compression, StatsCollectorFactories.createStatsFactories( - options, KeyValue.schema(keyType, valueType).getFieldNames())); + options, KeyValue.schema(keyType, valueType).getFieldNames()), + options.asyncFileWrite()); this.keyType = keyType; this.valueType = valueType; diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java index ae18768c953a..f9c9b950214f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java @@ -62,7 +62,8 @@ public RowDataFileWriter( String fileCompression, SimpleColStatsCollector.Factory[] statsCollectors, FileIndexOptions fileIndexOptions, - FileSource fileSource) { + FileSource fileSource, + boolean asyncFileWrite) { super( fileIO, factory, @@ -71,7 +72,8 @@ public RowDataFileWriter( writeSchema, simpleStatsExtractor, fileCompression, - statsCollectors); + statsCollectors, + asyncFileWrite); this.schemaId = schemaId; this.seqNumCounter = seqNumCounter; this.statsArraySerializer = new SimpleStatsConverter(writeSchema); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java index 9827c8f7c803..e60913d25f87 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataRollingFileWriter.java @@ -42,7 +42,8 @@ public RowDataRollingFileWriter( String fileCompression, SimpleColStatsCollector.Factory[] statsCollectors, FileIndexOptions fileIndexOptions, - FileSource fileSource) { + FileSource fileSource, + boolean asyncFileWrite) { super( () -> new RowDataFileWriter( @@ -60,7 +61,8 @@ public RowDataRollingFileWriter( fileCompression, statsCollectors, fileIndexOptions, - fileSource), + fileSource, + asyncFileWrite), targetFileSize); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java index 6d72106d6c56..c9de9e8d5a40 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java @@ -21,6 +21,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.format.FormatWriter; import org.apache.paimon.format.FormatWriterFactory; +import org.apache.paimon.fs.AsyncPositionOutputStream; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.PositionOutputStream; @@ -58,13 +59,17 @@ public SingleFileWriter( FormatWriterFactory factory, Path path, Function converter, - String compression) { + String compression, + boolean asyncWrite) { this.fileIO = fileIO; this.path = path; this.converter = converter; try { out = fileIO.newOutputStream(path, false); + if (asyncWrite) { + out = new AsyncPositionOutputStream(out); + } writer = factory.create(out, compression); } catch (IOException e) { LOG.warn( diff --git a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java index e37993144491..705204dfd62e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java @@ -53,8 +53,9 @@ public StatsCollectingSingleFileWriter( RowType writeSchema, @Nullable SimpleStatsExtractor simpleStatsExtractor, String compression, - SimpleColStatsCollector.Factory[] statsCollectors) { - super(fileIO, factory, path, converter, compression); + SimpleColStatsCollector.Factory[] statsCollectors, + boolean asyncWrite) { + super(fileIO, factory, path, converter, compression, asyncWrite); this.simpleStatsExtractor = simpleStatsExtractor; if (this.simpleStatsExtractor == null) { this.simpleStatsCollector = new SimpleStatsCollector(writeSchema, statsCollectors); diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java index 4fd3df960ffc..2f5cb4f2369d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java @@ -116,7 +116,13 @@ private class ManifestEntryWriter extends SingleFileWriter createWriter( spillCompression, statsCollectors, maxDiskSize, - fileIndexOptions); + fileIndexOptions, + options.asyncFileWrite()); } public AppendOnlyCompactManager.CompactRewriter compactRewriter( @@ -184,7 +185,8 @@ public AppendOnlyCompactManager.CompactRewriter compactRewriter( fileCompression, statsCollectors, fileIndexOptions, - FileSource.COMPACT); + FileSource.COMPACT, + options.asyncFileWrite()); try { rewriter.write(bucketReader(partition, bucket).read(toCompact)); } finally { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java index 0b2bd719ff96..fc5bd926a295 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java @@ -49,7 +49,7 @@ public abstract class MemoryFileStoreWrite extends AbstractFileStoreWrite { private static final Logger LOG = LoggerFactory.getLogger(MemoryFileStoreWrite.class); - private final CoreOptions options; + protected final CoreOptions options; protected final CacheManager cacheManager; private MemoryPoolFactory writeBufferPool; diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index 2fdb4968d9e1..b040cda1b227 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -628,7 +628,8 @@ private Pair> createWriter( StatsCollectorFactories.createStatsFactories( options, AppendOnlyWriterTest.SCHEMA.getFieldNames()), MemorySize.MAX_VALUE, - new FileIndexOptions()); + new FileIndexOptions(), + true); writer.setMemoryPool( new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize())); return Pair.of(writer, compactManager.allFiles()); diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java index 1a3503f7a789..e2ccd063e878 100644 --- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java @@ -91,7 +91,8 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception StatsCollectorFactories.createStatsFactories( options, SCHEMA.getFieldNames()), MemorySize.MAX_VALUE, - new FileIndexOptions()); + new FileIndexOptions(), + true); appendOnlyWriter.setMemoryPool( new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize())); appendOnlyWriter.write( diff --git a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java index 2ac2a8e4d319..d3039e00ca02 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java @@ -90,7 +90,8 @@ public void initialize(String identifier) { new CoreOptions(new HashMap<>()), SCHEMA.getFieldNames()), new FileIndexOptions(), - FileSource.APPEND), + FileSource.APPEND, + true), TARGET_FILE_SIZE); }