Skip to content

Commit

Permalink
[refactor] sink api refactor by FLIP-191 (apache#213)
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba authored Oct 24, 2023
1 parent 9f30144 commit a355ca9
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,28 @@
import org.apache.doris.flink.sink.committer.DorisCommitter;
import org.apache.doris.flink.sink.writer.DorisRecordSerializer;
import org.apache.doris.flink.sink.writer.DorisWriter;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.doris.flink.sink.writer.DorisWriterState;
import org.apache.doris.flink.sink.writer.DorisWriterStateSerializer;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Collection;
import java.util.Collections;

/**
* Load data into Doris based on 2PC.
* see {@link DorisWriter} and {@link DorisCommitter}.
* @param <IN> type of record.
*/
public class DorisSink<IN> implements Sink<IN, DorisCommittable, DorisWriterState, DorisCommittable> {
public class DorisSink<IN>
implements StatefulSink<IN, DorisWriterState>,
TwoPhaseCommittingSink<IN, DorisCommittable>{

private static final Logger LOG = LoggerFactory.getLogger(DorisSink.class);
private final DorisOptions dorisOptions;
Expand Down Expand Up @@ -75,36 +76,32 @@ private void checkKeyType() {
}

@Override
public SinkWriter<IN, DorisCommittable, DorisWriterState> createWriter(InitContext initContext, List<DorisWriterState> state) throws IOException {
DorisWriter<IN> dorisWriter = new DorisWriter<IN>(initContext, state, serializer, dorisOptions, dorisReadOptions, dorisExecutionOptions);
dorisWriter.initializeLoad(state);
public DorisWriter<IN> createWriter(InitContext initContext) throws IOException {
DorisWriter<IN> dorisWriter = new DorisWriter<>(initContext, Collections.emptyList(), serializer, dorisOptions, dorisReadOptions, dorisExecutionOptions);
return dorisWriter;
}

@Override
public Optional<SimpleVersionedSerializer<DorisWriterState>> getWriterStateSerializer() {
return Optional.of(new DorisWriterStateSerializer());
public Committer<DorisCommittable> createCommitter() throws IOException {
return new DorisCommitter(dorisOptions, dorisReadOptions, dorisExecutionOptions.getMaxRetries());
}

@Override
public Optional<Committer<DorisCommittable>> createCommitter() throws IOException {
return Optional.of(new DorisCommitter(dorisOptions, dorisReadOptions, dorisExecutionOptions.getMaxRetries()));
public DorisWriter<IN> restoreWriter(InitContext initContext, Collection<DorisWriterState> recoveredState) throws IOException {
DorisWriter<IN> dorisWriter = new DorisWriter<>(initContext, recoveredState, serializer, dorisOptions, dorisReadOptions, dorisExecutionOptions);
return dorisWriter;
}

@Override
public Optional<GlobalCommitter<DorisCommittable, DorisCommittable>> createGlobalCommitter() throws IOException {
return Optional.empty();
public SimpleVersionedSerializer<DorisWriterState> getWriterStateSerializer() {
return new DorisWriterStateSerializer();
}

@Override
public Optional<SimpleVersionedSerializer<DorisCommittable>> getCommittableSerializer() {
return Optional.of(new DorisCommittableSerializer());
public SimpleVersionedSerializer<DorisCommittable> getCommittableSerializer() {
return new DorisCommittableSerializer();
}

@Override
public Optional<SimpleVersionedSerializer<DorisCommittable>> getGlobalCommittableSerializer() {
return Optional.empty();
}

public static <IN> Builder<IN> builder() {
return new Builder<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.HttpUtil;
import org.apache.doris.flink.sink.ResponseUtil;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
Expand All @@ -38,18 +38,18 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;

/**
* The committer to commit transaction.
*/
public class DorisCommitter implements Committer<DorisCommittable> {
public class DorisCommitter implements Committer<DorisCommittable>, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(DorisCommitter.class);
private static final String commitPattern = "http://%s/api/%s/_stream_load_2pc";
private final CloseableHttpClient httpClient;
Expand All @@ -75,11 +75,10 @@ public DorisCommitter(DorisOptions dorisOptions, DorisReadOptions dorisReadOptio
}

@Override
public List<DorisCommittable> commit(List<DorisCommittable> committableList) throws IOException {
for (DorisCommittable committable : committableList) {
commitTransaction(committable);
public void commit(Collection<CommitRequest<DorisCommittable>> requests) throws IOException, InterruptedException {
for (CommitRequest<DorisCommittable> request: requests) {
commitTransaction(request.getCommittable());
}
return Collections.emptyList();
}

private void commitTransaction(DorisCommittable committable) throws IOException {
Expand Down Expand Up @@ -133,9 +132,12 @@ private void commitTransaction(DorisCommittable committable) throws IOException
}

@Override
public void close() throws Exception {
public void close() {
if (httpClient != null) {
httpClient.close();
try {
httpClient.close();
} catch (IOException e) {
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ public void abortPreCommit(String labelSuffix, long chkID) throws Exception {
LOG.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, chkID);
while (true) {
try {
// TODO: Abort the transaction by label. Currently a transaction can only be aborted by txnId,
// so we must first issue a stream load request with the label to obtain the txnId.
String label = labelGenerator.generateLabel(startChkID);
HttpPutBuilder builder = new HttpPutBuilder();
builder.setUrl(loadUrlStr)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.flink.sink.writer;

import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
Expand All @@ -27,11 +28,10 @@
import org.apache.doris.flink.sink.BackendUtil;
import org.apache.doris.flink.sink.DorisCommittable;
import org.apache.doris.flink.sink.HttpUtil;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
import org.apache.flink.util.Preconditions;
Expand All @@ -42,6 +42,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
Expand All @@ -56,7 +57,8 @@
* Doris Writer will load data to Doris.
* @param <IN> type of record.
*/
public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWriterState> {
public class DorisWriter<IN> implements StatefulSink.StatefulSinkWriter<IN, DorisWriterState>,
TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, DorisCommittable> {
private static final Logger LOG = LoggerFactory.getLogger(DorisWriter.class);
private static final List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT));
private final long lastCheckpointId;
Expand All @@ -78,7 +80,7 @@ public class DorisWriter<IN> implements SinkWriter<IN, DorisCommittable, DorisWr
private String currentLabel;

public DorisWriter(Sink.InitContext initContext,
List<DorisWriterState> state,
Collection<DorisWriterState> state,
DorisRecordSerializer<IN> serializer,
DorisOptions dorisOptions,
DorisReadOptions dorisReadOptions,
Expand All @@ -100,9 +102,11 @@ public DorisWriter(Sink.InitContext initContext,
this.executionOptions = executionOptions;
this.intervalTime = executionOptions.checkInterval();
this.loading = false;

initializeLoad(state);
}

public void initializeLoad(List<DorisWriterState> state) throws IOException {
public void initializeLoad(Collection<DorisWriterState> state) {
this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ? new BackendUtil(
dorisOptions.getBenodes())
: new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
Expand Down Expand Up @@ -144,7 +148,13 @@ public void write(IN in, Context context) throws IOException {
}

@Override
public List<DorisCommittable> prepareCommit(boolean flush) throws IOException {
public void flush(boolean flush) throws IOException, InterruptedException {

}


@Override
public Collection<DorisCommittable> prepareCommit() throws IOException, InterruptedException {
if(!loading){
//There is no data during the entire checkpoint period
return Collections.emptyList();
Expand Down Expand Up @@ -246,4 +256,5 @@ public void close() throws Exception {
dorisStreamLoad.close();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkProvider;
import org.apache.flink.table.connector.sink.SinkV2Provider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -104,7 +103,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
.setDorisReadOptions(readOptions)
.setDorisExecutionOptions(executionOptions)
.setSerializer(serializerBuilder.build());
return SinkProvider.of(dorisSinkBuilder.build(), sinkParallelism);
return SinkV2Provider.of(dorisSinkBuilder.build(), sinkParallelism);
}else{
DorisBatchSink.Builder<RowData> dorisBatchSinkBuilder = DorisBatchSink.builder();
dorisBatchSinkBuilder.setDorisOptions(options)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.sink.committer;

import org.apache.flink.api.connector.sink2.Committer;

/**
 * Minimal {@link Committer.CommitRequest} used by the committer tests: it wraps a fixed
 * committable, always reports zero retries and ignores every signal sent to it.
 *
 * @param <CommT> type of the wrapped committable.
 */
public class MockCommitRequest<CommT> implements Committer.CommitRequest<CommT> {

    /** Committable handed out by {@link #getCommittable()}. */
    private final CommT wrapped;

    public MockCommitRequest(CommT committable) {
        wrapped = committable;
    }

    /** Returns the committable passed to the constructor. */
    @Override
    public CommT getCommittable() {
        return wrapped;
    }

    /** Always returns {@code 0}; this mock does not track retries. */
    @Override
    public int getNumberOfRetries() {
        return 0;
    }

    @Override
    public void signalFailedWithKnownReason(Throwable throwable) {
        // no-op
    }

    @Override
    public void signalFailedWithUnknownReason(Throwable throwable) {
        // no-op
    }

    @Override
    public void retryLater() {
        // no-op
    }

    @Override
    public void updateAndRetryLater(CommT commT) {
        // no-op: the replacement committable is discarded
    }

    @Override
    public void signalAlreadyCommitted() {
        // no-op
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.doris.flink.sink.DorisCommittable;
import org.apache.doris.flink.sink.HttpEntityMock;
import org.apache.doris.flink.sink.OptionUtils;

import org.apache.http.ProtocolVersion;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
Expand All @@ -34,16 +33,15 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import org.mockito.MockedStatic;
import org.slf4j.Logger;

import java.util.Collections;

import static org.mockito.ArgumentMatchers.any;
import org.mockito.MockedStatic;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
import org.slf4j.Logger;

/**
* Test for Doris Committer.
Expand Down Expand Up @@ -83,7 +81,8 @@ public void testCommitted() throws Exception {
"\"msg\": \"errCode = 2, detailMessage = transaction [2] is already visible, not pre-committed.\"\n" +
"}";
this.entityMock.setValue(response);
dorisCommitter.commit(Collections.singletonList(dorisCommittable));
final MockCommitRequest<DorisCommittable> request = new MockCommitRequest<>(dorisCommittable);
dorisCommitter.commit(Collections.singletonList(request));

}

Expand All @@ -94,7 +93,8 @@ public void testCommitAbort() throws Exception {
"\"msg\": \"errCode = 2, detailMessage = transaction [25] is already aborted. abort reason: User Abort\"\n" +
"}";
this.entityMock.setValue(response);
dorisCommitter.commit(Collections.singletonList(dorisCommittable));
final MockCommitRequest<DorisCommittable> request = new MockCommitRequest<>(dorisCommittable);
dorisCommitter.commit(Collections.singletonList(request));
}

@After
Expand Down
Loading

0 comments on commit a355ca9

Please sign in to comment.