diff --git a/docs/content/primary-key-table/merge-engine.md b/docs/content/primary-key-table/merge-engine.md index f4daa7bf73570..cd3f257f4b9fb 100644 --- a/docs/content/primary-key-table/merge-engine.md +++ b/docs/content/primary-key-table/merge-engine.md @@ -58,7 +58,7 @@ For streaming queries, `partial-update` merge engine must be used together with {{< /hint >}} {{< hint info >}} -By default, Partial update can not accept delete records, you can choose one of the following solutions: +By default, Partial update will remove the entire row when -D records are received, you can choose one of the following solutions: - Configure 'ignore-delete' to ignore delete records. - Configure 'sequence-group's to retract partial columns. {{< /hint >}} diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java index 154595859d1ad..f458e01cc8935 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java @@ -58,6 +58,7 @@ public class PartialUpdateMergeFunction implements MergeFunction { public static final String SEQUENCE_GROUP = "sequence-group"; + private final boolean[] primaryKeyMask; private final InternalRow.FieldGetter[] getters; private final boolean ignoreDelete; private final Map fieldSeqComparators; @@ -70,11 +71,13 @@ public class PartialUpdateMergeFunction implements MergeFunction { private KeyValue reused; protected PartialUpdateMergeFunction( + boolean[] primaryKeyMask, InternalRow.FieldGetter[] getters, boolean ignoreDelete, Map fieldSeqComparators, Map fieldAggregators, boolean fieldSequenceEnabled) { + this.primaryKeyMask = primaryKeyMask; this.getters = getters; this.ignoreDelete = ignoreDelete; this.fieldSeqComparators = fieldSeqComparators; @@ -106,15 +109,11 @@ public void add(KeyValue kv) { return; } - String msg = - String.join( - "\n", - "By default, Partial update can not accept delete records," - + " you can choose one of the following solutions:", - "1. Configure 'ignore-delete' to ignore delete records.", - "2. Configure 'sequence-group's to retract partial columns."); + if (kv.valueKind() == RowKind.DELETE) { + retract(kv); + } - throw new IllegalArgumentException(msg); + return; } latestSequenceNumber = kv.sequenceNumber(); @@ -182,6 +181,25 @@ private boolean isEmptySequenceGroup(KeyValue kv, FieldsComparator comparator) { return true; } + private void retract(KeyValue kv) { + for (int i = 0; i < getters.length; i++) { + FieldAggregator aggregator = fieldAggregators.get(i); + // retract normal field + if (aggregator == null) { + if (primaryKeyMask[i]) { + row.setField(i, getters[i].getFieldOrNull(kv.value())); + } else { + row.setField(i, null); + } + } else { + // retract agg field + Object accumulator = getters[i].getFieldOrNull(row); + row.setField( + i, aggregator.retract(accumulator, getters[i].getFieldOrNull(kv.value()))); + } + } + } + private void retractWithSequenceGroup(KeyValue kv) { Set updatedSequenceFields = new HashSet<>(); @@ -235,9 +253,29 @@ public KeyValue getResult() { if (reused == null) { reused = new KeyValue(); } + if (isAllValueFieldNull()) { + row.setRowKind(RowKind.DELETE); + return reused.replace(currentKey, latestSequenceNumber, RowKind.DELETE, row); + } + row.setRowKind(RowKind.INSERT); return reused.replace(currentKey, latestSequenceNumber, RowKind.INSERT, row); } + private boolean isAllValueFieldNull() { + for (int i = 0; i < getters.length; i++) { + if (getters[i].getFieldOrNull(row) == null) { + continue; + } + + if (primaryKeyMask[i]) { + continue; + } + + return false; + } + return true; + } + public static MergeFunctionFactory factory( Options options, RowType rowType, List primaryKeys) { return new Factory(options, rowType, primaryKeys); @@ -256,6 +294,8 @@ private static class Factory implements MergeFunctionFactory { private final Map fieldAggregators; + private final boolean[] primaryKeyMask; + private Factory(Options options, RowType rowType, List primaryKeys) { this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE); this.rowType = rowType; @@ -310,6 +350,11 @@ private Factory(Options options, RowType rowType, List primaryKeys) { throw new IllegalArgumentException( "Must use sequence group for aggregation functions."); } + + primaryKeyMask = new boolean[rowType.getFieldCount()]; + for (String primaryKey : primaryKeys) { + primaryKeyMask[rowType.getFieldIndex(primaryKey)] = true; + } } @Override @@ -357,13 +402,16 @@ public MergeFunction create(@Nullable int[][] projection) { newRowType, newSequenceFields)); } }); + boolean[] primaryKeyMask = new boolean[projects.length]; for (int i = 0; i < projects.length; i++) { if (fieldAggregators.containsKey(projects[i])) { projectedAggregators.put(i, fieldAggregators.get(projects[i])); } + primaryKeyMask[i] = this.primaryKeyMask[projects[i]]; } return new PartialUpdateMergeFunction( + primaryKeyMask, createFieldGetters(Projection.of(projection).project(tableTypes)), ignoreDelete, projectedSeqComparators, @@ -371,6 +419,7 @@ public MergeFunction create(@Nullable int[][] projection) { !fieldSeqComparators.isEmpty()); } else { return new PartialUpdateMergeFunction( + primaryKeyMask, createFieldGetters(tableTypes), ignoreDelete, fieldSeqComparators, diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index 095511ae1d35e..d64ec86d8f749 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -1009,6 +1009,106 @@ public void testAuditLog() throws Exception { assertThat(result).containsExactlyInAnyOrder("+I[20, 2]", "+I[30, 1]", "+I[10, 1]"); } + @Test + public void testPartialUpdate() throws Exception { + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT() + }, + new String[] {"pt", "a", "b", "c"}); + FileStoreTable table = + createFileStoreTable( + options -> { + options.set("merge-engine", "partial-update"); + }, + rowType); + Function rowToString = row -> internalRowToString(row, rowType); + SnapshotReader snapshotReader = table.newSnapshotReader(); + TableRead read = table.newRead(); + StreamTableWrite write = table.newWrite(""); + StreamTableCommit commit = table.newCommit(""); + // 1. inserts + write.write(GenericRow.of(1, 1, 3, 3)); + write.write(GenericRow.of(1, 1, 1, 1)); + write.write(GenericRow.of(1, 1, 2, 2)); + commit.commit(0, write.prepareCommit(true, 0)); + List result = + getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString); + assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]"); + + // 2. Update Before + write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2)); + commit.commit(1, write.prepareCommit(true, 1)); + result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString); + assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]"); + + // 3. Update After + write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3)); + commit.commit(2, write.prepareCommit(true, 2)); + result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString); + assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 3]"); + + // 4. Retracts + write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2, 3)); + commit.commit(3, write.prepareCommit(true, 3)); + result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString); + assertThat(result).isEmpty(); + write.close(); + commit.close(); + } + + @Test + public void testPartialUpdateWithAgg() throws Exception { + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT() + }, + new String[] {"pt", "a", "b", "c"}); + FileStoreTable table = + createFileStoreTable( + options -> { + options.set("merge-engine", "partial-update"); + options.set("fields.a.sequence-group", "c"); + options.set("fields.c.aggregate-function", "sum"); + }, + rowType); + Function rowToString = row -> internalRowToString(row, rowType); + SnapshotReader snapshotReader = table.newSnapshotReader(); + TableRead read = table.newRead(); + StreamTableWrite write = table.newWrite(""); + StreamTableCommit commit = table.newCommit(""); + // 1. inserts + write.write(GenericRow.of(1, 1, 3, 3)); + write.write(GenericRow.of(1, 1, 1, 1)); + write.write(GenericRow.of(1, 1, 2, 2)); + commit.commit(0, write.prepareCommit(true, 0)); + List result = + getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString); + assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 6]"); + + // 2. Update Before + write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2)); + commit.commit(1, write.prepareCommit(true, 1)); + result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString); + assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 4]"); + + // 3. Update After + write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3)); + commit.commit(2, write.prepareCommit(true, 2)); + result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString); + assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 7]"); + + // 4. Retracts + write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2, 3)); + commit.commit(3, write.prepareCommit(true, 3)); + result = getResult(read, toSplits(snapshotReader.read().dataSplits()), rowToString); + assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 4]"); + write.close(); + commit.close(); + } + @Test public void testAggMergeFunc() throws Exception { RowType rowType =