Skip to content

Commit

Permalink
Add support for KLL sketch aggregation in minion jobs (apache#14702)
Browse files Browse the repository at this point in the history
Co-authored-by: Raghav Agrawal <[email protected]>
  • Loading branch information
raghavagrawal and Raghav Agrawal authored Jan 16, 2025
1 parent a0433aa commit f37bc3d
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public static class RealtimeToOfflineSegmentsTask extends MergeTask {
DISTINCTCOUNTRAWTHETASKETCH, DISTINCTCOUNTTUPLESKETCH, DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH,
SUMVALUESINTEGERSUMTUPLESKETCH, AVGVALUEINTEGERSUMTUPLESKETCH, DISTINCTCOUNTHLLPLUS,
DISTINCTCOUNTRAWHLLPLUS, DISTINCTCOUNTCPCSKETCH, DISTINCTCOUNTRAWCPCSKETCH, DISTINCTCOUNTULL,
DISTINCTCOUNTRAWULL);
DISTINCTCOUNTRAWULL, PERCENTILEKLL, PERCENTILERAWKLL);
}

// Generate segment and push to controller based on batch ingestion configs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.CommonConstants;


/**
Expand Down Expand Up @@ -62,7 +63,6 @@
*/
public class PercentileKLLAggregationFunction
extends NullableSingleInputAggregationFunction<KllDoublesSketch, Comparable<?>> {
protected static final int DEFAULT_K_VALUE = 200;

protected final double _percentile;
protected int _kValue;
Expand All @@ -79,7 +79,9 @@ public PercentileKLLAggregationFunction(List<ExpressionContext> arguments, boole
Preconditions.checkArgument(_percentile >= 0 && _percentile <= 100,
"Percentile value needs to be in range 0-100, inclusive");

_kValue = numArguments == 3 ? arguments.get(2).getLiteral().getIntValue() : DEFAULT_K_VALUE;
_kValue = (numArguments == 3)
? arguments.get(2).getLiteral().getIntValue()
: CommonConstants.Helix.DEFAULT_KLL_SKETCH_K;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.segment.processing.aggregator;

import java.util.Map;
import org.apache.datasketches.common.SketchesArgumentException;
import org.apache.datasketches.kll.KllDoublesSketch;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.segment.spi.Constants;
import org.apache.pinot.spi.utils.CommonConstants;


/**
 * {@link ValueAggregator} that combines two serialized KLL doubles sketches into one, for use by minion
 * merge/rollup tasks.
 */
public class PercentileKLLSketchAggregator implements ValueAggregator {

  /**
   * Merges two serialized KLL doubles sketches and returns the serialized result.
   *
   * @param value1 first sketch; cast to {@code byte[]} and deserialized
   * @param value2 second sketch; cast to {@code byte[]} and deserialized
   * @param functionParameters aggregation parameters; when an entry keyed by {@code Constants.KLL_DOUBLE_SKETCH_K}
   *     is present it is parsed as the K of the merged sketch, otherwise
   *     {@code CommonConstants.Helix.DEFAULT_KLL_SKETCH_K} is used
   * @return the serialized sketch produced by merging both inputs into a newly created sketch
   * @throws RuntimeException wrapping any {@link SketchesArgumentException} raised during the merge
   */
  @Override
  public Object aggregate(Object value1, Object value2, Map<String, String> functionParameters) {
    try {
      // Resolve K before touching the sketches so a malformed K fails first.
      int mergedK = resolveK(functionParameters);

      KllDoublesSketch left = ObjectSerDeUtils.KLL_SKETCH_SER_DE.deserialize((byte[]) value1);
      KllDoublesSketch right = ObjectSerDeUtils.KLL_SKETCH_SER_DE.deserialize((byte[]) value2);

      // Merge both inputs into a new sketch created with the resolved K.
      KllDoublesSketch merged = KllDoublesSketch.newHeapInstance(mergedK);
      merged.merge(left);
      merged.merge(right);

      return ObjectSerDeUtils.KLL_SKETCH_SER_DE.serialize(merged);
    } catch (SketchesArgumentException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Returns the K configured in the function parameters, or the default K when none is configured.
   */
  private static int resolveK(Map<String, String> functionParameters) {
    String configuredK = functionParameters.get(Constants.KLL_DOUBLE_SKETCH_K);
    return configuredK != null ? Integer.parseInt(configuredK) : CommonConstants.Helix.DEFAULT_KLL_SKETCH_K;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public static ValueAggregator getValueAggregator(AggregationFunctionType aggrega
case DISTINCTCOUNTULL:
case DISTINCTCOUNTRAWULL:
return new DistinctCountULLAggregator();
case PERCENTILEKLL:
case PERCENTILERAWKLL:
return new PercentileKLLSketchAggregator();
default:
throw new IllegalStateException("Unsupported aggregation type: " + aggregationType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ private Constants() {
public static final String THETA_TUPLE_SKETCH_SAMPLING_PROBABILITY = "samplingProbability";
public static final String PERCENTILETDIGEST_COMPRESSION_FACTOR_KEY = "compressionFactor";
public static final String SUMPRECISION_PRECISION_KEY = "precision";
public static final String KLL_DOUBLE_SKETCH_K = "K";
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ public static class Helix {
public static final int DEFAULT_CPC_SKETCH_LGK = 12;
public static final int DEFAULT_ULTRALOGLOG_P = 12;

// K defaults to 200. For the accuracy/size tradeoffs, see the DataSketches library documentation:
// https://datasketches.apache.org/docs/KLL/KLLAccuracyAndSize.html
public static final int DEFAULT_KLL_SKETCH_K = 200;

// Whether to rewrite DistinctCount to DistinctCountBitmap
public static final String ENABLE_DISTINCT_COUNT_BITMAP_OVERRIDE_KEY = "enable.distinct.count.bitmap.override";

Expand Down

0 comments on commit f37bc3d

Please sign in to comment.