diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java index 9a43d77ff..25e153515 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java @@ -119,6 +119,13 @@ public static DorisExecutionOptions defaults() { return new Builder().setStreamLoadProp(properties).build(); } + public static Properties defaultsProperties() { + Properties properties = new Properties(); + properties.setProperty("format", "json"); + properties.setProperty("read_json_by_line", "true"); + return properties; + } + public Integer checkInterval() { return checkInterval; } @@ -269,8 +276,8 @@ public Builder enable2PC() { return this; } - public Builder enableBatchMode() { - this.enableBatchMode = true; + public Builder setBatchMode(Boolean enableBatchMode) { + this.enableBatchMode = enableBatchMode; return this; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java index bccb8b7f3..521d7412a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java @@ -224,10 +224,7 @@ private DorisExecutionOptions getDorisExecutionOptions(ReadableConfig readableCo builder.enable2PC(); } - if(readableConfig.get(SINK_ENABLE_BATCH_MODE)) { - builder.enableBatchMode(); - } - + builder.setBatchMode(readableConfig.get(SINK_ENABLE_BATCH_MODE)); builder.setFlushQueueSize(readableConfig.get(SINK_FLUSH_QUEUE_SIZE)); builder.setBufferFlushMaxRows(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS)); builder.setBufferFlushMaxBytes(readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES)); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 4dfa6d529..bf262146f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -221,10 +221,7 @@ public DorisSink buildDorisSink(String table) { executionBuilder.enable2PC(); } - //batch option - if(sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_BATCH_MODE)){ - executionBuilder.enableBatchMode(); - } + sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_BATCH_MODE).ifPresent(executionBuilder::setBatchMode); sinkConfig.getOptional(DorisConfigOptions.SINK_FLUSH_QUEUE_SIZE).ifPresent(executionBuilder::setFlushQueueSize); sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_ROWS).ifPresent(executionBuilder::setBufferFlushMaxRows); sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_BYTES).ifPresent(executionBuilder::setBufferFlushMaxBytes);