diff --git a/docs/content/primary-key-table/merge-engine/partial-update.md b/docs/content/primary-key-table/merge-engine/partial-update.md
index 70b12618e5aa..09c376c25e2f 100644
--- a/docs/content/primary-key-table/merge-engine/partial-update.md
+++ b/docs/content/primary-key-table/merge-engine/partial-update.md
@@ -49,6 +49,7 @@ but only returns input records.)
By default, Partial update can not accept delete records, you can choose one of the following solutions:
- Configure 'ignore-delete' to ignore delete records.
+- Configure 'partial-update.remove-record-on-delete' to remove the whole row when receiving delete records.
- Configure 'sequence-group's to retract partial columns.
{{< /hint >}}
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 89db43b431fb..19a73cc5d410 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -485,6 +485,12 @@
Integer |
Turn off the dictionary encoding for all fields in parquet. |
+
+ partial-update.remove-record-on-delete |
+ false |
+ Boolean |
+ Whether to remove the whole row in partial-update engine when -D records are received. |
+
partition |
(none) |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 8864bc3d3344..9bb6ab91f82e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -553,6 +553,14 @@ public class CoreOptions implements Serializable {
"The field that generates the sequence number for primary key table,"
+ " the sequence number determines which data is the most recent.");
+ @Immutable
+ public static final ConfigOption PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE =
+ key("partial-update.remove-record-on-delete")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to remove the whole row in partial-update engine when -D records are received.");
+
@Immutable
public static final ConfigOption ROWKIND_FIELD =
key("rowkind.field")
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index 154595859d1a..3013d6ad55ba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -29,6 +29,7 @@
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FieldsComparator;
+import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.Projection;
import org.apache.paimon.utils.UserDefinedSeqComparator;
@@ -48,6 +49,7 @@
import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR;
+import static org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE;
import static org.apache.paimon.utils.InternalRowUtils.createFieldGetters;
/**
@@ -63,6 +65,7 @@ public class PartialUpdateMergeFunction implements MergeFunction {
private final Map fieldSeqComparators;
private final boolean fieldSequenceEnabled;
private final Map fieldAggregators;
+ private final boolean removeRecordOnDelete;
private InternalRow currentKey;
private long latestSequenceNumber;
@@ -74,12 +77,14 @@ protected PartialUpdateMergeFunction(
boolean ignoreDelete,
Map fieldSeqComparators,
Map fieldAggregators,
- boolean fieldSequenceEnabled) {
+ boolean fieldSequenceEnabled,
+ boolean removeRecordOnDelete) {
this.getters = getters;
this.ignoreDelete = ignoreDelete;
this.fieldSeqComparators = fieldSeqComparators;
this.fieldAggregators = fieldAggregators;
this.fieldSequenceEnabled = fieldSequenceEnabled;
+ this.removeRecordOnDelete = removeRecordOnDelete;
}
@Override
@@ -106,6 +111,14 @@ public void add(KeyValue kv) {
return;
}
+ if (removeRecordOnDelete) {
+ if (kv.valueKind() == RowKind.DELETE) {
+ row = null;
+ }
+ // ignore -U records
+ return;
+ }
+
String msg =
String.join(
"\n",
@@ -235,6 +248,9 @@ public KeyValue getResult() {
if (reused == null) {
reused = new KeyValue();
}
+ if (removeRecordOnDelete && row == null) {
+ return null;
+ }
return reused.replace(currentKey, latestSequenceNumber, RowKind.INSERT, row);
}
@@ -256,6 +272,8 @@ private static class Factory implements MergeFunctionFactory {
private final Map fieldAggregators;
+ private final boolean removeRecordOnDelete;
+
private Factory(Options options, RowType rowType, List primaryKeys) {
this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE);
this.rowType = rowType;
@@ -310,6 +328,19 @@ private Factory(Options options, RowType rowType, List primaryKeys) {
throw new IllegalArgumentException(
"Must use sequence group for aggregation functions.");
}
+
+ removeRecordOnDelete = options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);
+
+ Preconditions.checkState(
+ !(removeRecordOnDelete && ignoreDelete),
+ String.format(
+ "%s and %s have conflicting behavior so should not be enabled at the same time.",
+ CoreOptions.IGNORE_DELETE, PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE));
+ Preconditions.checkState(
+ !removeRecordOnDelete || fieldSeqComparators.isEmpty(),
+ String.format(
+ "sequence group and %s have conflicting behavior so should not be enabled at the same time.",
+ PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE));
}
@Override
@@ -368,14 +399,16 @@ public MergeFunction create(@Nullable int[][] projection) {
ignoreDelete,
projectedSeqComparators,
projectedAggregators,
- !fieldSeqComparators.isEmpty());
+ !fieldSeqComparators.isEmpty(),
+ removeRecordOnDelete);
} else {
return new PartialUpdateMergeFunction(
createFieldGetters(tableTypes),
ignoreDelete,
fieldSeqComparators,
fieldAggregators,
- !fieldSeqComparators.isEmpty());
+ !fieldSeqComparators.isEmpty(),
+ removeRecordOnDelete);
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index c4867bcac7a3..b74f242e1429 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -1023,6 +1023,107 @@ public void testAuditLog() throws Exception {
assertThat(result).containsExactlyInAnyOrder("+I[20, 2]", "+I[30, 1]", "+I[10, 1]");
}
+ @Test
+ public void testPartialUpdateRemoveRecordOnDelete() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT()
+ },
+ new String[] {"pt", "a", "b", "c"});
+ FileStoreTable table =
+ createFileStoreTable(
+ options -> {
+ options.set("merge-engine", "partial-update");
+ options.set("partial-update.remove-record-on-delete", "true");
+ },
+ rowType);
+ Function rowToString = row -> internalRowToString(row, rowType);
+ SnapshotReader snapshotReader = table.newSnapshotReader();
+ TableRead read = table.newRead();
+ StreamTableWrite write = table.newWrite("");
+ StreamTableCommit commit = table.newCommit("");
+ // 1. three +I rows sharing pt=1, a=1; the read returns the last one written
+ write.write(GenericRow.of(1, 1, 3, 3));
+ write.write(GenericRow.of(1, 1, 1, 1));
+ write.write(GenericRow.of(1, 1, 2, 2));
+ commit.commit(0, write.prepareCommit(true, 0));
+ List result =
+ getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
+ assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]");
+
+ // 2. -U (UPDATE_BEFORE) is ignored: the stored row must be unchanged
+ write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2));
+ commit.commit(1, write.prepareCommit(true, 1));
+ result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
+ assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]");
+
+ // 3. +U (UPDATE_AFTER) is merged into the row: c becomes 3
+ write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3));
+ commit.commit(2, write.prepareCommit(true, 2));
+ result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
+ assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 3]");
+
+ // 4. -D (DELETE) removes the whole row, so the read returns nothing
+ write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2, 3));
+ commit.commit(3, write.prepareCommit(true, 3));
+ result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
+ assertThat(result).isEmpty();
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testPartialUpdateWithAgg() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT()
+ },
+ new String[] {"pt", "a", "b", "c"});
+ FileStoreTable table =
+ createFileStoreTable(
+ options -> {
+ options.set("merge-engine", "partial-update");
+ options.set("fields.a.sequence-group", "c");
+ options.set("fields.c.aggregate-function", "sum");
+ },
+ rowType);
+ Function rowToString = row -> internalRowToString(row, rowType);
+ SnapshotReader snapshotReader = table.newSnapshotReader();
+ TableRead read = table.newRead();
+ StreamTableWrite write = table.newWrite("");
+ StreamTableCommit commit = table.newCommit("");
+ // 1. three +I rows: c is summed (3 + 1 + 2 = 6), b shows the last written value
+ write.write(GenericRow.of(1, 1, 3, 3));
+ write.write(GenericRow.of(1, 1, 1, 1));
+ write.write(GenericRow.of(1, 1, 2, 2));
+ commit.commit(0, write.prepareCommit(true, 0));
+ List result =
+ getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
+ assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 6]");
+
+ // 2. -U (UPDATE_BEFORE) retracts its c from the sum: 6 - 2 = 4
+ write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2));
+ commit.commit(1, write.prepareCommit(true, 1));
+ result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
+ assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 4]");
+
+ // 3. +U (UPDATE_AFTER) adds its c to the sum: 4 + 3 = 7
+ write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3));
+ commit.commit(2, write.prepareCommit(true, 2));
+ result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
+ assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 7]");
+
+ // 4. -D (DELETE) retracts its c from the sum (7 - 3 = 4); the row itself is kept
+ write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2, 3));
+ commit.commit(3, write.prepareCommit(true, 3));
+ result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString);
+ assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 4]");
+ write.close();
+ commit.close();
+ }
+
@Test
public void testAggMergeFunc() throws Exception {
RowType rowType =