diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java index b15d9388aa35..4d720cb3f075 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory; import org.apache.paimon.options.Options; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; @@ -51,7 +52,6 @@ import static org.apache.paimon.CoreOptions.FIELDS_PREFIX; import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR; import static org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE; -import static org.apache.paimon.mergetree.compact.aggregate.FieldAggregator.createFieldAggregator; import static org.apache.paimon.utils.InternalRowUtils.createFieldGetters; /** @@ -495,7 +495,7 @@ private Map> createFieldAggregators( fieldAggregators.put( i, () -> - createFieldAggregator( + FieldAggregatorFactory.create( fieldType, strAggFunc, ignoreRetract, @@ -506,7 +506,7 @@ private Map> createFieldAggregators( fieldAggregators.put( i, () -> - createFieldAggregator( + FieldAggregatorFactory.create( fieldType, defaultAggFunc, ignoreRetract, diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java index e73bfe8e9acc..bad77ba91da5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.mergetree.compact.MergeFunction; import org.apache.paimon.mergetree.compact.MergeFunctionFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory; import org.apache.paimon.options.Options; import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowKind; @@ -142,7 +143,7 @@ public MergeFunction create(@Nullable int[][] projection) { boolean ignoreRetract = options.fieldAggIgnoreRetract(fieldName); fieldAggregators[i] = - FieldAggregator.createFieldAggregator( + FieldAggregatorFactory.create( fieldType, strAggFunc, ignoreRetract, diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java index b776f0a2ec93..cd368a818bdd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java @@ -18,62 +18,23 @@ package org.apache.paimon.mergetree.compact.aggregate; -import org.apache.paimon.CoreOptions; -import org.apache.paimon.factories.FactoryUtil; -import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory; import org.apache.paimon.types.DataType; -import javax.annotation.Nullable; - import java.io.Serializable; /** abstract class of aggregating a field of a row. */ public abstract class FieldAggregator implements Serializable { - protected DataType fieldType; private static final long serialVersionUID = 1L; - public FieldAggregator(DataType dataType) { - this.fieldType = dataType; - } - - public static FieldAggregator createFieldAggregator( - DataType fieldType, - @Nullable String strAgg, - boolean ignoreRetract, - boolean isPrimaryKey, - CoreOptions options, - String field) { - FieldAggregator fieldAggregator; - if (isPrimaryKey) { - strAgg = FieldPrimaryKeyAgg.NAME; - } else if (strAgg == null) { - strAgg = FieldLastNonNullValueAgg.NAME; - } - - FieldAggregatorFactory fieldAggregatorFactory = - FactoryUtil.discoverFactory( - FieldAggregator.class.getClassLoader(), - FieldAggregatorFactory.class, - strAgg); - if (fieldAggregatorFactory == null) { - throw new RuntimeException( - String.format( - "Use unsupported aggregation: %s or spell aggregate function incorrectly!", - strAgg)); - } + protected final DataType fieldType; + protected final String name; - fieldAggregator = fieldAggregatorFactory.create(fieldType, options, field); - - if (ignoreRetract) { - fieldAggregator = new FieldIgnoreRetractAgg(fieldAggregator); - } - - return fieldAggregator; + public FieldAggregator(String name, DataType dataType) { + this.name = name; + this.fieldType = dataType; } - public abstract String name(); - public abstract Object agg(Object accumulator, Object inputField); public Object aggReversed(Object accumulator, Object inputField) { @@ -89,6 +50,6 @@ public Object retract(Object accumulator, Object retractField) { "Aggregate function '%s' does not support retraction," + " If you allow this function to ignore retraction messages," + " you can configure 'fields.${field_name}.ignore-retract'='true'.", - name())); + name)); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldBoolAndAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldBoolAndAgg.java index 5acf2595a2df..cc44ce5ed9f6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldBoolAndAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldBoolAndAgg.java @@ -18,43 +18,22 @@ package org.apache.paimon.mergetree.compact.aggregate; -import org.apache.paimon.types.DataType; +import org.apache.paimon.types.BooleanType; /** bool_and aggregate a field of a row. */ public class FieldBoolAndAgg extends FieldAggregator { - public static final String NAME = "bool_and"; - private static final long serialVersionUID = 1L; - public FieldBoolAndAgg(DataType dataType) { - super(dataType); - } - - @Override - public String name() { - return NAME; + public FieldBoolAndAgg(String name, BooleanType dataType) { + super(name, dataType); } @Override public Object agg(Object accumulator, Object inputField) { - Object boolAnd; - if (accumulator == null || inputField == null) { - boolAnd = (inputField == null) ? accumulator : inputField; - } else { - switch (fieldType.getTypeRoot()) { - case BOOLEAN: - boolAnd = (boolean) accumulator && (boolean) inputField; - break; - default: - String msg = - String.format( - "type %s not support in %s", - fieldType.getTypeRoot().toString(), this.getClass().getName()); - throw new IllegalArgumentException(msg); - } + return accumulator == null ? inputField : accumulator; } - return boolAnd; + return (boolean) accumulator && (boolean) inputField; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldBoolOrAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldBoolOrAgg.java index 03a0c1c3ccb1..105f4219118d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldBoolOrAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldBoolOrAgg.java @@ -18,43 +18,22 @@ package org.apache.paimon.mergetree.compact.aggregate; -import org.apache.paimon.types.DataType; +import org.apache.paimon.types.BooleanType; /** bool_or aggregate a field of a row. */ public class FieldBoolOrAgg extends FieldAggregator { - public static final String NAME = "bool_or"; - private static final long serialVersionUID = 1L; - public FieldBoolOrAgg(DataType dataType) { - super(dataType); - } - - @Override - public String name() { - return NAME; + public FieldBoolOrAgg(String name, BooleanType dataType) { + super(name, dataType); } @Override public Object agg(Object accumulator, Object inputField) { - Object boolOr; - if (accumulator == null || inputField == null) { - boolOr = (inputField == null) ? accumulator : inputField; - } else { - switch (fieldType.getTypeRoot()) { - case BOOLEAN: - boolOr = (boolean) accumulator || (boolean) inputField; - break; - default: - String msg = - String.format( - "type %s not support in %s", - fieldType.getTypeRoot().toString(), this.getClass().getName()); - throw new IllegalArgumentException(msg); - } + return accumulator == null ? inputField : accumulator; } - return boolOr; + return (boolean) accumulator || (boolean) inputField; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java index 64ef223faf2c..afe5e05e70c0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java @@ -43,16 +43,14 @@ /** Collect elements into an ARRAY. */ public class FieldCollectAgg extends FieldAggregator { - public static final String NAME = "collect"; - private static final long serialVersionUID = 1L; private final boolean distinct; private final InternalArray.ElementGetter elementGetter; @Nullable private final BiFunction equaliser; - public FieldCollectAgg(ArrayType dataType, boolean distinct) { - super(dataType); + public FieldCollectAgg(String name, ArrayType dataType, boolean distinct) { + super(name, dataType); this.distinct = distinct; this.elementGetter = InternalArray.createElementGetter(dataType.getElementType()); @@ -84,11 +82,6 @@ public FieldCollectAgg(ArrayType dataType, boolean distinct) { } } - @Override - public String name() { - return NAME; - } - @Override public Object aggReversed(Object accumulator, Object inputField) { // we don't need to actually do the reverse here for this agg diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldFirstNonNullValueAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldFirstNonNullValueAgg.java index 0bd950bbf9d4..273af1a957f6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldFirstNonNullValueAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldFirstNonNullValueAgg.java @@ -23,20 +23,12 @@ /** first non-null value aggregate a field of a row. */ public class FieldFirstNonNullValueAgg extends FieldAggregator { - public static final String NAME = "first_non_null_value"; - public static final String LEGACY_NAME = "first_not_null_value"; - private static final long serialVersionUID = 1L; private boolean initialized; - public FieldFirstNonNullValueAgg(DataType dataType) { - super(dataType); - } - - @Override - public String name() { - return NAME; + public FieldFirstNonNullValueAgg(String name, DataType dataType) { + super(name, dataType); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldFirstValueAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldFirstValueAgg.java index d31a6e0ae144..436f841d95dc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldFirstValueAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldFirstValueAgg.java @@ -23,19 +23,12 @@ /** first value aggregate a field of a row. */ public class FieldFirstValueAgg extends FieldAggregator { - public static final String NAME = "first_value"; - private static final long serialVersionUID = 1L; private boolean initialized; - public FieldFirstValueAgg(DataType dataType) { - super(dataType); - } - - @Override - public String name() { - return NAME; + public FieldFirstValueAgg(String name, DataType dataType) { + super(name, dataType); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldHllSketchAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldHllSketchAgg.java index 0ccf4af6497c..aa399ac37e32 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldHllSketchAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldHllSketchAgg.java @@ -24,25 +24,14 @@ /** HllSketch aggregate a field of a row. */ public class FieldHllSketchAgg extends FieldAggregator { - public static final String NAME = "hll_sketch"; - private static final long serialVersionUID = 1L; - public FieldHllSketchAgg(VarBinaryType dataType) { - super(dataType); - } - - @Override - public String name() { - return NAME; + public FieldHllSketchAgg(String name, VarBinaryType dataType) { + super(name, dataType); } @Override public Object agg(Object accumulator, Object inputField) { - if (accumulator == null && inputField == null) { - return null; - } - if (accumulator == null || inputField == null) { return accumulator == null ? inputField : accumulator; } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldIgnoreRetractAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldIgnoreRetractAgg.java index 40772c6d1259..e98e64852b6f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldIgnoreRetractAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldIgnoreRetractAgg.java @@ -21,20 +21,15 @@ /** An aggregator which ignores retraction messages. */ public class FieldIgnoreRetractAgg extends FieldAggregator { - private final FieldAggregator aggregator; - private static final long serialVersionUID = 1L; + private final FieldAggregator aggregator; + public FieldIgnoreRetractAgg(FieldAggregator aggregator) { - super(aggregator.fieldType); + super(aggregator.name, aggregator.fieldType); this.aggregator = aggregator; } - @Override - public String name() { - return aggregator.name(); - } - @Override public Object agg(Object accumulator, Object inputField) { return aggregator.agg(accumulator, inputField); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldLastNonNullValueAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldLastNonNullValueAgg.java index e189c20b2be7..cc5383739861 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldLastNonNullValueAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldLastNonNullValueAgg.java @@ -23,17 +23,10 @@ /** last non-null value aggregate a field of a row. */ public class FieldLastNonNullValueAgg extends FieldAggregator { - public static final String NAME = "last_non_null_value"; - private static final long serialVersionUID = 1L; - public FieldLastNonNullValueAgg(DataType dataType) { - super(dataType); - } - - @Override - public String name() { - return NAME; + public FieldLastNonNullValueAgg(String name, DataType dataType) { + super(name, dataType); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldLastValueAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldLastValueAgg.java index 22c2b3da16de..592f080fbd77 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldLastValueAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldLastValueAgg.java @@ -23,17 +23,10 @@ /** last value aggregate a field of a row. */ public class FieldLastValueAgg extends FieldAggregator { - public static final String NAME = "last_value"; - private static final long serialVersionUID = 1L; - public FieldLastValueAgg(DataType dataType) { - super(dataType); - } - - @Override - public String name() { - return NAME; + public FieldLastValueAgg(String name, DataType dataType) { + super(name, dataType); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java index 3bde7e7cc5c8..a01891501119 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java @@ -20,53 +20,32 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryString; -import org.apache.paimon.types.DataType; +import org.apache.paimon.types.VarCharType; import org.apache.paimon.utils.StringUtils; /** listagg aggregate a field of a row. */ public class FieldListaggAgg extends FieldAggregator { - public static final String NAME = "listagg"; - private static final long serialVersionUID = 1L; private final String delimiter; - public FieldListaggAgg(DataType dataType, CoreOptions options, String field) { - super(dataType); + public FieldListaggAgg(String name, VarCharType dataType, CoreOptions options, String field) { + super(name, dataType); this.delimiter = options.fieldListAggDelimiter(field); } - @Override - public String name() { - return NAME; - } - @Override public Object agg(Object accumulator, Object inputField) { - Object concatenate; - - if (inputField == null || accumulator == null) { - concatenate = (inputField == null) ? accumulator : inputField; - } else { - // ordered by type root definition - switch (fieldType.getTypeRoot()) { - case VARCHAR: - // TODO: ensure not VARCHAR(n) - BinaryString mergeFieldSD = (BinaryString) accumulator; - BinaryString inFieldSD = (BinaryString) inputField; - concatenate = - StringUtils.concat( - mergeFieldSD, BinaryString.fromString(delimiter), inFieldSD); - break; - default: - String msg = - String.format( - "type %s not support in %s", - fieldType.getTypeRoot().toString(), this.getClass().getName()); - throw new IllegalArgumentException(msg); - } + if (accumulator == null || inputField == null) { + return accumulator == null ? inputField : accumulator; } - return concatenate; + // ordered by type root definition + + // TODO: ensure not VARCHAR(n) + BinaryString mergeFieldSD = (BinaryString) accumulator; + BinaryString inFieldSD = (BinaryString) inputField; + + return StringUtils.concat(mergeFieldSD, BinaryString.fromString(delimiter), inFieldSD); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMaxAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMaxAgg.java index 34e7dd139ab8..06628244e44d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMaxAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMaxAgg.java @@ -25,33 +25,20 @@ /** max aggregate a field of a row. */ public class FieldMaxAgg extends FieldAggregator { - public static final String NAME = "max"; - private static final long serialVersionUID = 1L; - public FieldMaxAgg(DataType dataType) { - super(dataType); - } - - @Override - public String name() { - return NAME; + public FieldMaxAgg(String name, DataType dataType) { + super(name, dataType); } @Override public Object agg(Object accumulator, Object inputField) { - Object max; - if (accumulator == null || inputField == null) { - max = (accumulator == null ? inputField : accumulator); - } else { - DataTypeRoot type = fieldType.getTypeRoot(); - if (InternalRowUtils.compare(accumulator, inputField, type) < 0) { - max = inputField; - } else { - max = accumulator; - } + return accumulator == null ? inputField : accumulator; } - return max; + DataTypeRoot type = fieldType.getTypeRoot(); + return InternalRowUtils.compare(accumulator, inputField, type) < 0 + ? inputField + : accumulator; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java index f597e8de5dd0..9965339afd2b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java @@ -31,25 +31,18 @@ /** Merge two maps. */ public class FieldMergeMapAgg extends FieldAggregator { - public static final String NAME = "merge_map"; - private static final long serialVersionUID = 1L; private final InternalArray.ElementGetter keyGetter; private final InternalArray.ElementGetter valueGetter; - public FieldMergeMapAgg(MapType dataType) { - super(dataType); + public FieldMergeMapAgg(String name, MapType dataType) { + super(name, dataType); this.keyGetter = InternalArray.createElementGetter(dataType.getKeyType()); this.valueGetter = InternalArray.createElementGetter(dataType.getValueType()); } - @Override - public String name() { - return NAME; - } - @Override public Object agg(Object accumulator, Object inputField) { if (accumulator == null || inputField == null) { diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMinAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMinAgg.java index 4002966dbb2b..01b0403baec3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMinAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMinAgg.java @@ -25,33 +25,21 @@ /** min aggregate a field of a row. */ public class FieldMinAgg extends FieldAggregator { - public static final String NAME = "min"; - private static final long serialVersionUID = 1L; - public FieldMinAgg(DataType dataType) { - super(dataType); - } - - @Override - public String name() { - return NAME; + public FieldMinAgg(String name, DataType dataType) { + super(name, dataType); } @Override public Object agg(Object accumulator, Object inputField) { - Object min; - if (accumulator == null || inputField == null) { - min = (accumulator == null ? inputField : accumulator); - } else { - DataTypeRoot type = fieldType.getTypeRoot(); - if (InternalRowUtils.compare(accumulator, inputField, type) < 0) { - min = accumulator; - } else { - min = inputField; - } + return accumulator == null ? inputField : accumulator; } - return min; + + DataTypeRoot type = fieldType.getTypeRoot(); + return InternalRowUtils.compare(accumulator, inputField, type) < 0 + ? accumulator + : inputField; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java index 3cd29f127041..005bf7b17f1f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java @@ -44,8 +44,6 @@ */ public class FieldNestedUpdateAgg extends FieldAggregator { - public static final String NAME = "nested_update"; - private static final long serialVersionUID = 1L; private final int nestedFields; @@ -53,8 +51,8 @@ public class FieldNestedUpdateAgg extends FieldAggregator { @Nullable private final Projection keyProjection; @Nullable private final RecordEqualiser elementEqualiser; - public FieldNestedUpdateAgg(ArrayType dataType, List nestedKey) { - super(dataType); + public FieldNestedUpdateAgg(String name, ArrayType dataType, List nestedKey) { + super(name, dataType); RowType nestedType = (RowType) dataType.getElementType(); this.nestedFields = nestedType.getFieldCount(); if (nestedKey.isEmpty()) { @@ -66,18 +64,10 @@ public FieldNestedUpdateAgg(ArrayType dataType, List nestedKey) { } } - @Override - public String name() { - return NAME; - } - @Override public Object agg(Object accumulator, Object inputField) { - if (accumulator == null) { - return inputField; - } - if (inputField == null) { - return accumulator; + if (accumulator == null || inputField == null) { + return accumulator == null ? inputField : accumulator; } InternalArray acc = (InternalArray) accumulator; diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldPrimaryKeyAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldPrimaryKeyAgg.java index e8053be1e627..3db4e9b3246d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldPrimaryKeyAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldPrimaryKeyAgg.java @@ -23,17 +23,10 @@ /** primary key aggregate a field of a row. */ public class FieldPrimaryKeyAgg extends FieldAggregator { - public static final String NAME = "primary-key"; - private static final long serialVersionUID = 1L; - public FieldPrimaryKeyAgg(DataType dataType) { - super(dataType); - } - - @Override - public String name() { - return NAME; + public FieldPrimaryKeyAgg(String name, DataType dataType) { + super(name, dataType); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldProductAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldProductAgg.java index c3fb18232dbf..26a0c0c52e14 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldProductAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldProductAgg.java @@ -28,65 +28,58 @@ /** product value aggregate a field of a row. */ public class FieldProductAgg extends FieldAggregator { - public static final String NAME = "product"; - private static final long serialVersionUID = 1L; - public FieldProductAgg(DataType dataType) { - super(dataType); - } - - @Override - public String name() { - return NAME; + public FieldProductAgg(String name, DataType dataType) { + super(name, dataType); } @Override public Object agg(Object accumulator, Object inputField) { + if (accumulator == null || inputField == null) { + return accumulator == null ? inputField : accumulator; + } + Object product; - if (accumulator == null || inputField == null) { - product = (accumulator == null ? inputField : accumulator); - } else { - // ordered by type root definition - switch (fieldType.getTypeRoot()) { - case DECIMAL: - Decimal mergeFieldDD = (Decimal) accumulator; - Decimal inFieldDD = (Decimal) inputField; - assert mergeFieldDD.scale() == inFieldDD.scale() - : "Inconsistent scale of aggregate Decimal!"; - assert mergeFieldDD.precision() == inFieldDD.precision() - : "Inconsistent precision of aggregate Decimal!"; - BigDecimal bigDecimal = mergeFieldDD.toBigDecimal(); - BigDecimal bigDecimal1 = inFieldDD.toBigDecimal(); - BigDecimal mul = bigDecimal.multiply(bigDecimal1); - product = fromBigDecimal(mul, mergeFieldDD.precision(), mergeFieldDD.scale()); - break; - case TINYINT: - product = (byte) ((byte) accumulator * (byte) inputField); - break; - case SMALLINT: - product = (short) ((short) accumulator * (short) inputField); - break; - case INTEGER: - product = (int) accumulator * (int) inputField; - break; - case BIGINT: - product = (long) accumulator * (long) inputField; - break; - case FLOAT: - product = (float) accumulator * (float) inputField; - break; - case DOUBLE: - product = (double) accumulator * (double) inputField; - break; - default: - String msg = - String.format( - "type %s not support in %s", - fieldType.getTypeRoot().toString(), this.getClass().getName()); - throw new IllegalArgumentException(msg); - } + // ordered by type root definition + switch (fieldType.getTypeRoot()) { + case DECIMAL: + Decimal mergeFieldDD = (Decimal) accumulator; + Decimal inFieldDD = (Decimal) inputField; + assert mergeFieldDD.scale() == inFieldDD.scale() + : "Inconsistent scale of aggregate Decimal!"; + assert mergeFieldDD.precision() == inFieldDD.precision() + : "Inconsistent precision of aggregate Decimal!"; + BigDecimal bigDecimal = mergeFieldDD.toBigDecimal(); + BigDecimal bigDecimal1 = inFieldDD.toBigDecimal(); + BigDecimal mul = bigDecimal.multiply(bigDecimal1); + product = fromBigDecimal(mul, mergeFieldDD.precision(), mergeFieldDD.scale()); + break; + case TINYINT: + product = (byte) ((byte) accumulator * (byte) inputField); + break; + case SMALLINT: + product = (short) ((short) accumulator * (short) inputField); + break; + case INTEGER: + product = (int) accumulator * (int) inputField; + break; + case BIGINT: + product = (long) accumulator * (long) inputField; + break; + case FLOAT: + product = (float) accumulator * (float) inputField; + break; + case DOUBLE: + product = (double) accumulator * (double) inputField; + break; + default: + String msg = + String.format( + "type %s not support in %s", + fieldType.getTypeRoot().toString(), this.getClass().getName()); + throw new IllegalArgumentException(msg); } return product; } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap32Agg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap32Agg.java index 15cbc2b96e37..ef7ac20e839a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap32Agg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap32Agg.java @@ -26,29 +26,18 @@ /** roaring bitmap aggregate a field of a row. */ public class FieldRoaringBitmap32Agg extends FieldAggregator { - public static final String NAME = "rbm32"; - private static final long serialVersionUID = 1L; private final RoaringBitmap32 roaringBitmapAcc; private final RoaringBitmap32 roaringBitmapInput; - public FieldRoaringBitmap32Agg(VarBinaryType dataType) { - super(dataType); + public FieldRoaringBitmap32Agg(String name, VarBinaryType dataType) { + super(name, dataType); this.roaringBitmapAcc = new RoaringBitmap32(); this.roaringBitmapInput = new RoaringBitmap32(); } - @Override - public String name() { - return NAME; - } - @Override public Object agg(Object accumulator, Object inputField) { - if (accumulator == null && inputField == null) { - return null; - } - if (accumulator == null || inputField == null) { return accumulator == null ? inputField : accumulator; } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap64Agg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap64Agg.java index aa9cff1fe120..b1d096497465 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap64Agg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap64Agg.java @@ -26,29 +26,18 @@ /** roaring bitmap aggregate a field of a row. */ public class FieldRoaringBitmap64Agg extends FieldAggregator { - public static final String NAME = "rbm64"; - private static final long serialVersionUID = 1L; private final RoaringBitmap64 roaringBitmapAcc; private final RoaringBitmap64 roaringBitmapInput; - public FieldRoaringBitmap64Agg(VarBinaryType dataType) { - super(dataType); + public FieldRoaringBitmap64Agg(String name, VarBinaryType dataType) { + super(name, dataType); this.roaringBitmapAcc = new RoaringBitmap64(); this.roaringBitmapInput = new RoaringBitmap64(); } - @Override - public String name() { - return NAME; - } - @Override public Object agg(Object accumulator, Object inputField) { - if (accumulator == null && inputField == null) { - return null; - } - if (accumulator == null || inputField == null) { return accumulator == null ? inputField : accumulator; } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldSumAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldSumAgg.java index 18e4bbeaf85c..4b3ad12aea17 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldSumAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldSumAgg.java @@ -25,118 +25,109 @@ /** sum aggregate a field of a row. */ public class FieldSumAgg extends FieldAggregator { - public static final String NAME = "sum"; - private static final long serialVersionUID = 1L; - public FieldSumAgg(DataType dataType) { - super(dataType); - } - - @Override - public String name() { - return NAME; + public FieldSumAgg(String name, DataType dataType) { + super(name, dataType); } @Override public Object agg(Object accumulator, Object inputField) { + if (accumulator == null || inputField == null) { + return accumulator == null ? inputField : accumulator; + } Object sum; - if (accumulator == null || inputField == null) { - sum = (accumulator == null ? inputField : accumulator); - } else { - // ordered by type root definition - switch (fieldType.getTypeRoot()) { - case DECIMAL: - Decimal mergeFieldDD = (Decimal) accumulator; - Decimal inFieldDD = (Decimal) inputField; - assert mergeFieldDD.scale() == inFieldDD.scale() - : "Inconsistent scale of aggregate Decimal!"; - assert mergeFieldDD.precision() == inFieldDD.precision() - : "Inconsistent precision of aggregate Decimal!"; - sum = - DecimalUtils.add( - mergeFieldDD, - inFieldDD, - mergeFieldDD.precision(), - mergeFieldDD.scale()); - break; - case TINYINT: - sum = (byte) ((byte) accumulator + (byte) inputField); - break; - case SMALLINT: - sum = (short) ((short) accumulator + (short) inputField); - break; - case INTEGER: - sum = (int) accumulator + (int) inputField; - break; - case BIGINT: - sum = (long) accumulator + (long) inputField; - break; - case FLOAT: - sum = (float) accumulator + (float) inputField; - break; - case DOUBLE: - sum = (double) accumulator + (double) inputField; - break; - default: - String msg = - String.format( - "type %s not support in %s", - fieldType.getTypeRoot().toString(), this.getClass().getName()); - throw new IllegalArgumentException(msg); - } + // ordered by type root definition + switch (fieldType.getTypeRoot()) { + case DECIMAL: + Decimal mergeFieldDD = (Decimal) accumulator; + Decimal inFieldDD = (Decimal) inputField; + assert mergeFieldDD.scale() == inFieldDD.scale() + : "Inconsistent scale of aggregate Decimal!"; + assert mergeFieldDD.precision() == inFieldDD.precision() + : "Inconsistent precision of aggregate Decimal!"; + sum = + DecimalUtils.add( + mergeFieldDD, + inFieldDD, + mergeFieldDD.precision(), + mergeFieldDD.scale()); + break; + case TINYINT: + sum = (byte) ((byte) accumulator + (byte) inputField); + break; + case SMALLINT: + sum = (short) ((short) accumulator + (short) inputField); + break; + case INTEGER: + sum = (int) accumulator + (int) inputField; + break; + case BIGINT: + sum = (long) accumulator + (long) inputField; + break; + case FLOAT: + sum = (float) accumulator + (float) inputField; + break; + case DOUBLE: + sum = (double) accumulator + (double) inputField; + break; + default: + String msg = + String.format( + "type %s not support in %s", + fieldType.getTypeRoot().toString(), this.getClass().getName()); + throw new IllegalArgumentException(msg); } return sum; } @Override public Object retract(Object accumulator, Object inputField) { - Object sum; if (accumulator == null || inputField == null) { - sum = (accumulator == null ? negative(inputField) : accumulator); - } else { - switch (fieldType.getTypeRoot()) { - case DECIMAL: - Decimal mergeFieldDD = (Decimal) accumulator; - Decimal inFieldDD = (Decimal) inputField; - assert mergeFieldDD.scale() == inFieldDD.scale() - : "Inconsistent scale of aggregate Decimal!"; - assert mergeFieldDD.precision() == inFieldDD.precision() - : "Inconsistent precision of aggregate Decimal!"; - sum = - DecimalUtils.subtract( - mergeFieldDD, - inFieldDD, - mergeFieldDD.precision(), - mergeFieldDD.scale()); - break; - case TINYINT: - sum = (byte) ((byte) accumulator - (byte) inputField); - break; - case SMALLINT: - sum = (short) ((short) accumulator - (short) inputField); - break; - case INTEGER: - sum = (int) accumulator - (int) inputField; - break; - case BIGINT: - sum = (long) accumulator - (long) inputField; - break; - case FLOAT: - sum = (float) accumulator - (float) inputField; - break; - case DOUBLE: - sum = (double) accumulator - (double) inputField; - break; - default: - String msg = - String.format( - "type %s not support in %s", - fieldType.getTypeRoot().toString(), this.getClass().getName()); - throw new IllegalArgumentException(msg); - } + return (accumulator == null ? negative(inputField) : accumulator); + } + Object sum; + switch (fieldType.getTypeRoot()) { + case DECIMAL: + Decimal mergeFieldDD = (Decimal) accumulator; + Decimal inFieldDD = (Decimal) inputField; + assert mergeFieldDD.scale() == inFieldDD.scale() + : "Inconsistent scale of aggregate Decimal!"; + assert mergeFieldDD.precision() == inFieldDD.precision() + : "Inconsistent precision of aggregate Decimal!"; + sum = + DecimalUtils.subtract( + mergeFieldDD, + inFieldDD, + mergeFieldDD.precision(), + mergeFieldDD.scale()); + break; + case TINYINT: + sum = (byte) ((byte) accumulator - (byte) inputField); + break; + case SMALLINT: + sum = (short) ((short) accumulator - (short) inputField); + break; + case INTEGER: + sum = (int) accumulator - (int) inputField; + break; + case BIGINT: + sum = (long) accumulator - (long) inputField; + break; + case FLOAT: + sum = (float) accumulator - (float) inputField; + break; + case DOUBLE: + sum = (double) accumulator - (double) inputField; + break; + default: + String msg = + String.format( + "type %s not support in %s", + fieldType.getTypeRoot().toString(), this.getClass().getName()); + throw new IllegalArgumentException(msg); } return sum; } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldThetaSketchAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldThetaSketchAgg.java index 7182a6744317..9622b4aff0b2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldThetaSketchAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldThetaSketchAgg.java @@ -24,25 +24,14 @@ /** ThetaSketch aggregate a field of a row. */ public class FieldThetaSketchAgg extends FieldAggregator { - public static final String NAME = "theta_sketch"; - private static final long serialVersionUID = 1L; - public FieldThetaSketchAgg(VarBinaryType dataType) { - super(dataType); - } - - @Override - public String name() { - return NAME; + public FieldThetaSketchAgg(String name, VarBinaryType dataType) { + super(name, dataType); } @Override public Object agg(Object accumulator, Object inputField) { - if (accumulator == null && inputField == null) { - return null; - } - if (accumulator == null || inputField == null) { return accumulator == null ? inputField : accumulator; } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldAggregatorFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldAggregatorFactory.java index 44f2439fe745..d2ce0e4760ad 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldAggregatorFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldAggregatorFactory.java @@ -20,13 +20,52 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.factories.Factory; +import org.apache.paimon.factories.FactoryUtil; import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; +import org.apache.paimon.mergetree.compact.aggregate.FieldIgnoreRetractAgg; import org.apache.paimon.types.DataType; +import javax.annotation.Nullable; + /** Factory for {@link FieldAggregator}. */ public interface FieldAggregatorFactory extends Factory { FieldAggregator create(DataType fieldType, CoreOptions options, String field); String identifier(); + + static FieldAggregator create( + DataType fieldType, + @Nullable String strAgg, + boolean ignoreRetract, + boolean isPrimaryKey, + CoreOptions options, + String field) { + FieldAggregator fieldAggregator; + if (isPrimaryKey) { + strAgg = FieldPrimaryKeyAggFactory.NAME; + } else if (strAgg == null) { + strAgg = FieldLastNonNullValueAggFactory.NAME; + } + + FieldAggregatorFactory fieldAggregatorFactory = + FactoryUtil.discoverFactory( + FieldAggregator.class.getClassLoader(), + FieldAggregatorFactory.class, + strAgg); + if (fieldAggregatorFactory == null) { + throw new RuntimeException( + String.format( + "Use unsupported aggregation: %s or spell aggregate function incorrectly!", + strAgg)); + } + + fieldAggregator = fieldAggregatorFactory.create(fieldType, options, field); + + if (ignoreRetract) { + fieldAggregator = new FieldIgnoreRetractAgg(fieldAggregator); + } + + return fieldAggregator; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldBoolAndAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldBoolAndAggFactory.java index 4a3a2dc88932..45bb8708da39 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldBoolAndAggFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldBoolAndAggFactory.java @@ -19,19 +19,28 @@ package org.apache.paimon.mergetree.compact.aggregate.factory; import org.apache.paimon.CoreOptions; -import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; import org.apache.paimon.mergetree.compact.aggregate.FieldBoolAndAgg; +import org.apache.paimon.types.BooleanType; import org.apache.paimon.types.DataType; +import static org.apache.paimon.utils.Preconditions.checkArgument; + /** Factory for #{@link FieldBoolAndAgg}. */ public class FieldBoolAndAggFactory implements FieldAggregatorFactory { + + public static final String NAME = "bool_and"; + @Override - public FieldAggregator create(DataType fieldType, CoreOptions options, String field) { - return new FieldBoolAndAgg(fieldType); + public FieldBoolAndAgg create(DataType fieldType, CoreOptions options, String field) { + checkArgument( + fieldType instanceof BooleanType, + "Data type for bool and column must be 'BooleanType' but was '%s'.", + fieldType); + return new FieldBoolAndAgg(identifier(), (BooleanType) fieldType); } @Override public String identifier() { - return FieldBoolAndAgg.NAME; + return NAME; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldBoolOrAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldBoolOrAggFactory.java index 488325eaefa3..266ccad6a215 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldBoolOrAggFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldBoolOrAggFactory.java @@ -19,19 +19,28 @@ package org.apache.paimon.mergetree.compact.aggregate.factory; import org.apache.paimon.CoreOptions; -import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; import org.apache.paimon.mergetree.compact.aggregate.FieldBoolOrAgg; +import org.apache.paimon.types.BooleanType; import org.apache.paimon.types.DataType; +import static org.apache.paimon.utils.Preconditions.checkArgument; + /** Factory for #{@link FieldBoolOrAgg}. */ public class FieldBoolOrAggFactory implements FieldAggregatorFactory { + + public static final String NAME = "bool_or"; + @Override - public FieldAggregator create(DataType fieldType, CoreOptions options, String field) { - return new FieldBoolOrAgg(fieldType); + public FieldBoolOrAgg create(DataType fieldType, CoreOptions options, String field) { + checkArgument( + fieldType instanceof BooleanType, + "Data type for bool or column must be 'BooleanType' but was '%s'.", + fieldType); + return new FieldBoolOrAgg(identifier(), (BooleanType) fieldType); } @Override public String identifier() { - return FieldBoolOrAgg.NAME; + return NAME; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldCollectAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldCollectAggFactory.java index b20453e074f2..a4325d165bea 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldCollectAggFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldCollectAggFactory.java @@ -19,7 +19,6 @@ package org.apache.paimon.mergetree.compact.aggregate.factory; import org.apache.paimon.CoreOptions; -import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; import org.apache.paimon.mergetree.compact.aggregate.FieldCollectAgg; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataType; @@ -28,17 +27,21 @@ /** Factory for #{@link FieldCollectAgg}. */ public class FieldCollectAggFactory implements FieldAggregatorFactory { + + public static final String NAME = "collect"; + @Override - public FieldAggregator create(DataType fieldType, CoreOptions options, String field) { + public FieldCollectAgg create(DataType fieldType, CoreOptions options, String field) { checkArgument( fieldType instanceof ArrayType, "Data type for collect column must be 'Array' but was '%s'.", fieldType); - return new FieldCollectAgg((ArrayType) fieldType, options.fieldCollectAggDistinct(field)); + return new FieldCollectAgg( + identifier(), (ArrayType) fieldType, options.fieldCollectAggDistinct(field)); } @Override public String identifier() { - return FieldCollectAgg.NAME; + return NAME; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstNonNullValueAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstNonNullValueAggFactory.java index 141da6342487..51e3a6d62e46 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstNonNullValueAggFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstNonNullValueAggFactory.java @@ -19,19 +19,21 @@ package org.apache.paimon.mergetree.compact.aggregate.factory; import org.apache.paimon.CoreOptions; -import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; import org.apache.paimon.mergetree.compact.aggregate.FieldFirstNonNullValueAgg; import org.apache.paimon.types.DataType; /** Factory for #{@link FieldFirstNonNullValueAgg}. */ public class FieldFirstNonNullValueAggFactory implements FieldAggregatorFactory { + + public static final String NAME = "first_non_null_value"; + @Override - public FieldAggregator create(DataType fieldType, CoreOptions options, String field) { - return new FieldFirstNonNullValueAgg(fieldType); + public FieldFirstNonNullValueAgg create(DataType fieldType, CoreOptions options, String field) { + return new FieldFirstNonNullValueAgg(identifier(), fieldType); } @Override public String identifier() { - return FieldFirstNonNullValueAgg.NAME; + return NAME; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstNonNullValueAggLegacyFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstNonNullValueAggLegacyFactory.java index 1d92dd3ec857..507ecbb5c7c6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstNonNullValueAggLegacyFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstNonNullValueAggLegacyFactory.java @@ -25,13 +25,16 @@ /** Factory for legacy name of #{@link FieldFirstNonNullValueAgg}. */ public class FieldFirstNonNullValueAggLegacyFactory implements FieldAggregatorFactory { + + public static final String LEGACY_NAME = "first_not_null_value"; + @Override public FieldAggregator create(DataType fieldType, CoreOptions options, String field) { - return new FieldFirstNonNullValueAgg(fieldType); + return new FieldFirstNonNullValueAgg(identifier(), fieldType); } @Override public String identifier() { - return FieldFirstNonNullValueAgg.LEGACY_NAME; + return LEGACY_NAME; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstValueAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstValueAggFactory.java index cc36928c654e..84db12ffcdf6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstValueAggFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstValueAggFactory.java @@ -19,19 +19,21 @@ package org.apache.paimon.mergetree.compact.aggregate.factory; import org.apache.paimon.CoreOptions; -import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; import org.apache.paimon.mergetree.compact.aggregate.FieldFirstValueAgg; import org.apache.paimon.types.DataType; /** Factory for #{@link FieldFirstValueAgg}. */ public class FieldFirstValueAggFactory implements FieldAggregatorFactory { + + public static final String NAME = "first_value"; + @Override - public FieldAggregator create(DataType fieldType, CoreOptions options, String field) { - return new FieldFirstValueAgg(fieldType); + public FieldFirstValueAgg create(DataType fieldType, CoreOptions options, String field) { + return new FieldFirstValueAgg(identifier(), fieldType); } @Override public String identifier() { - return FieldFirstValueAgg.NAME; + return NAME; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldHllSketchAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldHllSketchAggFactory.java index 9f57abaee782..5777c6a416c4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldHllSketchAggFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldHllSketchAggFactory.java @@ -19,7 +19,6 @@ package org.apache.paimon.mergetree.compact.aggregate.factory; import org.apache.paimon.CoreOptions; -import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; import org.apache.paimon.mergetree.compact.aggregate.FieldHllSketchAgg; import org.apache.paimon.types.DataType; import org.apache.paimon.types.VarBinaryType; @@ -28,17 +27,20 @@ /** Factory for #{@link FieldHllSketchAgg}. */ public class FieldHllSketchAggFactory implements FieldAggregatorFactory { + + public static final String NAME = "hll_sketch"; + @Override - public FieldAggregator create(DataType fieldType, CoreOptions options, String field) { + public FieldHllSketchAgg create(DataType fieldType, CoreOptions options, String field) { checkArgument( fieldType instanceof VarBinaryType, "Data type for hll sketch column must be 'VarBinaryType' but was '%s'.", fieldType); - return new FieldHllSketchAgg((VarBinaryType) fieldType); + return new FieldHllSketchAgg(identifier(), (VarBinaryType) fieldType); } @Override public String identifier() { - return FieldHllSketchAgg.NAME; + return NAME; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldLastNonNullValueAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldLastNonNullValueAggFactory.java index e3e2ff079238..bbc6402bb118 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldLastNonNullValueAggFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldLastNonNullValueAggFactory.java @@ -19,19 +19,21 @@ package org.apache.paimon.mergetree.compact.aggregate.factory; import org.apache.paimon.CoreOptions; -import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; import org.apache.paimon.mergetree.compact.aggregate.FieldLastNonNullValueAgg; import org.apache.paimon.types.DataType; /** Factory for #{@link FieldLastNonNullValueAgg}. */ public class FieldLastNonNullValueAggFactory implements FieldAggregatorFactory { + + public static final String NAME = "last_non_null_value"; + @Override - public FieldAggregator create(DataType fieldType, CoreOptions options, String field) { - return new FieldLastNonNullValueAgg(fieldType); + public FieldLastNonNullValueAgg create(DataType fieldType, CoreOptions options, String field) { + return new FieldLastNonNullValueAgg(identifier(), fieldType); } @Override public String identifier() { - return FieldLastNonNullValueAgg.NAME; + return NAME; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldLastValueAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldLastValueAggFactory.java index b3423a39e66d..c825825a159c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldLastValueAggFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldLastValueAggFactory.java @@ -19,19 +19,21 @@ package org.apache.paimon.mergetree.compact.aggregate.factory; import org.apache.paimon.CoreOptions; -import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; import org.apache.paimon.mergetree.compact.aggregate.FieldLastValueAgg; import org.apache.paimon.types.DataType; /** Factory for #{@link FieldLastValueAgg}. */ public class FieldLastValueAggFactory implements FieldAggregatorFactory { + + public static final String NAME = "last_value"; + @Override - public FieldAggregator create(DataType fieldType, CoreOptions options, String field) { - return new FieldLastValueAgg(fieldType); + public FieldLastValueAgg create(DataType fieldType, CoreOptions options, String field) { + return new FieldLastValueAgg(identifier(), fieldType); } @Override public String identifier() { - return FieldLastValueAgg.NAME; + return NAME; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldListaggAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldListaggAggFactory.java index e5e85dbabdbb..cdb9c128a67a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldListaggAggFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldListaggAggFactory.java @@ -19,19 +19,28 @@ package org.apache.paimon.mergetree.compact.aggregate.factory; import org.apache.paimon.CoreOptions; -import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; import org.apache.paimon.mergetree.compact.aggregate.FieldListaggAgg; import org.apache.paimon.types.DataType; +import org.apache.paimon.types.VarCharType; + +import static org.apache.paimon.utils.Preconditions.checkArgument; /** Factory for #{@link FieldListaggAgg}. */ public class FieldListaggAggFactory implements FieldAggregatorFactory { + + public static final String NAME = "listagg"; + @Override - public FieldAggregator create(DataType fieldType, CoreOptions options, String field) { - return new FieldListaggAgg(fieldType, options, field); + public FieldListaggAgg create(DataType fieldType, CoreOptions options, String field) { + checkArgument( + fieldType instanceof VarCharType, + "Data type for list agg column must be 'VarCharType' but was '%s'.", + fieldType); + return new FieldListaggAgg(identifier(), (VarCharType) fieldType, options, field); } @Override public String identifier() { - return FieldListaggAgg.NAME; + return NAME; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMaxAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMaxAggFactory.java index 2fda49a76b4c..4e3c33171a89 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMaxAggFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMaxAggFactory.java @@ -19,19 +19,21 @@ package org.apache.paimon.mergetree.compact.aggregate.factory; import org.apache.paimon.CoreOptions; -import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; import org.apache.paimon.mergetree.compact.aggregate.FieldMaxAgg; import org.apache.paimon.types.DataType; /** Factory for #{@link FieldMaxAgg}. */ public class FieldMaxAggFactory implements FieldAggregatorFactory { + + public static final String NAME = "max"; + @Override - public FieldAggregator create(DataType fieldType, CoreOptions options, String field) { - return new FieldMaxAgg(fieldType); + public FieldMaxAgg create(DataType fieldType, CoreOptions options, String field) { + return new FieldMaxAgg(identifier(), fieldType); } @Override public String identifier() { - return FieldMaxAgg.NAME; + return NAME; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMergeMapAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMergeMapAggFactory.java index ac2409b25448..e10602f61810 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMergeMapAggFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMergeMapAggFactory.java @@ -19,7 +19,6 @@ package org.apache.paimon.mergetree.compact.aggregate.factory; import org.apache.paimon.CoreOptions; -import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; import org.apache.paimon.mergetree.compact.aggregate.FieldMergeMapAgg; import org.apache.paimon.types.DataType; import org.apache.paimon.types.MapType; @@ -28,17 +27,20 @@ /** Factory for #{@link FieldMergeMapAgg}. */ public class FieldMergeMapAggFactory implements FieldAggregatorFactory { + + public static final String NAME = "merge_map"; + @Override - public FieldAggregator create(DataType fieldType, CoreOptions options, String field) { + public FieldMergeMapAgg create(DataType fieldType, CoreOptions options, String field) { checkArgument( fieldType instanceof MapType, - "Data type of merge map column must be 'MAP' but was '%s'", + "Data type for merge map column must be 'MAP' but was '%s'", fieldType); - return new FieldMergeMapAgg((MapType) fieldType); + return new FieldMergeMapAgg(identifier(), (MapType) fieldType); } @Override public String identifier() { - return FieldMergeMapAgg.NAME; + return NAME; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMinAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMinAggFactory.java index e939f8a73cdd..4ac7c08b1904 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMinAggFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMinAggFactory.java @@ -19,19 +19,21 @@ package org.apache.paimon.mergetree.compact.aggregate.factory; import org.apache.paimon.CoreOptions; -import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; import org.apache.paimon.mergetree.compact.aggregate.FieldMinAgg; import org.apache.paimon.types.DataType; /** Factory for #{@link FieldMinAgg}. */ public class FieldMinAggFactory implements FieldAggregatorFactory { + + public static final String NAME = "min"; + @Override - public FieldAggregator create(DataType fieldType, CoreOptions options, String field) { - return new FieldMinAgg(fieldType); + public FieldMinAgg create(DataType fieldType, CoreOptions options, String field) { + return new FieldMinAgg(identifier(), fieldType); } @Override public String identifier() { - return FieldMinAgg.NAME; + return NAME; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java index 41b9f3e41775..b92df641448b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java @@ -32,6 +32,9 @@ /** Factory for #{@link FieldNestedUpdateAgg}. */ public class FieldNestedUpdateAggFactory implements FieldAggregatorFactory { + + public static final String NAME = "nested_update"; + @Override public FieldAggregator create(DataType fieldType, CoreOptions options, String field) { return createFieldNestedUpdateAgg(fieldType, options.fieldNestedUpdateAggNestedKey(field)); @@ -39,20 +42,20 @@ public FieldAggregator create(DataType fieldType, CoreOptions options, String fi @Override public String identifier() { - return FieldNestedUpdateAgg.NAME; + return NAME; } - private static FieldAggregator createFieldNestedUpdateAgg( - DataType fieldType, List nestedKey) { + private FieldAggregator createFieldNestedUpdateAgg(DataType fieldType, List nestedKey) { if (nestedKey == null) { nestedKey = Collections.emptyList(); } - String typeErrorMsg = "Data type of nested table column must be 'Array' but was '%s'."; + String typeErrorMsg = + "Data type for nested table column must be 'Array' but was '%s'."; checkArgument(fieldType instanceof ArrayType, typeErrorMsg, fieldType); ArrayType arrayType = (ArrayType) fieldType; checkArgument(arrayType.getElementType() instanceof RowType, typeErrorMsg, fieldType); - return new FieldNestedUpdateAgg(arrayType, nestedKey); + return new FieldNestedUpdateAgg(identifier(), arrayType, nestedKey); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldPrimaryKeyAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldPrimaryKeyAggFactory.java index 312d29753b62..0e293bcf78b2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldPrimaryKeyAggFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldPrimaryKeyAggFactory.java @@ -25,13 +25,16 @@ /** Factory for #{@link FieldPrimaryKeyAgg}. */ public class FieldPrimaryKeyAggFactory implements FieldAggregatorFactory { + + public static final String NAME = "primary-key"; + @Override public FieldAggregator create(DataType fieldType, CoreOptions options, String field) { - return new FieldPrimaryKeyAgg(fieldType); + return new FieldPrimaryKeyAgg(identifier(), fieldType); } @Override public String identifier() { - return FieldPrimaryKeyAgg.NAME; + return NAME; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldProductAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldProductAggFactory.java index 88be9c42a218..7dbdd9f5af5a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldProductAggFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldProductAggFactory.java @@ -19,19 +19,28 @@ package org.apache.paimon.mergetree.compact.aggregate.factory; import org.apache.paimon.CoreOptions; -import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; import org.apache.paimon.mergetree.compact.aggregate.FieldProductAgg; import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeFamily; + +import static org.apache.paimon.utils.Preconditions.checkArgument; /** Factory for #{@link FieldProductAgg}. */ public class FieldProductAggFactory implements FieldAggregatorFactory { + + public static final String NAME = "product"; + @Override - public FieldAggregator create(DataType fieldType, CoreOptions options, String field) { - return new FieldProductAgg(fieldType); + public FieldProductAgg create(DataType fieldType, CoreOptions options, String field) { + checkArgument( + fieldType.getTypeRoot().getFamilies().contains(DataTypeFamily.NUMERIC), + "Data type for product column must be 'NumericType' but was '%s'.", + fieldType); + return new FieldProductAgg(identifier(), fieldType); } @Override public String identifier() { - return FieldProductAgg.NAME; + return NAME; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldRoaringBitmap32AggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldRoaringBitmap32AggFactory.java index 5b2a80b30817..91103791f984 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldRoaringBitmap32AggFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldRoaringBitmap32AggFactory.java @@ -19,7 +19,6 @@ package org.apache.paimon.mergetree.compact.aggregate.factory; import org.apache.paimon.CoreOptions; -import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; import org.apache.paimon.mergetree.compact.aggregate.FieldRoaringBitmap32Agg; import org.apache.paimon.types.DataType; import org.apache.paimon.types.VarBinaryType; @@ -28,17 +27,20 @@ /** Factory for #{@link FieldRoaringBitmap32Agg}. */ public class FieldRoaringBitmap32AggFactory implements FieldAggregatorFactory { + + public static final String NAME = "rbm32"; + @Override - public FieldAggregator create(DataType fieldType, CoreOptions options, String field) { + public FieldRoaringBitmap32Agg create(DataType fieldType, CoreOptions options, String field) { checkArgument( fieldType instanceof VarBinaryType, "Data type for roaring bitmap column must be 'VarBinaryType' but was '%s'.", fieldType); - return new FieldRoaringBitmap32Agg((VarBinaryType) fieldType); + return new FieldRoaringBitmap32Agg(identifier(), (VarBinaryType) fieldType); } @Override public String identifier() { - return FieldRoaringBitmap32Agg.NAME; + return NAME; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldRoaringBitmap64AggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldRoaringBitmap64AggFactory.java index a0e9fe652094..56f5554af1a9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldRoaringBitmap64AggFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldRoaringBitmap64AggFactory.java @@ -19,7 +19,6 @@ package org.apache.paimon.mergetree.compact.aggregate.factory; import org.apache.paimon.CoreOptions; -import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; import org.apache.paimon.mergetree.compact.aggregate.FieldRoaringBitmap64Agg; import org.apache.paimon.types.DataType; import org.apache.paimon.types.VarBinaryType; @@ -28,17 +27,20 @@ /** Factory for #{@link FieldRoaringBitmap64Agg}. */ public class FieldRoaringBitmap64AggFactory implements FieldAggregatorFactory { + + public static final String NAME = "rbm64"; + @Override - public FieldAggregator create(DataType fieldType, CoreOptions options, String field) { + public FieldRoaringBitmap64Agg create(DataType fieldType, CoreOptions options, String field) { checkArgument( fieldType instanceof VarBinaryType, "Data type for roaring bitmap column must be 'VarBinaryType' but was '%s'.", fieldType); - return new FieldRoaringBitmap64Agg((VarBinaryType) fieldType); + return new FieldRoaringBitmap64Agg(identifier(), (VarBinaryType) fieldType); } @Override public String identifier() { - return FieldRoaringBitmap64Agg.NAME; + return NAME; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldSumAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldSumAggFactory.java index 8470e6d1752c..5343f67b6ad7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldSumAggFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldSumAggFactory.java @@ -19,19 +19,28 @@ package org.apache.paimon.mergetree.compact.aggregate.factory; import org.apache.paimon.CoreOptions; -import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; import org.apache.paimon.mergetree.compact.aggregate.FieldSumAgg; import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeFamily; + +import static org.apache.paimon.utils.Preconditions.checkArgument; /** Factory for #{@link FieldSumAgg}. */ public class FieldSumAggFactory implements FieldAggregatorFactory { + + public static final String NAME = "sum"; + @Override - public FieldAggregator create(DataType fieldType, CoreOptions options, String field) { - return new FieldSumAgg(fieldType); + public FieldSumAgg create(DataType fieldType, CoreOptions options, String field) { + checkArgument( + fieldType.getTypeRoot().getFamilies().contains(DataTypeFamily.NUMERIC), + "Data type for sum column must be 'NumericType' but was '%s'.", + fieldType); + return new FieldSumAgg(identifier(), fieldType); } @Override public String identifier() { - return FieldSumAgg.NAME; + return NAME; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldThetaSketchAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldThetaSketchAggFactory.java index 6aa5d7bc423f..c30fb7df7fb9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldThetaSketchAggFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldThetaSketchAggFactory.java @@ -19,7 +19,6 @@ package org.apache.paimon.mergetree.compact.aggregate.factory; import org.apache.paimon.CoreOptions; -import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; import org.apache.paimon.mergetree.compact.aggregate.FieldThetaSketchAgg; import org.apache.paimon.types.DataType; import org.apache.paimon.types.VarBinaryType; @@ -28,17 +27,20 @@ /** Factory for #{@link FieldThetaSketchAgg}. */ public class FieldThetaSketchAggFactory implements FieldAggregatorFactory { + + public static final String NAME = "theta_sketch"; + @Override - public FieldAggregator create(DataType fieldType, CoreOptions options, String field) { + public FieldThetaSketchAgg create(DataType fieldType, CoreOptions options, String field) { checkArgument( fieldType instanceof VarBinaryType, "Data type for theta sketch column must be 'VarBinaryType' but was '%s'.", fieldType); - return new FieldThetaSketchAgg((VarBinaryType) fieldType); + return new FieldThetaSketchAgg(identifier(), (VarBinaryType) fieldType); } @Override public String identifier() { - return FieldThetaSketchAgg.NAME; + return NAME; } } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java index c8344c44d5bb..28cb4c099aa5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java @@ -26,8 +26,8 @@ import org.apache.paimon.lookup.LookupStrategy; import org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction; import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator; -import org.apache.paimon.mergetree.compact.aggregate.FieldLastValueAgg; -import org.apache.paimon.mergetree.compact.aggregate.FieldSumAgg; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastValueAggFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldSumAggFactory; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; @@ -293,7 +293,8 @@ public void testSum(boolean changelogRowDeduplicate) { row -> row.isNullAt(0) ? null : row.getInt(0) }, new FieldAggregator[] { - new FieldSumAgg(DataTypes.INT()) + new FieldSumAggFactory() + .create(DataTypes.INT(), null, null) }), RowType.of(DataTypes.INT()), RowType.of(DataTypes.INT())), @@ -381,7 +382,8 @@ public void testMergeHighLevelOrder() { row -> row.isNullAt(0) ? null : row.getInt(0) }, new FieldAggregator[] { - new FieldLastValueAgg(DataTypes.INT()) + new FieldLastValueAggFactory() + .create(DataTypes.INT(), null, null) }), RowType.of(DataTypes.INT()), RowType.of(DataTypes.INT())), diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java index 0fa1433e78e1..d32098b80f00 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java @@ -27,6 +27,25 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldBoolAndAggFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldBoolOrAggFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldCollectAggFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldFirstNonNullValueAggFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldFirstValueAggFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldHllSketchAggFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastNonNullValueAggFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastValueAggFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldListaggAggFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldMaxAggFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldMergeMapAggFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldMinAggFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedUpdateAggFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldProductAggFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldRoaringBitmap32AggFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldRoaringBitmap64AggFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldSumAggFactory; +import org.apache.paimon.mergetree.compact.aggregate.factory.FieldThetaSketchAggFactory; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.BooleanType; @@ -67,14 +86,16 @@ public class FieldAggregatorTest { @Test public void testFieldBoolAndAgg() { - FieldBoolAndAgg fieldBoolAndAgg = new FieldBoolAndAgg(new BooleanType()); + FieldBoolAndAgg fieldBoolAndAgg = + new FieldBoolAndAggFactory().create(new BooleanType(), null, null); assertThat(fieldBoolAndAgg.agg(false, true)).isEqualTo(false); assertThat(fieldBoolAndAgg.agg(true, true)).isEqualTo(true); } @Test public void testFieldBoolOrAgg() { - FieldBoolOrAgg fieldBoolOrAgg = new FieldBoolOrAgg(new BooleanType()); + FieldBoolOrAgg fieldBoolOrAgg = + new FieldBoolOrAggFactory().create(new BooleanType(), null, null); assertThat(fieldBoolOrAgg.agg(false, true)).isEqualTo(true); assertThat(fieldBoolOrAgg.agg(false, false)).isEqualTo(false); } @@ -82,7 +103,7 @@ public void testFieldBoolOrAgg() { @Test public void testFieldLastNonNullValueAgg() { FieldLastNonNullValueAgg fieldLastNonNullValueAgg = - new FieldLastNonNullValueAgg(new IntType()); + new FieldLastNonNullValueAggFactory().create(new IntType(), null, null); Integer accumulator = null; Integer inputField = 1; assertThat(fieldLastNonNullValueAgg.agg(accumulator, inputField)).isEqualTo(1); @@ -94,7 +115,8 @@ public void testFieldLastNonNullValueAgg() { @Test public void testFieldLastValueAgg() { - FieldLastValueAgg fieldLastValueAgg = new FieldLastValueAgg(new IntType()); + FieldLastValueAgg fieldLastValueAgg = + new FieldLastValueAggFactory().create(new IntType(), null, null); Integer accumulator = null; Integer inputField = 1; assertThat(fieldLastValueAgg.agg(accumulator, inputField)).isEqualTo(1); @@ -106,7 +128,8 @@ public void testFieldLastValueAgg() { @Test public void testFieldFirstValueAgg() { - FieldFirstValueAgg fieldFirstValueAgg = new FieldFirstValueAgg(new IntType()); + FieldFirstValueAgg fieldFirstValueAgg = + new FieldFirstValueAggFactory().create(new IntType(), null, null); assertThat(fieldFirstValueAgg.agg(null, 1)).isEqualTo(1); assertThat(fieldFirstValueAgg.agg(1, 2)).isEqualTo(1); @@ -117,7 +140,7 @@ public void testFieldFirstValueAgg() { @Test public void testFieldFirstNonNullValueAgg() { FieldFirstNonNullValueAgg fieldFirstNonNullValueAgg = - new FieldFirstNonNullValueAgg(new IntType()); + new FieldFirstNonNullValueAggFactory().create(new IntType(), null, null); assertThat(fieldFirstNonNullValueAgg.agg(null, null)).isNull(); assertThat(fieldFirstNonNullValueAgg.agg(null, 1)).isEqualTo(1); assertThat(fieldFirstNonNullValueAgg.agg(1, 2)).isEqualTo(1); @@ -129,8 +152,8 @@ public void testFieldFirstNonNullValueAgg() { @Test public void testFieldListAggWithDefaultDelimiter() { FieldListaggAgg fieldListaggAgg = - new FieldListaggAgg( - new VarCharType(), new CoreOptions(new HashMap<>()), "fieldName"); + new FieldListaggAggFactory() + .create(new VarCharType(), new CoreOptions(new HashMap<>()), "fieldName"); BinaryString accumulator = BinaryString.fromString("user1"); BinaryString inputField = BinaryString.fromString("user2"); assertThat(fieldListaggAgg.agg(accumulator, inputField).toString()) @@ -140,11 +163,13 @@ public void testFieldListAggWithDefaultDelimiter() { @Test public void testFieldListAggWithCustomDelimiter() { FieldListaggAgg fieldListaggAgg = - new FieldListaggAgg( - new VarCharType(), - CoreOptions.fromMap( - ImmutableMap.of("fields.fieldName.list-agg-delimiter", "-")), - "fieldName"); + new FieldListaggAggFactory() + .create( + new VarCharType(), + CoreOptions.fromMap( + ImmutableMap.of( + "fields.fieldName.list-agg-delimiter", "-")), + "fieldName"); BinaryString accumulator = BinaryString.fromString("user1"); BinaryString inputField = BinaryString.fromString("user2"); assertThat(fieldListaggAgg.agg(accumulator, inputField).toString()) @@ -153,7 +178,7 @@ public void testFieldListAggWithCustomDelimiter() { @Test public void testFieldMaxAgg() { - FieldMaxAgg fieldMaxAgg = new FieldMaxAgg(new IntType()); + FieldMaxAgg fieldMaxAgg = new FieldMaxAggFactory().create(new IntType(), null, null); Integer accumulator = 1; Integer inputField = 10; assertThat(fieldMaxAgg.agg(accumulator, inputField)).isEqualTo(10); @@ -161,7 +186,7 @@ public void testFieldMaxAgg() { @Test public void testFieldMinAgg() { - FieldMinAgg fieldMinAgg = new FieldMinAgg(new IntType()); + FieldMinAgg fieldMinAgg = new FieldMinAggFactory().create(new IntType(), null, null); Integer accumulator = 1; Integer inputField = 10; assertThat(fieldMinAgg.agg(accumulator, inputField)).isEqualTo(1); @@ -169,7 +194,7 @@ public void testFieldMinAgg() { @Test public void testFieldSumIntAgg() { - FieldSumAgg fieldSumAgg = new FieldSumAgg(new IntType()); + FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new IntType(), null, null); assertThat(fieldSumAgg.agg(null, 10)).isEqualTo(10); assertThat(fieldSumAgg.agg(1, 10)).isEqualTo(11); assertThat(fieldSumAgg.retract(10, 5)).isEqualTo(5); @@ -178,7 +203,8 @@ public void testFieldSumIntAgg() { @Test public void testFieldProductIntAgg() { - FieldProductAgg fieldProductAgg = new FieldProductAgg(new IntType()); + FieldProductAgg fieldProductAgg = + new FieldProductAggFactory().create(new IntType(), null, null); assertThat(fieldProductAgg.agg(null, 10)).isEqualTo(10); assertThat(fieldProductAgg.agg(1, 10)).isEqualTo(10); assertThat(fieldProductAgg.retract(10, 5)).isEqualTo(2); @@ -187,7 +213,7 @@ public void testFieldProductIntAgg() { @Test public void testFieldSumByteAgg() { - FieldSumAgg fieldSumAgg = new FieldSumAgg(new TinyIntType()); + FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new TinyIntType(), null, null); assertThat(fieldSumAgg.agg(null, (byte) 10)).isEqualTo((byte) 10); assertThat(fieldSumAgg.agg((byte) 1, (byte) 10)).isEqualTo((byte) 11); assertThat(fieldSumAgg.retract((byte) 10, (byte) 5)).isEqualTo((byte) 5); @@ -196,7 +222,8 @@ public void testFieldSumByteAgg() { @Test public void testFieldProductByteAgg() { - FieldProductAgg fieldProductAgg = new FieldProductAgg(new TinyIntType()); + FieldProductAgg fieldProductAgg = + new FieldProductAggFactory().create(new TinyIntType(), null, null); assertThat(fieldProductAgg.agg(null, (byte) 10)).isEqualTo((byte) 10); assertThat(fieldProductAgg.agg((byte) 1, (byte) 10)).isEqualTo((byte) 10); assertThat(fieldProductAgg.retract((byte) 10, (byte) 5)).isEqualTo((byte) 2); @@ -205,7 +232,8 @@ public void testFieldProductByteAgg() { @Test public void testFieldProductShortAgg() { - FieldProductAgg fieldProductAgg = new FieldProductAgg(new SmallIntType()); + FieldProductAgg fieldProductAgg = + new FieldProductAggFactory().create(new SmallIntType(), null, null); assertThat(fieldProductAgg.agg(null, (short) 10)).isEqualTo((short) 10); assertThat(fieldProductAgg.agg((short) 1, (short) 10)).isEqualTo((short) 10); assertThat(fieldProductAgg.retract((short) 10, (short) 5)).isEqualTo((short) 2); @@ -214,7 +242,7 @@ public void testFieldProductShortAgg() { @Test public void testFieldSumShortAgg() { - FieldSumAgg fieldSumAgg = new FieldSumAgg(new SmallIntType()); + FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new SmallIntType(), null, null); assertThat(fieldSumAgg.agg(null, (short) 10)).isEqualTo((short) 10); assertThat(fieldSumAgg.agg((short) 1, (short) 10)).isEqualTo((short) 11); assertThat(fieldSumAgg.retract((short) 10, (short) 5)).isEqualTo((short) 5); @@ -223,7 +251,7 @@ public void testFieldSumShortAgg() { @Test public void testFieldSumLongAgg() { - FieldSumAgg fieldSumAgg = new FieldSumAgg(new BigIntType()); + FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new BigIntType(), null, null); assertThat(fieldSumAgg.agg(null, 10L)).isEqualTo(10L); assertThat(fieldSumAgg.agg(1L, 10L)).isEqualTo(11L); assertThat(fieldSumAgg.retract(10L, 5L)).isEqualTo(5L); @@ -232,7 +260,8 @@ public void testFieldSumLongAgg() { @Test public void testFieldProductLongAgg() { - FieldProductAgg fieldProductAgg = new FieldProductAgg(new BigIntType()); + FieldProductAgg fieldProductAgg = + new FieldProductAggFactory().create(new BigIntType(), null, null); assertThat(fieldProductAgg.agg(null, 10L)).isEqualTo(10L); assertThat(fieldProductAgg.agg(1L, 10L)).isEqualTo(10L); assertThat(fieldProductAgg.retract(10L, 5L)).isEqualTo(2L); @@ -241,7 +270,8 @@ public void testFieldProductLongAgg() { @Test public void testFieldProductFloatAgg() { - FieldProductAgg fieldProductAgg = new FieldProductAgg(new FloatType()); + FieldProductAgg fieldProductAgg = + new FieldProductAggFactory().create(new FloatType(), null, null); assertThat(fieldProductAgg.agg(null, (float) 10)).isEqualTo((float) 10); assertThat(fieldProductAgg.agg((float) 1, (float) 10)).isEqualTo((float) 10); assertThat(fieldProductAgg.retract((float) 10, (float) 5)).isEqualTo((float) 2); @@ -250,7 +280,7 @@ public void testFieldProductFloatAgg() { @Test public void testFieldSumFloatAgg() { - FieldSumAgg fieldSumAgg = new FieldSumAgg(new FloatType()); + FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new FloatType(), null, null); assertThat(fieldSumAgg.agg(null, (float) 10)).isEqualTo((float) 10); assertThat(fieldSumAgg.agg((float) 1, (float) 10)).isEqualTo((float) 11); assertThat(fieldSumAgg.retract((float) 10, (float) 5)).isEqualTo((float) 5); @@ -259,7 +289,8 @@ public void testFieldSumFloatAgg() { @Test public void testFieldProductDoubleAgg() { - FieldProductAgg fieldProductAgg = new FieldProductAgg(new DoubleType()); + FieldProductAgg fieldProductAgg = + new FieldProductAggFactory().create(new DoubleType(), null, null); assertThat(fieldProductAgg.agg(null, (double) 10)).isEqualTo((double) 10); assertThat(fieldProductAgg.agg((double) 1, (double) 10)).isEqualTo((double) 10); assertThat(fieldProductAgg.retract((double) 10, (double) 5)).isEqualTo((double) 2); @@ -268,7 +299,7 @@ public void testFieldProductDoubleAgg() { @Test public void testFieldSumDoubleAgg() { - FieldSumAgg fieldSumAgg = new FieldSumAgg(new DoubleType()); + FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new DoubleType(), null, null); assertThat(fieldSumAgg.agg(null, (double) 10)).isEqualTo((double) 10); assertThat(fieldSumAgg.agg((double) 1, (double) 10)).isEqualTo((double) 11); assertThat(fieldSumAgg.retract((double) 10, (double) 5)).isEqualTo((double) 5); @@ -277,7 +308,8 @@ public void testFieldSumDoubleAgg() { @Test public void testFieldProductDecimalAgg() { - FieldProductAgg fieldProductAgg = new FieldProductAgg(new DecimalType()); + FieldProductAgg fieldProductAgg = + new FieldProductAggFactory().create(new DecimalType(), null, null); assertThat(fieldProductAgg.agg(null, toDecimal(10))).isEqualTo(toDecimal(10)); assertThat(fieldProductAgg.agg(toDecimal(1), toDecimal(10))).isEqualTo(toDecimal(10)); assertThat(fieldProductAgg.retract(toDecimal(10), toDecimal(5))).isEqualTo(toDecimal(2)); @@ -286,7 +318,7 @@ public void testFieldProductDecimalAgg() { @Test public void testFieldSumDecimalAgg() { - FieldSumAgg fieldSumAgg = new FieldSumAgg(new DecimalType()); + FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new DecimalType(), null, null); assertThat(fieldSumAgg.agg(null, toDecimal(10))).isEqualTo(toDecimal(10)); assertThat(fieldSumAgg.agg(toDecimal(1), toDecimal(10))).isEqualTo(toDecimal(11)); assertThat(fieldSumAgg.retract(toDecimal(10), toDecimal(5))).isEqualTo(toDecimal(5)); @@ -307,6 +339,7 @@ public void testFieldNestedUpdateAgg() { DataTypes.FIELD(2, "v", DataTypes.STRING())); FieldNestedUpdateAgg agg = new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, DataTypes.ARRAY( DataTypes.ROW( DataTypes.FIELD(0, "k0", DataTypes.INT()), @@ -347,7 +380,10 @@ public void testFieldNestedAppendAgg() { DataTypes.FIELD(1, "k1", DataTypes.INT()), DataTypes.FIELD(2, "v", DataTypes.STRING())); FieldNestedUpdateAgg agg = - new FieldNestedUpdateAgg(DataTypes.ARRAY(elementRowType), Collections.emptyList()); + new FieldNestedUpdateAgg( + FieldNestedUpdateAggFactory.NAME, + DataTypes.ARRAY(elementRowType), + Collections.emptyList()); InternalArray accumulator = null; InternalArray.ElementGetter elementGetter = @@ -385,7 +421,13 @@ private InternalRow row(int k0, int k1, String v) { @Test public void testFieldCollectAggWithDistinct() { - FieldCollectAgg agg = new FieldCollectAgg(DataTypes.ARRAY(DataTypes.INT()), true); + FieldCollectAgg agg = + new FieldCollectAggFactory() + .create( + DataTypes.ARRAY(DataTypes.INT()), + CoreOptions.fromMap( + ImmutableMap.of("fields.fieldName.distinct", "true")), + "fieldName"); InternalArray result; InternalArray.ElementGetter elementGetter = @@ -407,7 +449,13 @@ public void testFieldCollectAggWithDistinct() { @Test public void testFiledCollectAggWithRowType() { RowType rowType = RowType.of(DataTypes.INT(), DataTypes.STRING()); - FieldCollectAgg agg = new FieldCollectAgg(DataTypes.ARRAY(rowType), true); + FieldCollectAgg agg = + new FieldCollectAggFactory() + .create( + DataTypes.ARRAY(rowType), + CoreOptions.fromMap( + ImmutableMap.of("fields.fieldName.distinct", "true")), + "fieldName"); InternalArray result; InternalArray.ElementGetter elementGetter = InternalArray.createElementGetter(rowType); @@ -438,7 +486,13 @@ public void testFiledCollectAggWithRowType() { @Test public void testFiledCollectAggWithArrayType() { ArrayType arrayType = new ArrayType(DataTypes.INT()); - FieldCollectAgg agg = new FieldCollectAgg(DataTypes.ARRAY(arrayType), true); + FieldCollectAgg agg = + new FieldCollectAggFactory() + .create( + DataTypes.ARRAY(arrayType), + CoreOptions.fromMap( + ImmutableMap.of("fields.fieldName.distinct", "true")), + "fieldName"); InternalArray result; InternalArray.ElementGetter elementGetter = InternalArray.createElementGetter(arrayType); @@ -469,7 +523,13 @@ public void testFiledCollectAggWithArrayType() { @Test public void testFiledCollectAggWithMapType() { MapType mapType = new MapType(DataTypes.INT(), DataTypes.STRING()); - FieldCollectAgg agg = new FieldCollectAgg(DataTypes.ARRAY(mapType), true); + FieldCollectAgg agg = + new FieldCollectAggFactory() + .create( + DataTypes.ARRAY(mapType), + CoreOptions.fromMap( + ImmutableMap.of("fields.fieldName.distinct", "true")), + "fieldName"); InternalArray result; InternalArray.ElementGetter elementGetter = InternalArray.createElementGetter(mapType); @@ -497,7 +557,13 @@ public void testFiledCollectAggWithMapType() { @Test public void testFieldCollectAggWithoutDistinct() { - FieldCollectAgg agg = new FieldCollectAgg(DataTypes.ARRAY(DataTypes.INT()), false); + FieldCollectAgg agg = + new FieldCollectAggFactory() + .create( + DataTypes.ARRAY(DataTypes.INT()), + CoreOptions.fromMap( + ImmutableMap.of("fields.fieldName.distinct", "false")), + "fieldName"); InternalArray result; InternalArray.ElementGetter elementGetter = @@ -522,7 +588,13 @@ public void testFieldCollectAggRetractWithDistinct() { InternalArray.ElementGetter elementGetter; // primitive type - agg = new FieldCollectAgg(DataTypes.ARRAY(DataTypes.INT()), true); + agg = + new FieldCollectAggFactory() + .create( + DataTypes.ARRAY(DataTypes.INT()), + CoreOptions.fromMap( + ImmutableMap.of("fields.fieldName.distinct", "true")), + "fieldName"); elementGetter = InternalArray.createElementGetter(DataTypes.INT()); InternalArray result = (InternalArray) @@ -533,7 +605,13 @@ public void testFieldCollectAggRetractWithDistinct() { // row type RowType rowType = RowType.of(DataTypes.INT(), DataTypes.STRING()); - agg = new FieldCollectAgg(DataTypes.ARRAY(rowType), true); + agg = + new FieldCollectAggFactory() + .create( + DataTypes.ARRAY(rowType), + CoreOptions.fromMap( + ImmutableMap.of("fields.fieldName.distinct", "true")), + "fieldName"); elementGetter = InternalArray.createElementGetter(rowType); Object[] accElements = @@ -556,7 +634,13 @@ public void testFieldCollectAggRetractWithDistinct() { // array type ArrayType arrayType = new ArrayType(DataTypes.INT()); - agg = new FieldCollectAgg(DataTypes.ARRAY(arrayType), true); + agg = + new FieldCollectAggFactory() + .create( + DataTypes.ARRAY(arrayType), + CoreOptions.fromMap( + ImmutableMap.of("fields.fieldName.distinct", "true")), + "fieldName"); elementGetter = InternalArray.createElementGetter(arrayType); accElements = @@ -578,7 +662,13 @@ public void testFieldCollectAggRetractWithDistinct() { // map type MapType mapType = new MapType(DataTypes.INT(), DataTypes.STRING()); - agg = new FieldCollectAgg(DataTypes.ARRAY(mapType), true); + agg = + new FieldCollectAggFactory() + .create( + DataTypes.ARRAY(mapType), + CoreOptions.fromMap( + ImmutableMap.of("fields.fieldName.distinct", "true")), + "fieldName"); elementGetter = InternalArray.createElementGetter(mapType); accElements = @@ -604,7 +694,13 @@ public void testFieldCollectAggRetractWithoutDistinct() { InternalArray.ElementGetter elementGetter; // primitive type - agg = new FieldCollectAgg(DataTypes.ARRAY(DataTypes.INT()), true); + agg = + new FieldCollectAggFactory() + .create( + DataTypes.ARRAY(DataTypes.INT()), + CoreOptions.fromMap( + ImmutableMap.of("fields.fieldName.distinct", "true")), + "fieldName"); elementGetter = InternalArray.createElementGetter(DataTypes.INT()); InternalArray result = (InternalArray) @@ -615,7 +711,13 @@ public void testFieldCollectAggRetractWithoutDistinct() { // row type RowType rowType = RowType.of(DataTypes.INT(), DataTypes.STRING()); - agg = new FieldCollectAgg(DataTypes.ARRAY(rowType), true); + agg = + new FieldCollectAggFactory() + .create( + DataTypes.ARRAY(rowType), + CoreOptions.fromMap( + ImmutableMap.of("fields.fieldName.distinct", "true")), + "fieldName"); elementGetter = InternalArray.createElementGetter(rowType); Object[] accElements = @@ -641,7 +743,13 @@ public void testFieldCollectAggRetractWithoutDistinct() { // array type ArrayType arrayType = new ArrayType(DataTypes.INT()); - agg = new FieldCollectAgg(DataTypes.ARRAY(arrayType), true); + agg = + new FieldCollectAggFactory() + .create( + DataTypes.ARRAY(arrayType), + CoreOptions.fromMap( + ImmutableMap.of("fields.fieldName.distinct", "true")), + "fieldName"); elementGetter = InternalArray.createElementGetter(arrayType); accElements = @@ -666,7 +774,13 @@ public void testFieldCollectAggRetractWithoutDistinct() { // map type MapType mapType = new MapType(DataTypes.INT(), DataTypes.STRING()); - agg = new FieldCollectAgg(DataTypes.ARRAY(mapType), true); + agg = + new FieldCollectAggFactory() + .create( + DataTypes.ARRAY(mapType), + CoreOptions.fromMap( + ImmutableMap.of("fields.fieldName.distinct", "true")), + "fieldName"); elementGetter = InternalArray.createElementGetter(mapType); accElements = @@ -691,7 +805,8 @@ public void testFieldCollectAggRetractWithoutDistinct() { @Test public void testFieldMergeMapAgg() { FieldMergeMapAgg agg = - new FieldMergeMapAgg(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING())); + new FieldMergeMapAggFactory() + .create(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()), null, null); assertThat(agg.agg(null, null)).isNull(); @@ -710,7 +825,8 @@ public void testFieldMergeMapAgg() { @Test public void testFieldMergeMapAggRetract() { FieldMergeMapAgg agg = - new FieldMergeMapAgg(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING())); + new FieldMergeMapAggFactory() + .create(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()), null, null); Object result = agg.retract( new GenericMap(toMap(1, "A", 2, "B", 3, "C")), @@ -720,7 +836,8 @@ public void testFieldMergeMapAggRetract() { @Test public void testFieldThetaSketchAgg() { - FieldThetaSketchAgg agg = new FieldThetaSketchAgg(DataTypes.VARBINARY(20)); + FieldThetaSketchAgg agg = + new FieldThetaSketchAggFactory().create(DataTypes.VARBINARY(20), null, null); byte[] inputVal = sketchOf(1); byte[] acc1 = sketchOf(2, 3); @@ -743,7 +860,8 @@ public void testFieldThetaSketchAgg() { @Test public void testFieldHllSketchAgg() { - FieldHllSketchAgg agg = new FieldHllSketchAgg(DataTypes.VARBINARY(20)); + FieldHllSketchAgg agg = + new FieldHllSketchAggFactory().create(DataTypes.VARBINARY(20), null, null); byte[] inputVal = HllSketchUtil.sketchOf(1); byte[] acc1 = HllSketchUtil.sketchOf(2, 3); @@ -766,7 +884,8 @@ public void testFieldHllSketchAgg() { @Test public void testFieldRoaringBitmap32Agg() { - FieldRoaringBitmap32Agg agg = new FieldRoaringBitmap32Agg(DataTypes.VARBINARY(20)); + FieldRoaringBitmap32Agg agg = + new FieldRoaringBitmap32AggFactory().create(DataTypes.VARBINARY(20), null, null); byte[] inputVal = RoaringBitmap32.bitmapOf(1).serialize(); byte[] acc1 = RoaringBitmap32.bitmapOf(2, 3).serialize(); @@ -789,7 +908,8 @@ public void testFieldRoaringBitmap32Agg() { @Test public void testFieldRoaringBitmap64Agg() throws IOException { - FieldRoaringBitmap64Agg agg = new FieldRoaringBitmap64Agg(DataTypes.VARBINARY(20)); + FieldRoaringBitmap64Agg agg = + new FieldRoaringBitmap64AggFactory().create(DataTypes.VARBINARY(20), null, null); byte[] inputVal = RoaringBitmap64.bitmapOf(1L).serialize(); byte[] acc1 = RoaringBitmap64.bitmapOf(2L, 3L).serialize(); @@ -813,7 +933,7 @@ public void testFieldRoaringBitmap64Agg() throws IOException { @Test public void testCustomAgg() throws IOException { FieldAggregator fieldAggregator = - FieldAggregator.createFieldAggregator( + FieldAggregatorFactory.create( DataTypes.STRING(), "custom", false, diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCostomAgg.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCustomAgg.java similarity index 84% rename from paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCostomAgg.java rename to paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCustomAgg.java index aedf6a373dab..3550ebe27715 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCostomAgg.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCustomAgg.java @@ -21,14 +21,10 @@ import org.apache.paimon.types.DataType; /** Custom FieldAggregator for Test. */ -public class TestCostomAgg extends FieldAggregator { - public TestCostomAgg(DataType dataType) { - super(dataType); - } +public class TestCustomAgg extends FieldAggregator { - @Override - public String name() { - return "custom"; + public TestCustomAgg(String name, DataType dataType) { + super(name, dataType); } @Override diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCostomAggFactory.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCustomAggFactory.java similarity index 82% rename from paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCostomAggFactory.java rename to paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCustomAggFactory.java index e8884bfb50ae..7e7715f6d91b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCostomAggFactory.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCustomAggFactory.java @@ -22,15 +22,18 @@ import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory; import org.apache.paimon.types.DataType; -/** FieldAggregatorFactory for test. */ -public class TestCostomAggFactory implements FieldAggregatorFactory { +/** FieldAggregatorFactory for #{@link TestCustomAgg} test. */ +public class TestCustomAggFactory implements FieldAggregatorFactory { + + public static final String NAME = "custom"; + @Override public FieldAggregator create(DataType fieldType, CoreOptions options, String field) { - return new TestCostomAgg(fieldType); + return new TestCustomAgg(identifier(), fieldType); } @Override public String identifier() { - return "custom"; + return NAME; } } diff --git a/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory index f3e74bb8925d..7eb517ab9835 100644 --- a/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.paimon.mergetree.compact.aggregate.TestCostomAggFactory \ No newline at end of file +org.apache.paimon.mergetree.compact.aggregate.TestCustomAggFactory \ No newline at end of file