Skip to content

Commit

Permalink
add ut
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Aug 12, 2024
1 parent 4e6f61f commit 0a4a60a
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,53 @@ public void close() {
this.flushQueue.clear();
}

/**
 * Merges every eligible buffer of {@code recordList} into {@code buffer} so that they can be
 * sent as one stream load.
 *
 * <p>Nothing is merged unless the list holds more than one entry and all non-null entries share
 * a single table identifier. An entry is merged only when it is non-null, carries a label that
 * differs from the label of {@code buffer}, and contains data. {@code buffer} itself may be part
 * of the list; it is skipped because its label matches.
 *
 * @param recordList candidate buffers to fold into {@code buffer}
 * @param buffer the target buffer that receives the merged data
 * @return {@code true} if at least one entry was merged into {@code buffer}
 */
@VisibleForTesting
public boolean mergeBuffer(List<BatchRecordBuffer> recordList, BatchRecordBuffer buffer) {
    boolean merged = false;
    if (recordList.size() > 1) {
        // Null entries are filtered here as well; otherwise this stream would throw before
        // the null guard in the loop below ever had a chance to skip them.
        boolean sameTable =
                recordList.stream()
                                .filter(recordBuffer -> recordBuffer != null)
                                .map(BatchRecordBuffer::getTableIdentifier)
                                .distinct()
                                .count()
                        == 1;
        // Buffers can be merged only if they belong to the same table.
        if (sameTable) {
            for (BatchRecordBuffer recordBuffer : recordList) {
                if (recordBuffer != null
                        && recordBuffer.getLabelName() != null
                        && !buffer.getLabelName().equals(recordBuffer.getLabelName())
                        && !recordBuffer.getBuffer().isEmpty()) {
                    merge(buffer, recordBuffer);
                    merged = true;
                }
            }
            LOG.info(
                    "merge {} buffer to one stream load, result bufferBytes {}",
                    recordList.size(),
                    buffer.getBufferSizeBytes());
        }
    }
    return merged;
}

/**
 * Appends the contents of {@code buffer} to {@code mergeBuffer}.
 *
 * <p>If {@code mergeBuffer} already holds data, its line delimiter is inserted first so the two
 * payloads stay separated. The record count and byte size of {@code mergeBuffer} are updated to
 * include the appended data.
 *
 * @param mergeBuffer the target buffer that grows
 * @param buffer the source buffer whose contents are appended; it is not modified
 * @return {@code false} if {@code buffer} is empty and nothing was appended, {@code true}
 *     otherwise
 */
private boolean merge(BatchRecordBuffer mergeBuffer, BatchRecordBuffer buffer) {
    if (buffer.getBuffer().isEmpty()) {
        return false;
    }
    if (!mergeBuffer.getBuffer().isEmpty()) {
        // The separator that is actually written is mergeBuffer's delimiter, so both the
        // buffer size and the currentCacheBytes counter must grow by that same length.
        byte[] delimiter = mergeBuffer.getLineDelimiter();
        mergeBuffer.getBuffer().add(delimiter);
        mergeBuffer.setBufferSizeBytes(mergeBuffer.getBufferSizeBytes() + delimiter.length);
        currentCacheBytes.addAndGet(delimiter.length);
    }
    mergeBuffer.getBuffer().addAll(buffer.getBuffer());
    mergeBuffer.setNumOfRecords(mergeBuffer.getNumOfRecords() + buffer.getNumOfRecords());
    mergeBuffer.setBufferSizeBytes(
            mergeBuffer.getBufferSizeBytes() + buffer.getBufferSizeBytes());
    return true;
}

