[flink] support setting uid suffix for source/sink to improve state compatibility when jobGraph changes. (apache#4425)

* [flink] support setting uid suffix for source/sink to improve state compatibility when jobGraph changes.

* resolve comment
liming30 authored and guanshi committed Nov 7, 2024
1 parent 34872b2 commit c0f4406
Showing 6 changed files with 414 additions and 2 deletions.
@@ -242,6 +242,18 @@
<td>Boolean</td>
<td>If true, flink sink will use managed memory for merge tree; otherwise, it will create an independent memory allocator.</td>
</tr>
<tr>
<td><h5>sink.operator-uid.suffix</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Set the uid suffix for the writer, dynamic bucket assigner and committer operators. The uid format is ${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set, flink will automatically generate the operator uid, which may be incompatible when the topology changes.</td>
</tr>
<tr>
<td><h5>source.operator-uid.suffix</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Set the uid suffix for the source operators. After setting, the uid format is ${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set, flink will automatically generate the operator uid, which may be incompatible when the topology changes.</td>
</tr>
<tr>
<td><h5>source.checkpoint-align.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
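For context (not part of the commit), a hedged sketch of how the two new keys documented above might be supplied from Flink SQL as dynamic table options. The catalog name, warehouse path, table names, and the suffix value `v1` are placeholders; the tables are assumed to already exist, and dynamic table options may need to be enabled depending on the Flink version.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UidSuffixSqlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical Paimon catalog over a local filesystem warehouse.
        tEnv.executeSql(
                "CREATE CATALOG paimon_catalog WITH ("
                        + " 'type' = 'paimon',"
                        + " 'warehouse' = 'file:///tmp/paimon-warehouse' )");
        tEnv.executeSql("USE CATALOG paimon_catalog");

        // 'orders' and 'orders_copy' are assumed to exist; 'v1' is a user-chosen suffix.
        // With the documented format ${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}, the source
        // uid ends with _orders_v1 and the writer/committer uids end with _orders_copy_v1.
        tEnv.executeSql(
                "INSERT INTO orders_copy /*+ OPTIONS('sink.operator-uid.suffix' = 'v1') */ "
                        + "SELECT * FROM orders /*+ OPTIONS('source.operator-uid.suffix' = 'v1') */");
    }
}
```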
@@ -414,6 +414,24 @@ public class FlinkConnectorOptions {
+ "in order to compact several changelog files from the same partition into large ones, "
+ "which can decrease the number of small files. ");

public static final ConfigOption<String> SOURCE_OPERATOR_UID_SUFFIX =
key("source.operator-uid.suffix")
.stringType()
.noDefaultValue()
.withDescription(
"Set the uid suffix for the source operators. After setting, the uid format is "
+ "${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set, flink will "
+ "automatically generate the operator uid, which may be incompatible when the topology changes.");

public static final ConfigOption<String> SINK_OPERATOR_UID_SUFFIX =
key("sink.operator-uid.suffix")
.stringType()
.noDefaultValue()
.withDescription(
"Set the uid suffix for the writer, dynamic bucket assigner and committer operators. The uid format is "
+ "${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid suffix is not set, flink will "
+ "automatically generate the operator uid, which may be incompatible when the topology changes.");

public static List<ConfigOption<?>> getOptions() {
final Field[] fields = FlinkConnectorOptions.class.getFields();
final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
@@ -429,6 +447,11 @@ public static List<ConfigOption<?>> getOptions() {
return list;
}

public static String generateCustomUid(
String uidPrefix, String tableName, String userDefinedSuffix) {
return String.format("%s_%s_%s", uidPrefix, tableName, userDefinedSuffix);
}

/** The mode of lookup cache. */
public enum LookupCacheMode {
/** Auto mode, try to use partial mode. */
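A quick illustration (not from the commit) of the uid strings the new `generateCustomUid` helper produces. The table name `orders` and suffix `v1` are hypothetical; the prefixes `dynamic-bucket-assigner` and `Source` are the constants used elsewhere in this change.

```java
import org.apache.paimon.flink.FlinkConnectorOptions;

public class GenerateCustomUidDemo {
    public static void main(String[] args) {
        // Format is ${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}, as documented above.
        System.out.println(
                FlinkConnectorOptions.generateCustomUid("dynamic-bucket-assigner", "orders", "v1"));
        // -> dynamic-bucket-assigner_orders_v1
        System.out.println(FlinkConnectorOptions.generateCustomUid("Source", "orders", "v1"));
        // -> Source_orders_v1
    }
}
```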
@@ -24,25 +24,31 @@
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.sink.PartitionKeyExtractor;
import org.apache.paimon.utils.SerializableFunction;
import org.apache.paimon.utils.StringUtils;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import javax.annotation.Nullable;

import java.util.Map;

import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_OPERATOR_UID_SUFFIX;
import static org.apache.paimon.flink.FlinkConnectorOptions.generateCustomUid;
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;

/** Sink for dynamic bucket table. */
public abstract class DynamicBucketSink<T> extends FlinkWriteSink<Tuple2<T, Integer>> {

private static final long serialVersionUID = 1L;

private static final String DYNAMIC_BUCKET_ASSIGNER_NAME = "dynamic-bucket-assigner";

public DynamicBucketSink(
FileStoreTable table, @Nullable Map<String, String> overwritePartition) {
super(table, overwritePartition);
@@ -88,11 +94,20 @@ public DataStreamSink<?> build(DataStream<T> input, @Nullable Integer parallelism
initialCommitUser, table, numAssigners, extractorFunction(), false);
TupleTypeInfo<Tuple2<T, Integer>> rowWithBucketType =
new TupleTypeInfo<>(partitionByKeyHash.getType(), BasicTypeInfo.INT_TYPE_INFO);
DataStream<Tuple2<T, Integer>> bucketAssigned =
SingleOutputStreamOperator<Tuple2<T, Integer>> bucketAssigned =
partitionByKeyHash
.transform("dynamic-bucket-assigner", rowWithBucketType, assignerOperator)
.transform(
DYNAMIC_BUCKET_ASSIGNER_NAME, rowWithBucketType, assignerOperator)
.setParallelism(partitionByKeyHash.getParallelism());

String uidSuffix = table.options().get(SINK_OPERATOR_UID_SUFFIX.key());
if (!StringUtils.isNullOrWhitespaceOnly(uidSuffix)) {
bucketAssigned =
bucketAssigned.uid(
generateCustomUid(
DYNAMIC_BUCKET_ASSIGNER_NAME, table.name(), uidSuffix));
}

// 3. shuffle by partition & bucket
DataStream<Tuple2<T, Integer>> partitionByBucket =
partition(bucketAssigned, channelComputer2(), parallelism);
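For readers outside the Paimon codebase, here is the same conditional-uid pattern the DynamicBucketSink change applies, as a minimal self-contained Flink job. The map operator, table name `orders`, and suffix `v1` are placeholders, and the plain null/blank guard only approximates the `StringUtils.isNullOrWhitespaceOnly` check above.

```java
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConditionalUidSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Would normally come from table options; null or blank means "let Flink generate the uid".
        String uidSuffix = "v1";

        SingleOutputStreamOperator<String> assigned =
                env.fromElements("a", "b").map(String::toUpperCase).name("dynamic-bucket-assigner");

        if (uidSuffix != null && !uidSuffix.trim().isEmpty()) {
            // Pinning the uid keeps operator state addressable across job-graph changes.
            assigned = assigned.uid("dynamic-bucket-assigner_orders_" + uidSuffix);
        }

        assigned.print();
        env.execute("conditional-uid-sketch");
    }
}
```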
@@ -66,7 +66,9 @@
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_MEMORY;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_OPERATOR_UID_SUFFIX;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY;
import static org.apache.paimon.flink.FlinkConnectorOptions.generateCustomUid;
import static org.apache.paimon.flink.utils.ManagedMemoryUtils.declareManagedMemory;
import static org.apache.paimon.utils.Preconditions.checkArgument;

@@ -228,6 +230,12 @@ public DataStream<Committable> doWrite(
.setParallelism(parallelism == null ? input.getParallelism() : parallelism);

Options options = Options.fromMap(table.options());

String uidSuffix = options.get(SINK_OPERATOR_UID_SUFFIX);
if (options.get(SINK_OPERATOR_UID_SUFFIX) != null) {
written = written.uid(generateCustomUid(WRITER_NAME, table.name(), uidSuffix));
}

if (options.get(SINK_USE_MANAGED_MEMORY)) {
declareManagedMemory(written, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
}
@@ -295,6 +303,14 @@ protected DataStreamSink<?> doCommit(DataStream<Committable> written, String commitUser
committerOperator)
.setParallelism(1)
.setMaxParallelism(1);
if (options.get(SINK_OPERATOR_UID_SUFFIX) != null) {
committed =
committed.uid(
generateCustomUid(
GLOBAL_COMMITTER_NAME,
table.name(),
options.get(SINK_OPERATOR_UID_SUFFIX)));
}
configureGlobalCommitter(
committed, options.get(SINK_COMMITTER_CPU), options.get(SINK_COMMITTER_MEMORY));
return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
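For reference, a small sketch (assuming paimon-flink on the classpath) of how the null checks in doWrite/doCommit above behave: the option resolves to the configured suffix, or to null when it is absent, in which case no explicit uid is set and Flink falls back to auto-generated operator uids. The suffix value `v1` is hypothetical.

```java
import org.apache.paimon.options.Options;

import java.util.HashMap;
import java.util.Map;

import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_OPERATOR_UID_SUFFIX;

public class SinkUidSuffixResolution {
    public static void main(String[] args) {
        Map<String, String> tableOptions = new HashMap<>();
        // Hypothetical user-supplied value; remove this line and get(...) returns null,
        // so doWrite/doCommit skip the explicit uid.
        tableOptions.put(SINK_OPERATOR_UID_SUFFIX.key(), "v1");

        Options options = Options.fromMap(tableOptions);
        String uidSuffix = options.get(SINK_OPERATOR_UID_SUFFIX);
        System.out.println(uidSuffix); // "v1", or null when the option is not set
    }
}
```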
@@ -34,6 +34,7 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.utils.StringUtils;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
@@ -63,6 +64,8 @@

import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
import static org.apache.paimon.CoreOptions.StreamingReadMode.FILE;
import static org.apache.paimon.flink.FlinkConnectorOptions.SOURCE_OPERATOR_UID_SUFFIX;
import static org.apache.paimon.flink.FlinkConnectorOptions.generateCustomUid;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;
@@ -73,6 +76,7 @@
* @since 0.8
*/
public class FlinkSourceBuilder {
private static final String SOURCE_NAME = "Source";

private final Table table;
private final Options conf;
@@ -210,6 +214,14 @@ private DataStream<RowData> toDataStream(Source<RowData, ?, ?> source) {
: watermarkStrategy,
sourceName,
produceTypeInfo());

String uidSuffix = table.options().get(SOURCE_OPERATOR_UID_SUFFIX.key());
if (!StringUtils.isNullOrWhitespaceOnly(uidSuffix)) {
dataStream =
(DataStreamSource<RowData>)
dataStream.uid(generateCustomUid(SOURCE_NAME, table.name(), uidSuffix));
}

if (parallelism != null) {
dataStream.setParallelism(parallelism);
}
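A short note on the cast in the FlinkSourceBuilder change above, sketched with plain Flink: `uid()` is declared on `SingleOutputStreamOperator` and returns that type, so assigning the result back to a `DataStreamSource` variable needs a downcast, which is safe because `uid()` returns the same object. The sequence source and the uid value are placeholders.

```java
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceUidCastSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Long> source = env.fromSequence(0, 10);
        // uid() returns SingleOutputStreamOperator<Long>, but the underlying object is still the
        // DataStreamSource, so casting back keeps source-specific calls like setParallelism available.
        source = (DataStreamSource<Long>) source.uid("Source_orders_v1");
        source.setParallelism(1);

        source.print();
        env.execute("source-uid-cast-sketch");
    }
}
```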
