From 5a382f2e7d7bf5c782d77d5e49e97583633f6ff6 Mon Sep 17 00:00:00 2001 From: David Cromberge Date: Fri, 26 Jan 2024 18:21:18 +0000 Subject: [PATCH] Shared aggregations in StarTree (#12164) --- .../function/AggregationFunctionUtils.java | 17 +-- .../pinot/core/startree/StarTreeUtils.java | 2 +- .../executor/StarTreeAggregationExecutor.java | 2 +- .../executor/StarTreeGroupByExecutor.java | 2 +- .../core/startree/v2/BaseStarTreeV2Test.java | 23 ++-- .../v2/DistinctCountRawHLLStarTreeV2Test.java | 58 ++++++++ .../DistinctCountHLLPlusValueAggregator.java | 2 +- .../segment/store/StarTreeIndexReader.java | 29 ++-- .../v2/builder/StarTreeV2BuilderConfig.java | 12 +- .../v2/store/StarTreeIndexMapUtils.java | 25 ++-- .../segment/local/utils/TableConfigUtils.java | 11 ++ .../builder/StarTreeV2BuilderConfigTest.java | 57 +++++++- .../AggregationFunctionColumnPair.java | 42 ++++++ .../spi/index/startree/AggregationSpec.java | 8 ++ .../index/startree/StarTreeV2Metadata.java | 14 +- .../AggregationFunctionColumnPairTest.java | 59 ++++++++ .../startree/StarTreeV2MetadataTest.java | 130 ++++++++++++++++++ 17 files changed, 439 insertions(+), 54 deletions(-) create mode 100644 pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountRawHLLStarTreeV2Test.java create mode 100644 pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/index/startree/AggregationFunctionColumnPairTest.java create mode 100644 pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/index/startree/StarTreeV2MetadataTest.java diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java index cb0d3179d4eb..48e5d4784cf3 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java @@ -60,22 +60,23 @@ private AggregationFunctionUtils() { } /** - * (For Star-Tree) Creates an {@link AggregationFunctionColumnPair} from the {@link AggregationFunction}. Returns - * {@code null} if the {@link AggregationFunction} cannot be represented as an {@link AggregationFunctionColumnPair} - * (e.g. has multiple arguments, argument is not column etc.). + * (For Star-Tree) Creates an {@link AggregationFunctionColumnPair} in stored type from the + * {@link AggregationFunction}. Returns {@code null} if the {@link AggregationFunction} cannot be represented as an + * {@link AggregationFunctionColumnPair} (e.g. has multiple arguments, argument is not column etc.). + * TODO: Allow multiple arguments for aggregation functions, e.g. percentileEst */ @Nullable - public static AggregationFunctionColumnPair getAggregationFunctionColumnPair( - AggregationFunction aggregationFunction) { - AggregationFunctionType aggregationFunctionType = aggregationFunction.getType(); - if (aggregationFunctionType == AggregationFunctionType.COUNT) { + public static AggregationFunctionColumnPair getStoredFunctionColumnPair(AggregationFunction aggregationFunction) { + AggregationFunctionType functionType = aggregationFunction.getType(); + if (functionType == AggregationFunctionType.COUNT) { return AggregationFunctionColumnPair.COUNT_STAR; } List inputExpressions = aggregationFunction.getInputExpressions(); if (inputExpressions.size() == 1) { ExpressionContext inputExpression = inputExpressions.get(0); if (inputExpression.getType() == ExpressionContext.Type.IDENTIFIER) { - return new AggregationFunctionColumnPair(aggregationFunctionType, inputExpression.getIdentifier()); + return new AggregationFunctionColumnPair(AggregationFunctionColumnPair.getStoredType(functionType), + inputExpression.getIdentifier()); } } return null; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java index ceeca782d4c8..f79070ae9f6a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java @@ -64,7 +64,7 @@ public static AggregationFunctionColumnPair[] extractAggregationFunctionPairs( new AggregationFunctionColumnPair[numAggregationFunctions]; for (int i = 0; i < numAggregationFunctions; i++) { AggregationFunctionColumnPair aggregationFunctionColumnPair = - AggregationFunctionUtils.getAggregationFunctionColumnPair(aggregationFunctions[i]); + AggregationFunctionUtils.getStoredFunctionColumnPair(aggregationFunctions[i]); if (aggregationFunctionColumnPair != null) { aggregationFunctionColumnPairs[i] = aggregationFunctionColumnPair; } else { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/executor/StarTreeAggregationExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/executor/StarTreeAggregationExecutor.java index e6cd8aa15386..6953aabddf5d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/startree/executor/StarTreeAggregationExecutor.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/executor/StarTreeAggregationExecutor.java @@ -43,7 +43,7 @@ public StarTreeAggregationExecutor(AggregationFunction[] aggregationFunctions) { _aggregationFunctionColumnPairs = new AggregationFunctionColumnPair[numAggregationFunctions]; for (int i = 0; i < numAggregationFunctions; i++) { _aggregationFunctionColumnPairs[i] = - AggregationFunctionUtils.getAggregationFunctionColumnPair(aggregationFunctions[i]); + AggregationFunctionUtils.getStoredFunctionColumnPair(aggregationFunctions[i]); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/executor/StarTreeGroupByExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/executor/StarTreeGroupByExecutor.java index 6441fcc98d90..8d7d3dd65fb7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/startree/executor/StarTreeGroupByExecutor.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/executor/StarTreeGroupByExecutor.java @@ -65,7 +65,7 @@ public StarTreeGroupByExecutor(QueryContext queryContext, AggregationFunction[] _aggregationFunctionColumnPairs = new AggregationFunctionColumnPair[numAggregationFunctions]; for (int i = 0; i < numAggregationFunctions; i++) { _aggregationFunctionColumnPairs[i] = - AggregationFunctionUtils.getAggregationFunctionColumnPair(aggregationFunctions[i]); + AggregationFunctionUtils.getStoredFunctionColumnPair(aggregationFunctions[i]); } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java index 63cc2c2bda05..6737fc877c9b 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java @@ -129,16 +129,7 @@ public void setUp() throws Exception { _valueAggregator = getValueAggregator(); _aggregatedValueType = _valueAggregator.getAggregatedValueType(); - AggregationFunctionType aggregationType = _valueAggregator.getAggregationType(); - if (aggregationType == AggregationFunctionType.COUNT) { - _aggregation = "COUNT(*)"; - } else if (aggregationType == AggregationFunctionType.PERCENTILEEST - || aggregationType == AggregationFunctionType.PERCENTILETDIGEST) { - // Append a percentile number for percentile functions - _aggregation = String.format("%s(%s, 50)", aggregationType.getName(), METRIC); - } else { - _aggregation = String.format("%s(%s)", aggregationType.getName(), METRIC); - } + _aggregation = getAggregation(_valueAggregator.getAggregationType()); Schema.SchemaBuilder schemaBuilder = new Schema.SchemaBuilder().addSingleValueDimension(DIMENSION_D1, DataType.INT) .addSingleValueDimension(DIMENSION_D2, DataType.INT); @@ -185,6 +176,18 @@ public void setUp() _starTreeV2 = _indexSegment.getStarTrees().get(0); } + String getAggregation(AggregationFunctionType aggregationType) { + if (aggregationType == AggregationFunctionType.COUNT) { + return "COUNT(*)"; + } else if (aggregationType == AggregationFunctionType.PERCENTILEEST + || aggregationType == AggregationFunctionType.PERCENTILETDIGEST) { + // Append a percentile number for percentile functions + return String.format("%s(%s, 50)", aggregationType.getName(), METRIC); + } else { + return String.format("%s(%s)", aggregationType.getName(), METRIC); + } + } + @Test public void testUnsupportedFilters() { String query = String.format("SELECT %s FROM %s", _aggregation, TABLE_NAME); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountRawHLLStarTreeV2Test.java b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountRawHLLStarTreeV2Test.java new file mode 100644 index 000000000000..b1772d0e36af --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/DistinctCountRawHLLStarTreeV2Test.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.startree.v2; + +import com.clearspring.analytics.stream.cardinality.HyperLogLog; +import java.util.Collections; +import java.util.Random; +import org.apache.pinot.segment.local.aggregator.DistinctCountHLLValueAggregator; +import org.apache.pinot.segment.local.aggregator.ValueAggregator; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.data.FieldSpec.DataType; + +import static org.testng.Assert.assertEquals; + + +public class DistinctCountRawHLLStarTreeV2Test extends BaseStarTreeV2Test { + + @Override + String getAggregation(AggregationFunctionType aggregationType) { + return "distinctCountRawHLL(m)"; + } + + @Override + ValueAggregator getValueAggregator() { + return new DistinctCountHLLValueAggregator(Collections.emptyList()); + } + + @Override + DataType getRawValueType() { + return DataType.INT; + } + + @Override + Object getRandomRawValue(Random random) { + return random.nextInt(100); + } + + @Override + void assertAggregatedValue(HyperLogLog starTreeResult, HyperLogLog nonStarTreeResult) { + assertEquals(starTreeResult.cardinality(), nonStarTreeResult.cardinality()); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLPlusValueAggregator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLPlusValueAggregator.java index 66e38cd151da..86949b42e486 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLPlusValueAggregator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLPlusValueAggregator.java @@ -54,7 +54,7 @@ public DistinctCountHLLPlusValueAggregator(List arguments) { @Override public AggregationFunctionType getAggregationType() { - return AggregationFunctionType.DISTINCTCOUNTHLL; + return AggregationFunctionType.DISTINCTCOUNTHLLPLUS; } @Override diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/StarTreeIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/StarTreeIndexReader.java index 770af1b58bcf..4c10f46143bb 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/StarTreeIndexReader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/StarTreeIndexReader.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteOrder; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -56,7 +57,7 @@ public class StarTreeIndexReader implements Closeable { private final int _numStarTrees; // StarTree index can contain multiple index instances, identified by ids like 0, 1, etc. - private final Map> _indexColumnEntries; + private final List> _indexColumnEntries; private PinotDataBuffer _dataBuffer; /** @@ -78,7 +79,7 @@ public StarTreeIndexReader(File segmentDirectory, SegmentMetadataImpl segmentMet _readMode = readMode; _numStarTrees = _segmentMetadata.getStarTreeV2MetadataList().size(); _indexFile = new File(_segmentDirectory, StarTreeV2Constants.INDEX_FILE_NAME); - _indexColumnEntries = new HashMap<>(_numStarTrees); + _indexColumnEntries = new ArrayList<>(_numStarTrees); load(); } @@ -104,27 +105,25 @@ private void load() private void mapBufferEntries(int starTreeId, Map indexMap) { - Map columnEntries = - _indexColumnEntries.computeIfAbsent(starTreeId, k -> new HashMap<>()); + Map columnEntries = new HashMap<>(); + _indexColumnEntries.add(columnEntries); // Load star-tree index. The index tree doesn't have corresponding column name or column index type to create an // IndexKey. As it's a kind of inverted index, we uniquely identify it with index id and inverted index type. - columnEntries.computeIfAbsent(new IndexKey(String.valueOf(starTreeId), StandardIndexes.inverted()), - k -> new StarTreeIndexEntry(indexMap.get(StarTreeIndexMapUtils.STAR_TREE_INDEX_KEY), _dataBuffer, + columnEntries.put(new IndexKey(String.valueOf(starTreeId), StandardIndexes.inverted()), + new StarTreeIndexEntry(indexMap.get(StarTreeIndexMapUtils.STAR_TREE_INDEX_KEY), _dataBuffer, ByteOrder.LITTLE_ENDIAN)); List starTreeMetadataList = _segmentMetadata.getStarTreeV2MetadataList(); StarTreeV2Metadata starTreeMetadata = starTreeMetadataList.get(starTreeId); // Load dimension forward indexes for (String dimension : starTreeMetadata.getDimensionsSplitOrder()) { - IndexKey indexKey = new IndexKey(dimension, StandardIndexes.forward()); - columnEntries.computeIfAbsent(indexKey, k -> new StarTreeIndexEntry( + columnEntries.put(new IndexKey(dimension, StandardIndexes.forward()), new StarTreeIndexEntry( indexMap.get(new StarTreeIndexMapUtils.IndexKey(StarTreeIndexMapUtils.IndexType.FORWARD_INDEX, dimension)), _dataBuffer, ByteOrder.BIG_ENDIAN)); } // Load metric (function-column pair) forward indexes for (AggregationFunctionColumnPair functionColumnPair : starTreeMetadata.getFunctionColumnPairs()) { String metric = functionColumnPair.toColumnName(); - IndexKey indexKey = new IndexKey(metric, StandardIndexes.forward()); - columnEntries.computeIfAbsent(indexKey, k -> new StarTreeIndexEntry( + columnEntries.put(new IndexKey(metric, StandardIndexes.forward()), new StarTreeIndexEntry( indexMap.get(new StarTreeIndexMapUtils.IndexKey(StarTreeIndexMapUtils.IndexType.FORWARD_INDEX, metric)), _dataBuffer, ByteOrder.BIG_ENDIAN)); } @@ -132,12 +131,11 @@ private void mapBufferEntries(int starTreeId, public PinotDataBuffer getBuffer(int starTreeId, String column, IndexType type) throws IOException { - Map columnEntries = _indexColumnEntries.get(starTreeId); - if (columnEntries == null) { + if (_indexColumnEntries.size() <= starTreeId) { throw new RuntimeException( String.format("Could not find StarTree index: %s in segment: %s", starTreeId, _segmentDirectory.toString())); } - StarTreeIndexEntry entry = columnEntries.get(new IndexKey(column, type)); + StarTreeIndexEntry entry = _indexColumnEntries.get(starTreeId).get(new IndexKey(column, type)); if (entry != null && entry._buffer != null) { return entry._buffer; } @@ -147,11 +145,10 @@ public PinotDataBuffer getBuffer(int starTreeId, String column, IndexType type) { - Map columnEntries = _indexColumnEntries.get(starTreeId); - if (columnEntries == null) { + if (_indexColumnEntries.size() <= starTreeId) { return false; } - return columnEntries.containsKey(new IndexKey(column, type)); + return _indexColumnEntries.get(starTreeId).containsKey(new IndexKey(column, type)); } @Override diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeV2BuilderConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeV2BuilderConfig.java index cd02bba46261..b11e3b2b24ca 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeV2BuilderConfig.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeV2BuilderConfig.java @@ -76,16 +76,24 @@ public static StarTreeV2BuilderConfig fromIndexConfig(StarTreeIndexConfig indexC for (String functionColumnPair : indexConfig.getFunctionColumnPairs()) { AggregationFunctionColumnPair aggregationFunctionColumnPair = AggregationFunctionColumnPair.fromColumnName(functionColumnPair); - aggregationSpecs.put(aggregationFunctionColumnPair, AggregationSpec.DEFAULT); + AggregationFunctionColumnPair storedType = + AggregationFunctionColumnPair.resolveToStoredType(aggregationFunctionColumnPair); + // If there is already an equivalent functionColumnPair in the map, do not load another. + // This prevents the duplication of the aggregation when the StarTree is constructed. + aggregationSpecs.putIfAbsent(storedType, AggregationSpec.DEFAULT); } } if (indexConfig.getAggregationConfigs() != null) { for (StarTreeAggregationConfig aggregationConfig : indexConfig.getAggregationConfigs()) { AggregationFunctionColumnPair aggregationFunctionColumnPair = AggregationFunctionColumnPair.fromAggregationConfig(aggregationConfig); + AggregationFunctionColumnPair storedType = + AggregationFunctionColumnPair.resolveToStoredType(aggregationFunctionColumnPair); ChunkCompressionType compressionType = ChunkCompressionType.valueOf(aggregationConfig.getCompressionCodec().name()); - aggregationSpecs.put(aggregationFunctionColumnPair, new AggregationSpec(compressionType)); + // If there is already an equivalent functionColumnPair in the map, do not load another. + // This prevents the duplication of the aggregation when the StarTree is constructed. + aggregationSpecs.putIfAbsent(storedType, new AggregationSpec(compressionType)); } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeIndexMapUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeIndexMapUtils.java index 8cbf7d4958dd..793487925396 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeIndexMapUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeIndexMapUtils.java @@ -33,8 +33,10 @@ import org.apache.commons.configuration2.ex.ConfigurationException; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair; import org.apache.pinot.spi.env.CommonsConfigurationUtils; + /** * The {@code StarTreeIndexMapUtils} class is a utility class to store/load star-tree index map to/from file. *

@@ -182,24 +184,29 @@ public static List> loadFromInputStream(InputStream in int starTreeId = Integer.parseInt(split[0]); Map indexMap = indexMaps.get(starTreeId); - // Handle the case of column name containing '.' - String column; int columnSplitEndIndex = split.length - 2; - if (columnSplitEndIndex == 2) { - column = split[1]; - } else { - column = StringUtils.join(split, KEY_SEPARATOR, 1, columnSplitEndIndex); - } - IndexType indexType = IndexType.valueOf(split[columnSplitEndIndex]); IndexKey indexKey; if (indexType == IndexType.STAR_TREE) { indexKey = STAR_TREE_INDEX_KEY; } else { + // Handle the case of column name containing '.' + String column; + if (columnSplitEndIndex == 2) { + column = split[1]; + } else { + column = StringUtils.join(split, KEY_SEPARATOR, 1, columnSplitEndIndex); + } + // Convert metric (function-column pair) to stored name for backward-compatibility + if (column.contains(AggregationFunctionColumnPair.DELIMITER)) { + AggregationFunctionColumnPair functionColumnPair = AggregationFunctionColumnPair.fromColumnName(column); + column = AggregationFunctionColumnPair.resolveToStoredType(functionColumnPair).toColumnName(); + } indexKey = new IndexKey(IndexType.FORWARD_INDEX, column); } - IndexValue indexValue = indexMap.computeIfAbsent(indexKey, (k) -> new IndexValue()); + long value = configuration.getLong(key); + IndexValue indexValue = indexMap.computeIfAbsent(indexKey, k -> new IndexValue()); if (split[columnSplitEndIndex + 1].equals(OFFSET_SUFFIX)) { indexValue._offset = value; } else { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java index 8c1809c6a574..60b0bf6809df 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java @@ -1033,6 +1033,7 @@ private static void validateIndexingConfig(IndexingConfig indexingConfig, @Nulla } List starTreeIndexConfigList = indexingConfig.getStarTreeIndexConfigs(); + Set storedTypes = new HashSet<>(); if (starTreeIndexConfigList != null) { for (StarTreeIndexConfig starTreeIndexConfig : starTreeIndexConfigList) { // Dimension split order cannot be null @@ -1049,6 +1050,11 @@ private static void validateIndexingConfig(IndexingConfig indexingConfig, @Nulla throw new IllegalStateException("Invalid StarTreeIndex config: " + functionColumnPair + ". Must be" + "in the form __"); } + AggregationFunctionColumnPair storedType = AggregationFunctionColumnPair.resolveToStoredType(columnPair); + if (!storedTypes.add(storedType)) { + LOGGER.warn("StarTreeIndex config duplication: {} already matches existing function column pair: {}. ", + columnPair, storedType); + } String columnName = columnPair.getColumn(); if (!columnName.equals(AggregationFunctionColumnPair.STAR)) { columnNameToConfigMap.put(columnName, STAR_TREE_CONFIG_NAME); @@ -1064,6 +1070,11 @@ private static void validateIndexingConfig(IndexingConfig indexingConfig, @Nulla } catch (Exception e) { throw new IllegalStateException("Invalid StarTreeIndex config: " + aggregationConfig); } + AggregationFunctionColumnPair storedType = AggregationFunctionColumnPair.resolveToStoredType(columnPair); + if (!storedTypes.add(storedType)) { + LOGGER.warn("StarTreeIndex config duplication: {} already matches existing function column pair: {}. ", + columnPair, storedType); + } String columnName = columnPair.getColumn(); if (!columnName.equals(AggregationFunctionColumnPair.STAR)) { columnNameToConfigMap.put(columnName, STAR_TREE_CONFIG_NAME); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeV2BuilderConfigTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeV2BuilderConfigTest.java index 115eb71311dc..7004f901b331 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeV2BuilderConfigTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/startree/v2/builder/StarTreeV2BuilderConfigTest.java @@ -19,13 +19,21 @@ package org.apache.pinot.segment.local.startree.v2.builder; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; +import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.Constants; +import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair; +import org.apache.pinot.segment.spi.index.startree.AggregationSpec; +import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec; +import org.apache.pinot.spi.config.table.StarTreeAggregationConfig; +import org.apache.pinot.spi.config.table.StarTreeIndexConfig; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.TimeGranularitySpec; @@ -83,12 +91,57 @@ public void testDefaultConfig() { // No column should be skipped for star-node creation assertTrue(defaultConfig.getSkipStarNodeCreationForDimensions().isEmpty()); // Should have COUNT(*) and SUM(m1) as the function column pairs - assertEquals(defaultConfig.getFunctionColumnPairs(), new HashSet<>(Arrays - .asList(AggregationFunctionColumnPair.COUNT_STAR, + assertEquals(defaultConfig.getFunctionColumnPairs(), new HashSet<>( + Arrays.asList(AggregationFunctionColumnPair.COUNT_STAR, new AggregationFunctionColumnPair(AggregationFunctionType.SUM, "m1")))); assertEquals(defaultConfig.getMaxLeafRecords(), StarTreeV2BuilderConfig.DEFAULT_MAX_LEAF_RECORDS); } + @Test + public void testBuildFromIndexConfig() { + List aggregationConfigs = + List.of(new StarTreeAggregationConfig("m1", "SUM", CompressionCodec.LZ4)); + StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(List.of("d1"), null, null, aggregationConfigs, 1); + StarTreeV2BuilderConfig builderConfig = StarTreeV2BuilderConfig.fromIndexConfig(starTreeIndexConfig); + assertEquals(builderConfig.getMaxLeafRecords(), 1); + assertEquals(builderConfig.getDimensionsSplitOrder(), List.of("d1")); + assertEquals(builderConfig.getFunctionColumnPairs(), + Set.of(new AggregationFunctionColumnPair(AggregationFunctionType.SUM, "m1"))); + assertTrue(builderConfig.getSkipStarNodeCreationForDimensions().isEmpty()); + assertEquals(builderConfig.getAggregationSpecs().values(), + Collections.singleton(new AggregationSpec(ChunkCompressionType.LZ4))); + } + + @Test + public void testAggregationSpecUniqueness() { + List aggregationConfigs = + List.of(new StarTreeAggregationConfig("m1", "distinctCountThetaSketch", CompressionCodec.LZ4), + new StarTreeAggregationConfig("m1", "distinctCountRawThetaSketch", CompressionCodec.LZ4)); + StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(List.of("d1"), null, null, aggregationConfigs, 1); + StarTreeV2BuilderConfig builderConfig = StarTreeV2BuilderConfig.fromIndexConfig(starTreeIndexConfig); + assertEquals(builderConfig.getMaxLeafRecords(), 1); + assertEquals(builderConfig.getDimensionsSplitOrder(), List.of("d1")); + assertEquals(builderConfig.getFunctionColumnPairs(), + Set.of(new AggregationFunctionColumnPair(AggregationFunctionType.DISTINCTCOUNTTHETASKETCH, "m1"))); + assertTrue(builderConfig.getSkipStarNodeCreationForDimensions().isEmpty()); + assertEquals(builderConfig.getAggregationSpecs().values(), + Collections.singleton(new AggregationSpec(ChunkCompressionType.LZ4))); + } + + @Test + public void testFunctionColumnPairUniqueness() { + List functionColumnPairs = List.of("distinctCountThetaSketch__m1", "distinctCountRawThetaSketch__m1"); + StarTreeIndexConfig starTreeIndexConfig = + new StarTreeIndexConfig(List.of("d1"), null, functionColumnPairs, null, 1); + StarTreeV2BuilderConfig builderConfig = StarTreeV2BuilderConfig.fromIndexConfig(starTreeIndexConfig); + assertEquals(builderConfig.getMaxLeafRecords(), 1); + assertEquals(builderConfig.getDimensionsSplitOrder(), List.of("d1")); + assertEquals(builderConfig.getFunctionColumnPairs(), + Set.of(new AggregationFunctionColumnPair(AggregationFunctionType.DISTINCTCOUNTTHETASKETCH, "m1"))); + assertTrue(builderConfig.getSkipStarNodeCreationForDimensions().isEmpty()); + assertEquals(builderConfig.getAggregationSpecs().values(), Collections.singleton(AggregationSpec.DEFAULT)); + } + private ColumnMetadata getColumnMetadata(String column, boolean hasDictionary, int cardinality) { ColumnMetadata columnMetadata = mock(ColumnMetadata.class); when(columnMetadata.getColumnName()).thenReturn(column); diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/startree/AggregationFunctionColumnPair.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/startree/AggregationFunctionColumnPair.java index 3ba3583c5257..31c9c744c57b 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/startree/AggregationFunctionColumnPair.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/startree/AggregationFunctionColumnPair.java @@ -66,6 +66,48 @@ public static AggregationFunctionColumnPair fromAggregationConfig(StarTreeAggreg return fromFunctionAndColumnName(aggregationConfig.getAggregationFunction(), aggregationConfig.getColumnName()); } + /** + * Return a new {@code AggregationFunctionColumnPair} from an existing functionColumnPair where the new pair + * has the {@link AggregationFunctionType} set to the underlying stored type used in the segment or indexes. + * @param functionColumnPair the existing functionColumnPair + * @return the new functionColumnPair + */ + public static AggregationFunctionColumnPair resolveToStoredType(AggregationFunctionColumnPair functionColumnPair) { + AggregationFunctionType storedType = getStoredType(functionColumnPair.getFunctionType()); + return new AggregationFunctionColumnPair(storedType, functionColumnPair.getColumn()); + } + + /** + * Returns the stored {@code AggregationFunctionType} used to create the underlying value in the segment or index. + * Some aggregation functions share the same stored type but are used for different purposes in queries. + * @param aggregationType the aggregation type used in a query + * @return the underlying value aggregation type used in storage e.g. StarTree index + */ + public static AggregationFunctionType getStoredType(AggregationFunctionType aggregationType) { + switch (aggregationType) { + case DISTINCTCOUNTRAWHLL: + return AggregationFunctionType.DISTINCTCOUNTHLL; + case PERCENTILERAWEST: + return AggregationFunctionType.PERCENTILEEST; + case PERCENTILERAWTDIGEST: + return AggregationFunctionType.PERCENTILETDIGEST; + case DISTINCTCOUNTRAWTHETASKETCH: + return AggregationFunctionType.DISTINCTCOUNTTHETASKETCH; + case DISTINCTCOUNTRAWHLLPLUS: + return AggregationFunctionType.DISTINCTCOUNTHLLPLUS; + case DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH: + case AVGVALUEINTEGERSUMTUPLESKETCH: + case SUMVALUESINTEGERSUMTUPLESKETCH: + return AggregationFunctionType.DISTINCTCOUNTTUPLESKETCH; + case DISTINCTCOUNTRAWCPCSKETCH: + return AggregationFunctionType.DISTINCTCOUNTCPCSKETCH; + case DISTINCTCOUNTRAWULL: + return AggregationFunctionType.DISTINCTCOUNTULL; + default: + return aggregationType; + } + } + private static AggregationFunctionColumnPair fromFunctionAndColumnName(String functionName, String columnName) { AggregationFunctionType functionType = AggregationFunctionType.getAggregationFunctionType(functionName); if (functionType == AggregationFunctionType.COUNT) { diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/startree/AggregationSpec.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/startree/AggregationSpec.java index e9f038fa4a66..f5b7770061e2 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/startree/AggregationSpec.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/startree/AggregationSpec.java @@ -18,6 +18,8 @@ */ package org.apache.pinot.segment.spi.index.startree; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; @@ -50,4 +52,10 @@ public boolean equals(Object o) { public int hashCode() { return _compressionType.hashCode(); } + + @Override + public String toString() { + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("compressionType", _compressionType) + .toString(); + } } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/startree/StarTreeV2Metadata.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/startree/StarTreeV2Metadata.java index 4e53ad376397..38cd7f2b9391 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/startree/StarTreeV2Metadata.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/startree/StarTreeV2Metadata.java @@ -52,17 +52,25 @@ public StarTreeV2Metadata(Configuration metadataProperties) { AggregationFunctionType functionType = AggregationFunctionType.getAggregationFunctionType(aggregationConfig.getString(MetadataKey.FUNCTION_TYPE)); String columnName = aggregationConfig.getString(MetadataKey.COLUMN_NAME); + AggregationFunctionColumnPair functionColumnPair = new AggregationFunctionColumnPair(functionType, columnName); + // Lookup the stored aggregation type + AggregationFunctionColumnPair storedType = + AggregationFunctionColumnPair.resolveToStoredType(functionColumnPair); ChunkCompressionType compressionType = ChunkCompressionType.valueOf(aggregationConfig.getString(MetadataKey.COMPRESSION_CODEC)); - _aggregationSpecs.put(new AggregationFunctionColumnPair(functionType, columnName), - new AggregationSpec(compressionType)); + // If there is already an equivalent functionColumnPair in the map for the stored type, do not load another. + _aggregationSpecs.putIfAbsent(storedType, new AggregationSpec(compressionType)); } } else { // Backward compatibility with columnName format for (String functionColumnPairName : metadataProperties.getStringArray(MetadataKey.FUNCTION_COLUMN_PAIRS)) { AggregationFunctionColumnPair functionColumnPair = AggregationFunctionColumnPair.fromColumnName(functionColumnPairName); - _aggregationSpecs.put(functionColumnPair, AggregationSpec.DEFAULT); + // Lookup the stored aggregation type + AggregationFunctionColumnPair storedType = + AggregationFunctionColumnPair.resolveToStoredType(functionColumnPair); + // If there is already an equivalent functionColumnPair in the map for the stored type, do not load another. + _aggregationSpecs.putIfAbsent(storedType, AggregationSpec.DEFAULT); } } _maxLeafRecords = metadataProperties.getInt(MetadataKey.MAX_LEAF_RECORDS); diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/index/startree/AggregationFunctionColumnPairTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/index/startree/AggregationFunctionColumnPairTest.java new file mode 100644 index 000000000000..18a8534a7b58 --- /dev/null +++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/index/startree/AggregationFunctionColumnPairTest.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.index.startree; + +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.testng.annotations.Test; + +import static org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair.getStoredType; +import static org.testng.AssertJUnit.assertEquals; + + +public class AggregationFunctionColumnPairTest { + + @Test + public void testResolveToStoredType() { + assertEquals(AggregationFunctionColumnPair.fromColumnName("distinctCountThetaSketch__dimX"), + AggregationFunctionColumnPair.resolveToStoredType( + AggregationFunctionColumnPair.fromColumnName("distinctCountRawThetaSketch__dimX"))); + assertEquals(AggregationFunctionColumnPair.fromColumnName("count__*"), + AggregationFunctionColumnPair.resolveToStoredType(AggregationFunctionColumnPair.fromColumnName("count__*"))); + assertEquals(AggregationFunctionColumnPair.fromColumnName("sum__dimY"), + AggregationFunctionColumnPair.resolveToStoredType(AggregationFunctionColumnPair.fromColumnName("sum__dimY"))); + } + + @Test + public void testGetStoredType() { + assertEquals(getStoredType(AggregationFunctionType.DISTINCTCOUNTRAWHLL), AggregationFunctionType.DISTINCTCOUNTHLL); + assertEquals(getStoredType(AggregationFunctionType.PERCENTILERAWTDIGEST), + AggregationFunctionType.PERCENTILETDIGEST); + assertEquals(getStoredType(AggregationFunctionType.DISTINCTCOUNTRAWTHETASKETCH), + AggregationFunctionType.DISTINCTCOUNTTHETASKETCH); + assertEquals(getStoredType(AggregationFunctionType.DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH), + AggregationFunctionType.DISTINCTCOUNTTUPLESKETCH); + assertEquals(getStoredType(AggregationFunctionType.SUMVALUESINTEGERSUMTUPLESKETCH), + AggregationFunctionType.DISTINCTCOUNTTUPLESKETCH); + assertEquals(getStoredType(AggregationFunctionType.AVGVALUEINTEGERSUMTUPLESKETCH), + AggregationFunctionType.DISTINCTCOUNTTUPLESKETCH); + assertEquals(getStoredType(AggregationFunctionType.DISTINCTCOUNTHLLPLUS), + AggregationFunctionType.DISTINCTCOUNTHLLPLUS); + // Default case + assertEquals(getStoredType(AggregationFunctionType.COUNT), AggregationFunctionType.COUNT); + } +} diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/index/startree/StarTreeV2MetadataTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/index/startree/StarTreeV2MetadataTest.java new file mode 100644 index 000000000000..270c68b8808d --- /dev/null +++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/index/startree/StarTreeV2MetadataTest.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.index.startree; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import org.apache.commons.configuration2.Configuration; +import org.apache.commons.configuration2.PropertiesConfiguration; +import org.testng.annotations.Test; + +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertTrue; + + +public class StarTreeV2MetadataTest { + + @Test + public void testUniqueAggregationSpecs() { + TreeMap expected = new TreeMap<>(); + expected.put(AggregationFunctionColumnPair.fromColumnName("count__*"), AggregationSpec.DEFAULT); + expected.put(AggregationFunctionColumnPair.fromColumnName("sum__dimX"), AggregationSpec.DEFAULT); + + Configuration configuration = createConfiguration(Collections.singletonList("dimX"), null, expected); + StarTreeV2Metadata starTreeV2Metadata = new StarTreeV2Metadata(configuration); + TreeMap actual = starTreeV2Metadata.getAggregationSpecs(); + assertEquals(expected, actual); + } + + @Test + public void testDuplicateAggregationSpecs() { + AggregationFunctionColumnPair thetaColumnPair = + AggregationFunctionColumnPair.fromColumnName("distinctCountThetaSketch__dimX"); + AggregationFunctionColumnPair rawThetaColumnPair = + AggregationFunctionColumnPair.fromColumnName("distinctCountRawThetaSketch__dimX"); + + TreeMap expected = new TreeMap<>(); + expected.put(thetaColumnPair, AggregationSpec.DEFAULT); + expected.put(rawThetaColumnPair, AggregationSpec.DEFAULT); + + Configuration configuration = createConfiguration(Collections.singletonList("dimX"), null, expected); + StarTreeV2Metadata starTreeV2Metadata = new StarTreeV2Metadata(configuration); + TreeMap actual = starTreeV2Metadata.getAggregationSpecs(); + expected.remove(rawThetaColumnPair); + assertEquals(expected, actual); + assertTrue(starTreeV2Metadata.containsFunctionColumnPair(thetaColumnPair)); + } + + @Test + public void testUniqueFunctionColumnPairs() { + Set expected = new HashSet<>(); + expected.add(AggregationFunctionColumnPair.fromColumnName("count__*")); + expected.add(AggregationFunctionColumnPair.fromColumnName("sum__dimX")); + + Configuration configuration = createConfiguration(Collections.singletonList("dimX"), expected, null); + StarTreeV2Metadata starTreeV2Metadata = new StarTreeV2Metadata(configuration); + Set actual = starTreeV2Metadata.getFunctionColumnPairs(); + assertEquals(expected, actual); + } + + @Test + public void testDuplicateFunctionColumnPairs() { + AggregationFunctionColumnPair thetaColumnPair = + AggregationFunctionColumnPair.fromColumnName("distinctCountThetaSketch__dimX"); + AggregationFunctionColumnPair rawThetaColumnPair = + AggregationFunctionColumnPair.fromColumnName("distinctCountRawThetaSketch__dimX"); + + Set expected = new HashSet<>(); + expected.add(thetaColumnPair); + expected.add(rawThetaColumnPair); + + Configuration configuration = createConfiguration(Collections.singletonList("dimX"), expected, null); + StarTreeV2Metadata starTreeV2Metadata = new StarTreeV2Metadata(configuration); + Set actual = starTreeV2Metadata.getFunctionColumnPairs(); + + expected.remove(rawThetaColumnPair); + assertEquals(expected, actual); + assertTrue(starTreeV2Metadata.containsFunctionColumnPair(thetaColumnPair)); + } + + private static Configuration createConfiguration(List dimensionsSplitOrder, + Set functionColumnPairs, + TreeMap aggregationSpecs) { + Configuration metadataProperties = new PropertiesConfiguration(); + metadataProperties.setProperty(StarTreeV2Constants.MetadataKey.TOTAL_DOCS, 1); + metadataProperties.setProperty(StarTreeV2Constants.MetadataKey.DIMENSIONS_SPLIT_ORDER, dimensionsSplitOrder); + if (functionColumnPairs != null) { + metadataProperties.setProperty(StarTreeV2Constants.MetadataKey.FUNCTION_COLUMN_PAIRS, functionColumnPairs); + metadataProperties.setProperty(StarTreeV2Constants.MetadataKey.AGGREGATION_COUNT, 0); + } else { + metadataProperties.setProperty(StarTreeV2Constants.MetadataKey.AGGREGATION_COUNT, aggregationSpecs.size()); + int index = 0; + for (Map.Entry entry : aggregationSpecs.entrySet()) { + AggregationFunctionColumnPair functionColumnPair = entry.getKey(); + AggregationSpec aggregationSpec = entry.getValue(); + String prefix = StarTreeV2Constants.MetadataKey.AGGREGATION_PREFIX + index + '.'; + metadataProperties.setProperty(prefix + StarTreeV2Constants.MetadataKey.FUNCTION_TYPE, + functionColumnPair.getFunctionType().getName()); + metadataProperties.setProperty(prefix + StarTreeV2Constants.MetadataKey.COLUMN_NAME, + functionColumnPair.getColumn()); + metadataProperties.setProperty(prefix + StarTreeV2Constants.MetadataKey.COMPRESSION_CODEC, + aggregationSpec.getCompressionType()); + index++; + } + } + metadataProperties.setProperty(StarTreeV2Constants.MetadataKey.MAX_LEAF_RECORDS, 10000); + metadataProperties.setProperty(StarTreeV2Constants.MetadataKey.SKIP_STAR_NODE_CREATION_FOR_DIMENSIONS, + new HashSet<>()); + return metadataProperties; + } +}