From 589d03cf8543fa9728ccb8ad5520f75e02ad9b05 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Mon, 15 Jan 2024 16:22:28 -0800 Subject: [PATCH] address comments Signed-off-by: bowenlan-amzn --- CHANGELOG.md | 2 +- .../bucket/FastFilterRewriteHelper.java | 242 +++++++++--------- .../bucket/composite/CompositeAggregator.java | 32 ++- .../AutoDateHistogramAggregator.java | 27 +- .../histogram/DateHistogramAggregator.java | 26 +- 5 files changed, 180 insertions(+), 149 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c6419fef1038..79491875c53dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -175,7 +175,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Performance improvement for MultiTerm Queries on Keyword fields ([#7057](https://github.com/opensearch-project/OpenSearch/issues/7057)) - Refactor common parts from the Rounding class into a separate 'round' package ([#11023](https://github.com/opensearch-project/OpenSearch/issues/11023)) - Performance improvement for date histogram aggregations without sub-aggregations ([#11083](https://github.com/opensearch-project/OpenSearch/pull/11083)) -- Apply the fast filter optimization to composite aggregation ([#11505](https://github.com/opensearch-project/OpenSearch/pull/11083)) +- Apply the fast filter optimization to composite aggregation of date histogram source ([#11505](https://github.com/opensearch-project/OpenSearch/pull/11083)) - Disable concurrent aggs for Diversified Sampler and Sampler aggs ([#11087](https://github.com/opensearch-project/OpenSearch/issues/11087)) - Made leader/follower check timeout setting dynamic ([#10528](https://github.com/opensearch-project/OpenSearch/pull/10528)) - Improved performance of numeric exact-match queries ([#11209](https://github.com/opensearch-project/OpenSearch/pull/11209)) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java index 59781aff9b822..b63f3db7bbc7b 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/FastFilterRewriteHelper.java @@ -48,6 +48,7 @@ *
  • date histogram : date range filter. * Applied: DateHistogramAggregator, AutoDateHistogramAggregator, CompositeAggregator
  • * + * * @opensearch.internal */ public class FastFilterRewriteHelper { @@ -55,7 +56,7 @@ public class FastFilterRewriteHelper { private static final int MAX_NUM_FILTER_BUCKETS = 1024; private static final Map, Function> queryWrappers; - // Initialize the wrappers map for unwrapping the query + // Initialize the wrapper map for unwrapping the query static { queryWrappers = new HashMap<>(); queryWrappers.put(ConstantScoreQuery.class, q -> ((ConstantScoreQuery) q).getQuery()); @@ -77,9 +78,9 @@ private static Query unwrapIntoConcreteQuery(Query query) { } /** - * Finds the min and max bounds for segments within the passed search context + * Finds the min and max bounds of field values for the shard */ - private static long[] getIndexBoundsFromLeaves(final SearchContext context, final String fieldName) throws IOException { + private static long[] getIndexBounds(final SearchContext context, final String fieldName) throws IOException { final List leaves = context.searcher().getIndexReader().leaves(); long min = Long.MAX_VALUE, max = Long.MIN_VALUE; // Since the query does not specify bounds for aggregation, we can @@ -103,7 +104,7 @@ private static long[] getIndexBoundsFromLeaves(final SearchContext context, fina */ public static long[] getAggregationBounds(final SearchContext context, final String fieldName) throws IOException { final Query cq = unwrapIntoConcreteQuery(context.query()); - final long[] indexBounds = getIndexBoundsFromLeaves(context, fieldName); + final long[] indexBounds = getIndexBounds(context, fieldName); if (cq instanceof PointRangeQuery) { final PointRangeQuery prq = (PointRangeQuery) cq; // Ensure that the query and aggregation are on the same field @@ -117,8 +118,14 @@ public static long[] getAggregationBounds(final SearchContext context, final Str } else if (cq instanceof MatchAllDocsQuery) { return indexBounds; } - - return null; + // Check if the top-level query (which may be a PRQ on another field) is functionally match-all + Weight weight = context.searcher().createWeight(context.query(), ScoreMode.COMPLETE_NO_SCORES, 1f); + for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) { + if (weight.count(ctx) != ctx.reader().numDocs()) { + return null; + } + } + return indexBounds; } /** @@ -179,142 +186,135 @@ protected String toString(int dimension, byte[] value) { return filters; } - /** - * @param computeBounds get the lower and upper bound of the field in a shard search - * @param roundingFunction produce Rounding that contains interval of date range. - * Rounding is computed dynamically using the bounds in AutoDateHistogram - * @param preparedRoundingSupplier produce PreparedRounding to round values at call-time - */ - public static void buildFastFilter( - SearchContext context, - CheckedFunction computeBounds, - Function roundingFunction, - Supplier preparedRoundingSupplier, - FastFilterContext fastFilterContext - ) throws IOException { - assert fastFilterContext.fieldType instanceof DateFieldMapper.DateFieldType; - DateFieldMapper.DateFieldType fieldType = (DateFieldMapper.DateFieldType) fastFilterContext.fieldType; - final long[] bounds = computeBounds.apply(fastFilterContext); // TODO b do we need to pass in the context? or specific things - if (bounds != null) { - final Rounding rounding = roundingFunction.apply(bounds); - final OptionalLong intervalOpt = Rounding.getInterval(rounding); - if (intervalOpt.isEmpty()) { - return; - } - final long interval = intervalOpt.getAsLong(); + public static class FastFilterContext { + private Weight[] filters = null; + public AggregationType aggregationType; - // afterKey is the last bucket key in previous response, while the bucket key - // is the start of the bucket values, so add the interval - if (fastFilterContext.afterKey != -1) { - bounds[0] = fastFilterContext.afterKey + interval; - } + public FastFilterContext() {} + + private void setFilters(Weight[] filters) { + this.filters = filters; + } + + public void setAggregationType(AggregationType aggregationType) { + this.aggregationType = aggregationType; + } - final Weight[] filters = FastFilterRewriteHelper.createFilterForAggregations( - context, - interval, - preparedRoundingSupplier.get(), - fieldType.name(), - fieldType, - bounds[0], - bounds[1] - ); - fastFilterContext.setFilters(filters); + public boolean isRewriteable(final Object parent, final int subAggLength) { + return aggregationType.isRewriteable(parent, subAggLength); + } + + /** + * This filter build method is for date histogram aggregation type + * + * @param computeBounds get the lower and upper bound of the field in a shard search + * @param roundingFunction produce Rounding that contains interval of date range. + * Rounding is computed dynamically using the bounds in AutoDateHistogram + * @param preparedRoundingSupplier produce PreparedRounding to round values at call-time + */ + public void buildFastFilter( + SearchContext context, + CheckedFunction computeBounds, + Function roundingFunction, + Supplier preparedRoundingSupplier + ) throws IOException { + assert this.aggregationType instanceof DateHistogramAggregationType; + DateHistogramAggregationType aggregationType = (DateHistogramAggregationType) this.aggregationType; + DateFieldMapper.DateFieldType fieldType = aggregationType.getFieldType(); + final long[] bounds = computeBounds.apply(aggregationType); + if (bounds != null) { + final Rounding rounding = roundingFunction.apply(bounds); + final OptionalLong intervalOpt = Rounding.getInterval(rounding); + if (intervalOpt.isEmpty()) { + return; + } + final long interval = intervalOpt.getAsLong(); + + // afterKey is the last bucket key in previous response, while the bucket key + // is the start of the bucket values, so add the interval + if (aggregationType instanceof CompositeAggregationType && ((CompositeAggregationType) aggregationType).afterKey != -1) { + bounds[0] = ((CompositeAggregationType) aggregationType).afterKey + interval; + } + + final Weight[] filters = FastFilterRewriteHelper.createFilterForAggregations( + context, + interval, + preparedRoundingSupplier.get(), + fieldType.name(), + fieldType, + bounds[0], + bounds[1] + ); + this.setFilters(filters); + } } } /** - * Encapsulates metadata about a value source needed to rewrite + * Different types have different pre-conditions, filter building logic, etc. */ - public static class FastFilterContext { - private boolean missing = false; // TODO b confirm UT that can catch this - private boolean hasScript = false; - private boolean showOtherBucket = false; + public interface AggregationType { + boolean isRewriteable(Object parent, int subAggLength); + } + public static class DateHistogramAggregationType implements AggregationType { private final MappedFieldType fieldType; + private final boolean missing; + private final boolean hasScript; - private long afterKey = -1L; - private int size = Integer.MAX_VALUE; // only used by composite aggregation for pagination - private Weight[] filters = null; - - private final Type type; - - private RoundingValuesSource valuesSource = null; - - public FastFilterContext(MappedFieldType fieldType) { + public DateHistogramAggregationType(MappedFieldType fieldType, boolean missing, boolean hasScript) { this.fieldType = fieldType; - this.type = Type.DATE_HISTO; + this.missing = missing; + this.hasScript = hasScript; } - public FastFilterContext(CompositeValuesSourceConfig[] sourceConfigs, CompositeKey rawAfterKey, List formats) { - if (sourceConfigs.length == 1 && sourceConfigs[0].valuesSource() instanceof RoundingValuesSource) { - this.fieldType = sourceConfigs[0].fieldType(); - this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource(); - this.missing = sourceConfigs[0].missingBucket(); - this.hasScript = sourceConfigs[0].hasScript(); - if (rawAfterKey != null) { - assert rawAfterKey.size() == 1 && formats.size() == 1; - this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> { - throw new IllegalArgumentException("now() is not supported in [after] key"); - }); - } - } else { - this.fieldType = null; + @Override + public boolean isRewriteable(Object parent, int subAggLength) { + if (parent == null && subAggLength == 0 && !missing && !hasScript) { + return fieldType != null && fieldType instanceof DateFieldMapper.DateFieldType; } - this.type = Type.DATE_HISTO; - } - - public FastFilterContext(Type type) { - this.fieldType = null; - this.type = type; + return false; } public DateFieldMapper.DateFieldType getFieldType() { assert fieldType instanceof DateFieldMapper.DateFieldType; return (DateFieldMapper.DateFieldType) fieldType; } + } - public RoundingValuesSource getDateHistogramSource() { - return valuesSource; - } - - public void setSize(int size) { + public static class CompositeAggregationType extends DateHistogramAggregationType { + private final RoundingValuesSource valuesSource; + private long afterKey = -1L; + private final int size; + + public CompositeAggregationType( + CompositeValuesSourceConfig[] sourceConfigs, + CompositeKey rawAfterKey, + List formats, + int size + ) { + super(sourceConfigs[0].fieldType(), sourceConfigs[0].missingBucket(), sourceConfigs[0].hasScript()); + this.valuesSource = (RoundingValuesSource) sourceConfigs[0].valuesSource(); this.size = size; + if (rawAfterKey != null) { + assert rawAfterKey.size() == 1 && formats.size() == 1; + this.afterKey = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> { + throw new IllegalArgumentException("now() is not supported in [after] key"); + }); + } } - public void setFilters(Weight[] filters) { - this.filters = filters; - } - - public void setMissing(boolean missing) { - this.missing = missing; - } - - public void setHasScript(boolean hasScript) { - this.hasScript = hasScript; - } - - public void setShowOtherBucket(boolean showOtherBucket) { - this.showOtherBucket = showOtherBucket; + public Rounding getRounding() { + return valuesSource.getRounding(); } - public boolean isRewriteable(final Object parent, final int subAggLength) { - if (parent == null && subAggLength == 0 && !missing && !hasScript) { - if (type == Type.FILTERS) { - return !showOtherBucket; - } else if (type == Type.DATE_HISTO) { - return fieldType != null && fieldType instanceof DateFieldMapper.DateFieldType; - } - } - return false; + public Rounding.Prepared getRoundingPreparer() { + return valuesSource.getPreparedRounding(); } + } - /** - * Different types have different pre-conditions, filter building logic, etc. - */ - public enum Type { - FILTERS, - DATE_HISTO - } + public static boolean isCompositeAggRewriteable(CompositeValuesSourceConfig[] sourceConfigs) { + return sourceConfigs.length == 1 && sourceConfigs[0].valuesSource() instanceof RoundingValuesSource; } public static long getBucketOrd(long bucketOrd) { @@ -326,7 +326,7 @@ public static long getBucketOrd(long bucketOrd) { } /** - * This should be executed for each segment + * This is executed for each segment by passing the leaf reader context * * @param incrementDocCount takes in the bucket key value and the bucket count */ @@ -339,7 +339,6 @@ public static boolean tryFastFilterAggregation( if (fastFilterContext.filters == null) return false; final Weight[] filters = fastFilterContext.filters; - // TODO b refactor the type conversion to the context final int[] counts = new int[filters.length]; int i; for (i = 0; i < filters.length; i++) { @@ -352,18 +351,23 @@ public static boolean tryFastFilterAggregation( } int s = 0; + int size = Integer.MAX_VALUE; for (i = 0; i < filters.length; i++) { if (counts[i] > 0) { long bucketKey = i; // the index of filters is the key for filters aggregation - if (fastFilterContext.type == FastFilterContext.Type.DATE_HISTO) { - final DateFieldMapper.DateFieldType fieldType = (DateFieldMapper.DateFieldType) fastFilterContext.fieldType; + if (fastFilterContext.aggregationType instanceof DateHistogramAggregationType) { + final DateFieldMapper.DateFieldType fieldType = ((DateHistogramAggregationType) fastFilterContext.aggregationType) + .getFieldType(); bucketKey = fieldType.convertNanosToMillis( NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0) ); + if (fastFilterContext.aggregationType instanceof CompositeAggregationType) { + size = ((CompositeAggregationType) fastFilterContext.aggregationType).size; + } } incrementDocCount.accept(bucketKey, counts[i]); s++; - if (s > fastFilterContext.size) return true; + if (s > size) return true; } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index e79e4602a3a2d..36f94dc833a0b 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -163,20 +163,23 @@ final class CompositeAggregator extends BucketsAggregator { this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey); this.rawAfterKey = rawAfterKey; - fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(sourceConfigs, rawAfterKey, formats); + fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(); + if (!FastFilterRewriteHelper.isCompositeAggRewriteable(sourceConfigs)) return; + fastFilterContext.setAggregationType( + new FastFilterRewriteHelper.CompositeAggregationType(sourceConfigs, rawAfterKey, formats, size) + ); if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { - // Currently the filter rewrite is only supported for date histograms - RoundingValuesSource dateHistogramSource = fastFilterContext.getDateHistogramSource(); - preparedRounding = dateHistogramSource.getPreparedRounding(); // bucketOrds is the data structure for saving date histogram results bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE); - fastFilterContext.setSize(size); - FastFilterRewriteHelper.buildFastFilter( + // Currently the filter rewrite is only supported for date histograms + FastFilterRewriteHelper.CompositeAggregationType aggregationType = + (FastFilterRewriteHelper.CompositeAggregationType) fastFilterContext.aggregationType; + preparedRounding = aggregationType.getRoundingPreparer(); + fastFilterContext.buildFastFilter( context, fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()), - x -> dateHistogramSource.getRounding(), - () -> preparedRounding, - fastFilterContext + x -> aggregationType.getRounding(), + () -> preparedRounding ); } } @@ -513,9 +516,14 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t @Override protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { - boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation(ctx, fastFilterContext, (key, count) -> { - incrementBucketDocCount(FastFilterRewriteHelper.getBucketOrd(bucketOrds.add(0, preparedRounding.round(key))), count); - }); + boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation( + ctx, + fastFilterContext, + (key, count) -> incrementBucketDocCount( + FastFilterRewriteHelper.getBucketOrd(bucketOrds.add(0, preparedRounding.round(key))), + count + ) + ); if (optimized) throw new CollectionTerminatedException(); finishLeaf(); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index 7a0e49c275542..0ea820abbedf4 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -156,18 +156,22 @@ private AutoDateHistogramAggregator( this.roundingPreparer = roundingPreparer; this.preparedRounding = prepareRounding(0); - fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(valuesSourceConfig.fieldType()); - fastFilterContext.setMissing(valuesSourceConfig.missing() != null); - fastFilterContext.setHasScript(valuesSourceConfig.script() != null); + fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(); + fastFilterContext.setAggregationType( + new FastFilterRewriteHelper.DateHistogramAggregationType( + valuesSourceConfig.fieldType(), + valuesSourceConfig.missing() != null, + valuesSourceConfig.script() != null + ) + ); if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { - FastFilterRewriteHelper.buildFastFilter( + fastFilterContext.buildFastFilter( context, fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()), b -> getMinimumRounding(b[0], b[1]), // Passing prepared rounding as supplier to ensure the correct prepared // rounding is set as it is done during getMinimumRounding - () -> preparedRounding, - fastFilterContext + () -> preparedRounding ); } } @@ -222,9 +226,14 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBuc return LeafBucketCollector.NO_OP_COLLECTOR; } - boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation(ctx, fastFilterContext, (key, count) -> { - incrementBucketDocCount(FastFilterRewriteHelper.getBucketOrd(getBucketOrds().add(0, preparedRounding.round(key))), count); - }); + boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation( + ctx, + fastFilterContext, + (key, count) -> incrementBucketDocCount( + FastFilterRewriteHelper.getBucketOrd(getBucketOrds().add(0, preparedRounding.round(key))), + count + ) + ); if (optimized) throw new CollectionTerminatedException(); final SortedNumericDocValues values = valuesSource.longValues(ctx); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index afcb711a11ba1..b95bd093b82a6 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -115,15 +115,20 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), cardinality); - fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(valuesSourceConfig.fieldType()); - fastFilterContext.setMissing(valuesSourceConfig.missing() != null); - fastFilterContext.setHasScript(valuesSourceConfig.script() != null); + fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(); + fastFilterContext.setAggregationType( + new FastFilterRewriteHelper.DateHistogramAggregationType( + valuesSourceConfig.fieldType(), + valuesSourceConfig.missing() != null, + valuesSourceConfig.script() != null + ) + ); if (fastFilterContext.isRewriteable(parent, subAggregators.length)) { - FastFilterRewriteHelper.buildFastFilter(context, this::computeBounds, x -> rounding, () -> preparedRounding, fastFilterContext); + fastFilterContext.buildFastFilter(context, this::computeBounds, x -> rounding, () -> preparedRounding); } } - private long[] computeBounds(final FastFilterRewriteHelper.FastFilterContext fieldContext) throws IOException { + private long[] computeBounds(final FastFilterRewriteHelper.DateHistogramAggregationType fieldContext) throws IOException { final long[] bounds = FastFilterRewriteHelper.getAggregationBounds(context, fieldContext.getFieldType().name()); if (bounds != null) { // Update min/max limit if user specified any hard bounds @@ -149,9 +154,14 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol return LeafBucketCollector.NO_OP_COLLECTOR; } - boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation(ctx, fastFilterContext, (key, count) -> { - incrementBucketDocCount(FastFilterRewriteHelper.getBucketOrd(bucketOrds.add(0, preparedRounding.round(key))), count); - }); + boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation( + ctx, + fastFilterContext, + (key, count) -> incrementBucketDocCount( + FastFilterRewriteHelper.getBucketOrd(bucketOrds.add(0, preparedRounding.round(key))), + count + ) + ); if (optimized) throw new CollectionTerminatedException(); SortedNumericDocValues values = valuesSource.longValues(ctx);