diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java index a890d3497..b1b49d1bf 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java @@ -63,6 +63,7 @@ public class DorisExecutionOptions implements Serializable { private final boolean enableBatchMode; private final boolean ignoreUpdateBefore; private final WriteMode writeMode; + private final boolean ignoreCommitError; public DorisExecutionOptions( int checkInterval, @@ -81,7 +82,8 @@ public DorisExecutionOptions( long bufferFlushIntervalMs, boolean ignoreUpdateBefore, boolean force2PC, - WriteMode writeMode) { + WriteMode writeMode, + boolean ignoreCommitError) { Preconditions.checkArgument(maxRetries >= 0); this.checkInterval = checkInterval; this.maxRetries = maxRetries; @@ -102,6 +104,7 @@ public DorisExecutionOptions( this.ignoreUpdateBefore = ignoreUpdateBefore; this.writeMode = writeMode; + this.ignoreCommitError = ignoreCommitError; } public static Builder builder() { @@ -205,6 +208,10 @@ public WriteMode getWriteMode() { return writeMode; } + public boolean ignoreCommitError() { + return ignoreCommitError; + } + /** Builder of {@link DorisExecutionOptions}. */ public static class Builder { private int checkInterval = DEFAULT_CHECK_INTERVAL; @@ -229,6 +236,7 @@ public static class Builder { private boolean ignoreUpdateBefore = true; private WriteMode writeMode = WriteMode.STREAM_LOAD; + private boolean ignoreCommitError = false; public Builder setCheckInterval(Integer checkInterval) { this.checkInterval = checkInterval; @@ -320,6 +328,11 @@ public Builder setWriteMode(WriteMode writeMode) { return this; } + public Builder setIgnoreCommitError(boolean ignoreCommitError) { + this.ignoreCommitError = ignoreCommitError; + return this; + } + public DorisExecutionOptions build() { // If format=json is set but read_json_by_line is not set, record may not be written. if (streamLoadProp != null @@ -344,7 +357,8 @@ public DorisExecutionOptions build() { bufferFlushIntervalMs, ignoreUpdateBefore, force2PC, - writeMode); + writeMode, + ignoreCommitError); } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java index e9ba2ebdd..2f00c9cb7 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java @@ -89,8 +89,7 @@ public DorisAbstractWriter createWriter(InitContext initContext) throws IOExcept public Committer createCommitter() throws IOException { if (WriteMode.STREAM_LOAD.equals(dorisExecutionOptions.getWriteMode()) || WriteMode.STREAM_LOAD_BATCH.equals(dorisExecutionOptions.getWriteMode())) { - return new DorisCommitter( - dorisOptions, dorisReadOptions, dorisExecutionOptions.getMaxRetries()); + return new DorisCommitter(dorisOptions, dorisReadOptions, dorisExecutionOptions); } else if (WriteMode.COPY.equals(dorisExecutionOptions.getWriteMode())) { return new DorisCopyCommitter(dorisOptions, dorisExecutionOptions.getMaxRetries()); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java index 3959c26cc..763694eab 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.StringUtils; +import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.exception.DorisRuntimeException; @@ -58,20 +59,24 @@ public class DorisCommitter implements Committer, Closeable { private final BackendUtil backendUtil; int maxRetry; + final boolean ignoreCommitError; public DorisCommitter( - DorisOptions dorisOptions, DorisReadOptions dorisReadOptions, int maxRetry) { - this(dorisOptions, dorisReadOptions, maxRetry, new HttpUtil().getHttpClient()); + DorisOptions dorisOptions, + DorisReadOptions dorisReadOptions, + DorisExecutionOptions executionOptions) { + this(dorisOptions, dorisReadOptions, executionOptions, new HttpUtil().getHttpClient()); } public DorisCommitter( DorisOptions dorisOptions, DorisReadOptions dorisReadOptions, - int maxRetry, + DorisExecutionOptions executionOptions, CloseableHttpClient client) { this.dorisOptions = dorisOptions; this.dorisReadOptions = dorisReadOptions; - this.maxRetry = maxRetry; + this.maxRetry = executionOptions.getMaxRetries(); + this.ignoreCommitError = executionOptions.ignoreCommitError(); this.httpClient = client; this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) @@ -136,7 +141,20 @@ private void commitTransaction(DorisCommittable committable) throws IOException } catch (Exception e) { LOG.error("commit transaction failed, to retry, {}", e.getMessage()); if (retry == maxRetry) { - throw new DorisRuntimeException("commit transaction error, ", e); + if (ignoreCommitError) { + // Generally used when txn(stored in checkpoint) expires and unexpected + // errors occur in commit. + + // It should be noted that you must manually ensure that the txn has been + // successfully submitted to doris, otherwise there may be a risk of data + // loss. + LOG.error( + "Unable to commit transaction {} and data has been potentially lost ", + committable, + e); + } else { + throw new DorisRuntimeException("commit transaction error, ", e); + } } hostPort = backendUtil.getAvailableBackend(); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java index 2c7c753c6..c6988716d 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java @@ -235,6 +235,13 @@ public class DorisConfigOptions { .defaultValue(WriteMode.STREAM_LOAD.name()) .withDescription("Write mode, supports stream_load, stream_load_batch"); + public static final ConfigOption SINK_IGNORE_COMMIT_ERROR = + ConfigOptions.key("sink.ignore.commit-error") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to ignore commit errors. Usually used when the checkpoint cannot be restored to skip the commit of txn. The default is false."); + public static final ConfigOption SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM; public static final ConfigOption SINK_ENABLE_BATCH_MODE = diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java index c7d13f6da..bf5cd8ce3 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java @@ -73,6 +73,7 @@ import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_BATCH_MODE; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_FLUSH_QUEUE_SIZE; +import static org.apache.doris.flink.table.DorisConfigOptions.SINK_IGNORE_COMMIT_ERROR; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_IGNORE_UPDATE_BEFORE; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX; import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES; @@ -156,6 +157,7 @@ public Set> optionalOptions() { options.add(SOURCE_USE_OLD_API); options.add(SINK_WRITE_MODE); + options.add(SINK_IGNORE_COMMIT_ERROR); return options; } @@ -226,6 +228,7 @@ private DorisExecutionOptions getDorisExecutionOptions( builder.setStreamLoadProp(streamLoadProp); builder.setDeletable(readableConfig.get(SINK_ENABLE_DELETE)); builder.setIgnoreUpdateBefore(readableConfig.get(SINK_IGNORE_UPDATE_BEFORE)); + builder.setIgnoreCommitError(readableConfig.get(SINK_IGNORE_COMMIT_ERROR)); if (!readableConfig.get(SINK_ENABLE_2PC)) { builder.disable2PC(); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 8627a476e..17fa11f8b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -291,6 +291,9 @@ public DorisSink buildDorisSink(String table) { sinkConfig .getOptional(DorisConfigOptions.SINK_WRITE_MODE) .ifPresent(v -> executionBuilder.setWriteMode(WriteMode.of(v))); + sinkConfig + .getOptional(DorisConfigOptions.SINK_IGNORE_COMMIT_ERROR) + .ifPresent(executionBuilder::setIgnoreCommitError); DorisExecutionOptions executionOptions = executionBuilder.build(); builder.setDorisReadOptions(DorisReadOptions.builder().build()) diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java index f5c82bb1c..be5bb0d52 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java @@ -17,6 +17,7 @@ package org.apache.doris.flink.sink.committer; +import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.exception.DorisRuntimeException; @@ -59,6 +60,7 @@ public class TestDorisCommitter { public void setUp() throws Exception { DorisOptions dorisOptions = OptionUtils.buildDorisOptions(); DorisReadOptions readOptions = OptionUtils.buildDorisReadOptions(); + DorisExecutionOptions executionOptions = OptionUtils.buildExecutionOptional(); dorisCommittable = new DorisCommittable("127.0.0.1:8710", "test", 0); CloseableHttpClient httpClient = mock(CloseableHttpClient.class); entityMock = new HttpEntityMock(); @@ -78,7 +80,8 @@ public void setUp() throws Exception { BackendV2.BackendRowV2.of("127.0.0.1", 8040, true))); backendUtilMockedStatic.when(() -> BackendUtil.tryHttpConnection(any())).thenReturn(true); - dorisCommitter = new DorisCommitter(dorisOptions, readOptions, 3, httpClient); + dorisCommitter = + new DorisCommitter(dorisOptions, readOptions, executionOptions, httpClient); } @Test