From ea44ef2dd12b52fda4a903db2269495a2f2d3aff Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Mon, 11 Dec 2023 17:41:36 -0800 Subject: [PATCH] Dynamically add search request operation listeners with SearchRequestListenerManager Signed-off-by: Chenyang Ji --- .../search/SearchRequestListenerManager.java | 124 +++++++++++++++++ .../SearchRequestOperationsListener.java | 15 +- .../action/search/SearchRequestSlowLog.java | 39 ++---- .../action/search/SearchRequestStats.java | 22 +-- .../action/search/TransportSearchAction.java | 77 ++--------- .../common/settings/ClusterSettings.java | 7 +- .../main/java/org/opensearch/node/Node.java | 7 +- .../cluster/node/stats/NodeStatsTests.java | 12 +- .../AbstractSearchAsyncActionTests.java | 27 +++- .../SearchRequestListenerManagerTests.java | 129 ++++++++++++++++++ .../search/SearchRequestSlowLogTests.java | 33 +++-- .../search/SearchRequestStatsTests.java | 43 +++++- .../index/search/stats/SearchStatsTests.java | 17 ++- .../indices/NodeIndicesStatsTests.java | 12 +- .../snapshots/SnapshotResiliencyTests.java | 5 +- 15 files changed, 421 insertions(+), 148 deletions(-) create mode 100644 server/src/main/java/org/opensearch/action/search/SearchRequestListenerManager.java create mode 100644 server/src/test/java/org/opensearch/action/search/SearchRequestListenerManagerTests.java diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestListenerManager.java b/server/src/main/java/org/opensearch/action/search/SearchRequestListenerManager.java new file mode 100644 index 0000000000000..b1c275555349f --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestListenerManager.java @@ -0,0 +1,124 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.Setting; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + + +/** + * SearchRequestListenerManager manages listeners registered to search requests, + * and is responsible for creating the {@link SearchRequestOperationsListener.CompositeListener} + * with the all listeners enabled at cluster-level and request-level. + * + * + * @opensearch.internal + */ +public class SearchRequestListenerManager { + + private final ClusterService clusterService; + public static final String SEARCH_PHASE_TOOK_ENABLED_KEY = "search.phase_took_enabled"; + public static final Setting SEARCH_PHASE_TOOK_ENABLED = Setting.boolSetting( + SEARCH_PHASE_TOOK_ENABLED_KEY, + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + private final List searchRequestListenersList; + + @Inject + public SearchRequestListenerManager( + ClusterService clusterService + ) { + this.clusterService = clusterService; + searchRequestListenersList = new ArrayList<>(); + } + + /** + * Add a {@link SearchRequestOperationsListener} to the searchRequestListenersList, + * which will be executed during each search request. + * + * @param listener A SearchRequestOperationsListener object to add. + * @throws IllegalArgumentException if the input listener is null or already exists in the list. + */ + public void addListener(SearchRequestOperationsListener listener) { + if (listener == null) { + throw new IllegalArgumentException("listener must not be null"); + } + if (searchRequestListenersList.contains(listener)) { + throw new IllegalArgumentException("listener already added"); + } + searchRequestListenersList.add(listener); + } + + /** + * Remove a {@link SearchRequestOperationsListener} from the searchRequestListenersList, + * + * @param listener A SearchRequestOperationsListener object to remove. + * @throws IllegalArgumentException if the input listener is null or already exists in the list. + */ + public void removeListener(SearchRequestOperationsListener listener) { + if (listener == null) { + throw new IllegalArgumentException("listener must not be null"); + } + if (!searchRequestListenersList.contains(listener)) { + throw new IllegalArgumentException("listener does not exist in the listeners list"); + } + searchRequestListenersList.remove(listener); + } + + /** + * Get searchRequestListenersList, + * + * @return List of SearchRequestOperationsListener + * @throws IllegalArgumentException if the input listener is null or already exists in the list. + */ + public List getListeners() { + return searchRequestListenersList; + } + + + /** + * Create the {@link SearchRequestOperationsListener.CompositeListener} + * with the all listeners enabled at cluster-level and request-level. + * + * @param searchRequest The SearchRequest object. SearchRequestListenerManager will decide which request-level listeners to add based on states/flags of the request + * @param logger Logger to be attached to the {@link SearchRequestOperationsListener.CompositeListener} + * @param perRequestListeners the per-request listeners that can be optionally added to the returned CompositeListener list. + * @return SearchRequestOperationsListener.CompositeListener + */ + public SearchRequestOperationsListener.CompositeListener buildCompositeListener( + SearchRequest searchRequest, + Logger logger, + SearchRequestOperationsListener... perRequestListeners + ) { + final List searchListenersList = new ArrayList<>(searchRequestListenersList); + + Arrays.stream(perRequestListeners).parallel().forEach((listener) -> { + if (listener != null && listener.getClass() == TransportSearchAction.SearchTimeProvider.class) { + TransportSearchAction.SearchTimeProvider timeProvider = (TransportSearchAction.SearchTimeProvider) listener; + // phase_took is enabled with request param and/or cluster setting + boolean phaseTookEnabled = (searchRequest.isPhaseTook() != null && searchRequest.isPhaseTook()) || + clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED); + if (phaseTookEnabled) { + timeProvider.setPhaseTook(true); + searchListenersList.add(timeProvider); + } + } + }); + return new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger); + } + +} diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java index 9132aac387085..59bdc10a32978 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java @@ -21,7 +21,7 @@ */ @InternalApi abstract class SearchRequestOperationsListener { - protected boolean enabled; + protected SearchRequestListenerManager searchRequestListenerManager; abstract void onPhaseStart(SearchPhaseContext context); @@ -34,7 +34,6 @@ void onRequestStart(SearchRequestContext searchRequestContext) {} void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} public void setEnabled(boolean enabled) { - this.enabled = enabled; if (enabled) { register(); } else { @@ -47,13 +46,21 @@ public void setEnabled(boolean enabled) { * Handler function to register this listener to certain components * This function will be called when the listener is enabled. */ - protected void register() {} + protected void register() { + if (this.searchRequestListenerManager != null) { + this.searchRequestListenerManager.addListener(this); + } + } /** * Handler function to deregister this listener from certain components * This function will be called when the listener is disabled. */ - protected void deregister() {} + protected void deregister() { + if (this.searchRequestListenerManager != null) { + this.searchRequestListenerManager.removeListener(this); + } + } /** * Holder of Composite Listeners diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java b/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java index 0971e252996a3..f06619ec6f263 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java @@ -37,6 +37,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; import org.opensearch.common.logging.Loggers; import org.opensearch.common.logging.OpenSearchLogMessage; import org.opensearch.common.logging.SlowLogLevel; @@ -108,12 +109,18 @@ public final class SearchRequestSlowLog extends SearchRequestOperationsListener private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false")); - public SearchRequestSlowLog(ClusterService clusterService) { - this(clusterService, LogManager.getLogger(CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX)); // logger configured in log4j2.properties + @Inject + public SearchRequestSlowLog( + ClusterService clusterService, + SearchRequestListenerManager searchRequestListenerManager + ) { + this(clusterService, searchRequestListenerManager, LogManager.getLogger(CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX)); // logger configured in log4j2.properties } - SearchRequestSlowLog(ClusterService clusterService, Logger logger) { + @Inject + SearchRequestSlowLog(ClusterService clusterService, SearchRequestListenerManager searchRequestListenerManager, Logger logger) { this.logger = logger; + this.searchRequestListenerManager = searchRequestListenerManager; Loggers.setLevel(this.logger, SlowLogLevel.TRACE.name()); this.warnThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING).nanos(); @@ -160,22 +167,6 @@ void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequest } } - /** - * register this listener to TransportSearchAction - */ - @Override - protected void register() { - TransportSearchAction.addSearchOperationsListener(this); - } - - /** - * deregister this listener to TransportSearchAction - */ - @Override - protected void deregister() { - TransportSearchAction.removeSearchOperationsListener(this); - } - /** * Search request slow log message * @@ -249,22 +240,22 @@ private static String escapeJson(String text) { void setWarnThreshold(TimeValue warnThreshold) { this.warnThreshold = warnThreshold.nanos(); - changeEnabledIfNeeded(); + setEnabled(); } void setInfoThreshold(TimeValue infoThreshold) { this.infoThreshold = infoThreshold.nanos(); - changeEnabledIfNeeded(); + setEnabled(); } void setDebugThreshold(TimeValue debugThreshold) { this.debugThreshold = debugThreshold.nanos(); - changeEnabledIfNeeded(); + setEnabled(); } void setTraceThreshold(TimeValue traceThreshold) { this.traceThreshold = traceThreshold.nanos(); - changeEnabledIfNeeded(); + setEnabled(); } void setLevel(SlowLogLevel level) { @@ -291,7 +282,7 @@ SlowLogLevel getLevel() { return level; } - private void changeEnabledIfNeeded() { + private void setEnabled() { super.setEnabled(this.warnThreshold >= 0 || this.debugThreshold >= 0 || this.infoThreshold >= 0 diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java index 2a1b62b235301..185ac2a66a859 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java @@ -37,27 +37,15 @@ public final class SearchRequestStats extends SearchRequestOperationsListener { ); @Inject - public SearchRequestStats(ClusterService clusterService) { + public SearchRequestStats( + ClusterService clusterService, + SearchRequestListenerManager searchRequestListenerManager + ) { clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_REQUEST_STATS_ENABLED, this::setEnabled); for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { phaseStatsMap.put(searchPhaseName, new StatsHolder()); } - } - - /** - * register this listener to TransportSearchAction - */ - @Override - protected void register() { - TransportSearchAction.addSearchOperationsListener(this); - } - - /** - * deregister this listener to TransportSearchAction - */ - @Override - protected void deregister() { - TransportSearchAction.removeSearchOperationsListener(this); + this.searchRequestListenerManager = searchRequestListenerManager; } public long getPhaseCurrent(SearchPhaseName searchPhaseName) { diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 5dc4e505e9b05..17b220ae324a4 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -154,17 +154,6 @@ public class TransportSearchAction extends HandledTransportAction SEARCH_PHASE_TOOK_ENABLED = Setting.boolSetting( - SEARCH_PHASE_TOOK_ENABLED_KEY, - false, - Property.Dynamic, - Property.NodeScope - ); - - private static final List searchRequestOperationsListenersList = new ArrayList<>(); - private final NodeClient client; private final ThreadPool threadPool; private final ClusterService clusterService; @@ -176,6 +165,7 @@ public class TransportSearchAction extends HandledTransportAction) SearchRequest::new); this.client = client; @@ -214,6 +205,7 @@ public TransportSearchAction( this.searchPipelineService = searchPipelineService; this.metricsRegistry = metricsRegistry; this.searchQueryMetricsEnabled = clusterService.getClusterSettings().get(SEARCH_QUERY_METRICS_ENABLED_SETTING); + this.searchRequestListenerManager = searchRequestListenerManager; clusterService.getClusterSettings() .addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, this::setSearchQueryMetricsEnabled); } @@ -470,11 +462,12 @@ private void executeRequest( relativeStartNanos, System::nanoTime ); - - final List searchListenersList = createSearchListenerList(originalSearchRequest, timeProvider); - SearchRequestContext searchRequestContext = new SearchRequestContext( - new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger) + SearchRequestOperationsListener.CompositeListener requestOperationsListeners = searchRequestListenerManager.buildCompositeListener( + originalSearchRequest, + logger, + timeProvider ); + SearchRequestContext searchRequestContext = new SearchRequestContext(requestOperationsListeners); searchRequestContext.getSearchRequestOperationsListener().onRequestStart(searchRequestContext); PipelinedRequest searchRequest; @@ -1224,24 +1217,6 @@ AbstractSearchAsyncAction asyncSearchAction( ); } - private List createSearchListenerList(SearchRequest searchRequest, SearchTimeProvider timeProvider) { - final List searchListenersList = new ArrayList<>(searchRequestOperationsListenersList); - - // phase_took is enabled with request param and/or cluster setting - Boolean phaseTookRequestParam = searchRequest.isPhaseTook(); - if (phaseTookRequestParam == null) { // check cluster setting only when request param is undefined - if (clusterService.getClusterSettings().get(TransportSearchAction.SEARCH_PHASE_TOOK_ENABLED)) { - timeProvider.setPhaseTook(true); - searchListenersList.add(timeProvider); - } - } else if (phaseTookRequestParam == true) { - timeProvider.setPhaseTook(true); - searchListenersList.add(timeProvider); - } - - return searchListenersList; - } - private AbstractSearchAsyncAction searchAsyncAction( SearchTask task, SearchRequest searchRequest, @@ -1509,38 +1484,4 @@ static List getLocalLocalShardsIteratorFromPointInTime( } return iterators; } - - - /** - * Add a {@link SearchRequestOperationsListener} to the searchRequestOperationsListenersList, - * which will be executed during each search request. - * - * @param listener A SearchRequestOperationsListener object to add. - * @throws IllegalArgumentException if the input listener is null or already exists in the list. - */ - public static void addSearchOperationsListener(SearchRequestOperationsListener listener) { - if (listener == null) { - throw new IllegalArgumentException("listener must not be null"); - } - if (searchRequestOperationsListenersList.contains(listener)) { - throw new IllegalArgumentException("listener already added"); - } - searchRequestOperationsListenersList.add(listener); - } - - /** - * Remove a {@link SearchRequestOperationsListener} from the searchRequestOperationsListenersList, - * - * @param listener A SearchRequestOperationsListener object to remove. - * @throws IllegalArgumentException if the input listener is null or already exists in the list. - */ - public static void removeSearchOperationsListener(SearchRequestOperationsListener listener) { - if (listener == null) { - throw new IllegalArgumentException("listener must not be null"); - } - if (!searchRequestOperationsListenersList.contains(listener)) { - throw new IllegalArgumentException("listener does not exist in the listeners list"); - } - searchRequestOperationsListenersList.remove(listener); - } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 8b00d7290cfdd..12145bf298f63 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -34,10 +34,7 @@ import org.apache.logging.log4j.LogManager; import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; import org.opensearch.action.admin.indices.close.TransportCloseIndexAction; -import org.opensearch.action.search.CreatePitController; -import org.opensearch.action.search.SearchRequestSlowLog; -import org.opensearch.action.search.SearchRequestStats; -import org.opensearch.action.search.TransportSearchAction; +import org.opensearch.action.search.*; import org.opensearch.action.support.AutoCreateIndex; import org.opensearch.action.support.DestructiveOperations; import org.opensearch.action.support.replication.TransportReplicationAction; @@ -381,8 +378,8 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS, TransportSearchAction.SHARD_COUNT_LIMIT_SETTING, TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING, - TransportSearchAction.SEARCH_PHASE_TOOK_ENABLED, TransportSearchAction.SEARCH_QUERY_METRICS_ENABLED_SETTING, + SearchRequestListenerManager.SEARCH_PHASE_TOOK_ENABLED, SearchRequestStats.SEARCH_REQUEST_STATS_ENABLED, RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE, SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER, diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 7fe6f03951113..59672eaf183bc 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -46,6 +46,7 @@ import org.opensearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus; import org.opensearch.action.search.SearchExecutionStatsCollector; import org.opensearch.action.search.SearchPhaseController; +import org.opensearch.action.search.SearchRequestListenerManager; import org.opensearch.action.search.SearchRequestSlowLog; import org.opensearch.action.search.SearchRequestStats; import org.opensearch.action.search.SearchTransportService; @@ -782,9 +783,10 @@ protected Node( repositoriesServiceReference::get, threadPool ); + final SearchRequestListenerManager searchRequestListenerManager = new SearchRequestListenerManager(clusterService); - final SearchRequestStats searchRequestStats = new SearchRequestStats(clusterService); - final SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); + final SearchRequestStats searchRequestStats = new SearchRequestStats(clusterService, searchRequestListenerManager); + final SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, searchRequestListenerManager); remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, settings); final IndicesService indicesService = new IndicesService( @@ -1275,6 +1277,7 @@ protected Node( b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService); b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry); b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker); + b.bind(SearchRequestListenerManager.class).toInstance(searchRequestListenerManager); }); injector = modules.createInjector(); diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index b8ab5c935fa34..1704238868907 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -34,6 +34,7 @@ import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.action.admin.indices.stats.CommonStatsFlags; +import org.opensearch.action.search.SearchRequestListenerManager; import org.opensearch.action.search.SearchRequestStats; import org.opensearch.cluster.coordination.PendingClusterStateStats; import org.opensearch.cluster.coordination.PersistedStateStats; @@ -41,9 +42,12 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.WeightedRoutingStats; import org.opensearch.cluster.service.ClusterManagerThrottlingStats; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.cluster.service.ClusterStateStats; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.metrics.OperationStats; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.indices.breaker.AllCircuitBreakerStats; @@ -961,7 +965,13 @@ public void apply(String action, AdmissionControlActionType admissionControlActi private static NodeIndicesStats getNodeIndicesStats(boolean remoteStoreStats) { NodeIndicesStats indicesStats = null; if (remoteStoreStats) { - indicesStats = new NodeIndicesStats(new CommonStats(CommonStatsFlags.ALL), new HashMap<>(), new SearchRequestStats()); + ClusterService clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null + ); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); + indicesStats = new NodeIndicesStats(new CommonStats(CommonStatsFlags.ALL), new HashMap<>(), new SearchRequestStats(clusterService, listenerManager)); RemoteSegmentStats remoteSegmentStats = indicesStats.getSegments().getRemoteSegmentStats(); remoteSegmentStats.addUploadBytesStarted(10L); remoteSegmentStats.addUploadBytesSucceeded(10L); diff --git a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java index e17fbab32a12e..d835fbda107c4 100644 --- a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java @@ -36,8 +36,11 @@ import org.opensearch.action.support.IndicesOptions; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.routing.GroupShardsIterator; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.UUIDs; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.set.Sets; @@ -328,7 +331,13 @@ public void testSendSearchResponseDisallowPartialFailures() { } public void testOnPhaseFailureAndVerifyListeners() { - SearchRequestStats testListener = new SearchRequestStats(); + ClusterService clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null + ); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); + SearchRequestStats testListener = new SearchRequestStats(clusterService, listenerManager); final List requestOperationListeners = new ArrayList<>(List.of(testListener)); SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(requestOperationListeners); @@ -591,7 +600,13 @@ public void onFailure(Exception e) { } public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedException { - SearchRequestStats testListener = new SearchRequestStats(); + ClusterService clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null + ); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); + SearchRequestStats testListener = new SearchRequestStats(clusterService, listenerManager); final List requestOperationListeners = new ArrayList<>(List.of(testListener)); long delay = (randomIntBetween(1, 5)); @@ -640,7 +655,13 @@ public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedEx } public void testOnPhaseListenersWithDfsType() throws InterruptedException { - SearchRequestStats testListener = new SearchRequestStats(); + ClusterService clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null + ); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); + SearchRequestStats testListener = new SearchRequestStats(clusterService, listenerManager); final List requestOperationListeners = new ArrayList<>(List.of(testListener)); SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction( diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestListenerManagerTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestListenerManagerTests.java new file mode 100644 index 0000000000000..7cd1010f1e362 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestListenerManagerTests.java @@ -0,0 +1,129 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.test.OpenSearchTestCase; + +import java.lang.reflect.Field; +import java.util.List; + + +public class SearchRequestListenerManagerTests extends OpenSearchTestCase { + public void testAddAndGetListeners() { + ClusterService clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null + ); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); + SearchRequestOperationsListener testListener = createTestSearchRequestOperationsListener(); + listenerManager.addListener(testListener); + assertEquals(1, listenerManager.getListeners().size()); + assertEquals(testListener, listenerManager.getListeners().get(0)); + } + + public void testRemoveListeners() { + ClusterService clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null + ); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); + SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener(); + SearchRequestOperationsListener testListener2 = createTestSearchRequestOperationsListener(); + listenerManager.addListener(testListener1); + listenerManager.addListener(testListener2); + assertEquals(2, listenerManager.getListeners().size()); + listenerManager.removeListener(testListener2); + assertEquals(1, listenerManager.getListeners().size()); + assertEquals(testListener1, listenerManager.getListeners().get(0)); + } + + public void testBuildCompositeListenersWithTimeProvider() throws NoSuchFieldException, IllegalAccessException { + ClusterService clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null + ); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); + SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener(); + SearchRequestOperationsListener timeProviderListener = new TransportSearchAction.SearchTimeProvider( + 0, + System.nanoTime(), + System::nanoTime + ); + listenerManager.addListener(testListener1); + SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); + SearchRequest searchRequest = new SearchRequest().source(source); + searchRequest.setPhaseTook(true); + SearchRequestOperationsListener.CompositeListener compositeListener = listenerManager.buildCompositeListener( + searchRequest, + logger, + timeProviderListener + ); + Field listenersField = SearchRequestOperationsListener.CompositeListener.class.getDeclaredField("listeners"); + listenersField.setAccessible(true); + List listeners = (List) listenersField.get(compositeListener); + assertEquals(2, listeners.size()); + assertEquals(testListener1, listeners.get(0)); + assertEquals(timeProviderListener, listeners.get(1)); + assertEquals(1, listenerManager.getListeners().size()); + assertEquals(testListener1, listenerManager.getListeners().get(0)); + } + + public void testBuildCompositeListenersWithPhaseTookDisabled() throws NoSuchFieldException, IllegalAccessException { + ClusterService clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null + ); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); + SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener(); + SearchRequestOperationsListener timeProviderListener = new TransportSearchAction.SearchTimeProvider( + 0, + System.nanoTime(), + System::nanoTime + ); + listenerManager.addListener(testListener1); + SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); + SearchRequest searchRequest = new SearchRequest().source(source); + searchRequest.setPhaseTook(false); + SearchRequestOperationsListener.CompositeListener compositeListener = listenerManager.buildCompositeListener( + searchRequest, + logger, + timeProviderListener + ); + Field listenersField = SearchRequestOperationsListener.CompositeListener.class.getDeclaredField("listeners"); + listenersField.setAccessible(true); + List listeners = (List) listenersField.get(compositeListener); + assertEquals(1, listeners.size()); + assertEquals(testListener1, listeners.get(0)); + assertEquals(1, listenerManager.getListeners().size()); + assertEquals(testListener1, listenerManager.getListeners().get(0)); + } + + + public SearchRequestOperationsListener createTestSearchRequestOperationsListener() { + return new SearchRequestOperationsListener() { + @Override + void onPhaseStart(SearchPhaseContext context) {} + + @Override + void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} + + @Override + void onPhaseFailure(SearchPhaseContext context) {} + }; + } +} diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java index e23f08c9415eb..fc8300e0174cf 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java @@ -95,7 +95,8 @@ public void testMultipleSlowLoggersUseSingleLog4jLogger() { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null ); - SearchRequestSlowLog searchRequestSlowLog1 = new SearchRequestSlowLog(clusterService1); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService1); + SearchRequestSlowLog searchRequestSlowLog1 = new SearchRequestSlowLog(clusterService1, listenerManager); int numberOfLoggersBefore = context.getLoggers().size(); SearchPhaseContext searchPhaseContext2 = new MockSearchPhaseContext(1); @@ -104,7 +105,8 @@ public void testMultipleSlowLoggersUseSingleLog4jLogger() { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null ); - SearchRequestSlowLog searchRequestSlowLog2 = new SearchRequestSlowLog(clusterService2); + SearchRequestListenerManager listenerManager2 = new SearchRequestListenerManager(clusterService2); + SearchRequestSlowLog searchRequestSlowLog2 = new SearchRequestSlowLog(clusterService2, listenerManager2); int numberOfLoggersAfter = context.getLoggers().size(); assertThat(numberOfLoggersAfter, equalTo(numberOfLoggersBefore)); @@ -124,7 +126,8 @@ public void testOnRequestEnd() throws InterruptedException { Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterService clusterService = new ClusterService(settings, clusterSettings, null); - SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, logger); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); + SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, listenerManager, logger); final List searchListenersList = new ArrayList<>(List.of(searchRequestSlowLog)); when(searchRequestContext.getSearchRequestOperationsListener()).thenReturn( @@ -157,7 +160,8 @@ public void testConcurrentOnRequestEnd() throws InterruptedException { Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterService clusterService = new ClusterService(settings, clusterSettings, null); - SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, logger); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); + SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, listenerManager, logger); final List searchListenersList = new ArrayList<>(List.of(searchRequestSlowLog)); when(searchPhaseContext.getRequest()).thenReturn(searchRequest); @@ -308,7 +312,8 @@ public void testLevelSettingWarn() { Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterService clusterService = new ClusterService(settings, clusterSettings, null); - SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); + SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, listenerManager); assertEquals(level, searchRequestSlowLog.getLevel()); } @@ -319,7 +324,8 @@ public void testLevelSettingDebug() { Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterService clusterService = new ClusterService(settings, clusterSettings, null); - SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); + SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, listenerManager); assertEquals(level, searchRequestSlowLog.getLevel().toString()); } @@ -330,9 +336,10 @@ public void testLevelSettingFail() { Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); try { - new SearchRequestSlowLog(clusterService); + new SearchRequestSlowLog(clusterService, listenerManager); fail(); } catch (IllegalArgumentException ex) { final String expected = "No enum constant org.opensearch.common.logging.SlowLogLevel.NOT A LEVEL"; @@ -350,7 +357,8 @@ public void testSetThresholds() { Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterService clusterService = new ClusterService(settings, clusterSettings, null); - SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); + SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, listenerManager); assertEquals(TimeValue.timeValueMillis(400).nanos(), searchRequestSlowLog.getWarnThreshold()); assertEquals(TimeValue.timeValueMillis(300).nanos(), searchRequestSlowLog.getInfoThreshold()); assertEquals(TimeValue.timeValueMillis(200).nanos(), searchRequestSlowLog.getDebugThreshold()); @@ -367,7 +375,8 @@ public void testSetThresholdsUnits() { Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterService clusterService = new ClusterService(settings, clusterSettings, null); - SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); + SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, listenerManager); assertEquals(TimeValue.timeValueSeconds(400).nanos(), searchRequestSlowLog.getWarnThreshold()); assertEquals(TimeValue.timeValueMillis(300).nanos(), searchRequestSlowLog.getInfoThreshold()); assertEquals(TimeValue.timeValueNanos(200000).nanos(), searchRequestSlowLog.getDebugThreshold()); @@ -382,7 +391,8 @@ public void testSetThresholdsDefaults() { Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterService clusterService = new ClusterService(settings, clusterSettings, null); - SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); + SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, listenerManager); assertEquals(TimeValue.timeValueMillis(400).nanos(), searchRequestSlowLog.getWarnThreshold()); assertEquals(TimeValue.timeValueMillis(-1).nanos(), searchRequestSlowLog.getInfoThreshold()); assertEquals(TimeValue.timeValueMillis(200).nanos(), searchRequestSlowLog.getDebugThreshold()); @@ -396,9 +406,10 @@ public void testSetThresholdsError() { Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); try { - new SearchRequestSlowLog(clusterService); + new SearchRequestSlowLog(clusterService, listenerManager); fail(); } catch (IllegalArgumentException ex) { final String expected = diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java index 93cf77933fdd5..6ac7eb6040f13 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java @@ -8,6 +8,9 @@ package org.opensearch.action.search; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; import org.opensearch.test.OpenSearchTestCase; import java.util.HashMap; @@ -22,7 +25,13 @@ public class SearchRequestStatsTests extends OpenSearchTestCase { public void testSearchRequestPhaseFailure() { - SearchRequestStats testRequestStats = new SearchRequestStats(); + ClusterService clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null + ); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); + SearchRequestStats testRequestStats = new SearchRequestStats(clusterService, listenerManager); SearchPhaseContext ctx = mock(SearchPhaseContext.class); SearchPhase mockSearchPhase = mock(SearchPhase.class); when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); @@ -37,7 +46,13 @@ public void testSearchRequestPhaseFailure() { } public void testSearchRequestStats() { - SearchRequestStats testRequestStats = new SearchRequestStats(); + ClusterService clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null + ); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); + SearchRequestStats testRequestStats = new SearchRequestStats(clusterService, listenerManager); SearchPhaseContext ctx = mock(SearchPhaseContext.class); SearchPhase mockSearchPhase = mock(SearchPhase.class); @@ -58,7 +73,13 @@ public void testSearchRequestStats() { } public void testSearchRequestStatsOnPhaseStartConcurrently() throws InterruptedException { - SearchRequestStats testRequestStats = new SearchRequestStats(); + ClusterService clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null + ); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); + SearchRequestStats testRequestStats = new SearchRequestStats(clusterService, listenerManager); int numTasks = randomIntBetween(5, 50); Thread[] threads = new Thread[numTasks * SearchPhaseName.values().length]; Phaser phaser = new Phaser(numTasks * SearchPhaseName.values().length + 1); @@ -85,7 +106,13 @@ public void testSearchRequestStatsOnPhaseStartConcurrently() throws InterruptedE } public void testSearchRequestStatsOnPhaseEndConcurrently() throws InterruptedException { - SearchRequestStats testRequestStats = new SearchRequestStats(); + ClusterService clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null + ); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); + SearchRequestStats testRequestStats = new SearchRequestStats(clusterService, listenerManager); int numTasks = randomIntBetween(5, 50); Thread[] threads = new Thread[numTasks * SearchPhaseName.values().length]; Phaser phaser = new Phaser(numTasks * SearchPhaseName.values().length + 1); @@ -121,7 +148,13 @@ public void testSearchRequestStatsOnPhaseEndConcurrently() throws InterruptedExc } public void testSearchRequestStatsOnPhaseFailureConcurrently() throws InterruptedException { - SearchRequestStats testRequestStats = new SearchRequestStats(); + ClusterService clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null + ); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); + SearchRequestStats testRequestStats = new SearchRequestStats(clusterService, listenerManager); int numTasks = randomIntBetween(5, 50); Thread[] threads = new Thread[numTasks * SearchPhaseName.values().length]; Phaser phaser = new Phaser(numTasks * SearchPhaseName.values().length + 1); diff --git a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java index 52b272094cd86..612d3a39a1149 100644 --- a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java +++ b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java @@ -32,11 +32,10 @@ package org.opensearch.index.search.stats; -import org.opensearch.action.search.SearchPhase; -import org.opensearch.action.search.SearchPhaseContext; -import org.opensearch.action.search.SearchPhaseName; -import org.opensearch.action.search.SearchRequestOperationsListenerSupport; -import org.opensearch.action.search.SearchRequestStats; +import org.opensearch.action.search.*; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; import org.opensearch.index.search.stats.SearchStats.Stats; import org.opensearch.test.OpenSearchTestCase; @@ -77,7 +76,13 @@ public void testShardLevelSearchGroupStats() throws Exception { long paramValue = randomIntBetween(2, 50); // Testing for request stats - SearchRequestStats testRequestStats = new SearchRequestStats(); + ClusterService clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null + ); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); + SearchRequestStats testRequestStats = new SearchRequestStats(clusterService, listenerManager); SearchPhaseContext ctx = mock(SearchPhaseContext.class); for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { SearchPhase mockSearchPhase = mock(SearchPhase.class); diff --git a/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java b/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java index 6f36d22b7e17b..174dbc895206b 100644 --- a/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java +++ b/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java @@ -33,7 +33,11 @@ package org.opensearch.indices; import org.opensearch.action.admin.indices.stats.CommonStats; +import org.opensearch.action.search.SearchRequestListenerManager; import org.opensearch.action.search.SearchRequestStats; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.test.OpenSearchTestCase; @@ -46,7 +50,13 @@ public class NodeIndicesStatsTests extends OpenSearchTestCase { public void testInvalidLevel() { CommonStats oldStats = new CommonStats(); - SearchRequestStats requestStats = new SearchRequestStats(); + ClusterService clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null + ); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); + SearchRequestStats requestStats = new SearchRequestStats(clusterService, listenerManager); final NodeIndicesStats stats = new NodeIndicesStats(oldStats, Collections.emptyMap(), requestStats); final String level = randomAlphaOfLength(16); final ToXContent.Params params = new ToXContent.MapParams(Collections.singletonMap("level", level)); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 281ae14193308..519250d40cae6 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -91,6 +91,7 @@ import org.opensearch.action.search.SearchPhaseController; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchRequestSlowLog; +import org.opensearch.action.search.SearchRequestListenerManager; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchTransportService; import org.opensearch.action.search.TransportSearchAction; @@ -2285,6 +2286,7 @@ public void onFailure(final Exception e) { writableRegistry(), searchService::aggReduceContextBuilder ); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); actions.put( SearchAction.INSTANCE, new TransportSearchAction( @@ -2310,7 +2312,8 @@ public void onFailure(final Exception e) { List.of(), client ), - NoopMetricsRegistry.INSTANCE + NoopMetricsRegistry.INSTANCE, + listenerManager ) ); actions.put(