From f457d35e460a2d84c8e58b2fa87b31b339b5b302 Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Wed, 17 Jul 2024 16:13:28 +0800 Subject: [PATCH] [core] Support delete row in partial updates (#3602) --- .../merge-engine/partial-update.md | 1 + .../generated/core_configuration.html | 6 ++ .../java/org/apache/paimon/CoreOptions.java | 8 ++ .../compact/PartialUpdateMergeFunction.java | 39 ++++++- .../table/PrimaryKeyFileStoreTableTest.java | 101 ++++++++++++++++++ 5 files changed, 152 insertions(+), 3 deletions(-) diff --git a/docs/content/primary-key-table/merge-engine/partial-update.md b/docs/content/primary-key-table/merge-engine/partial-update.md index 70b12618e5aa..09c376c25e2f 100644 --- a/docs/content/primary-key-table/merge-engine/partial-update.md +++ b/docs/content/primary-key-table/merge-engine/partial-update.md @@ -49,6 +49,7 @@ but only returns input records.) By default, Partial update can not accept delete records, you can choose one of the following solutions: - Configure 'ignore-delete' to ignore delete records. +- Configure 'partial-update.remove-record-on-delete' to remove the whole row when receiving delete records. - Configure 'sequence-group's to retract partial columns. {{< /hint >}} diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 89db43b431fb..19a73cc5d410 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -485,6 +485,12 @@ Integer Turn off the dictionary encoding for all fields in parquet. + +
partial-update.remove-record-on-delete
+ false + Boolean + Whether to remove the whole row in partial-update engine when -D records are received. +
partition
(none) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 8864bc3d3344..9bb6ab91f82e 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -553,6 +553,14 @@ public class CoreOptions implements Serializable { "The field that generates the sequence number for primary key table," + " the sequence number determines which data is the most recent."); + @Immutable + public static final ConfigOption PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE = + key("partial-update.remove-record-on-delete") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to remove the whole row in partial-update engine when -D records are received."); + @Immutable public static final ConfigOption ROWKIND_FIELD = key("rowkind.field") diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java index 154595859d1a..3013d6ad55ba 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java @@ -29,6 +29,7 @@ import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FieldsComparator; +import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.Projection; import org.apache.paimon.utils.UserDefinedSeqComparator; @@ -48,6 +49,7 @@ import static org.apache.paimon.CoreOptions.FIELDS_PREFIX; import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR; +import static org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE; import static org.apache.paimon.utils.InternalRowUtils.createFieldGetters; /** @@ -63,6 +65,7 @@ public class PartialUpdateMergeFunction implements MergeFunction { private final Map fieldSeqComparators; private final boolean fieldSequenceEnabled; private final Map fieldAggregators; + private final boolean removeRecordOnDelete; private InternalRow currentKey; private long latestSequenceNumber; @@ -74,12 +77,14 @@ protected PartialUpdateMergeFunction( boolean ignoreDelete, Map fieldSeqComparators, Map fieldAggregators, - boolean fieldSequenceEnabled) { + boolean fieldSequenceEnabled, + boolean removeRecordOnDelete) { this.getters = getters; this.ignoreDelete = ignoreDelete; this.fieldSeqComparators = fieldSeqComparators; this.fieldAggregators = fieldAggregators; this.fieldSequenceEnabled = fieldSequenceEnabled; + this.removeRecordOnDelete = removeRecordOnDelete; } @Override @@ -106,6 +111,14 @@ public void add(KeyValue kv) { return; } + if (removeRecordOnDelete) { + if (kv.valueKind() == RowKind.DELETE) { + row = null; + } + // ignore -U records + return; + } + String msg = String.join( "\n", @@ -235,6 +248,9 @@ public KeyValue getResult() { if (reused == null) { reused = new KeyValue(); } + if (removeRecordOnDelete && row == null) { + return null; + } return reused.replace(currentKey, latestSequenceNumber, RowKind.INSERT, row); } @@ -256,6 +272,8 @@ private static class Factory implements MergeFunctionFactory { private final Map fieldAggregators; + private final boolean removeRecordOnDelete; + private Factory(Options options, RowType rowType, List primaryKeys) { this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE); this.rowType = rowType; @@ -310,6 +328,19 @@ private Factory(Options options, RowType rowType, List primaryKeys) { throw new IllegalArgumentException( "Must use sequence group for aggregation functions."); } + + removeRecordOnDelete = options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE); + + Preconditions.checkState( + !(removeRecordOnDelete && ignoreDelete), + String.format( + "%s and %s have conflicting behavior so should not be enabled at the same time.", + CoreOptions.IGNORE_DELETE, PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE)); + Preconditions.checkState( + !removeRecordOnDelete || fieldSeqComparators.isEmpty(), + String.format( + "sequence group and %s have conflicting behavior so should not be enabled at the same time.", + PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE)); } @Override @@ -368,14 +399,16 @@ public MergeFunction create(@Nullable int[][] projection) { ignoreDelete, projectedSeqComparators, projectedAggregators, - !fieldSeqComparators.isEmpty()); + !fieldSeqComparators.isEmpty(), + removeRecordOnDelete); } else { return new PartialUpdateMergeFunction( createFieldGetters(tableTypes), ignoreDelete, fieldSeqComparators, fieldAggregators, - !fieldSeqComparators.isEmpty()); + !fieldSeqComparators.isEmpty(), + removeRecordOnDelete); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index c4867bcac7a3..b74f242e1429 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -1023,6 +1023,107 @@ public void testAuditLog() throws Exception { assertThat(result).containsExactlyInAnyOrder("+I[20, 2]", "+I[30, 1]", "+I[10, 1]"); } + @Test + public void testPartialUpdateRemoveRecordOnDelete() throws Exception { + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT() + }, + new String[] {"pt", "a", "b", "c"}); + FileStoreTable table = + createFileStoreTable( + options -> { + options.set("merge-engine", "partial-update"); + options.set("partial-update.remove-record-on-delete", "true"); + }, + rowType); + Function rowToString = row -> internalRowToString(row, rowType); + SnapshotReader snapshotReader = table.newSnapshotReader(); + TableRead read = table.newRead(); + StreamTableWrite write = table.newWrite(""); + StreamTableCommit commit = table.newCommit(""); + // 1. inserts + write.write(GenericRow.of(1, 1, 3, 3)); + write.write(GenericRow.of(1, 1, 1, 1)); + write.write(GenericRow.of(1, 1, 2, 2)); + commit.commit(0, write.prepareCommit(true, 0)); + List result = + getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString); + assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]"); + + // 2. Update Before + write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2)); + commit.commit(1, write.prepareCommit(true, 1)); + result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString); + assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]"); + + // 3. Update After + write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3)); + commit.commit(2, write.prepareCommit(true, 2)); + result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString); + assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 3]"); + + // 4. Retracts + write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2, 3)); + commit.commit(3, write.prepareCommit(true, 3)); + result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString); + assertThat(result).isEmpty(); + write.close(); + commit.close(); + } + + @Test + public void testPartialUpdateWithAgg() throws Exception { + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT() + }, + new String[] {"pt", "a", "b", "c"}); + FileStoreTable table = + createFileStoreTable( + options -> { + options.set("merge-engine", "partial-update"); + options.set("fields.a.sequence-group", "c"); + options.set("fields.c.aggregate-function", "sum"); + }, + rowType); + Function rowToString = row -> internalRowToString(row, rowType); + SnapshotReader snapshotReader = table.newSnapshotReader(); + TableRead read = table.newRead(); + StreamTableWrite write = table.newWrite(""); + StreamTableCommit commit = table.newCommit(""); + // 1. inserts + write.write(GenericRow.of(1, 1, 3, 3)); + write.write(GenericRow.of(1, 1, 1, 1)); + write.write(GenericRow.of(1, 1, 2, 2)); + commit.commit(0, write.prepareCommit(true, 0)); + List result = + getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString); + assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 6]"); + + // 2. Update Before + write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2)); + commit.commit(1, write.prepareCommit(true, 1)); + result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString); + assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 4]"); + + // 3. Update After + write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3)); + commit.commit(2, write.prepareCommit(true, 2)); + result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString); + assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 7]"); + + // 4. Retracts + write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2, 3)); + commit.commit(3, write.prepareCommit(true, 3)); + result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString); + assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 4]"); + write.close(); + commit.close(); + } + @Test public void testAggMergeFunc() throws Exception { RowType rowType =