diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/AsyncPositionOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/AsyncPositionOutputStream.java index 0cc043b22c9a..4ea6e1e038f0 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/AsyncPositionOutputStream.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/AsyncPositionOutputStream.java @@ -40,7 +40,8 @@ public class AsyncPositionOutputStream extends PositionOutputStream { Executors.newCachedThreadPool(newDaemonThreadFactory("AsyncOutputStream")); public static final int AWAIT_TIMEOUT_SECONDS = 10; - public static final int BUFFER_SIZE = 1024 * 32; + public static final int BUFFER_SIZE = 1024 * 64; + public static final int MAX_BUFFER = 1024; private final PositionOutputStream out; private final FixLenByteArrayOutputStream buffer; @@ -49,6 +50,7 @@ public class AsyncPositionOutputStream extends PositionOutputStream { private final AtomicReference exception; private final Future future; + private int totalBuffers; private long position; public AsyncPositionOutputStream(PositionOutputStream out) { @@ -60,6 +62,7 @@ public AsyncPositionOutputStream(PositionOutputStream out) { this.future = EXECUTOR_SERVICE.submit(this::execute); this.buffer = new FixLenByteArrayOutputStream(); this.buffer.setBuffer(new byte[BUFFER_SIZE]); + this.totalBuffers = 1; } @VisibleForTesting @@ -107,14 +110,32 @@ public long getPos() throws IOException { return position; } - private void flushBuffer() { + private void flushBuffer() throws IOException { if (buffer.getCount() == 0) { return; } putEvent(new DataEvent(buffer.getBuffer(), buffer.getCount())); - byte[] byteArray = bufferQueue.poll(); + byte[] byteArray; + if (totalBuffers >= MAX_BUFFER) { + while (true) { + checkException(); + try { + byteArray = bufferQueue.poll(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (byteArray != null) { + break; + } + } catch (InterruptedException e) { + sendEndEvent(); + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + } else { + byteArray = bufferQueue.poll(); + } if (byteArray == null) { byteArray = new byte[BUFFER_SIZE]; + totalBuffers++; } buffer.setBuffer(byteArray); buffer.setCount(0); @@ -163,6 +184,7 @@ public void flush() throws IOException { } checkException(); } catch (InterruptedException e) { + sendEndEvent(); Thread.currentThread().interrupt(); throw new RuntimeException(e); } @@ -173,7 +195,7 @@ public void flush() throws IOException { public void close() throws IOException { checkException(); flushBuffer(); - putEvent(new EndEvent()); + sendEndEvent(); try { this.future.get(); } catch (InterruptedException e) { @@ -184,6 +206,10 @@ public void close() throws IOException { } } + private void sendEndEvent() { + putEvent(new EndEvent()); + } + private void putEvent(AsyncEvent event) { try { eventQueue.put(event);