diff --git a/docs/content/concepts/primary-key-table.md b/docs/content/concepts/primary-key-table.md index f3eaa2e39ce3..db8933b7fbea 100644 --- a/docs/content/concepts/primary-key-table.md +++ b/docs/content/concepts/primary-key-table.md @@ -282,6 +282,10 @@ Current supported aggregate functions and data types are: The sum function aggregates the values across multiple rows. It supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, and DOUBLE data types. +* `product` function: + The product function can compute product values across multiple lines. + It supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, and DOUBLE data types. + * `count` function: The count function counts the values across multiple rows. It supports INTEGER, BIGINT data types. @@ -322,7 +326,7 @@ Current supported aggregate functions and data types are: The first_not_null_value function selects the first non-null value in a data set. It supports all data types. -Only `sum` supports retraction (`UPDATE_BEFORE` and `DELETE`), others aggregate functions do not support retraction. +Only `sum` and `product` supports retraction (`UPDATE_BEFORE` and `DELETE`), others aggregate functions do not support retraction. If you allow some functions to ignore retraction messages, you can configure: `'fields.${field_name}.ignore-retract'='true'`. diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java index c5e680e4722a..6b13468d44c1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java @@ -80,6 +80,9 @@ public static FieldAggregator createFieldAggregator( case FieldCountAgg.NAME: fieldAggregator = new FieldCountAgg(fieldType); break; + case FieldProductAgg.NAME: + fieldAggregator = new FieldProductAgg(fieldType); + break; default: throw new RuntimeException( String.format( diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldProductAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldProductAgg.java new file mode 100644 index 000000000000..8427df476ac5 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldProductAgg.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.mergetree.compact.aggregate; + +import org.apache.paimon.data.Decimal; +import org.apache.paimon.types.DataType; + +import java.math.BigDecimal; + +import static org.apache.paimon.data.Decimal.fromBigDecimal; + +/** product value aggregate a field of a row. */ +public class FieldProductAgg extends FieldAggregator { + + public static final String NAME = "product"; + + public FieldProductAgg(DataType dataType) { + super(dataType); + } + + @Override + String name() { + return NAME; + } + + @Override + public Object agg(Object accumulator, Object inputField) { + Object product; + + if (accumulator == null || inputField == null) { + product = (accumulator == null ? inputField : accumulator); + } else { + // ordered by type root definition + switch (fieldType.getTypeRoot()) { + case DECIMAL: + Decimal mergeFieldDD = (Decimal) accumulator; + Decimal inFieldDD = (Decimal) inputField; + assert mergeFieldDD.scale() == inFieldDD.scale() + : "Inconsistent scale of aggregate Decimal!"; + assert mergeFieldDD.precision() == inFieldDD.precision() + : "Inconsistent precision of aggregate Decimal!"; + BigDecimal bigDecimal = mergeFieldDD.toBigDecimal(); + BigDecimal bigDecimal1 = inFieldDD.toBigDecimal(); + BigDecimal mul = bigDecimal.multiply(bigDecimal1); + product = fromBigDecimal(mul, mergeFieldDD.precision(), mergeFieldDD.scale()); + break; + case TINYINT: + product = (byte) ((byte) accumulator * (byte) inputField); + break; + case SMALLINT: + product = (short) ((short) accumulator * (short) inputField); + break; + case INTEGER: + product = (int) accumulator * (int) inputField; + break; + case BIGINT: + product = (long) accumulator * (long) inputField; + break; + case FLOAT: + product = (float) accumulator * (float) inputField; + break; + case DOUBLE: + product = (double) accumulator * (double) inputField; + break; + default: + throw new IllegalArgumentException(); + } + } + return product; + } + + @Override + public Object retract(Object accumulator, Object inputField) { + Object product; + + if (accumulator == null || inputField == null) { + product = (accumulator == null ? inputField : accumulator); + } else { + switch (fieldType.getTypeRoot()) { + case DECIMAL: + Decimal mergeFieldDD = (Decimal) accumulator; + Decimal inFieldDD = (Decimal) inputField; + assert mergeFieldDD.scale() == inFieldDD.scale() + : "Inconsistent scale of aggregate Decimal!"; + assert mergeFieldDD.precision() == inFieldDD.precision() + : "Inconsistent precision of aggregate Decimal!"; + BigDecimal bigDecimal = mergeFieldDD.toBigDecimal(); + BigDecimal bigDecimal1 = inFieldDD.toBigDecimal(); + BigDecimal div = bigDecimal.divide(bigDecimal1); + product = fromBigDecimal(div, mergeFieldDD.precision(), mergeFieldDD.scale()); + break; + case TINYINT: + product = (byte) ((byte) accumulator / (byte) inputField); + break; + case SMALLINT: + product = (short) ((short) accumulator / (short) inputField); + break; + case INTEGER: + product = (int) accumulator / (int) inputField; + break; + case BIGINT: + product = (long) accumulator / (long) inputField; + break; + case FLOAT: + product = (float) accumulator / (float) inputField; + break; + case DOUBLE: + product = (double) accumulator / (double) inputField; + break; + default: + throw new IllegalArgumentException(); + } + } + return product; + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java index a51a1451bac7..fb026565d00b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java @@ -146,11 +146,20 @@ public void testFieldSumIntAgg() { @Test public void testFieldCountIntAgg() { - FieldCountAgg fieldAvgAgg = new FieldCountAgg(new IntType()); - assertThat(fieldAvgAgg.agg(null, 10)).isEqualTo(1); - assertThat(fieldAvgAgg.agg(1, 5)).isEqualTo(2); - assertThat(fieldAvgAgg.agg(2, 15)).isEqualTo(3); - assertThat(fieldAvgAgg.agg(3, 25)).isEqualTo(4); + FieldCountAgg fieldCountAgg = new FieldCountAgg(new IntType()); + assertThat(fieldCountAgg.agg(null, 10)).isEqualTo(1); + assertThat(fieldCountAgg.agg(1, 5)).isEqualTo(2); + assertThat(fieldCountAgg.agg(2, 15)).isEqualTo(3); + assertThat(fieldCountAgg.agg(3, 25)).isEqualTo(4); + } + + @Test + public void testFieldProductIntAgg() { + FieldProductAgg fieldProductAgg = new FieldProductAgg(new IntType()); + assertThat(fieldProductAgg.agg(null, 10)).isEqualTo(10); + assertThat(fieldProductAgg.agg(1, 10)).isEqualTo(10); + assertThat(fieldProductAgg.retract(10, 5)).isEqualTo(2); + assertThat(fieldProductAgg.retract(null, 5)).isEqualTo(5); } @Test @@ -162,6 +171,24 @@ public void testFieldSumByteAgg() { assertThat(fieldSumAgg.retract(null, (byte) 5)).isEqualTo((byte) -5); } + @Test + public void testFieldProductByteAgg() { + FieldProductAgg fieldProductAgg = new FieldProductAgg(new TinyIntType()); + assertThat(fieldProductAgg.agg(null, (byte) 10)).isEqualTo((byte) 10); + assertThat(fieldProductAgg.agg((byte) 1, (byte) 10)).isEqualTo((byte) 10); + assertThat(fieldProductAgg.retract((byte) 10, (byte) 5)).isEqualTo((byte) 2); + assertThat(fieldProductAgg.retract(null, (byte) 5)).isEqualTo((byte) 5); + } + + @Test + public void testFieldProductShortAgg() { + FieldProductAgg fieldProductAgg = new FieldProductAgg(new SmallIntType()); + assertThat(fieldProductAgg.agg(null, (short) 10)).isEqualTo((short) 10); + assertThat(fieldProductAgg.agg((short) 1, (short) 10)).isEqualTo((short) 10); + assertThat(fieldProductAgg.retract((short) 10, (short) 5)).isEqualTo((short) 2); + assertThat(fieldProductAgg.retract(null, (short) 5)).isEqualTo((short) 5); + } + @Test public void testFieldSumShortAgg() { FieldSumAgg fieldSumAgg = new FieldSumAgg(new SmallIntType()); @@ -180,6 +207,24 @@ public void testFieldSumLongAgg() { assertThat(fieldSumAgg.retract(null, 5L)).isEqualTo(-5L); } + @Test + public void testFieldProductLongAgg() { + FieldProductAgg fieldProductAgg = new FieldProductAgg(new BigIntType()); + assertThat(fieldProductAgg.agg(null, 10L)).isEqualTo(10L); + assertThat(fieldProductAgg.agg(1L, 10L)).isEqualTo(10L); + assertThat(fieldProductAgg.retract(10L, 5L)).isEqualTo(2L); + assertThat(fieldProductAgg.retract(null, 5L)).isEqualTo(5L); + } + + @Test + public void testFieldProductFloatAgg() { + FieldProductAgg fieldProductAgg = new FieldProductAgg(new FloatType()); + assertThat(fieldProductAgg.agg(null, (float) 10)).isEqualTo((float) 10); + assertThat(fieldProductAgg.agg((float) 1, (float) 10)).isEqualTo((float) 10); + assertThat(fieldProductAgg.retract((float) 10, (float) 5)).isEqualTo((float) 2); + assertThat(fieldProductAgg.retract(null, (float) 5)).isEqualTo((float) 5); + } + @Test public void testFieldSumFloatAgg() { FieldSumAgg fieldSumAgg = new FieldSumAgg(new FloatType()); @@ -189,6 +234,15 @@ public void testFieldSumFloatAgg() { assertThat(fieldSumAgg.retract(null, (float) 5)).isEqualTo((float) -5); } + @Test + public void testFieldProductDoubleAgg() { + FieldProductAgg fieldProductAgg = new FieldProductAgg(new DoubleType()); + assertThat(fieldProductAgg.agg(null, (double) 10)).isEqualTo((double) 10); + assertThat(fieldProductAgg.agg((double) 1, (double) 10)).isEqualTo((double) 10); + assertThat(fieldProductAgg.retract((double) 10, (double) 5)).isEqualTo((double) 2); + assertThat(fieldProductAgg.retract(null, (double) 5)).isEqualTo((double) 5); + } + @Test public void testFieldSumDoubleAgg() { FieldSumAgg fieldSumAgg = new FieldSumAgg(new DoubleType()); @@ -198,6 +252,15 @@ public void testFieldSumDoubleAgg() { assertThat(fieldSumAgg.retract(null, (double) 5)).isEqualTo((double) -5); } + @Test + public void testFieldProductDecimalAgg() { + FieldProductAgg fieldProductAgg = new FieldProductAgg(new DecimalType()); + assertThat(fieldProductAgg.agg(null, toDecimal(10))).isEqualTo(toDecimal(10)); + assertThat(fieldProductAgg.agg(toDecimal(1), toDecimal(10))).isEqualTo(toDecimal(10)); + assertThat(fieldProductAgg.retract(toDecimal(10), toDecimal(5))).isEqualTo(toDecimal(2)); + assertThat(fieldProductAgg.retract(null, toDecimal(5))).isEqualTo(toDecimal(5)); + } + @Test public void testFieldSumDecimalAgg() { FieldSumAgg fieldSumAgg = new FieldSumAgg(new DecimalType());