diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ccecd42c5af0..59d03551a894e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Relax the join validation for Remote State publication ([#15471](https://github.com/opensearch-project/OpenSearch/pull/15471)) - MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637)) - [Remote Publication] Upload incremental cluster state on master re-election ([#15145](https://github.com/opensearch-project/OpenSearch/pull/15145)) +- Making _cat/allocation API use indexLevelStats ([#15292](https://github.com/opensearch-project/OpenSearch/pull/15292)) ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index e5dd44a70a11c..f1b36194bf62d 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -867,7 +867,7 @@ public IndexSettings getIndexSettings() { * {@link IndexReader}-specific optimizations, such as rewriting containing range queries. */ public QueryShardContext newQueryShardContext(int shardId, IndexSearcher searcher, LongSupplier nowInMillis, String clusterAlias) { - return newQueryShardContext(shardId, searcher, nowInMillis, clusterAlias, false, false); + return newQueryShardContext(shardId, searcher, nowInMillis, clusterAlias, false); } /** @@ -913,6 +913,22 @@ public QueryShardContext newQueryShardContext( ); } + /** + * Creates a new QueryShardContext. + *

+ * Passing a {@code null} {@link IndexSearcher} will return a valid context, however it won't be able to make + * {@link IndexReader}-specific optimizations, such as rewriting containing range queries. + */ + public QueryShardContext newQueryShardContext( + int shardId, + IndexSearcher searcher, + LongSupplier nowInMillis, + String clusterAlias, + boolean validate + ) { + return newQueryShardContext(shardId, searcher, nowInMillis, clusterAlias, validate, false); + } + /** * The {@link ThreadPool} to use for this index. */ diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 34ed957542aff..a8d4ebcf23dab 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -235,7 +235,7 @@ import org.opensearch.search.aggregations.support.AggregationUsageService; import org.opensearch.search.backpressure.SearchBackpressureService; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; -import org.opensearch.search.deciders.ConcurrentSearchDecider; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; import org.opensearch.search.fetch.FetchPhase; import org.opensearch.search.pipeline.SearchPipelineService; import org.opensearch.search.query.QueryPhase; @@ -1344,7 +1344,7 @@ protected Node( circuitBreakerService, searchModule.getIndexSearcherExecutor(threadPool), taskResourceTrackingService, - searchModule.getConcurrentSearchDeciders() + searchModule.getConcurrentSearchRequestDeciderFactories() ); final List> tasksExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class) @@ -2004,7 +2004,7 @@ protected SearchService newSearchService( CircuitBreakerService circuitBreakerService, Executor indexSearcherExecutor, TaskResourceTrackingService taskResourceTrackingService, - Collection concurrentSearchDecidersList + Collection concurrentSearchDeciderFactories ) { return new SearchService( clusterService, @@ -2018,7 +2018,7 @@ protected SearchService newSearchService( circuitBreakerService, indexSearcherExecutor, taskResourceTrackingService, - concurrentSearchDecidersList + concurrentSearchDeciderFactories ); } diff --git a/server/src/main/java/org/opensearch/plugins/SearchPlugin.java b/server/src/main/java/org/opensearch/plugins/SearchPlugin.java index 498da4042fa33..710dd32371f83 100644 --- a/server/src/main/java/org/opensearch/plugins/SearchPlugin.java +++ b/server/src/main/java/org/opensearch/plugins/SearchPlugin.java @@ -65,7 +65,7 @@ import org.opensearch.search.aggregations.pipeline.MovAvgPipelineAggregator; import org.opensearch.search.aggregations.pipeline.PipelineAggregator; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; -import org.opensearch.search.deciders.ConcurrentSearchDecider; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; import org.opensearch.search.fetch.FetchSubPhase; import org.opensearch.search.fetch.subphase.highlight.Highlighter; import org.opensearch.search.query.QueryPhaseSearcher; @@ -141,12 +141,12 @@ default Map getHighlighters() { } /** - * Allows plugins to register custom decider for concurrent search - * @return A {@link ConcurrentSearchDecider} + * Allows plugins to register a factory to create custom decider for concurrent search + * @return A {@link ConcurrentSearchRequestDecider.Factory} */ @ExperimentalApi - default ConcurrentSearchDecider getConcurrentSearchDecider() { - return null; + default Optional getConcurrentSearchRequestDeciderFactory() { + return Optional.empty(); } /** diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestAllocationAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestAllocationAction.java index 07b0fbbe4a911..912e5173f8e01 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestAllocationAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestAllocationAction.java @@ -101,6 +101,7 @@ public void processResponse(final ClusterStateResponse state) { statsRequest.clear() .addMetric(NodesStatsRequest.Metric.FS.metricName()) .indices(new CommonStatsFlags(CommonStatsFlags.Flag.Store)); + statsRequest.indices().setIncludeIndicesStatsByLevel(true); client.admin().cluster().nodesStats(statsRequest, new RestResponseListener(channel) { @Override diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java index 1706c27ccf922..74a7482d975df 100644 --- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java @@ -72,8 +72,8 @@ import org.opensearch.search.aggregations.SearchContextAggregations; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.collapse.CollapseContext; -import org.opensearch.search.deciders.ConcurrentSearchDecider; import org.opensearch.search.deciders.ConcurrentSearchDecision; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; import org.opensearch.search.deciders.ConcurrentSearchVisitor; import org.opensearch.search.dfs.DfsSearchResult; import org.opensearch.search.fetch.FetchPhase; @@ -106,13 +106,14 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Executor; import java.util.function.Function; import java.util.function.LongSupplier; -import java.util.stream.Collectors; import static org.opensearch.search.SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD; import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE; @@ -137,7 +138,7 @@ final class DefaultSearchContext extends SearchContext { private final ShardSearchRequest request; private final SearchShardTarget shardTarget; private final LongSupplier relativeTimeSupplier; - private final Collection concurrentSearchDeciders; + private final Collection concurrentSearchDeciderFactories; private SearchType searchType; private final BigArrays bigArrays; private final IndexShard indexShard; @@ -223,7 +224,7 @@ final class DefaultSearchContext extends SearchContext { boolean validate, Executor executor, Function requestToAggReduceContextBuilder, - Collection concurrentSearchDeciders + Collection concurrentSearchDeciderFactories ) throws IOException { this.readerContext = readerContext; this.request = request; @@ -267,7 +268,7 @@ final class DefaultSearchContext extends SearchContext { this.maxAggRewriteFilters = evaluateFilterRewriteSetting(); this.cardinalityAggregationPruningThreshold = evaluateCardinalityAggregationPruningThreshold(); - this.concurrentSearchDeciders = concurrentSearchDeciders; + this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories; this.keywordIndexOrDocValuesEnabled = evaluateKeywordIndexOrDocValuesEnabled(); } @@ -932,14 +933,21 @@ public boolean shouldUseConcurrentSearch() { private boolean evaluateAutoMode() { - // filter out deciders that want to opt-out of decision-making - final Set filteredDeciders = concurrentSearchDeciders.stream() - .filter(concurrentSearchDecider -> concurrentSearchDecider.canEvaluateForIndex(indexService.getIndexSettings())) - .collect(Collectors.toSet()); + final Set concurrentSearchRequestDeciders = new HashSet<>(); + + // create the ConcurrentSearchRequestDeciders using registered factories + for (ConcurrentSearchRequestDecider.Factory deciderFactory : concurrentSearchDeciderFactories) { + final Optional concurrentSearchRequestDecider = deciderFactory.create( + indexService.getIndexSettings() + ); + concurrentSearchRequestDecider.ifPresent(concurrentSearchRequestDeciders::add); + + } + // evaluate based on concurrent search query visitor - if (filteredDeciders.size() > 0) { + if (concurrentSearchRequestDeciders.size() > 0) { ConcurrentSearchVisitor concurrentSearchVisitor = new ConcurrentSearchVisitor( - filteredDeciders, + concurrentSearchRequestDeciders, indexService.getIndexSettings() ); if (request().source() != null && request().source().query() != null) { @@ -949,7 +957,7 @@ private boolean evaluateAutoMode() { } final List decisions = new ArrayList<>(); - for (ConcurrentSearchDecider decider : filteredDeciders) { + for (ConcurrentSearchRequestDecider decider : concurrentSearchRequestDeciders) { ConcurrentSearchDecision decision = decider.getConcurrentSearchDecision(); if (decision != null) { if (logger.isDebugEnabled()) { diff --git a/server/src/main/java/org/opensearch/search/SearchModule.java b/server/src/main/java/org/opensearch/search/SearchModule.java index c51004a1ea95e..3a746259af3b5 100644 --- a/server/src/main/java/org/opensearch/search/SearchModule.java +++ b/server/src/main/java/org/opensearch/search/SearchModule.java @@ -239,7 +239,7 @@ import org.opensearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder; import org.opensearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; -import org.opensearch.search.deciders.ConcurrentSearchDecider; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; import org.opensearch.search.fetch.FetchPhase; import org.opensearch.search.fetch.FetchSubPhase; import org.opensearch.search.fetch.subphase.ExplainPhase; @@ -318,7 +318,7 @@ public class SearchModule { private final QueryPhaseSearcher queryPhaseSearcher; private final SearchPlugin.ExecutorServiceProvider indexSearcherExecutorProvider; - private final Collection concurrentSearchDeciders; + private final Collection concurrentSearchDeciderFactories; /** * Constructs a new SearchModule object @@ -348,25 +348,23 @@ public SearchModule(Settings settings, List plugins) { queryPhaseSearcher = registerQueryPhaseSearcher(plugins); indexSearcherExecutorProvider = registerIndexSearcherExecutorProvider(plugins); namedWriteables.addAll(SortValue.namedWriteables()); - concurrentSearchDeciders = registerConcurrentSearchDeciders(plugins); + concurrentSearchDeciderFactories = registerConcurrentSearchDeciderFactories(plugins); } - private Collection registerConcurrentSearchDeciders(List plugins) { - List concurrentSearchDeciders = new ArrayList<>(); + private Collection registerConcurrentSearchDeciderFactories(List plugins) { + List concurrentSearchDeciderFactories = new ArrayList<>(); for (SearchPlugin plugin : plugins) { - ConcurrentSearchDecider decider = plugin.getConcurrentSearchDecider(); - if (decider != null) { - concurrentSearchDeciders.add(decider); - } + final Optional deciderFactory = plugin.getConcurrentSearchRequestDeciderFactory(); + deciderFactory.ifPresent(concurrentSearchDeciderFactories::add); } - return concurrentSearchDeciders; + return concurrentSearchDeciderFactories; } /** - * Returns the concurrent search deciders that the plugins have registered + * Returns the concurrent search decider factories that the plugins have registered */ - public Collection getConcurrentSearchDeciders() { - return concurrentSearchDeciders; + public Collection getConcurrentSearchRequestDeciderFactories() { + return concurrentSearchDeciderFactories; } public List getNamedWriteables() { diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index e2a804a674d8f..40afdbfbdaa9e 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -104,7 +104,7 @@ import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.collapse.CollapseContext; -import org.opensearch.search.deciders.ConcurrentSearchDecider; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; import org.opensearch.search.dfs.DfsPhase; import org.opensearch.search.dfs.DfsSearchResult; import org.opensearch.search.fetch.FetchPhase; @@ -364,7 +364,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private final QueryPhase queryPhase; private final FetchPhase fetchPhase; - private final Collection concurrentSearchDeciders; + private final Collection concurrentSearchDeciderFactories; private volatile long defaultKeepAlive; @@ -410,7 +410,7 @@ public SearchService( CircuitBreakerService circuitBreakerService, Executor indexSearcherExecutor, TaskResourceTrackingService taskResourceTrackingService, - Collection concurrentSearchDeciders + Collection concurrentSearchDeciderFactories ) { Settings settings = clusterService.getSettings(); this.threadPool = threadPool; @@ -466,7 +466,7 @@ public SearchService( allowDerivedField = CLUSTER_ALLOW_DERIVED_FIELD_SETTING.get(settings); clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_ALLOW_DERIVED_FIELD_SETTING, this::setAllowDerivedField); - this.concurrentSearchDeciders = concurrentSearchDeciders; + this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories; } private void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) { @@ -1167,7 +1167,7 @@ private DefaultSearchContext createSearchContext(ReaderContext reader, ShardSear validate, indexSearcherExecutor, this::aggReduceContextBuilder, - concurrentSearchDeciders + concurrentSearchDeciderFactories ); // we clone the query shard context here just for rewriting otherwise we // might end up with incorrect state since we are using now() or script services diff --git a/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchDecision.java b/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchDecision.java index 2a30413eff9c8..4ac47221856d1 100644 --- a/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchDecision.java +++ b/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchDecision.java @@ -13,7 +13,7 @@ import java.util.Collection; /** - * This Class defines the decisions that a {@link ConcurrentSearchDecider#getConcurrentSearchDecision} can return. + * This Class defines the decisions that a {@link ConcurrentSearchRequestDecider#getConcurrentSearchDecision} can return. * */ @ExperimentalApi diff --git a/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchDecider.java b/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchRequestDecider.java similarity index 50% rename from server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchDecider.java rename to server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchRequestDecider.java index 9c588bb45b4ec..ec40527314454 100644 --- a/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchDecider.java +++ b/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchRequestDecider.java @@ -12,17 +12,21 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.query.QueryBuilder; +import java.util.Optional; + /** - * {@link ConcurrentSearchDecider} allows pluggable way to evaluate if a query in the search request + * {@link ConcurrentSearchRequestDecider} allows pluggable way to evaluate if a query in the search request * can use concurrent segment search using the passed in queryBuilders from query tree and index settings * on a per shard request basis. - * Implementations can also opt out of the evaluation process for certain indices based on the index settings. - * For all the deciders which can evaluate query tree for an index, its evaluateForQuery method - * will be called for each node in the query tree. After traversing of the query tree is completed, the final - * decision from the deciders will be obtained using {@link ConcurrentSearchDecider#getConcurrentSearchDecision} + * Implementations will need to implement the Factory interface that can be used to create the ConcurrentSearchRequestDecider + * This factory will be called on each shard search request to create the ConcurrentSearchRequestDecider and get the + * concurrent search decision from the created decider on a per-request basis. + * For all the deciders the evaluateForQuery method will be called for each node in the query tree. + * After traversing of the query tree is completed, the final decision from the deciders will be + * obtained using {@link ConcurrentSearchRequestDecider#getConcurrentSearchDecision} */ @ExperimentalApi -public abstract class ConcurrentSearchDecider { +public abstract class ConcurrentSearchRequestDecider { /** * Evaluate for the passed in queryBuilder node in the query tree of the search request @@ -31,14 +35,6 @@ public abstract class ConcurrentSearchDecider { */ public abstract void evaluateForQuery(QueryBuilder queryBuilder, IndexSettings indexSettings); - /** - * Provides a way for deciders to opt out of decision-making process for certain requests based on - * index settings. - * Return true if interested in decision making for index, - * false, otherwise - */ - public abstract boolean canEvaluateForIndex(IndexSettings indexSettings); - /** * Provide the final decision for concurrent search based on all evaluations * Plugins may need to maintain internal state of evaluations to provide a final decision @@ -47,4 +43,16 @@ public abstract class ConcurrentSearchDecider { */ public abstract ConcurrentSearchDecision getConcurrentSearchDecision(); + /** + * Factory interface that can be implemented to create the ConcurrentSearchRequestDecider object. + * Implementations can use the passed in indexSettings to decide whether to create the decider object or + * return {@link Optional#empty()}. + */ + @ExperimentalApi + public interface Factory { + default Optional create(IndexSettings indexSettings) { + return Optional.empty(); + } + } + } diff --git a/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchVisitor.java b/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchVisitor.java index 12ba1b2a9cc5f..d1a4fa982dc7e 100644 --- a/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchVisitor.java +++ b/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchVisitor.java @@ -19,15 +19,15 @@ /** * Class to traverse the QueryBuilder tree and invoke the - * {@link ConcurrentSearchDecider#evaluateForQuery} at each node of the query tree + * {@link ConcurrentSearchRequestDecider#evaluateForQuery} at each node of the query tree */ @ExperimentalApi public class ConcurrentSearchVisitor implements QueryBuilderVisitor { - private final Set deciders; + private final Set deciders; private final IndexSettings indexSettings; - public ConcurrentSearchVisitor(Set concurrentSearchVisitorDeciders, IndexSettings idxSettings) { + public ConcurrentSearchVisitor(Set concurrentSearchVisitorDeciders, IndexSettings idxSettings) { Objects.requireNonNull(concurrentSearchVisitorDeciders, "Concurrent search deciders cannot be null"); deciders = concurrentSearchVisitorDeciders; indexSettings = idxSettings; diff --git a/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java index 7e213218eb97b..55b30d5068daa 100644 --- a/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java @@ -76,8 +76,8 @@ import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.SearchContextAggregations; import org.opensearch.search.builder.SearchSourceBuilder; -import org.opensearch.search.deciders.ConcurrentSearchDecider; import org.opensearch.search.deciders.ConcurrentSearchDecision; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; import org.opensearch.search.internal.AliasFilter; import org.opensearch.search.internal.LegacyReaderContext; import org.opensearch.search.internal.PitReaderContext; @@ -96,6 +96,8 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -984,14 +986,34 @@ protected Engine.Searcher acquireSearcherInternal(String source) { // Case4: multiple deciders are registered and all of them opt out of decision-making // with supported agg query so concurrent path is used - ConcurrentSearchDecider decider1 = mock(ConcurrentSearchDecider.class); - when(decider1.canEvaluateForIndex(any())).thenReturn(false); - ConcurrentSearchDecider decider2 = mock(ConcurrentSearchDecider.class); - when(decider2.canEvaluateForIndex(any())).thenReturn(false); + ConcurrentSearchRequestDecider decider1 = mock(ConcurrentSearchRequestDecider.class); - Collection concurrentSearchDeciders = new ArrayList<>(); - concurrentSearchDeciders.add(decider1); - concurrentSearchDeciders.add(decider2); + ConcurrentSearchRequestDecider decider2 = mock(ConcurrentSearchRequestDecider.class); + + ConcurrentSearchRequestDecider.Factory factory1 = new ConcurrentSearchRequestDecider.Factory() { + @Override + public Optional create(IndexSettings indexSettings) { + return Optional.ofNullable(decider1); + } + }; + + ConcurrentSearchRequestDecider.Factory factory2 = new ConcurrentSearchRequestDecider.Factory() { + @Override + public Optional create(IndexSettings indexSettings) { + return Optional.ofNullable(decider2); + } + }; + ConcurrentSearchRequestDecider.Factory factory3 = new ConcurrentSearchRequestDecider.Factory() { + @Override + public Optional create(IndexSettings indexSettings) { + return Optional.empty(); + } + }; + + List concurrentSearchRequestDeciders = new ArrayList<>(); + concurrentSearchRequestDeciders.add(factory1); + concurrentSearchRequestDeciders.add(factory2); + concurrentSearchRequestDeciders.add(factory3); context = new DefaultSearchContext( readerContext, @@ -1007,7 +1029,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) { false, executor, null, - concurrentSearchDeciders + concurrentSearchRequestDeciders ); // create a supported agg operation context.aggregations(mockAggregations); @@ -1021,15 +1043,9 @@ protected Engine.Searcher acquireSearcherInternal(String source) { // Case5: multiple deciders are registered and one of them returns ConcurrentSearchDecision.DecisionStatus.NO // use non-concurrent path even if query contains supported agg - when(decider1.canEvaluateForIndex(any())).thenReturn(true); when(decider1.getConcurrentSearchDecision()).thenReturn( new ConcurrentSearchDecision(ConcurrentSearchDecision.DecisionStatus.NO, "disable concurrent search") ); - when(decider2.canEvaluateForIndex(any())).thenReturn(false); - - concurrentSearchDeciders.clear(); - concurrentSearchDeciders.add(decider1); - concurrentSearchDeciders.add(decider2); // create a source so that query tree is parsed by visitor SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); @@ -1051,7 +1067,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) { false, executor, null, - concurrentSearchDeciders + concurrentSearchRequestDeciders ); // create a supported agg operation @@ -1067,20 +1083,17 @@ protected Engine.Searcher acquireSearcherInternal(String source) { // Case6: multiple deciders are registered and first decider returns ConcurrentSearchDecision.DecisionStatus.YES // while second decider returns ConcurrentSearchDecision.DecisionStatus.NO // use non-concurrent path even if query contains supported agg - when(decider1.canEvaluateForIndex(any())).thenReturn(true); + when(decider1.getConcurrentSearchDecision()).thenReturn( new ConcurrentSearchDecision(ConcurrentSearchDecision.DecisionStatus.YES, "enable concurrent search") ); - when(decider2.canEvaluateForIndex(any())).thenReturn(true); + when(decider2.getConcurrentSearchDecision()).thenReturn( new ConcurrentSearchDecision(ConcurrentSearchDecision.DecisionStatus.NO, "disable concurrent search") ); - concurrentSearchDeciders.clear(); - concurrentSearchDeciders.add(decider1); - concurrentSearchDeciders.add(decider2); - // create a source so that query tree is parsed by visitor + when(shardSearchRequest.source()).thenReturn(sourceBuilder); context = new DefaultSearchContext( readerContext, @@ -1096,7 +1109,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) { false, executor, null, - concurrentSearchDeciders + concurrentSearchRequestDeciders ); // create a supported agg operation @@ -1111,22 +1124,19 @@ protected Engine.Searcher acquireSearcherInternal(String source) { // Case7: multiple deciders are registered and all return ConcurrentSearchDecision.DecisionStatus.NO_OP // but un-supported agg query is present, use non-concurrent path - when(decider1.canEvaluateForIndex(any())).thenReturn(true); + when(decider1.getConcurrentSearchDecision()).thenReturn( new ConcurrentSearchDecision(ConcurrentSearchDecision.DecisionStatus.NO_OP, "noop") ); - when(decider2.canEvaluateForIndex(any())).thenReturn(true); + when(decider2.getConcurrentSearchDecision()).thenReturn( new ConcurrentSearchDecision(ConcurrentSearchDecision.DecisionStatus.NO_OP, "noop") ); when(mockAggregations.factories().allFactoriesSupportConcurrentSearch()).thenReturn(false); - concurrentSearchDeciders.clear(); - concurrentSearchDeciders.add(decider1); - concurrentSearchDeciders.add(decider2); - // create a source so that query tree is parsed by visitor + when(shardSearchRequest.source()).thenReturn(sourceBuilder); context = new DefaultSearchContext( readerContext, @@ -1142,7 +1152,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) { false, executor, null, - concurrentSearchDeciders + concurrentSearchRequestDeciders ); // create a supported agg operation diff --git a/server/src/test/java/org/opensearch/search/SearchModuleTests.java b/server/src/test/java/org/opensearch/search/SearchModuleTests.java index 71bbe7c718f47..9e8e7afe332f1 100644 --- a/server/src/test/java/org/opensearch/search/SearchModuleTests.java +++ b/server/src/test/java/org/opensearch/search/SearchModuleTests.java @@ -41,6 +41,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.index.IndexSettings; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryRewriteContext; import org.opensearch.index.query.QueryShardContext; @@ -69,7 +70,7 @@ import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.search.aggregations.support.ValuesSourceType; -import org.opensearch.search.deciders.ConcurrentSearchDecider; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; import org.opensearch.search.fetch.FetchSubPhase; import org.opensearch.search.fetch.subphase.ExplainPhase; import org.opensearch.search.fetch.subphase.highlight.CustomHighlighter; @@ -501,12 +502,12 @@ public Optional getIndexSearcherExecutorProvider() { expectThrows(IllegalStateException.class, () -> new SearchModule(Settings.EMPTY, searchPlugins)); } - public void testRegisterConcurrentSearchDecidersNoExternalPlugins() { + public void testRegisterConcurrentSearchRequestDecidersNoExternalPlugins() { SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.emptyList()); - assertEquals(searchModule.getConcurrentSearchDeciders().size(), 0); + assertEquals(searchModule.getConcurrentSearchRequestDeciderFactories().size(), 0); } - public void testRegisterConcurrentSearchDecidersExternalPluginsWithNoDeciders() { + public void testRegisterConcurrentSearchRequestDecidersExternalPluginsWithNoDeciders() { SearchPlugin plugin1 = new SearchPlugin() { @Override public Optional getIndexSearcherExecutorProvider() { @@ -521,10 +522,10 @@ public Optional getIndexSearcherExecutorProvider() { searchPlugins.add(plugin2); SearchModule searchModule = new SearchModule(Settings.EMPTY, searchPlugins); - assertEquals(searchModule.getConcurrentSearchDeciders().size(), 0); + assertEquals(searchModule.getConcurrentSearchRequestDeciderFactories().size(), 0); } - public void testRegisterConcurrentSearchDecidersExternalPluginsWithDeciders() { + public void testRegisterConcurrentSearchRequestDecidersExternalPluginsWithDeciders() { SearchPlugin pluginDecider1 = new SearchPlugin() { @Override public Optional getIndexSearcherExecutorProvider() { @@ -532,15 +533,25 @@ public Optional getIndexSearcherExecutorProvider() { } @Override - public ConcurrentSearchDecider getConcurrentSearchDecider() { - return mock(ConcurrentSearchDecider.class); + public Optional getConcurrentSearchRequestDeciderFactory() { + return Optional.of(new ConcurrentSearchRequestDecider.Factory() { + @Override + public Optional create(IndexSettings indexSettings) { + return Optional.of(mock(ConcurrentSearchRequestDecider.class)); + } + }); } }; SearchPlugin pluginDecider2 = new SearchPlugin() { @Override - public ConcurrentSearchDecider getConcurrentSearchDecider() { - return mock(ConcurrentSearchDecider.class); + public Optional getConcurrentSearchRequestDeciderFactory() { + return Optional.of(new ConcurrentSearchRequestDecider.Factory() { + @Override + public Optional create(IndexSettings indexSettings) { + return Optional.of(mock(ConcurrentSearchRequestDecider.class)); + } + }); } }; @@ -549,23 +560,7 @@ public ConcurrentSearchDecider getConcurrentSearchDecider() { searchPlugins.add(pluginDecider2); SearchModule searchModule = new SearchModule(Settings.EMPTY, searchPlugins); - assertEquals(searchModule.getConcurrentSearchDeciders().size(), 2); - } - - public void testRegisterConcurrentSearchDecidersPluginWithNullDecider() { - SearchPlugin pluginWithNullDecider = new SearchPlugin() { - @Override - public ConcurrentSearchDecider getConcurrentSearchDecider() { - return null; - } - }; - - List searchPlugins = new ArrayList<>(); - searchPlugins.add(pluginWithNullDecider); - SearchModule searchModule = new SearchModule(Settings.EMPTY, searchPlugins); - // null decider is filtered out, so 0 deciders - assertEquals(searchModule.getConcurrentSearchDeciders().size(), 0); - + assertEquals(searchModule.getConcurrentSearchRequestDeciderFactories().size(), 2); } private static final String[] NON_DEPRECATED_QUERIES = new String[] { diff --git a/test/framework/src/main/java/org/opensearch/node/MockNode.java b/test/framework/src/main/java/org/opensearch/node/MockNode.java index 09df9b85320f0..97c06962ca2e7 100644 --- a/test/framework/src/main/java/org/opensearch/node/MockNode.java +++ b/test/framework/src/main/java/org/opensearch/node/MockNode.java @@ -57,7 +57,7 @@ import org.opensearch.script.ScriptService; import org.opensearch.search.MockSearchService; import org.opensearch.search.SearchService; -import org.opensearch.search.deciders.ConcurrentSearchDecider; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; import org.opensearch.search.fetch.FetchPhase; import org.opensearch.search.query.QueryPhase; import org.opensearch.tasks.TaskResourceTrackingService; @@ -158,7 +158,7 @@ protected SearchService newSearchService( CircuitBreakerService circuitBreakerService, Executor indexSearcherExecutor, TaskResourceTrackingService taskResourceTrackingService, - Collection concurrentSearchDecidersList + Collection concurrentSearchDeciderFactories ) { if (getPluginsService().filterPlugins(MockSearchService.TestPlugin.class).isEmpty()) { return super.newSearchService( @@ -173,7 +173,7 @@ protected SearchService newSearchService( circuitBreakerService, indexSearcherExecutor, taskResourceTrackingService, - concurrentSearchDecidersList + concurrentSearchDeciderFactories ); } return new MockSearchService(