diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 82a8fc798b14..1c3c747f6b70 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -134,6 +134,12 @@
Duration |
The TTL in rocksdb index for cross partition upsert (primary keys not contain all partition fields), this can avoid maintaining too many indexes and lead to worse and worse performance, but please note that this may also cause data duplication. |
+
+ deduplicate.ignore-delete |
+ false |
+ Boolean |
+ Whether to ignore delete records in deduplicate mode. |
+
dynamic-bucket.assigner-parallelism |
(none) |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 4274ea8d2b9e..90f74ad2a43b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -232,6 +232,12 @@ public class CoreOptions implements Serializable {
.defaultValue(MergeEngine.DEDUPLICATE)
.withDescription("Specify the merge engine for table with primary key.");
+ public static final ConfigOption DEDUPLICATE_IGNORE_DELETE =
+ key("deduplicate.ignore-delete")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to ignore delete records in deduplicate mode.");
+
public static final ConfigOption PARTIAL_UPDATE_IGNORE_DELETE =
key("partial-update.ignore-delete")
.booleanType()
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
index e05e2a33c1c0..b062ce9b6c03 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
@@ -18,7 +18,9 @@
package org.apache.paimon.mergetree.compact;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.options.Options;
import javax.annotation.Nullable;
@@ -28,9 +30,13 @@
*/
public class DeduplicateMergeFunction implements MergeFunction {
+ private final boolean ignoreDelete;
+
private KeyValue latestKv;
- protected DeduplicateMergeFunction() {}
+ protected DeduplicateMergeFunction(boolean ignoreDelete) {
+ this.ignoreDelete = ignoreDelete;
+ }
@Override
public void reset() {
@@ -39,6 +45,9 @@ public void reset() {
@Override
public void add(KeyValue kv) {
+ if (ignoreDelete && kv.valueKind().isRetract()) {
+ return;
+ }
latestKv = kv;
}
@@ -49,16 +58,26 @@ public KeyValue getResult() {
}
public static MergeFunctionFactory factory() {
- return new Factory();
+ return new Factory(new Options());
+ }
+
+ public static MergeFunctionFactory factory(Options options) {
+ return new Factory(options);
}
private static class Factory implements MergeFunctionFactory {
private static final long serialVersionUID = 1L;
+ private final Options options;
+
+ private Factory(Options options) {
+ this.options = options;
+ }
+
@Override
public MergeFunction create(@Nullable int[][] projection) {
- return new DeduplicateMergeFunction();
+ return new DeduplicateMergeFunction(options.get(CoreOptions.DEDUPLICATE_IGNORE_DELETE));
}
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
index 727664da2832..72ea4b456cb9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
@@ -64,7 +64,7 @@ public static MergeFunctionFactory createMergeFunctionFactory(
switch (mergeEngine) {
case DEDUPLICATE:
- return DeduplicateMergeFunction.factory();
+ return DeduplicateMergeFunction.factory(conf);
case PARTIAL_UPDATE:
return PartialUpdateMergeFunction.factory(conf, rowType, tableSchema.primaryKeys());
case AGGREGATE:
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java
index 70e361f876bb..88bba772287f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java
@@ -173,7 +173,7 @@ public abstract static class WithDeduplicateMergeFunctionTestBase
@Override
protected MergeFunction createMergeFunction() {
- return new DeduplicateMergeFunction();
+ return DeduplicateMergeFunction.factory().create();
}
@Override
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ReducerMergeFunctionWrapperTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ReducerMergeFunctionWrapperTestBase.java
index f729e023b80a..33aa970042ac 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ReducerMergeFunctionWrapperTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ReducerMergeFunctionWrapperTestBase.java
@@ -77,7 +77,7 @@ public static class WithDeduplicateMergeFunctionTest
@Override
protected MergeFunction createMergeFunction() {
- return new DeduplicateMergeFunction();
+ return DeduplicateMergeFunction.factory().create();
}
@Override
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
index 876280125784..6895815cdcdc 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
@@ -107,7 +107,7 @@ protected List getExpected(List input) {
@Override
protected MergeFunction createMergeFunction() {
- return new DeduplicateMergeFunction();
+ return DeduplicateMergeFunction.factory().create();
}
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index 237b4366b1e2..62d87a7338b3 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.action.cdc.kafka;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.FileSystemCatalogOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
@@ -971,10 +972,11 @@ public void testCatalogAndTableConfig() {
.containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
}
- @Test
+ @ParameterizedTest(name = "ignore-delete = {0}")
+ @ValueSource(booleans = {true, false})
@Timeout(60)
- public void testCDCOperations() throws Exception {
- final String topic = "event-insert";
+ public void testCDCOperations(boolean ignoreDelete) throws Exception {
+ final String topic = "event-insert" + UUID.randomUUID();
createTestTopic(topic, 1, 1);
// ---------- Write the Canal json into Kafka -------------------
@@ -984,8 +986,11 @@ public void testCDCOperations() throws Exception {
kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
kafkaConfig.put(TOPIC.key(), topic);
+ Map tableConfig = getBasicTableConfig();
+ tableConfig.put(CoreOptions.DEDUPLICATE_IGNORE_DELETE.key(), String.valueOf(ignoreDelete));
+
KafkaSyncTableAction action =
- syncTableActionBuilder(kafkaConfig).withTableConfig(getBasicTableConfig()).build();
+ syncTableActionBuilder(kafkaConfig).withTableConfig(tableConfig).build();
runActionWithDefaultEnv(action);
FileStoreTable table = getFileStoreTable(tableName);
@@ -1035,10 +1040,12 @@ public void testCDCOperations() throws Exception {
// For the DELETE operation
List expectedDelete =
- Arrays.asList(
- "+I[1, 2, second, NULL, NULL, NULL, NULL]",
- "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]",
- "+I[2, 4, four, NULL, NULL, NULL, NULL]");
+ ignoreDelete
+ ? expectedUpdate
+ : Arrays.asList(
+ "+I[1, 2, second, NULL, NULL, NULL, NULL]",
+ "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]",
+ "+I[2, 4, four, NULL, NULL, NULL, NULL]");
waitForResult(expectedDelete, table, rowType, primaryKeys);
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index fae1f5acf70a..ac68dcad7995 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -20,6 +20,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.BlockingIterator;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
@@ -368,4 +369,21 @@ public void testRowKindField() {
sql("INSERT INTO R_T VALUES (1, 2, '-D')");
assertThat(sql("SELECT * FROM R_T")).isEmpty();
}
+
+ @Test
+ public void testIgnoreDelete() throws Exception {
+ sql(
+ "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING) "
+ + "WITH ('deduplicate.ignore-delete' = 'true')");
+ BlockingIterator iterator = streamSqlBlockIter("SELECT * FROM ignore_delete");
+
+ sql("INSERT INTO ignore_delete VALUES (1, 'A'), (2, 'B')");
+ sql("DELETE FROM ignore_delete WHERE pk = 1");
+ sql("INSERT INTO ignore_delete VALUES (1, 'B')");
+
+ assertThat(iterator.collect(2))
+ .containsExactlyInAnyOrder(
+ Row.ofKind(RowKind.INSERT, 1, "B"), Row.ofKind(RowKind.INSERT, 2, "B"));
+ iterator.close();
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index eda44b0bcd05..b056c9e869c0 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -505,4 +505,20 @@ public void testDynamicPartitionPruningNotWork() throws Exception {
Row.of(22, 202L, "ccc", 2, 2, "b"));
iterator.close();
}
+
+ @Test
+ public void testIgnoreDelete() {
+ sql(
+ "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING) "
+ + "WITH ('deduplicate.ignore-delete' = 'true')");
+
+ sql("INSERT INTO ignore_delete VALUES (1, 'A')");
+ assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "A"));
+
+ sql("DELETE FROM ignore_delete WHERE pk = 1");
+ assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "A"));
+
+ sql("INSERT INTO ignore_delete VALUES (1, 'B')");
+ assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "B"));
+ }
}