From e2e71651e1c154cc75c5fd7bbca648da9ad4a813 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Thu, 30 Nov 2023 17:41:14 +0800 Subject: [PATCH] [improve] change batch mode param and add default streamload prop (#251) Co-authored-by: wudi <> --- .../apache/doris/flink/cfg/DorisExecutionOptions.java | 11 +++++++++-- .../doris/flink/table/DorisDynamicTableFactory.java | 5 +---- .../apache/doris/flink/tools/cdc/DatabaseSync.java | 5 +---- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java index 9a43d77ff..25e153515 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java @@ -119,6 +119,13 @@ public static DorisExecutionOptions defaults() { return new Builder().setStreamLoadProp(properties).build(); } + public static Properties defaultsProperties() { + Properties properties = new Properties(); + properties.setProperty("format", "json"); + properties.setProperty("read_json_by_line", "true"); + return properties; + } + public Integer checkInterval() { return checkInterval; } @@ -269,8 +276,8 @@ public Builder enable2PC() { return this; } - public Builder enableBatchMode() { - this.enableBatchMode = true; + public Builder setBatchMode(Boolean enableBatchMode) { + this.enableBatchMode = enableBatchMode; return this; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java index bccb8b7f3..521d7412a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java @@ -224,10 +224,7 @@ private DorisExecutionOptions getDorisExecutionOptions(ReadableConfig readableCo builder.enable2PC(); } - if(readableConfig.get(SINK_ENABLE_BATCH_MODE)) { - builder.enableBatchMode(); - } - + builder.setBatchMode(readableConfig.get(SINK_ENABLE_BATCH_MODE)); builder.setFlushQueueSize(readableConfig.get(SINK_FLUSH_QUEUE_SIZE)); builder.setBufferFlushMaxRows(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS)); builder.setBufferFlushMaxBytes(readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES)); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 4dfa6d529..bf262146f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -221,10 +221,7 @@ public DorisSink buildDorisSink(String table) { executionBuilder.enable2PC(); } - //batch option - if(sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_BATCH_MODE)){ - executionBuilder.enableBatchMode(); - } + sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_BATCH_MODE).ifPresent(executionBuilder::setBatchMode); sinkConfig.getOptional(DorisConfigOptions.SINK_FLUSH_QUEUE_SIZE).ifPresent(executionBuilder::setFlushQueueSize); sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_ROWS).ifPresent(executionBuilder::setBufferFlushMaxRows); sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_BYTES).ifPresent(executionBuilder::setBufferFlushMaxBytes);