diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 8f3cc240d..7ad8ba971 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -40,8 +40,8 @@ public class DorisExecutionOptions implements Serializable {
     private static final int DEFAULT_BUFFER_COUNT = 3;
     // batch flush
     private static final int DEFAULT_FLUSH_QUEUE_SIZE = 2;
-    private static final int DEFAULT_BUFFER_FLUSH_MAX_ROWS = 50000;
-    private static final int DEFAULT_BUFFER_FLUSH_MAX_BYTES = 10 * 1024 * 1024;
+    private static final int DEFAULT_BUFFER_FLUSH_MAX_ROWS = 500000;
+    private static final int DEFAULT_BUFFER_FLUSH_MAX_BYTES = 100 * 1024 * 1024;
     private static final long DEFAULT_BUFFER_FLUSH_INTERVAL_MS = 10 * 1000;
     private final int checkInterval;
     private final int maxRetries;
@@ -358,9 +358,6 @@ public Builder setFlushQueueSize(int flushQueueSize) {
         }

         public Builder setBufferFlushIntervalMs(long bufferFlushIntervalMs) {
-            Preconditions.checkState(
-                    bufferFlushIntervalMs >= 1000,
-                    "bufferFlushIntervalMs must be greater than or equal to 1 second");
             this.bufferFlushIntervalMs = bufferFlushIntervalMs;
             return this;
         }
@@ -397,6 +394,19 @@ public DorisExecutionOptions build() {
                     && JSON.equals(streamLoadProp.getProperty(FORMAT_KEY))) {
                 streamLoadProp.put(READ_JSON_BY_LINE, true);
             }
+
+            Preconditions.checkArgument(
+                    bufferFlushIntervalMs >= 1000,
+                    "bufferFlushIntervalMs must be greater than or equal to 1 second");
+
+            Preconditions.checkArgument(
+                    bufferFlushMaxRows >= 10000,
+                    "bufferFlushMaxRows must be greater than or equal to 10000");
+
+            Preconditions.checkArgument(
+                    bufferFlushMaxBytes >= 10485760,
+                    "bufferFlushMaxBytes must be greater than or equal to 10485760 (10MB)");
+
             return new DorisExecutionOptions(
                     checkInterval,
                     maxRetries,
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferHttpEntity.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferHttpEntity.java
new file mode 100644
index 000000000..3c0068eb4
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferHttpEntity.java
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
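+
+// BatchBufferHttpEntity streams the LinkedList<byte[]> rows of a
+// BatchRecordBuffer straight into the HTTP connection, so a stream load no
+// longer needs one contiguous byte[] copy of the whole batch. A minimal usage
+// sketch (illustrative only; "recordBuffer" is assumed to be a populated
+// BatchRecordBuffer and "putBuilder" an HttpPutBuilder):
+//
+//   BatchBufferHttpEntity entity = new BatchBufferHttpEntity(recordBuffer);
+//   putBuilder.setEntity(entity); // Content-Length = recordBuffer.getBufferSizeBytes()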
+
+package org.apache.doris.flink.sink.batch;
+
+import org.apache.http.entity.AbstractHttpEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+public class BatchBufferHttpEntity extends AbstractHttpEntity {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BatchBufferHttpEntity.class);
+    protected static final int OUTPUT_BUFFER_SIZE = 4096;
+    private final List<byte[]> buffer;
+    private final long contentLength;
+
+    public BatchBufferHttpEntity(BatchRecordBuffer recordBuffer) {
+        this.buffer = recordBuffer.getBuffer();
+        this.contentLength = recordBuffer.getBufferSizeBytes();
+    }
+
+    @Override
+    public boolean isRepeatable() {
+        return true;
+    }
+
+    @Override
+    public boolean isChunked() {
+        return false;
+    }
+
+    @Override
+    public long getContentLength() {
+        return contentLength;
+    }
+
+    @Override
+    public InputStream getContent() {
+        return new BatchBufferStream(buffer);
+    }
+
+    @Override
+    public void writeTo(OutputStream outStream) throws IOException {
+        try (InputStream inStream = new BatchBufferStream(buffer)) {
+            final byte[] buffer = new byte[OUTPUT_BUFFER_SIZE];
+            int readLen;
+            while ((readLen = inStream.read(buffer)) != -1) {
+                outStream.write(buffer, 0, readLen);
+            }
+        }
+    }
+
+    @Override
+    public boolean isStreaming() {
+        return false;
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferStream.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferStream.java
new file mode 100644
index 000000000..a782bb53f
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferStream.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
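+
+// BatchBufferStream presents a List<byte[]> of rows as a single InputStream;
+// the bulk read copies across row boundaries and resumes mid-row through
+// currentRow/currentPos. A minimal read loop, assuming a row list "rows" and
+// an OutputStream "out" (this mirrors BatchBufferHttpEntity#writeTo):
+//
+//   try (InputStream in = new BatchBufferStream(rows)) {
+//       byte[] chunk = new byte[4096];
+//       int n;
+//       while ((n = in.read(chunk)) != -1) {
+//           out.write(chunk, 0, n);
+//       }
+//   }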
+
+package org.apache.doris.flink.sink.batch;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+
+public class BatchBufferStream extends InputStream {
+    private final Iterator<byte[]> iterator;
+    private byte[] currentRow;
+    private int currentPos;
+
+    public BatchBufferStream(List<byte[]> buffer) {
+        this.iterator = buffer.iterator();
+    }
+
+    @Override
+    public int read() throws IOException {
+        // Delegate to the bulk read so the InputStream contract holds:
+        // return the next unsigned byte, or -1 once the row list is drained.
+        byte[] one = new byte[1];
+        int n = read(one, 0, 1);
+        return n == -1 ? -1 : one[0] & 0xFF;
+    }
+
+    @Override
+    public int read(byte[] buf) throws IOException {
+        return read(buf, 0, buf.length);
+    }
+
+    @Override
+    public int read(byte[] buf, int off, int len) throws IOException {
+        if (!iterator.hasNext() && currentRow == null) {
+            return -1;
+        }
+
+        byte[] item = currentRow;
+        int pos = currentPos;
+        int readBytes = 0;
+        while (readBytes < len && (item != null || iterator.hasNext())) {
+            if (item == null) {
+                item = iterator.next();
+                pos = 0;
+            }
+
+            int size = Math.min(len - readBytes, item.length - pos);
+            System.arraycopy(item, pos, buf, off + readBytes, size);
+            readBytes += size;
+            pos += size;
+
+            if (pos == item.length) {
+                item = null;
+                pos = 0;
+            }
+        }
+        currentRow = item;
+        currentPos = pos;
+        return readBytes;
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
index df40e7a9b..8eb98037f 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
@@ -17,80 +17,52 @@
 package org.apache.doris.flink.sink.batch;

-import org.apache.flink.annotation.VisibleForTesting;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.nio.ByteBuffer;
+import java.util.LinkedList;

 /** buffer to queue. */
 public class BatchRecordBuffer {
     private static final Logger LOG = LoggerFactory.getLogger(BatchRecordBuffer.class);
     public static final String LINE_SEPARATOR = "\n";
     private String labelName;
-    private ByteBuffer buffer;
+    private LinkedList<byte[]> buffer;
     private byte[] lineDelimiter;
     private int numOfRecords = 0;
-    private int bufferSizeBytes = 0;
+    private long bufferSizeBytes = 0;
     private boolean loadBatchFirstRecord = true;
     private String database;
     private String table;
+    private final long createTime = System.currentTimeMillis();
+    private long retainTime = 0;

-    public BatchRecordBuffer() {}
-
-    public BatchRecordBuffer(byte[] lineDelimiter, int bufferSize) {
-        super();
-        this.lineDelimiter = lineDelimiter;
-        this.buffer = ByteBuffer.allocate(bufferSize);
+    public BatchRecordBuffer() {
+        this.buffer = new LinkedList<>();
     }

-    public BatchRecordBuffer(String database, String table, byte[] lineDelimiter, int bufferSize) {
+    public BatchRecordBuffer(String database, String table, byte[] lineDelimiter, long retainTime) {
         super();
         this.database = database;
         this.table = table;
         this.lineDelimiter = lineDelimiter;
-        this.buffer = ByteBuffer.allocate(bufferSize);
+        this.buffer = new LinkedList<>();
+        this.retainTime = retainTime;
     }

-    public void insert(byte[] record) {
-        ensureCapacity(record.length);
+    public int insert(byte[] record) {
+        int recordSize = record.length;
         if (loadBatchFirstRecord) {
             loadBatchFirstRecord = false;
         } else if (lineDelimiter != null) {
-            this.buffer.put(this.lineDelimiter);
+            this.buffer.add(this.lineDelimiter);
+            setBufferSizeBytes(this.bufferSizeBytes + this.lineDelimiter.length);
+            recordSize += this.lineDelimiter.length;
         }
-        this.buffer.put(record);
-        setNumOfRecords(getNumOfRecords() + 1);
-        setBufferSizeBytes(getBufferSizeBytes() + record.length);
-    }
-
-    @VisibleForTesting
-    public void ensureCapacity(int length) {
-        int lineDelimiterSize = this.lineDelimiter == null ? 0 : this.lineDelimiter.length;
-        if (buffer.remaining() - lineDelimiterSize >= length) {
-            return;
-        }
-        int currentRemain = buffer.remaining();
-        int currentCapacity = buffer.capacity();
-        // add lineDelimiter length
-        int needed = length - buffer.remaining() + lineDelimiterSize;
-        // grow at least 1MB
-        long grow = Math.max(needed, 1024 * 1024);
-        // grow at least 50% of the current size
-        grow = Math.max(buffer.capacity() / 2, grow);
-        int newCapacity = (int) Math.min(Integer.MAX_VALUE, buffer.capacity() + grow);
-        ByteBuffer tmp = ByteBuffer.allocate(newCapacity);
-        buffer.flip();
-        tmp.put(buffer);
-        buffer.clear();
-        buffer = tmp;
-        LOG.info(
-                "record length {},buffer remain {} ,grow capacity {} to {}",
-                length,
-                currentRemain,
-                currentCapacity,
-                newCapacity);
+        this.buffer.add(record);
+        setNumOfRecords(this.numOfRecords + 1);
+        setBufferSizeBytes(this.bufferSizeBytes + record.length);
+        return recordSize;
     }

     public String getLabelName() {
@@ -106,13 +78,6 @@ public boolean isEmpty() {
         return numOfRecords == 0;
     }

-    public ByteBuffer getData() {
-        // change mode
-        buffer.flip();
-        LOG.debug("flush buffer: {} records, {} bytes", getNumOfRecords(), getBufferSizeBytes());
-        return buffer;
-    }
-
     public void clear() {
         this.buffer.clear();
         this.numOfRecords = 0;
@@ -121,7 +86,7 @@ public void clear() {
         this.loadBatchFirstRecord = true;
     }

-    public ByteBuffer getBuffer() {
+    public LinkedList<byte[]> getBuffer() {
         return buffer;
     }

@@ -131,7 +96,7 @@ public int getNumOfRecords() {
     }

     /** @return Buffer size in bytes */
-    public int getBufferSizeBytes() {
+    public long getBufferSizeBytes() {
         return bufferSizeBytes;
     }

@@ -141,7 +106,7 @@ public void setNumOfRecords(int numOfRecords) {
     }

     /** @param bufferSizeBytes Updates sum of size of records present in this buffer (Bytes) */
-    public void setBufferSizeBytes(int bufferSizeBytes) {
+    public void setBufferSizeBytes(long bufferSizeBytes) {
         this.bufferSizeBytes = bufferSizeBytes;
     }

@@ -160,4 +125,22 @@ public String getTable() {
     public void setTable(String table) {
         this.table = table;
     }
+
+    public String getTableIdentifier() {
+        if (database != null && table != null) {
+            return database + "." + table;
+        }
+        return null;
+    }
+
+    public byte[] getLineDelimiter() {
+        return lineDelimiter;
+    }
+
+    public boolean shouldFlush() {
+        // A buffer created after the most recent interval trigger fired would be
+        // skipped by the next interval pass as well; scaling its age by 1.5
+        // lets it flush one interval earlier.
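+        // Worked example, assuming retainTime = 10000 ms: a buffer created 7s
+        // before the next interval tick has age 7s at that tick, and
+        // 7s * 1.5 = 10.5s > 10s, so it flushes on that tick; without the
+        // factor it would only flush on the following tick, at age 17s.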
+        return (System.currentTimeMillis() - createTime) * 1.5 > retainTime;
+    }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 2dd7a50e8..3240dafe4 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -36,7 +36,6 @@ import org.apache.doris.flink.sink.writer.LabelGenerator;
 import org.apache.http.client.entity.GzipCompressingEntity;
 import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.util.EntityUtils;
@@ -45,7 +44,6 @@
 import java.io.IOException;
 import java.io.Serializable;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -61,7 +59,11 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;

 import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
 import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
@@ -82,6 +84,8 @@ public class DorisBatchStreamLoad implements Serializable {
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final List<String> DORIS_SUCCESS_STATUS =
             new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT));
+    private static final long STREAM_LOAD_MAX_BYTES = 10 * 1024 * 1024 * 1024L; // 10 GB
+    private static final long STREAM_LOAD_MAX_ROWS = Integer.MAX_VALUE;
     private final LabelGenerator labelGenerator;
     private final byte[] lineDelimiter;
     private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
@@ -103,6 +107,10 @@ public class DorisBatchStreamLoad implements Serializable {
     private boolean enableGroupCommit;
     private boolean enableGzCompress;
     private int subTaskId;
+    private long maxBlockedBytes;
+    private final AtomicLong currentCacheBytes = new AtomicLong(0L);
+    private final Lock lock = new ReentrantLock();
+    private final Condition block = lock.newCondition();

     public DorisBatchStreamLoad(
             DorisOptions dorisOptions,
@@ -137,6 +145,10 @@ public DorisBatchStreamLoad(
         this.enableGzCompress = loadProps.getProperty(COMPRESS_TYPE, "").equals(COMPRESS_TYPE_GZ);
         this.executionOptions = executionOptions;
         this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize());
+        // maxBlockedBytes ensures that a buffer can be written even if the queue is full
+        this.maxBlockedBytes =
+                (long) executionOptions.getBufferFlushMaxBytes()
+                        * (executionOptions.getFlushQueueSize() + 1);
         if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) {
             String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
             Preconditions.checkState(
                     tableInfo.length == 2,
                     "tableIdentifier input error, the format is database.table");
             this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort, tableInfo[0], tableInfo[1]);
         }
-        this.loadAsyncExecutor = new LoadAsyncExecutor();
+        this.loadAsyncExecutor = new LoadAsyncExecutor(executionOptions.getFlushQueueSize());
         this.loadExecutorService =
                 new ThreadPoolExecutor(
                         1,
@@ -165,10 +177,10 @@ public DorisBatchStreamLoad(
      * @param record
      * @throws IOException
      */
-    public synchronized void writeRecord(String database, String table, byte[] record)
-            throws InterruptedException {
+    public synchronized void writeRecord(String database, String table, byte[] record) {
         checkFlushException();
         String bufferKey = getTableIdentifier(database, table);
+
         BatchRecordBuffer buffer =
                 bufferMap.computeIfAbsent(
                         bufferKey,
@@ -177,30 +189,91 @@ public void writeRecord(String database, String table, byte[] recor
                                         database,
                                         table,
                                         this.lineDelimiter,
-                                        executionOptions.getBufferFlushMaxBytes()));
-        buffer.insert(record);
-        // When it exceeds 80% of the byteSize,to flush, to avoid triggering bytebuffer expansion
-        if (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes() * 0.8
-                || (executionOptions.getBufferFlushMaxRows() != 0
-                        && buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) {
-            flush(bufferKey, false);
+                                        executionOptions.getBufferFlushIntervalMs()));
+
+        int bytes = buffer.insert(record);
+        currentCacheBytes.addAndGet(bytes);
+        if (currentCacheBytes.get() > maxBlockedBytes) {
+            lock.lock();
+            try {
+                while (currentCacheBytes.get() >= maxBlockedBytes) {
+                    LOG.info(
+                            "Cache full, waiting for flush, currentBytes: {}, maxBlockedBytes: {}",
+                            currentCacheBytes.get(),
+                            maxBlockedBytes);
+                    block.await(1, TimeUnit.SECONDS);
+                }
+            } catch (InterruptedException e) {
+                this.exception.set(e);
+                throw new RuntimeException(e);
+            } finally {
+                lock.unlock();
+            }
         }
+
+        // queue has space, flush according to the bufferMaxRows/bufferMaxBytes
+        if (flushQueue.size() < executionOptions.getFlushQueueSize()
+                && (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes()
+                        || buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) {
+            boolean flush = bufferFullFlush(bufferKey);
+            LOG.info("trigger flush by buffer full, flush: {}", flush);
+
+        } else if (buffer.getBufferSizeBytes() >= STREAM_LOAD_MAX_BYTES
+                || buffer.getNumOfRecords() >= STREAM_LOAD_MAX_ROWS) {
+            // The buffer capacity exceeds the stream load limit, flush
+            boolean flush = bufferFullFlush(bufferKey);
+            LOG.info("trigger flush by buffer exceeding the limit, flush: {}", flush);
+        }
+    }
+
+    public synchronized boolean bufferFullFlush(String bufferKey) {
+        return doFlush(bufferKey, false, true);
+    }
+
+    public synchronized boolean intervalFlush() {
+        return doFlush(null, false, false);
     }

-    public synchronized void flush(String bufferKey, boolean waitUtilDone)
-            throws InterruptedException {
+    public synchronized boolean checkpointFlush() {
+        return doFlush(null, true, false);
+    }
+
+    private synchronized boolean doFlush(
+            String bufferKey, boolean waitUtilDone, boolean bufferFull) {
         checkFlushException();
+        if (waitUtilDone || bufferFull) {
+            boolean flush = flush(bufferKey, waitUtilDone);
+            return flush;
+        } else if (flushQueue.size() < executionOptions.getFlushQueueSize()) {
+            boolean flush = flush(bufferKey, false);
+            return flush;
+        }
+        return false;
+    }
+
+    private synchronized boolean flush(String bufferKey, boolean waitUtilDone) {
         if (null == bufferKey) {
+            boolean flush = false;
             for (String key : bufferMap.keySet()) {
-                flushBuffer(key);
+                BatchRecordBuffer buffer = bufferMap.get(key);
+                if (waitUtilDone || buffer.shouldFlush()) {
+                    // Ensure that the flush honors the configured intervalMs
+                    flushBuffer(key);
+                    flush = true;
+                }
+            }
+            if (!waitUtilDone && !flush) {
+                return false;
+            }
         } else if (bufferMap.containsKey(bufferKey)) {
             flushBuffer(bufferKey);
+        } else {
+            throw new DorisBatchLoadException("buffer not found for key: " + bufferKey);
         }
-
         if (waitUtilDone) {
             waitAsyncLoadFinish();
         }
+        return true;
     }

     private synchronized void flushBuffer(String bufferKey) {
@@ -247,20 +320,96 @@ public void close() {
         this.flushQueue.clear();
     }

+    @VisibleForTesting
+    public boolean mergeBuffer(List<BatchRecordBuffer> recordList, BatchRecordBuffer buffer) {
+        boolean merge = false;
+        if (recordList.size() > 1) {
+            boolean sameTable =
+                    recordList.stream()
+                                    .map(BatchRecordBuffer::getTableIdentifier)
+                                    .distinct()
+                                    .count()
+                            == 1;
+            // Buffers can be merged only if they belong to the same table.
+            if (sameTable) {
+                for (BatchRecordBuffer recordBuffer : recordList) {
+                    if (recordBuffer != null
+                            && recordBuffer.getLabelName() != null
+                            && !buffer.getLabelName().equals(recordBuffer.getLabelName())
+                            && !recordBuffer.getBuffer().isEmpty()) {
+                        merge(buffer, recordBuffer);
+                        merge = true;
+                    }
+                }
+                LOG.info(
+                        "merged {} buffers into one stream load, result bufferBytes {}",
+                        recordList.size(),
+                        buffer.getBufferSizeBytes());
+            }
+        }
+        return merge;
+    }
+
+    private boolean merge(BatchRecordBuffer mergeBuffer, BatchRecordBuffer buffer) {
+        if (buffer.getBuffer().isEmpty()) {
+            return false;
+        }
+        if (!mergeBuffer.getBuffer().isEmpty()) {
+            mergeBuffer.getBuffer().add(mergeBuffer.getLineDelimiter());
+            mergeBuffer.setBufferSizeBytes(
+                    mergeBuffer.getBufferSizeBytes() + mergeBuffer.getLineDelimiter().length);
+            currentCacheBytes.addAndGet(buffer.getLineDelimiter().length);
+        }
+        mergeBuffer.getBuffer().addAll(buffer.getBuffer());
+        mergeBuffer.setNumOfRecords(mergeBuffer.getNumOfRecords() + buffer.getNumOfRecords());
+        mergeBuffer.setBufferSizeBytes(
+                mergeBuffer.getBufferSizeBytes() + buffer.getBufferSizeBytes());
+        return true;
+    }
+
     class LoadAsyncExecutor implements Runnable {
+
+        private int flushQueueSize;
+
+        public LoadAsyncExecutor(int flushQueueSize) {
+            this.flushQueueSize = flushQueueSize;
+        }
+
         @Override
         public void run() {
             LOG.info("LoadAsyncExecutor start");
             loadThreadAlive = true;
+            List<BatchRecordBuffer> recordList = new ArrayList<>(flushQueueSize);
             while (started.get()) {
-                BatchRecordBuffer buffer = null;
+                recordList.clear();
                 try {
-                    buffer = flushQueue.poll(2000L, TimeUnit.MILLISECONDS);
-                    if (buffer == null) {
+                    BatchRecordBuffer buffer = flushQueue.poll(2000L, TimeUnit.MILLISECONDS);
+                    if (buffer == null || buffer.getLabelName() == null) {
+                        // An empty label marks a waitUtilDone flush request; there is no data to load.
                         continue;
                     }
-                    if (buffer.getLabelName() != null) {
-                        load(buffer.getLabelName(), buffer);
+                    recordList.add(buffer);
+                    boolean merge = false;
+                    if (!flushQueue.isEmpty()) {
+                        flushQueue.drainTo(recordList, flushQueueSize - 1);
+                        if (mergeBuffer(recordList, buffer)) {
+                            load(buffer.getLabelName(), buffer);
+                            merge = true;
+                        }
+                    }
+
+                    if (!merge) {
+                        for (BatchRecordBuffer bf : recordList) {
+                            if (bf == null || bf.getLabelName() == null) {
+                                continue;
+                            }
+                            load(bf.getLabelName(), bf);
+                        }
+                    }
+
+                    if (flushQueue.size() < flushQueueSize) {
+                        // Avoid waiting for 2 rounds of intervalMs
+                        doFlush(null, false, false);
                     }
                 } catch (Exception e) {
                     LOG.error("worker running error", e);
@@ -280,9 +429,8 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException {
                 label = null;
             }
             refreshLoadUrl(buffer.getDatabase(), buffer.getTable());
-            ByteBuffer data = buffer.getData();
-            ByteArrayEntity entity =
-                    new ByteArrayEntity(data.array(), data.arrayOffset(), data.limit());
+
+            BatchBufferHttpEntity entity = new BatchBufferHttpEntity(buffer);
             HttpPutBuilder putBuilder = new HttpPutBuilder();
             putBuilder
                     .setUrl(loadUrl)
@@ -321,6 +469,18 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException {
                             respContent.getErrorURL());
                     throw new DorisBatchLoadException(errMsg);
                 } else {
+                    long cacheByteBeforeFlush =
+                            currentCacheBytes.getAndAdd(-respContent.getLoadBytes());
+                    LOG.info(
+                            "load success, cacheBeforeFlushBytes: {}, currentCacheBytes: {}",
+                            cacheByteBeforeFlush,
+                            currentCacheBytes.get());
+                    lock.lock();
+                    try {
+                        block.signal();
+                    } finally {
+                        lock.unlock();
+                    }
                     return;
                 }
             }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
index 6fbde55de..db486bcb8 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
@@ -107,9 +107,9 @@ public void initializeLoad() {

     private void intervalFlush() {
         try {
-            LOG.info("interval flush triggered.");
-            batchStreamLoad.flush(null, false);
-        } catch (InterruptedException e) {
+            boolean flush = batchStreamLoad.intervalFlush();
+            LOG.debug("interval flush triggered, flush: {}", flush);
+        } catch (Exception e) {
             flushException = e;
         }
     }
@@ -125,7 +125,7 @@ public void flush(boolean flush) throws IOException, InterruptedException {
         checkFlushException();
         writeOneDorisRecord(serializer.flush());
         LOG.info("checkpoint flush triggered.");
-        batchStreamLoad.flush(null, true);
+        batchStreamLoad.checkpointFlush();
     }

     @Override
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchRecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchRecordBuffer.java
new file mode 100644
index 000000000..e5f4c4ebb
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchRecordBuffer.java
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.copy;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+/** buffer to queue. */
+public class BatchRecordBuffer {
+    private static final Logger LOG = LoggerFactory.getLogger(BatchRecordBuffer.class);
+    public static final String LINE_SEPARATOR = "\n";
+    private String labelName;
+    private ByteBuffer buffer;
+    private byte[] lineDelimiter;
+    private int numOfRecords = 0;
+    private int bufferSizeBytes = 0;
+    private boolean loadBatchFirstRecord = true;
+    private String database;
+    private String table;
+
+    public BatchRecordBuffer() {}
+
+    public BatchRecordBuffer(byte[] lineDelimiter, int bufferSize) {
+        super();
+        this.lineDelimiter = lineDelimiter;
+        this.buffer = ByteBuffer.allocate(bufferSize);
+    }
+
+    public BatchRecordBuffer(String database, String table, byte[] lineDelimiter, int bufferSize) {
+        super();
+        this.database = database;
+        this.table = table;
+        this.lineDelimiter = lineDelimiter;
+        this.buffer = ByteBuffer.allocate(bufferSize);
+    }
+
+    public void insert(byte[] record) {
+        ensureCapacity(record.length);
+        if (loadBatchFirstRecord) {
+            loadBatchFirstRecord = false;
+        } else if (lineDelimiter != null) {
+            this.buffer.put(this.lineDelimiter);
+        }
+        this.buffer.put(record);
+        setNumOfRecords(getNumOfRecords() + 1);
+        setBufferSizeBytes(getBufferSizeBytes() + record.length);
+    }
+
+    @VisibleForTesting
+    public void ensureCapacity(int length) {
+        int lineDelimiterSize = this.lineDelimiter == null ? 0 : this.lineDelimiter.length;
+        if (buffer.remaining() - lineDelimiterSize >= length) {
+            return;
+        }
+        int currentRemain = buffer.remaining();
+        int currentCapacity = buffer.capacity();
+        // add lineDelimiter length
+        int needed = length - buffer.remaining() + lineDelimiterSize;
+        // grow at least 1MB
+        long grow = Math.max(needed, 1024 * 1024);
+        // grow at least 50% of the current size
+        grow = Math.max(buffer.capacity() / 2, grow);
+        int newCapacity = (int) Math.min(Integer.MAX_VALUE, buffer.capacity() + grow);
+        ByteBuffer tmp = ByteBuffer.allocate(newCapacity);
+        buffer.flip();
+        tmp.put(buffer);
+        buffer.clear();
+        buffer = tmp;
+        LOG.info(
+                "record length {}, buffer remain {}, grow capacity {} to {}",
+                length,
+                currentRemain,
+                currentCapacity,
+                newCapacity);
+    }
+
+    public String getLabelName() {
+        return labelName;
+    }
+
+    public void setLabelName(String labelName) {
+        this.labelName = labelName;
+    }
+
+    /** @return true if buffer is empty */
+    public boolean isEmpty() {
+        return numOfRecords == 0;
+    }
+
+    public ByteBuffer getData() {
+        // change mode
+        buffer.flip();
+        LOG.debug("flush buffer: {} records, {} bytes", getNumOfRecords(), getBufferSizeBytes());
+        return buffer;
+    }
+
+    public void clear() {
+        this.buffer.clear();
+        this.numOfRecords = 0;
+        this.bufferSizeBytes = 0;
+        this.labelName = null;
+        this.loadBatchFirstRecord = true;
+    }
+
+    public ByteBuffer getBuffer() {
+        return buffer;
+    }
+
+    /** @return Number of records in this buffer */
+    public int getNumOfRecords() {
+        return numOfRecords;
+    }
+
+    /** @return Buffer size in bytes */
+    public int getBufferSizeBytes() {
+        return bufferSizeBytes;
+    }
+
+    /** @param numOfRecords Updates number of records (Usually by 1) */
+    public void setNumOfRecords(int numOfRecords) {
+        this.numOfRecords = numOfRecords;
+    }
+
+    /** @param bufferSizeBytes Updates sum of size of records present in this buffer (Bytes) */
+    public void setBufferSizeBytes(int bufferSizeBytes) {
+        this.bufferSizeBytes = bufferSizeBytes;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java
index be8adcb07..2c5ed5c27 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java
@@ -28,7 +28,6 @@ import org.apache.doris.flink.sink.EscapeHandler;
 import org.apache.doris.flink.sink.HttpPutBuilder;
 import org.apache.doris.flink.sink.HttpUtil;
-import org.apache.doris.flink.sink.batch.BatchRecordBuffer;
 import org.apache.doris.flink.sink.writer.LabelGenerator;
 import org.apache.http.Header;
 import org.apache.http.HttpEntity;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 02e59084c..2a1c9b1a4 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -275,14 +275,14 @@ public class DorisConfigOptions {
     public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS =
             ConfigOptions.key("sink.buffer-flush.max-rows")
                     .intType()
-                    .defaultValue(50000)
+                    .defaultValue(500000)
                     .withDescription(
-                            "The maximum number of flush items in each batch, the default is 5w");
+                            "The maximum number of rows flushed in each batch, the default is 500000");

     public static final ConfigOption<MemorySize> SINK_BUFFER_FLUSH_MAX_BYTES =
             ConfigOptions.key("sink.buffer-flush.max-bytes")
                     .memoryType()
-                    .defaultValue(MemorySize.parse("10mb"))
+                    .defaultValue(MemorySize.parse("100mb"))
                     .withDescription(
-                            "The maximum number of bytes flushed in each batch, the default is 10MB");
+                            "The maximum number of bytes flushed in each batch, the default is 100MB");
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java
index 9cc197166..bc19c5725 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java
@@ -52,9 +52,9 @@ public void testEquals() {
                         .setWriteMode(WriteMode.STREAM_LOAD)
                         .setLabelPrefix("doris")
                         .enable2PC()
-                        .setBufferFlushMaxBytes(10)
+                        .setBufferFlushMaxBytes(10485760)
                         .setBufferFlushIntervalMs(10000)
-                        .setBufferFlushMaxRows(12)
+                        .setBufferFlushMaxRows(10000)
                         .setCheckInterval(10)
                         .setIgnoreCommitError(true)
                         .setDeletable(true)
@@ -72,9 +72,9 @@ public void testEquals() {
                         .setWriteMode(WriteMode.STREAM_LOAD)
                         .setLabelPrefix("doris")
                         .enable2PC()
-                        .setBufferFlushMaxBytes(10)
+                        .setBufferFlushMaxBytes(10485760)
                         .setBufferFlushIntervalMs(10000)
-                        .setBufferFlushMaxRows(12)
+                        .setBufferFlushMaxRows(10000)
                         .setCheckInterval(10)
                         .setIgnoreCommitError(true)
                         .setDeletable(true)
@@ -111,17 +111,17 @@ public void testEquals() {
         Assert.assertNotEquals(exceptOptions, builder.build());

         builder.enable2PC();
-        builder.setBufferFlushMaxBytes(11);
+        builder.setBufferFlushMaxBytes(104857601);
         Assert.assertNotEquals(exceptOptions, builder.build());
-        builder.setBufferFlushMaxBytes(10);
+        builder.setBufferFlushMaxBytes(10485760);

         builder.setBufferFlushIntervalMs(100001);
         Assert.assertNotEquals(exceptOptions, builder.build());
         builder.setBufferFlushIntervalMs(10000);

-        builder.setBufferFlushMaxRows(2);
+        builder.setBufferFlushMaxRows(10000);
         Assert.assertNotEquals(exceptOptions, builder.build());
-        builder.setBufferFlushMaxRows(12);
+        builder.setBufferFlushMaxRows(10000);

         builder.setCheckInterval(11);
         Assert.assertNotEquals(exceptOptions, builder.build());
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
index aa3d00dae..de0ef0413 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
@@ -208,9 +208,9 @@ public void testTableBatch() throws Exception {
                         + " 'sink.enable.batch-mode' = 'true',"
                         + " 'sink.enable-delete' = 'true',"
                         + " 'sink.flush.queue-size' = '2',"
-                        + " 'sink.buffer-flush.max-rows' = '1',"
-                        + " 'sink.buffer-flush.max-bytes' = '5',"
-                        + " 'sink.buffer-flush.interval' = '10s'"
+                        + " 'sink.buffer-flush.max-rows' = '10000',"
+                        + " 'sink.buffer-flush.max-bytes' = '10MB',"
+                        + " 'sink.buffer-flush.interval' = '1s'"
                         + ")",
                         getFenodes(),
                         DATABASE + "." + TABLE_CSV_BATCH_TBL,
@@ -219,7 +219,7 @@ public void testTableBatch() throws Exception {
         tEnv.executeSql(sinkDDL);
         tEnv.executeSql("INSERT INTO doris_sink SELECT 'doris',1 union all SELECT 'flink',2");

-        Thread.sleep(10000);
+        Thread.sleep(20000);
         List<String> expected = Arrays.asList("doris,1", "flink,2");
         String query =
                 String.format(
@@ -248,8 +248,9 @@ public void testDataStreamBatch() throws Exception {
         executionBuilder
                 .setLabelPrefix(UUID.randomUUID().toString())
                 .setStreamLoadProp(properties)
-                .setBufferFlushMaxBytes(1)
-                .setBufferFlushMaxRows(10);
+                .setBufferFlushMaxBytes(10485760)
+                .setBufferFlushMaxRows(10000)
+                .setBufferFlushIntervalMs(1000);

         builder.setDorisExecutionOptions(executionBuilder.build())
                 .setSerializer(new SimpleStringSerializer())
@@ -258,7 +259,7 @@ public void testDataStreamBatch() throws Exception {
         env.fromElements("doris,1", "flink,2").sinkTo(builder.build());
         env.execute();

-        Thread.sleep(10000);
+        Thread.sleep(20000);
         List<String> expected = Arrays.asList("doris,1", "flink,2");
         String query =
                 String.format(
@@ -295,9 +296,9 @@ public void testTableGroupCommit() throws Exception {
                         + " 'sink.enable.batch-mode' = 'true',"
                         + " 'sink.enable-delete' = 'true',"
                         + " 'sink.flush.queue-size' = '2',"
-                        + " 'sink.buffer-flush.max-rows' = '3',"
-                        + " 'sink.buffer-flush.max-bytes' = '5000',"
-                        + " 'sink.buffer-flush.interval' = '10s'"
+                        + " 'sink.buffer-flush.max-rows' = '10000',"
+                        + " 'sink.buffer-flush.max-bytes' = '10MB',"
+                        + " 'sink.buffer-flush.interval' = '1s'"
                         + ")",
                         getFenodes(),
                         DATABASE + "." + TABLE_GROUP_COMMIT,
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchBufferHttpEntity.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchBufferHttpEntity.java
new file mode 100644
index 000000000..fe20c5442
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchBufferHttpEntity.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
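+
+// Verifies that BatchBufferHttpEntity#writeTo reproduces the buffered rows
+// byte-for-byte: mockBuffer() (see TestBatchBufferStream) inserts 1000
+// "uuid,i" rows, and the expected bytes are simply all rows concatenated in
+// insertion order, with no delimiter, because mockBuffer() uses the no-arg
+// BatchRecordBuffer constructor whose lineDelimiter stays null.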
+
+package org.apache.doris.flink.sink.batch;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestBatchBufferHttpEntity {
+
+    @Test
+    public void testWrite() throws Exception {
+        BatchRecordBuffer recordBuffer = TestBatchBufferStream.mockBuffer();
+        byte[] expectedData = TestBatchBufferStream.mergeByteArrays(recordBuffer.getBuffer());
+        Assert.assertEquals(recordBuffer.getNumOfRecords(), 1000);
+
+        BatchBufferHttpEntity entity = new BatchBufferHttpEntity(recordBuffer);
+        assertTrue(entity.isRepeatable());
+        assertFalse(entity.isStreaming());
+        assertEquals(entity.getContentLength(), expectedData.length);
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        entity.writeTo(outputStream);
+        assertArrayEquals(expectedData, outputStream.toByteArray());
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchBufferStream.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchBufferStream.java
new file mode 100644
index 000000000..3dad5b60a
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchBufferStream.java
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
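+
+// Exercises BatchBufferStream against a 1000-row buffer, both with a single
+// full-length read and with randomly sized partial reads, asserting that the
+// reassembled bytes match the rows merged in insertion order.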
+
+package org.apache.doris.flink.sink.batch;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestBatchBufferStream {
+
+    @Test
+    public void testRead() throws Exception {
+        BatchRecordBuffer recordBuffer = mockBuffer();
+        byte[] expectedData = mergeByteArrays(recordBuffer.getBuffer());
+        Assert.assertEquals(recordBuffer.getNumOfRecords(), 1000);
+
+        byte[] actualData = new byte[(int) recordBuffer.getBufferSizeBytes()];
+        try (BatchBufferStream inputStream = new BatchBufferStream(recordBuffer.getBuffer())) {
+            int len = inputStream.read(actualData, 0, actualData.length);
+            assertEquals(actualData.length, len);
+            assertArrayEquals(expectedData, actualData);
+        }
+    }
+
+    @Test
+    public void testReadBufLen() throws Exception {
+        BatchRecordBuffer recordBuffer = mockBuffer();
+        byte[] expectedData = mergeByteArrays(recordBuffer.getBuffer());
+        Assert.assertEquals(recordBuffer.getNumOfRecords(), 1000);
+
+        byte[] actualData = new byte[(int) recordBuffer.getBufferSizeBytes()];
+        try (BatchBufferStream inputStream = new BatchBufferStream(recordBuffer.getBuffer())) {
+            int pos = 0;
+            while (pos < actualData.length) {
+                // mock random length
+                int maxLen = new Random().nextInt(actualData.length - pos) + 1;
+                int len = inputStream.read(actualData, pos, maxLen);
+                if (len == -1) {
+                    break;
+                }
+                assertTrue(len > 0 && len <= maxLen);
+                pos += len;
+            }
+            assertEquals(actualData.length, pos);
+            assertArrayEquals(expectedData, actualData);
+        }
+    }
+
+    public static BatchRecordBuffer mockBuffer() {
+        BatchRecordBuffer recordBuffer = new BatchRecordBuffer();
+        for (int i = 0; i < 1000; i++) {
+            recordBuffer.insert((UUID.randomUUID() + "," + i).getBytes());
+        }
+        return recordBuffer;
+    }
+
+    public static byte[] mergeByteArrays(List<byte[]> listOfByteArrays) {
+        int totalLength = 0;
+        for (byte[] byteArray : listOfByteArrays) {
+            totalLength += byteArray.length;
+        }
+
+        byte[] mergedArray = new byte[totalLength];
+
+        int currentPosition = 0;
+        for (byte[] byteArray : listOfByteArrays) {
+            System.arraycopy(byteArray, 0, mergedArray, currentPosition, byteArray.length);
+            currentPosition += byteArray.length;
+        }
+
+        return mergedArray;
+    }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
index d52149d63..62d84c990 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
@@ -41,9 +41,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;

+import static org.apache.doris.flink.sink.batch.TestBatchBufferStream.mergeByteArrays;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
@@ -84,8 +88,10 @@ public void testInit() {

     @Test
     public void testLoadFail() throws Exception {
+        LOG.info("testLoadFail start");
         DorisReadOptions readOptions = DorisReadOptions.builder().build();
-        DorisExecutionOptions executionOptions = DorisExecutionOptions.builder().build();
+        DorisExecutionOptions executionOptions =
+                DorisExecutionOptions.builder().setBufferFlushIntervalMs(1000).build();
         DorisOptions options =
                 DorisOptions.builder()
                         .setFenodes("127.0.0.1:1")
@@ -104,7 +110,7 @@ public void testLoadFail() throws Exception {
                 () -> loader.isLoadThreadAlive(),
                 Deadline.fromNow(Duration.ofSeconds(10)),
                 100L,
-                "Condition was not met in given timeout.");
+                "testLoadFail wait loader start failed.");
         Assert.assertTrue(loader.isLoadThreadAlive());
         BackendUtil backendUtil = mock(BackendUtil.class);
         HttpClientBuilder httpClientBuilder = mock(HttpClientBuilder.class);
@@ -118,17 +124,25 @@ public void testLoadFail() throws Exception {
         when(httpClientBuilder.build()).thenReturn(httpClient);
         when(httpClient.execute(any())).thenReturn(response);
         loader.writeRecord("db", "tbl", "1,data".getBytes());
-        loader.flush("db.tbl", true);
+        loader.checkpointFlush();
+        TestUtil.waitUntilCondition(
+                () -> !loader.isLoadThreadAlive(),
+                Deadline.fromNow(Duration.ofSeconds(20)),
+                100L,
+                "testLoadFail wait loader exit failed. " + loader.isLoadThreadAlive());
         AtomicReference<Throwable> exception = loader.getException();
         Assert.assertTrue(exception.get() instanceof Exception);
         Assert.assertTrue(exception.get().getMessage().contains("stream load error"));
+        LOG.info("testLoadFail end");
     }

     @Test
     public void testLoadError() throws Exception {
+        LOG.info("testLoadError start");
         DorisReadOptions readOptions = DorisReadOptions.builder().build();
-        DorisExecutionOptions executionOptions = DorisExecutionOptions.builder().build();
+        DorisExecutionOptions executionOptions =
+                DorisExecutionOptions.builder().setBufferFlushIntervalMs(1000).build();
         DorisOptions options =
                 DorisOptions.builder()
                         .setFenodes("127.0.0.1:1")
@@ -148,7 +162,7 @@ public void testLoadError() throws Exception {
                 () -> loader.isLoadThreadAlive(),
                 Deadline.fromNow(Duration.ofSeconds(10)),
                 100L,
-                "Condition was not met in given timeout.");
+                "testLoadError wait loader start failed.");
         Assert.assertTrue(loader.isLoadThreadAlive());
         BackendUtil backendUtil = mock(BackendUtil.class);
         HttpClientBuilder httpClientBuilder = mock(HttpClientBuilder.class);
@@ -161,12 +175,17 @@ public void testLoadError() throws Exception {
         when(httpClientBuilder.build()).thenReturn(httpClient);
         when(httpClient.execute(any())).thenReturn(response);
         loader.writeRecord("db", "tbl", "1,data".getBytes());
-        loader.flush("db.tbl", true);
+        loader.checkpointFlush();
+        TestUtil.waitUntilCondition(
+                () -> !loader.isLoadThreadAlive(),
+                Deadline.fromNow(Duration.ofSeconds(20)),
+                100L,
+                "testLoadError wait loader exit failed. " + loader.isLoadThreadAlive());
         AtomicReference<Throwable> exception = loader.getException();
-        Assert.assertTrue(exception.get() instanceof Exception);
         Assert.assertTrue(exception.get().getMessage().contains("stream load error"));
+        LOG.info("testLoadError end");
     }

     @After
@@ -175,4 +194,51 @@ public void after() {
             backendUtilMockedStatic.close();
         }
     }
+
+    @Test
+    public void mergeBufferTest() {
+        DorisReadOptions readOptions = DorisReadOptions.builder().build();
+        DorisExecutionOptions executionOptions = DorisExecutionOptions.builder().build();
+        DorisOptions options =
+                DorisOptions.builder()
+                        .setFenodes("127.0.0.1:8030")
+                        .setBenodes("127.0.0.1:9030")
+                        .setTableIdentifier("db.tbl")
+                        .build();
+
+        DorisBatchStreamLoad loader =
+                new DorisBatchStreamLoad(
+                        options, readOptions, executionOptions, new LabelGenerator("xx", false), 0);
+
+        List<BatchRecordBuffer> bufferList = new ArrayList<>();
+        BatchRecordBuffer recordBuffer =
+                new BatchRecordBuffer("db", "tbl", "\n".getBytes(StandardCharsets.UTF_8), 0);
+        recordBuffer.insert("doris,2".getBytes(StandardCharsets.UTF_8));
+        recordBuffer.setLabelName("label2");
+        BatchRecordBuffer buffer =
+                new BatchRecordBuffer("db", "tbl", "\n".getBytes(StandardCharsets.UTF_8), 0);
+        buffer.insert("doris,1".getBytes(StandardCharsets.UTF_8));
+        buffer.setLabelName("label1");
+
+        boolean flag = loader.mergeBuffer(bufferList, buffer);
+        Assert.assertEquals(false, flag);
+
+        bufferList.add(buffer);
+        bufferList.add(recordBuffer);
+        flag = loader.mergeBuffer(bufferList, buffer);
+        Assert.assertEquals(true, flag);
+        byte[] bytes = mergeByteArrays(buffer.getBuffer());
+        Assert.assertArrayEquals(bytes, "doris,1\ndoris,2".getBytes(StandardCharsets.UTF_8));
+
+        // multi table
+        bufferList.clear();
+        bufferList.add(buffer);
+        BatchRecordBuffer recordBuffer2 =
+                new BatchRecordBuffer("db", "tbl2", "\n".getBytes(StandardCharsets.UTF_8), 0);
+        recordBuffer2.insert("doris,3".getBytes(StandardCharsets.UTF_8));
+        recordBuffer2.setLabelName("label3");
+        bufferList.add(recordBuffer2);
+        flag = loader.mergeBuffer(bufferList, buffer);
+        Assert.assertEquals(false, flag);
+    }
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestBatchRecordBuffer.java
similarity index 99%
rename from flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java
rename to flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestBatchRecordBuffer.java
index 1a6897c9c..3107225a9 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestBatchRecordBuffer.java
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
-package org.apache.doris.flink.sink.batch;
+package org.apache.doris.flink.sink.copy;

 import org.junit.Assert;
 import org.junit.Test;
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
index 05a93dc5f..56887d93f 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
@@ -135,7 +135,7 @@ public void testDorisSinkProperties() {
         properties.put("sink.use-cache", "true");
         properties.put("sink.enable.batch-mode", "true");
         properties.put("sink.flush.queue-size", "2");
-        properties.put("sink.buffer-flush.max-rows", "1000");
+        properties.put("sink.buffer-flush.max-rows", "10000");
         properties.put("sink.buffer-flush.max-bytes", "10MB");
         properties.put("sink.buffer-flush.interval", "10s");
         properties.put("sink.ignore.update-before", "true");
@@ -169,7 +169,7 @@ public void testDorisSinkProperties() {
                         .enable2PC()
                         .setBufferFlushIntervalMs(10000)
                         .setBufferFlushMaxBytes(10 * 1024 * 1024)
-                        .setBufferFlushMaxRows(1000)
+                        .setBufferFlushMaxRows(10000)
                         .setFlushQueueSize(2)
                         .setUseCache(true)
                         .setIgnoreCommitError(false)