From f37bc3d4cbfb913bcdcf1b3e31e0862c95faa57d Mon Sep 17 00:00:00 2001 From: Raghav Agrawal Date: Thu, 16 Jan 2025 07:37:43 +0530 Subject: [PATCH] Add support for KLL sketch aggregation in minion jobs (#14702) Co-authored-by: Raghav Agrawal --- .../pinot/core/common/MinionConstants.java | 2 +- .../PercentileKLLAggregationFunction.java | 6 +- .../PercentileKLLSketchAggregator.java | 63 +++++++++++++++++++ .../aggregator/ValueAggregatorFactory.java | 3 + .../apache/pinot/segment/spi/Constants.java | 1 + .../pinot/spi/utils/CommonConstants.java | 4 ++ 6 files changed, 76 insertions(+), 3 deletions(-) create mode 100644 pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileKLLSketchAggregator.java diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java index 9fef661075b0..79d8388fc849 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java @@ -160,7 +160,7 @@ public static class RealtimeToOfflineSegmentsTask extends MergeTask { DISTINCTCOUNTRAWTHETASKETCH, DISTINCTCOUNTTUPLESKETCH, DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH, SUMVALUESINTEGERSUMTUPLESKETCH, AVGVALUEINTEGERSUMTUPLESKETCH, DISTINCTCOUNTHLLPLUS, DISTINCTCOUNTRAWHLLPLUS, DISTINCTCOUNTCPCSKETCH, DISTINCTCOUNTRAWCPCSKETCH, DISTINCTCOUNTULL, - DISTINCTCOUNTRAWULL); + DISTINCTCOUNTRAWULL, PERCENTILEKLL, PERCENTILERAWKLL); } // Generate segment and push to controller based on batch ingestion configs diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileKLLAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileKLLAggregationFunction.java index bcf025a80149..a4551af570b3 100644 --- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileKLLAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/PercentileKLLAggregationFunction.java @@ -32,6 +32,7 @@ import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder; import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.utils.CommonConstants; /** @@ -62,7 +63,6 @@ */ public class PercentileKLLAggregationFunction extends NullableSingleInputAggregationFunction<KllDoublesSketch, Comparable<?>> { - protected static final int DEFAULT_K_VALUE = 200; protected final double _percentile; protected int _kValue; @@ -79,7 +79,9 @@ public PercentileKLLAggregationFunction(List<ExpressionContext> arguments, boole Preconditions.checkArgument(_percentile >= 0 && _percentile <= 100, "Percentile value needs to be in range 0-100, inclusive"); - _kValue = numArguments == 3 ? arguments.get(2).getLiteral().getIntValue() : DEFAULT_K_VALUE; + _kValue = (numArguments == 3) + ? arguments.get(2).getLiteral().getIntValue() + : CommonConstants.Helix.DEFAULT_KLL_SKETCH_K; } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileKLLSketchAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileKLLSketchAggregator.java new file mode 100644 index 000000000000..04b9dd42e503 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileKLLSketchAggregator.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.processing.aggregator; + +import java.util.Map; +import org.apache.datasketches.common.SketchesArgumentException; +import org.apache.datasketches.kll.KllDoublesSketch; +import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.segment.spi.Constants; +import org.apache.pinot.spi.utils.CommonConstants; + + +/** + * Class to merge KLL doubles sketch for minion merge/rollup tasks. 
+ */
+public class PercentileKLLSketchAggregator implements ValueAggregator {
+
+  /**
+   * Merges two serialized KLL doubles sketches into a new sketch and returns the result in serialized form.
+   *
+   * @param value1 first sketch, as a byte[] readable by {@code ObjectSerDeUtils.KLL_SKETCH_SER_DE}
+   * @param value2 second sketch, in the same serialized form as value1
+   * @param functionParameters aggregation parameters; {@code Constants.KLL_DOUBLE_SKETCH_K} optionally sets K
+   * @return the merged sketch as serialized by {@code ObjectSerDeUtils.KLL_SKETCH_SER_DE}
+   * @throws RuntimeException wrapping any SketchesArgumentException thrown during the merge
+   * @throws NumberFormatException if the K parameter is present but is not a valid integer
+   */
+  @Override
+  public Object aggregate(Object value1, Object value2, Map<String, String> functionParameters) {
+    try {
+      String kParam = functionParameters.get(Constants.KLL_DOUBLE_SKETCH_K);
+      // Use the explicitly configured K when present; otherwise fall back to the default K
+      int sketchKValue =
+          kParam != null ? Integer.parseInt(kParam) : CommonConstants.Helix.DEFAULT_KLL_SKETCH_K;
+
+      // Merge both inputs into a new heap sketch created with the chosen K
+      KllDoublesSketch first = ObjectSerDeUtils.KLL_SKETCH_SER_DE.deserialize((byte[]) value1);
+      KllDoublesSketch second = ObjectSerDeUtils.KLL_SKETCH_SER_DE.deserialize((byte[]) value2);
+      KllDoublesSketch union = KllDoublesSketch.newHeapInstance(sketchKValue);
+      union.merge(first);
+      union.merge(second);
+      return ObjectSerDeUtils.KLL_SKETCH_SER_DE.serialize(union);
+    } catch (SketchesArgumentException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java index 3b51f417871b..d126cad0d536 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java @@ -61,6 +61,9 @@ public static ValueAggregator getValueAggregator(AggregationFunctionType aggrega case DISTINCTCOUNTULL: case DISTINCTCOUNTRAWULL: return new DistinctCountULLAggregator(); + case PERCENTILEKLL: + case PERCENTILERAWKLL: + return new PercentileKLLSketchAggregator(); default: throw new IllegalStateException("Unsupported aggregation type: " +
aggregationType); } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/Constants.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/Constants.java index d1e717ca6c11..911bde9a421e 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/Constants.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/Constants.java @@ -33,4 +33,5 @@ private Constants() { public static final String THETA_TUPLE_SKETCH_SAMPLING_PROBABILITY = "samplingProbability"; public static final String PERCENTILETDIGEST_COMPRESSION_FACTOR_KEY = "compressionFactor"; public static final String SUMPRECISION_PRECISION_KEY = "precision"; + public static final String KLL_DOUBLE_SKETCH_K = "K"; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index c44bd246a0f5..e3c3e0d48348 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -129,6 +129,10 @@ public static class Helix { public static final int DEFAULT_CPC_SKETCH_LGK = 12; public static final int DEFAULT_ULTRALOGLOG_P = 12; + // K is set to 200, for tradeoffs see datasketches library documentation: + // https://datasketches.apache.org/docs/KLL/KLLAccuracyAndSize.html + public static final int DEFAULT_KLL_SKETCH_K = 200; + // Whether to rewrite DistinctCount to DistinctCountBitmap public static final String ENABLE_DISTINCT_COUNT_BITMAP_OVERRIDE_KEY = "enable.distinct.count.bitmap.override";