From 0a9e1e605bed50de449b2c7fedef6d566f4d4116 Mon Sep 17 00:00:00 2001 From: wudi <> Date: Wed, 22 Nov 2023 15:27:25 +0800 Subject: [PATCH] change option default when format is json --- .../apache/doris/flink/cfg/DorisExecutionOptions.java | 10 ++++++++++ .../java/org/apache/doris/flink/cfg/DorisOptions.java | 3 ++- .../apache/doris/flink/sink/writer/LoadConstants.java | 1 + 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java index 4a03024dd..9a43d77ff 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java @@ -22,6 +22,10 @@ import java.io.Serializable; import java.util.Properties; +import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY; +import static org.apache.doris.flink.sink.writer.LoadConstants.JSON; +import static org.apache.doris.flink.sink.writer.LoadConstants.READ_JSON_BY_LINE; + /** * Doris sink batch options. */ @@ -297,6 +301,12 @@ public Builder setIgnoreUpdateBefore(boolean ignoreUpdateBefore) { } public DorisExecutionOptions build() { + //If format=json is set but read_json_by_line is not set, record may not be written. + if(streamLoadProp != null + && streamLoadProp.containsKey(FORMAT_KEY) + && JSON.equals(streamLoadProp.getProperty(FORMAT_KEY))){ + streamLoadProp.put(READ_JSON_BY_LINE, true); + } return new DorisExecutionOptions(checkInterval, maxRetries, bufferSize, bufferCount, labelPrefix, useCache, streamLoadProp, enableDelete, enable2PC, enableBatchMode, flushQueueSize, bufferFlushMaxRows, bufferFlushMaxBytes, bufferFlushIntervalMs, ignoreUpdateBefore, force2PC); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java index 6391e9142..cb7565f39 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java @@ -132,7 +132,8 @@ public Builder setAutoRedirect(boolean autoRedirect) { public DorisOptions build() { checkNotNull(fenodes, "No fenodes supplied."); - checkNotNull(tableIdentifier, "No tableIdentifier supplied."); + //multi table load, don't need check + //checkNotNull(tableIdentifier, "No tableIdentifier supplied."); return new DorisOptions(fenodes, benodes, username, password, tableIdentifier, jdbcUrl, autoRedirect); } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java index e6566e39a..d38f9602d 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java @@ -31,5 +31,6 @@ public class LoadConstants { public static final String CSV = "csv"; public static final String NULL_VALUE = "\\N"; public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__"; + public static final String READ_JSON_BY_LINE = "read_json_by_line"; }