diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/FlinkConnectorOptions.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/FlinkConnectorOptions.java index 3bc76cd27..31b9c4ecc 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/FlinkConnectorOptions.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/FlinkConnectorOptions.java @@ -98,6 +98,12 @@ public class FlinkConnectorOptions { + "the new partitions for partitioned table while scanning." + " A non-positive value disables the partition discovery."); + public static final ConfigOption SINK_IGNORE_DELETE = + ConfigOptions.key("sink.ignore_delete") + .booleanType() + .defaultValue(false) + .withDescription("Whether to ignore retract(-U/-D) record."); + // -------------------------------------------------------------------------------------------- // table storage specific options // -------------------------------------------------------------------------------------------- diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java index 7f08a4765..4f02323d7 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java @@ -150,7 +150,8 @@ public DynamicTableSink createDynamicTableSink(Context context) { rowType, context.getPrimaryKeyIndexes(), isStreamingMode, - tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE))); + tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)), + tableOptions.get(FlinkConnectorOptions.SINK_IGNORE_DELETE)); } @Override @@ -174,6 +175,7 @@ public Set> optionalOptions() { FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP, FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL, FlinkConnectorOptions.LOOKUP_ASYNC, + FlinkConnectorOptions.SINK_IGNORE_DELETE, LookupOptions.MAX_RETRIES, LookupOptions.CACHE_TYPE, LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS, diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/AppendSinkFunction.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/AppendSinkFunction.java index 5b96fb851..5e92a0c3a 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/AppendSinkFunction.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/AppendSinkFunction.java @@ -35,8 +35,12 @@ class AppendSinkFunction extends FlinkSinkFunction { private transient AppendWriter appendWriter; - AppendSinkFunction(TablePath tablePath, Configuration flussConfig, RowType tableRowType) { - super(tablePath, flussConfig, tableRowType); + AppendSinkFunction( + TablePath tablePath, + Configuration flussConfig, + RowType tableRowType, + boolean ignoreDelete) { + super(tablePath, flussConfig, tableRowType, ignoreDelete); } @Override diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkSinkFunction.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkSinkFunction.java index 171870cc3..309e199be 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkSinkFunction.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkSinkFunction.java @@ -63,6 +63,7 @@ abstract class FlinkSinkFunction extends RichSinkFunction private final Configuration flussConfig; protected final RowType tableRowType; protected final @Nullable int[] targetColumnIndexes; + private final boolean ignoreDelete; private transient Connection connection; protected transient Table table; @@ -75,19 +76,25 @@ abstract class FlinkSinkFunction extends RichSinkFunction private transient Counter numRecordsOutErrorsCounter; private volatile Throwable asyncWriterException; - public FlinkSinkFunction(TablePath tablePath, Configuration flussConfig, RowType tableRowType) { - this(tablePath, flussConfig, tableRowType, null); + public FlinkSinkFunction( + TablePath tablePath, + Configuration flussConfig, + RowType tableRowType, + boolean ignoreDelete) { + this(tablePath, flussConfig, tableRowType, null, ignoreDelete); } public FlinkSinkFunction( TablePath tablePath, Configuration flussConfig, RowType tableRowType, - @Nullable int[] targetColumns) { + @Nullable int[] targetColumns, + boolean ignoreDelete) { this.tablePath = tablePath; this.flussConfig = flussConfig; this.targetColumnIndexes = targetColumns; this.tableRowType = tableRowType; + this.ignoreDelete = ignoreDelete; } @Override @@ -117,6 +124,12 @@ protected void initMetrics() { @Override public void invoke(RowData value, SinkFunction.Context context) throws IOException { checkAsyncException(); + if (ignoreDelete + && (value.getRowKind() == RowKind.UPDATE_BEFORE + || value.getRowKind() == RowKind.DELETE)) { + return; + } + InternalRow internalRow = dataConverter.toInternalRow(value); CompletableFuture writeFuture = writeRow(value.getRowKind(), internalRow); writeFuture.exceptionally( diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java index d9a9b96db..e83e70220 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java @@ -65,6 +65,7 @@ public class FlinkTableSink private final int[] primaryKeyIndexes; private final boolean streaming; @Nullable private final MergeEngine mergeEngine; + private final boolean ignoreDelete; private boolean appliedUpdates = false; @Nullable private GenericRow deleteRow; @@ -75,13 +76,15 @@ public FlinkTableSink( RowType tableRowType, int[] primaryKeyIndexes, boolean streaming, - @Nullable MergeEngine mergeEngine) { + @Nullable MergeEngine mergeEngine, + boolean ignoreDelete) { this.tablePath = tablePath; this.flussConfig = flussConfig; this.tableRowType = tableRowType; this.primaryKeyIndexes = primaryKeyIndexes; this.streaming = streaming; this.mergeEngine = mergeEngine; + this.ignoreDelete = ignoreDelete; } @Override @@ -89,18 +92,22 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { if (!streaming) { return ChangelogMode.insertOnly(); } else { - if (primaryKeyIndexes.length > 0) { - // pk table - ChangelogMode.Builder builder = ChangelogMode.newBuilder(); - for (RowKind kind : requestedMode.getContainedKinds()) { - if (kind != RowKind.UPDATE_BEFORE) { - builder.addContainedKind(kind); + if (!streaming) { + return ChangelogMode.insertOnly(); + } else { + if (primaryKeyIndexes.length > 0 || ignoreDelete) { + // primary-key table or ignore_delete mode can accept RowKind.DELETE + ChangelogMode.Builder builder = ChangelogMode.newBuilder(); + for (RowKind kind : requestedMode.getContainedKinds()) { + // optimize out the update_before messages + if (kind != RowKind.UPDATE_BEFORE) { + builder.addContainedKind(kind); + } } + return builder.build(); + } else { + return ChangelogMode.insertOnly(); } - return builder.build(); - } else { - // append only - return ChangelogMode.insertOnly(); } } } @@ -159,8 +166,13 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { FlinkSinkFunction sinkFunction = primaryKeyIndexes.length > 0 ? new UpsertSinkFunction( - tablePath, flussConfig, tableRowType, targetColumnIndexes) - : new AppendSinkFunction(tablePath, flussConfig, tableRowType); + tablePath, + flussConfig, + tableRowType, + targetColumnIndexes, + ignoreDelete) + : new AppendSinkFunction( + tablePath, flussConfig, tableRowType, ignoreDelete); return SinkFunctionProvider.of(sinkFunction); } @@ -182,7 +194,8 @@ public DynamicTableSink copy() { tableRowType, primaryKeyIndexes, streaming, - mergeEngine); + mergeEngine, + ignoreDelete); sink.appliedUpdates = appliedUpdates; sink.deleteRow = deleteRow; return sink; diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/UpsertSinkFunction.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/UpsertSinkFunction.java index 6f878af6a..d87a91988 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/UpsertSinkFunction.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/UpsertSinkFunction.java @@ -42,8 +42,9 @@ class UpsertSinkFunction extends FlinkSinkFunction { TablePath tablePath, Configuration flussConfig, RowType tableRowType, - @Nullable int[] targetColumnIndexes) { - super(tablePath, flussConfig, tableRowType, targetColumnIndexes); + @Nullable int[] targetColumnIndexes, + boolean ignoreDelete) { + super(tablePath, flussConfig, tableRowType, targetColumnIndexes, ignoreDelete); } @Override diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkSinkFunctionTest.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkSinkFunctionTest.java index 09428a7df..35e86a989 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkSinkFunctionTest.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkSinkFunctionTest.java @@ -67,7 +67,8 @@ void testSinkMetrics(String clientId) throws Exception { Arrays.asList( new RowType.RowField("id", DataTypes.INT().getLogicalType()), new RowType.RowField("name", DataTypes.STRING().getLogicalType()))); - FlinkSinkFunction flinkSinkFunction = new AppendSinkFunction(tablePath, flussConf, rowType); + FlinkSinkFunction flinkSinkFunction = + new AppendSinkFunction(tablePath, flussConf, rowType, false); InterceptingOperatorMetricGroup interceptingOperatorMetricGroup = new InterceptingOperatorMetricGroup(); MockStreamingRuntimeContext mockStreamingRuntimeContext = diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java index bcc77cfd5..1d6568c77 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java @@ -41,6 +41,7 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; import org.apache.flink.util.CloseableIterator; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -412,6 +413,44 @@ void testInsertWithoutSpecifiedCols() { .isEqualTo(expectPlan); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testIgnoreDelete(boolean isPrimaryKeyTable) throws Exception { + String sinkName = + isPrimaryKeyTable + ? "ignore_delete_primary_key_table_sink" + : "ignore_delete_log_table_sink"; + String sourceName = isPrimaryKeyTable ? "source_primary_key_table" : "source_log_table"; + org.apache.flink.table.api.Table cdcSourceData = + tEnv.fromChangelogStream( + env.fromData( + Row.ofKind(RowKind.INSERT, 1, 3501L, "Tim"), + Row.ofKind(RowKind.DELETE, 1, 3501L, "Tim"), + Row.ofKind(RowKind.INSERT, 2, 3502L, "Fabian"), + Row.ofKind(RowKind.UPDATE_BEFORE, 2, 3502L, "Fabian"), + Row.ofKind(RowKind.UPDATE_AFTER, 3, 3503L, "coco"))); + tEnv.createTemporaryView(String.format("%s", sourceName), cdcSourceData); + + tEnv.executeSql( + String.format( + "create table %s (" + + "a int not null, " + + "b bigint, " + + "c string " + + (isPrimaryKeyTable ? ", primary key (a) NOT ENFORCED" : "") + + ") with('bucket.num' = '3'," + + " 'sink.ignore_delete'='true')", + sinkName)); + tEnv.executeSql(String.format("INSERT INTO %s SELECT * FROM %s", sinkName, sourceName)) + .await(); + + CloseableIterator rowIter = + tEnv.executeSql(String.format("select * from %s", sinkName)).collect(); + List expectedRows = + Arrays.asList("+I[1, 3501, Tim]", "+I[2, 3502, Fabian]", "+I[3, 3503, coco]"); + assertResultsIgnoreOrder(rowIter, expectedRows, true); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) void testWritePartitionedTable(boolean isPrimaryKeyTable) throws Exception { diff --git a/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/TestingDatabaseSyncSink.java b/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/TestingDatabaseSyncSink.java index 805180d22..508013a65 100644 --- a/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/TestingDatabaseSyncSink.java +++ b/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/testutils/TestingDatabaseSyncSink.java @@ -88,7 +88,8 @@ public void invoke(MultiplexCdcRecord record, SinkFunction.Context context) thro FlinkConversions.toFlinkRowType(rowType), tableDescriptor.getSchema().getPrimaryKeyIndexes(), true, - null); + null, + false); sinkFunction = ((SinkFunctionProvider)