diff --git a/CHANGELOG.md b/CHANGELOG.md index 504eba04993a2..cf69c3e2d5dbc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -120,6 +120,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Admission Control] Integrated IO Based AdmissionController to AdmissionControl Framework ([#12583](https://github.com/opensearch-project/OpenSearch/pull/12583)) - Introduce a new setting `index.check_pending_flush.enabled` to expose the ability to disable the check for pending flushes by write threads ([#12710](https://github.com/opensearch-project/OpenSearch/pull/12710)) - Built-in secure transports support ([#12435](https://github.com/opensearch-project/OpenSearch/pull/12435)) +- [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697)) ### Dependencies - Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288)) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/CompositeAggIT.java b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/CompositeAggIT.java index 5a38ba670f1dc..a743f22a2ff77 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/CompositeAggIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/CompositeAggIT.java @@ -26,6 +26,7 @@ import java.util.Collection; import java.util.List; +import static org.opensearch.indices.IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING; import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; @@ -50,23 +51,25 @@ public void setupSuiteScopeCluster() throws Exception { assertAcked( prepareCreate( "idx", - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), false) ).setMapping("type", "type=keyword", "num", "type=integer", "score", "type=integer") ); waitForRelocation(ClusterHealthStatus.GREEN); - client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "5").get(); - client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "11", "score", "50").get(); - refresh("idx"); - client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "2").get(); - client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "12", "score", "20").get(); - refresh("idx"); - client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "10").get(); - client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "15").get(); - refresh("idx"); - client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "1").get(); - client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "100").get(); - refresh("idx"); + indexRandom( + true, + client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "5"), + client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "11", "score", "50"), + client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "2"), + client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "12", "score", "20"), + client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "10"), + client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "15"), + client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "1"), + client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "100") + ); waitForRelocation(ClusterHealthStatus.GREEN); refresh(); diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java index 0bb2d1d7ca933..94e9ce5063277 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java @@ -15,9 +15,9 @@ import org.opensearch.search.query.ReduceableSearchResult; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Objects; /** * Common {@link CollectorManager} used by both concurrent and non-concurrent aggregation path and also for global and non-global @@ -56,17 +56,9 @@ public String getCollectorReason() { @Override public ReduceableSearchResult reduce(Collection collectors) throws IOException { - final List aggregators = context.bucketCollectorProcessor().toAggregators(collectors); - final List internals = new ArrayList<>(aggregators.size()); + final List internals = context.bucketCollectorProcessor().toInternalAggregations(collectors); + assert internals.stream().noneMatch(Objects::isNull); context.aggregations().resetBucketMultiConsumer(); - for (Aggregator aggregator : aggregators) { - try { - // post collection is called in ContextIndexSearcher after search on leaves are completed - internals.add(aggregator.buildTopLevel()); - } catch (IOException e) { - throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e); - } - } final InternalAggregations internalAggregations = InternalAggregations.from(internals); return buildAggregationResult(internalAggregations); diff --git a/server/src/main/java/org/opensearch/search/aggregations/Aggregator.java b/server/src/main/java/org/opensearch/search/aggregations/Aggregator.java index 8744d1f6a07d3..f4db8f61bf537 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/Aggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/Aggregator.java @@ -33,6 +33,7 @@ package org.opensearch.search.aggregations; import org.opensearch.OpenSearchParseException; +import org.opensearch.common.SetOnce; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.lease.Releasable; import org.opensearch.core.ParseField; @@ -61,6 +62,8 @@ @PublicApi(since = "1.0.0") public abstract class Aggregator extends BucketCollector implements Releasable { + private final SetOnce internalAggregation = new SetOnce<>(); + /** * Parses the aggregation request and creates the appropriate aggregator factory for it. * @@ -83,6 +86,13 @@ public interface Parser { AggregationBuilder parse(String aggregationName, XContentParser parser) throws IOException; } + /** + * Returns the InternalAggregation stored during post collection + */ + public InternalAggregation getPostCollectionAggregation() { + return internalAggregation.get(); + } + /** * Return the name of this aggregator. */ @@ -185,13 +195,15 @@ public interface BucketComparator { /** * Build the result of this aggregation if it is at the "top level" - * of the aggregation tree. If, instead, it is a sub-aggregation of - * another aggregation then the aggregation that contains it will call - * {@link #buildAggregations(long[])}. + * of the aggregation tree and save it. This should get called + * during post collection. If, instead, it is a sub-aggregation + * of another aggregation then the aggregation that contains + * it will call {@link #buildAggregations(long[])}. */ public final InternalAggregation buildTopLevel() throws IOException { assert parent() == null; - return buildAggregations(new long[] { 0 })[0]; + this.internalAggregation.set(buildAggregations(new long[] { 0 })[0]); + return internalAggregation.get(); } /** diff --git a/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java b/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java index 135fda71a757a..1dcb3fb8597f2 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java +++ b/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java @@ -22,6 +22,7 @@ import java.util.Deque; import java.util.LinkedList; import java.util.List; +import java.util.Objects; import java.util.Queue; /** @@ -72,6 +73,15 @@ public void processPostCollection(Collector collectorTree) throws IOException { } } else if (currentCollector instanceof BucketCollector) { ((BucketCollector) currentCollector).postCollection(); + + // Perform build aggregation during post collection + if (currentCollector instanceof Aggregator) { + ((Aggregator) currentCollector).buildTopLevel(); + } else if (currentCollector instanceof MultiBucketCollector) { + for (Collector innerCollector : ((MultiBucketCollector) currentCollector).getCollectors()) { + collectors.offer(innerCollector); + } + } } } } @@ -106,4 +116,51 @@ public List toAggregators(Collection collectors) { } return aggregators; } + + /** + * Unwraps the input collection of {@link Collector} to get the list of the {@link InternalAggregation}. The + * input is expected to contain the collectors related to Aggregations only as that is passed to {@link AggregationCollectorManager} + * during the reduce phase. This list of {@link InternalAggregation} is used to optionally perform reduce at shard level before + * returning response to coordinator + * @param collectors collection of aggregation collectors to reduce + * @return list of unwrapped {@link InternalAggregation} + */ + public List toInternalAggregations(Collection collectors) throws IOException { + List internalAggregations = new ArrayList<>(); + + final Deque allCollectors = new LinkedList<>(collectors); + while (!allCollectors.isEmpty()) { + final Collector currentCollector = allCollectors.pop(); + // This can just be Aggregator + if (currentCollector instanceof Aggregator) { + internalAggregations.add(((Aggregator) currentCollector).getPostCollectionAggregation()); + } else if (currentCollector instanceof InternalProfileCollector) { + if (((InternalProfileCollector) currentCollector).getCollector() instanceof Aggregator) { + internalAggregations.add( + ((Aggregator) ((InternalProfileCollector) currentCollector).getCollector()).getPostCollectionAggregation() + ); + } else if (((InternalProfileCollector) currentCollector).getCollector() instanceof MultiBucketCollector) { + allCollectors.addAll( + Arrays.asList(((MultiBucketCollector) ((InternalProfileCollector) currentCollector).getCollector()).getCollectors()) + ); + } + } else if (currentCollector instanceof MultiBucketCollector) { + allCollectors.addAll(Arrays.asList(((MultiBucketCollector) currentCollector).getCollectors())); + } + } + + // Check that internalAggregations does not contain any null objects. If so that means postCollection, and subsequently + // buildAggregation, was not called for a given collector. This can happen as collect will not get called whenever there are no + // leaves on a shard. Since we build the InternalAggregation in postCollection that will not get called in such cases either. + // Therefore we need to manually call it again here to build empty InternalAggregation objects for this collector tree. + if (internalAggregations.stream().anyMatch(Objects::isNull)) { + for (Collector c : collectors) { + processPostCollection(c); + } + // Iterate through collector tree again to get InternalAggregations object + return toInternalAggregations(collectors); + } else { + return internalAggregations; + } + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/MultiBucketConsumerService.java b/server/src/main/java/org/opensearch/search/aggregations/MultiBucketConsumerService.java index 1461dd3009b44..35186422fceaa 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/MultiBucketConsumerService.java +++ b/server/src/main/java/org/opensearch/search/aggregations/MultiBucketConsumerService.java @@ -131,12 +131,10 @@ public static class MultiBucketConsumer implements IntConsumer { private final int limit; private final CircuitBreaker breaker; - // aggregations execute in a single thread for both sequential - // and concurrent search, so no atomic here + // count is currently only updated in final reduce phase which is executed in single thread for both concurrent and non-concurrent + // search private int count; - - // will be updated by multiple threads in concurrent search - // hence making it as LongAdder + // will be updated by multiple threads in concurrent search hence making it as LongAdder private final LongAdder callCount; private volatile boolean circuitBreakerTripped; private final int availProcessors; diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java index 4af14ab014db5..6c5619a843fae 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java @@ -40,6 +40,7 @@ import org.opensearch.search.internal.SearchContext; import java.io.IOException; +import java.util.Arrays; import java.util.Map; /** @@ -80,7 +81,7 @@ protected Aggregator createInternal( @Override protected boolean supportsConcurrentSegmentSearch() { - // See https://github.com/opensearch-project/OpenSearch/issues/12331 for details - return false; + // Disable concurrent search if any scripting is used. See https://github.com/opensearch-project/OpenSearch/issues/12331 for details + return Arrays.stream(sources).noneMatch(CompositeValuesSourceConfig::hasScript); } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java index 69fda2f3f6133..eb09aac46186f 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java @@ -94,7 +94,6 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr private final long valueCount; private final String fieldName; private Weight weight; - private final GlobalOrdLookupFunction lookupGlobalOrd; protected final CollectionStrategy collectionStrategy; protected int segmentsWithSingleValuedOrds = 0; protected int segmentsWithMultiValuedOrds = 0; @@ -129,11 +128,10 @@ public GlobalOrdinalsStringTermsAggregator( this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job. this.valuesSource = valuesSource; final IndexReader reader = context.searcher().getIndexReader(); - final SortedSetDocValues values = reader.leaves().size() > 0 + final SortedSetDocValues values = !reader.leaves().isEmpty() ? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0)) : DocValues.emptySortedSet(); this.valueCount = values.getValueCount(); - this.lookupGlobalOrd = values::lookupOrd; this.acceptedGlobalOrdinals = includeExclude == null ? ALWAYS_TRUE : includeExclude.acceptedGlobalOrdinals(values)::get; if (remapGlobalOrds) { this.collectionStrategy = new RemapGlobalOrds(cardinality); @@ -885,7 +883,12 @@ PriorityQueue buildPriorityQueue(int size) { } StringTerms.Bucket convertTempBucketToRealBucket(OrdBucket temp) throws IOException { - BytesRef term = BytesRef.deepCopyOf(lookupGlobalOrd.apply(temp.globalOrd)); + // Recreate DocValues as needed for concurrent segment search + SortedSetDocValues values = !context.searcher().getIndexReader().leaves().isEmpty() + ? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0)) + : DocValues.emptySortedSet(); + BytesRef term = BytesRef.deepCopyOf(values.lookupOrd(temp.globalOrd)); + StringTerms.Bucket result = new StringTerms.Bucket(term, temp.docCount, null, showTermDocCountError, 0, format); result.bucketOrd = temp.bucketOrd; result.docCountError = 0; @@ -1001,7 +1004,11 @@ BucketUpdater bucketUpdater(long owningBucketOrd) long subsetSize = subsetSize(owningBucketOrd); return (spare, globalOrd, bucketOrd, docCount) -> { spare.bucketOrd = bucketOrd; - oversizedCopy(lookupGlobalOrd.apply(globalOrd), spare.termBytes); + // Recreate DocValues as needed for concurrent segment search + SortedSetDocValues values = !context.searcher().getIndexReader().leaves().isEmpty() + ? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0)) + : DocValues.emptySortedSet(); + oversizedCopy(values.lookupOrd(globalOrd), spare.termBytes); spare.subsetDf = docCount; spare.subsetSize = subsetSize; spare.supersetDf = backgroundFrequencies.freq(spare.termBytes); diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java index 3d13378e58e5d..8f6001dc06755 100644 --- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java @@ -113,6 +113,12 @@ public List toAggregators(Collection collectors) { // should not be called when there is no aggregation collector throw new IllegalStateException("Unexpected toAggregators call on NO_OP_BUCKET_COLLECTOR_PROCESSOR"); } + + @Override + public List toInternalAggregations(Collection collectors) { + // should not be called when there is no aggregation collector + throw new IllegalStateException("Unexpected toInternalAggregations call on NO_OP_BUCKET_COLLECTOR_PROCESSOR"); + } }; private final List releasables = new CopyOnWriteArrayList<>(); diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java index aac3fca9e1e16..f381ebdb64fc2 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java @@ -211,7 +211,12 @@ "LuceneFixedGap", "LuceneVarGapFixedInterval", "LuceneVarGapDocFreqInterval", - "Lucene50" }) + "Lucene50", + "Lucene90", + "Lucene94", + "Lucene90", + "Lucene95", + "Lucene99" }) @LuceneTestCase.SuppressReproduceLine public abstract class OpenSearchTestCase extends LuceneTestCase {