diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index d451dcadb..88de699ca 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -319,6 +319,72 @@ public void close() {
         this.flushQueue.clear();
     }
 
+    /**
+     * Merges the buffers of {@code recordList} into {@code buffer} so that they can be sent as
+     * one stream load.
+     *
+     * <p>Buffers are merged only when the list holds more than one buffer and all of them belong
+     * to the same table. Empty buffers, buffers without a label and buffers carrying the same
+     * label as {@code buffer} are skipped.
+     *
+     * @param recordList buffers to merge, which may include {@code buffer} itself
+     * @param buffer buffer that receives the records of the other buffers
+     * @return {@code true} if at least one buffer was merged into {@code buffer}
+     */
+    @VisibleForTesting
+    public boolean mergeBuffer(List<BatchRecordBuffer> recordList, BatchRecordBuffer buffer) {
+        boolean merge = false;
+        if (recordList.size() > 1) {
+            boolean sameTable =
+                    recordList.stream()
+                                    .map(BatchRecordBuffer::getTableIdentifier)
+                                    .distinct()
+                                    .count()
+                            == 1;
+            // Buffers can be merged only if they belong to the same table.
+            if (sameTable) {
+                for (BatchRecordBuffer recordBuffer : recordList) {
+                    if (recordBuffer != null
+                            && recordBuffer.getLabelName() != null
+                            && !buffer.getLabelName().equals(recordBuffer.getLabelName())
+                            && !recordBuffer.getBuffer().isEmpty()) {
+                        merge(buffer, recordBuffer);
+                        merge = true;
+                    }
+                }
+                LOG.info(
+                        "merge {} buffer to one stream load, result bufferBytes {}",
+                        recordList.size(),
+                        buffer.getBufferSizeBytes());
+            }
+        }
+        return merge;
+    }
+
+    /**
+     * Appends the content of {@code buffer} to {@code mergeBuffer} and updates the record count
+     * and byte size of {@code mergeBuffer}. A line delimiter is inserted first when
+     * {@code mergeBuffer} already holds data.
+     *
+     * @return {@code false} if {@code buffer} is empty and nothing was appended, else {@code true}
+     */
+    private boolean merge(BatchRecordBuffer mergeBuffer, BatchRecordBuffer buffer) {
+        if (buffer.getBuffer().isEmpty()) {
+            return false;
+        }
+        if (!mergeBuffer.getBuffer().isEmpty()) {
+            mergeBuffer.getBuffer().add(mergeBuffer.getLineDelimiter());
+            mergeBuffer.setBufferSizeBytes(
+                    mergeBuffer.getBufferSizeBytes() + mergeBuffer.getLineDelimiter().length);
+            currentCacheBytes.addAndGet(mergeBuffer.getLineDelimiter().length);
+        }
+        mergeBuffer.getBuffer().addAll(buffer.getBuffer());
+        mergeBuffer.setNumOfRecords(mergeBuffer.getNumOfRecords() + buffer.getNumOfRecords());
+        mergeBuffer.setBufferSizeBytes(
+                mergeBuffer.getBufferSizeBytes() + buffer.getBufferSizeBytes());
+        return true;
+    }
+
     class LoadAsyncExecutor implements Runnable {
 
         private int flushQueueSize;
@@ -342,12 +408,26 @@ public void run() {
                     }
 
                     recordList.add(buffer);
+                    boolean merge = false;
                     if (!flushQueue.isEmpty()) {
                         flushQueue.drainTo(recordList, flushQueueSize - 1);
-                        mergeBuffer(recordList, buffer);
+                        if (mergeBuffer(recordList, buffer)) {
+                            load(buffer.getLabelName(), buffer);
+                            merge = true;
+                        }
+                    }
+
+                    // Nothing was merged (e.g. the buffers belong to different tables), so
+                    // every drained buffer is loaded on its own.
+                    if (!merge) {
+                        for (BatchRecordBuffer bf : recordList) {
+                            if (bf == null || bf.getLabelName() == null) {
+                                continue;
+                            }
+                            load(bf.getLabelName(), bf);
+                        }
                     }
 
-                    load(buffer.getLabelName(), buffer);
                     if (flushQueue.isEmpty()) {
                         // Avoid waiting for 2 rounds of intervalMs
                         doFlush(null, false, false);
@@ -364,48 +444,6 @@ public void run() {
             loadThreadAlive = false;
         }
 
-        private void mergeBuffer(List<BatchRecordBuffer> recordList, BatchRecordBuffer buffer) {
-            if (recordList.size() > 1) {
-                boolean sameTable =
-                        recordList.stream()
-                                        .map(BatchRecordBuffer::getTableIdentifier)
-                                        .distinct()
-                                        .count()
-                                == 1;
-                // Buffers can be merged only if they belong to the same table.
-                if (sameTable) {
-                    for (BatchRecordBuffer recordBuffer : recordList) {
-                        if (recordBuffer.getLabelName() != null
-                                && !buffer.getLabelName().equals(recordBuffer.getLabelName())
-                                && !recordBuffer.getBuffer().isEmpty()) {
-                            merge(buffer, recordBuffer);
-                        }
-                    }
-                    LOG.info(
-                            "merge {} buffer to one stream load, result bufferBytes {}",
-                            recordList.size(),
-                            buffer.getBufferSizeBytes());
-                }
-            }
-        }
-
-        public boolean merge(BatchRecordBuffer mergeBuffer, BatchRecordBuffer buffer) {
-            if (buffer.getBuffer().isEmpty()) {
-                return false;
-            }
-            if (!mergeBuffer.getBuffer().isEmpty()) {
-                mergeBuffer.getBuffer().add(mergeBuffer.getLineDelimiter());
-                mergeBuffer.setBufferSizeBytes(
-                        mergeBuffer.getBufferSizeBytes() + mergeBuffer.getLineDelimiter().length);
-                currentCacheBytes.addAndGet(buffer.getLineDelimiter().length);
-            }
-            mergeBuffer.getBuffer().addAll(buffer.getBuffer());
-            mergeBuffer.setNumOfRecords(mergeBuffer.getNumOfRecords() + buffer.getNumOfRecords());
-            mergeBuffer.setBufferSizeBytes(
-                    mergeBuffer.getBufferSizeBytes() + buffer.getBufferSizeBytes());
-            return true;
-        }
-
         /** execute stream load. */
         public void load(String label, BatchRecordBuffer buffer) throws IOException {
             try {
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
index 1b1286f62..d8bbc42e8 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
@@ -41,9 +41,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.doris.flink.sink.batch.TestBatchBufferStream.mergeByteArrays;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
@@ -186,4 +190,53 @@ public void after() {
             backendUtilMockedStatic.close();
         }
     }
+
+    @Test
+    public void mergeBufferTest() throws Exception {
+        DorisReadOptions readOptions = DorisReadOptions.builder().build();
+        DorisExecutionOptions executionOptions = DorisExecutionOptions.builder().build();
+        DorisOptions options =
+                DorisOptions.builder()
+                        .setFenodes("127.0.0.1:8030")
+                        .setBenodes("127.0.0.1:9030")
+                        .setTableIdentifier("db.tbl")
+                        .build();
+
+        DorisBatchStreamLoad loader =
+                new DorisBatchStreamLoad(
+                        options, readOptions, executionOptions, new LabelGenerator("xx", false), 0);
+
+        List<BatchRecordBuffer> bufferList = new ArrayList<>();
+        BatchRecordBuffer recordBuffer =
+                new BatchRecordBuffer("db", "tbl", "\n".getBytes(StandardCharsets.UTF_8), 0);
+        recordBuffer.insert("doris,2".getBytes(StandardCharsets.UTF_8));
+        recordBuffer.setLabelName("label2");
+        BatchRecordBuffer buffer =
+                new BatchRecordBuffer("db", "tbl", "\n".getBytes(StandardCharsets.UTF_8), 0);
+        buffer.insert("doris,1".getBytes(StandardCharsets.UTF_8));
+        buffer.setLabelName("label1");
+
+        // an empty list has nothing to merge
+        boolean flag = loader.mergeBuffer(bufferList, buffer);
+        Assert.assertFalse(flag);
+
+        // two buffers of the same table are merged into the first one
+        bufferList.add(buffer);
+        bufferList.add(recordBuffer);
+        flag = loader.mergeBuffer(bufferList, buffer);
+        Assert.assertTrue(flag);
+        byte[] bytes = mergeByteArrays(buffer.getBuffer());
+        Assert.assertArrayEquals("doris,1\ndoris,2".getBytes(StandardCharsets.UTF_8), bytes);
+
+        // multi table
+        bufferList.clear();
+        bufferList.add(buffer);
+        BatchRecordBuffer recordBuffer2 =
+                new BatchRecordBuffer("db", "tbl2", "\n".getBytes(StandardCharsets.UTF_8), 0);
+        recordBuffer2.insert("doris,3".getBytes(StandardCharsets.UTF_8));
+        recordBuffer2.setLabelName("label3");
+        bufferList.add(recordBuffer2);
+        flag = loader.mergeBuffer(bufferList, buffer);
+        Assert.assertFalse(flag);
+    }
 }