diff --git a/CHANGELOG.md b/CHANGELOG.md index 948bf7946164e..6945d589eb395 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Relax the join validation for Remote State publication ([#15471](https://github.com/opensearch-project/OpenSearch/pull/15471)) - Static RemotePublication setting added, removed experimental feature flag ([#15478](https://github.com/opensearch-project/OpenSearch/pull/15478)) - MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637)) +- Making _cat/allocation API use indexLevelStats ([#15292](https://github.com/opensearch-project/OpenSearch/pull/15292)) ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java index 63f44f272c0be..548d58fd4af29 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java @@ -8,11 +8,15 @@ package org.opensearch.gateway.remote; -import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.client.Client; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.CoordinationState; +import org.opensearch.cluster.coordination.PersistedStateRegistry; +import org.opensearch.cluster.coordination.PublishClusterStateStats; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.settings.Settings; import org.opensearch.discovery.DiscoveryStats; @@ -26,6 +30,7 @@ import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.fs.ReloadableFsRepository; +import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; import org.opensearch.test.OpenSearchIntegTestCase.Scope; import org.junit.Before; @@ -33,11 +38,17 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Base64; +import java.util.HashSet; import java.util.Locale; import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; +import static org.opensearch.action.admin.cluster.node.info.NodesInfoRequest.Metric.SETTINGS; +import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.DISCOVERY; +import static org.opensearch.cluster.metadata.Metadata.isGlobalStateEquals; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.DISCOVERY_NODES; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY; @@ -213,11 +224,121 @@ public void testRemotePublicationDownloadStats() { NodesStatsResponse nodesStatsResponseDataNode = client().admin() .cluster() .prepareNodesStats(dataNode) - .addMetric(NodesStatsRequest.Metric.DISCOVERY.metricName()) + .addMetric(DISCOVERY.metricName()) .get(); assertDataNodeDownloadStats(nodesStatsResponseDataNode); + } + + public void testRemotePublicationDisabledByRollingRestart() throws Exception { + prepareCluster(3, 2, INDEX_NAME, 1, 2); + ensureStableCluster(5); + ensureGreen(INDEX_NAME); + + Set clusterManagers = internalCluster().getClusterManagerNames(); + Set restartedMasters = new HashSet<>(); + + for (String clusterManager : clusterManagers) { + internalCluster().restartNode(clusterManager, new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) { + restartedMasters.add(nodeName); + return Settings.builder().put(REMOTE_PUBLICATION_SETTING_KEY, false).build(); + } + + @Override + public void doAfterNodes(int n, Client client) { + String activeCM = internalCluster().getClusterManagerName(); + Set followingCMs = clusterManagers.stream() + .filter(node -> !Objects.equals(node, activeCM)) + .collect(Collectors.toSet()); + boolean activeCMRestarted = restartedMasters.contains(activeCM); + NodesStatsResponse response = client().admin() + .cluster() + .prepareNodesStats(followingCMs.toArray(new String[0])) + .clear() + .addMetric(DISCOVERY.metricName()) + .get(); + // after master is flipped to restarted master, publication should happen on Transport + response.getNodes().forEach(nodeStats -> { + if (activeCMRestarted) { + PublishClusterStateStats stats = nodeStats.getDiscoveryStats().getPublishStats(); + assertTrue( + stats.getFullClusterStateReceivedCount() > 0 || stats.getCompatibleClusterStateDiffReceivedCount() > 0 + ); + assertEquals(0, stats.getIncompatibleClusterStateDiffReceivedCount()); + } else { + DiscoveryStats stats = nodeStats.getDiscoveryStats(); + assertEquals(0, stats.getPublishStats().getFullClusterStateReceivedCount()); + assertEquals(0, stats.getPublishStats().getCompatibleClusterStateDiffReceivedCount()); + assertEquals(0, stats.getPublishStats().getIncompatibleClusterStateDiffReceivedCount()); + } + }); + + NodesInfoResponse nodesInfoResponse = client().admin() + .cluster() + .prepareNodesInfo(activeCM) + .clear() + .addMetric(SETTINGS.metricName()) + .get(); + // if masterRestarted is true Publication Setting should be false, and vice versa + assertTrue( + REMOTE_PUBLICATION_SETTING.get(nodesInfoResponse.getNodes().get(0).getSettings()) != activeCMRestarted + ); + + followingCMs.forEach(node -> { + PersistedStateRegistry registry = internalCluster().getInstance(PersistedStateRegistry.class, node); + CoordinationState.PersistedState remoteState = registry.getPersistedState( + PersistedStateRegistry.PersistedStateType.REMOTE + ); + if (activeCMRestarted) { + assertNull(remoteState.getLastAcceptedState()); + // assertNull(remoteState.getLastAcceptedManifest()); + } else { + ClusterState localState = registry.getPersistedState(PersistedStateRegistry.PersistedStateType.LOCAL) + .getLastAcceptedState(); + ClusterState remotePersistedState = remoteState.getLastAcceptedState(); + assertTrue(isGlobalStateEquals(localState.metadata(), remotePersistedState.metadata())); + assertEquals(localState.nodes(), remotePersistedState.nodes()); + assertEquals(localState.routingTable(), remotePersistedState.routingTable()); + assertEquals(localState.customs(), remotePersistedState.customs()); + } + }); + } + }); + + } + ensureGreen(INDEX_NAME); + ensureStableCluster(5); + + String activeCM = internalCluster().getClusterManagerName(); + Set followingCMs = clusterManagers.stream().filter(node -> !Objects.equals(node, activeCM)).collect(Collectors.toSet()); + NodesStatsResponse response = client().admin() + .cluster() + .prepareNodesStats(followingCMs.toArray(new String[0])) + .clear() + .addMetric(DISCOVERY.metricName()) + .get(); + response.getNodes().forEach(nodeStats -> { + PublishClusterStateStats stats = nodeStats.getDiscoveryStats().getPublishStats(); + assertTrue(stats.getFullClusterStateReceivedCount() > 0 || stats.getCompatibleClusterStateDiffReceivedCount() > 0); + assertEquals(0, stats.getIncompatibleClusterStateDiffReceivedCount()); + }); + NodesInfoResponse nodesInfoResponse = client().admin() + .cluster() + .prepareNodesInfo(activeCM) + .clear() + .addMetric(SETTINGS.metricName()) + .get(); + // if masterRestarted is true Publication Setting should be false, and vice versa + assertFalse(REMOTE_PUBLICATION_SETTING.get(nodesInfoResponse.getNodes().get(0).getSettings())); + followingCMs.forEach(node -> { + PersistedStateRegistry registry = internalCluster().getInstance(PersistedStateRegistry.class, node); + CoordinationState.PersistedState remoteState = registry.getPersistedState(PersistedStateRegistry.PersistedStateType.REMOTE); + assertNull(remoteState.getLastAcceptedState()); + // assertNull(remoteState.getLastAcceptedManifest()); + }); } private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) { diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index e5dd44a70a11c..f1b36194bf62d 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -867,7 +867,7 @@ public IndexSettings getIndexSettings() { * {@link IndexReader}-specific optimizations, such as rewriting containing range queries. */ public QueryShardContext newQueryShardContext(int shardId, IndexSearcher searcher, LongSupplier nowInMillis, String clusterAlias) { - return newQueryShardContext(shardId, searcher, nowInMillis, clusterAlias, false, false); + return newQueryShardContext(shardId, searcher, nowInMillis, clusterAlias, false); } /** @@ -913,6 +913,22 @@ public QueryShardContext newQueryShardContext( ); } + /** + * Creates a new QueryShardContext. + *

+ * Passing a {@code null} {@link IndexSearcher} will return a valid context, however it won't be able to make + * {@link IndexReader}-specific optimizations, such as rewriting containing range queries. + */ + public QueryShardContext newQueryShardContext( + int shardId, + IndexSearcher searcher, + LongSupplier nowInMillis, + String clusterAlias, + boolean validate + ) { + return newQueryShardContext(shardId, searcher, nowInMillis, clusterAlias, validate, false); + } + /** * The {@link ThreadPool} to use for this index. */ diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 34ed957542aff..a8d4ebcf23dab 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -235,7 +235,7 @@ import org.opensearch.search.aggregations.support.AggregationUsageService; import org.opensearch.search.backpressure.SearchBackpressureService; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; -import org.opensearch.search.deciders.ConcurrentSearchDecider; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; import org.opensearch.search.fetch.FetchPhase; import org.opensearch.search.pipeline.SearchPipelineService; import org.opensearch.search.query.QueryPhase; @@ -1344,7 +1344,7 @@ protected Node( circuitBreakerService, searchModule.getIndexSearcherExecutor(threadPool), taskResourceTrackingService, - searchModule.getConcurrentSearchDeciders() + searchModule.getConcurrentSearchRequestDeciderFactories() ); final List> tasksExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class) @@ -2004,7 +2004,7 @@ protected SearchService newSearchService( CircuitBreakerService circuitBreakerService, Executor indexSearcherExecutor, TaskResourceTrackingService taskResourceTrackingService, - Collection concurrentSearchDecidersList + Collection concurrentSearchDeciderFactories ) { return new SearchService( clusterService, @@ -2018,7 +2018,7 @@ protected SearchService newSearchService( circuitBreakerService, indexSearcherExecutor, taskResourceTrackingService, - concurrentSearchDecidersList + concurrentSearchDeciderFactories ); } diff --git a/server/src/main/java/org/opensearch/plugins/SearchPlugin.java b/server/src/main/java/org/opensearch/plugins/SearchPlugin.java index 498da4042fa33..710dd32371f83 100644 --- a/server/src/main/java/org/opensearch/plugins/SearchPlugin.java +++ b/server/src/main/java/org/opensearch/plugins/SearchPlugin.java @@ -65,7 +65,7 @@ import org.opensearch.search.aggregations.pipeline.MovAvgPipelineAggregator; import org.opensearch.search.aggregations.pipeline.PipelineAggregator; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; -import org.opensearch.search.deciders.ConcurrentSearchDecider; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; import org.opensearch.search.fetch.FetchSubPhase; import org.opensearch.search.fetch.subphase.highlight.Highlighter; import org.opensearch.search.query.QueryPhaseSearcher; @@ -141,12 +141,12 @@ default Map getHighlighters() { } /** - * Allows plugins to register custom decider for concurrent search - * @return A {@link ConcurrentSearchDecider} + * Allows plugins to register a factory to create custom decider for concurrent search + * @return A {@link ConcurrentSearchRequestDecider.Factory} */ @ExperimentalApi - default ConcurrentSearchDecider getConcurrentSearchDecider() { - return null; + default Optional getConcurrentSearchRequestDeciderFactory() { + return Optional.empty(); } /** diff --git a/server/src/main/java/org/opensearch/rest/action/cat/RestAllocationAction.java b/server/src/main/java/org/opensearch/rest/action/cat/RestAllocationAction.java index 07b0fbbe4a911..912e5173f8e01 100644 --- a/server/src/main/java/org/opensearch/rest/action/cat/RestAllocationAction.java +++ b/server/src/main/java/org/opensearch/rest/action/cat/RestAllocationAction.java @@ -101,6 +101,7 @@ public void processResponse(final ClusterStateResponse state) { statsRequest.clear() .addMetric(NodesStatsRequest.Metric.FS.metricName()) .indices(new CommonStatsFlags(CommonStatsFlags.Flag.Store)); + statsRequest.indices().setIncludeIndicesStatsByLevel(true); client.admin().cluster().nodesStats(statsRequest, new RestResponseListener(channel) { @Override diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java index 1706c27ccf922..74a7482d975df 100644 --- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java @@ -72,8 +72,8 @@ import org.opensearch.search.aggregations.SearchContextAggregations; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.collapse.CollapseContext; -import org.opensearch.search.deciders.ConcurrentSearchDecider; import org.opensearch.search.deciders.ConcurrentSearchDecision; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; import org.opensearch.search.deciders.ConcurrentSearchVisitor; import org.opensearch.search.dfs.DfsSearchResult; import org.opensearch.search.fetch.FetchPhase; @@ -106,13 +106,14 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Executor; import java.util.function.Function; import java.util.function.LongSupplier; -import java.util.stream.Collectors; import static org.opensearch.search.SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD; import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE; @@ -137,7 +138,7 @@ final class DefaultSearchContext extends SearchContext { private final ShardSearchRequest request; private final SearchShardTarget shardTarget; private final LongSupplier relativeTimeSupplier; - private final Collection concurrentSearchDeciders; + private final Collection concurrentSearchDeciderFactories; private SearchType searchType; private final BigArrays bigArrays; private final IndexShard indexShard; @@ -223,7 +224,7 @@ final class DefaultSearchContext extends SearchContext { boolean validate, Executor executor, Function requestToAggReduceContextBuilder, - Collection concurrentSearchDeciders + Collection concurrentSearchDeciderFactories ) throws IOException { this.readerContext = readerContext; this.request = request; @@ -267,7 +268,7 @@ final class DefaultSearchContext extends SearchContext { this.maxAggRewriteFilters = evaluateFilterRewriteSetting(); this.cardinalityAggregationPruningThreshold = evaluateCardinalityAggregationPruningThreshold(); - this.concurrentSearchDeciders = concurrentSearchDeciders; + this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories; this.keywordIndexOrDocValuesEnabled = evaluateKeywordIndexOrDocValuesEnabled(); } @@ -932,14 +933,21 @@ public boolean shouldUseConcurrentSearch() { private boolean evaluateAutoMode() { - // filter out deciders that want to opt-out of decision-making - final Set filteredDeciders = concurrentSearchDeciders.stream() - .filter(concurrentSearchDecider -> concurrentSearchDecider.canEvaluateForIndex(indexService.getIndexSettings())) - .collect(Collectors.toSet()); + final Set concurrentSearchRequestDeciders = new HashSet<>(); + + // create the ConcurrentSearchRequestDeciders using registered factories + for (ConcurrentSearchRequestDecider.Factory deciderFactory : concurrentSearchDeciderFactories) { + final Optional concurrentSearchRequestDecider = deciderFactory.create( + indexService.getIndexSettings() + ); + concurrentSearchRequestDecider.ifPresent(concurrentSearchRequestDeciders::add); + + } + // evaluate based on concurrent search query visitor - if (filteredDeciders.size() > 0) { + if (concurrentSearchRequestDeciders.size() > 0) { ConcurrentSearchVisitor concurrentSearchVisitor = new ConcurrentSearchVisitor( - filteredDeciders, + concurrentSearchRequestDeciders, indexService.getIndexSettings() ); if (request().source() != null && request().source().query() != null) { @@ -949,7 +957,7 @@ private boolean evaluateAutoMode() { } final List decisions = new ArrayList<>(); - for (ConcurrentSearchDecider decider : filteredDeciders) { + for (ConcurrentSearchRequestDecider decider : concurrentSearchRequestDeciders) { ConcurrentSearchDecision decision = decider.getConcurrentSearchDecision(); if (decision != null) { if (logger.isDebugEnabled()) { diff --git a/server/src/main/java/org/opensearch/search/SearchModule.java b/server/src/main/java/org/opensearch/search/SearchModule.java index c51004a1ea95e..3a746259af3b5 100644 --- a/server/src/main/java/org/opensearch/search/SearchModule.java +++ b/server/src/main/java/org/opensearch/search/SearchModule.java @@ -239,7 +239,7 @@ import org.opensearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder; import org.opensearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; -import org.opensearch.search.deciders.ConcurrentSearchDecider; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; import org.opensearch.search.fetch.FetchPhase; import org.opensearch.search.fetch.FetchSubPhase; import org.opensearch.search.fetch.subphase.ExplainPhase; @@ -318,7 +318,7 @@ public class SearchModule { private final QueryPhaseSearcher queryPhaseSearcher; private final SearchPlugin.ExecutorServiceProvider indexSearcherExecutorProvider; - private final Collection concurrentSearchDeciders; + private final Collection concurrentSearchDeciderFactories; /** * Constructs a new SearchModule object @@ -348,25 +348,23 @@ public SearchModule(Settings settings, List plugins) { queryPhaseSearcher = registerQueryPhaseSearcher(plugins); indexSearcherExecutorProvider = registerIndexSearcherExecutorProvider(plugins); namedWriteables.addAll(SortValue.namedWriteables()); - concurrentSearchDeciders = registerConcurrentSearchDeciders(plugins); + concurrentSearchDeciderFactories = registerConcurrentSearchDeciderFactories(plugins); } - private Collection registerConcurrentSearchDeciders(List plugins) { - List concurrentSearchDeciders = new ArrayList<>(); + private Collection registerConcurrentSearchDeciderFactories(List plugins) { + List concurrentSearchDeciderFactories = new ArrayList<>(); for (SearchPlugin plugin : plugins) { - ConcurrentSearchDecider decider = plugin.getConcurrentSearchDecider(); - if (decider != null) { - concurrentSearchDeciders.add(decider); - } + final Optional deciderFactory = plugin.getConcurrentSearchRequestDeciderFactory(); + deciderFactory.ifPresent(concurrentSearchDeciderFactories::add); } - return concurrentSearchDeciders; + return concurrentSearchDeciderFactories; } /** - * Returns the concurrent search deciders that the plugins have registered + * Returns the concurrent search decider factories that the plugins have registered */ - public Collection getConcurrentSearchDeciders() { - return concurrentSearchDeciders; + public Collection getConcurrentSearchRequestDeciderFactories() { + return concurrentSearchDeciderFactories; } public List getNamedWriteables() { diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index e2a804a674d8f..40afdbfbdaa9e 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -104,7 +104,7 @@ import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.collapse.CollapseContext; -import org.opensearch.search.deciders.ConcurrentSearchDecider; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; import org.opensearch.search.dfs.DfsPhase; import org.opensearch.search.dfs.DfsSearchResult; import org.opensearch.search.fetch.FetchPhase; @@ -364,7 +364,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private final QueryPhase queryPhase; private final FetchPhase fetchPhase; - private final Collection concurrentSearchDeciders; + private final Collection concurrentSearchDeciderFactories; private volatile long defaultKeepAlive; @@ -410,7 +410,7 @@ public SearchService( CircuitBreakerService circuitBreakerService, Executor indexSearcherExecutor, TaskResourceTrackingService taskResourceTrackingService, - Collection concurrentSearchDeciders + Collection concurrentSearchDeciderFactories ) { Settings settings = clusterService.getSettings(); this.threadPool = threadPool; @@ -466,7 +466,7 @@ public SearchService( allowDerivedField = CLUSTER_ALLOW_DERIVED_FIELD_SETTING.get(settings); clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_ALLOW_DERIVED_FIELD_SETTING, this::setAllowDerivedField); - this.concurrentSearchDeciders = concurrentSearchDeciders; + this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories; } private void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) { @@ -1167,7 +1167,7 @@ private DefaultSearchContext createSearchContext(ReaderContext reader, ShardSear validate, indexSearcherExecutor, this::aggReduceContextBuilder, - concurrentSearchDeciders + concurrentSearchDeciderFactories ); // we clone the query shard context here just for rewriting otherwise we // might end up with incorrect state since we are using now() or script services diff --git a/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchDecision.java b/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchDecision.java index 2a30413eff9c8..4ac47221856d1 100644 --- a/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchDecision.java +++ b/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchDecision.java @@ -13,7 +13,7 @@ import java.util.Collection; /** - * This Class defines the decisions that a {@link ConcurrentSearchDecider#getConcurrentSearchDecision} can return. + * This Class defines the decisions that a {@link ConcurrentSearchRequestDecider#getConcurrentSearchDecision} can return. * */ @ExperimentalApi diff --git a/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchDecider.java b/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchRequestDecider.java similarity index 50% rename from server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchDecider.java rename to server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchRequestDecider.java index 9c588bb45b4ec..ec40527314454 100644 --- a/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchDecider.java +++ b/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchRequestDecider.java @@ -12,17 +12,21 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.query.QueryBuilder; +import java.util.Optional; + /** - * {@link ConcurrentSearchDecider} allows pluggable way to evaluate if a query in the search request + * {@link ConcurrentSearchRequestDecider} allows pluggable way to evaluate if a query in the search request * can use concurrent segment search using the passed in queryBuilders from query tree and index settings * on a per shard request basis. - * Implementations can also opt out of the evaluation process for certain indices based on the index settings. - * For all the deciders which can evaluate query tree for an index, its evaluateForQuery method - * will be called for each node in the query tree. After traversing of the query tree is completed, the final - * decision from the deciders will be obtained using {@link ConcurrentSearchDecider#getConcurrentSearchDecision} + * Implementations will need to implement the Factory interface that can be used to create the ConcurrentSearchRequestDecider + * This factory will be called on each shard search request to create the ConcurrentSearchRequestDecider and get the + * concurrent search decision from the created decider on a per-request basis. + * For all the deciders the evaluateForQuery method will be called for each node in the query tree. + * After traversing of the query tree is completed, the final decision from the deciders will be + * obtained using {@link ConcurrentSearchRequestDecider#getConcurrentSearchDecision} */ @ExperimentalApi -public abstract class ConcurrentSearchDecider { +public abstract class ConcurrentSearchRequestDecider { /** * Evaluate for the passed in queryBuilder node in the query tree of the search request @@ -31,14 +35,6 @@ public abstract class ConcurrentSearchDecider { */ public abstract void evaluateForQuery(QueryBuilder queryBuilder, IndexSettings indexSettings); - /** - * Provides a way for deciders to opt out of decision-making process for certain requests based on - * index settings. - * Return true if interested in decision making for index, - * false, otherwise - */ - public abstract boolean canEvaluateForIndex(IndexSettings indexSettings); - /** * Provide the final decision for concurrent search based on all evaluations * Plugins may need to maintain internal state of evaluations to provide a final decision @@ -47,4 +43,16 @@ public abstract class ConcurrentSearchDecider { */ public abstract ConcurrentSearchDecision getConcurrentSearchDecision(); + /** + * Factory interface that can be implemented to create the ConcurrentSearchRequestDecider object. + * Implementations can use the passed in indexSettings to decide whether to create the decider object or + * return {@link Optional#empty()}. + */ + @ExperimentalApi + public interface Factory { + default Optional create(IndexSettings indexSettings) { + return Optional.empty(); + } + } + } diff --git a/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchVisitor.java b/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchVisitor.java index 12ba1b2a9cc5f..d1a4fa982dc7e 100644 --- a/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchVisitor.java +++ b/server/src/main/java/org/opensearch/search/deciders/ConcurrentSearchVisitor.java @@ -19,15 +19,15 @@ /** * Class to traverse the QueryBuilder tree and invoke the - * {@link ConcurrentSearchDecider#evaluateForQuery} at each node of the query tree + * {@link ConcurrentSearchRequestDecider#evaluateForQuery} at each node of the query tree */ @ExperimentalApi public class ConcurrentSearchVisitor implements QueryBuilderVisitor { - private final Set deciders; + private final Set deciders; private final IndexSettings indexSettings; - public ConcurrentSearchVisitor(Set concurrentSearchVisitorDeciders, IndexSettings idxSettings) { + public ConcurrentSearchVisitor(Set concurrentSearchVisitorDeciders, IndexSettings idxSettings) { Objects.requireNonNull(concurrentSearchVisitorDeciders, "Concurrent search deciders cannot be null"); deciders = concurrentSearchVisitorDeciders; indexSettings = idxSettings; diff --git a/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java index 7e213218eb97b..55b30d5068daa 100644 --- a/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java @@ -76,8 +76,8 @@ import org.opensearch.search.aggregations.MultiBucketConsumerService; import org.opensearch.search.aggregations.SearchContextAggregations; import org.opensearch.search.builder.SearchSourceBuilder; -import org.opensearch.search.deciders.ConcurrentSearchDecider; import org.opensearch.search.deciders.ConcurrentSearchDecision; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; import org.opensearch.search.internal.AliasFilter; import org.opensearch.search.internal.LegacyReaderContext; import org.opensearch.search.internal.PitReaderContext; @@ -96,6 +96,8 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -984,14 +986,34 @@ protected Engine.Searcher acquireSearcherInternal(String source) { // Case4: multiple deciders are registered and all of them opt out of decision-making // with supported agg query so concurrent path is used - ConcurrentSearchDecider decider1 = mock(ConcurrentSearchDecider.class); - when(decider1.canEvaluateForIndex(any())).thenReturn(false); - ConcurrentSearchDecider decider2 = mock(ConcurrentSearchDecider.class); - when(decider2.canEvaluateForIndex(any())).thenReturn(false); + ConcurrentSearchRequestDecider decider1 = mock(ConcurrentSearchRequestDecider.class); - Collection concurrentSearchDeciders = new ArrayList<>(); - concurrentSearchDeciders.add(decider1); - concurrentSearchDeciders.add(decider2); + ConcurrentSearchRequestDecider decider2 = mock(ConcurrentSearchRequestDecider.class); + + ConcurrentSearchRequestDecider.Factory factory1 = new ConcurrentSearchRequestDecider.Factory() { + @Override + public Optional create(IndexSettings indexSettings) { + return Optional.ofNullable(decider1); + } + }; + + ConcurrentSearchRequestDecider.Factory factory2 = new ConcurrentSearchRequestDecider.Factory() { + @Override + public Optional create(IndexSettings indexSettings) { + return Optional.ofNullable(decider2); + } + }; + ConcurrentSearchRequestDecider.Factory factory3 = new ConcurrentSearchRequestDecider.Factory() { + @Override + public Optional create(IndexSettings indexSettings) { + return Optional.empty(); + } + }; + + List concurrentSearchRequestDeciders = new ArrayList<>(); + concurrentSearchRequestDeciders.add(factory1); + concurrentSearchRequestDeciders.add(factory2); + concurrentSearchRequestDeciders.add(factory3); context = new DefaultSearchContext( readerContext, @@ -1007,7 +1029,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) { false, executor, null, - concurrentSearchDeciders + concurrentSearchRequestDeciders ); // create a supported agg operation context.aggregations(mockAggregations); @@ -1021,15 +1043,9 @@ protected Engine.Searcher acquireSearcherInternal(String source) { // Case5: multiple deciders are registered and one of them returns ConcurrentSearchDecision.DecisionStatus.NO // use non-concurrent path even if query contains supported agg - when(decider1.canEvaluateForIndex(any())).thenReturn(true); when(decider1.getConcurrentSearchDecision()).thenReturn( new ConcurrentSearchDecision(ConcurrentSearchDecision.DecisionStatus.NO, "disable concurrent search") ); - when(decider2.canEvaluateForIndex(any())).thenReturn(false); - - concurrentSearchDeciders.clear(); - concurrentSearchDeciders.add(decider1); - concurrentSearchDeciders.add(decider2); // create a source so that query tree is parsed by visitor SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); @@ -1051,7 +1067,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) { false, executor, null, - concurrentSearchDeciders + concurrentSearchRequestDeciders ); // create a supported agg operation @@ -1067,20 +1083,17 @@ protected Engine.Searcher acquireSearcherInternal(String source) { // Case6: multiple deciders are registered and first decider returns ConcurrentSearchDecision.DecisionStatus.YES // while second decider returns ConcurrentSearchDecision.DecisionStatus.NO // use non-concurrent path even if query contains supported agg - when(decider1.canEvaluateForIndex(any())).thenReturn(true); + when(decider1.getConcurrentSearchDecision()).thenReturn( new ConcurrentSearchDecision(ConcurrentSearchDecision.DecisionStatus.YES, "enable concurrent search") ); - when(decider2.canEvaluateForIndex(any())).thenReturn(true); + when(decider2.getConcurrentSearchDecision()).thenReturn( new ConcurrentSearchDecision(ConcurrentSearchDecision.DecisionStatus.NO, "disable concurrent search") ); - concurrentSearchDeciders.clear(); - concurrentSearchDeciders.add(decider1); - concurrentSearchDeciders.add(decider2); - // create a source so that query tree is parsed by visitor + when(shardSearchRequest.source()).thenReturn(sourceBuilder); context = new DefaultSearchContext( readerContext, @@ -1096,7 +1109,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) { false, executor, null, - concurrentSearchDeciders + concurrentSearchRequestDeciders ); // create a supported agg operation @@ -1111,22 +1124,19 @@ protected Engine.Searcher acquireSearcherInternal(String source) { // Case7: multiple deciders are registered and all return ConcurrentSearchDecision.DecisionStatus.NO_OP // but un-supported agg query is present, use non-concurrent path - when(decider1.canEvaluateForIndex(any())).thenReturn(true); + when(decider1.getConcurrentSearchDecision()).thenReturn( new ConcurrentSearchDecision(ConcurrentSearchDecision.DecisionStatus.NO_OP, "noop") ); - when(decider2.canEvaluateForIndex(any())).thenReturn(true); + when(decider2.getConcurrentSearchDecision()).thenReturn( new ConcurrentSearchDecision(ConcurrentSearchDecision.DecisionStatus.NO_OP, "noop") ); when(mockAggregations.factories().allFactoriesSupportConcurrentSearch()).thenReturn(false); - concurrentSearchDeciders.clear(); - concurrentSearchDeciders.add(decider1); - concurrentSearchDeciders.add(decider2); - // create a source so that query tree is parsed by visitor + when(shardSearchRequest.source()).thenReturn(sourceBuilder); context = new DefaultSearchContext( readerContext, @@ -1142,7 +1152,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) { false, executor, null, - concurrentSearchDeciders + concurrentSearchRequestDeciders ); // create a supported agg operation diff --git a/server/src/test/java/org/opensearch/search/SearchModuleTests.java b/server/src/test/java/org/opensearch/search/SearchModuleTests.java index 71bbe7c718f47..9e8e7afe332f1 100644 --- a/server/src/test/java/org/opensearch/search/SearchModuleTests.java +++ b/server/src/test/java/org/opensearch/search/SearchModuleTests.java @@ -41,6 +41,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.index.IndexSettings; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryRewriteContext; import org.opensearch.index.query.QueryShardContext; @@ -69,7 +70,7 @@ import org.opensearch.search.aggregations.support.ValuesSourceConfig; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.search.aggregations.support.ValuesSourceType; -import org.opensearch.search.deciders.ConcurrentSearchDecider; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; import org.opensearch.search.fetch.FetchSubPhase; import org.opensearch.search.fetch.subphase.ExplainPhase; import org.opensearch.search.fetch.subphase.highlight.CustomHighlighter; @@ -501,12 +502,12 @@ public Optional getIndexSearcherExecutorProvider() { expectThrows(IllegalStateException.class, () -> new SearchModule(Settings.EMPTY, searchPlugins)); } - public void testRegisterConcurrentSearchDecidersNoExternalPlugins() { + public void testRegisterConcurrentSearchRequestDecidersNoExternalPlugins() { SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.emptyList()); - assertEquals(searchModule.getConcurrentSearchDeciders().size(), 0); + assertEquals(searchModule.getConcurrentSearchRequestDeciderFactories().size(), 0); } - public void testRegisterConcurrentSearchDecidersExternalPluginsWithNoDeciders() { + public void testRegisterConcurrentSearchRequestDecidersExternalPluginsWithNoDeciders() { SearchPlugin plugin1 = new SearchPlugin() { @Override public Optional getIndexSearcherExecutorProvider() { @@ -521,10 +522,10 @@ public Optional getIndexSearcherExecutorProvider() { searchPlugins.add(plugin2); SearchModule searchModule = new SearchModule(Settings.EMPTY, searchPlugins); - assertEquals(searchModule.getConcurrentSearchDeciders().size(), 0); + assertEquals(searchModule.getConcurrentSearchRequestDeciderFactories().size(), 0); } - public void testRegisterConcurrentSearchDecidersExternalPluginsWithDeciders() { + public void testRegisterConcurrentSearchRequestDecidersExternalPluginsWithDeciders() { SearchPlugin pluginDecider1 = new SearchPlugin() { @Override public Optional getIndexSearcherExecutorProvider() { @@ -532,15 +533,25 @@ public Optional getIndexSearcherExecutorProvider() { } @Override - public ConcurrentSearchDecider getConcurrentSearchDecider() { - return mock(ConcurrentSearchDecider.class); + public Optional getConcurrentSearchRequestDeciderFactory() { + return Optional.of(new ConcurrentSearchRequestDecider.Factory() { + @Override + public Optional create(IndexSettings indexSettings) { + return Optional.of(mock(ConcurrentSearchRequestDecider.class)); + } + }); } }; SearchPlugin pluginDecider2 = new SearchPlugin() { @Override - public ConcurrentSearchDecider getConcurrentSearchDecider() { - return mock(ConcurrentSearchDecider.class); + public Optional getConcurrentSearchRequestDeciderFactory() { + return Optional.of(new ConcurrentSearchRequestDecider.Factory() { + @Override + public Optional create(IndexSettings indexSettings) { + return Optional.of(mock(ConcurrentSearchRequestDecider.class)); + } + }); } }; @@ -549,23 +560,7 @@ public ConcurrentSearchDecider getConcurrentSearchDecider() { searchPlugins.add(pluginDecider2); SearchModule searchModule = new SearchModule(Settings.EMPTY, searchPlugins); - assertEquals(searchModule.getConcurrentSearchDeciders().size(), 2); - } - - public void testRegisterConcurrentSearchDecidersPluginWithNullDecider() { - SearchPlugin pluginWithNullDecider = new SearchPlugin() { - @Override - public ConcurrentSearchDecider getConcurrentSearchDecider() { - return null; - } - }; - - List searchPlugins = new ArrayList<>(); - searchPlugins.add(pluginWithNullDecider); - SearchModule searchModule = new SearchModule(Settings.EMPTY, searchPlugins); - // null decider is filtered out, so 0 deciders - assertEquals(searchModule.getConcurrentSearchDeciders().size(), 0); - + assertEquals(searchModule.getConcurrentSearchRequestDeciderFactories().size(), 2); } private static final String[] NON_DEPRECATED_QUERIES = new String[] { diff --git a/test/framework/src/main/java/org/opensearch/node/MockNode.java b/test/framework/src/main/java/org/opensearch/node/MockNode.java index 09df9b85320f0..97c06962ca2e7 100644 --- a/test/framework/src/main/java/org/opensearch/node/MockNode.java +++ b/test/framework/src/main/java/org/opensearch/node/MockNode.java @@ -57,7 +57,7 @@ import org.opensearch.script.ScriptService; import org.opensearch.search.MockSearchService; import org.opensearch.search.SearchService; -import org.opensearch.search.deciders.ConcurrentSearchDecider; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; import org.opensearch.search.fetch.FetchPhase; import org.opensearch.search.query.QueryPhase; import org.opensearch.tasks.TaskResourceTrackingService; @@ -158,7 +158,7 @@ protected SearchService newSearchService( CircuitBreakerService circuitBreakerService, Executor indexSearcherExecutor, TaskResourceTrackingService taskResourceTrackingService, - Collection concurrentSearchDecidersList + Collection concurrentSearchDeciderFactories ) { if (getPluginsService().filterPlugins(MockSearchService.TestPlugin.class).isEmpty()) { return super.newSearchService( @@ -173,7 +173,7 @@ protected SearchService newSearchService( circuitBreakerService, indexSearcherExecutor, taskResourceTrackingService, - concurrentSearchDecidersList + concurrentSearchDeciderFactories ); } return new MockSearchService( diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index ec88002317284..7adff82e72245 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -2143,6 +2143,17 @@ public synchronized void fullRestart(RestartCallback callback) throws Exception } } + /** + * Returns the name of all the cluster managers in the cluster + */ + public Set getClusterManagerNames() { + return nodes.entrySet() + .stream() + .filter(entry -> CLUSTER_MANAGER_NODE_PREDICATE.test(entry.getValue())) + .map(entry -> entry.getKey()) + .collect(Collectors.toSet()); + } + /** * Returns the name of the current cluster-manager node in the cluster. */