diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7c6419fef1038..79491875c53dc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -175,7 +175,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Performance improvement for MultiTerm Queries on Keyword fields ([#7057](https://github.com/opensearch-project/OpenSearch/issues/7057))
- Refactor common parts from the Rounding class into a separate 'round' package ([#11023](https://github.com/opensearch-project/OpenSearch/issues/11023))
- Performance improvement for date histogram aggregations without sub-aggregations ([#11083](https://github.com/opensearch-project/OpenSearch/pull/11083))
-- Apply the fast filter optimization to composite aggregation ([#11505](https://github.com/opensearch-project/OpenSearch/pull/11083))
+- Apply the fast filter optimization to composite aggregation of date histogram source ([#11505](https://github.com/opensearch-project/OpenSearch/pull/11083))
- Disable concurrent aggs for Diversified Sampler and Sampler aggs ([#11087](https://github.com/opensearch-project/OpenSearch/issues/11087))
- Made leader/follower check timeout setting dynamic ([#10528](https://github.com/opensearch-project/OpenSearch/pull/10528))
- Improved performance of numeric exact-match queries ([#11209](https://github.com/opensearch-project/OpenSearch/pull/11209))
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java
index 59781aff9b822..b63f3db7bbc7b 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java
@@ -48,6 +48,7 @@
*
date histogram : date range filter.
* Applied: DateHistogramAggregator, AutoDateHistogramAggregator, CompositeAggregator
*
+ *
* @opensearch.internal
*/
public class FastFilterRewriteHelper {
@@ -55,7 +56,7 @@ public class FastFilterRewriteHelper {
private static final int MAX_NUM_FILTER_BUCKETS = 1024;
private static final Map, Function> queryWrappers;
- // Initialize the wrappers map for unwrapping the query
+ // Initialize the wrapper map for unwrapping the query
static {
queryWrappers = new HashMap<>();
queryWrappers.put(ConstantScoreQuery.class, q -> ((ConstantScoreQuery) q).getQuery());
@@ -77,9 +78,9 @@ private static Query unwrapIntoConcreteQuery(Query query) {
}
/**
- * Finds the min and max bounds for segments within the passed search context
+ * Finds the min and max bounds of field values for the shard
*/
- private static long[] getIndexBoundsFromLeaves(final SearchContext context, final String fieldName) throws IOException {
+ private static long[] getIndexBounds(final SearchContext context, final String fieldName) throws IOException {
final List leaves = context.searcher().getIndexReader().leaves();
long min = Long.MAX_VALUE, max = Long.MIN_VALUE;
// Since the query does not specify bounds for aggregation, we can
@@ -103,7 +104,7 @@ private static long[] getIndexBoundsFromLeaves(final SearchContext context, fina
*/
public static long[] getAggregationBounds(final SearchContext context, final String fieldName) throws IOException {
final Query cq = unwrapIntoConcreteQuery(context.query());
- final long[] indexBounds = getIndexBoundsFromLeaves(context, fieldName);
+ final long[] indexBounds = getIndexBounds(context, fieldName);
if (cq instanceof PointRangeQuery) {
final PointRangeQuery prq = (PointRangeQuery) cq;
// Ensure that the query and aggregation are on the same field
@@ -117,8 +118,14 @@ public static long[] getAggregationBounds(final SearchContext context, final Str
} else if (cq instanceof MatchAllDocsQuery) {
return indexBounds;
}
-
- return null;
+ // Check if the top-level query (which may be a PRQ on another field) is functionally match-all
+ Weight weight = context.searcher().createWeight(context.query(), ScoreMode.COMPLETE_NO_SCORES, 1f);
+ for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) {
+ if (weight.count(ctx) != ctx.reader().numDocs()) {
+ return null;
+ }
+ }
+ return indexBounds;
}
/**
@@ -179,142 +186,135 @@ protected String toString(int dimension, byte[] value) {
return filters;
}
- /**
- * @param computeBounds get the lower and upper bound of the field in a shard search
- * @param roundingFunction produce Rounding that contains interval of date range.
- * Rounding is computed dynamically using the bounds in AutoDateHistogram
- * @param preparedRoundingSupplier produce PreparedRounding to round values at call-time
- */
- public static void buildFastFilter(
- SearchContext context,
- CheckedFunction computeBounds,
- Function roundingFunction,
- Supplier preparedRoundingSupplier,
- FastFilterContext fastFilterContext
- ) throws IOException {
- assert fastFilterContext.fieldType instanceof DateFieldMapper.DateFieldType;
- DateFieldMapper.DateFieldType fieldType = (DateFieldMapper.DateFieldType) fastFilterContext.fieldType;
- final long[] bounds = computeBounds.apply(fastFilterContext); // TODO b do we need to pass in the context? or specific things
- if (bounds != null) {
- final Rounding rounding = roundingFunction.apply(bounds);
- final OptionalLong intervalOpt = Rounding.getInterval(rounding);
- if (intervalOpt.isEmpty()) {
- return;
- }
- final long interval = intervalOpt.getAsLong();
+ public static class FastFilterContext {
+ private Weight[] filters = null;
+ public AggregationType aggregationType;
- // afterKey is the last bucket key in previous response, while the bucket key
- // is the start of the bucket values, so add the interval
- if (fastFilterContext.afterKey != -1) {
- bounds[0] = fastFilterContext.afterKey + interval;
- }
+ public FastFilterContext() {}
+
+ private void setFilters(Weight[] filters) {
+ this.filters = filters;
+ }
+
+ public void setAggregationType(AggregationType aggregationType) {
+ this.aggregationType = aggregationType;
+ }
- final Weight[] filters = FastFilterRewriteHelper.createFilterForAggregations(
- context,
- interval,
- preparedRoundingSupplier.get(),
- fieldType.name(),
- fieldType,
- bounds[0],
- bounds[1]
- );
- fastFilterContext.setFilters(filters);
+ public boolean isRewriteable(final Object parent, final int subAggLength) {
+ return aggregationType.isRewriteable(parent, subAggLength);
+ }
+
+ /**
+ * This filter build method is for date histogram aggregation type
+ *
+ * @param computeBounds get the lower and upper bound of the field in a shard search
+ * @param roundingFunction produce Rounding that contains interval of date range.
+ * Rounding is computed dynamically using the bounds in AutoDateHistogram
+ * @param preparedRoundingSupplier produce PreparedRounding to round values at call-time
+ */
+ public void buildFastFilter(
+ SearchContext context,
+ CheckedFunction computeBounds,
+ Function roundingFunction,
+ Supplier preparedRoundingSupplier
+ ) throws IOException {
+ assert this.aggregationType instanceof DateHistogramAggregationType;
+ DateHistogramAggregationType aggregationType = (DateHistogramAggregationType) this.aggregationType;
+ DateFieldMapper.DateFieldType fieldType = aggregationType.getFieldType();
+ final long[] bounds = computeBounds.apply(aggregationType);
+ if (bounds != null) {
+ final Rounding rounding = roundingFunction.apply(bounds);
+ final OptionalLong intervalOpt = Rounding.getInterval(rounding);
+ if (intervalOpt.isEmpty()) {
+ return;
+ }
+ final long interval = intervalOpt.getAsLong();
+
+ // afterKey is the last bucket key in previous response, while the bucket key
+ // is the start of the bucket values, so add the interval
+ if (aggregationType instanceof CompositeAggregationType && ((CompositeAggregationType) aggregationType).afterKey != -1) {
+ bounds[0] = ((CompositeAggregationType) aggregationType).afterKey + interval;
+ }
+
+ final Weight[] filters = FastFilterRewriteHelper.createFilterForAggregations(
+ context,
+ interval,
+ preparedRoundingSupplier.get(),
+ fieldType.name(),
+ fieldType,
+ bounds[0],
+ bounds[1]
+ );
+ this.setFilters(filters);
+ }
}
}
/**
- * Encapsulates metadata about a value source needed to rewrite
+ * Different types have different pre-conditions, filter building logic, etc.
*/
- public static class FastFilterContext {
- private boolean missing = false; // TODO b confirm UT that can catch this
- private boolean hasScript = false;
- private boolean showOtherBucket = false;
+ public interface AggregationType {
+ boolean isRewriteable(Object parent, int subAggLength);
+ }
+ public static class DateHistogramAggregationType implements AggregationType {
private final MappedFieldType fieldType;
+ private final boolean missing;
+ private final boolean hasScript;
- private long afterKey = -1L;
- private int size = Integer.MAX_VALUE; // only used by composite aggregation for pagination
- private Weight[] filters = null;
-
- private final Type type;
-
- private RoundingValuesSource valuesSource = null;
-
- public FastFilterContext(MappedFieldType fieldType) {
+ public DateHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript) {
this.fieldType = fieldType;
- this.type = Type.DATE_HISTO;
+ this.missing = missing;
+ this.hasScript = hasScript;
}
- public FastFilterContext(CompositeValuesSourceConfig[] sourceConfigs, CompositeKey rawAfterKey, List formats) {
- if (sourceConfigs.length == 1 && sourceConfigs[0].valuesSource() instanceof RoundingValuesSource) {
- this.fieldType = sourceConfigs[0].fieldType();
- this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
- this.missing = sourceConfigs[0].missingBucket();
- this.hasScript = sourceConfigs[0].hasScript();
- if (rawAfterKey != null) {
- assert rawAfterKey.size() == 1 && formats.size() == 1;
- this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
- throw new IllegalArgumentException("now() is not supported in [after] key");
- });
- }
- } else {
- this.fieldType = null;
+ @Override
+ public boolean isRewriteable(Object parent, int subAggLength) {
+ if (parent == null && subAggLength == 0 && !missing && !hasScript) {
+ return fieldType != null && fieldType instanceof DateFieldMapper.DateFieldType;
}
- this.type = Type.DATE_HISTO;
- }
-
- public FastFilterContext(Type type) {
- this.fieldType = null;
- this.type = type;
+ return false;
}
public DateFieldMapper.DateFieldType getFieldType() {
assert fieldType instanceof DateFieldMapper.DateFieldType;
return (DateFieldMapper.DateFieldType) fieldType;
}
+ }
- public RoundingValuesSource getDateHistogramSource() {
- return valuesSource;
- }
-
- public void setSize(int size) {
+ public static class CompositeAggregationType extends DateHistogramAggregationType {
+ private final RoundingValuesSource valuesSource;
+ private long afterKey = -1L;
+ private final int size;
+
+ public CompositeAggregationType(
+ CompositeValuesSourceConfig[] sourceConfigs,
+ CompositeKey rawAfterKey,
+ List formats,
+ int size
+ ) {
+ super(sourceConfigs[0].fieldType(), sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript());
+ this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
this.size = size;
+ if (rawAfterKey != null) {
+ assert rawAfterKey.size() == 1 && formats.size() == 1;
+ this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
+ throw new IllegalArgumentException("now() is not supported in [after] key");
+ });
+ }
}
- public void setFilters(Weight[] filters) {
- this.filters = filters;
- }
-
- public void setMissing(boolean missing) {
- this.missing = missing;
- }
-
- public void setHasScript(boolean hasScript) {
- this.hasScript = hasScript;
- }
-
- public void setShowOtherBucket(boolean showOtherBucket) {
- this.showOtherBucket = showOtherBucket;
+ public Rounding getRounding() {
+ return valuesSource.getRounding();
}
- public boolean isRewriteable(final Object parent, final int subAggLength) {
- if (parent == null && subAggLength == 0 && !missing && !hasScript) {
- if (type == Type.FILTERS) {
- return !showOtherBucket;
- } else if (type == Type.DATE_HISTO) {
- return fieldType != null && fieldType instanceof DateFieldMapper.DateFieldType;
- }
- }
- return false;
+ public Rounding.Prepared getRoundingPreparer() {
+ return valuesSource.getPreparedRounding();
}
+ }
- /**
- * Different types have different pre-conditions, filter building logic, etc.
- */
- public enum Type {
- FILTERS,
- DATE_HISTO
- }
+ public static boolean isCompositeAggRewriteable(CompositeValuesSourceConfig[] sourceConfigs) {
+ return sourceConfigs.length == 1 && sourceConfigs[0].valuesSource() instanceof RoundingValuesSource;
}
public static long getBucketOrd(long bucketOrd) {
@@ -326,7 +326,7 @@ public static long getBucketOrd(long bucketOrd) {
}
/**
- * This should be executed for each segment
+ * This is executed for each segment by passing the leaf reader context
*
* @param incrementDocCount takes in the bucket key value and the bucket count
*/
@@ -339,7 +339,6 @@ public static boolean tryFastFilterAggregation(
if (fastFilterContext.filters == null) return false;
final Weight[] filters = fastFilterContext.filters;
- // TODO b refactor the type conversion to the context
final int[] counts = new int[filters.length];
int i;
for (i = 0; i < filters.length; i++) {
@@ -352,18 +351,23 @@ public static boolean tryFastFilterAggregation(
}
int s = 0;
+ int size = Integer.MAX_VALUE;
for (i = 0; i < filters.length; i++) {
if (counts[i] > 0) {
long bucketKey = i; // the index of filters is the key for filters aggregation
- if (fastFilterContext.type == FastFilterContext.Type.DATE_HISTO) {
- final DateFieldMapper.DateFieldType fieldType = (DateFieldMapper.DateFieldType) fastFilterContext.fieldType;
+ if (fastFilterContext.aggregationType instanceof DateHistogramAggregationType) {
+ final DateFieldMapper.DateFieldType fieldType = ((DateHistogramAggregationType) fastFilterContext.aggregationType)
+ .getFieldType();
bucketKey = fieldType.convertNanosToMillis(
NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0)
);
+ if (fastFilterContext.aggregationType instanceof CompositeAggregationType) {
+ size = ((CompositeAggregationType) fastFilterContext.aggregationType).size;
+ }
}
incrementDocCount.accept(bucketKey, counts[i]);
s++;
- if (s > fastFilterContext.size) return true;
+ if (s > size) return true;
}
}
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
index e79e4602a3a2d..36f94dc833a0b 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
@@ -163,20 +163,23 @@ final class CompositeAggregator extends BucketsAggregator {
this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
this.rawAfterKey = rawAfterKey;
- fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(sourceConfigs, rawAfterKey, formats);
+ fastFilterContext = new FastFilterRewriteHelper.FastFilterContext();
+ if (!FastFilterRewriteHelper.isCompositeAggRewriteable(sourceConfigs)) return;
+ fastFilterContext.setAggregationType(
+ new FastFilterRewriteHelper.CompositeAggregationType(sourceConfigs, rawAfterKey, formats, size)
+ );
if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
- // Currently the filter rewrite is only supported for date histograms
- RoundingValuesSource dateHistogramSource = fastFilterContext.getDateHistogramSource();
- preparedRounding = dateHistogramSource.getPreparedRounding();
// bucketOrds is the data structure for saving date histogram results
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
- fastFilterContext.setSize(size);
- FastFilterRewriteHelper.buildFastFilter(
+ // Currently the filter rewrite is only supported for date histograms
+ FastFilterRewriteHelper.CompositeAggregationType aggregationType =
+ (FastFilterRewriteHelper.CompositeAggregationType) fastFilterContext.aggregationType;
+ preparedRounding = aggregationType.getRoundingPreparer();
+ fastFilterContext.buildFastFilter(
context,
fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()),
- x -> dateHistogramSource.getRounding(),
- () -> preparedRounding,
- fastFilterContext
+ x -> aggregationType.getRounding(),
+ () -> preparedRounding
);
}
}
@@ -513,9 +516,14 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t
@Override
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
- boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation(ctx, fastFilterContext, (key, count) -> {
- incrementBucketDocCount(FastFilterRewriteHelper.getBucketOrd(bucketOrds.add(0, preparedRounding.round(key))), count);
- });
+ boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation(
+ ctx,
+ fastFilterContext,
+ (key, count) -> incrementBucketDocCount(
+ FastFilterRewriteHelper.getBucketOrd(bucketOrds.add(0, preparedRounding.round(key))),
+ count
+ )
+ );
if (optimized) throw new CollectionTerminatedException();
finishLeaf();
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java
index 7a0e49c275542..0ea820abbedf4 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java
@@ -156,18 +156,22 @@ private AutoDateHistogramAggregator(
this.roundingPreparer = roundingPreparer;
this.preparedRounding = prepareRounding(0);
- fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(valuesSourceConfig.fieldType());
- fastFilterContext.setMissing(valuesSourceConfig.missing() != null);
- fastFilterContext.setHasScript(valuesSourceConfig.script() != null);
+ fastFilterContext = new FastFilterRewriteHelper.FastFilterContext();
+ fastFilterContext.setAggregationType(
+ new FastFilterRewriteHelper.DateHistogramAggregationType(
+ valuesSourceConfig.fieldType(),
+ valuesSourceConfig.missing() != null,
+ valuesSourceConfig.script() != null
+ )
+ );
if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
- FastFilterRewriteHelper.buildFastFilter(
+ fastFilterContext.buildFastFilter(
context,
fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()),
b -> getMinimumRounding(b[0], b[1]),
// Passing prepared rounding as supplier to ensure the correct prepared
// rounding is set as it is done during getMinimumRounding
- () -> preparedRounding,
- fastFilterContext
+ () -> preparedRounding
);
}
}
@@ -222,9 +226,14 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBuc
return LeafBucketCollector.NO_OP_COLLECTOR;
}
- boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation(ctx, fastFilterContext, (key, count) -> {
- incrementBucketDocCount(FastFilterRewriteHelper.getBucketOrd(getBucketOrds().add(0, preparedRounding.round(key))), count);
- });
+ boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation(
+ ctx,
+ fastFilterContext,
+ (key, count) -> incrementBucketDocCount(
+ FastFilterRewriteHelper.getBucketOrd(getBucketOrds().add(0, preparedRounding.round(key))),
+ count
+ )
+ );
if (optimized) throw new CollectionTerminatedException();
final SortedNumericDocValues values = valuesSource.longValues(ctx);
diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
index afcb711a11ba1..b95bd093b82a6 100644
--- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
+++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
@@ -115,15 +115,20 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), cardinality);
- fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(valuesSourceConfig.fieldType());
- fastFilterContext.setMissing(valuesSourceConfig.missing() != null);
- fastFilterContext.setHasScript(valuesSourceConfig.script() != null);
+ fastFilterContext = new FastFilterRewriteHelper.FastFilterContext();
+ fastFilterContext.setAggregationType(
+ new FastFilterRewriteHelper.DateHistogramAggregationType(
+ valuesSourceConfig.fieldType(),
+ valuesSourceConfig.missing() != null,
+ valuesSourceConfig.script() != null
+ )
+ );
if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
- FastFilterRewriteHelper.buildFastFilter(context, this::computeBounds, x -> rounding, () -> preparedRounding, fastFilterContext);
+ fastFilterContext.buildFastFilter(context, this::computeBounds, x -> rounding, () -> preparedRounding);
}
}
- private long[] computeBounds(final FastFilterRewriteHelper.FastFilterContext fieldContext) throws IOException {
+ private long[] computeBounds(final FastFilterRewriteHelper.DateHistogramAggregationType fieldContext) throws IOException {
final long[] bounds = FastFilterRewriteHelper.getAggregationBounds(context, fieldContext.getFieldType().name());
if (bounds != null) {
// Update min/max limit if user specified any hard bounds
@@ -149,9 +154,14 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol
return LeafBucketCollector.NO_OP_COLLECTOR;
}
- boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation(ctx, fastFilterContext, (key, count) -> {
- incrementBucketDocCount(FastFilterRewriteHelper.getBucketOrd(bucketOrds.add(0, preparedRounding.round(key))), count);
- });
+ boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation(
+ ctx,
+ fastFilterContext,
+ (key, count) -> incrementBucketDocCount(
+ FastFilterRewriteHelper.getBucketOrd(bucketOrds.add(0, preparedRounding.round(key))),
+ count
+ )
+ );
if (optimized) throw new CollectionTerminatedException();
SortedNumericDocValues values = valuesSource.longValues(ctx);