diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java index d1aee44cc..bc2d45c86 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java @@ -24,27 +24,28 @@ import org.apache.doris.flink.sink.committer.DorisCommitter; import org.apache.doris.flink.sink.writer.DorisRecordSerializer; import org.apache.doris.flink.sink.writer.DorisWriter; -import org.apache.flink.api.connector.sink.Committer; import org.apache.doris.flink.sink.writer.DorisWriterState; import org.apache.doris.flink.sink.writer.DorisWriterStateSerializer; -import org.apache.flink.api.connector.sink.GlobalCommitter; -import org.apache.flink.api.connector.sink.Sink; -import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.api.connector.sink2.StatefulSink; +import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.List; -import java.util.Optional; +import java.util.Collection; +import java.util.Collections; /** * Load data into Doris based on 2PC. * see {@link DorisWriter} and {@link DorisCommitter}. * @param type of record. */ -public class DorisSink implements Sink { +public class DorisSink + implements StatefulSink, + TwoPhaseCommittingSink{ private static final Logger LOG = LoggerFactory.getLogger(DorisSink.class); private final DorisOptions dorisOptions; @@ -75,36 +76,32 @@ private void checkKeyType() { } @Override - public SinkWriter createWriter(InitContext initContext, List state) throws IOException { - DorisWriter dorisWriter = new DorisWriter(initContext, state, serializer, dorisOptions, dorisReadOptions, dorisExecutionOptions); - dorisWriter.initializeLoad(state); + public DorisWriter createWriter(InitContext initContext) throws IOException { + DorisWriter dorisWriter = new DorisWriter<>(initContext, Collections.emptyList(), serializer, dorisOptions, dorisReadOptions, dorisExecutionOptions); return dorisWriter; } @Override - public Optional> getWriterStateSerializer() { - return Optional.of(new DorisWriterStateSerializer()); + public Committer createCommitter() throws IOException { + return new DorisCommitter(dorisOptions, dorisReadOptions, dorisExecutionOptions.getMaxRetries()); } @Override - public Optional> createCommitter() throws IOException { - return Optional.of(new DorisCommitter(dorisOptions, dorisReadOptions, dorisExecutionOptions.getMaxRetries())); + public DorisWriter restoreWriter(InitContext initContext, Collection recoveredState) throws IOException { + DorisWriter dorisWriter = new DorisWriter<>(initContext, recoveredState, serializer, dorisOptions, dorisReadOptions, dorisExecutionOptions); + return dorisWriter; } @Override - public Optional> createGlobalCommitter() throws IOException { - return Optional.empty(); + public SimpleVersionedSerializer getWriterStateSerializer() { + return new DorisWriterStateSerializer(); } @Override - public Optional> getCommittableSerializer() { - return Optional.of(new DorisCommittableSerializer()); + public SimpleVersionedSerializer getCommittableSerializer() { + return new DorisCommittableSerializer(); } - @Override - public Optional> getGlobalCommittableSerializer() { - return Optional.empty(); - } public static Builder builder() { return new Builder<>(); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java index 0e19b0f98..2a0fba04b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java @@ -29,7 +29,7 @@ import org.apache.doris.flink.sink.HttpPutBuilder; import org.apache.doris.flink.sink.HttpUtil; import org.apache.doris.flink.sink.ResponseUtil; -import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink2.Committer; import org.apache.http.StatusLine; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPut; @@ -38,10 +38,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; import java.io.IOException; -import java.util.Collections; +import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import static org.apache.doris.flink.sink.LoadStatus.SUCCESS; @@ -49,7 +49,7 @@ /** * The committer to commit transaction. */ -public class DorisCommitter implements Committer { +public class DorisCommitter implements Committer, Closeable { private static final Logger LOG = LoggerFactory.getLogger(DorisCommitter.class); private static final String commitPattern = "http://%s/api/%s/_stream_load_2pc"; private final CloseableHttpClient httpClient; @@ -75,11 +75,10 @@ public DorisCommitter(DorisOptions dorisOptions, DorisReadOptions dorisReadOptio } @Override - public List commit(List committableList) throws IOException { - for (DorisCommittable committable : committableList) { - commitTransaction(committable); + public void commit(Collection> requests) throws IOException, InterruptedException { + for (CommitRequest request: requests) { + commitTransaction(request.getCommittable()); } - return Collections.emptyList(); } private void commitTransaction(DorisCommittable committable) throws IOException { @@ -133,9 +132,12 @@ private void commitTransaction(DorisCommittable committable) throws IOException } @Override - public void close() throws Exception { + public void close() { if (httpClient != null) { - httpClient.close(); + try { + httpClient.close(); + } catch (IOException e) { + } } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index cda3c0528..4fd9abfa9 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -139,6 +139,8 @@ public void abortPreCommit(String labelSuffix, long chkID) throws Exception { LOG.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, chkID); while (true) { try { + // TODO: According to label abort txn. Currently, it can only be aborted based on txnid, + // so we must first request a streamload based on the label to get the txnid. String label = labelGenerator.generateLabel(startChkID); HttpPutBuilder builder = new HttpPutBuilder(); builder.setUrl(loadUrlStr) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java index 230bad5f6..295a0be86 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java @@ -17,6 +17,7 @@ package org.apache.doris.flink.sink.writer; +import org.apache.commons.lang3.StringUtils; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; @@ -27,11 +28,10 @@ import org.apache.doris.flink.sink.BackendUtil; import org.apache.doris.flink.sink.DorisCommittable; import org.apache.doris.flink.sink.HttpUtil; - -import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.connector.sink.Sink; -import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.StatefulSink; +import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; import org.apache.flink.util.Preconditions; @@ -42,6 +42,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -56,7 +57,8 @@ * Doris Writer will load data to doris. * @param */ -public class DorisWriter implements SinkWriter { +public class DorisWriter implements StatefulSink.StatefulSinkWriter, + TwoPhaseCommittingSink.PrecommittingSinkWriter { private static final Logger LOG = LoggerFactory.getLogger(DorisWriter.class); private static final List DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT)); private final long lastCheckpointId; @@ -78,7 +80,7 @@ public class DorisWriter implements SinkWriter state, + Collection state, DorisRecordSerializer serializer, DorisOptions dorisOptions, DorisReadOptions dorisReadOptions, @@ -100,9 +102,11 @@ public DorisWriter(Sink.InitContext initContext, this.executionOptions = executionOptions; this.intervalTime = executionOptions.checkInterval(); this.loading = false; + + initializeLoad(state); } - public void initializeLoad(List state) throws IOException { + public void initializeLoad(Collection state) { this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ? new BackendUtil( dorisOptions.getBenodes()) : new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG)); @@ -144,7 +148,13 @@ public void write(IN in, Context context) throws IOException { } @Override - public List prepareCommit(boolean flush) throws IOException { + public void flush(boolean flush) throws IOException, InterruptedException { + + } + + + @Override + public Collection prepareCommit() throws IOException, InterruptedException { if(!loading){ //There is no data during the entire checkpoint period return Collections.emptyList(); @@ -246,4 +256,5 @@ public void close() throws Exception { dorisStreamLoad.close(); } } + } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java index 06f2bfb89..66d0227ba 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java @@ -27,7 +27,6 @@ import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DynamicTableSink; -import org.apache.flink.table.connector.sink.SinkProvider; import org.apache.flink.table.connector.sink.SinkV2Provider; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Preconditions; @@ -104,7 +103,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { .setDorisReadOptions(readOptions) .setDorisExecutionOptions(executionOptions) .setSerializer(serializerBuilder.build()); - return SinkProvider.of(dorisSinkBuilder.build(), sinkParallelism); + return SinkV2Provider.of(dorisSinkBuilder.build(), sinkParallelism); }else{ DorisBatchSink.Builder dorisBatchSinkBuilder = DorisBatchSink.builder(); dorisBatchSinkBuilder.setDorisOptions(options) diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/MockCommitRequest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/MockCommitRequest.java new file mode 100644 index 000000000..d879e8126 --- /dev/null +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/MockCommitRequest.java @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.sink.committer; + +import org.apache.flink.api.connector.sink2.Committer; + +public class MockCommitRequest implements Committer.CommitRequest{ + + private final CommT committable; + + public MockCommitRequest(CommT committable) { + this.committable = committable; + } + + @Override + public CommT getCommittable() { + return committable; + } + + @Override + public int getNumberOfRetries() { + return 0; + } + + @Override + public void signalFailedWithKnownReason(Throwable throwable) { + + } + + @Override + public void signalFailedWithUnknownReason(Throwable throwable) { + + } + + @Override + public void retryLater() { + + } + + @Override + public void updateAndRetryLater(CommT commT) { + + } + + @Override + public void signalAlreadyCommitted() { + + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java index 7cc2a88f3..794f8068d 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java @@ -25,7 +25,6 @@ import org.apache.doris.flink.sink.DorisCommittable; import org.apache.doris.flink.sink.HttpEntityMock; import org.apache.doris.flink.sink.OptionUtils; - import org.apache.http.ProtocolVersion; import org.apache.http.StatusLine; import org.apache.http.client.methods.CloseableHttpResponse; @@ -34,16 +33,15 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; - +import org.mockito.MockedStatic; +import org.slf4j.Logger; import java.util.Collections; import static org.mockito.ArgumentMatchers.any; -import org.mockito.MockedStatic; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.when; -import org.slf4j.Logger; /** * Test for Doris Committer. @@ -83,7 +81,8 @@ public void testCommitted() throws Exception { "\"msg\": \"errCode = 2, detailMessage = transaction [2] is already visible, not pre-committed.\"\n" + "}"; this.entityMock.setValue(response); - dorisCommitter.commit(Collections.singletonList(dorisCommittable)); + final MockCommitRequest request = new MockCommitRequest<>(dorisCommittable); + dorisCommitter.commit(Collections.singletonList(request)); } @@ -94,7 +93,8 @@ public void testCommitAbort() throws Exception { "\"msg\": \"errCode = 2, detailMessage = transaction [25] is already aborted. abort reason: User Abort\"\n" + "}"; this.entityMock.setValue(response); - dorisCommitter.commit(Collections.singletonList(dorisCommittable)); + final MockCommitRequest request = new MockCommitRequest<>(dorisCommittable); + dorisCommitter.commit(Collections.singletonList(request)); } @After diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java index e988d6b29..01e1559ad 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java @@ -23,7 +23,7 @@ import org.apache.doris.flink.sink.DorisCommittable; import org.apache.doris.flink.sink.HttpTestUtil; import org.apache.doris.flink.sink.OptionUtils; -import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink2.Sink; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.impl.client.CloseableHttpClient; import org.junit.Assert; @@ -31,6 +31,7 @@ import org.junit.Ignore; import org.junit.Test; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.OptionalLong; @@ -67,12 +68,13 @@ public void testPrepareCommit() throws Exception { when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1)); DorisWriter dorisWriter = new DorisWriter(initContext, Collections.emptyList(), new SimpleStringSerializer(), dorisOptions, readOptions, executionOptions); dorisWriter.setDorisStreamLoad(dorisStreamLoad); - List committableList = dorisWriter.prepareCommit(true); - + dorisWriter.write("doris,1",null); + Collection committableList = dorisWriter.prepareCommit(); Assert.assertEquals(1, committableList.size()); - Assert.assertEquals("local:8040", committableList.get(0).getHostPort()); - Assert.assertEquals("db_test", committableList.get(0).getDb()); - Assert.assertEquals(2, committableList.get(0).getTxnID()); + DorisCommittable dorisCommittable = committableList.stream().findFirst().get(); + Assert.assertEquals("local:8040", dorisCommittable.getHostPort()); + Assert.assertEquals("test", dorisCommittable.getDb()); + Assert.assertEquals(2, dorisCommittable.getTxnID()); Assert.assertFalse(dorisWriter.isLoading()); } @@ -91,6 +93,6 @@ public void testSnapshot() throws Exception { Assert.assertEquals(1, writerStates.size()); Assert.assertEquals("doris", writerStates.get(0).getLabelPrefix()); - Assert.assertTrue(dorisWriter.isLoading()); + Assert.assertTrue(!dorisWriter.isLoading()); } }