Skip to content

Commit

Permalink
[improve] change batch mode param and add default streamload prop (ap…
Browse files Browse the repository at this point in the history
…ache#251)

Co-authored-by: wudi <>
  • Loading branch information
JNSimba authored Nov 30, 2023
1 parent 323b872 commit e2e7165
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ public static DorisExecutionOptions defaults() {
return new Builder().setStreamLoadProp(properties).build();
}

public static Properties defaultsProperties() {
Properties properties = new Properties();
properties.setProperty("format", "json");
properties.setProperty("read_json_by_line", "true");
return properties;
}

public Integer checkInterval() {
return checkInterval;
}
Expand Down Expand Up @@ -269,8 +276,8 @@ public Builder enable2PC() {
return this;
}

public Builder enableBatchMode() {
this.enableBatchMode = true;
public Builder setBatchMode(Boolean enableBatchMode) {
this.enableBatchMode = enableBatchMode;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,7 @@ private DorisExecutionOptions getDorisExecutionOptions(ReadableConfig readableCo
builder.enable2PC();
}

if(readableConfig.get(SINK_ENABLE_BATCH_MODE)) {
builder.enableBatchMode();
}

builder.setBatchMode(readableConfig.get(SINK_ENABLE_BATCH_MODE));
builder.setFlushQueueSize(readableConfig.get(SINK_FLUSH_QUEUE_SIZE));
builder.setBufferFlushMaxRows(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS));
builder.setBufferFlushMaxBytes(readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,7 @@ public DorisSink<String> buildDorisSink(String table) {
executionBuilder.enable2PC();
}

//batch option
if(sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_BATCH_MODE)){
executionBuilder.enableBatchMode();
}
sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_BATCH_MODE).ifPresent(executionBuilder::setBatchMode);
sinkConfig.getOptional(DorisConfigOptions.SINK_FLUSH_QUEUE_SIZE).ifPresent(executionBuilder::setFlushQueueSize);
sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_ROWS).ifPresent(executionBuilder::setBufferFlushMaxRows);
sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_BYTES).ifPresent(executionBuilder::setBufferFlushMaxBytes);
Expand Down

0 comments on commit e2e7165

Please sign in to comment.