Skip to content

Commit

Permalink
upload
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Jan 29, 2024
1 parent ea65d88 commit 51e1272
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,13 @@
public enum WriteMode {
STREAM_LOAD,
STREAM_LOAD_BATCH,
COPY
COPY;

public static WriteMode of(String name) {
try {
return WriteMode.valueOf(name.toUpperCase());
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Unsupported write mode: " + name);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ private DorisExecutionOptions getDorisExecutionOptions(
builder.enable2PC();
}

builder.setWriteMode(WriteMode.valueOf(readableConfig.get(SINK_WRITE_MODE).toUpperCase()));
builder.setWriteMode(WriteMode.of(readableConfig.get(SINK_WRITE_MODE)));
builder.setBatchMode(readableConfig.get(SINK_ENABLE_BATCH_MODE));
// Compatible with previous versions
if (readableConfig.get(SINK_ENABLE_BATCH_MODE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.batch.DorisBatchSink;
import org.apache.doris.flink.sink.writer.WriteMode;
import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;

import java.util.Arrays;
Expand All @@ -41,7 +43,7 @@ public static void main(String[] args) throws Exception {
// env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,
// Time.milliseconds(30000)));
DorisBatchSink.Builder<String> builder = DorisBatchSink.builder();
DorisSink.Builder<String> builder = DorisSink.builder();
final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
readOptionBuilder
.setDeserializeArrowAsync(false)
Expand Down Expand Up @@ -70,7 +72,8 @@ public static void main(String[] args) throws Exception {
.setDeletable(false)
.setBufferFlushMaxBytes(8 * 1024)
.setBufferFlushMaxRows(900)
.setBufferFlushIntervalMs(1000 * 10);
.setBufferFlushIntervalMs(1000 * 10)
.setWriteMode(WriteMode.STREAM_LOAD_BATCH);

builder.setDorisReadOptions(readOptionBuilder.build())
.setDorisExecutionOptions(executionBuilder.build())
Expand Down

0 comments on commit 51e1272

Please sign in to comment.