diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java index 09a467282..25bd497f9 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java @@ -358,9 +358,6 @@ public Builder setFlushQueueSize(int flushQueueSize) { } public Builder setBufferFlushIntervalMs(long bufferFlushIntervalMs) { - Preconditions.checkState( - bufferFlushIntervalMs >= 1000, - "bufferFlushIntervalMs must be greater than or equal to 1 second"); this.bufferFlushIntervalMs = bufferFlushIntervalMs; return this; } @@ -397,6 +394,8 @@ public DorisExecutionOptions build() { && JSON.equals(streamLoadProp.getProperty(FORMAT_KEY))) { streamLoadProp.put(READ_JSON_BY_LINE, true); } + checkParams(); + return new DorisExecutionOptions( checkInterval, maxRetries, @@ -417,5 +416,17 @@ public DorisExecutionOptions build() { writeMode, ignoreCommitError); } + + private void checkParams() { + Preconditions.checkState( + bufferFlushIntervalMs >= 1000, + "bufferFlushIntervalMs must be greater than or equal to 1 second"); + Preconditions.checkState( + bufferFlushMaxBytes >= 52428800, + "bufferFlushMaxBytes must be greater than or equal to 52428800(50mb)"); + Preconditions.checkState( + bufferFlushMaxRows >= 50000, + "bufferFlushMaxRows must be greater than or equal to 50000"); + } } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java index aa3d00dae..25d3a7a21 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java @@ -208,8 +208,8 @@ public void testTableBatch() throws Exception { + " 'sink.enable.batch-mode' = 'true'," + " 'sink.enable-delete' = 'true'," + " 'sink.flush.queue-size' = '2'," - + " 'sink.buffer-flush.max-rows' = '1'," - + " 'sink.buffer-flush.max-bytes' = '5'," + + " 'sink.buffer-flush.max-rows' = '100000'," + + " 'sink.buffer-flush.max-bytes' = '5000000'," + " 'sink.buffer-flush.interval' = '10s'" + ")", getFenodes(), @@ -295,8 +295,8 @@ public void testTableGroupCommit() throws Exception { + " 'sink.enable.batch-mode' = 'true'," + " 'sink.enable-delete' = 'true'," + " 'sink.flush.queue-size' = '2'," - + " 'sink.buffer-flush.max-rows' = '3'," - + " 'sink.buffer-flush.max-bytes' = '5000'," + + " 'sink.buffer-flush.max-rows' = '300000'," + + " 'sink.buffer-flush.max-bytes' = '5000000'," + " 'sink.buffer-flush.interval' = '10s'" + ")", getFenodes(),