diff --git a/CHANGELOG.md b/CHANGELOG.md index ab37a87ebc711..1d986fe5de861 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.x] ### Added +- Add coordinator level stats for search latency ([#8386](https://github.com/opensearch-project/OpenSearch/issues/8386)) - Add metrics for thread_pool task wait time ([#9681](https://github.com/opensearch-project/OpenSearch/pull/9681)) - Async blob read support for S3 plugin ([#9694](https://github.com/opensearch-project/OpenSearch/pull/9694)) - [Telemetry-Otel] Added support for OtlpGrpcSpanExporter exporter ([#9666](https://github.com/opensearch-project/OpenSearch/pull/9666)) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java index ca0fb106c2d70..0975585e2984d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java @@ -16,6 +16,7 @@ import org.opensearch.action.get.MultiGetRequest; import org.opensearch.action.get.MultiGetResponse; import org.opensearch.action.index.IndexRequestBuilder; +import org.opensearch.action.search.SearchPhaseName; import org.opensearch.action.search.SearchResponse; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; @@ -56,9 +57,11 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.opensearch.action.search.TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED_KEY; import static org.opensearch.search.aggregations.AggregationBuilders.terms; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, minNumDataNodes = 3) @@ -74,6 +77,7 @@ public void testSearchWithWRRShardRouting() throws IOException { .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone" + ".values", "a,b,c") .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone") .put("cluster.routing.weighted.fail_open", false) + .put(SEARCH_REQUEST_STATS_ENABLED_KEY, true) .build(); logger.info("--> starting 6 nodes on different zones"); @@ -180,12 +184,39 @@ public void testSearchWithWRRShardRouting() throws IOException { assertFalse(!hitNodes.contains(nodeId)); } nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet(); + int num = 0; + int coordNumber = 0; for (NodeStats stat : nodeStats.getNodes()) { SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal(); + if (searchStats.getRequestStatsLongHolder() + .getRequestStatsHolder() + .get(SearchPhaseName.QUERY.getName()) + .getTimeInMillis() > 0) { + assertThat( + searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.QUERY.getName()).getTotal(), + greaterThan(0L) + ); + assertThat( + searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTimeInMillis(), + greaterThan(0L) + ); + assertThat( + searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTotal(), + greaterThan(0L) + ); + assertThat( + searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.EXPAND.getName()).getTotal(), + greaterThan(0L) + ); + coordNumber += 1; + } Assert.assertTrue(searchStats.getQueryCount() > 0L); Assert.assertTrue(searchStats.getFetchCount() > 0L); + num++; } + assertThat(coordNumber, greaterThan(0)); + assertThat(num, greaterThan(0)); } private Map> setupCluster(int nodeCountPerAZ, Settings commonSettings) { diff --git a/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java index 23d48b173a3db..253a8b2b14824 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java @@ -37,6 +37,7 @@ import org.opensearch.action.admin.cluster.node.stats.NodeStats; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.search.SearchPhaseName; import org.opensearch.action.search.SearchResponse; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.routing.GroupShardsIterator; @@ -63,6 +64,7 @@ import java.util.Set; import java.util.function.Function; +import static org.opensearch.action.search.TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED_KEY; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.opensearch.index.query.QueryBuilders.matchAllQuery; @@ -78,7 +80,7 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; -@OpenSearchIntegTestCase.ClusterScope(minNumDataNodes = 2) +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, minNumDataNodes = 2) public class SearchStatsIT extends ParameterizedOpenSearchIntegTestCase { public SearchStatsIT(Settings dynamicSettings) { @@ -126,6 +128,11 @@ public void testSimpleStats() throws Exception { assertThat(numNodes, greaterThanOrEqualTo(2)); final int shardsIdx1 = randomIntBetween(1, 10); // we make sure each node gets at least a single shard... final int shardsIdx2 = Math.max(numNodes - shardsIdx1, randomIntBetween(1, 10)); + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(SEARCH_REQUEST_STATS_ENABLED_KEY, true).build()) + .get(); assertThat(numNodes, lessThanOrEqualTo(shardsIdx1 + shardsIdx2)); assertAcked( prepareCreate("test1").setSettings( @@ -188,20 +195,40 @@ public void testSimpleStats() throws Exception { Set nodeIdsWithIndex = nodeIdsWithIndex("test1", "test2"); int num = 0; + int numOfCoordinators = 0; + for (NodeStats stat : nodeStats.getNodes()) { Stats total = stat.getIndices().getSearch().getTotal(); + if (total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.QUERY.getName()).getTimeInMillis() > 0) { + assertThat( + total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTimeInMillis(), + greaterThan(0L) + ); + assertEquals( + iters, + total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTotal() + ); + assertEquals( + iters, + total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.EXPAND.getName()).getTotal() + ); + assertEquals( + iters, + total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTotal() + ); + numOfCoordinators += 1; + } if (nodeIdsWithIndex.contains(stat.getNode().getId())) { assertThat(total.getQueryCount(), greaterThan(0L)); assertThat(total.getQueryTimeInMillis(), greaterThan(0L)); num++; } else { - assertThat(total.getQueryCount(), equalTo(0L)); + assertThat(total.getQueryCount(), greaterThanOrEqualTo(0L)); assertThat(total.getQueryTimeInMillis(), equalTo(0L)); } } - + assertThat(numOfCoordinators, greaterThan(0)); assertThat(num, greaterThan(0)); - } private Set nodeIdsWithIndex(String... indices) { diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index ee8aa10577956..1c0a1280ad550 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -65,6 +65,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; @@ -107,7 +108,6 @@ abstract class AbstractSearchAsyncAction exten private final AtomicInteger skippedOps = new AtomicInteger(); private final TransportSearchAction.SearchTimeProvider timeProvider; private final SearchResponse.Clusters clusters; - protected final GroupShardsIterator toSkipShardsIts; protected final GroupShardsIterator shardsIts; private final int expectedTotalOps; @@ -116,8 +116,12 @@ abstract class AbstractSearchAsyncAction exten private final Map pendingExecutionsPerNode = new ConcurrentHashMap<>(); private final boolean throttleConcurrentRequests; + private SearchPhase currentPhase; + private final List releasables = new ArrayList<>(); + private Optional searchRequestOperationsListener; + AbstractSearchAsyncAction( String name, Logger logger, @@ -135,7 +139,8 @@ abstract class AbstractSearchAsyncAction exten SearchTask task, SearchPhaseResults resultConsumer, int maxConcurrentRequestsPerNode, - SearchResponse.Clusters clusters + SearchResponse.Clusters clusters, + SearchRequestOperationsListener searchRequestOperationsListener ) { super(name); final List toSkipIterators = new ArrayList<>(); @@ -171,6 +176,7 @@ abstract class AbstractSearchAsyncAction exten this.indexRoutings = indexRoutings; this.results = resultConsumer; this.clusters = clusters; + this.searchRequestOperationsListener = Optional.ofNullable(searchRequestOperationsListener); } @Override @@ -371,6 +377,7 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha : OpenSearchException.guessRootCauses(shardSearchFailures[0].getCause())[0]; logger.debug(() -> new ParameterizedMessage("All shards failed for phase: [{}]", getName()), cause); onPhaseFailure(currentPhase, "all shards failed", cause); + } else { Boolean allowPartialResults = request.allowPartialSearchResults(); assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults"; @@ -419,13 +426,24 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha clusterState.version() ); } + onPhaseEnd(); executePhase(nextPhase); } } + private void onPhaseEnd() { + this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> { searchRequestOperations.onPhaseEnd(this); }); + } + + private void onPhaseStart(SearchPhase phase) { + setCurrentPhase(phase); + this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> { searchRequestOperations.onPhaseStart(this); }); + } + private void executePhase(SearchPhase phase) { try { - phase.run(); + onPhaseStart(phase); + phase.recordAndRun(); } catch (Exception e) { if (logger.isDebugEnabled()) { logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e); @@ -603,6 +621,14 @@ private void successfulShardExecution(SearchShardIterator shardsIt) { } } + public SearchPhase getCurrentPhase() { + return currentPhase; + } + + private void setCurrentPhase(SearchPhase phase) { + currentPhase = phase; + } + @Override public final int getNumShards() { return results.getNumShards(); @@ -670,10 +696,13 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At } listener.onResponse(buildSearchResponse(internalSearchResponse, failures, scrollId, searchContextId)); } + onPhaseEnd(); + setCurrentPhase(null); } @Override public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) { + this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> searchRequestOperations.onPhaseFailure(this)); raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures())); } diff --git a/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java index 6c3ee652de2de..ae481736ad0aa 100644 --- a/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java @@ -90,7 +90,8 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction, SearchPhase> phaseFactory, - SearchResponse.Clusters clusters + SearchResponse.Clusters clusters, + SearchRequestOperationsListener searchRequestOperationsListener ) { // We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests super( @@ -110,7 +111,8 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction(shardsIts.size()), request.getMaxConcurrentShardRequests(), - clusters + clusters, + searchRequestOperationsListener ); this.queryPhaseResultConsumer = queryPhaseResultConsumer; this.searchPhaseController = searchPhaseController; diff --git a/server/src/main/java/org/opensearch/action/search/SearchPhase.java b/server/src/main/java/org/opensearch/action/search/SearchPhase.java index 50b0cd8e01c1d..1c7b3c1f1563c 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchPhase.java +++ b/server/src/main/java/org/opensearch/action/search/SearchPhase.java @@ -42,13 +42,23 @@ * * @opensearch.internal */ -abstract class SearchPhase implements CheckedRunnable { +public abstract class SearchPhase implements CheckedRunnable { private final String name; + private long startTimeInNanos; protected SearchPhase(String name) { this.name = Objects.requireNonNull(name, "name must not be null"); } + public long getStartTimeInNanos() { + return startTimeInNanos; + } + + public void recordAndRun() throws IOException { + this.startTimeInNanos = System.nanoTime(); + run(); + } + /** * Returns the phases name. */ diff --git a/server/src/main/java/org/opensearch/action/search/SearchPhaseContext.java b/server/src/main/java/org/opensearch/action/search/SearchPhaseContext.java index 4ffd5521793f6..45d39a6f85ea2 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchPhaseContext.java +++ b/server/src/main/java/org/opensearch/action/search/SearchPhaseContext.java @@ -73,6 +73,8 @@ public interface SearchPhaseContext extends Executor { */ SearchRequest getRequest(); + SearchPhase getCurrentPhase(); + /** * Builds and sends the final search response back to the user. * diff --git a/server/src/main/java/org/opensearch/action/search/SearchPhaseName.java b/server/src/main/java/org/opensearch/action/search/SearchPhaseName.java index b6f842cf2cce1..4c0fe3ac06326 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchPhaseName.java +++ b/server/src/main/java/org/opensearch/action/search/SearchPhaseName.java @@ -13,6 +13,7 @@ * @opensearch.internal */ public enum SearchPhaseName { + DFS_PRE_QUERY("dfs_pre_query"), QUERY("query"), FETCH("fetch"), DFS_QUERY("dfs_query"), diff --git a/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java index f75ab2554e693..ca5ad087d3089 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -81,10 +81,11 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction listeners; + private final Logger logger; + + public CompositeListener(List listeners, Logger logger) { + this.listeners = listeners; + this.logger = logger; + } + + @Override + public void onPhaseStart(SearchPhaseContext context) { + for (SearchRequestOperationsListener listener : listeners) { + try { + listener.onPhaseStart(context); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onPhaseStart listener [{}] failed", listener), e); + } + } + } + + @Override + public void onPhaseEnd(SearchPhaseContext context) { + for (SearchRequestOperationsListener listener : listeners) { + try { + listener.onPhaseEnd(context); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onPhaseEnd listener [{}] failed", listener), e); + } + } + } + + @Override + public void onPhaseFailure(SearchPhaseContext context) { + for (SearchRequestOperationsListener listener : listeners) { + try { + listener.onPhaseFailure(context); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onPhaseFailure listener [{}] failed", listener), e); + } + } + } + } +} diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java new file mode 100644 index 0000000000000..ad299c11b987d --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java @@ -0,0 +1,75 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.common.inject.Inject; +import org.opensearch.common.metrics.CounterMetric; +import org.opensearch.common.metrics.MeanMetric; + +import java.util.EnumMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Request level search stats to track coordinator level node search latencies + * + * @opensearch.internal + */ +public final class SearchRequestStats implements SearchRequestOperationsListener { + Map phaseStatsMap = new EnumMap<>(SearchPhaseName.class); + + @Inject + public SearchRequestStats() { + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + phaseStatsMap.put(searchPhaseName, new StatsHolder()); + } + } + + public long getPhaseCurrent(SearchPhaseName searchPhaseName) { + return phaseStatsMap.get(searchPhaseName).current.count(); + } + + public long getPhaseTotal(SearchPhaseName searchPhaseName) { + return phaseStatsMap.get(searchPhaseName).total.count(); + } + + public long getPhaseMetric(SearchPhaseName searchPhaseName) { + return phaseStatsMap.get(searchPhaseName).timing.sum(); + } + + @Override + public void onPhaseStart(SearchPhaseContext context) { + phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.inc(); + } + + @Override + public void onPhaseEnd(SearchPhaseContext context) { + StatsHolder phaseStats = phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()); + phaseStats.current.dec(); + phaseStats.total.inc(); + phaseStats.timing.inc(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - context.getCurrentPhase().getStartTimeInNanos())); + } + + @Override + public void onPhaseFailure(SearchPhaseContext context) { + phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec(); + } + + /** + * Holder of statistics values + * + * @opensearch.internal + */ + + public static final class StatsHolder { + CounterMetric current = new CounterMetric(); + CounterMetric total = new CounterMetric(); + MeanMetric timing = new MeanMetric(); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 25ec0fc57d19f..cff1005beff27 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -67,6 +67,7 @@ import org.opensearch.core.common.breaker.CircuitBreaker; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.common.util.CollectionUtils; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.indices.breaker.CircuitBreakerService; @@ -145,6 +146,14 @@ public class TransportSearchAction extends HandledTransportAction SEARCH_REQUEST_STATS_ENABLED = Setting.boolSetting( + SEARCH_REQUEST_STATS_ENABLED_KEY, + false, + Property.Dynamic, + Property.NodeScope + ); + private final NodeClient client; private final ThreadPool threadPool; private final ClusterService clusterService; @@ -157,6 +166,10 @@ public class TransportSearchAction extends HandledTransportAction) SearchRequest::new); this.client = client; @@ -185,6 +199,13 @@ public TransportSearchAction( this.indexNameExpressionResolver = indexNameExpressionResolver; this.namedWriteableRegistry = namedWriteableRegistry; this.searchPipelineService = searchPipelineService; + this.isRequestStatsEnabled = clusterService.getClusterSettings().get(SEARCH_REQUEST_STATS_ENABLED); + clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_REQUEST_STATS_ENABLED, this::setIsRequestStatsEnabled); + this.searchRequestStats = searchRequestStats; + } + + private void setIsRequestStatsEnabled(boolean isRequestStatsEnabled) { + this.isRequestStatsEnabled = isRequestStatsEnabled; } private Map buildPerIndexAliasFilter( @@ -311,6 +332,13 @@ public void executeRequest( SinglePhaseSearchAction phaseSearchAction, ActionListener listener ) { + final List searchListenersList = createSearchListenerList(); + final SearchRequestOperationsListener searchRequestOperationsListener; + if (!CollectionUtils.isEmpty(searchListenersList)) { + searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger); + } else { + searchRequestOperationsListener = null; + } executeRequest(task, searchRequest, new SearchAsyncActionProvider() { @Override public AbstractSearchAsyncAction asyncSearchAction( @@ -346,7 +374,8 @@ public AbstractSearchAsyncAction asyncSearchAction( task, new ArraySearchPhaseResults<>(shardsIts.size()), searchRequest.getMaxConcurrentShardRequests(), - clusters + clusters, + searchRequestOperationsListener ) { @Override protected void executePhaseOnShard( @@ -916,9 +945,7 @@ private void executeSearch( @Nullable SearchContextId searchContext, SearchAsyncActionProvider searchAsyncActionProvider ) { - clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); - // TODO: I think startTime() should become part of ActionRequest and that should be used both for index name // date math expressions and $now in scripts. This way all apis will deal with now in the same way instead // of just for the _search api @@ -968,11 +995,8 @@ private void executeSearch( indexRoutings = routingMap; } final GroupShardsIterator shardIterators = mergeShardsIterators(localShardIterators, remoteShardIterators); - failIfOverShardCountLimit(clusterService, shardIterators.size()); - Map concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState); - // optimize search type for cases where there is only one shard group to search on if (shardIterators.size() == 1) { // if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shard @@ -1107,6 +1131,14 @@ AbstractSearchAsyncAction asyncSearchAction( ); } + private List createSearchListenerList() { + final List searchListenersList = new ArrayList<>(); + if (isRequestStatsEnabled) { + searchListenersList.add(searchRequestStats); + } + return searchListenersList; + } + private AbstractSearchAsyncAction searchAsyncAction( SearchTask task, SearchRequest searchRequest, @@ -1123,6 +1155,13 @@ private AbstractSearchAsyncAction searchAsyncAction ThreadPool threadPool, SearchResponse.Clusters clusters ) { + final List searchListenersList = createSearchListenerList(); + final SearchRequestOperationsListener searchRequestOperationsListener; + if (!CollectionUtils.isEmpty(searchListenersList)) { + searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger); + } else { + searchRequestOperationsListener = null; + } if (preFilter) { return new CanMatchPreFilterSearchPhase( logger, @@ -1162,7 +1201,8 @@ public void run() { } }; }, - clusters + clusters, + searchRequestOperationsListener ); } else { final QueryPhaseResultConsumer queryResultConsumer = searchPhaseController.newSearchPhaseResults( @@ -1192,7 +1232,8 @@ public void run() { timeProvider, clusterState, task, - clusters + clusters, + searchRequestOperationsListener ); break; case QUERY_THEN_FETCH: @@ -1212,7 +1253,8 @@ public void run() { timeProvider, clusterState, task, - clusters + clusters, + searchRequestOperationsListener ); break; default: diff --git a/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java b/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java index 5c48c1f772ff0..cb181840406a5 100644 --- a/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java +++ b/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java @@ -62,4 +62,5 @@ public void dec(long n) { public long count() { return counter.sum(); } + } diff --git a/server/src/main/java/org/opensearch/common/metrics/MeanMetric.java b/server/src/main/java/org/opensearch/common/metrics/MeanMetric.java index 33f12c8cb42d3..359facdce633b 100644 --- a/server/src/main/java/org/opensearch/common/metrics/MeanMetric.java +++ b/server/src/main/java/org/opensearch/common/metrics/MeanMetric.java @@ -79,4 +79,5 @@ public void clear() { counter.reset(); sum.reset(); } + } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index ad7d41b0c268a..9d176c45f69ae 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -375,6 +375,7 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS, TransportSearchAction.SHARD_COUNT_LIMIT_SETTING, TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING, + TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED, RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE, SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER, RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING, diff --git a/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java b/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java index 76b216304b3b7..1f9144b28f286 100644 --- a/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java +++ b/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java @@ -33,6 +33,8 @@ package org.opensearch.index.search.stats; import org.opensearch.Version; +import org.opensearch.action.search.SearchPhaseName; +import org.opensearch.action.search.SearchRequestStats; import org.opensearch.common.Nullable; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.FeatureFlags; @@ -58,10 +60,79 @@ public class SearchStats implements Writeable, ToXContentFragment { /** - * Statistics for search + * Holds statistic values for a particular phase. * * @opensearch.internal */ + public static class PhaseStatsLongHolder implements Writeable { + + long current; + long total; + long timeInMillis; + + public long getCurrent() { + return current; + } + + public long getTotal() { + return total; + } + + public long getTimeInMillis() { + return timeInMillis; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(current); + out.writeVLong(total); + out.writeVLong(timeInMillis); + } + + PhaseStatsLongHolder() { + this(0, 0, 0); + } + + PhaseStatsLongHolder(long current, long total, long timeInMillis) { + this.current = current; + this.total = total; + this.timeInMillis = timeInMillis; + } + + PhaseStatsLongHolder(StreamInput in) throws IOException { + this.current = in.readVLong(); + this.total = in.readVLong(); + this.timeInMillis = in.readVLong(); + } + + } + + /** + * Holds requests stats for different phases. + * + * @opensearch.internal + */ + public static class RequestStatsLongHolder { + + Map requestStatsHolder = new HashMap<>(); + + public Map getRequestStatsHolder() { + return requestStatsHolder; + } + + RequestStatsLongHolder() { + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + requestStatsHolder.put(searchPhaseName.getName(), new PhaseStatsLongHolder()); + } + } + } + + /** + * Holder of statistics values + * + * @opensearch.internal + */ + public static class Stats implements Writeable, ToXContentFragment { private long queryCount; @@ -89,6 +160,13 @@ public static class Stats implements Writeable, ToXContentFragment { private long pitTimeInMillis; private long pitCurrent; + @Nullable + private RequestStatsLongHolder requestStatsLongHolder; + + public RequestStatsLongHolder getRequestStatsLongHolder() { + return requestStatsLongHolder; + } + private Stats() { // for internal use, initializes all counts to 0 } @@ -114,6 +192,7 @@ public Stats( long suggestTimeInMillis, long suggestCurrent ) { + this.requestStatsLongHolder = new RequestStatsLongHolder(); this.queryCount = queryCount; this.queryTimeInMillis = queryTimeInMillis; this.queryCurrent = queryCurrent; @@ -163,6 +242,10 @@ private Stats(StreamInput in) throws IOException { pitCurrent = in.readVLong(); } + if (in.getVersion().onOrAfter(Version.V_2_11_0)) { + this.requestStatsLongHolder = new RequestStatsLongHolder(); + requestStatsLongHolder.requestStatsHolder = in.readMap(StreamInput::readString, PhaseStatsLongHolder::new); + } if (in.getVersion().onOrAfter(Version.V_2_10_0)) { concurrentQueryCount = in.readVLong(); concurrentQueryTimeInMillis = in.readVLong(); @@ -354,6 +437,17 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(pitCurrent); } + if (out.getVersion().onOrAfter(Version.V_2_11_0)) { + if (requestStatsLongHolder == null) { + requestStatsLongHolder = new RequestStatsLongHolder(); + } + out.writeMap( + requestStatsLongHolder.getRequestStatsHolder(), + StreamOutput::writeString, + (stream, stats) -> stats.writeTo(stream) + ); + } + if (out.getVersion().onOrAfter(Version.V_2_10_0)) { out.writeVLong(concurrentQueryCount); out.writeVLong(concurrentQueryTimeInMillis); @@ -391,6 +485,22 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.humanReadableField(Fields.SUGGEST_TIME_IN_MILLIS, Fields.SUGGEST_TIME, getSuggestTime()); builder.field(Fields.SUGGEST_CURRENT, suggestCurrent); + if (requestStatsLongHolder != null) { + builder.startObject(Fields.REQUEST); + + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + PhaseStatsLongHolder statsLongHolder = requestStatsLongHolder.requestStatsHolder.get(searchPhaseName.getName()); + if (statsLongHolder == null) { + continue; + } + builder.startObject(searchPhaseName.getName()); + builder.humanReadableField(Fields.TIME_IN_MILLIS, Fields.TIME, new TimeValue(statsLongHolder.timeInMillis)); + builder.field(Fields.CURRENT, statsLongHolder.current); + builder.field(Fields.TOTAL, statsLongHolder.total); + builder.endObject(); + } + builder.endObject(); + } return builder; } } @@ -405,6 +515,24 @@ public SearchStats() { totalStats = new Stats(); } + // Set the different Request Stats fields in here + public void setSearchRequestStats(SearchRequestStats searchRequestStats) { + if (totalStats.requestStatsLongHolder == null) { + totalStats.requestStatsLongHolder = new RequestStatsLongHolder(); + } + + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + totalStats.requestStatsLongHolder.requestStatsHolder.put( + searchPhaseName.getName(), + new PhaseStatsLongHolder( + searchRequestStats.getPhaseCurrent(searchPhaseName), + searchRequestStats.getPhaseTotal(searchPhaseName), + searchRequestStats.getPhaseMetric(searchPhaseName) + ) + ); + } + } + public SearchStats(Stats totalStats, long openContexts, @Nullable Map groupStats) { this.totalStats = totalStats; this.openContexts = openContexts; @@ -520,6 +648,12 @@ static final class Fields { static final String SUGGEST_TIME = "suggest_time"; static final String SUGGEST_TIME_IN_MILLIS = "suggest_time_in_millis"; static final String SUGGEST_CURRENT = "suggest_current"; + static final String REQUEST = "request"; + static final String TIME_IN_MILLIS = "time_in_millis"; + static final String TIME = "time"; + static final String CURRENT = "current"; + static final String TOTAL = "total"; + } @Override diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index aeec814f21e1a..45af7d2b39989 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -49,6 +49,7 @@ import org.opensearch.action.admin.indices.stats.CommonStatsFlags.Flag; import org.opensearch.action.admin.indices.stats.IndexShardStats; import org.opensearch.action.admin.indices.stats.ShardStats; +import org.opensearch.action.search.SearchRequestStats; import org.opensearch.action.search.SearchType; import org.opensearch.client.Client; import org.opensearch.cluster.ClusterState; @@ -348,6 +349,8 @@ public class IndicesService extends AbstractLifecycleComponent private volatile TimeValue clusterRemoteTranslogBufferInterval; private final FileCacheCleaner fileCacheCleaner; + private final SearchRequestStats searchRequestStats; + @Override protected void doStart() { // Start thread that will manage cleaning the field data cache periodically @@ -378,6 +381,7 @@ public IndicesService( IndexStorePlugin.DirectoryFactory remoteDirectoryFactory, Supplier repositoriesServiceSupplier, FileCacheCleaner fileCacheCleaner, + SearchRequestStats searchRequestStats, @Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory ) { this.settings = settings; @@ -468,6 +472,7 @@ protected void closeInternal() { clusterService.getClusterSettings().addSettingsUpdateConsumer(ALLOW_EXPENSIVE_QUERIES, this::setAllowExpensiveQueries); this.remoteDirectoryFactory = remoteDirectoryFactory; this.translogFactorySupplier = getTranslogFactorySupplier(repositoriesServiceSupplier, threadPool, remoteStoreStatsTrackerFactory); + this.searchRequestStats = searchRequestStats; this.clusterDefaultRefreshInterval = CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.get(clusterService.getSettings()); clusterService.getClusterSettings() .addSettingsUpdateConsumer(CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING, this::onRefreshIntervalUpdate); @@ -591,7 +596,7 @@ public NodeIndicesStats stats(CommonStatsFlags flags) { } } - return new NodeIndicesStats(commonStats, statsByShard(this, flags)); + return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats); } Map> statsByShard(final IndicesService indicesService, final CommonStatsFlags flags) { diff --git a/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java b/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java index cc3d8193dfa6b..8a7aaba2726f4 100644 --- a/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java +++ b/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java @@ -35,6 +35,7 @@ import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.action.admin.indices.stats.IndexShardStats; import org.opensearch.action.admin.indices.stats.ShardStats; +import org.opensearch.action.search.SearchRequestStats; import org.opensearch.common.Nullable; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -71,7 +72,6 @@ * @opensearch.internal */ public class NodeIndicesStats implements Writeable, ToXContentFragment { - private CommonStats stats; private Map> statsByShard; @@ -92,7 +92,7 @@ public NodeIndicesStats(StreamInput in) throws IOException { } } - public NodeIndicesStats(CommonStats oldStats, Map> statsByShard) { + public NodeIndicesStats(CommonStats oldStats, Map> statsByShard, SearchRequestStats searchRequestStats) { // this.stats = stats; this.statsByShard = statsByShard; @@ -105,6 +105,9 @@ public NodeIndicesStats(CommonStats oldStats, Map> } } } + if (this.stats.search != null) { + this.stats.search.setSearchRequestStats(searchRequestStats); + } } @Nullable diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 1b22c919c313c..711b32c5ade91 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -46,6 +46,7 @@ import org.opensearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus; import org.opensearch.action.search.SearchExecutionStatsCollector; import org.opensearch.action.search.SearchPhaseController; +import org.opensearch.action.search.SearchRequestStats; import org.opensearch.action.search.SearchTransportService; import org.opensearch.action.support.TransportAction; import org.opensearch.action.update.UpdateHelper; @@ -763,6 +764,8 @@ protected Node( threadPool ); + final SearchRequestStats searchRequestStats = new SearchRequestStats(); + remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, settings); final IndicesService indicesService = new IndicesService( settings, @@ -788,6 +791,7 @@ protected Node( remoteDirectoryFactory, repositoriesServiceReference::get, fileCacheCleaner, + searchRequestStats, remoteStoreStatsTrackerFactory ); @@ -1200,6 +1204,7 @@ protected Node( b.bind(SystemIndices.class).toInstance(systemIndices); b.bind(IdentityService.class).toInstance(identityService); b.bind(Tracer.class).toInstance(tracer); + b.bind(SearchRequestStats.class).toInstance(searchRequestStats); b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService); b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry); }); diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index 672ebea8d01f9..e3f16463a5328 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -34,6 +34,7 @@ import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.action.admin.indices.stats.CommonStatsFlags; +import org.opensearch.action.search.SearchRequestStats; import org.opensearch.cluster.coordination.PendingClusterStateStats; import org.opensearch.cluster.coordination.PublishClusterStateStats; import org.opensearch.cluster.node.DiscoveryNode; @@ -801,8 +802,7 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) { private static NodeIndicesStats getNodeIndicesStats(boolean remoteStoreStats) { NodeIndicesStats indicesStats = null; if (remoteStoreStats) { - indicesStats = new NodeIndicesStats(new CommonStats(CommonStatsFlags.ALL), new HashMap<>()); - + indicesStats = new NodeIndicesStats(new CommonStats(CommonStatsFlags.ALL), new HashMap<>(), new SearchRequestStats()); RemoteSegmentStats remoteSegmentStats = indicesStats.getSegments().getRemoteSegmentStats(); remoteSegmentStats.addUploadBytesStarted(10L); remoteSegmentStats.addUploadBytesSucceeded(10L); diff --git a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java index 705479ec21fc1..f628bb3201452 100644 --- a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java @@ -38,8 +38,12 @@ import org.opensearch.cluster.routing.GroupShardsIterator; import org.opensearch.common.UUIDs; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.util.concurrent.AtomicArray; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.set.Sets; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.breaker.CircuitBreaker; +import org.opensearch.core.common.breaker.NoopCircuitBreaker; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.query.MatchAllQueryBuilder; @@ -51,7 +55,10 @@ import org.opensearch.search.internal.ShardSearchContextId; import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.search.query.QuerySearchResult; +import org.opensearch.test.InternalAggregationTestCase; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.Transport; import org.junit.After; import org.junit.Before; @@ -65,6 +72,7 @@ import java.util.UUID; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -77,18 +85,21 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.Mockito.mock; public class AbstractSearchAsyncActionTests extends OpenSearchTestCase { private final List> resolvedNodes = new ArrayList<>(); private final Set releasedContexts = new CopyOnWriteArraySet<>(); private ExecutorService executor; + ThreadPool threadPool; @Before @Override public void setUp() throws Exception { super.setUp(); executor = Executors.newFixedThreadPool(1); + threadPool = new TestThreadPool(getClass().getName()); } @After @@ -97,6 +108,7 @@ public void tearDown() throws Exception { super.tearDown(); executor.shutdown(); assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS)); + ThreadPool.terminate(threadPool, 5, TimeUnit.SECONDS); } private AbstractSearchAsyncAction createAction( @@ -126,6 +138,7 @@ private AbstractSearchAsyncAction createAction( final AtomicLong expected, final SearchShardIterator... shards ) { + final Runnable runnable; final TransportSearchAction.SearchTimeProvider timeProvider; if (controlled) { @@ -161,7 +174,8 @@ private AbstractSearchAsyncAction createAction( null, results, request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ) { @Override protected SearchPhase getNextPhase(final SearchPhaseResults results, SearchPhaseContext context) { @@ -313,6 +327,53 @@ public void testSendSearchResponseDisallowPartialFailures() { assertEquals(requestIds, releasedContexts); } + public void testOnPhaseFailureAndVerifyListeners() { + SearchRequestStats testListener = new SearchRequestStats(); + + final List requestOperationListeners = new ArrayList<>(List.of(testListener)); + SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(requestOperationListeners); + action.start(); + assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseName())); + action.onPhaseFailure(new SearchPhase("test") { + @Override + public void run() { + + } + }, "message", null); + assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName())); + assertEquals(0, testListener.getPhaseTotal(action.getSearchPhaseName())); + + SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction( + requestOperationListeners + ); + searchDfsQueryThenFetchAsyncAction.start(); + assertEquals(1, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName())); + searchDfsQueryThenFetchAsyncAction.onPhaseFailure(new SearchPhase("test") { + @Override + public void run() { + + } + }, "message", null); + assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName())); + assertEquals(0, testListener.getPhaseTotal(action.getSearchPhaseName())); + + FetchSearchPhase fetchPhase = createFetchSearchPhase(); + ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt()); + SearchShardIterator searchShardIterator = new SearchShardIterator(null, shardId, Collections.emptyList(), OriginalIndices.NONE); + searchShardIterator.resetAndSkip(); + action.skipShard(searchShardIterator); + action.executeNextPhase(action, fetchPhase); + assertEquals(1, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName())); + action.onPhaseFailure(new SearchPhase("test") { + @Override + public void run() { + + } + }, "message", null); + assertEquals(0, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName())); + assertEquals(0, testListener.getPhaseTotal(fetchPhase.getSearchPhaseName())); + } + public void testOnPhaseFailure() { SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(false); AtomicReference exception = new AtomicReference<>(); @@ -321,6 +382,7 @@ public void testOnPhaseFailure() { List> nodeLookups = new ArrayList<>(); ArraySearchPhaseResults phaseResults = phaseResults(requestIds, nodeLookups, 0); AbstractSearchAsyncAction action = createAction(searchRequest, phaseResults, listener, false, new AtomicLong()); + action.onPhaseFailure(new SearchPhase("test") { @Override public void run() { @@ -528,6 +590,215 @@ public void onFailure(Exception e) { assertThat(searchResponse.getSuccessfulShards(), equalTo(shards.length)); } + public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedException { + SearchRequestStats testListener = new SearchRequestStats(); + final List requestOperationListeners = new ArrayList<>(List.of(testListener)); + + long delay = (randomIntBetween(1, 5)); + delay = delay * 10; + + SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(requestOperationListeners); + action.start(); + + // Verify queryPhase current metric + assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseName())); + TimeUnit.MILLISECONDS.sleep(delay); + + FetchSearchPhase fetchPhase = createFetchSearchPhase(); + ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt()); + SearchShardIterator searchShardIterator = new SearchShardIterator(null, shardId, Collections.emptyList(), OriginalIndices.NONE); + searchShardIterator.resetAndSkip(); + action.skipShard(searchShardIterator); + action.executeNextPhase(action, fetchPhase); + + // Verify queryPhase total, current and latency metrics + assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName())); + assertThat(testListener.getPhaseMetric(action.getSearchPhaseName()), greaterThanOrEqualTo(delay)); + assertEquals(1, testListener.getPhaseTotal(action.getSearchPhaseName())); + + // Verify fetchPhase current metric + assertEquals(1, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName())); + TimeUnit.MILLISECONDS.sleep(delay); + + ExpandSearchPhase expandPhase = createExpandSearchPhase(); + action.executeNextPhase(fetchPhase, expandPhase); + TimeUnit.MILLISECONDS.sleep(delay); + + // Verify fetchPhase total, current and latency metrics + assertThat(testListener.getPhaseMetric(fetchPhase.getSearchPhaseName()), greaterThanOrEqualTo(delay)); + assertEquals(1, testListener.getPhaseTotal(fetchPhase.getSearchPhaseName())); + assertEquals(0, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName())); + + assertEquals(1, testListener.getPhaseCurrent(expandPhase.getSearchPhaseName())); + + action.executeNextPhase(expandPhase, fetchPhase); + + action.sendSearchResponse(mock(InternalSearchResponse.class), mock(String.valueOf(QuerySearchResult.class))); + assertThat(testListener.getPhaseMetric(expandPhase.getSearchPhaseName()), greaterThanOrEqualTo(delay)); + assertEquals(1, testListener.getPhaseTotal(expandPhase.getSearchPhaseName())); + assertEquals(0, testListener.getPhaseCurrent(expandPhase.getSearchPhaseName())); + } + + public void testOnPhaseListenersWithDfsType() throws InterruptedException { + SearchRequestStats testListener = new SearchRequestStats(); + final List requestOperationListeners = new ArrayList<>(List.of(testListener)); + + SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction( + requestOperationListeners + ); + long delay = (randomIntBetween(1, 5)); + + FetchSearchPhase fetchPhase = createFetchSearchPhase(); + searchDfsQueryThenFetchAsyncAction.start(); + assertEquals(1, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName())); + TimeUnit.MILLISECONDS.sleep(delay); + ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt()); + SearchShardIterator searchShardIterator = new SearchShardIterator(null, shardId, Collections.emptyList(), OriginalIndices.NONE); + searchShardIterator.resetAndSkip(); + + searchDfsQueryThenFetchAsyncAction.skipShard(searchShardIterator); + searchDfsQueryThenFetchAsyncAction.executeNextPhase(searchDfsQueryThenFetchAsyncAction, fetchPhase); + + assertThat(testListener.getPhaseMetric(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()), greaterThanOrEqualTo(delay)); + assertEquals(1, testListener.getPhaseTotal(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName())); + assertEquals(0, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName())); + } + + private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAction( + List searchRequestOperationsListeners + ) { + SearchPhaseController controller = new SearchPhaseController( + writableRegistry(), + r -> InternalAggregationTestCase.emptyReduceContextBuilder() + ); + SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); + SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); + Executor executor = OpenSearchExecutors.newDirectExecutorService(); + SearchShardIterator shards = new SearchShardIterator(null, null, Collections.emptyList(), null); + GroupShardsIterator shardsIter = new GroupShardsIterator<>(List.of(shards)); + QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer( + searchRequest, + executor, + new NoopCircuitBreaker(CircuitBreaker.REQUEST), + controller, + task.getProgressListener(), + writableRegistry(), + shardsIter.size(), + exc -> {} + ); + AtomicReference exception = new AtomicReference<>(); + ActionListener listener = ActionListener.wrap(response -> fail("onResponse should not be called"), exception::set); + + return new SearchDfsQueryThenFetchAsyncAction( + logger, + null, + null, + null, + null, + null, + null, + executor, + resultConsumer, + searchRequest, + listener, + shardsIter, + null, + null, + task, + SearchResponse.Clusters.EMPTY, + new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger) + ); + } + + private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction( + List searchRequestOperationsListeners + ) { + SearchPhaseController controller = new SearchPhaseController( + writableRegistry(), + r -> InternalAggregationTestCase.emptyReduceContextBuilder() + ); + SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); + SearchTask task = new SearchTask(0, "n/a", "n/a", () -> "test", null, Collections.emptyMap()); + Executor executor = OpenSearchExecutors.newDirectExecutorService(); + SearchShardIterator shards = new SearchShardIterator(null, null, Collections.emptyList(), null); + GroupShardsIterator shardsIter = new GroupShardsIterator<>(List.of(shards)); + QueryPhaseResultConsumer resultConsumer = new QueryPhaseResultConsumer( + searchRequest, + executor, + new NoopCircuitBreaker(CircuitBreaker.REQUEST), + controller, + task.getProgressListener(), + writableRegistry(), + shardsIter.size(), + exc -> {} + ); + AtomicReference exception = new AtomicReference<>(); + ActionListener listener = ActionListener.wrap(response -> fail("onResponse should not be called"), exception::set); + return new SearchQueryThenFetchAsyncAction( + logger, + null, + null, + null, + null, + null, + null, + executor, + resultConsumer, + searchRequest, + listener, + shardsIter, + null, + null, + task, + SearchResponse.Clusters.EMPTY, + new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger) + ) { + @Override + ShardSearchFailure[] buildShardFailures() { + return ShardSearchFailure.EMPTY_ARRAY; + } + + @Override + public void sendSearchResponse(InternalSearchResponse internalSearchResponse, AtomicArray queryResults) { + start(); + } + }; + } + + private FetchSearchPhase createFetchSearchPhase() { + SearchPhaseController controller = new SearchPhaseController( + writableRegistry(), + r -> InternalAggregationTestCase.emptyReduceContextBuilder() + ); + MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1); + QueryPhaseResultConsumer results = controller.newSearchPhaseResults( + OpenSearchExecutors.newDirectExecutorService(), + new NoopCircuitBreaker(CircuitBreaker.REQUEST), + SearchProgressListener.NOOP, + mockSearchPhaseContext.getRequest(), + 1, + exc -> {} + ); + return new FetchSearchPhase( + results, + controller, + null, + mockSearchPhaseContext, + (searchResponse, scrollId) -> new SearchPhase("test") { + @Override + public void run() { + mockSearchPhaseContext.sendSearchResponse(searchResponse, null); + } + } + ); + } + + private ExpandSearchPhase createExpandSearchPhase() { + MockSearchPhaseContext mockSearchPhaseContext = new MockSearchPhaseContext(1); + InternalSearchResponse internalSearchResponse = new InternalSearchResponse(null, null, null, null, false, null, 1); + return new ExpandSearchPhase(mockSearchPhaseContext, internalSearchResponse, null); + } + private static final class PhaseResult extends SearchPhaseResult { PhaseResult(ShardSearchContextId contextId) { this.contextId = contextId; diff --git a/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java index 45f00a8418d5c..43029fe57d5dd 100644 --- a/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -136,7 +136,8 @@ public void run() throws IOException { latch.countDown(); } }, - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ); canMatchPhase.start(); @@ -227,7 +228,8 @@ public void run() throws IOException { latch.countDown(); } }, - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ); canMatchPhase.start(); @@ -317,7 +319,8 @@ public void sendCanMatch( null, new ArraySearchPhaseResults<>(iter.size()), randomIntBetween(1, 32), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ) { @Override @@ -344,7 +347,8 @@ protected void executePhaseOnShard( } } }, - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ); canMatchPhase.start(); @@ -428,7 +432,8 @@ public void run() { latch.countDown(); } }, - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ); canMatchPhase.start(); @@ -527,7 +532,8 @@ public void run() { latch.countDown(); } }, - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ); canMatchPhase.start(); diff --git a/server/src/test/java/org/opensearch/action/search/MockSearchPhaseContext.java b/server/src/test/java/org/opensearch/action/search/MockSearchPhaseContext.java index e078b4a467e91..b5e1050b968ee 100644 --- a/server/src/test/java/org/opensearch/action/search/MockSearchPhaseContext.java +++ b/server/src/test/java/org/opensearch/action/search/MockSearchPhaseContext.java @@ -99,6 +99,11 @@ public SearchRequest getRequest() { return searchRequest; } + @Override + public SearchPhase getCurrentPhase() { + return null; + } + @Override public void sendSearchResponse(InternalSearchResponse internalSearchResponse, AtomicArray queryResults) { String scrollId = getRequest().scroll() != null ? TransportSearchHelper.buildScrollId(queryResults, Version.CURRENT) : null; diff --git a/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java index 830fa99f90bb9..4b94b6589c6c8 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java @@ -135,7 +135,8 @@ public void testSkipSearchShards() throws InterruptedException { null, new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ) { @Override @@ -253,7 +254,8 @@ public void testLimitConcurrentShardRequests() throws InterruptedException { null, new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ) { @Override @@ -370,7 +372,8 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI null, new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ) { TestSearchResponse response = new TestSearchResponse(); @@ -492,7 +495,8 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI null, new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ) { TestSearchResponse response = new TestSearchResponse(); @@ -605,9 +609,9 @@ public void testAllowPartialResults() throws InterruptedException { null, new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ) { - @Override protected void executePhaseOnShard( SearchShardIterator shardIt, diff --git a/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java index 0e2780c195cb8..6a22a7ea2b5e4 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -214,7 +214,8 @@ public void sendExecuteQuery( timeProvider, null, task, - SearchResponse.Clusters.EMPTY + SearchResponse.Clusters.EMPTY, + null ) { @Override protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { @@ -226,6 +227,7 @@ public void run() { }; } }; + action.start(); latch.await(); assertThat(successfulOps.get(), equalTo(numShards)); diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java new file mode 100644 index 0000000000000..ef880043e863c --- /dev/null +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java @@ -0,0 +1,69 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SearchRequestOperationsListenerTests extends OpenSearchTestCase { + + public void testListenersAreExecuted() { + Map searchPhaseMap = new HashMap<>(); + + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + searchPhaseMap.put(searchPhaseName, new SearchRequestStats.StatsHolder()); + } + SearchRequestOperationsListener testListener = new SearchRequestOperationsListener() { + + @Override + public void onPhaseStart(SearchPhaseContext context) { + searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).current.inc(); + } + + @Override + public void onPhaseEnd(SearchPhaseContext context) { + searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec(); + searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).total.inc(); + } + + @Override + public void onPhaseFailure(SearchPhaseContext context) { + searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec(); + } + }; + + int totalListeners = randomIntBetween(1, 10); + final List requestOperationListeners = new ArrayList<>(); + for (int i = 0; i < totalListeners; i++) { + requestOperationListeners.add(testListener); + } + + SearchRequestOperationsListener compositeListener = new SearchRequestOperationsListener.CompositeListener( + requestOperationListeners, + logger + ); + + SearchPhaseContext ctx = mock(SearchPhaseContext.class); + SearchPhase searchPhase = mock(SearchPhase.class); + + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + when(ctx.getCurrentPhase()).thenReturn(searchPhase); + when(searchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); + compositeListener.onPhaseStart(ctx); + assertEquals(totalListeners, searchPhaseMap.get(searchPhaseName).current.count()); + } + } +} diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java new file mode 100644 index 0000000000000..f24147a8194b4 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java @@ -0,0 +1,150 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.test.OpenSearchTestCase; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SearchRequestStatsTests extends OpenSearchTestCase { + public void testSearchRequestPhaseFailure() { + SearchRequestStats testRequestStats = new SearchRequestStats(); + SearchPhaseContext ctx = mock(SearchPhaseContext.class); + SearchPhase mockSearchPhase = mock(SearchPhase.class); + when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); + + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); + testRequestStats.onPhaseStart(ctx); + assertEquals(1, testRequestStats.getPhaseCurrent(searchPhaseName)); + testRequestStats.onPhaseFailure(ctx); + assertEquals(0, testRequestStats.getPhaseCurrent(searchPhaseName)); + } + } + + public void testSearchRequestStats() { + SearchRequestStats testRequestStats = new SearchRequestStats(); + + SearchPhaseContext ctx = mock(SearchPhaseContext.class); + SearchPhase mockSearchPhase = mock(SearchPhase.class); + when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); + + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); + long tookTimeInMillis = randomIntBetween(1, 10); + testRequestStats.onPhaseStart(ctx); + long startTime = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(tookTimeInMillis); + when(mockSearchPhase.getStartTimeInNanos()).thenReturn(startTime); + assertEquals(1, testRequestStats.getPhaseCurrent(searchPhaseName)); + testRequestStats.onPhaseEnd(ctx); + assertEquals(0, testRequestStats.getPhaseCurrent(searchPhaseName)); + assertEquals(1, testRequestStats.getPhaseTotal(searchPhaseName)); + assertThat(testRequestStats.getPhaseMetric(searchPhaseName), greaterThanOrEqualTo(tookTimeInMillis)); + } + } + + public void testSearchRequestStatsOnPhaseStartConcurrently() throws InterruptedException { + SearchRequestStats testRequestStats = new SearchRequestStats(); + int numTasks = randomIntBetween(5, 50); + Thread[] threads = new Thread[numTasks * SearchPhaseName.values().length]; + Phaser phaser = new Phaser(numTasks * SearchPhaseName.values().length + 1); + CountDownLatch countDownLatch = new CountDownLatch(numTasks * SearchPhaseName.values().length); + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + SearchPhaseContext ctx = mock(SearchPhaseContext.class); + SearchPhase mockSearchPhase = mock(SearchPhase.class); + when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); + when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); + for (int i = 0; i < numTasks; i++) { + threads[i] = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + testRequestStats.onPhaseStart(ctx); + countDownLatch.countDown(); + }); + threads[i].start(); + } + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + assertEquals(numTasks, testRequestStats.getPhaseCurrent(searchPhaseName)); + } + } + + public void testSearchRequestStatsOnPhaseEndConcurrently() throws InterruptedException { + SearchRequestStats testRequestStats = new SearchRequestStats(); + int numTasks = randomIntBetween(5, 50); + Thread[] threads = new Thread[numTasks * SearchPhaseName.values().length]; + Phaser phaser = new Phaser(numTasks * SearchPhaseName.values().length + 1); + CountDownLatch countDownLatch = new CountDownLatch(numTasks * SearchPhaseName.values().length); + Map searchPhaseNameLongMap = new HashMap<>(); + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + SearchPhaseContext ctx = mock(SearchPhaseContext.class); + SearchPhase mockSearchPhase = mock(SearchPhase.class); + when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); + when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); + long tookTimeInMillis = randomIntBetween(1, 10); + long startTime = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(tookTimeInMillis); + when(mockSearchPhase.getStartTimeInNanos()).thenReturn(startTime); + for (int i = 0; i < numTasks; i++) { + threads[i] = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + testRequestStats.onPhaseEnd(ctx); + countDownLatch.countDown(); + }); + threads[i].start(); + } + searchPhaseNameLongMap.put(searchPhaseName, tookTimeInMillis); + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + assertEquals(numTasks, testRequestStats.getPhaseTotal(searchPhaseName)); + assertThat( + testRequestStats.getPhaseMetric(searchPhaseName), + greaterThanOrEqualTo((searchPhaseNameLongMap.get(searchPhaseName) * numTasks)) + ); + } + } + + public void testSearchRequestStatsOnPhaseFailureConcurrently() throws InterruptedException { + SearchRequestStats testRequestStats = new SearchRequestStats(); + int numTasks = randomIntBetween(5, 50); + Thread[] threads = new Thread[numTasks * SearchPhaseName.values().length]; + Phaser phaser = new Phaser(numTasks * SearchPhaseName.values().length + 1); + CountDownLatch countDownLatch = new CountDownLatch(numTasks * SearchPhaseName.values().length); + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + SearchPhaseContext ctx = mock(SearchPhaseContext.class); + SearchPhase mockSearchPhase = mock(SearchPhase.class); + when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); + when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); + for (int i = 0; i < numTasks; i++) { + threads[i] = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + testRequestStats.onPhaseStart(ctx); + testRequestStats.onPhaseFailure(ctx); + countDownLatch.countDown(); + }); + threads[i].start(); + } + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + assertEquals(0, testRequestStats.getPhaseCurrent(searchPhaseName)); + } + } +} diff --git a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java index f65b0e231b1dd..c27e4bf27327a 100644 --- a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java +++ b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java @@ -32,11 +32,20 @@ package org.opensearch.index.search.stats; +import org.opensearch.action.search.SearchPhase; +import org.opensearch.action.search.SearchPhaseContext; +import org.opensearch.action.search.SearchPhaseName; +import org.opensearch.action.search.SearchRequestStats; import org.opensearch.index.search.stats.SearchStats.Stats; import org.opensearch.test.OpenSearchTestCase; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class SearchStatsTests extends OpenSearchTestCase { @@ -63,6 +72,37 @@ public void testShardLevelSearchGroupStats() throws Exception { // adding again would then return wrong search stats (would return 4! instead of 3) searchStats1.add(searchStats2); assertStats(groupStats1.get("group1"), 3); + + long paramValue = randomIntBetween(2, 50); + + // Testing for request stats + SearchRequestStats testRequestStats = new SearchRequestStats(); + SearchPhaseContext ctx = mock(SearchPhaseContext.class); + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + SearchPhase mockSearchPhase = mock(SearchPhase.class); + when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); + when(mockSearchPhase.getStartTimeInNanos()).thenReturn(System.nanoTime() - TimeUnit.SECONDS.toNanos(paramValue)); + when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); + for (int iterator = 0; iterator < paramValue; iterator++) { + testRequestStats.onPhaseStart(ctx); + testRequestStats.onPhaseEnd(ctx); + } + } + searchStats1.setSearchRequestStats(testRequestStats); + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + assertEquals( + 0, + searchStats1.getTotal().getRequestStatsLongHolder().getRequestStatsHolder().get(searchPhaseName.getName()).current + ); + assertEquals( + paramValue, + searchStats1.getTotal().getRequestStatsLongHolder().getRequestStatsHolder().get(searchPhaseName.getName()).total + ); + assertThat( + searchStats1.getTotal().getRequestStatsLongHolder().getRequestStatsHolder().get(searchPhaseName.getName()).timeInMillis, + greaterThanOrEqualTo(paramValue) + ); + } } private static void assertStats(Stats stats, long equalTo) { @@ -87,5 +127,4 @@ private static void assertStats(Stats stats, long equalTo) { // avg_concurrency is not summed up across stats assertEquals(1, stats.getConcurrentAvgSliceCount(), 0); } - } diff --git a/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java b/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java index 9be45d4e77940..6f36d22b7e17b 100644 --- a/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java +++ b/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java @@ -32,6 +32,8 @@ package org.opensearch.indices; +import org.opensearch.action.admin.indices.stats.CommonStats; +import org.opensearch.action.search.SearchRequestStats; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.test.OpenSearchTestCase; @@ -43,7 +45,9 @@ public class NodeIndicesStatsTests extends OpenSearchTestCase { public void testInvalidLevel() { - final NodeIndicesStats stats = new NodeIndicesStats(null, Collections.emptyMap()); + CommonStats oldStats = new CommonStats(); + SearchRequestStats requestStats = new SearchRequestStats(); + final NodeIndicesStats stats = new NodeIndicesStats(oldStats, Collections.emptyMap(), requestStats); final String level = randomAlphaOfLength(16); final ToXContent.Params params = new ToXContent.MapParams(Collections.singletonMap("level", level)); final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> stats.toXContent(null, params)); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index d0efbd6fa0db5..09f5c1bea1a5e 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2069,6 +2069,7 @@ public void onFailure(final Exception e) { new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool), repositoriesServiceReference::get, fileCacheCleaner, + null, new RemoteStoreStatsTrackerFactory(clusterService, settings) ); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); @@ -2298,7 +2299,8 @@ public void onFailure(final Exception e) { namedWriteableRegistry, List.of(), client - ) + ), + null ) ); actions.put(