Skip to content

Commit

Permalink
[core] support user define delimiter with list agg (#3930)
Browse files Browse the repository at this point in the history
  • Loading branch information
wwj6591812 authored Aug 11, 2024
1 parent 875cadb commit 26db324
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 9 deletions.
1 change: 1 addition & 0 deletions docs/content/primary-key-table/merge-engine/aggregation.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ Current supported aggregate functions and data types are:
### listagg
The listagg function concatenates multiple string values into a single string.
It supports STRING data type.
Each field that is not part of the primary key can be given its own delimiter through the `fields.<field-name>.list-agg-delimiter` table property; if the property is not set, "," is used as the default delimiter.

### bool_and
The bool_and function evaluates whether all values in a boolean set are true.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public class CoreOptions implements Serializable {

public static final String DISTINCT = "distinct";

public static final String LIST_AGG_DELIMITER = "list-agg-delimiter";

public static final String FILE_INDEX = "file-index";

public static final String COLUMNS = "columns";
Expand Down Expand Up @@ -1474,6 +1476,13 @@ public boolean fieldCollectAggDistinct(String fieldName) {
.defaultValue(false));
}

/**
 * Returns the delimiter that {@code listagg} uses for the given field.
 *
 * <p>The value is read from the per-field option built from {@code FIELDS_PREFIX}, the field
 * name and {@link #LIST_AGG_DELIMITER}; when the option is absent, {@code ","} is returned.
 *
 * @param fieldName name of the field whose delimiter is looked up
 * @return the configured delimiter, or {@code ","} if none is configured
 */
public String fieldListAggDelimiter(String fieldName) {
    String delimiterKey = FIELDS_PREFIX + "." + fieldName + "." + LIST_AGG_DELIMITER;
    return options.get(key(delimiterKey).stringType().defaultValue(","));
}

@Nullable
public String fileCompression() {
return options.get(FILE_COMPRESSION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public static FieldAggregator createFieldAggregator(
fieldAggregator = new FieldLastValueAgg(fieldType);
break;
case FieldListaggAgg.NAME:
fieldAggregator = new FieldListaggAgg(fieldType);
fieldAggregator = new FieldListaggAgg(fieldType, options, field);
break;
case FieldBoolOrAgg.NAME:
fieldAggregator = new FieldBoolOrAgg(fieldType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.mergetree.compact.aggregate;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.StringUtils;
Expand All @@ -27,11 +28,11 @@ public class FieldListaggAgg extends FieldAggregator {

public static final String NAME = "listagg";

// TODO: make it configurable by with clause
public static final String DELIMITER = ",";
private final String delimiter;

public FieldListaggAgg(DataType dataType) {
/**
 * Creates a listagg aggregator for one field.
 *
 * @param dataType data type of the aggregated field, handed to the parent aggregator
 * @param options table options from which the field's delimiter is read
 * @param field name of the field, used to look up its delimiter in {@code options}
 */
public FieldListaggAgg(DataType dataType, CoreOptions options, String field) {
super(dataType);
// Resolve the delimiter once here so agg() does not have to consult the options per call.
this.delimiter = options.fieldListAggDelimiter(field);
}

@Override
Expand All @@ -54,7 +55,7 @@ public Object agg(Object accumulator, Object inputField) {
BinaryString inFieldSD = (BinaryString) inputField;
concatenate =
StringUtils.concat(
mergeFieldSD, BinaryString.fromString(DELIMITER), inFieldSD);
mergeFieldSD, BinaryString.fromString(delimiter), inFieldSD);
break;
default:
String msg =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@
/** Tests for {@link IntervalPartition}. */
public class IntervalPartitionTest {

private static final RecordComparator COMPARATOR =
(RecordComparator) (o1, o2) -> o1.getInt(0) - o2.getInt(0);
// Orders records by their first int field. Integer.compare is used instead of subtraction
// because "a - b" can overflow for operands of opposite sign and report the wrong order.
private static final RecordComparator COMPARATOR =
        (o1, o2) -> Integer.compare(o1.getInt(0), o2.getInt(0));

@Test
public void testSameMinKey() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.mergetree.compact.aggregate;

import org.apache.paimon.KeyValue;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.options.Options;
Expand All @@ -35,6 +36,7 @@

/** Test for aggregate merge function. */
class AggregateMergeFunctionTest {

@Test
void testDefaultAggFunc() {
Options options = new Options();
Expand Down Expand Up @@ -62,8 +64,89 @@ void testDefaultAggFunc() {
assertThat(aggregateFunction.getResult().value()).isEqualTo(GenericRow.of(1, 2, 13, 1, 1));
}

/**
 * Checks that {@code listagg} joins values with the delimiter configured per field and falls
 * back to {@code ","} for a field that has no delimiter option.
 */
@Test
void tesListAggFunc() {
    Options options = new Options();
    // a uses the default delimiter, b uses "-", d uses "/"; no option is set for c.
    options.set("fields.a.aggregate-function", "listagg");
    options.set("fields.b.aggregate-function", "listagg");
    options.set("fields.b.list-agg-delimiter", "-");
    options.set("fields.d.aggregate-function", "listagg");
    options.set("fields.d.list-agg-delimiter", "/");

    MergeFunction<KeyValue> aggregateFunction =
            AggregateMergeFunction.factory(
                            options,
                            Arrays.asList("k", "a", "b", "c", "d"),
                            Arrays.asList(
                                    DataTypes.INT(),
                                    DataTypes.STRING(),
                                    DataTypes.STRING(),
                                    DataTypes.INT(),
                                    DataTypes.STRING()),
                            Collections.singletonList("k"))
                    .create();
    aggregateFunction.reset();

    // Feed five rows that share key 1; every non-key column carries the row number.
    for (int row = 1; row <= 5; row++) {
        String digit = String.valueOf(row);
        aggregateFunction.add(
                value(
                        1,
                        BinaryString.fromString(digit),
                        BinaryString.fromString(digit),
                        row,
                        BinaryString.fromString(digit)));
    }

    GenericRow expected =
            GenericRow.of(
                    1,
                    BinaryString.fromString("1,2,3,4,5"),
                    BinaryString.fromString("1-2-3-4-5"),
                    5,
                    BinaryString.fromString("1/2/3/4/5"));
    assertThat(aggregateFunction.getResult().value()).isEqualTo(expected);
}

/**
 * Builds an INSERT {@link KeyValue} from integer columns.
 *
 * @param values row columns; the first element is also used as the single-column key
 */
private KeyValue value(Integer... values) {
    GenericRow keyRow = GenericRow.of(values[0]);
    GenericRow valueRow = GenericRow.of(values);
    return new KeyValue().replace(keyRow, RowKind.INSERT, valueRow);
}

/**
 * Builds an INSERT {@link KeyValue} for a five-column row mixing int and string columns.
 *
 * <p>{@code value1} is used both as the single-column key and as the first column of the row.
 */
private KeyValue value(
        Integer value1,
        BinaryString value2,
        BinaryString value3,
        Integer value4,
        BinaryString value5) {
    GenericRow keyRow = GenericRow.of(value1);
    GenericRow valueRow = GenericRow.of(value1, value2, value3, value4, value5);
    return new KeyValue().replace(keyRow, RowKind.INSERT, valueRow);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.mergetree.compact.aggregate;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericArray;
Expand All @@ -43,6 +44,8 @@
import org.apache.paimon.utils.RoaringBitmap32;
import org.apache.paimon.utils.RoaringBitmap64;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;

import org.junit.jupiter.api.Test;

import java.io.IOException;
Expand Down Expand Up @@ -123,14 +126,30 @@ public void testFieldFirstNonNullValueAgg() {
}

@Test
public void testFieldListaggAgg() {
FieldListaggAgg fieldListaggAgg = new FieldListaggAgg(new VarCharType());
public void testFieldListAggWithDefaultDelimiter() {
    // No delimiter option is supplied, so the two inputs are expected to be joined with ",".
    CoreOptions emptyOptions = new CoreOptions(new HashMap<>());
    FieldListaggAgg aggregator =
            new FieldListaggAgg(new VarCharType(), emptyOptions, "fieldName");

    Object merged =
            aggregator.agg(BinaryString.fromString("user1"), BinaryString.fromString("user2"));

    assertThat(merged.toString()).isEqualTo("user1,user2");
}

/** Checks that a delimiter configured for the field replaces the default when joining. */
@Test
public void testFieldListAggWithCustomDelimiter() {
    // Configure "-" as the delimiter for the field under test.
    CoreOptions dashOptions =
            CoreOptions.fromMap(ImmutableMap.of("fields.fieldName.list-agg-delimiter", "-"));
    FieldListaggAgg aggregator =
            new FieldListaggAgg(new VarCharType(), dashOptions, "fieldName");

    Object merged =
            aggregator.agg(BinaryString.fromString("user1"), BinaryString.fromString("user2"));

    assertThat(merged.toString()).isEqualTo("user1-user2");
}

@Test
public void testFieldMaxAgg() {
FieldMaxAgg fieldMaxAgg = new FieldMaxAgg(new IntType());
Expand Down

0 comments on commit 26db324

Please sign in to comment.