Commit
tmp
loserwang1024 committed Jan 7, 2025
1 parent e448f76 commit 0c4b74d
Showing 9 changed files with 48 additions and 45 deletions.
@@ -17,7 +17,6 @@
package com.alibaba.fluss.connector.flink;

import com.alibaba.fluss.config.FlussConfigUtils;
import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.utils.FlinkConversions;

import org.apache.flink.configuration.ConfigOption;
@@ -99,14 +98,11 @@ public class FlinkConnectorOptions {
+ "the new partitions for partitioned table while scanning."
+ " A non-positive value disables the partition discovery.");

public static final ConfigOption<DeleteStrategy> SINK_DELETE_STRATEGY =
ConfigOptions.key("sink.delete-strategy")
.enumType(DeleteStrategy.class)
.defaultValue(DeleteStrategy.CHANGELOG_STANDARD)
.withDescription(
"This field is used to decide what to do when data of type -D/-U is received. "
+ "`IGNORE_DELETE` means ignoring the `-D` and `-U` type message. "
+ "`CHANGELOG_STANDARD` means neither `-U` nor `-D` is ignored, they both cause the corresponding row in fluss to be deleted");
public static final ConfigOption<Boolean> SINK_IGNORE_DELETE =
ConfigOptions.key("sink.ignore_delete")
.booleanType()
.defaultValue(false)
.withDescription("Whether to ignore retract (-U/-D) records.");

// --------------------------------------------------------------------------------------------
// table storage specific options
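Note on the new option: the removed sink.delete-strategy enum (IGNORE_DELETE / CHANGELOG_STANDARD) is replaced by the boolean sink.ignore_delete. A minimal Flink SQL sketch of the new setting, mirroring the DDL used in the updated ITCase further down in this diff (the table and column names here are illustrative, and a Fluss catalog is assumed to be active):

    CREATE TABLE example_sink (
        a INT NOT NULL,
        b BIGINT,
        c STRING,
        PRIMARY KEY (a) NOT ENFORCED
    ) WITH (
        'bucket.num' = '3',
        -- was: 'sink.delete-strategy' = 'ignore_delete'
        'sink.ignore_delete' = 'true'
    );

With 'sink.ignore_delete' = 'false' (the default), -U/-D records still delete the corresponding row, matching the former CHANGELOG_STANDARD behavior.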
@@ -144,7 +144,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
rowType,
context.getPrimaryKeyIndexes(),
isStreamingMode,
tableOptions.get(FlinkConnectorOptions.SINK_DELETE_STRATEGY));
tableOptions.get(FlinkConnectorOptions.SINK_IGNORE_DELETE));
}

@Override
@@ -168,7 +168,7 @@ public Set<ConfigOption<?>> optionalOptions() {
FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP,
FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL,
FlinkConnectorOptions.LOOKUP_ASYNC,
FlinkConnectorOptions.SINK_DELETE_STRATEGY,
FlinkConnectorOptions.SINK_IGNORE_DELETE,
LookupOptions.MAX_RETRIES,
LookupOptions.CACHE_TYPE,
LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,
@@ -18,7 +18,6 @@

import com.alibaba.fluss.client.table.writer.AppendWriter;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.utils.FlinkRowToFlussRowConverter;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.row.InternalRow;
@@ -40,8 +39,8 @@ class AppendSinkFunction extends FlinkSinkFunction {
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
DeleteStrategy deleteStrategy) {
super(tablePath, flussConfig, tableRowType, deleteStrategy);
boolean ignoreDelete) {
super(tablePath, flussConfig, tableRowType, ignoreDelete);
}

@Override
@@ -21,7 +21,6 @@
import com.alibaba.fluss.client.table.Table;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.metrics.FlinkMetricRegistry;
import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.utils.FlinkConversions;
import com.alibaba.fluss.connector.flink.utils.FlinkRowToFlussRowConverter;
import com.alibaba.fluss.metadata.TableDescriptor;
@@ -64,7 +63,7 @@ abstract class FlinkSinkFunction extends RichSinkFunction<RowData>
private final Configuration flussConfig;
protected final RowType tableRowType;
protected final @Nullable int[] targetColumnIndexes;
private final DeleteStrategy deleteStrategy;
private final boolean ignoreDelete;

private transient Connection connection;
protected transient Table table;
@@ -77,26 +76,25 @@
private transient Counter numRecordsOutErrorsCounter;
private volatile Throwable asyncWriterException;


public FlinkSinkFunction(
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
DeleteStrategy deleteStrategy) {
this(tablePath, flussConfig, tableRowType, null, deleteStrategy);
boolean ignoreDelete) {
this(tablePath, flussConfig, tableRowType, null, ignoreDelete);
}

public FlinkSinkFunction(
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
@Nullable int[] targetColumns,
DeleteStrategy deleteStrategy) {
boolean ignoreDelete) {
this.tablePath = tablePath;
this.flussConfig = flussConfig;
this.targetColumnIndexes = targetColumns;
this.tableRowType = tableRowType;
this.deleteStrategy = deleteStrategy;
this.ignoreDelete = ignoreDelete;
}

@Override
@@ -126,7 +124,7 @@ protected void initMetrics() {
@Override
public void invoke(RowData value, SinkFunction.Context context) throws IOException {
checkAsyncException();
if (DeleteStrategy.IGNORE_DELETE.equals(deleteStrategy)
if (ignoreDelete
&& (value.getRowKind() == RowKind.UPDATE_BEFORE
|| value.getRowKind() == RowKind.DELETE)) {
return;
@@ -17,7 +17,6 @@
package com.alibaba.fluss.connector.flink.sink;

import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils.FieldEqual;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils.ValueConversion;
@@ -64,7 +63,7 @@ public class FlinkTableSink
private final RowType tableRowType;
private final int[] primaryKeyIndexes;
private final boolean streaming;
private final DeleteStrategy deleteStrategy;
private final boolean ignoreDelete;

private boolean appliedUpdates = false;
@Nullable private GenericRow deleteRow;
@@ -75,13 +74,13 @@ public FlinkTableSink(
RowType tableRowType,
int[] primaryKeyIndexes,
boolean streaming,
DeleteStrategy deleteStrategy) {
boolean ignoreDelete) {
this.tablePath = tablePath;
this.flussConfig = flussConfig;
this.tableRowType = tableRowType;
this.primaryKeyIndexes = primaryKeyIndexes;
this.streaming = streaming;
this.deleteStrategy = deleteStrategy;
this.ignoreDelete = ignoreDelete;
}

@Override
@@ -92,8 +91,7 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
if (!streaming) {
return ChangelogMode.insertOnly();
} else {
if (primaryKeyIndexes.length > 0
|| DeleteStrategy.IGNORE_DELETE.equals(deleteStrategy)) {
if (primaryKeyIndexes.length > 0 || ignoreDelete) {
// primary-key table or ignore_delete mode can accept RowKind.DELETE
ChangelogMode.Builder builder = ChangelogMode.newBuilder();
for (RowKind kind : requestedMode.getContainedKinds()) {
@@ -160,9 +158,9 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
flussConfig,
tableRowType,
targetColumnIndexes,
deleteStrategy)
ignoreDelete)
: new AppendSinkFunction(
tablePath, flussConfig, tableRowType, deleteStrategy);
tablePath, flussConfig, tableRowType, ignoreDelete);

return SinkFunctionProvider.of(sinkFunction);
}
@@ -184,7 +182,7 @@ public DynamicTableSink copy() {
tableRowType,
primaryKeyIndexes,
streaming,
deleteStrategy);
ignoreDelete);
sink.appliedUpdates = appliedUpdates;
sink.deleteRow = deleteRow;
return sink;
@@ -19,7 +19,6 @@
import com.alibaba.fluss.client.table.writer.UpsertWrite;
import com.alibaba.fluss.client.table.writer.UpsertWriter;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.utils.FlinkRowToFlussRowConverter;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.row.InternalRow;
@@ -44,8 +43,8 @@ class UpsertSinkFunction extends FlinkSinkFunction {
Configuration flussConfig,
RowType tableRowType,
@Nullable int[] targetColumnIndexes,
DeleteStrategy deleteStrategy) {
super(tablePath, flussConfig, tableRowType, targetColumnIndexes, deleteStrategy);
boolean ignoreDelete) {
super(tablePath, flussConfig, tableRowType, targetColumnIndexes, ignoreDelete);
}

@Override
@@ -18,7 +18,6 @@

import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.source.testutils.FlinkTestBase;
import com.alibaba.fluss.metadata.Schema;
import com.alibaba.fluss.metadata.TableDescriptor;
@@ -69,8 +68,7 @@ void testSinkMetrics(String clientId) throws Exception {
new RowType.RowField("id", DataTypes.INT().getLogicalType()),
new RowType.RowField("name", DataTypes.STRING().getLogicalType())));
FlinkSinkFunction flinkSinkFunction =
new AppendSinkFunction(
tablePath, flussConf, rowType, DeleteStrategy.CHANGELOG_STANDARD);
new AppendSinkFunction(tablePath, flussConf, rowType, false);
InterceptingOperatorMetricGroup interceptingOperatorMetricGroup =
new InterceptingOperatorMetricGroup();
MockStreamingRuntimeContext mockStreamingRuntimeContext =
@@ -379,8 +379,14 @@ void testInsertWithoutSpecifiedCols() {
.isEqualTo(expectPlan);
}

@Test
void testIgnoreDelete() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testIgnoreDelete(boolean isPrimaryKeyTable) throws Exception {
String sinkName =
isPrimaryKeyTable
? "ignore_delete_primary_key_table_sink"
: "ignore_delete_log_table_sink";
String sourceName = isPrimaryKeyTable ? "source_primary_key_table" : "source_log_table";
org.apache.flink.table.api.Table cdcSourceData =
tEnv.fromChangelogStream(
env.fromData(
Expand All @@ -389,13 +395,23 @@ void testIgnoreDelete() throws Exception {
Row.ofKind(RowKind.INSERT, 2, 3502L, "Fabian"),
Row.ofKind(RowKind.UPDATE_BEFORE, 2, 3502L, "Fabian"),
Row.ofKind(RowKind.UPDATE_AFTER, 3, 3503L, "coco")));
tEnv.createTemporaryView("cdcSourceData", cdcSourceData);
tEnv.createTemporaryView(sourceName, cdcSourceData);

tEnv.executeSql(
"create table sink_test (a int not null, b bigint, c string) with('bucket.num' = '3', 'sink.delete-strategy'='ignore_delete')");
tEnv.executeSql("INSERT INTO sink_test SELECT * FROM cdcSourceData").await();
String.format(
"create table %s ("
+ "a int not null, "
+ "b bigint, "
+ "c string "
+ (isPrimaryKeyTable ? ", primary key (a) NOT ENFORCED" : "")
+ ") with('bucket.num' = '3',"
+ " 'sink.ignore_delete'='true')",
sinkName));
tEnv.executeSql(String.format("INSERT INTO %s SELECT * FROM %s", sinkName, sourceName))
.await();

CloseableIterator<Row> rowIter = tEnv.executeSql("select * from sink_test").collect();
CloseableIterator<Row> rowIter =
tEnv.executeSql(String.format("select * from %s", sinkName)).collect();
List<String> expectedRows =
Arrays.asList("+I[1, 3501, Tim]", "+I[2, 3502, Fabian]", "+I[3, 3503, coco]");
assertResultsIgnoreOrder(rowIter, expectedRows, true);
@@ -20,7 +20,6 @@
import com.alibaba.fluss.client.ConnectionFactory;
import com.alibaba.fluss.client.admin.Admin;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.sink.FlinkTableSink;
import com.alibaba.fluss.connector.flink.utils.FlinkConversions;
import com.alibaba.fluss.lakehouse.paimon.record.MultiplexCdcRecord;
@@ -89,7 +88,7 @@ public void invoke(MultiplexCdcRecord record, SinkFunction.Context context) thro
FlinkConversions.toFlinkRowType(rowType),
tableDescriptor.getSchema().getPrimaryKeyIndexes(),
true,
DeleteStrategy.CHANGELOG_STANDARD);
false);

sinkFunction =
((SinkFunctionProvider)
