From 791d3bfbe8d2f542a85dec387f651c2bcb10a773 Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Fri, 14 Jun 2024 16:55:17 +0800 Subject: [PATCH] [core] Support delete row in partial updates --- .../merge-engine/partial-update.md | 1 + .../generated/core_configuration.html | 6 ++ .../java/org/apache/paimon/CoreOptions.java | 8 ++ .../compact/PartialUpdateMergeFunction.java | 87 ++++++++++++++- .../table/PrimaryKeyFileStoreTableTest.java | 101 ++++++++++++++++++ 5 files changed, 200 insertions(+), 3 deletions(-) diff --git a/docs/content/primary-key-table/merge-engine/partial-update.md b/docs/content/primary-key-table/merge-engine/partial-update.md index 70b12618e5aa9..09c376c25e2f6 100644 --- a/docs/content/primary-key-table/merge-engine/partial-update.md +++ b/docs/content/primary-key-table/merge-engine/partial-update.md @@ -49,6 +49,7 @@ but only returns input records.) By default, Partial update can not accept delete records, you can choose one of the following solutions: - Configure 'ignore-delete' to ignore delete records. +- Configure 'partial-update.remove-record-on-delete' to remove the whole row when receiving delete records. - Configure 'sequence-group's to retract partial columns. {{< /hint >}} diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 9faeb0865a682..dc1723232ca2d 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -479,6 +479,12 @@ Integer Turn off the dictionary encoding for all fields in parquet. + +
partial-update.remove-record-on-delete
+ false + Boolean + Whether to remove the whole row in partial-update engine when -D records are received. +
partition
(none) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 0ec6d59a085b7..4954c6c71d61d 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -553,6 +553,14 @@ public class CoreOptions implements Serializable { "The field that generates the sequence number for primary key table," + " the sequence number determines which data is the most recent."); + @Immutable + public static final ConfigOption PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE = + key("partial-update.remove-record-on-delete") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to remove the whole row in partial-update engine when -D records are received."); + @Immutable public static final ConfigOption ROWKIND_FIELD = key("rowkind.field") diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java index 154595859d1ad..ac2924628b262 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java @@ -29,6 +29,7 @@ import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FieldsComparator; +import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.Projection; import org.apache.paimon.utils.UserDefinedSeqComparator; @@ -48,6 +49,7 @@ import static org.apache.paimon.CoreOptions.FIELDS_PREFIX; import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR; +import static org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE; import static org.apache.paimon.utils.InternalRowUtils.createFieldGetters; /** @@ -58,11 +60,13 @@ public class PartialUpdateMergeFunction implements MergeFunction { public static final String SEQUENCE_GROUP = "sequence-group"; + private final boolean[] primaryKeyMask; private final InternalRow.FieldGetter[] getters; private final boolean ignoreDelete; private final Map fieldSeqComparators; private final boolean fieldSequenceEnabled; private final Map fieldAggregators; + private final boolean removeRecordOnDelete; private InternalRow currentKey; private long latestSequenceNumber; @@ -70,16 +74,20 @@ public class PartialUpdateMergeFunction implements MergeFunction { private KeyValue reused; protected PartialUpdateMergeFunction( + boolean[] primaryKeyMask, InternalRow.FieldGetter[] getters, boolean ignoreDelete, Map fieldSeqComparators, Map fieldAggregators, - boolean fieldSequenceEnabled) { + boolean fieldSequenceEnabled, + boolean removeRecordOnDelete) { + this.primaryKeyMask = primaryKeyMask; this.getters = getters; this.ignoreDelete = ignoreDelete; this.fieldSeqComparators = fieldSeqComparators; this.fieldAggregators = fieldAggregators; this.fieldSequenceEnabled = fieldSequenceEnabled; + this.removeRecordOnDelete = removeRecordOnDelete; } @Override @@ -106,6 +114,14 @@ public void add(KeyValue kv) { return; } + if (removeRecordOnDelete) { + if (kv.valueKind() == RowKind.DELETE) { + clearRow(kv); + } + // ignore -U records + return; + } + String msg = String.join( "\n", @@ -182,6 +198,23 @@ private boolean isEmptySequenceGroup(KeyValue kv, FieldsComparator comparator) { return true; } + private void clearRow(KeyValue kv) { + for (int i = 0; i < getters.length; i++) { + FieldAggregator aggregator = fieldAggregators.get(i); + if (primaryKeyMask[i]) { + row.setField(i, getters[i].getFieldOrNull(kv.value())); + } else if (aggregator == null) { + // retract normal field + row.setField(i, null); + } else { + // retract agg field + Object accumulator = getters[i].getFieldOrNull(row); + row.setField( + i, aggregator.retract(accumulator, getters[i].getFieldOrNull(kv.value()))); + } + } + } + private void retractWithSequenceGroup(KeyValue kv) { Set updatedSequenceFields = new HashSet<>(); @@ -235,9 +268,29 @@ public KeyValue getResult() { if (reused == null) { reused = new KeyValue(); } + if (removeRecordOnDelete && isRowCleared()) { + row.setRowKind(RowKind.DELETE); + return reused.replace(currentKey, latestSequenceNumber, RowKind.DELETE, row); + } + row.setRowKind(RowKind.INSERT); return reused.replace(currentKey, latestSequenceNumber, RowKind.INSERT, row); } + private boolean isRowCleared() { + for (int i = 0; i < getters.length; i++) { + if (getters[i].getFieldOrNull(row) == null) { + continue; + } + + if (primaryKeyMask[i]) { + continue; + } + + return false; + } + return true; + } + public static MergeFunctionFactory factory( Options options, RowType rowType, List primaryKeys) { return new Factory(options, rowType, primaryKeys); @@ -256,6 +309,10 @@ private static class Factory implements MergeFunctionFactory { private final Map fieldAggregators; + private final boolean[] primaryKeyMask; + + private final boolean removeRecordOnDelete; + private Factory(Options options, RowType rowType, List primaryKeys) { this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE); this.rowType = rowType; @@ -310,6 +367,24 @@ private Factory(Options options, RowType rowType, List primaryKeys) { throw new IllegalArgumentException( "Must use sequence group for aggregation functions."); } + + primaryKeyMask = new boolean[rowType.getFieldCount()]; + for (String primaryKey : primaryKeys) { + primaryKeyMask[rowType.getFieldIndex(primaryKey)] = true; + } + + removeRecordOnDelete = options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE); + + Preconditions.checkState( + !(removeRecordOnDelete && ignoreDelete), + String.format( + "%s and %s have conflicting behavior so should not be enabled at the same time.", + CoreOptions.IGNORE_DELETE, PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE)); + Preconditions.checkState( + !removeRecordOnDelete || fieldSeqComparators.isEmpty(), + String.format( + "sequence group and %s have conflicting behavior so should not be enabled at the same time.", + PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE)); } @Override @@ -357,25 +432,31 @@ public MergeFunction create(@Nullable int[][] projection) { newRowType, newSequenceFields)); } }); + boolean[] primaryKeyMask = new boolean[projects.length]; for (int i = 0; i < projects.length; i++) { if (fieldAggregators.containsKey(projects[i])) { projectedAggregators.put(i, fieldAggregators.get(projects[i])); } + primaryKeyMask[i] = this.primaryKeyMask[projects[i]]; } return new PartialUpdateMergeFunction( + primaryKeyMask, createFieldGetters(Projection.of(projection).project(tableTypes)), ignoreDelete, projectedSeqComparators, projectedAggregators, - !fieldSeqComparators.isEmpty()); + !fieldSeqComparators.isEmpty(), + removeRecordOnDelete); } else { return new PartialUpdateMergeFunction( + primaryKeyMask, createFieldGetters(tableTypes), ignoreDelete, fieldSeqComparators, fieldAggregators, - !fieldSeqComparators.isEmpty()); + !fieldSeqComparators.isEmpty(), + removeRecordOnDelete); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index c4867bcac7a36..b74f242e14299 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -1023,6 +1023,107 @@ public void testAuditLog() throws Exception { assertThat(result).containsExactlyInAnyOrder("+I[20, 2]", "+I[30, 1]", "+I[10, 1]"); } + @Test + public void testPartialUpdateRemoveRecordOnDelete() throws Exception { + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT() + }, + new String[] {"pt", "a", "b", "c"}); + FileStoreTable table = + createFileStoreTable( + options -> { + options.set("merge-engine", "partial-update"); + options.set("partial-update.remove-record-on-delete", "true"); + }, + rowType); + Function rowToString = row -> internalRowToString(row, rowType); + SnapshotReader snapshotReader = table.newSnapshotReader(); + TableRead read = table.newRead(); + StreamTableWrite write = table.newWrite(""); + StreamTableCommit commit = table.newCommit(""); + // 1. inserts + write.write(GenericRow.of(1, 1, 3, 3)); + write.write(GenericRow.of(1, 1, 1, 1)); + write.write(GenericRow.of(1, 1, 2, 2)); + commit.commit(0, write.prepareCommit(true, 0)); + List result = + getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString); + assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]"); + + // 2. Update Before + write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2)); + commit.commit(1, write.prepareCommit(true, 1)); + result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString); + assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]"); + + // 3. Update After + write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3)); + commit.commit(2, write.prepareCommit(true, 2)); + result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString); + assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 3]"); + + // 4. Retracts + write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2, 3)); + commit.commit(3, write.prepareCommit(true, 3)); + result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString); + assertThat(result).isEmpty(); + write.close(); + commit.close(); + } + + @Test + public void testPartialUpdateWithAgg() throws Exception { + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT() + }, + new String[] {"pt", "a", "b", "c"}); + FileStoreTable table = + createFileStoreTable( + options -> { + options.set("merge-engine", "partial-update"); + options.set("fields.a.sequence-group", "c"); + options.set("fields.c.aggregate-function", "sum"); + }, + rowType); + Function rowToString = row -> internalRowToString(row, rowType); + SnapshotReader snapshotReader = table.newSnapshotReader(); + TableRead read = table.newRead(); + StreamTableWrite write = table.newWrite(""); + StreamTableCommit commit = table.newCommit(""); + // 1. inserts + write.write(GenericRow.of(1, 1, 3, 3)); + write.write(GenericRow.of(1, 1, 1, 1)); + write.write(GenericRow.of(1, 1, 2, 2)); + commit.commit(0, write.prepareCommit(true, 0)); + List result = + getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString); + assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 6]"); + + // 2. Update Before + write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2)); + commit.commit(1, write.prepareCommit(true, 1)); + result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString); + assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 4]"); + + // 3. Update After + write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3)); + commit.commit(2, write.prepareCommit(true, 2)); + result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString); + assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 7]"); + + // 4. Retracts + write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2, 3)); + commit.commit(3, write.prepareCommit(true, 3)); + result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString); + assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 4]"); + write.close(); + commit.close(); + } + @Test public void testAggMergeFunc() throws Exception { RowType rowType =