Skip to content

Commit

Permalink
[core] Agg 'collect' and 'merge_map' support retract (#2816)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin authored Jan 31, 2024
1 parent 89e908d commit 0bee22e
Show file tree
Hide file tree
Showing 7 changed files with 371 additions and 5 deletions.
22 changes: 18 additions & 4 deletions docs/content/concepts/primary-key-table/merge-engine.md
Original file line number Diff line number Diff line change
Expand Up @@ -295,15 +295,29 @@ Current supported aggregate functions and data types are:
* `merge_map`:
The merge_map function merges input maps. It only supports MAP type.

Only `sum`, `product` and `count` supports retraction (`UPDATE_BEFORE` and `DELETE`), others aggregate functions do not support retraction.
If you allow some functions to ignore retraction messages, you can configure:
`'fields.${field_name}.ignore-retract'='true'`.

{{< hint info >}}
For streaming queries, `aggregation` merge engine must be used together with `lookup` or `full-compaction`
[changelog producer]({{< ref "concepts/primary-key-table/changelog-producer" >}}). ('input' changelog producer is also supported, but only returns input records.)
{{< /hint >}}

### Retract

Only `sum`, `product`, `count`, `collect` and `merge_map` support retraction (`UPDATE_BEFORE` and `DELETE`); other aggregate functions do not support retraction.
If you want a function to ignore retraction messages, you can configure:
`'fields.${field_name}.ignore-retract'='true'`.

NOTE: The `collect` and `merge_map` make a best-effort attempt to handle retraction messages, but the results are not
guaranteed to be accurate. The following behaviors may occur when processing retraction messages:

1. It might fail to handle retraction messages if records arrive out of order. For example, the table uses `collect`, and the
upstreams send `+I['A', 'B']` and `-U['A']` respectively. If the table receives `-U['A']` first, it can do nothing; when it then
receives `+I['A', 'B']`, the merge result will be `+I['A', 'B']` instead of `+I['B']`.

2. The retract message from one upstream will retract the result merged from multiple upstreams. For example, the table
uses `merge_map`, and one upstream sends `+I[1->A]`, while another upstream sends `+I[1->B]` and later `-D[1->B]`. The table will
first merge the two inserted values to `+I[1->B]`, and then the `-D[1->B]` will retract the whole result, so the final result
is an empty map instead of `+I[1->A]`.

## First Row

By specifying `'merge-engine' = 'first-row'`, users can keep the first row of the same primary key. It differs from the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1162,6 +1162,15 @@ public Map<Integer, String> fileFormatPerLevel() {
.collect(Collectors.toMap(e -> Integer.valueOf(e.getKey()), Map.Entry::getValue));
}

/**
 * Checks whether any configured option key both starts with {@code FIELDS_PREFIX} and ends
 * with {@code AGG_FUNCTION}.
 *
 * @return {@code true} if at least one such key is present in the options, {@code false}
 *     otherwise
 */
public boolean definedAggFunc() {
    return options.toMap().keySet().stream()
            .anyMatch(
                    optionKey ->
                            optionKey.startsWith(FIELDS_PREFIX)
                                    && optionKey.endsWith(AGG_FUNCTION));
}

public String fieldAggFunc(String fieldName) {
return options.get(
key(FIELDS_PREFIX + "." + fieldName + "." + AGG_FUNCTION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

Expand Down Expand Up @@ -147,4 +148,50 @@ private boolean contains(List<Object> list, @Nullable Object element) {
}
return false;
}

/**
 * Removes the elements of {@code retractField} from {@code accumulator}.
 *
 * <p>Each element of the retract array cancels at most one equal element of the accumulator,
 * so duplicates in the accumulator are only removed as many times as they appear in the
 * retract array. Retract elements with no match in the accumulator are ignored.
 *
 * @param accumulator the current aggregated array, may be {@code null}
 * @param retractField the array of elements to retract, may be {@code null}
 * @return {@code null} if {@code accumulator} is {@code null}; {@code accumulator} itself if
 *     {@code retractField} is {@code null}; otherwise a new array holding the remaining
 *     elements
 */
@Override
public Object retract(Object accumulator, Object retractField) {
    // Without an accumulator there is nothing to retract from.
    if (accumulator == null) {
        return null;
    }

    // A null retract value carries no elements, so the accumulator stays untouched
    // (previously this case failed with a NullPointerException).
    if (retractField == null) {
        return accumulator;
    }

    InternalArray acc = (InternalArray) accumulator;
    InternalArray retract = (InternalArray) retractField;

    // Elements still waiting to cancel a match; each one is consumed on its first match.
    List<Object> retractedElements = new ArrayList<>(retract.size());
    for (int i = 0; i < retract.size(); i++) {
        retractedElements.add(elementGetter.getElementOrNull(retract, i));
    }

    List<Object> accElements = new ArrayList<>(acc.size());
    for (int i = 0; i < acc.size(); i++) {
        Object candidate = elementGetter.getElementOrNull(acc, i);
        // Keep the candidate only if no pending retract element matched (and consumed) it.
        if (!retract(retractedElements, candidate)) {
            accElements.add(candidate);
        }
    }
    return new GenericArray(accElements.toArray());
}

/**
 * Removes the first entry of {@code list} that equals {@code element}.
 *
 * @param list the pending elements; modified in place when a match is found
 * @param element the element to look for, may be {@code null}
 * @return {@code true} if a matching entry was found and removed, {@code false} otherwise
 */
private boolean retract(List<Object> list, Object element) {
    for (Iterator<Object> pending = list.iterator(); pending.hasNext(); ) {
        if (equals(pending.next(), element)) {
            // Consume only this one occurrence, then stop scanning.
            pending.remove();
            return true;
        }
    }
    return false;
}

/**
 * Null-safe equality check for two elements.
 *
 * <p>Two {@code null}s are equal and a {@code null} never equals a non-null value. Non-null
 * values are compared with {@code equaliser} when it is set, otherwise with
 * {@link Object#equals(Object)}.
 */
private boolean equals(Object a, Object b) {
    if (a == null || b == null) {
        // Equal only when both sides are null.
        return a == b;
    }
    if (equaliser == null) {
        return a.equals(b);
    }
    return equaliser.apply(a, b);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import org.apache.paimon.types.MapType;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Collect elements into an ARRAY. */
/** Merge two maps. */
public class FieldMergeMapAgg extends FieldAggregator {

public static final String NAME = "merge_map";
Expand Down Expand Up @@ -69,4 +71,32 @@ private void putToMap(Map<Object, Object> map, Object data) {
valueGetter.getElementOrNull(valueArray, i));
}
}

/**
 * Removes from {@code accumulator} every entry whose key appears in {@code retractField}.
 *
 * <p>Matching is done on keys only: the values in the retract map are not compared with the
 * accumulated values.
 *
 * @param accumulator the current merged map, may be {@code null}
 * @param retractField the map whose keys should be retracted, may be {@code null}
 * @return {@code null} if {@code accumulator} is {@code null}; {@code accumulator} itself if
 *     {@code retractField} is {@code null}; otherwise a new map with the remaining entries
 */
@Override
public Object retract(Object accumulator, Object retractField) {
    // Without an accumulator there is nothing to retract from.
    if (accumulator == null) {
        return null;
    }

    // A null retract value names no keys, so the accumulator stays untouched
    // (previously this case failed with a NullPointerException).
    if (retractField == null) {
        return accumulator;
    }

    InternalMap acc = (InternalMap) accumulator;
    InternalMap retract = (InternalMap) retractField;

    InternalArray retractKeyArray = retract.keyArray();
    Set<Object> retractKeys = new HashSet<>();
    for (int i = 0; i < retractKeyArray.size(); i++) {
        retractKeys.add(keyGetter.getElementOrNull(retractKeyArray, i));
    }

    Map<Object, Object> resultMap = new HashMap<>();
    InternalArray accKeyArray = acc.keyArray();
    InternalArray accValueArray = acc.valueArray();
    for (int i = 0; i < accKeyArray.size(); i++) {
        Object accKey = keyGetter.getElementOrNull(accKeyArray, i);
        // Keep only entries whose key was not retracted.
        if (!retractKeys.contains(accKey)) {
            resultMap.put(accKey, valueGetter.getElementOrNull(accValueArray, i));
        }
    }

    return new GenericMap(resultMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,178 @@ public void testFieldCollectAggWithoutDistinct() {
assertThat(unnest(result, elementGetter)).containsExactlyInAnyOrder(1, 1, 2, 2, 3);
}

/**
 * Verifies {@code FieldCollectAgg#retract} on accumulators without duplicate elements, for
 * primitive, row, array and map element types.
 */
@Test
public void testFieldCollectAggRetractWithDistinct() {
    // primitive type: retracting [1] from [1, 2, 3] leaves [2, 3]
    FieldCollectAgg intAgg = new FieldCollectAgg(DataTypes.ARRAY(DataTypes.INT()), true);
    InternalArray.ElementGetter intGetter =
            InternalArray.createElementGetter(DataTypes.INT());
    InternalArray intResult =
            (InternalArray)
                    intAgg.retract(
                            new GenericArray(new int[] {1, 2, 3}),
                            new GenericArray(new int[] {1}));
    assertThat(unnest(intResult, intGetter)).containsExactlyInAnyOrder(2, 3);

    // row type: two of the three rows are retracted
    RowType rowType = RowType.of(DataTypes.INT(), DataTypes.STRING());
    FieldCollectAgg rowAgg = new FieldCollectAgg(DataTypes.ARRAY(rowType), true);
    InternalArray.ElementGetter rowGetter = InternalArray.createElementGetter(rowType);
    Object[] accumulatedRows = {
        GenericRow.of(1, BinaryString.fromString("A")),
        GenericRow.of(1, BinaryString.fromString("B")),
        GenericRow.of(2, BinaryString.fromString("B"))
    };
    Object[] retractedRows = {
        GenericRow.of(1, BinaryString.fromString("A")),
        GenericRow.of(2, BinaryString.fromString("B"))
    };
    InternalArray rowResult =
            (InternalArray)
                    rowAgg.retract(
                            new GenericArray(accumulatedRows),
                            new GenericArray(retractedRows));
    assertThat(unnest(rowResult, rowGetter))
            .containsExactlyInAnyOrder(GenericRow.of(1, BinaryString.fromString("B")));

    // array type: nested arrays are matched element-wise
    ArrayType arrayType = new ArrayType(DataTypes.INT());
    FieldCollectAgg arrayAgg = new FieldCollectAgg(DataTypes.ARRAY(arrayType), true);
    InternalArray.ElementGetter arrayGetter = InternalArray.createElementGetter(arrayType);
    Object[] accumulatedArrays = {
        new GenericArray(new Object[] {1, 1}),
        new GenericArray(new Object[] {1, 2}),
        new GenericArray(new Object[] {2, 1})
    };
    Object[] retractedArrays = {
        new GenericArray(new Object[] {1, 1}), new GenericArray(new Object[] {1, 2})
    };
    InternalArray arrayResult =
            (InternalArray)
                    arrayAgg.retract(
                            new GenericArray(accumulatedArrays),
                            new GenericArray(retractedArrays));
    assertThat(unnest(arrayResult, arrayGetter))
            .containsExactlyInAnyOrder(new GenericArray(new Object[] {2, 1}));

    // map type: entry order inside a map does not matter for matching
    MapType mapType = new MapType(DataTypes.INT(), DataTypes.STRING());
    FieldCollectAgg mapAgg = new FieldCollectAgg(DataTypes.ARRAY(mapType), true);
    InternalArray.ElementGetter mapGetter = InternalArray.createElementGetter(mapType);
    Object[] accumulatedMaps = {
        new GenericMap(toMap(1, "A")),
        new GenericMap(toMap(2, "B", 1, "A")),
        new GenericMap(toMap(1, "C"))
    };
    Object[] retractedMaps = {
        new GenericMap(toMap(1, "A")), new GenericMap(toMap(1, "A", 2, "B"))
    };
    InternalArray mapResult =
            (InternalArray)
                    mapAgg.retract(
                            new GenericArray(accumulatedMaps),
                            new GenericArray(retractedMaps));
    assertThat(unnest(mapResult, mapGetter))
            .containsExactlyInAnyOrder(new GenericMap(toMap(1, "C")));
}

/**
 * Verifies {@code FieldCollectAgg#retract} on accumulators that contain duplicate elements:
 * each retracted element must cancel only one matching occurrence.
 *
 * <p>NOTE(review): every aggregator here is constructed with {@code true} as the second
 * argument even though the test name says "WithoutDistinct" - confirm whether that is
 * intended.
 */
@Test
public void testFieldCollectAggRetractWithoutDistinct() {
    // primitive type: one occurrence each of 1, 2 and 3 is removed
    FieldCollectAgg intAgg = new FieldCollectAgg(DataTypes.ARRAY(DataTypes.INT()), true);
    InternalArray.ElementGetter intGetter =
            InternalArray.createElementGetter(DataTypes.INT());
    InternalArray intResult =
            (InternalArray)
                    intAgg.retract(
                            new GenericArray(new int[] {1, 1, 2, 2, 3}),
                            new GenericArray(new int[] {1, 2, 3}));
    assertThat(unnest(intResult, intGetter)).containsExactlyInAnyOrder(1, 2);

    // row type: the duplicated row (1, "A") survives once
    RowType rowType = RowType.of(DataTypes.INT(), DataTypes.STRING());
    FieldCollectAgg rowAgg = new FieldCollectAgg(DataTypes.ARRAY(rowType), true);
    InternalArray.ElementGetter rowGetter = InternalArray.createElementGetter(rowType);
    Object[] accumulatedRows = {
        GenericRow.of(1, BinaryString.fromString("A")),
        GenericRow.of(1, BinaryString.fromString("A")),
        GenericRow.of(1, BinaryString.fromString("B")),
        GenericRow.of(2, BinaryString.fromString("B"))
    };
    Object[] retractedRows = {
        GenericRow.of(1, BinaryString.fromString("A")),
        GenericRow.of(2, BinaryString.fromString("B"))
    };
    InternalArray rowResult =
            (InternalArray)
                    rowAgg.retract(
                            new GenericArray(accumulatedRows),
                            new GenericArray(retractedRows));
    assertThat(unnest(rowResult, rowGetter))
            .containsExactlyInAnyOrder(
                    GenericRow.of(1, BinaryString.fromString("A")),
                    GenericRow.of(1, BinaryString.fromString("B")));

    // array type: the duplicated array [1, 1] survives once
    ArrayType arrayType = new ArrayType(DataTypes.INT());
    FieldCollectAgg arrayAgg = new FieldCollectAgg(DataTypes.ARRAY(arrayType), true);
    InternalArray.ElementGetter arrayGetter = InternalArray.createElementGetter(arrayType);
    Object[] accumulatedArrays = {
        new GenericArray(new Object[] {1, 1}),
        new GenericArray(new Object[] {1, 1}),
        new GenericArray(new Object[] {1, 2}),
        new GenericArray(new Object[] {2, 1})
    };
    Object[] retractedArrays = {
        new GenericArray(new Object[] {1, 1}), new GenericArray(new Object[] {1, 2})
    };
    InternalArray arrayResult =
            (InternalArray)
                    arrayAgg.retract(
                            new GenericArray(accumulatedArrays),
                            new GenericArray(retractedArrays));
    assertThat(unnest(arrayResult, arrayGetter))
            .containsExactlyInAnyOrder(
                    new GenericArray(new Object[] {1, 1}),
                    new GenericArray(new Object[] {2, 1}));

    // map type: the duplicated map {1 -> "A"} survives once
    MapType mapType = new MapType(DataTypes.INT(), DataTypes.STRING());
    FieldCollectAgg mapAgg = new FieldCollectAgg(DataTypes.ARRAY(mapType), true);
    InternalArray.ElementGetter mapGetter = InternalArray.createElementGetter(mapType);
    Object[] accumulatedMaps = {
        new GenericMap(toMap(1, "A")),
        new GenericMap(toMap(1, "A")),
        new GenericMap(toMap(2, "B", 1, "A")),
        new GenericMap(toMap(1, "C"))
    };
    Object[] retractedMaps = {
        new GenericMap(toMap(1, "A")), new GenericMap(toMap(1, "A", 2, "B"))
    };
    InternalArray mapResult =
            (InternalArray)
                    mapAgg.retract(
                            new GenericArray(accumulatedMaps),
                            new GenericArray(retractedMaps));
    assertThat(unnest(mapResult, mapGetter))
            .containsExactlyInAnyOrder(
                    new GenericMap(toMap(1, "A")), new GenericMap(toMap(1, "C")));
}

@Test
public void testFieldMergeMapAgg() {
FieldMergeMapAgg agg =
Expand All @@ -520,6 +692,17 @@ public void testFieldMergeMapAgg() {
.containsExactlyInAnyOrderEntriesOf(toMap(1, "a", 2, "B", 3, "c"));
}

/**
 * Verifies that {@code FieldMergeMapAgg#retract} drops entries by key: key 2 is removed even
 * though the retracted value ("A") differs from the accumulated one ("B").
 */
@Test
public void testFieldMergeMapAggRetract() {
    FieldMergeMapAgg agg =
            new FieldMergeMapAgg(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()));
    GenericMap accumulated = new GenericMap(toMap(1, "A", 2, "B", 3, "C"));
    GenericMap retracted = new GenericMap(toMap(1, "A", 2, "A"));

    Object result = agg.retract(accumulated, retracted);

    assertThat(toJavaMap(result)).containsExactlyInAnyOrderEntriesOf(toMap(3, "C"));
}

private Map<Object, Object> toMap(Object... kvs) {
Map<Object, Object> result = new HashMap<>();
for (int i = 0; i < kvs.length; i += 2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.flink.sink;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.LogChangelogMode;
import org.apache.paimon.CoreOptions.MergeEngine;
Expand Down Expand Up @@ -87,6 +88,11 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
return requestedMode;
}

if (options.get(MERGE_ENGINE) == MergeEngine.PARTIAL_UPDATE
&& new CoreOptions(options).definedAggFunc()) {
return requestedMode;
}

if (options.get(LOG_CHANGELOG_MODE) == LogChangelogMode.ALL) {
return requestedMode;
}
Expand Down
Loading

0 comments on commit 0bee22e

Please sign in to comment.