diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index adfbe988a..44d1b425c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -84,6 +84,7 @@ public class DorisBatchStreamLoad implements Serializable { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final List DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT)); + private static final long STREAM_LOAD_MAX_BYTES = 10 * 1024 * 1024 * 1024L; //10 GB private final LabelGenerator labelGenerator; private final byte[] lineDelimiter; private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load"; @@ -106,9 +107,7 @@ public class DorisBatchStreamLoad implements Serializable { private boolean enableGzCompress; private int subTaskId; private long streamLoadMaxRows = Integer.MAX_VALUE; - // todo：Can be configured - private long streamLoadMaxBytes = 1024 * 1024 * 1024; - private long maxBlockedBytes = 200 * 1024 * 1024; + private long maxBlockedBytes; private final AtomicLong currentCacheBytes = new AtomicLong(0L); private final Lock lock = new ReentrantLock(); private final Condition block = lock.newCondition(); @@ -146,6 +145,8 @@ public DorisBatchStreamLoad( this.enableGzCompress = loadProps.getProperty(COMPRESS_TYPE, "").equals(COMPRESS_TYPE_GZ); this.executionOptions = executionOptions; this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize()); + // maxBlockedBytes ensures that a buffer can be written even if the queue is full + this.maxBlockedBytes = (long) executionOptions.getBufferFlushMaxBytes() * (executionOptions.getFlushQueueSize() + 1); if 
(StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) { String[] tableInfo = dorisOptions.getTableIdentifier().split("\\."); Preconditions.checkState( @@ -216,7 +217,7 @@ public synchronized void writeRecord(String database, String table, byte[] recor LOG.info("trigger flush by buffer full, flush: {}", flush); - } else if (buffer.getBufferSizeBytes() >= streamLoadMaxRows - || buffer.getNumOfRecords() >= streamLoadMaxBytes) { + } else if (buffer.getBufferSizeBytes() >= STREAM_LOAD_MAX_BYTES + || buffer.getNumOfRecords() >= streamLoadMaxRows) { // The buffer capacity exceeds the stream load limit, flush boolean flush = bufferFullFlush(bufferKey); LOG.info("trigger flush by buffer exceeding the limit, flush: {}", flush); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java index 06b371b79..9cc197166 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java @@ -52,9 +52,9 @@ public void testEquals() { .setWriteMode(WriteMode.STREAM_LOAD) .setLabelPrefix("doris") .enable2PC() - .setBufferFlushMaxBytes(524288001) + .setBufferFlushMaxBytes(10) .setBufferFlushIntervalMs(10000) - .setBufferFlushMaxRows(500001) + .setBufferFlushMaxRows(12) .setCheckInterval(10) .setIgnoreCommitError(true) .setDeletable(true) @@ -72,9 +72,9 @@ public void testEquals() { .setWriteMode(WriteMode.STREAM_LOAD) .setLabelPrefix("doris") .enable2PC() - .setBufferFlushMaxBytes(524288001) + .setBufferFlushMaxBytes(10) .setBufferFlushIntervalMs(10000) - .setBufferFlushMaxRows(500001) + .setBufferFlushMaxRows(12) .setCheckInterval(10) .setIgnoreCommitError(true) .setDeletable(true) @@ -111,17 +111,17 @@ public void testEquals() { Assert.assertNotEquals(exceptOptions, builder.build()); builder.enable2PC(); - builder.setBufferFlushMaxBytes(524288002); + 
builder.setBufferFlushMaxBytes(11); Assert.assertNotEquals(exceptOptions, builder.build()); - builder.setBufferFlushMaxBytes(524288001); + builder.setBufferFlushMaxBytes(10); builder.setBufferFlushIntervalMs(100001); Assert.assertNotEquals(exceptOptions, builder.build()); builder.setBufferFlushIntervalMs(10000); - builder.setBufferFlushMaxRows(500002); + builder.setBufferFlushMaxRows(2); Assert.assertNotEquals(exceptOptions, builder.build()); - builder.setBufferFlushMaxRows(500001); + builder.setBufferFlushMaxRows(12); builder.setCheckInterval(11); Assert.assertNotEquals(exceptOptions, builder.build()); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java index 97aa22a5b..aa3d00dae 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java @@ -208,8 +208,8 @@ public void testTableBatch() throws Exception { + " 'sink.enable.batch-mode' = 'true'," + " 'sink.enable-delete' = 'true'," + " 'sink.flush.queue-size' = '2'," - + " 'sink.buffer-flush.max-rows' = '100000'," - + " 'sink.buffer-flush.max-bytes' = '50MB'," + + " 'sink.buffer-flush.max-rows' = '1'," + + " 'sink.buffer-flush.max-bytes' = '5'," + " 'sink.buffer-flush.interval' = '10s'" + ")", getFenodes(), @@ -295,8 +295,8 @@ public void testTableGroupCommit() throws Exception { + " 'sink.enable.batch-mode' = 'true'," + " 'sink.enable-delete' = 'true'," + " 'sink.flush.queue-size' = '2'," - + " 'sink.buffer-flush.max-rows' = '300000'," - + " 'sink.buffer-flush.max-bytes' = '50MB'," + + " 'sink.buffer-flush.max-rows' = '3'," + + " 'sink.buffer-flush.max-bytes' = '5000'," + " 'sink.buffer-flush.interval' = '10s'" + ")", getFenodes(), diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java index 62797c77e..0004af05c 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java @@ -132,8 +132,8 @@ public void testDorisSinkProperties() { properties.put("sink.use-cache", "true"); properties.put("sink.enable.batch-mode", "true"); properties.put("sink.flush.queue-size", "2"); - properties.put("sink.buffer-flush.max-rows", "50000"); - properties.put("sink.buffer-flush.max-bytes", "50MB"); + properties.put("sink.buffer-flush.max-rows", "1000"); + properties.put("sink.buffer-flush.max-bytes", "10MB"); properties.put("sink.buffer-flush.interval", "10s"); properties.put("sink.ignore.update-before", "true"); properties.put("sink.ignore.commit-error", "false"); @@ -165,8 +165,8 @@ public void testDorisSinkProperties() { .setBatchMode(true) .enable2PC() .setBufferFlushIntervalMs(10000) - .setBufferFlushMaxBytes(50 * 1024 * 1024) - .setBufferFlushMaxRows(50000) + .setBufferFlushMaxBytes(10 * 1024 * 1024) + .setBufferFlushMaxRows(1000) .setFlushQueueSize(2) .setUseCache(true) .setIgnoreCommitError(false)