[flink] support setting uid suffix for source/sink to improve state compatibility when jobGraph changes. #4425

Merged 2 commits on Nov 4, 2024. Changes from 1 commit.
@@ -242,6 +242,18 @@
<td>Boolean</td>
<td>If true, flink sink will use managed memory for merge tree; otherwise, it will create an independent memory allocator.</td>
</tr>
<tr>
<td><h5>sink.operator-uid.suffix</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Set the uid suffix for the writer, dynamic bucket assigner and committer operators. The uid format is ${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set, Flink will automatically generate the operator uid, which may be incompatible with existing state when the topology changes.</td>
</tr>
<tr>
<td><h5>source.operator-uid.suffix</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Set the uid suffix for the source operators. After setting, the uid format is ${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set, Flink will automatically generate the operator uid, which may be incompatible with existing state when the topology changes.</td>
</tr>
<tr>
<td><h5>source.checkpoint-align.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
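For reference, a minimal sketch of how these options could be supplied from a Flink job via dynamic table options (SQL hints). The catalog, database and table names used here are illustrative, and the Paimon catalog is assumed to be registered elsewhere:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UidSuffixExample {
    public static void main(String[] args) {
        // Illustrative only: pin source/sink operator uids by passing the new options as dynamic hints.
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "INSERT INTO paimon.`default`.orders /*+ OPTIONS('sink.operator-uid.suffix' = 'v1') */ "
                        + "SELECT * FROM paimon.`default`.orders_staging "
                        + "/*+ OPTIONS('source.operator-uid.suffix' = 'v1') */");
    }
}

The same keys could also be set persistently as table options (for example via ALTER TABLE ... SET), so that every job reading or writing the table derives stable operator uids.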
@@ -414,6 +414,24 @@ public class FlinkConnectorOptions {
+ "in order to compact several changelog files from the same partition into large ones, "
+ "which can decrease the number of small files. ");

public static final ConfigOption<String> SOURCE_OPERATOR_UID_SUFFIX =
key("source.operator-uid.suffix")
.stringType()
.noDefaultValue()
.withDescription(
"Set the uid suffix for the source operators. After setting, the uid format is "
+ "${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set, flink will "
+ "automatically generate the operator uid, which may be incompatible when the topology changes.");

public static final ConfigOption<String> SINK_OPERATOR_UID_SUFFIX =
key("sink.operator-uid.suffix")
.stringType()
.noDefaultValue()
.withDescription(
"Set the uid suffix for the writer, dynamic bucket assigner and committer operators. The uid format is "
+ "${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set, flink will "
+ "automatically generate the operator uid, which may be incompatible when the topology changes.");

public static List<ConfigOption<?>> getOptions() {
final Field[] fields = FlinkConnectorOptions.class.getFields();
final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
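The "%s_%s_%s" format used by the sink and source operators below is repeated at every call site; a small shared helper along the following lines would keep the documented ${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX} layout in one place. The helper name and placement are assumptions for illustration, not part of this change:

import org.apache.paimon.utils.StringUtils;

import javax.annotation.Nullable;

/** Sketch of a possible shared uid helper; illustrative, not part of this PR. */
final class OperatorUids {

    private OperatorUids() {}

    /** Returns "uidPrefix_tableName_userSuffix", or null when no usable suffix is configured. */
    @Nullable
    static String customUid(String uidPrefix, String tableName, @Nullable String userSuffix) {
        if (StringUtils.isNullOrWhitespaceOnly(userSuffix)) {
            return null;
        }
        return String.format("%s_%s_%s", uidPrefix, tableName, userSuffix);
    }
}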
@@ -24,25 +24,30 @@
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.sink.PartitionKeyExtractor;
import org.apache.paimon.utils.SerializableFunction;
import org.apache.paimon.utils.StringUtils;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import javax.annotation.Nullable;

import java.util.Map;

import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_OPERATOR_UID_SUFFIX;
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;

/** Sink for dynamic bucket table. */
public abstract class DynamicBucketSink<T> extends FlinkWriteSink<Tuple2<T, Integer>> {

private static final long serialVersionUID = 1L;

private static final String DYNAMIC_BUCKET_ASSIGNER_NAME = "dynamic-bucket-assigner";

public DynamicBucketSink(
FileStoreTable table, @Nullable Map<String, String> overwritePartition) {
super(table, overwritePartition);
@@ -88,11 +93,21 @@ public DataStreamSink<?> build(DataStream<T> input, @Nullable Integer parallelis
initialCommitUser, table, numAssigners, extractorFunction(), false);
TupleTypeInfo<Tuple2<T, Integer>> rowWithBucketType =
new TupleTypeInfo<>(partitionByKeyHash.getType(), BasicTypeInfo.INT_TYPE_INFO);
DataStream<Tuple2<T, Integer>> bucketAssigned =
SingleOutputStreamOperator<Tuple2<T, Integer>> bucketAssigned =
partitionByKeyHash
.transform("dynamic-bucket-assigner", rowWithBucketType, assignerOperator)
.transform(
DYNAMIC_BUCKET_ASSIGNER_NAME, rowWithBucketType, assignerOperator)
.setParallelism(partitionByKeyHash.getParallelism());

String uidSuffix = table.options().get(SINK_OPERATOR_UID_SUFFIX.key());
if (!StringUtils.isNullOrWhitespaceOnly(uidSuffix)) {
bucketAssigned =
bucketAssigned.uid(
String.format(
"%s_%s_%s",
DYNAMIC_BUCKET_ASSIGNER_NAME, table.name(), uidSuffix));
}

// 3. shuffle by partition & bucket
DataStream<Tuple2<T, Integer>> partitionByBucket =
partition(bucketAssigned, channelComputer2(), parallelism);
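For example, with sink.operator-uid.suffix set to "v1" on a table named "orders" (both values illustrative), the assigner uid produced here is "dynamic-bucket-assigner_orders_v1".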
@@ -66,6 +66,7 @@
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_MEMORY;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_OPERATOR_UID_SUFFIX;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
import static org.apache.paimon.flink.utils.ManagedMemoryUtils.declareManagedMemory;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -228,6 +229,12 @@ public DataStream<Committable> doWrite(
.setParallelism(parallelism == null ? input.getParallelism() : parallelism);

Options options = Options.fromMap(table.options());

String uidSuffix = options.get(SINK_OPERATOR_UID_SUFFIX);
if (uidSuffix != null) {
written = written.uid(String.format("%s_%s_%s", WRITER_NAME, table.name(), uidSuffix));
}

if (options.get(SINK_USE_MANAGED_MEMORY)) {
declareManagedMemory(written, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
}
@@ -295,6 +302,14 @@ protected DataStreamSink<?> doCommit(DataStream<Committable> written, String com
committerOperator)
.setParallelism(1)
.setMaxParallelism(1);
String uidSuffix = options.get(SINK_OPERATOR_UID_SUFFIX);
if (uidSuffix != null) {
committed.uid(
String.format(
"%s_%s_%s", GLOBAL_COMMITTER_NAME, table.name(), uidSuffix));
}
configureGlobalCommitter(
committed, options.get(SINK_COMMITTER_CPU), options.get(SINK_COMMITTER_MEMORY));
return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
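With the illustrative suffix "v1" and table "orders", the writer and global committer uids composed above take the forms WRITER_NAME + "_orders_v1" and GLOBAL_COMMITTER_NAME + "_orders_v1", where WRITER_NAME and GLOBAL_COMMITTER_NAME are the operator-name constants already defined in FlinkSink.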
@@ -34,6 +34,7 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.utils.StringUtils;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
@@ -63,6 +64,7 @@

import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
import static org.apache.paimon.CoreOptions.StreamingReadMode.FILE;
import static org.apache.paimon.flink.FlinkConnectorOptions.SOURCE_OPERATOR_UID_SUFFIX;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;
@@ -73,6 +75,7 @@
* @since 0.8
*/
public class FlinkSourceBuilder {
private static final String SOURCE_NAME = "Source";

private final Table table;
private final Options conf;
@@ -210,6 +213,16 @@ private DataStream<RowData> toDataStream(Source<RowData, ?, ?> source) {
: watermarkStrategy,
sourceName,
produceTypeInfo());

String uidSuffix = table.options().get(SOURCE_OPERATOR_UID_SUFFIX.key());
if (!StringUtils.isNullOrWhitespaceOnly(uidSuffix)) {
dataStream =
(DataStreamSource<RowData>)
dataStream.uid(
String.format(
"%s_%s_%s", SOURCE_NAME, table.name(), uidSuffix));
}

if (parallelism != null) {
dataStream.setParallelism(parallelism);
}
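Likewise, with the illustrative suffix "v1" and a table named "orders", the source operator uid becomes "Source_orders_v1", using the SOURCE_NAME constant introduced above.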