Skip to content

Commit

Permalink
[core] Limit max buffer in AsyncPositionOutputStream
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Aug 7, 2024
1 parent cf0ac44 commit a3f5fbe
Showing 1 changed file with 30 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public class AsyncPositionOutputStream extends PositionOutputStream {
Executors.newCachedThreadPool(newDaemonThreadFactory("AsyncOutputStream"));

public static final int AWAIT_TIMEOUT_SECONDS = 10;
public static final int BUFFER_SIZE = 1024 * 32;
public static final int BUFFER_SIZE = 1024 * 64;
public static final int MAX_BUFFER = 1024;

private final PositionOutputStream out;
private final FixLenByteArrayOutputStream buffer;
Expand All @@ -49,6 +50,7 @@ public class AsyncPositionOutputStream extends PositionOutputStream {
private final AtomicReference<Throwable> exception;
private final Future<?> future;

private int totalBuffers;
private long position;

public AsyncPositionOutputStream(PositionOutputStream out) {
Expand All @@ -60,6 +62,7 @@ public AsyncPositionOutputStream(PositionOutputStream out) {
this.future = EXECUTOR_SERVICE.submit(this::execute);
this.buffer = new FixLenByteArrayOutputStream();
this.buffer.setBuffer(new byte[BUFFER_SIZE]);
this.totalBuffers = 1;
}

@VisibleForTesting
Expand Down Expand Up @@ -107,14 +110,32 @@ public long getPos() throws IOException {
return position;
}

private void flushBuffer() {
private void flushBuffer() throws IOException {
if (buffer.getCount() == 0) {
return;
}
putEvent(new DataEvent(buffer.getBuffer(), buffer.getCount()));
byte[] byteArray = bufferQueue.poll();
byte[] byteArray;
if (totalBuffers >= MAX_BUFFER) {
while (true) {
checkException();
try {
byteArray = bufferQueue.poll(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (byteArray != null) {
break;
}
} catch (InterruptedException e) {
sendEndEvent();
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
} else {
byteArray = bufferQueue.poll();
}
if (byteArray == null) {
byteArray = new byte[BUFFER_SIZE];
totalBuffers++;
}
buffer.setBuffer(byteArray);
buffer.setCount(0);
Expand Down Expand Up @@ -163,6 +184,7 @@ public void flush() throws IOException {
}
checkException();
} catch (InterruptedException e) {
sendEndEvent();
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
Expand All @@ -173,7 +195,7 @@ public void flush() throws IOException {
public void close() throws IOException {
checkException();
flushBuffer();
putEvent(new EndEvent());
sendEndEvent();
try {
this.future.get();
} catch (InterruptedException e) {
Expand All @@ -184,6 +206,10 @@ public void close() throws IOException {
}
}

private void sendEndEvent() {
putEvent(new EndEvent());
}

private void putEvent(AsyncEvent event) {
try {
eventQueue.put(event);
Expand Down

0 comments on commit a3f5fbe

Please sign in to comment.