Skip to content

Commit

Permalink
[core] The agg table model supports count types (apache#2565)
Browse files Browse the repository at this point in the history
  • Loading branch information
MonsterChenzhuo authored Dec 25, 2023
1 parent b59c5d2 commit bd6b900
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ public static FieldAggregator createFieldAggregator(
case FieldFirstNotNullValueAgg.NAME:
fieldAggregator = new FieldFirstNotNullValueAgg(fieldType);
break;
case FieldCountAgg.NAME:
fieldAggregator = new FieldCountAgg(fieldType);
break;
default:
throw new RuntimeException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.mergetree.compact.aggregate;

import org.apache.paimon.types.DataType;

/** count value aggregate a field of a row. */
public class FieldCountAgg extends FieldAggregator {

public static final String NAME = "count";

public FieldCountAgg(DataType dataType) {
super(dataType);
}

@Override
String name() {
return NAME;
}

@Override
public Object agg(Object accumulator, Object inputField) {
Object count;
if (accumulator == null || inputField == null) {
count = (accumulator == null ? 1 : accumulator);
} else {
// ordered by type root definition
switch (fieldType.getTypeRoot()) {
case INTEGER:
count = (int) accumulator + 1;
break;
case BIGINT:
count = (long) accumulator + 1L;
break;
default:
throw new IllegalArgumentException();
}
}
return count;
}

@Override
public Object retract(Object accumulator, Object inputField) {
Object count;
if (accumulator == null || inputField == null) {
count = (accumulator == null ? 1 : accumulator);
} else {
// ordered by type root definition
switch (fieldType.getTypeRoot()) {
case INTEGER:
count = (int) accumulator - 1;
break;
case BIGINT:
count = (long) accumulator - 1L;
break;
default:
throw new IllegalArgumentException();
}
}
return count;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,15 @@ public void testFieldSumIntAgg() {
assertThat(fieldSumAgg.retract(null, 5)).isEqualTo(-5);
}

@Test
public void testFieldCountIntAgg() {
FieldCountAgg fieldAvgAgg = new FieldCountAgg(new IntType());
assertThat(fieldAvgAgg.agg(null, 10)).isEqualTo(1);
assertThat(fieldAvgAgg.agg(1, 5)).isEqualTo(2);
assertThat(fieldAvgAgg.agg(2, 15)).isEqualTo(3);
assertThat(fieldAvgAgg.agg(3, 25)).isEqualTo(4);
}

@Test
public void testFieldSumByteAgg() {
FieldSumAgg fieldSumAgg = new FieldSumAgg(new TinyIntType());
Expand Down

0 comments on commit bd6b900

Please sign in to comment.