class LoadAsyncExecutor implements Runnable {

private int flushQueueSize;
Expand All @@ -342,12 +389,24 @@ public void run() {
}

recordList.add(buffer);
boolean merge = false;
if (!flushQueue.isEmpty()) {
flushQueue.drainTo(recordList, flushQueueSize - 1);
mergeBuffer(recordList, buffer);
if (mergeBuffer(recordList, buffer)) {
load(buffer.getLabelName(), buffer);
merge = true;
}
}

if (!merge) {
for (BatchRecordBuffer bf : recordList) {
if (bf == null || bf.getLabelName() == null) {
continue;
}
load(bf.getLabelName(), bf);
}
}

load(buffer.getLabelName(), buffer);
if (flushQueue.isEmpty()) {
// Avoid waiting for 2 rounds of intervalMs
doFlush(null, false, false);
Expand All @@ -364,48 +423,6 @@ public void run() {
loadThreadAlive = false;
}

/**
 * Folds the buffers of {@code recordList} into {@code buffer} when all of them target the same
 * table, so that they can be sent as one stream load.
 *
 * <p>An entry is merged only if it has a label, that label differs from the label of {@code
 * buffer}, and the entry contains data.
 *
 * @param recordList candidate buffers to fold into {@code buffer}
 * @param buffer the target buffer that receives the merged data
 */
private void mergeBuffer(List<BatchRecordBuffer> recordList, BatchRecordBuffer buffer) {
    // A list with at most one entry offers nothing to merge.
    if (recordList.size() <= 1) {
        return;
    }
    // Buffers can be merged only if they belong to the same table.
    long tableCount =
            recordList.stream().map(BatchRecordBuffer::getTableIdentifier).distinct().count();
    if (tableCount != 1) {
        return;
    }
    for (BatchRecordBuffer candidate : recordList) {
        if (candidate.getLabelName() == null) {
            continue;
        }
        // An entry with the same label as the target is not merged.
        if (buffer.getLabelName().equals(candidate.getLabelName())) {
            continue;
        }
        if (candidate.getBuffer().isEmpty()) {
            continue;
        }
        merge(buffer, candidate);
    }
    LOG.info(
            "merge {} buffer to one stream load, result bufferBytes {}",
            recordList.size(),
            buffer.getBufferSizeBytes());
}

/**
 * Appends the contents of {@code buffer} to {@code mergeBuffer}.
 *
 * <p>If {@code mergeBuffer} already holds data, its line delimiter is inserted first so the two
 * payloads stay separated. The record count and byte size of {@code mergeBuffer} are updated to
 * include the appended data.
 *
 * @param mergeBuffer the target buffer that grows
 * @param buffer the source buffer whose contents are appended; it is not modified
 * @return {@code false} if {@code buffer} is empty and nothing was appended, {@code true}
 *     otherwise
 */
public boolean merge(BatchRecordBuffer mergeBuffer, BatchRecordBuffer buffer) {
    if (buffer.getBuffer().isEmpty()) {
        return false;
    }
    if (!mergeBuffer.getBuffer().isEmpty()) {
        // Account for the delimiter that is really inserted (mergeBuffer's), keeping the
        // buffer size and the currentCacheBytes counter in step with each other.
        byte[] delimiter = mergeBuffer.getLineDelimiter();
        mergeBuffer.getBuffer().add(delimiter);
        mergeBuffer.setBufferSizeBytes(mergeBuffer.getBufferSizeBytes() + delimiter.length);
        currentCacheBytes.addAndGet(delimiter.length);
    }
    mergeBuffer.getBuffer().addAll(buffer.getBuffer());
    mergeBuffer.setNumOfRecords(mergeBuffer.getNumOfRecords() + buffer.getNumOfRecords());
    mergeBuffer.setBufferSizeBytes(
            mergeBuffer.getBufferSizeBytes() + buffer.getBufferSizeBytes());
    return true;
}

/** execute stream load. */
public void load(String label, BatchRecordBuffer buffer) throws IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.doris.flink.sink.batch.TestBatchBufferStream.mergeByteArrays;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
Expand Down Expand Up @@ -186,4 +190,51 @@ public void after() {
backendUtilMockedStatic.close();
}
}

/**
 * Verifies {@code DorisBatchStreamLoad#mergeBuffer}: an empty list merges nothing, two buffers
 * of the same table are concatenated with the line delimiter, and buffers of different tables
 * are left unmerged.
 */
@Test
public void mergeBufferTest() throws Exception {
    DorisReadOptions readOptions = DorisReadOptions.builder().build();
    DorisExecutionOptions executionOptions = DorisExecutionOptions.builder().build();
    DorisOptions options =
            DorisOptions.builder()
                    .setFenodes("127.0.0.1:8030")
                    .setBenodes("127.0.0.1:9030")
                    .setTableIdentifier("db.tbl")
                    .build();

    DorisBatchStreamLoad loader =
            new DorisBatchStreamLoad(
                    options, readOptions, executionOptions, new LabelGenerator("xx", false), 0);

    List<BatchRecordBuffer> bufferList = new ArrayList<>();
    BatchRecordBuffer recordBuffer =
            new BatchRecordBuffer("db", "tbl", "\n".getBytes(StandardCharsets.UTF_8), 0);
    recordBuffer.insert("doris,2".getBytes(StandardCharsets.UTF_8));
    recordBuffer.setLabelName("label2");
    BatchRecordBuffer buffer =
            new BatchRecordBuffer("db", "tbl", "\n".getBytes(StandardCharsets.UTF_8), 0);
    buffer.insert("doris,1".getBytes(StandardCharsets.UTF_8));
    buffer.setLabelName("label1");

    // empty list: nothing to merge
    boolean flag = loader.mergeBuffer(bufferList, buffer);
    Assert.assertFalse(flag);

    // same table: the second buffer is appended to the first, separated by the delimiter
    bufferList.add(buffer);
    bufferList.add(recordBuffer);
    flag = loader.mergeBuffer(bufferList, buffer);
    Assert.assertTrue(flag);
    byte[] bytes = mergeByteArrays(buffer.getBuffer());
    // expected value goes first so that a failure message reports the right sides
    Assert.assertArrayEquals("doris,1\ndoris,2".getBytes(StandardCharsets.UTF_8), bytes);

    // multi table
    bufferList.clear();
    bufferList.add(buffer);
    BatchRecordBuffer recordBuffer2 =
            new BatchRecordBuffer("db", "tbl2", "\n".getBytes(StandardCharsets.UTF_8), 0);
    recordBuffer2.insert("doris,3".getBytes(StandardCharsets.UTF_8));
    recordBuffer2.setLabelName("label3");
    bufferList.add(recordBuffer2);
    flag = loader.mergeBuffer(bufferList, buffer);
    Assert.assertFalse(flag);
}
}

0 comments on commit 0a4a60a

Please sign in to comment.