From 0bee22e5d181fbf1f4e77e6080a5e0295f2b38d8 Mon Sep 17 00:00:00 2001 From: yuzelin <33053040+yuzelin@users.noreply.github.com> Date: Wed, 31 Jan 2024 16:45:09 +0800 Subject: [PATCH] [core] Agg 'collect' and 'merge_map' support retract (#2816) --- .../primary-key-table/merge-engine.md | 22 ++- .../java/org/apache/paimon/CoreOptions.java | 9 + .../compact/aggregate/FieldCollectAgg.java | 47 +++++ .../compact/aggregate/FieldMergeMapAgg.java | 32 ++- .../aggregate/FieldAggregatorTest.java | 183 ++++++++++++++++++ .../paimon/flink/sink/FlinkTableSinkBase.java | 6 + .../paimon/flink/PreAggregationITCase.java | 77 ++++++++ 7 files changed, 371 insertions(+), 5 deletions(-) diff --git a/docs/content/concepts/primary-key-table/merge-engine.md b/docs/content/concepts/primary-key-table/merge-engine.md index 4a65d7151818..daa455507a25 100644 --- a/docs/content/concepts/primary-key-table/merge-engine.md +++ b/docs/content/concepts/primary-key-table/merge-engine.md @@ -295,15 +295,29 @@ Current supported aggregate functions and data types are: * `merge_map`: The merge_map function merge input maps. It only supports MAP type. -Only `sum`, `product` and `count` supports retraction (`UPDATE_BEFORE` and `DELETE`), others aggregate functions do not support retraction. -If you allow some functions to ignore retraction messages, you can configure: -`'fields.${field_name}.ignore-retract'='true'`. - {{< hint info >}} For streaming queries, `aggregation` merge engine must be used together with `lookup` or `full-compaction` [changelog producer]({{< ref "concepts/primary-key-table/changelog-producer" >}}). ('input' changelog producer is also supported, but only returns input records.) {{< /hint >}} +### Retract + +Only `sum`, `product`, `count`, `collect` and `merge_map` support retraction (`UPDATE_BEFORE` and `DELETE`); other aggregate functions do not support retraction. 
+If you want some functions to ignore retraction messages, you can configure: +`'fields.${field_name}.ignore-retract'='true'`. + +NOTE: The `collect` and `merge_map` functions make a best-effort attempt to handle retraction messages, but the results are not +guaranteed to be accurate. The following behaviors may occur when processing retraction messages: + +1. It might fail to handle retraction messages if records arrive out of order. For example, the table uses `collect`, and two +upstreams send `+I['A', 'B']` and `-U['A']` respectively. If the table receives `-U['A']` first, it can do nothing; when it then receives +`+I['A', 'B']`, the merge result will be `+I['A', 'B']` instead of `+I['B']`. + +2. The retract message from one upstream will retract the result merged from multiple upstreams. For example, the table +uses `merge_map`, and one upstream sends `+I[1->A]`, while another upstream sends `+I[1->B]` and later `-D[1->B]`. The table will +merge the two inserted values into `+I[1->B]` first, and then the `-D[1->B]` will retract the whole result, so the final result +is an empty map instead of `+I[1->A]`. + ## First Row By specifying `'merge-engine' = 'first-row'`, users can keep the first row of the same primary key. It differs from the diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 64cbc5b6ff97..6e7730928b1e 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1162,6 +1162,15 @@ public Map fileFormatPerLevel() { .collect(Collectors.toMap(e -> Integer.valueOf(e.getKey()), Map.Entry::getValue)); } + public boolean definedAggFunc() { + for (String key : options.toMap().keySet()) { + if (key.startsWith(FIELDS_PREFIX) && key.endsWith(AGG_FUNCTION)) { + return true; + } + } + return false; + } + public String fieldAggFunc(String fieldName) { return options.get( key(FIELDS_PREFIX + "." + fieldName + "." 
+ AGG_FUNCTION)
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java
index 543211b40b7f..d335b327a342 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java
@@ -35,6 +35,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.function.BiFunction;
 
