Skip to content

Commit

Permalink
add option ignore commit error
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Mar 4, 2024
1 parent 64fe57a commit 64e4fed
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class DorisExecutionOptions implements Serializable {
private final boolean enableBatchMode;
private final boolean ignoreUpdateBefore;
private final WriteMode writeMode;
private final boolean ignoreCommitError;

public DorisExecutionOptions(
int checkInterval,
Expand All @@ -81,7 +82,8 @@ public DorisExecutionOptions(
long bufferFlushIntervalMs,
boolean ignoreUpdateBefore,
boolean force2PC,
WriteMode writeMode) {
WriteMode writeMode,
boolean ignoreCommitError) {
Preconditions.checkArgument(maxRetries >= 0);
this.checkInterval = checkInterval;
this.maxRetries = maxRetries;
Expand All @@ -102,6 +104,7 @@ public DorisExecutionOptions(

this.ignoreUpdateBefore = ignoreUpdateBefore;
this.writeMode = writeMode;
this.ignoreCommitError = ignoreCommitError;
}

public static Builder builder() {
Expand Down Expand Up @@ -205,6 +208,10 @@ public WriteMode getWriteMode() {
return writeMode;
}

public boolean ignoreCommitError() {
return ignoreCommitError;
}

/** Builder of {@link DorisExecutionOptions}. */
public static class Builder {
private int checkInterval = DEFAULT_CHECK_INTERVAL;
Expand All @@ -229,6 +236,7 @@ public static class Builder {

private boolean ignoreUpdateBefore = true;
private WriteMode writeMode = WriteMode.STREAM_LOAD;
private boolean ignoreCommitError = false;

public Builder setCheckInterval(Integer checkInterval) {
this.checkInterval = checkInterval;
Expand Down Expand Up @@ -320,6 +328,11 @@ public Builder setWriteMode(WriteMode writeMode) {
return this;
}

public Builder setIgnoreCommitError(boolean ignoreCommitError) {
this.ignoreCommitError = ignoreCommitError;
return this;
}

public DorisExecutionOptions build() {
// If format=json is set but read_json_by_line is not set, record may not be written.
if (streamLoadProp != null
Expand All @@ -344,7 +357,8 @@ public DorisExecutionOptions build() {
bufferFlushIntervalMs,
ignoreUpdateBefore,
force2PC,
writeMode);
writeMode,
ignoreCommitError);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ public DorisAbstractWriter createWriter(InitContext initContext) throws IOExcept
public Committer createCommitter() throws IOException {
if (WriteMode.STREAM_LOAD.equals(dorisExecutionOptions.getWriteMode())
|| WriteMode.STREAM_LOAD_BATCH.equals(dorisExecutionOptions.getWriteMode())) {
return new DorisCommitter(
dorisOptions, dorisReadOptions, dorisExecutionOptions.getMaxRetries());
return new DorisCommitter(dorisOptions, dorisReadOptions, dorisExecutionOptions);
} else if (WriteMode.COPY.equals(dorisExecutionOptions.getWriteMode())) {
return new DorisCopyCommitter(dorisOptions, dorisExecutionOptions.getMaxRetries());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisRuntimeException;
Expand Down Expand Up @@ -58,20 +59,24 @@ public class DorisCommitter implements Committer<DorisCommittable>, Closeable {
private final BackendUtil backendUtil;

int maxRetry;
final boolean ignoreCommitError;

public DorisCommitter(
DorisOptions dorisOptions, DorisReadOptions dorisReadOptions, int maxRetry) {
this(dorisOptions, dorisReadOptions, maxRetry, new HttpUtil().getHttpClient());
DorisOptions dorisOptions,
DorisReadOptions dorisReadOptions,
DorisExecutionOptions executionOptions) {
this(dorisOptions, dorisReadOptions, executionOptions, new HttpUtil().getHttpClient());
}

public DorisCommitter(
DorisOptions dorisOptions,
DorisReadOptions dorisReadOptions,
int maxRetry,
DorisExecutionOptions executionOptions,
CloseableHttpClient client) {
this.dorisOptions = dorisOptions;
this.dorisReadOptions = dorisReadOptions;
this.maxRetry = maxRetry;
this.maxRetry = executionOptions.getMaxRetries();
this.ignoreCommitError = executionOptions.ignoreCommitError();
this.httpClient = client;
this.backendUtil =
StringUtils.isNotEmpty(dorisOptions.getBenodes())
Expand Down Expand Up @@ -136,7 +141,20 @@ private void commitTransaction(DorisCommittable committable) throws IOException
} catch (Exception e) {
LOG.error("commit transaction failed, to retry, {}", e.getMessage());
if (retry == maxRetry) {
throw new DorisRuntimeException("commit transaction error, ", e);
if (ignoreCommitError) {
// Generally used when txn(stored in checkpoint) expires and unexpected
// errors occur in commit.

// It should be noted that you must manually ensure that the txn has been
// successfully submitted to doris, otherwise there may be a risk of data
// loss.
LOG.error(
"Unable to commit transaction {} and data has been potentially lost ",
committable,
e);
} else {
throw new DorisRuntimeException("commit transaction error, ", e);
}
}
hostPort = backendUtil.getAvailableBackend();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,13 @@ public class DorisConfigOptions {
.defaultValue(WriteMode.STREAM_LOAD.name())
.withDescription("Write mode, supports stream_load, stream_load_batch");

public static final ConfigOption<Boolean> SINK_IGNORE_COMMIT_ERROR =
ConfigOptions.key("sink.ignore.commit-error")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to ignore commit errors. Usually used when the checkpoint cannot be restored to skip the commit of txn. The default is false.");

public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;

public static final ConfigOption<Boolean> SINK_ENABLE_BATCH_MODE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_BATCH_MODE;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_FLUSH_QUEUE_SIZE;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_IGNORE_COMMIT_ERROR;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_IGNORE_UPDATE_BEFORE;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES;
Expand Down Expand Up @@ -156,6 +157,7 @@ public Set<ConfigOption<?>> optionalOptions() {

options.add(SOURCE_USE_OLD_API);
options.add(SINK_WRITE_MODE);
options.add(SINK_IGNORE_COMMIT_ERROR);
return options;
}

Expand Down Expand Up @@ -226,6 +228,7 @@ private DorisExecutionOptions getDorisExecutionOptions(
builder.setStreamLoadProp(streamLoadProp);
builder.setDeletable(readableConfig.get(SINK_ENABLE_DELETE));
builder.setIgnoreUpdateBefore(readableConfig.get(SINK_IGNORE_UPDATE_BEFORE));
builder.setIgnoreCommitError(readableConfig.get(SINK_IGNORE_COMMIT_ERROR));

if (!readableConfig.get(SINK_ENABLE_2PC)) {
builder.disable2PC();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,9 @@ public DorisSink<String> buildDorisSink(String table) {
sinkConfig
.getOptional(DorisConfigOptions.SINK_WRITE_MODE)
.ifPresent(v -> executionBuilder.setWriteMode(WriteMode.of(v)));
sinkConfig
.getOptional(DorisConfigOptions.SINK_IGNORE_COMMIT_ERROR)
.ifPresent(executionBuilder::setIgnoreCommitError);

DorisExecutionOptions executionOptions = executionBuilder.build();
builder.setDorisReadOptions(DorisReadOptions.builder().build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.flink.sink.committer;

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisRuntimeException;
Expand Down Expand Up @@ -59,6 +60,7 @@ public class TestDorisCommitter {
public void setUp() throws Exception {
DorisOptions dorisOptions = OptionUtils.buildDorisOptions();
DorisReadOptions readOptions = OptionUtils.buildDorisReadOptions();
DorisExecutionOptions executionOptions = OptionUtils.buildExecutionOptional();
dorisCommittable = new DorisCommittable("127.0.0.1:8710", "test", 0);
CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
entityMock = new HttpEntityMock();
Expand All @@ -78,7 +80,8 @@ public void setUp() throws Exception {
BackendV2.BackendRowV2.of("127.0.0.1", 8040, true)));
backendUtilMockedStatic.when(() -> BackendUtil.tryHttpConnection(any())).thenReturn(true);

dorisCommitter = new DorisCommitter(dorisOptions, readOptions, 3, httpClient);
dorisCommitter =
new DorisCommitter(dorisOptions, readOptions, executionOptions, httpClient);
}

@Test
Expand Down

0 comments on commit 64e4fed

Please sign in to comment.