diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 62166f2a2..8aef65d6a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -190,9 +190,12 @@ public DorisSink buildDorisSink(String table) { sinkConfig.getOptional(DorisConfigOptions.SINK_MAX_RETRIES).ifPresent(executionBuilder::setMaxRetries); sinkConfig.getOptional(DorisConfigOptions.SINK_IGNORE_UPDATE_BEFORE).ifPresent(executionBuilder::setIgnoreUpdateBefore); - boolean enable2pc = sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_2PC); - if(!enable2pc){ + + if(!sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_2PC)){ executionBuilder.disable2PC(); + } else if(sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_2PC).isPresent()){ + //force open 2pc + executionBuilder.enable2PC(); } //batch option