Skip to content

Commit

Permalink
[connector/flink] Fluss sink supports ignore_delete. (#272)
Browse files Browse the repository at this point in the history
  • Loading branch information
loserwang1024 authored Jan 10, 2025
1 parent 28b3c3c commit c25c3b0
Showing 9 changed files with 104 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -98,6 +98,12 @@ public class FlinkConnectorOptions {
+ "the new partitions for partitioned table while scanning."
+ " A non-positive value disables the partition discovery.");

/**
 * Boolean sink option {@code sink.ignore_delete}, disabled ({@code false}) by default.
 *
 * <p>When enabled, the Flink sink function returns without writing rows whose kind is
 * {@code UPDATE_BEFORE} or {@code DELETE}.
 */
public static final ConfigOption<Boolean> SINK_IGNORE_DELETE =
ConfigOptions.key("sink.ignore_delete")
.booleanType()
.defaultValue(false)
.withDescription("Whether to ignore retract(-U/-D) record.");

// --------------------------------------------------------------------------------------------
// table storage specific options
// --------------------------------------------------------------------------------------------
Original file line number Diff line number Diff line change
@@ -150,7 +150,8 @@ public DynamicTableSink createDynamicTableSink(Context context) {
rowType,
context.getPrimaryKeyIndexes(),
isStreamingMode,
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)));
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)),
tableOptions.get(FlinkConnectorOptions.SINK_IGNORE_DELETE));
}

