Skip to content

Commit

Permalink
scaled float support for star tree
Browse files Browse the repository at this point in the history
Signed-off-by: Sarthak Aggarwal <[email protected]>
  • Loading branch information
sarthakaggarwal97 committed Aug 28, 2024
1 parent 5e44976 commit 71f3349
Show file tree
Hide file tree
Showing 33 changed files with 307 additions and 280 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@
import java.util.function.Supplier;

/** A {@link FieldMapper} for scaled floats. Values are internally multiplied
* by a scaling factor and rounded to the closest long. */
* by a scaling factor and rounded to the closest long.
*/
public class ScaledFloatFieldMapper extends ParametrizedFieldMapper {

public static final String CONTENT_TYPE = "scaled_float";
Expand Down Expand Up @@ -162,11 +163,21 @@ public ScaledFloatFieldMapper build(BuilderContext context) {
);
return new ScaledFloatFieldMapper(name, type, multiFieldsBuilder.build(this, context), copyTo.build(), this);
}

@Override
public boolean isDataCubeDimensionSupported() {
return true;
}

@Override
public boolean isDataCubeMetricSupported() {
return true;
}
}

public static final TypeParser PARSER = new TypeParser((n, c) -> new Builder(n, c.getSettings()));

public static final class ScaledFloatFieldType extends SimpleMappedFieldType implements NumericPointEncoder {
public static final class ScaledFloatFieldType extends SimpleMappedFieldType implements NumericPointEncoder, NumericFieldConverter {

private final double scalingFactor;
private final Double nullValue;
Expand Down Expand Up @@ -340,6 +351,12 @@ public DocValueFormat docValueFormat(String format, ZoneId timeZone) {
private double scale(Object input) {
return new BigDecimal(Double.toString(parse(input))).multiply(BigDecimal.valueOf(scalingFactor)).doubleValue();
}

@Override
public double toDoubleValue(Long value) {
double inverseScalingFactor = 1d / scalingFactor;
return value * inverseScalingFactor;
}
}

private final Explicit<Boolean> ignoreMalformed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.opensearch.index.compositeindex.datacube.startree.StarTreeIndexSettings;
import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.index.mapper.Mapper;
import org.opensearch.index.mapper.NumberFieldMapper;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -57,7 +56,7 @@ public static Dimension parseAndCreateDimension(
) {
if (builder instanceof DateFieldMapper.Builder) {
return parseAndCreateDateDimension(name, dimensionMap, c);
} else if (builder instanceof NumberFieldMapper.Builder) {
} else if (builder.isDataCubeDimensionSupported()) {
return new NumericDimension(name);
}
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,22 @@
*/
package org.opensearch.index.compositeindex.datacube.startree.aggregators;

import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType;
import org.opensearch.index.mapper.NumberFieldMapper;

/**
* Count value aggregator for star tree
*
* @opensearch.experimental
*/
class CountValueAggregator implements ValueAggregator<Long> {
public class CountValueAggregator implements ValueAggregator<Long> {

public static final long DEFAULT_INITIAL_VALUE = 1L;
private static final StarTreeNumericType VALUE_AGGREGATOR_TYPE = StarTreeNumericType.LONG;
private static final NumberFieldMapper.NumberType VALUE_AGGREGATOR_TYPE = NumberFieldMapper.NumberType.LONG;

public CountValueAggregator() {}

@Override
public StarTreeNumericType getAggregatedValueType() {
public NumberFieldMapper.NumberType getAggregatedValueType() {
return VALUE_AGGREGATOR_TYPE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

package org.opensearch.index.compositeindex.datacube.startree.aggregators;

import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType;
import org.opensearch.index.mapper.NumberFieldMapper;

/**
* Aggregator to handle '_doc_count' field
Expand All @@ -17,12 +17,12 @@
*/
public class DocCountAggregator implements ValueAggregator<Long> {

private static final StarTreeNumericType VALUE_AGGREGATOR_TYPE = StarTreeNumericType.LONG;
private static final NumberFieldMapper.NumberType VALUE_AGGREGATOR_TYPE = NumberFieldMapper.NumberType.LONG;

public DocCountAggregator() {}

@Override
public StarTreeNumericType getAggregatedValueType() {
public NumberFieldMapper.NumberType getAggregatedValueType() {
return VALUE_AGGREGATOR_TYPE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
*/
package org.opensearch.index.compositeindex.datacube.startree.aggregators;

import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType;
import org.opensearch.index.mapper.NumericFieldConverter;

/**
* Max value aggregator for star tree
Expand All @@ -16,8 +16,8 @@
*/
class MaxValueAggregator extends StatelessDoubleValueAggregator {

public MaxValueAggregator(StarTreeNumericType starTreeNumericType) {
super(starTreeNumericType, null);
public MaxValueAggregator(NumericFieldConverter numericFieldConverter) {
super(numericFieldConverter, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
package org.opensearch.index.compositeindex.datacube.startree.aggregators;

import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType;
import org.opensearch.index.fielddata.IndexNumericFieldData;
import org.opensearch.index.mapper.NumericFieldConverter;

import java.util.Comparator;
import java.util.Objects;
Expand All @@ -27,15 +26,15 @@ public class MetricAggregatorInfo implements Comparable<MetricAggregatorInfo> {
private final MetricStat metricStat;
private final String field;
private final ValueAggregator valueAggregators;
private final StarTreeNumericType starTreeNumericType;
private final NumericFieldConverter numericFieldConverter;

/**
* Constructor for MetricAggregatorInfo
*/
public MetricAggregatorInfo(MetricStat metricStat, String field, String starFieldName, IndexNumericFieldData.NumericType numericType) {
public MetricAggregatorInfo(MetricStat metricStat, String field, String starFieldName, NumericFieldConverter numericFieldConverter) {
this.metricStat = metricStat;
this.starTreeNumericType = StarTreeNumericType.fromNumericType(numericType);
this.valueAggregators = ValueAggregatorFactory.getValueAggregator(metricStat, this.starTreeNumericType);
this.numericFieldConverter = numericFieldConverter;
this.valueAggregators = ValueAggregatorFactory.getValueAggregator(metricStat, this.numericFieldConverter);
this.field = field;
this.starFieldName = starFieldName;
this.metric = toFieldName();
Expand Down Expand Up @@ -72,8 +71,8 @@ public ValueAggregator getValueAggregators() {
/**
* @return star tree aggregated value type
*/
public StarTreeNumericType getAggregatedValueType() {
return starTreeNumericType;
public NumericFieldConverter getNumericFieldConverter() {
return numericFieldConverter;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
*/
package org.opensearch.index.compositeindex.datacube.startree.aggregators;

import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType;
import org.opensearch.index.mapper.NumericFieldConverter;

/**
* Min value aggregator for star tree
Expand All @@ -16,8 +16,8 @@
*/
class MinValueAggregator extends StatelessDoubleValueAggregator {

public MinValueAggregator(StarTreeNumericType starTreeNumericType) {
super(starTreeNumericType, null);
public MinValueAggregator(NumericFieldConverter numericFieldConverter) {
super(numericFieldConverter, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
*/
package org.opensearch.index.compositeindex.datacube.startree.aggregators;

import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType;
import org.opensearch.index.mapper.NumberFieldMapper;
import org.opensearch.index.mapper.NumericFieldConverter;

/**
* This is an abstract class that defines the common methods for all double value aggregators
Expand All @@ -17,17 +18,17 @@
*/
abstract class StatelessDoubleValueAggregator implements ValueAggregator<Double> {

protected final StarTreeNumericType starTreeNumericType;
protected final NumericFieldConverter numericFieldConverter;
protected final Double identityValue;
private static final StarTreeNumericType VALUE_AGGREGATOR_TYPE = StarTreeNumericType.DOUBLE;
private static final NumberFieldMapper.NumberType VALUE_AGGREGATOR_TYPE = NumberFieldMapper.NumberType.DOUBLE;

public StatelessDoubleValueAggregator(StarTreeNumericType starTreeNumericType, Double identityValue) {
this.starTreeNumericType = starTreeNumericType;
public StatelessDoubleValueAggregator(NumericFieldConverter numericFieldConverter, Double identityValue) {
this.numericFieldConverter = numericFieldConverter;
this.identityValue = identityValue;
}

@Override
public StarTreeNumericType getAggregatedValueType() {
public NumberFieldMapper.NumberType getAggregatedValueType() {
return VALUE_AGGREGATOR_TYPE;
}

Expand All @@ -36,7 +37,7 @@ public Double getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue)
if (segmentDocValue == null) {
return getIdentityMetricValue();
}
return starTreeNumericType.getDoubleValue(segmentDocValue);
return numericFieldConverter.toDoubleValue(segmentDocValue);
}

@Override
Expand All @@ -57,7 +58,7 @@ public Double toAggregatedValueType(Long value) {
if (value == null) {
return getIdentityMetricValue();
}
return VALUE_AGGREGATOR_TYPE.getDoubleValue(value);
return VALUE_AGGREGATOR_TYPE.toDoubleValue(value);
} catch (Exception e) {
throw new IllegalStateException("Cannot convert " + value + " to sortable aggregation type", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
*/
package org.opensearch.index.compositeindex.datacube.startree.aggregators;

import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType;
import org.opensearch.index.mapper.NumberFieldMapper;
import org.opensearch.index.mapper.NumericFieldConverter;
import org.opensearch.search.aggregations.metrics.CompensatedSum;

/**
Expand All @@ -22,17 +23,17 @@
*/
class SumValueAggregator implements ValueAggregator<Double> {

private final StarTreeNumericType starTreeNumericType;
private static final StarTreeNumericType VALUE_AGGREGATOR_TYPE = StarTreeNumericType.DOUBLE;
private final NumericFieldConverter numericFieldConverter;
private static final NumberFieldMapper.NumberType VALUE_AGGREGATOR_TYPE = NumberFieldMapper.NumberType.DOUBLE;

private CompensatedSum kahanSummation = new CompensatedSum(0, 0);

public SumValueAggregator(StarTreeNumericType starTreeNumericType) {
this.starTreeNumericType = starTreeNumericType;
public SumValueAggregator(NumericFieldConverter numericFieldConverter) {
this.numericFieldConverter = numericFieldConverter;
}

@Override
public StarTreeNumericType getAggregatedValueType() {
public NumberFieldMapper.NumberType getAggregatedValueType() {
return VALUE_AGGREGATOR_TYPE;
}

Expand All @@ -41,7 +42,7 @@ public Double getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue)
kahanSummation.reset(0, 0);
// add takes care of the sum and compensation internally
if (segmentDocValue != null) {
kahanSummation.add(starTreeNumericType.getDoubleValue(segmentDocValue));
kahanSummation.add(numericFieldConverter.toDoubleValue(segmentDocValue));
} else {
kahanSummation.add(getIdentityMetricValue());
}
Expand All @@ -55,7 +56,7 @@ public Double mergeAggregatedValueAndSegmentValue(Double value, Long segmentDocV
assert value == null || kahanSummation.value() == value;
// add takes care of the sum and compensation internally
if (segmentDocValue != null) {
kahanSummation.add(starTreeNumericType.getDoubleValue(segmentDocValue));
kahanSummation.add(numericFieldConverter.toDoubleValue(segmentDocValue));
} else {
kahanSummation.add(getIdentityMetricValue());
}
Expand Down Expand Up @@ -92,7 +93,7 @@ public Double toAggregatedValueType(Long value) {
if (value == null) {
return getIdentityMetricValue();
}
return VALUE_AGGREGATOR_TYPE.getDoubleValue(value);
return VALUE_AGGREGATOR_TYPE.toDoubleValue(value);
} catch (Exception e) {
throw new IllegalStateException("Cannot convert " + value + " to sortable aggregation type", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
*/
package org.opensearch.index.compositeindex.datacube.startree.aggregators;

import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType;
import org.opensearch.index.mapper.NumberFieldMapper;

/**
* A value aggregator that pre-aggregates on the input values for a specific type of aggregation.
Expand All @@ -19,7 +19,7 @@ public interface ValueAggregator<A> {
/**
* Returns the data type of the aggregated value.
*/
StarTreeNumericType getAggregatedValueType();
NumberFieldMapper.NumberType getAggregatedValueType();

/**
* Returns the initial aggregated value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
package org.opensearch.index.compositeindex.datacube.startree.aggregators;

import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType;
import org.opensearch.index.mapper.NumericFieldConverter;

/**
* Value aggregator factory for a given aggregation type
Expand All @@ -22,20 +22,20 @@ private ValueAggregatorFactory() {}
* Returns a new instance of value aggregator for the given aggregation type.
*
* @param aggregationType Aggregation type
* @param starTreeNumericType Numeric type associated with star tree field ( as specified in index mapping )
* @param numericFieldConverter Numeric type converter associated with star tree field ( as specified in index mapping )
* @return Value aggregator
*/
public static ValueAggregator getValueAggregator(MetricStat aggregationType, StarTreeNumericType starTreeNumericType) {
public static ValueAggregator getValueAggregator(MetricStat aggregationType, NumericFieldConverter numericFieldConverter) {
switch (aggregationType) {
// avg aggregator will be covered in the part of query (using count and sum)
case SUM:
return new SumValueAggregator(starTreeNumericType);
return new SumValueAggregator(numericFieldConverter);
case VALUE_COUNT:
return new CountValueAggregator();
case MIN:
return new MinValueAggregator(starTreeNumericType);
return new MinValueAggregator(numericFieldConverter);
case MAX:
return new MaxValueAggregator(starTreeNumericType);
return new MaxValueAggregator(numericFieldConverter);
case DOC_COUNT:
return new DocCountAggregator();
default:
Expand Down
Loading

0 comments on commit 71f3349

Please sign in to comment.