Skip to content

Commit

Permalink
multiple concurrency, make streamload on each BE are more balanced.
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed May 9, 2024
1 parent 2ebee2b commit 9fe85ca
Showing 1 changed file with 3 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr
private final DorisReadOptions dorisReadOptions;
private final DorisExecutionOptions executionOptions;
private final String labelPrefix;
private final int subtaskId;
private final LabelGenerator labelGenerator;
private final int intervalTime;
private final DorisWriterState dorisWriterState;
Expand All @@ -91,6 +92,7 @@ public DorisWriter(Sink.InitContext initContext,
LOG.info("labelPrefix " + executionOptions.getLabelPrefix());
this.dorisWriterState = new DorisWriterState(executionOptions.getLabelPrefix());
this.labelPrefix = executionOptions.getLabelPrefix() + "_" + initContext.getSubtaskId();
this.subtaskId = initContext.getSubtaskId();
this.labelGenerator = new LabelGenerator(labelPrefix, executionOptions.enabled2PC());
this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("stream-load-check"));
this.serializer = serializer;
Expand Down Expand Up @@ -240,7 +242,7 @@ public void close() throws Exception {
public String getAvailableBackend() {
long tmp = pos + backends.size();
while (pos < tmp) {
BackendV2.BackendRowV2 backend = backends.get((int) (pos % backends.size()));
BackendV2.BackendRowV2 backend = backends.get((int) ((pos + subtaskId) % backends.size()));
String res = backend.toBackendString();
if(tryHttpConnection(res)){
pos++;
Expand Down

0 comments on commit 9fe85ca

Please sign in to comment.