@Override
@@ -174,6 +175,7 @@ public Set<ConfigOption<?>> optionalOptions() {
FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP,
FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL,
FlinkConnectorOptions.LOOKUP_ASYNC,
FlinkConnectorOptions.SINK_IGNORE_DELETE,
LookupOptions.MAX_RETRIES,
LookupOptions.CACHE_TYPE,
LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,
Original file line number Diff line number Diff line change
@@ -35,8 +35,12 @@ class AppendSinkFunction extends FlinkSinkFunction {

private transient AppendWriter appendWriter;

AppendSinkFunction(TablePath tablePath, Configuration flussConfig, RowType tableRowType) {
super(tablePath, flussConfig, tableRowType);
AppendSinkFunction(
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
boolean ignoreDelete) {
super(tablePath, flussConfig, tableRowType, ignoreDelete);
}

@Override
Original file line number Diff line number Diff line change
@@ -63,6 +63,7 @@ abstract class FlinkSinkFunction extends RichSinkFunction<RowData>
private final Configuration flussConfig;
protected final RowType tableRowType;
protected final @Nullable int[] targetColumnIndexes;
private final boolean ignoreDelete;

private transient Connection connection;
protected transient Table table;
@@ -75,19 +76,25 @@ abstract class FlinkSinkFunction extends RichSinkFunction<RowData>
private transient Counter numRecordsOutErrorsCounter;
private volatile Throwable asyncWriterException;

public FlinkSinkFunction(TablePath tablePath, Configuration flussConfig, RowType tableRowType) {
this(tablePath, flussConfig, tableRowType, null);
public FlinkSinkFunction(
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
boolean ignoreDelete) {
this(tablePath, flussConfig, tableRowType, null, ignoreDelete);
}

public FlinkSinkFunction(
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
@Nullable int[] targetColumns) {
@Nullable int[] targetColumns,
boolean ignoreDelete) {
this.tablePath = tablePath;
this.flussConfig = flussConfig;
this.targetColumnIndexes = targetColumns;
this.tableRowType = tableRowType;
this.ignoreDelete = ignoreDelete;
}

@Override
@@ -117,6 +124,12 @@ protected void initMetrics() {
@Override
public void invoke(RowData value, SinkFunction.Context context) throws IOException {
checkAsyncException();
if (ignoreDelete
&& (value.getRowKind() == RowKind.UPDATE_BEFORE
|| value.getRowKind() == RowKind.DELETE)) {
return;
}

InternalRow internalRow = dataConverter.toInternalRow(value);
CompletableFuture<Void> writeFuture = writeRow(value.getRowKind(), internalRow);
writeFuture.exceptionally(
Original file line number Diff line number Diff line change
@@ -65,6 +65,7 @@ public class FlinkTableSink
private final int[] primaryKeyIndexes;
private final boolean streaming;
@Nullable private final MergeEngine mergeEngine;
private final boolean ignoreDelete;

private boolean appliedUpdates = false;
@Nullable private GenericRow deleteRow;
@@ -75,32 +76,38 @@ public FlinkTableSink(
RowType tableRowType,
int[] primaryKeyIndexes,
boolean streaming,
@Nullable MergeEngine mergeEngine) {
@Nullable MergeEngine mergeEngine,
boolean ignoreDelete) {
this.tablePath = tablePath;
this.flussConfig = flussConfig;
this.tableRowType = tableRowType;
this.primaryKeyIndexes = primaryKeyIndexes;
this.streaming = streaming;
this.mergeEngine = mergeEngine;
this.ignoreDelete = ignoreDelete;
}

@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
if (!streaming) {
return ChangelogMode.insertOnly();
} else {
if (primaryKeyIndexes.length > 0) {
// pk table
ChangelogMode.Builder builder = ChangelogMode.newBuilder();
for (RowKind kind : requestedMode.getContainedKinds()) {
if (kind != RowKind.UPDATE_BEFORE) {
builder.addContainedKind(kind);
if (!streaming) {
return ChangelogMode.insertOnly();
} else {
if (primaryKeyIndexes.length > 0 || ignoreDelete) {
// primary-key table or ignore_delete mode can accept RowKind.DELETE
ChangelogMode.Builder builder = ChangelogMode.newBuilder();
for (RowKind kind : requestedMode.getContainedKinds()) {
// optimize out the update_before messages
if (kind != RowKind.UPDATE_BEFORE) {
builder.addContainedKind(kind);
}
}
return builder.build();
} else {
return ChangelogMode.insertOnly();
}
return builder.build();
} else {
// append only
return ChangelogMode.insertOnly();
}
}
}
@@ -159,8 +166,13 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
FlinkSinkFunction sinkFunction =
primaryKeyIndexes.length > 0
? new UpsertSinkFunction(
tablePath, flussConfig, tableRowType, targetColumnIndexes)
: new AppendSinkFunction(tablePath, flussConfig, tableRowType);
tablePath,
flussConfig,
tableRowType,
targetColumnIndexes,
ignoreDelete)
: new AppendSinkFunction(
tablePath, flussConfig, tableRowType, ignoreDelete);

return SinkFunctionProvider.of(sinkFunction);
}
@@ -182,7 +194,8 @@ public DynamicTableSink copy() {
tableRowType,
primaryKeyIndexes,
streaming,
mergeEngine);
mergeEngine,
ignoreDelete);
sink.appliedUpdates = appliedUpdates;
sink.deleteRow = deleteRow;
return sink;
Original file line number Diff line number Diff line change
@@ -42,8 +42,9 @@ class UpsertSinkFunction extends FlinkSinkFunction {
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
@Nullable int[] targetColumnIndexes) {
super(tablePath, flussConfig, tableRowType, targetColumnIndexes);
@Nullable int[] targetColumnIndexes,
boolean ignoreDelete) {
super(tablePath, flussConfig, tableRowType, targetColumnIndexes, ignoreDelete);
}

@Override
Original file line number Diff line number Diff line change
@@ -67,7 +67,8 @@ void testSinkMetrics(String clientId) throws Exception {
Arrays.asList(
new RowType.RowField("id", DataTypes.INT().getLogicalType()),
new RowType.RowField("name", DataTypes.STRING().getLogicalType())));
FlinkSinkFunction flinkSinkFunction = new AppendSinkFunction(tablePath, flussConf, rowType);
FlinkSinkFunction flinkSinkFunction =
new AppendSinkFunction(tablePath, flussConf, rowType, false);
InterceptingOperatorMetricGroup interceptingOperatorMetricGroup =
new InterceptingOperatorMetricGroup();
MockStreamingRuntimeContext mockStreamingRuntimeContext =
Original file line number Diff line number Diff line change
@@ -41,6 +41,7 @@
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
@@ -412,6 +413,44 @@ void testInsertWithoutSpecifiedCols() {
.isEqualTo(expectPlan);
}

/**
 * Verifies that a sink table created with {@code 'sink.ignore_delete'='true'} keeps rows that
 * the changelog source later retracts.
 *
 * <p>The source emits two inserts, one delete, one update-before and one update-after. The
 * query result is expected to contain the three rows carried by the INSERT and UPDATE_AFTER
 * records, i.e. the DELETE and UPDATE_BEFORE records have no visible effect on the sink.
 *
 * @param isPrimaryKeyTable {@code true} to declare a primary key on column {@code a} of the sink
 *     table, {@code false} to create the sink table without a primary key
 */
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testIgnoreDelete(boolean isPrimaryKeyTable) throws Exception {
    // Distinct names per parameter value so the two invocations do not share tables or views.
    String sinkName =
            isPrimaryKeyTable
                    ? "ignore_delete_primary_key_table_sink"
                    : "ignore_delete_log_table_sink";
    String sourceName = isPrimaryKeyTable ? "source_primary_key_table" : "source_log_table";

    // Changelog input that contains both retraction kinds (DELETE and UPDATE_BEFORE).
    org.apache.flink.table.api.Table cdcSourceData =
            tEnv.fromChangelogStream(
                    env.fromData(
                            Row.ofKind(RowKind.INSERT, 1, 3501L, "Tim"),
                            Row.ofKind(RowKind.DELETE, 1, 3501L, "Tim"),
                            Row.ofKind(RowKind.INSERT, 2, 3502L, "Fabian"),
                            Row.ofKind(RowKind.UPDATE_BEFORE, 2, 3502L, "Fabian"),
                            Row.ofKind(RowKind.UPDATE_AFTER, 3, 3503L, "coco")));
    // The name is already a plain string; no formatting is needed to register the view.
    tEnv.createTemporaryView(sourceName, cdcSourceData);

    tEnv.executeSql(
            String.format(
                    "create table %s ("
                            + "a int not null, "
                            + "b bigint, "
                            + "c string "
                            + (isPrimaryKeyTable ? ", primary key (a) NOT ENFORCED" : "")
                            + ") with('bucket.num' = '3',"
                            + " 'sink.ignore_delete'='true')",
                    sinkName));
    // Wait for the insert job to finish before reading the sink back.
    tEnv.executeSql(String.format("INSERT INTO %s SELECT * FROM %s", sinkName, sourceName))
            .await();

    CloseableIterator<Row> rowIter =
            tEnv.executeSql(String.format("select * from %s", sinkName)).collect();
    List<String> expectedRows =
            Arrays.asList("+I[1, 3501, Tim]", "+I[2, 3502, Fabian]", "+I[3, 3503, coco]");
    assertResultsIgnoreOrder(rowIter, expectedRows, true);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testWritePartitionedTable(boolean isPrimaryKeyTable) throws Exception {
Original file line number Diff line number Diff line change
@@ -88,7 +88,8 @@ public void invoke(MultiplexCdcRecord record, SinkFunction.Context context) thro
FlinkConversions.toFlinkRowType(rowType),
tableDescriptor.getSchema().getPrimaryKeyIndexes(),
true,
null);
null,
false);

sinkFunction =
((SinkFunctionProvider)

0 comments on commit c25c3b0

Please sign in to comment.