From 0f5b84d5fd80fc06cbacf58848bb420d30060dc2 Mon Sep 17 00:00:00 2001
From: wudi <676366545@qq.com>
Date: Fri, 9 Aug 2024 12:47:27 +0800
Subject: [PATCH] Improve batch stream load: buffer rows as byte[] chunks and
 merge flush buffers of the same table

Replace the ByteBuffer-based BatchRecordBuffer with a LinkedList of
byte[] record chunks streamed through a dedicated HttpEntity, so large
batches no longer trigger buffer expansion copies. Merge queued buffers
that target the same table before each stream load, and raise the
default flush thresholds to 500000 rows / 100MB. The old ByteBuffer
implementation is kept for the copy-into (BatchStageLoad) path.
---
 .../flink/cfg/DorisExecutionOptions.java      |   4 +-
 .../sink/batch/BatchBufferHttpEntity.java     |  77 +++++++++
 .../flink/sink/batch/BatchBufferStream.java   |  74 ++++++++
 .../flink/sink/batch/BatchRecordBuffer.java   |  68 ++------
 .../sink/batch/DorisBatchStreamLoad.java      | 101 +++++++----
 .../flink/sink/batch/DorisBatchWriter.java    |   4 +-
 .../flink/sink/copy/BatchRecordBuffer.java    | 163 ++++++++++++++++++
 .../doris/flink/sink/copy/BatchStageLoad.java |   4 +-
 .../doris/flink/table/DorisConfigOptions.java |   4 +-
 .../sink/batch/TestDorisBatchStreamLoad.java  |  15 +-
 .../TestBatchRecordBuffer.java                |   2 +-
 11 files changed, 415 insertions(+), 101 deletions(-)
 create mode 100644 flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferHttpEntity.java
 create mode 100644 flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferStream.java
 create mode 100644 flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchRecordBuffer.java
 rename flink-doris-connector/src/test/java/org/apache/doris/flink/sink/{batch => copy}/TestBatchRecordBuffer.java (99%)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 8f3cc240d..09a467282 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -40,8 +40,8 @@ public class DorisExecutionOptions implements Serializable {
     private static final int DEFAULT_BUFFER_COUNT = 3;
     // batch flush
     private static final int DEFAULT_FLUSH_QUEUE_SIZE = 2;
-    private static final int DEFAULT_BUFFER_FLUSH_MAX_ROWS = 50000;
-    private static final int DEFAULT_BUFFER_FLUSH_MAX_BYTES = 10 * 1024 * 1024;
+    private static final int DEFAULT_BUFFER_FLUSH_MAX_ROWS = 500000;
+    private static final int DEFAULT_BUFFER_FLUSH_MAX_BYTES = 100 * 1024 * 1024;
     private static final long DEFAULT_BUFFER_FLUSH_INTERVAL_MS = 10 * 1000;
     private final int checkInterval;
     private final int maxRetries;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferHttpEntity.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferHttpEntity.java
new file mode 100644
index 000000000..a70d26323
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferHttpEntity.java
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.batch;
+
+import org.apache.http.entity.AbstractHttpEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+public class BatchBufferHttpEntity extends AbstractHttpEntity {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BatchBufferHttpEntity.class);
+
+    protected static final int OUTPUT_BUFFER_SIZE = 4096;
+    private final List<byte[]> buffer;
+    private final long contentLength;
+
+    public BatchBufferHttpEntity(BatchRecordBuffer recordBuffer) {
+        this.buffer = recordBuffer.getBuffer();
+        this.contentLength = recordBuffer.getBufferSizeBytes();
+    }
+
+    @Override
+    public boolean isRepeatable() {
+        return true;
+    }
+
+    @Override
+    public boolean isChunked() {
+        return false;
+    }
+
+    @Override
+    public long getContentLength() {
+        return contentLength;
+    }
+
+    @Override
+    public InputStream getContent() {
+        return new BatchBufferStream(buffer);
+    }
+
+    @Override
+    public void writeTo(OutputStream outStream) throws IOException {
+        try (InputStream inStream = new BatchBufferStream(buffer)) {
+            final byte[] buffer = new byte[OUTPUT_BUFFER_SIZE];
+            int readLen;
+            while ((readLen = inStream.read(buffer)) != -1) {
+                outStream.write(buffer, 0, readLen);
+            }
+        }
+    }
+
+    @Override
+    public boolean isStreaming() {
+        return false;
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferStream.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferStream.java
new file mode 100644
index 000000000..e0d4dc374
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferStream.java
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.batch;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+
+public class BatchBufferStream extends InputStream {
+    private final Iterator<byte[]> iterator;
+    private byte[] currentItem;
+    private int currentPos;
+
+    public BatchBufferStream(List<byte[]> buffer) {
+        this.iterator = buffer.iterator();
+    }
+
+    @Override
+    public int read() throws IOException {
+        // Delegate to the bulk read so single-byte reads also honor the
+        // InputStream contract (return 0-255, or -1 at end of stream).
+        byte[] b = new byte[1];
+        return read(b, 0, 1) == 1 ? b[0] & 0xFF : -1;
+    }
+
+    @Override
+    public int read(byte[] buf) throws IOException {
+        return read(buf, 0, buf.length);
+    }
+
+    @Override
+    public int read(byte[] buf, int off, int len) throws IOException {
+        if (!iterator.hasNext() && currentItem == null) {
+            return -1;
+        }
+
+        byte[] item = currentItem;
+        int pos = currentPos;
+        int readBytes = 0;
+        while (readBytes < len && (item != null || iterator.hasNext())) {
+            if (item == null) {
+                item = iterator.next();
+                pos = 0;
+            }
+
+            int size = Math.min(len - readBytes, item.length - pos);
+            System.arraycopy(item, pos, buf, off + readBytes, size);
+            readBytes += size;
+            pos += size;
+
+            if (pos == item.length) {
+                item = null;
+                pos = 0;
+            }
+        }
+
+        currentItem = item;
+        currentPos = pos;
+        return readBytes;
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
index 5869276d4..54b4b2abc 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
@@ -17,19 +17,17 @@
 
 package org.apache.doris.flink.sink.batch;
 
-import org.apache.flink.annotation.VisibleForTesting;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
+import java.util.LinkedList;
 
 /** buffer to queue. */
 public class BatchRecordBuffer {
     private static final Logger LOG = LoggerFactory.getLogger(BatchRecordBuffer.class);
     public static final String LINE_SEPARATOR = "\n";
     private String labelName;
-    private ByteBuffer buffer;
+    private LinkedList<byte[]> buffer;
     private byte[] lineDelimiter;
     private int numOfRecords = 0;
     private int bufferSizeBytes = 0;
@@ -39,64 +37,30 @@ public class BatchRecordBuffer {
     private final long createTime = System.currentTimeMillis();
     private long retainTime = 0;
 
-    public BatchRecordBuffer() {}
-
-    public BatchRecordBuffer(byte[] lineDelimiter, int bufferSize) {
-        super();
-        this.lineDelimiter = lineDelimiter;
-        this.buffer = ByteBuffer.allocate(bufferSize);
+    public BatchRecordBuffer() {
+        this.buffer = new LinkedList<>();
     }
 
-    public BatchRecordBuffer(
-            String database, String table, byte[] lineDelimiter, int bufferSize, long retainTime) {
+    public BatchRecordBuffer(String database, String table, byte[] lineDelimiter, long retainTime) {
         super();
         this.database = database;
         this.table = table;
         this.lineDelimiter = lineDelimiter;
-        this.buffer = ByteBuffer.allocate(bufferSize);
+        this.buffer = new LinkedList<>();
         this.retainTime = retainTime;
     }
 
     public void insert(byte[] record) {
-        ensureCapacity(record.length);
         if (loadBatchFirstRecord) {
            loadBatchFirstRecord = false;
         } else if (lineDelimiter != null) {
-            this.buffer.put(this.lineDelimiter);
+            this.buffer.add(this.lineDelimiter);
+            // Count delimiter bytes too: bufferSizeBytes becomes the
+            // Content-Length of the stream load request entity.
+            setBufferSizeBytes(getBufferSizeBytes() + this.lineDelimiter.length);
         }
-        this.buffer.put(record);
+        this.buffer.add(record);
         setNumOfRecords(getNumOfRecords() + 1);
         setBufferSizeBytes(getBufferSizeBytes() + record.length);
     }
 
-    @VisibleForTesting
-    public void ensureCapacity(int length) {
-        int lineDelimiterSize = this.lineDelimiter == null ? 0 : this.lineDelimiter.length;
-        if (buffer.remaining() - lineDelimiterSize >= length) {
-            return;
-        }
-        int currentRemain = buffer.remaining();
-        int currentCapacity = buffer.capacity();
-        // add lineDelimiter length
-        int needed = length - buffer.remaining() + lineDelimiterSize;
-        // grow at least 1MB
-        long grow = Math.max(needed, 1024 * 1024);
-        // grow at least 50% of the current size
-        grow = Math.max(buffer.capacity() / 2, grow);
-        int newCapacity = (int) Math.min(Integer.MAX_VALUE, buffer.capacity() + grow);
-        ByteBuffer tmp = ByteBuffer.allocate(newCapacity);
-        buffer.flip();
-        tmp.put(buffer);
-        buffer.clear();
-        buffer = tmp;
-        LOG.info(
-                "record length {},buffer remain {} ,grow capacity {} to {}",
-                length,
-                currentRemain,
-                currentCapacity,
-                newCapacity);
-    }
-
     public String getLabelName() {
         return labelName;
     }
@@ -110,13 +74,6 @@ public boolean isEmpty() {
         return numOfRecords == 0;
     }
 
-    public ByteBuffer getData() {
-        // change mode
-        buffer.flip();
-        LOG.debug("flush buffer: {} records, {} bytes", getNumOfRecords(), getBufferSizeBytes());
-        return buffer;
-    }
-
     public void clear() {
         this.buffer.clear();
         this.numOfRecords = 0;
@@ -125,7 +82,7 @@ public void clear() {
         this.loadBatchFirstRecord = true;
     }
 
-    public ByteBuffer getBuffer() {
+    public LinkedList<byte[]> getBuffer() {
         return buffer;
     }
 
@@ -165,6 +122,13 @@ public void setTable(String table) {
         this.table = table;
     }
 
+    public String getTableIdentifier() {
+        if (database != null && table != null) {
+            return database + "." + table;
+        }
+        return null;
+    }
+
     public boolean shouldFlush() {
         return System.currentTimeMillis() - createTime > retainTime;
     }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 5ce8685a2..c98ace43f 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -36,7 +36,6 @@
 import org.apache.doris.flink.sink.writer.LabelGenerator;
 import org.apache.http.client.entity.GzipCompressingEntity;
 import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.util.EntityUtils;
@@ -45,7 +44,6 @@
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -103,7 +101,6 @@ public class DorisBatchStreamLoad implements Serializable {
     private boolean enableGroupCommit;
     private boolean enableGzCompress;
     private int subTaskId;
-    private final AtomicBoolean flushing = new AtomicBoolean(false);
 
     public DorisBatchStreamLoad(
             DorisOptions dorisOptions,
@@ -145,7 +142,7 @@ public DorisBatchStreamLoad(
                     "tableIdentifier input error, the format is database.table");
             this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort, tableInfo[0], tableInfo[1]);
         }
-        this.loadAsyncExecutor = new LoadAsyncExecutor();
+        this.loadAsyncExecutor = new LoadAsyncExecutor(executionOptions.getFlushQueueSize());
         this.loadExecutorService =
                 new ThreadPoolExecutor(
                         1,
@@ -178,32 +175,36 @@ public synchronized void writeRecord(String database, String table, byte[] recor
                             database,
                             table,
                             this.lineDelimiter,
-                            executionOptions.getBufferFlushMaxBytes(),
                             executionOptions.getBufferFlushIntervalMs()));
         buffer.insert(record);
-        // When it exceeds 80% of the byteSize,to flush, to avoid triggering bytebuffer expansion
-        if (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes() * 0.8
-                || (executionOptions.getBufferFlushMaxRows() != 0
-                        && buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) {
-            boolean flush = doFlush(bufferKey, false, true);
+        if (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes()
+                || buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows()) {
+            boolean flush = bufferFullFlush(bufferKey);
             LOG.info("trigger flush by buffer full, flush: {}", flush);
         }
     }
 
-    public synchronized boolean doFlush(String bufferKey, boolean waitUtilDone, boolean bufferFull)
-            throws InterruptedException {
+    public synchronized boolean bufferFullFlush(String bufferKey) throws InterruptedException {
+        return doFlush(bufferKey, false, true);
+    }
+
+    public synchronized boolean intervalFlush() throws InterruptedException {
+        return doFlush(null, false, false);
+    }
+
+    public synchronized boolean checkpointFlush() throws InterruptedException {
+        return doFlush(null, true, false);
+    }
+
+    private synchronized boolean doFlush(
+            String bufferKey, boolean waitUtilDone, boolean bufferFull) {
         checkFlushException();
         if (waitUtilDone || bufferFull) {
             flush(bufferKey, waitUtilDone);
             return true;
-        } else if (flushing.compareAndSet(false, true)) {
-            if (flushQueue.size() < executionOptions.getFlushQueueSize()) {
-                flush(bufferKey, false);
-                return true;
-            } else {
-                flushing.compareAndSet(true, false);
-                return false;
-            }
+        } else if (flushQueue.size() < executionOptions.getFlushQueueSize()) {
+            flush(bufferKey, false);
+            return true;
         }
         return false;
     }
@@ -220,7 +221,7 @@ private synchronized void flush(String bufferKey, boolean waitUtilDone) {
             }
         }
         if (!waitUtilDone && !flush) {
-            flushing.compareAndSet(true, false);
+            return;
         }
     } else if (bufferMap.containsKey(bufferKey)) {
         flushBuffer(bufferKey);
@@ -277,26 +278,35 @@ public void close() {
     }
 
     class LoadAsyncExecutor implements Runnable {
+
+        private int flushQueueSize;
+
+        public LoadAsyncExecutor(int flushQueueSize) {
+            this.flushQueueSize = flushQueueSize;
+        }
+
         @Override
         public void run() {
             LOG.info("LoadAsyncExecutor start");
             loadThreadAlive = true;
+            List<BatchRecordBuffer> recordList = new ArrayList<>(flushQueueSize);
             while (started.get()) {
-                BatchRecordBuffer buffer = null;
+                recordList.clear();
                 try {
-                    buffer = flushQueue.poll(2000L, TimeUnit.MILLISECONDS);
-                    if (buffer == null) {
+                    BatchRecordBuffer buffer = flushQueue.poll(2000L, TimeUnit.MILLISECONDS);
+                    if (buffer == null || buffer.getLabelName() == null) {
+                        // An empty label means there is nothing to load; it is only
+                        // the wake-up flag used by waitUtilDone.
                         continue;
                     }
-                    if (buffer.getLabelName() != null) {
-                        load(buffer.getLabelName(), buffer);
-                    }
-                    if (flushQueue.size() < executionOptions.getFlushQueueSize()) {
-                        flushing.compareAndSet(true, false);
-                        if (flushQueue.isEmpty()) {
-                            // Avoid waiting for 2 rounds of intervalMs
-                            doFlush(null, false, false);
-                        }
+
+                    recordList.add(buffer);
+                    flushQueue.drainTo(recordList, flushQueueSize - 1);
+                    mergeBuffer(recordList, buffer);
+                    load(buffer.getLabelName(), buffer);
+
+                    if (flushQueue.isEmpty()) {
+                        // Avoid waiting for 2 rounds of intervalMs
+                        doFlush(null, false, false);
                     }
                 } catch (Exception e) {
                     LOG.error("worker running error", e);
@@ -310,15 +320,34 @@ public void run() {
             loadThreadAlive = false;
         }
 
+        private void mergeBuffer(List<BatchRecordBuffer> recordList, BatchRecordBuffer buffer) {
+            if (recordList.size() > 1) {
+                boolean sameTable =
+                        recordList.stream()
+                                        .map(BatchRecordBuffer::getTableIdentifier)
+                                        .distinct()
+                                        .count()
+                                == 1;
+                // Buffers can be merged only if they belong to the same table.
+                if (sameTable) {
+                    for (BatchRecordBuffer recordBuffer : recordList) {
+                        if (recordBuffer.getLabelName() != null
+                                && !buffer.getLabelName().equals(recordBuffer.getLabelName())) {
+                            buffer.getBuffer().addAll(recordBuffer.getBuffer());
+                        }
+                    }
+                }
+            }
+        }
+
         /** execute stream load. */
         public void load(String label, BatchRecordBuffer buffer) throws IOException {
             if (enableGroupCommit) {
                 label = null;
             }
             refreshLoadUrl(buffer.getDatabase(), buffer.getTable());
-            ByteBuffer data = buffer.getData();
-            ByteArrayEntity entity =
-                    new ByteArrayEntity(data.array(), data.arrayOffset(), data.limit());
+
+            BatchBufferHttpEntity entity = new BatchBufferHttpEntity(buffer);
             HttpPutBuilder putBuilder = new HttpPutBuilder();
             putBuilder
                     .setUrl(loadUrl)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
index 30116c9a2..db486bcb8 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
@@ -107,7 +107,7 @@ public void initializeLoad() {
 
     private void intervalFlush() {
         try {
-            boolean flush = batchStreamLoad.doFlush(null, false, false);
+            boolean flush = batchStreamLoad.intervalFlush();
             LOG.debug("interval flush trigger, flush: {}", flush);
         } catch (Exception e) {
             flushException = e;
@@ -125,7 +125,7 @@ public void flush(boolean flush) throws IOException, InterruptedException {
         checkFlushException();
         writeOneDorisRecord(serializer.flush());
         LOG.info("checkpoint flush triggered.");
-        batchStreamLoad.doFlush(null, true, false);
+        batchStreamLoad.checkpointFlush();
     }
 
     @Override
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchRecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchRecordBuffer.java
new file mode 100644
index 000000000..e5f4c4ebb
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchRecordBuffer.java
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.copy;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+/** buffer to queue. */
+public class BatchRecordBuffer {
+    private static final Logger LOG = LoggerFactory.getLogger(BatchRecordBuffer.class);
+    public static final String LINE_SEPARATOR = "\n";
+    private String labelName;
+    private ByteBuffer buffer;
+    private byte[] lineDelimiter;
+    private int numOfRecords = 0;
+    private int bufferSizeBytes = 0;
+    private boolean loadBatchFirstRecord = true;
+    private String database;
+    private String table;
+
+    public BatchRecordBuffer() {}
+
+    public BatchRecordBuffer(byte[] lineDelimiter, int bufferSize) {
+        super();
+        this.lineDelimiter = lineDelimiter;
+        this.buffer = ByteBuffer.allocate(bufferSize);
+    }
+
+    public BatchRecordBuffer(String database, String table, byte[] lineDelimiter, int bufferSize) {
+        super();
+        this.database = database;
+        this.table = table;
+        this.lineDelimiter = lineDelimiter;
+        this.buffer = ByteBuffer.allocate(bufferSize);
+    }
+
+    public void insert(byte[] record) {
+        ensureCapacity(record.length);
+        if (loadBatchFirstRecord) {
+            loadBatchFirstRecord = false;
+        } else if (lineDelimiter != null) {
+            this.buffer.put(this.lineDelimiter);
+        }
+        this.buffer.put(record);
+        setNumOfRecords(getNumOfRecords() + 1);
+        setBufferSizeBytes(getBufferSizeBytes() + record.length);
+    }
+
+    @VisibleForTesting
+    public void ensureCapacity(int length) {
+        int lineDelimiterSize = this.lineDelimiter == null ? 0 : this.lineDelimiter.length;
+        if (buffer.remaining() - lineDelimiterSize >= length) {
+            return;
+        }
+        int currentRemain = buffer.remaining();
+        int currentCapacity = buffer.capacity();
+        // add lineDelimiter length
+        int needed = length - buffer.remaining() + lineDelimiterSize;
+        // grow at least 1MB
+        long grow = Math.max(needed, 1024 * 1024);
+        // grow at least 50% of the current size
+        grow = Math.max(buffer.capacity() / 2, grow);
+        int newCapacity = (int) Math.min(Integer.MAX_VALUE, buffer.capacity() + grow);
+        ByteBuffer tmp = ByteBuffer.allocate(newCapacity);
+        buffer.flip();
+        tmp.put(buffer);
+        buffer.clear();
+        buffer = tmp;
+        LOG.info(
+                "record length {}, buffer remain {}, grow capacity {} to {}",
+                length,
+                currentRemain,
+                currentCapacity,
+                newCapacity);
+    }
+
+    public String getLabelName() {
+        return labelName;
+    }
+
+    public void setLabelName(String labelName) {
+        this.labelName = labelName;
+    }
+
+    /** @return true if buffer is empty */
+    public boolean isEmpty() {
+        return numOfRecords == 0;
+    }
+
+    public ByteBuffer getData() {
+        // change mode
+        buffer.flip();
+        LOG.debug("flush buffer: {} records, {} bytes", getNumOfRecords(), getBufferSizeBytes());
+        return buffer;
+    }
+
+    public void clear() {
+        this.buffer.clear();
+        this.numOfRecords = 0;
+        this.bufferSizeBytes = 0;
+        this.labelName = null;
+        this.loadBatchFirstRecord = true;
+    }
+
+    public ByteBuffer getBuffer() {
+        return buffer;
+    }
+
+    /** @return Number of records in this buffer */
+    public int getNumOfRecords() {
+        return numOfRecords;
+    }
+
+    /** @return Buffer size in bytes */
+    public int getBufferSizeBytes() {
+        return bufferSizeBytes;
+    }
+
+    /** @param numOfRecords Updates number of records (Usually by 1) */
+    public void setNumOfRecords(int numOfRecords) {
+        this.numOfRecords = numOfRecords;
+    }
+
+    /** @param bufferSizeBytes Updates sum of size of records present in this buffer (Bytes) */
+    public void setBufferSizeBytes(int bufferSizeBytes) {
+        this.bufferSizeBytes = bufferSizeBytes;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java
index 9cc27ced3..2c5ed5c27 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java
@@ -28,7 +28,6 @@
 import org.apache.doris.flink.sink.EscapeHandler;
 import org.apache.doris.flink.sink.HttpPutBuilder;
 import org.apache.doris.flink.sink.HttpUtil;
-import org.apache.doris.flink.sink.batch.BatchRecordBuffer;
 import org.apache.doris.flink.sink.writer.LabelGenerator;
 import org.apache.http.Header;
 import org.apache.http.HttpEntity;
@@ -139,8 +138,7 @@ public synchronized void writeRecord(String database, String table, byte[] recor
                             database,
                             table,
                             this.lineDelimiter,
-                            executionOptions.getBufferFlushMaxBytes(),
-                            executionOptions.getBufferFlushIntervalMs()));
+                            executionOptions.getBufferFlushMaxBytes()));
         buffer.insert(record);
         // When it exceeds 80% of the byteSize,to flush, to avoid triggering bytebuffer expansion
         if (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes() * 0.8
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 4b0b56c4e..fd2526da6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -275,14 +275,14 @@ public class DorisConfigOptions {
     public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS =
             ConfigOptions.key("sink.buffer-flush.max-rows")
                     .intType()
-                    .defaultValue(50000)
+                    .defaultValue(500000)
                     .withDescription(
-                            "The maximum number of flush items in each batch, the default is 5w");
+                            "The maximum number of flush items in each batch, the default is 500000");
 
     public static final ConfigOption<MemorySize> SINK_BUFFER_FLUSH_MAX_BYTES =
             ConfigOptions.key("sink.buffer-flush.max-bytes")
                     .memoryType()
-                    .defaultValue(MemorySize.parse("10mb"))
+                    .defaultValue(MemorySize.parse("100mb"))
                     .withDescription(
-                            "The maximum number of bytes flushed in each batch, the default is 10MB");
+                            "The maximum number of bytes flushed in each batch, the default is 100MB");
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
index cd0ac267e..b80737ebb 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
@@ -118,8 +118,13 @@ public void testLoadFail() throws Exception {
         when(httpClientBuilder.build()).thenReturn(httpClient);
         when(httpClient.execute(any())).thenReturn(response);
         loader.writeRecord("db", "tbl", "1,data".getBytes());
-        loader.doFlush("db.tbl", true, false);
+        loader.checkpointFlush();
 
+        TestUtil.waitUntilCondition(
+                () -> !loader.isLoadThreadAlive(),
+                Deadline.fromNow(Duration.ofSeconds(10)),
+                100L,
+                "Condition was not met in given timeout.");
         AtomicReference<Throwable> exception = loader.getException();
         Assert.assertTrue(exception.get() instanceof Exception);
         Assert.assertTrue(exception.get().getMessage().contains("stream load error"));
@@ -161,10 +166,14 @@ public void testLoadError() throws Exception {
         when(httpClientBuilder.build()).thenReturn(httpClient);
         when(httpClient.execute(any())).thenReturn(response);
         loader.writeRecord("db", "tbl", "1,data".getBytes());
-        loader.doFlush("db.tbl", true, false);
+        loader.checkpointFlush();
 
+        TestUtil.waitUntilCondition(
+                () -> !loader.isLoadThreadAlive(),
+                Deadline.fromNow(Duration.ofSeconds(10)),
+                100L,
+                "Condition was not met in given timeout.");
         AtomicReference<Throwable> exception = loader.getException();
-        Assert.assertTrue(exception.get() instanceof Exception);
         Assert.assertTrue(exception.get().getMessage().contains("stream load error"));
     }
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestBatchRecordBuffer.java
similarity index 99%
rename from flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java
rename to flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestBatchRecordBuffer.java
index 1a6897c9c..3107225a9 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestBatchRecordBuffer.java
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.flink.sink.batch;
+package org.apache.doris.flink.sink.copy;
 
 import org.junit.Assert;
 import org.junit.Test;