Skip to content

Commit

Permalink
change option default when format is json
Browse files Browse the repository at this point in the history
  • Loading branch information
wudi committed Nov 22, 2023
1 parent 943a638 commit 0a9e1e6
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
import java.io.Serializable;
import java.util.Properties;

import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
import static org.apache.doris.flink.sink.writer.LoadConstants.READ_JSON_BY_LINE;

/**
* Doris sink batch options.
*/
Expand Down Expand Up @@ -297,6 +301,12 @@ public Builder setIgnoreUpdateBefore(boolean ignoreUpdateBefore) {
}

public DorisExecutionOptions build() {
//If format=json is set but read_json_by_line is not set, record may not be written.
if(streamLoadProp != null
&& streamLoadProp.containsKey(FORMAT_KEY)
&& JSON.equals(streamLoadProp.getProperty(FORMAT_KEY))){
streamLoadProp.put(READ_JSON_BY_LINE, true);
}
return new DorisExecutionOptions(checkInterval, maxRetries, bufferSize, bufferCount, labelPrefix, useCache,
streamLoadProp, enableDelete, enable2PC, enableBatchMode, flushQueueSize, bufferFlushMaxRows,
bufferFlushMaxBytes, bufferFlushIntervalMs, ignoreUpdateBefore, force2PC);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ public Builder setAutoRedirect(boolean autoRedirect) {

public DorisOptions build() {
checkNotNull(fenodes, "No fenodes supplied.");
checkNotNull(tableIdentifier, "No tableIdentifier supplied.");
//multi table load, don't need check
//checkNotNull(tableIdentifier, "No tableIdentifier supplied.");
return new DorisOptions(fenodes, benodes, username, password, tableIdentifier, jdbcUrl, autoRedirect);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ public class LoadConstants {
public static final String CSV = "csv";
public static final String NULL_VALUE = "\\N";
public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
public static final String READ_JSON_BY_LINE = "read_json_by_line";

}

0 comments on commit 0a9e1e6

Please sign in to comment.