address comments
bowenlan-amzn committed Jan 16, 2024
1 parent 81d01fa commit 3414219
Showing 5 changed files with 180 additions and 149 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -175,7 +175,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Performance improvement for MultiTerm Queries on Keyword fields ([#7057](https://github.com/opensearch-project/OpenSearch/issues/7057))
- Refactor common parts from the Rounding class into a separate 'round' package ([#11023](https://github.com/opensearch-project/OpenSearch/issues/11023))
- Performance improvement for date histogram aggregations without sub-aggregations ([#11083](https://github.com/opensearch-project/OpenSearch/pull/11083))
- Apply the fast filter optimization to composite aggregation ([#11505](https://github.com/opensearch-project/OpenSearch/pull/11505))
- Apply the fast filter optimization to composite aggregation of date histogram source ([#11505](https://github.com/opensearch-project/OpenSearch/pull/11505))
- Disable concurrent aggs for Diversified Sampler and Sampler aggs ([#11087](https://github.com/opensearch-project/OpenSearch/issues/11087))
- Made leader/follower check timeout setting dynamic ([#10528](https://github.com/opensearch-project/OpenSearch/pull/10528))
- Improved performance of numeric exact-match queries ([#11209](https://github.com/opensearch-project/OpenSearch/pull/11209))
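For context on the composite-aggregation entry above: the rewrite applies when a composite aggregation has a single date_histogram value source and no sub-aggregations. A hypothetical builder-side sketch of such a request, assuming the standard OpenSearch aggregation builder classes (field name, interval, and page size are illustrative):

import java.util.List;

import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
import org.opensearch.search.aggregations.bucket.composite.DateHistogramValuesSourceBuilder;
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval;

public class CompositeDateHistogramSketch {
    public static void main(String[] args) {
        // Single date_histogram source, no sub-aggregations: the shape targeted
        // by the fast filter optimization ("@timestamp" is an assumed field name).
        DateHistogramValuesSourceBuilder dateSource = new DateHistogramValuesSourceBuilder("date").field("@timestamp")
            .calendarInterval(DateHistogramInterval.DAY);
        List<CompositeValuesSourceBuilder<?>> sources = List.of(dateSource);
        CompositeAggregationBuilder composite = new CompositeAggregationBuilder("by_day", sources).size(100);
        System.out.println(composite.getName() + " built with " + sources.size() + " value source(s)");
    }
}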
@@ -48,14 +48,15 @@
* <li> date histogram : date range filter.
* Applied: DateHistogramAggregator, AutoDateHistogramAggregator, CompositeAggregator </li>
* </ul>
*
* @opensearch.internal
*/
public class FastFilterRewriteHelper {

private static final int MAX_NUM_FILTER_BUCKETS = 1024;
private static final Map<Class<?>, Function<Query, Query>> queryWrappers;

// Initialize the wrappers map for unwrapping the query
// Initialize the wrapper map for unwrapping the query
static {
queryWrappers = new HashMap<>();
queryWrappers.put(ConstantScoreQuery.class, q -> ((ConstantScoreQuery) q).getQuery());
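For intuition on the rewrite described in the class comment above: when the buckets of a date histogram are known up front, each bucket can be answered by counting one range filter instead of collecting documents. A standalone sketch of the equivalent filters for three daily buckets (Lucene point range queries; the field name and dates are illustrative):

import java.time.Instant;
import java.time.temporal.ChronoUnit;

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.search.Query;

public class BucketFilterSketch {
    public static void main(String[] args) {
        Instant start = Instant.parse("2021-01-01T00:00:00Z");
        for (int day = 0; day < 3; day++) {
            long lower = start.plus(day, ChronoUnit.DAYS).toEpochMilli();
            long upper = start.plus(day + 1, ChronoUnit.DAYS).toEpochMilli() - 1;
            // One inclusive range per bucket; counting the matches of each filter
            // gives the same numbers as the date histogram's per-bucket doc counts.
            Query bucketFilter = LongPoint.newRangeQuery("@timestamp", lower, upper);
            System.out.println(bucketFilter);
        }
    }
}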
@@ -77,9 +78,9 @@ private static Query unwrapIntoConcreteQuery(Query query) {
}

/**
* Finds the min and max bounds for segments within the passed search context
* Finds the min and max bounds of field values for the shard
*/
private static long[] getIndexBoundsFromLeaves(final SearchContext context, final String fieldName) throws IOException {
private static long[] getIndexBounds(final SearchContext context, final String fieldName) throws IOException {
final List<LeafReaderContext> leaves = context.searcher().getIndexReader().leaves();
long min = Long.MAX_VALUE, max = Long.MIN_VALUE;
// Since the query does not specify bounds for aggregation, we can
@@ -103,7 +104,7 @@ private static long[] getIndexBoundsFromLeaves(final SearchContext context, fina
*/
public static long[] getAggregationBounds(final SearchContext context, final String fieldName) throws IOException {
final Query cq = unwrapIntoConcreteQuery(context.query());
final long[] indexBounds = getIndexBoundsFromLeaves(context, fieldName);
final long[] indexBounds = getIndexBounds(context, fieldName);
if (cq instanceof PointRangeQuery) {
final PointRangeQuery prq = (PointRangeQuery) cq;
// Ensure that the query and aggregation are on the same field
@@ -117,8 +118,14 @@ public static long[] getAggregationBounds(final SearchContext context, final Str
} else if (cq instanceof MatchAllDocsQuery) {
return indexBounds;
}

return null;
// Check if the top-level query (which may be a PRQ on another field) is functionally match-all
Weight weight = context.searcher().createWeight(context.query(), ScoreMode.COMPLETE_NO_SCORES, 1f);
for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) {
if (weight.count(ctx) != ctx.reader().numDocs()) {
return null;
}
}
return indexBounds;
}
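The added check above treats the top-level query as effectively match-all when its cheaply computed per-segment count equals each segment's document count; otherwise it returns null and the rewrite is skipped. A minimal, self-contained sketch of that pattern, assuming Lucene 9.x APIs (Weight#count returns -1 when the count cannot be computed without visiting documents, which conservatively fails the check):

import org.apache.lucene.document.Document;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.ByteBuffersDirectory;

public class MatchAllCheckSketch {
    public static void main(String[] args) throws Exception {
        ByteBuffersDirectory dir = new ByteBuffersDirectory();
        try (IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig())) {
            for (long ts = 0; ts < 100; ts++) {
                Document doc = new Document();
                doc.add(new LongPoint("@timestamp", ts)); // "@timestamp" is an assumed field name
                writer.addDocument(doc);
            }
        }
        try (DirectoryReader reader = DirectoryReader.open(dir)) {
            IndexSearcher searcher = new IndexSearcher(reader);
            // A range query that covers every indexed value behaves like match-all.
            Query query = LongPoint.newRangeQuery("@timestamp", 0, 1000);
            Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1f);
            boolean effectivelyMatchAll = true;
            for (LeafReaderContext ctx : reader.leaves()) {
                // Equal counts on every segment means the query matches all live docs.
                if (weight.count(ctx) != ctx.reader().numDocs()) {
                    effectivelyMatchAll = false;
                    break;
                }
            }
            System.out.println("effectively match-all: " + effectivelyMatchAll);
        }
    }
}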

/**
@@ -179,142 +186,135 @@ protected String toString(int dimension, byte[] value) {
return filters;
}

/**
* @param computeBounds get the lower and upper bound of the field in a shard search
* @param roundingFunction produce Rounding that contains interval of date range.
* Rounding is computed dynamically using the bounds in AutoDateHistogram
* @param preparedRoundingSupplier produce PreparedRounding to round values at call-time
*/
public static void buildFastFilter(
SearchContext context,
CheckedFunction<FastFilterContext, long[], IOException> computeBounds,
Function<long[], Rounding> roundingFunction,
Supplier<Rounding.Prepared> preparedRoundingSupplier,
FastFilterContext fastFilterContext
) throws IOException {
assert fastFilterContext.fieldType instanceof DateFieldMapper.DateFieldType;
DateFieldMapper.DateFieldType fieldType = (DateFieldMapper.DateFieldType) fastFilterContext.fieldType;
final long[] bounds = computeBounds.apply(fastFilterContext); // TODO b do we need to pass in the context? or specific things
if (bounds != null) {
final Rounding rounding = roundingFunction.apply(bounds);
final OptionalLong intervalOpt = Rounding.getInterval(rounding);
if (intervalOpt.isEmpty()) {
return;
}
final long interval = intervalOpt.getAsLong();
public static class FastFilterContext {
private Weight[] filters = null;
public AggregationType aggregationType;

// afterKey is the last bucket key in previous response, while the bucket key
// is the start of the bucket values, so add the interval
if (fastFilterContext.afterKey != -1) {
bounds[0] = fastFilterContext.afterKey + interval;
}
public FastFilterContext() {}

private void setFilters(Weight[] filters) {
this.filters = filters;
}

public void setAggregationType(AggregationType aggregationType) {
this.aggregationType = aggregationType;
}

final Weight[] filters = FastFilterRewriteHelper.createFilterForAggregations(
context,
interval,
preparedRoundingSupplier.get(),
fieldType.name(),
fieldType,
bounds[0],
bounds[1]
);
fastFilterContext.setFilters(filters);
public boolean isRewriteable(final Object parent, final int subAggLength) {
return aggregationType.isRewriteable(parent, subAggLength);
}

/**
* This filter build method is for date histogram aggregation type
*
* @param computeBounds get the lower and upper bound of the field in a shard search
* @param roundingFunction produce Rounding that contains interval of date range.
* Rounding is computed dynamically using the bounds in AutoDateHistogram
* @param preparedRoundingSupplier produce PreparedRounding to round values at call-time
*/
public void buildFastFilter(
SearchContext context,
CheckedFunction<DateHistogramAggregationType, long[], IOException> computeBounds,
Function<long[], Rounding> roundingFunction,
Supplier<Rounding.Prepared> preparedRoundingSupplier
) throws IOException {
assert this.aggregationType instanceof DateHistogramAggregationType;
DateHistogramAggregationType aggregationType = (DateHistogramAggregationType) this.aggregationType;
DateFieldMapper.DateFieldType fieldType = aggregationType.getFieldType();
final long[] bounds = computeBounds.apply(aggregationType);
if (bounds != null) {
final Rounding rounding = roundingFunction.apply(bounds);
final OptionalLong intervalOpt = Rounding.getInterval(rounding);
if (intervalOpt.isEmpty()) {
return;
}
final long interval = intervalOpt.getAsLong();

// afterKey is the last bucket key in previous response, while the bucket key
// is the start of the bucket values, so add the interval
if (aggregationType instanceof CompositeAggregationType && ((CompositeAggregationType) aggregationType).afterKey != -1) {
bounds[0] = ((CompositeAggregationType) aggregationType).afterKey + interval;
}

final Weight[] filters = FastFilterRewriteHelper.createFilterForAggregations(
context,
interval,
preparedRoundingSupplier.get(),
fieldType.name(),
fieldType,
bounds[0],
bounds[1]
);
this.setFilters(filters);
}
}
}
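For intuition on the afterKey adjustment in buildFastFilter above: composite pagination returns the key of the last bucket of the previous page, and a bucket key is the start of its interval, so the next page's lower bound is that key plus one interval. A standalone arithmetic sketch with hypothetical values (plain epoch millis rather than the Rounding API):

import java.time.Duration;
import java.time.Instant;

public class AfterKeyBoundSketch {
    public static void main(String[] args) {
        // Hypothetical setup: hourly buckets, and the previous page ended with
        // the bucket whose key (start) is 2021-01-01T05:00Z.
        long intervalMillis = Duration.ofHours(1).toMillis();
        long afterKey = Instant.parse("2021-01-01T05:00:00Z").toEpochMilli();

        // Mirrors bounds[0] = afterKey + interval: the next page starts one full
        // interval after the start of the last bucket already returned.
        long nextLowerBound = afterKey + intervalMillis;
        System.out.println(Instant.ofEpochMilli(nextLowerBound)); // 2021-01-01T06:00:00Z
    }
}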

/**
* Encapsulates metadata about a value source needed to rewrite
* Different types have different pre-conditions, filter building logic, etc.
*/
public static class FastFilterContext {
private boolean missing = false; // TODO b confirm UT that can catch this
private boolean hasScript = false;
private boolean showOtherBucket = false;
public interface AggregationType {
boolean isRewriteable(Object parent, int subAggLength);
}

public static class DateHistogramAggregationType implements AggregationType {
private final MappedFieldType fieldType;
private final boolean missing;
private final boolean hasScript;

private long afterKey = -1L;
private int size = Integer.MAX_VALUE; // only used by composite aggregation for pagination
private Weight[] filters = null;

private final Type type;

private RoundingValuesSource valuesSource = null;

public FastFilterContext(MappedFieldType fieldType) {
public DateHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript) {
this.fieldType = fieldType;
this.type = Type.DATE_HISTO;
this.missing = missing;
this.hasScript = hasScript;
}

public FastFilterContext(CompositeValuesSourceConfig[] sourceConfigs, CompositeKey rawAfterKey, List<DocValueFormat> formats) {
if (sourceConfigs.length == 1 && sourceConfigs[0].valuesSource() instanceof RoundingValuesSource) {
this.fieldType = sourceConfigs[0].fieldType();
this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
this.missing = sourceConfigs[0].missingBucket();
this.hasScript = sourceConfigs[0].hasScript();
if (rawAfterKey != null) {
assert rawAfterKey.size() == 1 && formats.size() == 1;
this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
throw new IllegalArgumentException("now() is not supported in [after] key");
});
}
} else {
this.fieldType = null;
@Override
public boolean isRewriteable(Object parent, int subAggLength) {
if (parent == null && subAggLength == 0 && !missing && !hasScript) {
return fieldType != null && fieldType instanceof DateFieldMapper.DateFieldType;
}
this.type = Type.DATE_HISTO;
}

public FastFilterContext(Type type) {
this.fieldType = null;
this.type = type;
return false;
}

public DateFieldMapper.DateFieldType getFieldType() {
assert fieldType instanceof DateFieldMapper.DateFieldType;
return (DateFieldMapper.DateFieldType) fieldType;
}
}

public RoundingValuesSource getDateHistogramSource() {
return valuesSource;
}

public void setSize(int size) {
public static class CompositeAggregationType extends DateHistogramAggregationType {
private final RoundingValuesSource valuesSource;
private long afterKey = -1L;
private final int size;

public CompositeAggregationType(
CompositeValuesSourceConfig[] sourceConfigs,
CompositeKey rawAfterKey,
List<DocValueFormat> formats,
int size
) {
super(sourceConfigs[0].fieldType(), sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript());
this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
this.size = size;
if (rawAfterKey != null) {
assert rawAfterKey.size() == 1 && formats.size() == 1;
this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
throw new IllegalArgumentException("now() is not supported in [after] key");
});
}
}

public void setFilters(Weight[] filters) {
this.filters = filters;
}

public void setMissing(boolean missing) {
this.missing = missing;
}

public void setHasScript(boolean hasScript) {
this.hasScript = hasScript;
}

public void setShowOtherBucket(boolean showOtherBucket) {
this.showOtherBucket = showOtherBucket;
public Rounding getRounding() {
return valuesSource.getRounding();
}

public boolean isRewriteable(final Object parent, final int subAggLength) {
if (parent == null && subAggLength == 0 && !missing && !hasScript) {
if (type == Type.FILTERS) {
return !showOtherBucket;
} else if (type == Type.DATE_HISTO) {
return fieldType != null && fieldType instanceof DateFieldMapper.DateFieldType;
}
}
return false;
public Rounding.Prepared getRoundingPreparer() {
return valuesSource.getPreparedRounding();
}
}

/**
* Different types have different pre-conditions, filter building logic, etc.
*/
public enum Type {
FILTERS,
DATE_HISTO
}
public static boolean isCompositeAggRewriteable(CompositeValuesSourceConfig[] sourceConfigs) {
return sourceConfigs.length == 1 && sourceConfigs[0].valuesSource() instanceof RoundingValuesSource;
}

public static long getBucketOrd(long bucketOrd) {
@@ -326,7 +326,7 @@ public static long getBucketOrd(long bucketOrd) {
}

/**
* This should be executed for each segment
* This is executed for each segment by passing the leaf reader context
*
* @param incrementDocCount takes in the bucket key value and the bucket count
*/
@@ -339,7 +339,6 @@ public static boolean tryFastFilterAggregation(
if (fastFilterContext.filters == null) return false;

final Weight[] filters = fastFilterContext.filters;
// TODO b refactor the type conversion to the context
final int[] counts = new int[filters.length];
int i;
for (i = 0; i < filters.length; i++) {
@@ -352,18 +351,23 @@
}

int s = 0;
int size = Integer.MAX_VALUE;
for (i = 0; i < filters.length; i++) {
if (counts[i] > 0) {
long bucketKey = i; // the index of filters is the key for filters aggregation
if (fastFilterContext.type == FastFilterContext.Type.DATE_HISTO) {
final DateFieldMapper.DateFieldType fieldType = (DateFieldMapper.DateFieldType) fastFilterContext.fieldType;
if (fastFilterContext.aggregationType instanceof DateHistogramAggregationType) {
final DateFieldMapper.DateFieldType fieldType = ((DateHistogramAggregationType) fastFilterContext.aggregationType)
.getFieldType();
bucketKey = fieldType.convertNanosToMillis(
NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0)
);
if (fastFilterContext.aggregationType instanceof CompositeAggregationType) {
size = ((CompositeAggregationType) fastFilterContext.aggregationType).size;
}
}
incrementDocCount.accept(bucketKey, counts[i]);
s++;
if (s > fastFilterContext.size) return true;
if (s > size) return true;
}
}
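The bucket-key recovery in the loop above works because each filter's lower point was encoded from the bucket's start value, so decoding it yields exactly the key passed to incrementDocCount (then converted from nanos to millis for nanosecond-resolution date fields). A minimal round-trip sketch using Lucene's sortable-bytes encoding (the timestamp value is illustrative):

import org.apache.lucene.util.NumericUtils;

public class PointKeyRoundTripSketch {
    public static void main(String[] args) {
        // Hypothetical bucket start in epoch millis (2021-01-01T00:00:00Z).
        long bucketStartMillis = 1609459200000L;

        // Encode the value the same way point range filters store their lower bound.
        byte[] lowerPoint = new byte[Long.BYTES];
        NumericUtils.longToSortableBytes(bucketStartMillis, lowerPoint, 0);

        // Decoding the lower point recovers the bucket key, as done in
        // tryFastFilterAggregation before incrementDocCount is called.
        long decoded = NumericUtils.sortableBytesToLong(lowerPoint, 0);
        System.out.println(decoded == bucketStartMillis); // true
    }
}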
