Skip to content

Commit

Permalink
[core] Support product function for agg table (apache#2572)
Browse files Browse the repository at this point in the history
  • Loading branch information
TaoZex authored Dec 26, 2023
1 parent 600cc03 commit 5178dc7
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 6 deletions.
6 changes: 5 additions & 1 deletion docs/content/concepts/primary-key-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ Current supported aggregate functions and data types are:
The sum function aggregates the values across multiple rows.
It supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, and DOUBLE data types.

* `product` function:
The product function can compute product values across multiple lines.
It supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, and DOUBLE data types.

* `count` function:
The count function counts the values across multiple rows.
It supports INTEGER, BIGINT data types.
Expand Down Expand Up @@ -322,7 +326,7 @@ Current supported aggregate functions and data types are:
The first_not_null_value function selects the first non-null value in a data set.
It supports all data types.

Only `sum` supports retraction (`UPDATE_BEFORE` and `DELETE`), others aggregate functions do not support retraction.
Only `sum` and `product` supports retraction (`UPDATE_BEFORE` and `DELETE`), others aggregate functions do not support retraction.
If you allow some functions to ignore retraction messages, you can configure:
`'fields.${field_name}.ignore-retract'='true'`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ public static FieldAggregator createFieldAggregator(
case FieldCountAgg.NAME:
fieldAggregator = new FieldCountAgg(fieldType);
break;
case FieldProductAgg.NAME:
fieldAggregator = new FieldProductAgg(fieldType);
break;
default:
throw new RuntimeException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.mergetree.compact.aggregate;

import org.apache.paimon.data.Decimal;
import org.apache.paimon.types.DataType;

import java.math.BigDecimal;

import static org.apache.paimon.data.Decimal.fromBigDecimal;

/** product value aggregate a field of a row. */
public class FieldProductAgg extends FieldAggregator {

public static final String NAME = "product";

public FieldProductAgg(DataType dataType) {
super(dataType);
}

@Override
String name() {
return NAME;
}

@Override
public Object agg(Object accumulator, Object inputField) {
Object product;

if (accumulator == null || inputField == null) {
product = (accumulator == null ? inputField : accumulator);
} else {
// ordered by type root definition
switch (fieldType.getTypeRoot()) {
case DECIMAL:
Decimal mergeFieldDD = (Decimal) accumulator;
Decimal inFieldDD = (Decimal) inputField;
assert mergeFieldDD.scale() == inFieldDD.scale()
: "Inconsistent scale of aggregate Decimal!";
assert mergeFieldDD.precision() == inFieldDD.precision()
: "Inconsistent precision of aggregate Decimal!";
BigDecimal bigDecimal = mergeFieldDD.toBigDecimal();
BigDecimal bigDecimal1 = inFieldDD.toBigDecimal();
BigDecimal mul = bigDecimal.multiply(bigDecimal1);
product = fromBigDecimal(mul, mergeFieldDD.precision(), mergeFieldDD.scale());
break;
case TINYINT:
product = (byte) ((byte) accumulator * (byte) inputField);
break;
case SMALLINT:
product = (short) ((short) accumulator * (short) inputField);
break;
case INTEGER:
product = (int) accumulator * (int) inputField;
break;
case BIGINT:
product = (long) accumulator * (long) inputField;
break;
case FLOAT:
product = (float) accumulator * (float) inputField;
break;
case DOUBLE:
product = (double) accumulator * (double) inputField;
break;
default:
throw new IllegalArgumentException();
}
}
return product;
}

@Override
public Object retract(Object accumulator, Object inputField) {
Object product;

if (accumulator == null || inputField == null) {
product = (accumulator == null ? inputField : accumulator);
} else {
switch (fieldType.getTypeRoot()) {
case DECIMAL:
Decimal mergeFieldDD = (Decimal) accumulator;
Decimal inFieldDD = (Decimal) inputField;
assert mergeFieldDD.scale() == inFieldDD.scale()
: "Inconsistent scale of aggregate Decimal!";
assert mergeFieldDD.precision() == inFieldDD.precision()
: "Inconsistent precision of aggregate Decimal!";
BigDecimal bigDecimal = mergeFieldDD.toBigDecimal();
BigDecimal bigDecimal1 = inFieldDD.toBigDecimal();
BigDecimal div = bigDecimal.divide(bigDecimal1);
product = fromBigDecimal(div, mergeFieldDD.precision(), mergeFieldDD.scale());
break;
case TINYINT:
product = (byte) ((byte) accumulator / (byte) inputField);
break;
case SMALLINT:
product = (short) ((short) accumulator / (short) inputField);
break;
case INTEGER:
product = (int) accumulator / (int) inputField;
break;
case BIGINT:
product = (long) accumulator / (long) inputField;
break;
case FLOAT:
product = (float) accumulator / (float) inputField;
break;
case DOUBLE:
product = (double) accumulator / (double) inputField;
break;
default:
throw new IllegalArgumentException();
}
}
return product;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,20 @@ public void testFieldSumIntAgg() {

@Test
public void testFieldCountIntAgg() {
FieldCountAgg fieldAvgAgg = new FieldCountAgg(new IntType());
assertThat(fieldAvgAgg.agg(null, 10)).isEqualTo(1);
assertThat(fieldAvgAgg.agg(1, 5)).isEqualTo(2);
assertThat(fieldAvgAgg.agg(2, 15)).isEqualTo(3);
assertThat(fieldAvgAgg.agg(3, 25)).isEqualTo(4);
FieldCountAgg fieldCountAgg = new FieldCountAgg(new IntType());
assertThat(fieldCountAgg.agg(null, 10)).isEqualTo(1);
assertThat(fieldCountAgg.agg(1, 5)).isEqualTo(2);
assertThat(fieldCountAgg.agg(2, 15)).isEqualTo(3);
assertThat(fieldCountAgg.agg(3, 25)).isEqualTo(4);
}

@Test
public void testFieldProductIntAgg() {
FieldProductAgg fieldProductAgg = new FieldProductAgg(new IntType());
assertThat(fieldProductAgg.agg(null, 10)).isEqualTo(10);
assertThat(fieldProductAgg.agg(1, 10)).isEqualTo(10);
assertThat(fieldProductAgg.retract(10, 5)).isEqualTo(2);
assertThat(fieldProductAgg.retract(null, 5)).isEqualTo(5);
}

@Test
Expand All @@ -162,6 +171,24 @@ public void testFieldSumByteAgg() {
assertThat(fieldSumAgg.retract(null, (byte) 5)).isEqualTo((byte) -5);
}

@Test
public void testFieldProductByteAgg() {
FieldProductAgg fieldProductAgg = new FieldProductAgg(new TinyIntType());
assertThat(fieldProductAgg.agg(null, (byte) 10)).isEqualTo((byte) 10);
assertThat(fieldProductAgg.agg((byte) 1, (byte) 10)).isEqualTo((byte) 10);
assertThat(fieldProductAgg.retract((byte) 10, (byte) 5)).isEqualTo((byte) 2);
assertThat(fieldProductAgg.retract(null, (byte) 5)).isEqualTo((byte) 5);
}

@Test
public void testFieldProductShortAgg() {
FieldProductAgg fieldProductAgg = new FieldProductAgg(new SmallIntType());
assertThat(fieldProductAgg.agg(null, (short) 10)).isEqualTo((short) 10);
assertThat(fieldProductAgg.agg((short) 1, (short) 10)).isEqualTo((short) 10);
assertThat(fieldProductAgg.retract((short) 10, (short) 5)).isEqualTo((short) 2);
assertThat(fieldProductAgg.retract(null, (short) 5)).isEqualTo((short) 5);
}

@Test
public void testFieldSumShortAgg() {
FieldSumAgg fieldSumAgg = new FieldSumAgg(new SmallIntType());
Expand All @@ -180,6 +207,24 @@ public void testFieldSumLongAgg() {
assertThat(fieldSumAgg.retract(null, 5L)).isEqualTo(-5L);
}

@Test
public void testFieldProductLongAgg() {
FieldProductAgg fieldProductAgg = new FieldProductAgg(new BigIntType());
assertThat(fieldProductAgg.agg(null, 10L)).isEqualTo(10L);
assertThat(fieldProductAgg.agg(1L, 10L)).isEqualTo(10L);
assertThat(fieldProductAgg.retract(10L, 5L)).isEqualTo(2L);
assertThat(fieldProductAgg.retract(null, 5L)).isEqualTo(5L);
}

@Test
public void testFieldProductFloatAgg() {
FieldProductAgg fieldProductAgg = new FieldProductAgg(new FloatType());
assertThat(fieldProductAgg.agg(null, (float) 10)).isEqualTo((float) 10);
assertThat(fieldProductAgg.agg((float) 1, (float) 10)).isEqualTo((float) 10);
assertThat(fieldProductAgg.retract((float) 10, (float) 5)).isEqualTo((float) 2);
assertThat(fieldProductAgg.retract(null, (float) 5)).isEqualTo((float) 5);
}

@Test
public void testFieldSumFloatAgg() {
FieldSumAgg fieldSumAgg = new FieldSumAgg(new FloatType());
Expand All @@ -189,6 +234,15 @@ public void testFieldSumFloatAgg() {
assertThat(fieldSumAgg.retract(null, (float) 5)).isEqualTo((float) -5);
}

@Test
public void testFieldProductDoubleAgg() {
FieldProductAgg fieldProductAgg = new FieldProductAgg(new DoubleType());
assertThat(fieldProductAgg.agg(null, (double) 10)).isEqualTo((double) 10);
assertThat(fieldProductAgg.agg((double) 1, (double) 10)).isEqualTo((double) 10);
assertThat(fieldProductAgg.retract((double) 10, (double) 5)).isEqualTo((double) 2);
assertThat(fieldProductAgg.retract(null, (double) 5)).isEqualTo((double) 5);
}

@Test
public void testFieldSumDoubleAgg() {
FieldSumAgg fieldSumAgg = new FieldSumAgg(new DoubleType());
Expand All @@ -198,6 +252,15 @@ public void testFieldSumDoubleAgg() {
assertThat(fieldSumAgg.retract(null, (double) 5)).isEqualTo((double) -5);
}

@Test
public void testFieldProductDecimalAgg() {
FieldProductAgg fieldProductAgg = new FieldProductAgg(new DecimalType());
assertThat(fieldProductAgg.agg(null, toDecimal(10))).isEqualTo(toDecimal(10));
assertThat(fieldProductAgg.agg(toDecimal(1), toDecimal(10))).isEqualTo(toDecimal(10));
assertThat(fieldProductAgg.retract(toDecimal(10), toDecimal(5))).isEqualTo(toDecimal(2));
assertThat(fieldProductAgg.retract(null, toDecimal(5))).isEqualTo(toDecimal(5));
}

@Test
public void testFieldSumDecimalAgg() {
FieldSumAgg fieldSumAgg = new FieldSumAgg(new DecimalType());
Expand Down

0 comments on commit 5178dc7

Please sign in to comment.