From af5d3018b9d80f634f37c3660f33b0b9871dfe3e Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Fri, 9 Aug 2024 22:36:49 +0800 Subject: [PATCH] fix --- .../flink/cfg/DorisExecutionOptions.java | 16 +-- .../sink/batch/BatchBufferHttpEntity.java | 1 - .../flink/sink/batch/BatchRecordBuffer.java | 9 +- .../sink/batch/DorisBatchStreamLoad.java | 98 +++++++++++++++++-- 4 files changed, 100 insertions(+), 24 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java index 25bd497f9..826480b84 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java @@ -358,6 +358,9 @@ public Builder setFlushQueueSize(int flushQueueSize) { } public Builder setBufferFlushIntervalMs(long bufferFlushIntervalMs) { + Preconditions.checkState( + bufferFlushIntervalMs >= 1000, + "bufferFlushIntervalMs must be greater than or equal to 1 second"); this.bufferFlushIntervalMs = bufferFlushIntervalMs; return this; } @@ -394,7 +397,6 @@ public DorisExecutionOptions build() { && JSON.equals(streamLoadProp.getProperty(FORMAT_KEY))) { streamLoadProp.put(READ_JSON_BY_LINE, true); } - checkParams(); return new DorisExecutionOptions( checkInterval, @@ -416,17 +418,5 @@ public DorisExecutionOptions build() { writeMode, ignoreCommitError); } - - private void checkParams() { - Preconditions.checkState( - bufferFlushIntervalMs >= 1000, - "bufferFlushIntervalMs must be greater than or equal to 1 second"); - Preconditions.checkState( - bufferFlushMaxBytes >= 52428800, - "bufferFlushMaxBytes must be greater than or equal to 52428800(50mb)"); - Preconditions.checkState( - bufferFlushMaxRows >= 50000, - "bufferFlushMaxRows must be greater than or equal to 50000"); - } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferHttpEntity.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferHttpEntity.java index a70d26323..3c0068eb4 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferHttpEntity.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferHttpEntity.java @@ -29,7 +29,6 @@ public class BatchBufferHttpEntity extends AbstractHttpEntity { private static final Logger LOG = LoggerFactory.getLogger(BatchBufferHttpEntity.class); - protected static final int OUTPUT_BUFFER_SIZE = 4096; private final List buffer; private final long contentLength; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java index 0e38f2728..7149a0603 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java @@ -50,16 +50,19 @@ public BatchRecordBuffer(String database, String table, byte[] lineDelimiter, lo this.retainTime = retainTime; } - public void insert(byte[] record) { + public int insert(byte[] record) { + int recordSize = record.length; if (loadBatchFirstRecord) { loadBatchFirstRecord = false; } else if (lineDelimiter != null) { this.buffer.add(this.lineDelimiter); setBufferSizeBytes(this.bufferSizeBytes + this.lineDelimiter.length); + recordSize += this.lineDelimiter.length; } this.buffer.add(record); setNumOfRecords(this.numOfRecords + 1); setBufferSizeBytes(this.bufferSizeBytes + record.length); + return recordSize; } public String getLabelName() { @@ -130,6 +133,10 @@ public String getTableIdentifier() { return null; } + public byte[] getLineDelimiter() { + return lineDelimiter; + } + public boolean shouldFlush() { return System.currentTimeMillis() - createTime > retainTime; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index e457c56c6..adfbe988a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -59,7 +59,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT; import static org.apache.doris.flink.sink.LoadStatus.SUCCESS; @@ -101,6 +105,13 @@ public class DorisBatchStreamLoad implements Serializable { private boolean enableGroupCommit; private boolean enableGzCompress; private int subTaskId; + private long streamLoadMaxRows = Integer.MAX_VALUE; + // todoļ¼šCan be configured + private long streamLoadMaxBytes = 1024 * 1024 * 1024; + private long maxBlockedBytes = 200 * 1024 * 1024; + private final AtomicLong currentCacheBytes = new AtomicLong(0L); + private final Lock lock = new ReentrantLock(); + private final Condition block = lock.newCondition(); public DorisBatchStreamLoad( DorisOptions dorisOptions, @@ -166,6 +177,7 @@ public DorisBatchStreamLoad( public synchronized void writeRecord(String database, String table, byte[] record) { checkFlushException(); String bufferKey = getTableIdentifier(database, table); + BatchRecordBuffer buffer = bufferMap.computeIfAbsent( bufferKey, @@ -175,11 +187,39 @@ public synchronized void writeRecord(String database, String table, byte[] recor table, this.lineDelimiter, executionOptions.getBufferFlushIntervalMs())); - buffer.insert(record); - if (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes() - || buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows()) { + + int bytes = buffer.insert(record); + currentCacheBytes.addAndGet(bytes); + if (currentCacheBytes.get() > maxBlockedBytes) { + lock.lock(); + try { + while (currentCacheBytes.get() >= maxBlockedBytes) { + LOG.info( + "Cache full, waiting for flush, currentBytes: {}, maxBlockedBytes: {}", + currentCacheBytes.get(), + maxBlockedBytes); + block.await(1, TimeUnit.SECONDS); + } + } catch (InterruptedException e) { + this.exception.set(e); + throw new RuntimeException(e); + } finally { + lock.unlock(); + } + } + + // queue has space, flush according to the bufferMaxRows/bufferMaxBytes + if (flushQueue.size() < executionOptions.getFlushQueueSize() + && (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes() + || buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) { boolean flush = bufferFullFlush(bufferKey); LOG.info("trigger flush by buffer full, flush: {}", flush); + + } else if (buffer.getBufferSizeBytes() >= streamLoadMaxRows + || buffer.getNumOfRecords() >= streamLoadMaxBytes) { + // The buffer capacity exceeds the stream load limit, flush + boolean flush = bufferFullFlush(bufferKey); + LOG.info("trigger flush by buffer exceeding the limit, flush: {}", flush); } } @@ -299,10 +339,12 @@ public void run() { } recordList.add(buffer); - flushQueue.drainTo(recordList, flushQueueSize - 1); - mergeBuffer(recordList, buffer); - load(buffer.getLabelName(), buffer); + if (!flushQueue.isEmpty()) { + flushQueue.drainTo(recordList, flushQueueSize - 1); + mergeBuffer(recordList, buffer); + } + load(buffer.getLabelName(), buffer); if (flushQueue.isEmpty()) { // Avoid waiting for 2 rounds of intervalMs doFlush(null, false, false); @@ -331,17 +373,43 @@ private void mergeBuffer(List recordList, BatchRecordBuffer b if (sameTable) { for (BatchRecordBuffer recordBuffer : recordList) { if (recordBuffer.getLabelName() != null - && !buffer.getLabelName().equals(recordBuffer.getLabelName())) { - buffer.getBuffer().addAll(recordBuffer.getBuffer()); + && !buffer.getLabelName().equals(recordBuffer.getLabelName()) + && !recordBuffer.getBuffer().isEmpty()) { + merge(buffer, recordBuffer); } } - LOG.info("merge {} buffer to one stream load", recordList.size()); + LOG.info( + "merge {} buffer to one stream load, result bufferBytes {}", + recordList.size(), + buffer.getBufferSizeBytes()); } } } + public boolean merge(BatchRecordBuffer mergeBuffer, BatchRecordBuffer buffer) { + if (buffer.getBuffer().isEmpty()) { + return false; + } + if (!mergeBuffer.getBuffer().isEmpty()) { + mergeBuffer.getBuffer().add(mergeBuffer.getLineDelimiter()); + mergeBuffer.setBufferSizeBytes( + mergeBuffer.getBufferSizeBytes() + mergeBuffer.getLineDelimiter().length); + currentCacheBytes.addAndGet(buffer.getLineDelimiter().length); + } + mergeBuffer.getBuffer().addAll(buffer.getBuffer()); + mergeBuffer.setNumOfRecords(mergeBuffer.getNumOfRecords() + buffer.getNumOfRecords()); + mergeBuffer.setBufferSizeBytes( + mergeBuffer.getBufferSizeBytes() + buffer.getBufferSizeBytes()); + return true; + } + /** execute stream load. */ public void load(String label, BatchRecordBuffer buffer) throws IOException { + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } if (enableGroupCommit) { label = null; } @@ -386,6 +454,18 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException { respContent.getErrorURL()); throw new DorisBatchLoadException(errMsg); } else { + long cacheByteBeforeFlush = + currentCacheBytes.getAndAdd(-respContent.getLoadBytes()); + LOG.info( + "load success, cacheBeforeFlushBytes: {}, currentCacheBytes : {}", + cacheByteBeforeFlush, + currentCacheBytes.get()); + lock.lock(); + try { + block.signal(); + } finally { + lock.unlock(); + } return; } }