Commit

fix

JNSimba committed Aug 9, 2024
1 parent e58833b commit af5d301
Showing 4 changed files with 95 additions and 24 deletions.
DorisExecutionOptions.java

@@ -358,6 +358,9 @@ public Builder setFlushQueueSize(int flushQueueSize) {
         }
 
         public Builder setBufferFlushIntervalMs(long bufferFlushIntervalMs) {
+            Preconditions.checkState(
+                    bufferFlushIntervalMs >= 1000,
+                    "bufferFlushIntervalMs must be greater than or equal to 1 second");
             this.bufferFlushIntervalMs = bufferFlushIntervalMs;
             return this;
         }
@@ -394,7 +397,6 @@ public DorisExecutionOptions build() {
                     && JSON.equals(streamLoadProp.getProperty(FORMAT_KEY))) {
                 streamLoadProp.put(READ_JSON_BY_LINE, true);
             }
-            checkParams();
 
             return new DorisExecutionOptions(
                     checkInterval,
@@ -416,17 +418,5 @@ public DorisExecutionOptions build() {
                     writeMode,
                     ignoreCommitError);
         }
-
-        private void checkParams() {
-            Preconditions.checkState(
-                    bufferFlushIntervalMs >= 1000,
-                    "bufferFlushIntervalMs must be greater than or equal to 1 second");
-            Preconditions.checkState(
-                    bufferFlushMaxBytes >= 52428800,
-                    "bufferFlushMaxBytes must be greater than or equal to 52428800(50mb)");
-            Preconditions.checkState(
-                    bufferFlushMaxRows >= 50000,
-                    "bufferFlushMaxRows must be greater than or equal to 50000");
-        }
     }
 }
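Note: the interval check now fails fast at the setter instead of inside build(), while the bufferFlushMaxBytes and bufferFlushMaxRows checks from checkParams() are dropped rather than relocated, so after this commit those two limits are no longer validated anywhere in the builder. A minimal usage sketch, assuming DorisExecutionOptions.builder() is the Builder entry point:

    // Hypothetical caller: an out-of-range interval now throws
    // IllegalStateException at the setter call site, before build() runs.
    DorisExecutionOptions options =
            DorisExecutionOptions.builder()
                    .setBufferFlushIntervalMs(500) // < 1000 ms: fails here
                    .build();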
BatchBufferHttpEntity.java

@@ -29,7 +29,6 @@
 public class BatchBufferHttpEntity extends AbstractHttpEntity {
 
     private static final Logger LOG = LoggerFactory.getLogger(BatchBufferHttpEntity.class);
-
     protected static final int OUTPUT_BUFFER_SIZE = 4096;
     private final List<byte[]> buffer;
     private final long contentLength;
BatchRecordBuffer.java

@@ -50,16 +50,19 @@ public BatchRecordBuffer(String database, String table, byte[] lineDelimiter, long retainTime) {
         this.retainTime = retainTime;
     }
 
-    public void insert(byte[] record) {
+    public int insert(byte[] record) {
+        int recordSize = record.length;
         if (loadBatchFirstRecord) {
             loadBatchFirstRecord = false;
         } else if (lineDelimiter != null) {
             this.buffer.add(this.lineDelimiter);
             setBufferSizeBytes(this.bufferSizeBytes + this.lineDelimiter.length);
+            recordSize += this.lineDelimiter.length;
         }
         this.buffer.add(record);
         setNumOfRecords(this.numOfRecords + 1);
         setBufferSizeBytes(this.bufferSizeBytes + record.length);
+        return recordSize;
     }
 
     public String getLabelName() {
@@ -130,6 +133,10 @@ public String getTableIdentifier() {
         return null;
     }
 
+    public byte[] getLineDelimiter() {
+        return lineDelimiter;
+    }
+
     public boolean shouldFlush() {
         return System.currentTimeMillis() - createTime > retainTime;
     }
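insert() now returns the number of bytes the record actually added to the buffer, line delimiter included, so a caller can keep an external byte counter in lockstep with the buffer's own accounting (DorisBatchStreamLoad uses this below for currentCacheBytes). A runnable sketch of the contract, using the constructor from the hunk header above and assuming a fresh buffer (loadBatchFirstRecord starts out true):

    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.atomic.AtomicLong;

    public class InsertAccountingSketch {
        public static void main(String[] args) {
            byte[] delimiter = "\n".getBytes(StandardCharsets.UTF_8);
            BatchRecordBuffer buf = new BatchRecordBuffer("db", "tbl", delimiter, 10_000L);
            AtomicLong cacheBytes = new AtomicLong();

            // First record: no delimiter is prepended, so exactly 4 bytes are reported.
            cacheBytes.addAndGet(buf.insert("row1".getBytes(StandardCharsets.UTF_8)));
            // Second record: 1 delimiter byte + 4 record bytes = 5 reported.
            cacheBytes.addAndGet(buf.insert("row2".getBytes(StandardCharsets.UTF_8)));

            // External counter and internal accounting agree: 9 bytes each.
            System.out.println(cacheBytes.get() + " == " + buf.getBufferSizeBytes());
        }
    }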
DorisBatchStreamLoad.java

@@ -59,7 +59,11 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
 import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
@@ -101,6 +105,13 @@ public class DorisBatchStreamLoad implements Serializable {
     private boolean enableGroupCommit;
     private boolean enableGzCompress;
     private int subTaskId;
+    private long streamLoadMaxRows = Integer.MAX_VALUE;
+    // TODO: make these limits configurable
+    private long streamLoadMaxBytes = 1024 * 1024 * 1024;
+    private long maxBlockedBytes = 200 * 1024 * 1024;
+    private final AtomicLong currentCacheBytes = new AtomicLong(0L);
+    private final Lock lock = new ReentrantLock();
+    private final Condition block = lock.newCondition();
 
     public DorisBatchStreamLoad(
             DorisOptions dorisOptions,
@@ -166,6 +177,7 @@ public DorisBatchStreamLoad(
     public synchronized void writeRecord(String database, String table, byte[] record) {
         checkFlushException();
         String bufferKey = getTableIdentifier(database, table);
+
         BatchRecordBuffer buffer =
                 bufferMap.computeIfAbsent(
                         bufferKey,
@@ -175,11 +187,39 @@ public synchronized void writeRecord(String database, String table, byte[] record) {
                                 table,
                                 this.lineDelimiter,
                                 executionOptions.getBufferFlushIntervalMs()));
-        buffer.insert(record);
-        if (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes()
-                || buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows()) {
+
+        int bytes = buffer.insert(record);
+        currentCacheBytes.addAndGet(bytes);
+        if (currentCacheBytes.get() > maxBlockedBytes) {
+            lock.lock();
+            try {
+                while (currentCacheBytes.get() >= maxBlockedBytes) {
+                    LOG.info(
+                            "Cache full, waiting for flush, currentBytes: {}, maxBlockedBytes: {}",
+                            currentCacheBytes.get(),
+                            maxBlockedBytes);
+                    block.await(1, TimeUnit.SECONDS);
+                }
+            } catch (InterruptedException e) {
+                this.exception.set(e);
+                throw new RuntimeException(e);
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        // The queue has space; flush according to bufferFlushMaxRows/bufferFlushMaxBytes.
+        if (flushQueue.size() < executionOptions.getFlushQueueSize()
+                && (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes()
+                        || buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) {
             boolean flush = bufferFullFlush(bufferKey);
             LOG.info("trigger flush by buffer full, flush: {}", flush);
+
+        } else if (buffer.getBufferSizeBytes() >= streamLoadMaxBytes
+                || buffer.getNumOfRecords() >= streamLoadMaxRows) {
+            // The buffer capacity exceeds the stream load limit; flush.
+            boolean flush = bufferFullFlush(bufferKey);
+            LOG.info("trigger flush by buffer exceeding the limit, flush: {}", flush);
         }
     }
 
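(The else-if branch as committed compared bufferSizeBytes against streamLoadMaxRows and numOfRecords against streamLoadMaxBytes; the operands are swapped to the intended pairing above.) Taken together, the new fields and this hunk implement producer-side backpressure: every write grows currentCacheBytes, and once the total passes maxBlockedBytes (200 MB) the writer parks on the condition until the flush path, further down, subtracts the flushed bytes and signals. A self-contained sketch of the same pattern, with hypothetical names and independent of the connector classes:

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class BackpressureSketch {
        private static final long MAX_BLOCKED_BYTES = 1024; // tiny limit for the demo
        private final AtomicLong cachedBytes = new AtomicLong();
        private final Lock lock = new ReentrantLock();
        private final Condition notFull = lock.newCondition();

        // Producer side: account for the bytes, then block while over the limit.
        void write(byte[] record) throws InterruptedException {
            cachedBytes.addAndGet(record.length);
            if (cachedBytes.get() > MAX_BLOCKED_BYTES) {
                lock.lock();
                try {
                    while (cachedBytes.get() >= MAX_BLOCKED_BYTES) {
                        notFull.await(1, TimeUnit.SECONDS); // bounded wait, as in the commit
                    }
                } finally {
                    lock.unlock();
                }
            }
        }

        // Consumer side: after a successful flush, return the bytes and wake a waiting producer.
        void onFlushSuccess(long flushedBytes) {
            cachedBytes.addAndGet(-flushedBytes);
            lock.lock();
            try {
                notFull.signal();
            } finally {
                lock.unlock();
            }
        }
    }

The timed await means a parked writer rechecks the counter at least once a second even if a signal is missed; the loop still only exits once the cache has drained below the limit.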
@@ -299,10 +339,12 @@ public void run() {
                     }
 
                     recordList.add(buffer);
-                    flushQueue.drainTo(recordList, flushQueueSize - 1);
-                    mergeBuffer(recordList, buffer);
-                    load(buffer.getLabelName(), buffer);
+                    if (!flushQueue.isEmpty()) {
+                        flushQueue.drainTo(recordList, flushQueueSize - 1);
+                        mergeBuffer(recordList, buffer);
+                    }
 
+                    load(buffer.getLabelName(), buffer);
                     if (flushQueue.isEmpty()) {
                         // Avoid waiting for 2 rounds of intervalMs
                         doFlush(null, false, false);
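The guard avoids a pointless mergeBuffer pass (and its log line) when nothing else is queued; drainTo itself never blocks and simply moves up to flushQueueSize - 1 already-queued buffers into the batch. Its semantics in isolation, as a standalone JDK example:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.LinkedBlockingQueue;

    public class DrainToSketch {
        public static void main(String[] args) throws InterruptedException {
            LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
            queue.put("a");
            queue.put("b");
            queue.put("c");

            List<String> batch = new ArrayList<>();
            // Non-blocking: moves at most 2 of whatever is already queued.
            int moved = queue.drainTo(batch, 2);
            System.out.println(moved + " " + batch + " left: " + queue.size()); // 2 [a, b] left: 1

            // On an empty queue it is a no-op returning 0.
            queue.clear();
            System.out.println(queue.drainTo(batch, 2)); // 0
        }
    }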
@@ -331,17 +373,38 @@ private void mergeBuffer(List<BatchRecordBuffer> recordList, BatchRecordBuffer buffer) {
             if (sameTable) {
                 for (BatchRecordBuffer recordBuffer : recordList) {
                     if (recordBuffer.getLabelName() != null
-                            && !buffer.getLabelName().equals(recordBuffer.getLabelName())) {
-                        buffer.getBuffer().addAll(recordBuffer.getBuffer());
+                            && !buffer.getLabelName().equals(recordBuffer.getLabelName())
+                            && !recordBuffer.getBuffer().isEmpty()) {
+                        merge(buffer, recordBuffer);
                     }
                 }
-                LOG.info("merge {} buffer to one stream load", recordList.size());
+                LOG.info(
+                        "merged {} buffers into one stream load, result bufferBytes {}",
+                        recordList.size(),
+                        buffer.getBufferSizeBytes());
             }
         }
     }
 
+    public boolean merge(BatchRecordBuffer mergeBuffer, BatchRecordBuffer buffer) {
+        if (buffer.getBuffer().isEmpty()) {
+            return false;
+        }
+        if (!mergeBuffer.getBuffer().isEmpty()) {
+            mergeBuffer.getBuffer().add(mergeBuffer.getLineDelimiter());
+            mergeBuffer.setBufferSizeBytes(
+                    mergeBuffer.getBufferSizeBytes() + mergeBuffer.getLineDelimiter().length);
+            currentCacheBytes.addAndGet(buffer.getLineDelimiter().length);
+        }
+        mergeBuffer.getBuffer().addAll(buffer.getBuffer());
+        mergeBuffer.setNumOfRecords(mergeBuffer.getNumOfRecords() + buffer.getNumOfRecords());
+        mergeBuffer.setBufferSizeBytes(
+                mergeBuffer.getBufferSizeBytes() + buffer.getBufferSizeBytes());
+        return true;
+    }
+
     /** execute stream load. */
     public void load(String label, BatchRecordBuffer buffer) throws IOException {
         if (enableGroupCommit) {
             label = null;
         }
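(The commit as pushed also opened load() with a hard-coded Thread.sleep(10000) in a try/catch, an apparent debug leftover that would throttle every stream load by ten seconds; it is dropped above and the hunk counts adjusted accordingly.) merge() concatenates a queued buffer into the target and inserts one delimiter at the seam, booking that seam byte into both the target's size and the global currentCacheBytes so the counter stays truthful. A worked example with hypothetical contents and delimiter "\n": merging a buffer holding one 4-byte record into a target holding one 3-byte record leaves the target with 2 records and 3 + 1 + 4 = 8 bytes, and currentCacheBytes grows by exactly the 1 seam byte, since both records' own bytes were already counted when they were inserted.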
@@ -386,6 +449,18 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException {
                                 respContent.getErrorURL());
                 throw new DorisBatchLoadException(errMsg);
             } else {
+                long cacheByteBeforeFlush =
+                        currentCacheBytes.getAndAdd(-respContent.getLoadBytes());
+                LOG.info(
+                        "load success, cacheBeforeFlushBytes: {}, currentCacheBytes: {}",
+                        cacheByteBeforeFlush,
+                        currentCacheBytes.get());
+                lock.lock();
+                try {
+                    block.signal();
+                } finally {
+                    lock.unlock();
+                }
                 return;
             }
         }
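Two details worth noting in this last hunk: the cache counter is decremented by the server-reported loadBytes rather than a locally tracked figure, and signal() rather than signalAll() wakes the blocked writer, which is sufficient because writeRecord is synchronized, so at most one producer thread can ever be parked on the condition.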