diff --git a/docs/content/primary-key-table/merge-engine/aggregation.md b/docs/content/primary-key-table/merge-engine/aggregation.md index c009d1d7ac1e..7cdd9044a1e4 100644 --- a/docs/content/primary-key-table/merge-engine/aggregation.md +++ b/docs/content/primary-key-table/merge-engine/aggregation.md @@ -93,6 +93,7 @@ Current supported aggregate functions and data types are: ### listagg The listagg function concatenates multiple string values into a single string. It supports STRING data type. + Each field not part of the primary keys can be given a list agg delimiter, specified by the fields..list-agg-delimiter table property, otherwise it will use "," as default. ### bool_and The bool_and function evaluates whether all values in a boolean set are true. diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index aa4bcb6858ea..c48527d2a23d 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -81,6 +81,8 @@ public class CoreOptions implements Serializable { public static final String DISTINCT = "distinct"; + public static final String LIST_AGG_DELIMITER = "list-agg-delimiter"; + public static final String FILE_INDEX = "file-index"; public static final String COLUMNS = "columns"; @@ -1474,6 +1476,13 @@ public boolean fieldCollectAggDistinct(String fieldName) { .defaultValue(false)); } + public String fieldListAggDelimiter(String fieldName) { + return options.get( + key(FIELDS_PREFIX + "." + fieldName + "." + LIST_AGG_DELIMITER) + .stringType() + .defaultValue(",")); + } + @Nullable public String fileCompression() { return options.get(FILE_COMPRESSION); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java index 77eeecd8d8c0..8d0f5b79ff06 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java @@ -74,7 +74,7 @@ public static FieldAggregator createFieldAggregator( fieldAggregator = new FieldLastValueAgg(fieldType); break; case FieldListaggAgg.NAME: - fieldAggregator = new FieldListaggAgg(fieldType); + fieldAggregator = new FieldListaggAgg(fieldType, options, field); break; case FieldBoolOrAgg.NAME: fieldAggregator = new FieldBoolOrAgg(fieldType); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java index a4937a929307..e0286bbb746d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java @@ -18,6 +18,7 @@ package org.apache.paimon.mergetree.compact.aggregate; +import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryString; import org.apache.paimon.types.DataType; import org.apache.paimon.utils.StringUtils; @@ -27,11 +28,11 @@ public class FieldListaggAgg extends FieldAggregator { public static final String NAME = "listagg"; - // TODO: make it configurable by with clause - public static final String DELIMITER = ","; + private final String delimiter; - public FieldListaggAgg(DataType dataType) { + public FieldListaggAgg(DataType dataType, CoreOptions options, String field) { super(dataType); + this.delimiter = options.fieldListAggDelimiter(field); } @Override @@ -54,7 +55,7 @@ public Object agg(Object accumulator, Object inputField) { BinaryString inFieldSD = (BinaryString) inputField; concatenate = StringUtils.concat( - mergeFieldSD, BinaryString.fromString(DELIMITER), inFieldSD); + mergeFieldSD, BinaryString.fromString(delimiter), inFieldSD); break; default: String msg = diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java index 4d4117c7d11f..05fce451a40e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java @@ -48,8 +48,7 @@ /** Tests for {@link IntervalPartition}. */ public class IntervalPartitionTest { - private static final RecordComparator COMPARATOR = - (RecordComparator) (o1, o2) -> o1.getInt(0) - o2.getInt(0); + private static final RecordComparator COMPARATOR = (o1, o2) -> o1.getInt(0) - o2.getInt(0); @Test public void testSameMinKey() { diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java index 4a5f5f8dfdb5..70cdbe310350 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.mergetree.compact.aggregate; import org.apache.paimon.KeyValue; +import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.mergetree.compact.MergeFunction; import org.apache.paimon.options.Options; @@ -35,6 +36,7 @@ /** Test for aggregate merge function. */ class AggregateMergeFunctionTest { + @Test void testDefaultAggFunc() { Options options = new Options(); @@ -62,8 +64,89 @@ void testDefaultAggFunc() { assertThat(aggregateFunction.getResult().value()).isEqualTo(GenericRow.of(1, 2, 13, 1, 1)); } + @Test + void tesListAggFunc() { + Options options = new Options(); + options.set("fields.a.aggregate-function", "listagg"); + options.set("fields.b.aggregate-function", "listagg"); + options.set("fields.b.list-agg-delimiter", "-"); + options.set("fields.d.aggregate-function", "listagg"); + options.set("fields.d.list-agg-delimiter", "/"); + + MergeFunction aggregateFunction = + AggregateMergeFunction.factory( + options, + Arrays.asList("k", "a", "b", "c", "d"), + Arrays.asList( + DataTypes.INT(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.INT(), + DataTypes.STRING()), + Collections.singletonList("k")) + .create(); + aggregateFunction.reset(); + + aggregateFunction.add( + value( + 1, + BinaryString.fromString("1"), + BinaryString.fromString("1"), + 1, + BinaryString.fromString("1"))); + aggregateFunction.add( + value( + 1, + BinaryString.fromString("2"), + BinaryString.fromString("2"), + 2, + BinaryString.fromString("2"))); + aggregateFunction.add( + value( + 1, + BinaryString.fromString("3"), + BinaryString.fromString("3"), + 3, + BinaryString.fromString("3"))); + aggregateFunction.add( + value( + 1, + BinaryString.fromString("4"), + BinaryString.fromString("4"), + 4, + BinaryString.fromString("4"))); + aggregateFunction.add( + value( + 1, + BinaryString.fromString("5"), + BinaryString.fromString("5"), + 5, + BinaryString.fromString("5"))); + assertThat(aggregateFunction.getResult().value()) + .isEqualTo( + GenericRow.of( + 1, + BinaryString.fromString("1,2,3,4,5"), + BinaryString.fromString("1-2-3-4-5"), + 5, + BinaryString.fromString("1/2/3/4/5"))); + } + private KeyValue value(Integer... values) { return new KeyValue() .replace(GenericRow.of(values[0]), RowKind.INSERT, GenericRow.of(values)); } + + private KeyValue value( + Integer value1, + BinaryString value2, + BinaryString value3, + Integer value4, + BinaryString value5) { + return new KeyValue() + .replace( + GenericRow.of(value1), + RowKind.INSERT, + GenericRow.of(value1, value2, value3, value4, value5)); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java index 3140ba5f1452..7fae506226c7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java @@ -18,6 +18,7 @@ package org.apache.paimon.mergetree.compact.aggregate; +import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericArray; @@ -43,6 +44,8 @@ import org.apache.paimon.utils.RoaringBitmap32; import org.apache.paimon.utils.RoaringBitmap64; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; + import org.junit.jupiter.api.Test; import java.io.IOException; @@ -123,14 +126,30 @@ public void testFieldFirstNonNullValueAgg() { } @Test - public void testFieldListaggAgg() { - FieldListaggAgg fieldListaggAgg = new FieldListaggAgg(new VarCharType()); + public void testFieldListAggWithDefaultDelimiter() { + FieldListaggAgg fieldListaggAgg = + new FieldListaggAgg( + new VarCharType(), new CoreOptions(new HashMap<>()), "fieldName"); BinaryString accumulator = BinaryString.fromString("user1"); BinaryString inputField = BinaryString.fromString("user2"); assertThat(fieldListaggAgg.agg(accumulator, inputField).toString()) .isEqualTo("user1,user2"); } + @Test + public void testFieldListAggWithCustomDelimiter() { + FieldListaggAgg fieldListaggAgg = + new FieldListaggAgg( + new VarCharType(), + CoreOptions.fromMap( + ImmutableMap.of("fields.fieldName.list-agg-delimiter", "-")), + "fieldName"); + BinaryString accumulator = BinaryString.fromString("user1"); + BinaryString inputField = BinaryString.fromString("user2"); + assertThat(fieldListaggAgg.agg(accumulator, inputField).toString()) + .isEqualTo("user1-user2"); + } + @Test public void testFieldMaxAgg() { FieldMaxAgg fieldMaxAgg = new FieldMaxAgg(new IntType());