@@ -147,4 +148,68 @@ private boolean contains(List list, @Nullable Object element) {
         }
         return false;
     }
+
+    /**
+     * Removes the retracted elements from the accumulated array on a best-effort basis. Every
+     * retracted element cancels at most one equal element of the accumulator.
+     *
+     * @param accumulator the accumulated array, may be null
+     * @param retractField the array of elements to retract, may be null
+     * @return null if there is no accumulator, the accumulator itself if there is nothing to
+     *     retract, otherwise a new array holding the remaining elements
+     */
+    @Override
+    public Object retract(Object accumulator, Object retractField) {
+        // without an accumulator there is nothing the retraction could be applied to
+        if (accumulator == null) {
+            return null;
+        }
+
+        // nothing to be retracted (guards the size() call below against a null input)
+        if (retractField == null) {
+            return accumulator;
+        }
+
+        InternalArray acc = (InternalArray) accumulator;
+        InternalArray retract = (InternalArray) retractField;
+
+        List<Object> retractedElements = new ArrayList<>();
+        for (int i = 0; i < retract.size(); i++) {
+            retractedElements.add(elementGetter.getElementOrNull(retract, i));
+        }
+
+        List<Object> accElements = new ArrayList<>();
+        for (int i = 0; i < acc.size(); i++) {
+            Object candidate = elementGetter.getElementOrNull(acc, i);
+            // keep the element unless it consumes one of the pending retractions
+            if (!retract(retractedElements, candidate)) {
+                accElements.add(candidate);
+            }
+        }
+        return new GenericArray(accElements.toArray());
+    }
+
+    /** Removes the first entry of {@code list} equal to {@code element}, if there is one. */
+    private boolean retract(List<Object> list, Object element) {
+        Iterator<Object> iterator = list.iterator();
+        while (iterator.hasNext()) {
+            Object o = iterator.next();
+            if (equals(o, element)) {
+                iterator.remove();
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** Null-safe equality that uses the {@code equaliser} when one is configured. */
+    private boolean equals(Object a, Object b) {
+        if (a == null && b == null) {
+            return true;
+        } else if (a == null || b == null) {
+            return false;
+        } else {
+            return equaliser == null ? a.equals(b) : equaliser.apply(a, b);
+        }
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java
index c2e26e176de5..9042adc51533 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java
@@ -24,9 +24,11 @@
 import org.apache.paimon.types.MapType;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
-/** Collect elements into an ARRAY. */
+/** Merge two maps. */
 public class FieldMergeMapAgg extends FieldAggregator {
 
     public static final String NAME = "merge_map";
@@ -69,4 +71,47 @@ private void putToMap(Map map, Object data) {
                     valueGetter.getElementOrNull(valueArray, i));
         }
     }
+
+    /**
+     * Removes every entry whose key appears in the retracted map from the accumulated map. Only
+     * the keys of the retracted map are compared; its values are ignored.
+     *
+     * @param accumulator the accumulated map, may be null
+     * @param retractField the map whose keys are to be retracted, may be null
+     * @return null if there is no accumulator, the accumulator itself if there is nothing to
+     *     retract, otherwise a new map holding the remaining entries
+     */
+    @Override
+    public Object retract(Object accumulator, Object retractField) {
+        // without an accumulator there is nothing the retraction could be applied to
+        if (accumulator == null) {
+            return null;
+        }
+
+        // nothing to be retracted (guards the keyArray() call below against a null input)
+        if (retractField == null) {
+            return accumulator;
+        }
+
+        InternalMap acc = (InternalMap) accumulator;
+        InternalMap retract = (InternalMap) retractField;
+
+        InternalArray retractKeyArray = retract.keyArray();
+        Set<Object> retractKeys = new HashSet<>();
+        for (int i = 0; i < retractKeyArray.size(); i++) {
+            retractKeys.add(keyGetter.getElementOrNull(retractKeyArray, i));
+        }
+
+        Map<Object, Object> resultMap = new HashMap<>();
+        InternalArray accKeyArray = acc.keyArray();
+        InternalArray accValueArray = acc.valueArray();
+        for (int i = 0; i < accKeyArray.size(); i++) {
+            Object accKey = keyGetter.getElementOrNull(accKeyArray, i);
+            if (!retractKeys.contains(accKey)) {
+                resultMap.put(accKey, valueGetter.getElementOrNull(accValueArray, i));
+            }
+        }
+
+        return new GenericMap(resultMap);
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
index 
a13d3fbeaa3a..aaf7754205fd 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java @@ -501,6 +501,178 @@ public void testFieldCollectAggWithoutDistinct() { assertThat(unnest(result, elementGetter)).containsExactlyInAnyOrder(1, 1, 2, 2, 3); } + @Test + public void testFieldCollectAggRetractWithDistinct() { + FieldCollectAgg agg; + InternalArray.ElementGetter elementGetter; + + // primitive type + agg = new FieldCollectAgg(DataTypes.ARRAY(DataTypes.INT()), true); + elementGetter = InternalArray.createElementGetter(DataTypes.INT()); + InternalArray result = + (InternalArray) + agg.retract( + new GenericArray(new int[] {1, 2, 3}), + new GenericArray(new int[] {1})); + assertThat(unnest(result, elementGetter)).containsExactlyInAnyOrder(2, 3); + + // row type + RowType rowType = RowType.of(DataTypes.INT(), DataTypes.STRING()); + agg = new FieldCollectAgg(DataTypes.ARRAY(rowType), true); + elementGetter = InternalArray.createElementGetter(rowType); + + Object[] accElements = + new Object[] { + GenericRow.of(1, BinaryString.fromString("A")), + GenericRow.of(1, BinaryString.fromString("B")), + GenericRow.of(2, BinaryString.fromString("B")) + }; + Object[] retractElements = + new Object[] { + GenericRow.of(1, BinaryString.fromString("A")), + GenericRow.of(2, BinaryString.fromString("B")) + }; + result = + (InternalArray) + agg.retract( + new GenericArray(accElements), new GenericArray(retractElements)); + assertThat(unnest(result, elementGetter)) + .containsExactlyInAnyOrder(GenericRow.of(1, BinaryString.fromString("B"))); + + // array type + ArrayType arrayType = new ArrayType(DataTypes.INT()); + agg = new FieldCollectAgg(DataTypes.ARRAY(arrayType), true); + elementGetter = InternalArray.createElementGetter(arrayType); + + accElements = + new Object[] { + new GenericArray(new Object[] {1, 1}), + new GenericArray(new 
Object[] {1, 2}), + new GenericArray(new Object[] {2, 1}) + }; + retractElements = + new Object[] { + new GenericArray(new Object[] {1, 1}), new GenericArray(new Object[] {1, 2}) + }; + result = + (InternalArray) + agg.retract( + new GenericArray(accElements), new GenericArray(retractElements)); + assertThat(unnest(result, elementGetter)) + .containsExactlyInAnyOrder(new GenericArray(new Object[] {2, 1})); + + // map type + MapType mapType = new MapType(DataTypes.INT(), DataTypes.STRING()); + agg = new FieldCollectAgg(DataTypes.ARRAY(mapType), true); + elementGetter = InternalArray.createElementGetter(mapType); + + accElements = + new Object[] { + new GenericMap(toMap(1, "A")), + new GenericMap(toMap(2, "B", 1, "A")), + new GenericMap(toMap(1, "C")) + }; + retractElements = + new Object[] {new GenericMap(toMap(1, "A")), new GenericMap(toMap(1, "A", 2, "B"))}; + result = + (InternalArray) + agg.retract( + new GenericArray(accElements), new GenericArray(retractElements)); + + assertThat(unnest(result, elementGetter)) + .containsExactlyInAnyOrder(new GenericMap(toMap(1, "C"))); + } + + @Test + public void testFieldCollectAggRetractWithoutDistinct() { + FieldCollectAgg agg; + InternalArray.ElementGetter elementGetter; + + // primitive type + agg = new FieldCollectAgg(DataTypes.ARRAY(DataTypes.INT()), true); + elementGetter = InternalArray.createElementGetter(DataTypes.INT()); + InternalArray result = + (InternalArray) + agg.retract( + new GenericArray(new int[] {1, 1, 2, 2, 3}), + new GenericArray(new int[] {1, 2, 3})); + assertThat(unnest(result, elementGetter)).containsExactlyInAnyOrder(1, 2); + + // row type + RowType rowType = RowType.of(DataTypes.INT(), DataTypes.STRING()); + agg = new FieldCollectAgg(DataTypes.ARRAY(rowType), true); + elementGetter = InternalArray.createElementGetter(rowType); + + Object[] accElements = + new Object[] { + GenericRow.of(1, BinaryString.fromString("A")), + GenericRow.of(1, BinaryString.fromString("A")), + GenericRow.of(1, 
BinaryString.fromString("B")), + GenericRow.of(2, BinaryString.fromString("B")) + }; + Object[] retractElements = + new Object[] { + GenericRow.of(1, BinaryString.fromString("A")), + GenericRow.of(2, BinaryString.fromString("B")) + }; + result = + (InternalArray) + agg.retract( + new GenericArray(accElements), new GenericArray(retractElements)); + assertThat(unnest(result, elementGetter)) + .containsExactlyInAnyOrder( + GenericRow.of(1, BinaryString.fromString("A")), + GenericRow.of(1, BinaryString.fromString("B"))); + + // array type + ArrayType arrayType = new ArrayType(DataTypes.INT()); + agg = new FieldCollectAgg(DataTypes.ARRAY(arrayType), true); + elementGetter = InternalArray.createElementGetter(arrayType); + + accElements = + new Object[] { + new GenericArray(new Object[] {1, 1}), + new GenericArray(new Object[] {1, 1}), + new GenericArray(new Object[] {1, 2}), + new GenericArray(new Object[] {2, 1}) + }; + retractElements = + new Object[] { + new GenericArray(new Object[] {1, 1}), new GenericArray(new Object[] {1, 2}) + }; + result = + (InternalArray) + agg.retract( + new GenericArray(accElements), new GenericArray(retractElements)); + assertThat(unnest(result, elementGetter)) + .containsExactlyInAnyOrder( + new GenericArray(new Object[] {1, 1}), + new GenericArray(new Object[] {2, 1})); + + // map type + MapType mapType = new MapType(DataTypes.INT(), DataTypes.STRING()); + agg = new FieldCollectAgg(DataTypes.ARRAY(mapType), true); + elementGetter = InternalArray.createElementGetter(mapType); + + accElements = + new Object[] { + new GenericMap(toMap(1, "A")), + new GenericMap(toMap(1, "A")), + new GenericMap(toMap(2, "B", 1, "A")), + new GenericMap(toMap(1, "C")) + }; + retractElements = + new Object[] {new GenericMap(toMap(1, "A")), new GenericMap(toMap(1, "A", 2, "B"))}; + result = + (InternalArray) + agg.retract( + new GenericArray(accElements), new GenericArray(retractElements)); + + assertThat(unnest(result, elementGetter)) + 
.containsExactlyInAnyOrder( + new GenericMap(toMap(1, "A")), new GenericMap(toMap(1, "C"))); + } + @Test public void testFieldMergeMapAgg() { FieldMergeMapAgg agg = @@ -520,6 +692,17 @@ public void testFieldMergeMapAgg() { .containsExactlyInAnyOrderEntriesOf(toMap(1, "a", 2, "B", 3, "c")); } + @Test + public void testFieldMergeMapAggRetract() { + FieldMergeMapAgg agg = + new FieldMergeMapAgg(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING())); + Object result = + agg.retract( + new GenericMap(toMap(1, "A", 2, "B", 3, "C")), + new GenericMap(toMap(1, "A", 2, "A"))); + assertThat(toJavaMap(result)).containsExactlyInAnyOrderEntriesOf(toMap(3, "C")); + } + private Map toMap(Object... kvs) { Map result = new HashMap<>(); for (int i = 0; i < kvs.length; i += 2) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java index 7434ce07f4c9..30039188d148 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.sink; +import org.apache.paimon.CoreOptions; import org.apache.paimon.CoreOptions.ChangelogProducer; import org.apache.paimon.CoreOptions.LogChangelogMode; import org.apache.paimon.CoreOptions.MergeEngine; @@ -87,6 +88,11 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { return requestedMode; } + if (options.get(MERGE_ENGINE) == MergeEngine.PARTIAL_UPDATE + && new CoreOptions(options).definedAggFunc()) { + return requestedMode; + } + if (options.get(LOG_CHANGELOG_MODE) == LogChangelogMode.ALL) { return requestedMode; } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java index 6525454c6a8e..6e1778917724 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java @@ -24,6 +24,7 @@ import org.apache.paimon.utils.BlockingIterator; import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.flink.util.CloseableIterator; @@ -1442,6 +1443,12 @@ private boolean checkNestedTable(Row[] nestedTable, Row... subOrders) { /** ITCase for {@link FieldCollectAgg}. */ public static class CollectAggregationITCase extends CatalogITCaseBase { + @Override + protected int defaultParallelism() { + // set parallelism to 1 so that the order of input data is determined + return 1; + } + @Test public void testAggWithDistinct() { sql( @@ -1511,6 +1518,76 @@ public void testAggWithoutDistinct() { checkOneRecord(result.get(2), 3, "car", "watch"); } + @Test + public void testRetractWithAggregation() throws Exception { + sql( + "CREATE TABLE test_collect(" + + " id INT PRIMARY KEY NOT ENFORCED," + + " f0 ARRAY" + + ") WITH (" + + " 'merge-engine' = 'aggregation'," + + " 'fields.f0.aggregate-function' = 'collect'" + + ")"); + + innerTestRetract(false); + } + + @Test + public void testRetractWithPartialUpdate() throws Exception { + sql( + "CREATE TABLE test_collect(" + + " id INT PRIMARY KEY NOT ENFORCED," + + " f0 ARRAY," + + " f1 INT" + + ") WITH (" + + " 'merge-engine' = 'partial-update'," + + " 'fields.f0.aggregate-function' = 'collect'," + + " 'fields.f1.sequence-group' = 'f0'" + + ")"); + + innerTestRetract(true); + } + + private void innerTestRetract(boolean partialUpdate) throws Exception { + String temporaryTable = + "CREATE TEMPORARY TABLE 
INPUT (" + + " id INT PRIMARY KEY NOT ENFORCED," + + " f0 ARRAY" + + " %s) WITH (\n" + + " 'connector' = 'values',\n" + + " 'data-id' = '%s',\n" + + " 'bounded' = 'true',\n" + + " 'changelog-mode' = 'I,UA,UB'\n" + + ")"; + + String f1; + List inputRecords; + if (partialUpdate) { + f1 = ", f1 INT"; + inputRecords = + Arrays.asList( + Row.ofKind(RowKind.INSERT, 1, new String[] {"A", "B"}, 10), + Row.ofKind(RowKind.UPDATE_BEFORE, 1, new String[] {"A", "B"}, 10), + Row.ofKind(RowKind.UPDATE_AFTER, 1, new String[] {"C", "D"}, 20)); + } else { + f1 = ""; + inputRecords = + Arrays.asList( + Row.ofKind(RowKind.INSERT, 1, new String[] {"A", "B"}), + Row.ofKind(RowKind.UPDATE_BEFORE, 1, new String[] {"A", "B"}), + Row.ofKind(RowKind.UPDATE_AFTER, 1, new String[] {"C", "D"})); + } + streamSqlIter(temporaryTable, f1, TestValuesTableFactory.registerData(inputRecords)) + .close(); + + sEnv.executeSql("INSERT INTO test_collect SELECT * FROM INPUT").await(); + + List result = sql("SELECT * FROM test_collect"); + assertThat(result.size()).isEqualTo(1); + // if not retracted, the result would be ['A', 'B', 'C', 'D'] + checkOneRecord(result.get(0), 1, "C", "D"); + } + private void checkOneRecord(Row row, int id, String... elements) { assertThat(row.getField(0)).isEqualTo(id); if (elements == null || elements.length == 0) {