From 344833842e598e023283b2d9119621dd71d9551c Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Tue, 12 Dec 2023 14:25:13 -0800 Subject: [PATCH] improve support for multithreading Signed-off-by: Chenyang Ji --- CHANGELOG.md | 1 + .../search/SearchRequestListenerManager.java | 31 ++++---- .../SearchRequestOperationsListener.java | 39 ++++------ .../action/search/SearchRequestSlowLog.java | 11 +-- .../action/search/SearchRequestStats.java | 4 +- .../common/settings/ClusterSettings.java | 6 +- .../main/java/org/opensearch/node/Node.java | 12 ++- .../cluster/node/stats/NodeStatsTests.java | 4 +- .../AbstractSearchAsyncActionTests.java | 9 +-- .../SearchRequestListenerManagerTests.java | 74 +++++++++++++++++-- .../search/SearchRequestSlowLogTests.java | 32 +++----- .../search/SearchRequestStatsTests.java | 15 ++-- .../index/search/stats/SearchStatsTests.java | 9 ++- .../indices/NodeIndicesStatsTests.java | 4 +- .../snapshots/SnapshotResiliencyTests.java | 1 - 15 files changed, 142 insertions(+), 110 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4cd28cbbe3933..69ce91f2951cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -172,6 +172,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Interpret byte array as primitive using VarHandles ([#11362](https://github.com/opensearch-project/OpenSearch/pull/11362)) - Change error message when per shard document limit is breached ([#11312](https://github.com/opensearch-project/OpenSearch/pull/11312)) - Restore support for Java 8 for RestClient ([#11562](https://github.com/opensearch-project/OpenSearch/pull/11562)) +- Added Support for dynamically adding SearchRequestOperationsListeners ([#11526](https://github.com/opensearch-project/OpenSearch/pull/11526)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestListenerManager.java b/server/src/main/java/org/opensearch/action/search/SearchRequestListenerManager.java index b1c275555349f..25e44a0a562e8 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestListenerManager.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestListenerManager.java @@ -10,12 +10,12 @@ import org.apache.logging.log4j.Logger; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.Setting; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; /** @@ -38,7 +38,6 @@ public class SearchRequestListenerManager { ); private final List searchRequestListenersList; - @Inject public SearchRequestListenerManager( ClusterService clusterService ) { @@ -47,20 +46,22 @@ public SearchRequestListenerManager( } /** - * Add a {@link SearchRequestOperationsListener} to the searchRequestListenersList, - * which will be executed during each search request. + * Add multiple {@link SearchRequestOperationsListener} to the searchRequestListenersList. + * Those enabled listeners will be executed during each search request. * - * @param listener A SearchRequestOperationsListener object to add. - * @throws IllegalArgumentException if the input listener is null or already exists in the list. + * @param listeners Multiple SearchRequestOperationsListener object to add. + * @throws IllegalArgumentException if any input listener is null or already exists in the list. */ - public void addListener(SearchRequestOperationsListener listener) { - if (listener == null) { - throw new IllegalArgumentException("listener must not be null"); - } - if (searchRequestListenersList.contains(listener)) { - throw new IllegalArgumentException("listener already added"); + public void addListeners(SearchRequestOperationsListener... listeners) { + for (SearchRequestOperationsListener listener : listeners) { + if (listener == null) { + throw new IllegalArgumentException("listener must not be null"); + } + if (searchRequestListenersList.contains(listener)) { + throw new IllegalArgumentException("listener already added"); + } + searchRequestListenersList.add(listener); } - searchRequestListenersList.add(listener); } /** @@ -104,9 +105,9 @@ public SearchRequestOperationsListener.CompositeListener buildCompositeListener( Logger logger, SearchRequestOperationsListener... perRequestListeners ) { - final List searchListenersList = new ArrayList<>(searchRequestListenersList); + final List searchListenersList = searchRequestListenersList.stream().filter(SearchRequestOperationsListener::getEnabled).collect(Collectors.toList()); - Arrays.stream(perRequestListeners).parallel().forEach((listener) -> { + Arrays.stream(perRequestListeners).forEach((listener) -> { if (listener != null && listener.getClass() == TransportSearchAction.SearchTimeProvider.class) { TransportSearchAction.SearchTimeProvider timeProvider = (TransportSearchAction.SearchTimeProvider) listener; // phase_took is enabled with request param and/or cluster setting diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java index 59bdc10a32978..7e318a542c812 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java @@ -20,8 +20,15 @@ * @opensearch.internal */ @InternalApi -abstract class SearchRequestOperationsListener { - protected SearchRequestListenerManager searchRequestListenerManager; +public abstract class SearchRequestOperationsListener { + private volatile boolean enabled; + + protected SearchRequestOperationsListener() { + this.enabled = false; + } + protected SearchRequestOperationsListener(boolean enabled) { + this.enabled = enabled; + } abstract void onPhaseStart(SearchPhaseContext context); @@ -33,33 +40,13 @@ void onRequestStart(SearchRequestContext searchRequestContext) {} void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} - public void setEnabled(boolean enabled) { - if (enabled) { - register(); - } else { - deregister(); - } - } - - /** - * Handler function to register this listener to certain components - * This function will be called when the listener is enabled. - */ - protected void register() { - if (this.searchRequestListenerManager != null) { - this.searchRequestListenerManager.addListener(this); - } + boolean getEnabled() { + return enabled; } - /** - * Handler function to deregister this listener from certain components - * This function will be called when the listener is disabled. - */ - protected void deregister() { - if (this.searchRequestListenerManager != null) { - this.searchRequestListenerManager.removeListener(this); - } + void setEnabled(boolean enabled) { + this.enabled = enabled; } /** diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java b/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java index f06619ec6f263..0fadbda6b62a9 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java @@ -37,7 +37,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.inject.Inject; import org.opensearch.common.logging.Loggers; import org.opensearch.common.logging.OpenSearchLogMessage; import org.opensearch.common.logging.SlowLogLevel; @@ -109,18 +108,14 @@ public final class SearchRequestSlowLog extends SearchRequestOperationsListener private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false")); - @Inject public SearchRequestSlowLog( - ClusterService clusterService, - SearchRequestListenerManager searchRequestListenerManager + ClusterService clusterService ) { - this(clusterService, searchRequestListenerManager, LogManager.getLogger(CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX)); // logger configured in log4j2.properties + this(clusterService, LogManager.getLogger(CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX)); // logger configured in log4j2.properties } - @Inject - SearchRequestSlowLog(ClusterService clusterService, SearchRequestListenerManager searchRequestListenerManager, Logger logger) { + SearchRequestSlowLog(ClusterService clusterService, Logger logger) { this.logger = logger; - this.searchRequestListenerManager = searchRequestListenerManager; Loggers.setLevel(this.logger, SlowLogLevel.TRACE.name()); this.warnThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING).nanos(); diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java index 185ac2a66a859..19cb9dc4b73d5 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java @@ -38,14 +38,12 @@ public final class SearchRequestStats extends SearchRequestOperationsListener { @Inject public SearchRequestStats( - ClusterService clusterService, - SearchRequestListenerManager searchRequestListenerManager + ClusterService clusterService ) { clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_REQUEST_STATS_ENABLED, this::setEnabled); for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { phaseStatsMap.put(searchPhaseName, new StatsHolder()); } - this.searchRequestListenerManager = searchRequestListenerManager; } public long getPhaseCurrent(SearchPhaseName searchPhaseName) { diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 1315fbc7c2b38..af211661d42a4 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -34,7 +34,11 @@ import org.apache.logging.log4j.LogManager; import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; import org.opensearch.action.admin.indices.close.TransportCloseIndexAction; -import org.opensearch.action.search.*; +import org.opensearch.action.search.CreatePitController; +import org.opensearch.action.search.SearchRequestListenerManager; +import org.opensearch.action.search.SearchRequestSlowLog; +import org.opensearch.action.search.SearchRequestStats; +import org.opensearch.action.search.TransportSearchAction; import org.opensearch.action.support.AutoCreateIndex; import org.opensearch.action.support.DestructiveOperations; import org.opensearch.action.support.replication.TransportReplicationAction; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 59672eaf183bc..6acc5105b7b29 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -783,10 +783,16 @@ protected Node( repositoriesServiceReference::get, threadPool ); - final SearchRequestListenerManager searchRequestListenerManager = new SearchRequestListenerManager(clusterService); - final SearchRequestStats searchRequestStats = new SearchRequestStats(clusterService, searchRequestListenerManager); - final SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, searchRequestListenerManager); + final SearchRequestStats searchRequestStats = new SearchRequestStats(clusterService); + final SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); + + // register all standard SearchRequestOperationsListeners to the SearchRequestListenerManager + final SearchRequestListenerManager searchRequestListenerManager = new SearchRequestListenerManager(clusterService); + searchRequestListenerManager.addListeners( + searchRequestStats, + searchRequestSlowLog + ); remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, settings); final IndicesService indicesService = new IndicesService( diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index 1704238868907..04593fc0b66be 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -34,7 +34,6 @@ import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.action.admin.indices.stats.CommonStatsFlags; -import org.opensearch.action.search.SearchRequestListenerManager; import org.opensearch.action.search.SearchRequestStats; import org.opensearch.cluster.coordination.PendingClusterStateStats; import org.opensearch.cluster.coordination.PersistedStateStats; @@ -970,8 +969,7 @@ private static NodeIndicesStats getNodeIndicesStats(boolean remoteStoreStats) { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null ); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); - indicesStats = new NodeIndicesStats(new CommonStats(CommonStatsFlags.ALL), new HashMap<>(), new SearchRequestStats(clusterService, listenerManager)); + indicesStats = new NodeIndicesStats(new CommonStats(CommonStatsFlags.ALL), new HashMap<>(), new SearchRequestStats(clusterService)); RemoteSegmentStats remoteSegmentStats = indicesStats.getSegments().getRemoteSegmentStats(); remoteSegmentStats.addUploadBytesStarted(10L); remoteSegmentStats.addUploadBytesSucceeded(10L); diff --git a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java index d835fbda107c4..d1362acdc3e34 100644 --- a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java @@ -336,8 +336,7 @@ public void testOnPhaseFailureAndVerifyListeners() { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null ); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); - SearchRequestStats testListener = new SearchRequestStats(clusterService, listenerManager); + SearchRequestStats testListener = new SearchRequestStats(clusterService); final List requestOperationListeners = new ArrayList<>(List.of(testListener)); SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(requestOperationListeners); @@ -605,8 +604,7 @@ public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedEx new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null ); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); - SearchRequestStats testListener = new SearchRequestStats(clusterService, listenerManager); + SearchRequestStats testListener = new SearchRequestStats(clusterService); final List requestOperationListeners = new ArrayList<>(List.of(testListener)); long delay = (randomIntBetween(1, 5)); @@ -660,8 +658,7 @@ public void testOnPhaseListenersWithDfsType() throws InterruptedException { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null ); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); - SearchRequestStats testListener = new SearchRequestStats(clusterService, listenerManager); + SearchRequestStats testListener = new SearchRequestStats(clusterService); final List requestOperationListeners = new ArrayList<>(List.of(testListener)); SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction( diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestListenerManagerTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestListenerManagerTests.java index 7cd1010f1e362..348049dacf667 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestListenerManagerTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestListenerManagerTests.java @@ -28,7 +28,7 @@ public void testAddAndGetListeners() { ); SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); SearchRequestOperationsListener testListener = createTestSearchRequestOperationsListener(); - listenerManager.addListener(testListener); + listenerManager.addListeners(testListener); assertEquals(1, listenerManager.getListeners().size()); assertEquals(testListener, listenerManager.getListeners().get(0)); } @@ -42,15 +42,14 @@ public void testRemoveListeners() { SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener(); SearchRequestOperationsListener testListener2 = createTestSearchRequestOperationsListener(); - listenerManager.addListener(testListener1); - listenerManager.addListener(testListener2); + listenerManager.addListeners(testListener1, testListener2); assertEquals(2, listenerManager.getListeners().size()); listenerManager.removeListener(testListener2); assertEquals(1, listenerManager.getListeners().size()); assertEquals(testListener1, listenerManager.getListeners().get(0)); } - public void testBuildCompositeListenersWithTimeProvider() throws NoSuchFieldException, IllegalAccessException { + public void testStandardListenersEnabled() throws NoSuchFieldException, IllegalAccessException { ClusterService clusterService = new ClusterService( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), @@ -58,12 +57,40 @@ public void testBuildCompositeListenersWithTimeProvider() throws NoSuchFieldExce ); SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener(); + SearchRequestOperationsListener testListener2 = createTestSearchRequestOperationsListener(); + testListener2.setEnabled(true); + listenerManager.addListeners(testListener1, testListener2); + SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); + SearchRequest searchRequest = new SearchRequest().source(source); + SearchRequestOperationsListener.CompositeListener compositeListener = listenerManager.buildCompositeListener( + searchRequest, + logger + ); + Field listenersField = SearchRequestOperationsListener.CompositeListener.class.getDeclaredField("listeners"); + listenersField.setAccessible(true); + List listeners = (List) listenersField.get(compositeListener); + assertEquals(1, listeners.size()); + assertEquals(testListener2, listeners.get(0)); + assertEquals(2, listenerManager.getListeners().size()); + assertEquals(testListener1, listenerManager.getListeners().get(0)); + assertEquals(testListener2, listenerManager.getListeners().get(1)); + } + + public void testStandardListenersAndTimeProvider() throws NoSuchFieldException, IllegalAccessException { + ClusterService clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null + ); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); + SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener(); + testListener1.setEnabled(true); SearchRequestOperationsListener timeProviderListener = new TransportSearchAction.SearchTimeProvider( 0, System.nanoTime(), System::nanoTime ); - listenerManager.addListener(testListener1); + listenerManager.addListeners(testListener1); SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); SearchRequest searchRequest = new SearchRequest().source(source); searchRequest.setPhaseTook(true); @@ -82,7 +109,39 @@ public void testBuildCompositeListenersWithTimeProvider() throws NoSuchFieldExce assertEquals(testListener1, listenerManager.getListeners().get(0)); } - public void testBuildCompositeListenersWithPhaseTookDisabled() throws NoSuchFieldException, IllegalAccessException { + public void testStandardListenersDisabledAndTimeProvider() throws NoSuchFieldException, IllegalAccessException { + ClusterService clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null + ); + SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); + SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener(); + SearchRequestOperationsListener timeProviderListener = new TransportSearchAction.SearchTimeProvider( + 0, + System.nanoTime(), + System::nanoTime + ); + listenerManager.addListeners(testListener1); + SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); + SearchRequest searchRequest = new SearchRequest().source(source); + searchRequest.setPhaseTook(true); + SearchRequestOperationsListener.CompositeListener compositeListener = listenerManager.buildCompositeListener( + searchRequest, + logger, + timeProviderListener + ); + Field listenersField = SearchRequestOperationsListener.CompositeListener.class.getDeclaredField("listeners"); + listenersField.setAccessible(true); + List listeners = (List) listenersField.get(compositeListener); + assertEquals(1, listeners.size()); + assertEquals(timeProviderListener, listeners.get(0)); + assertEquals(1, listenerManager.getListeners().size()); + assertEquals(testListener1, listenerManager.getListeners().get(0)); + assertFalse(listenerManager.getListeners().get(0).getEnabled()); + } + + public void testStandardListenerAndTimeProviderDisabled() throws NoSuchFieldException, IllegalAccessException { ClusterService clusterService = new ClusterService( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), @@ -90,12 +149,13 @@ public void testBuildCompositeListenersWithPhaseTookDisabled() throws NoSuchFiel ); SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener(); + testListener1.setEnabled(true); SearchRequestOperationsListener timeProviderListener = new TransportSearchAction.SearchTimeProvider( 0, System.nanoTime(), System::nanoTime ); - listenerManager.addListener(testListener1); + listenerManager.addListeners(testListener1); SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); SearchRequest searchRequest = new SearchRequest().source(source); searchRequest.setPhaseTook(false); diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java index fc8300e0174cf..5456ef02b9b8e 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java @@ -95,8 +95,7 @@ public void testMultipleSlowLoggersUseSingleLog4jLogger() { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null ); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService1); - SearchRequestSlowLog searchRequestSlowLog1 = new SearchRequestSlowLog(clusterService1, listenerManager); + SearchRequestSlowLog searchRequestSlowLog1 = new SearchRequestSlowLog(clusterService1); int numberOfLoggersBefore = context.getLoggers().size(); SearchPhaseContext searchPhaseContext2 = new MockSearchPhaseContext(1); @@ -106,7 +105,7 @@ public void testMultipleSlowLoggersUseSingleLog4jLogger() { null ); SearchRequestListenerManager listenerManager2 = new SearchRequestListenerManager(clusterService2); - SearchRequestSlowLog searchRequestSlowLog2 = new SearchRequestSlowLog(clusterService2, listenerManager2); + SearchRequestSlowLog searchRequestSlowLog2 = new SearchRequestSlowLog(clusterService2); int numberOfLoggersAfter = context.getLoggers().size(); assertThat(numberOfLoggersAfter, equalTo(numberOfLoggersBefore)); @@ -126,8 +125,7 @@ public void testOnRequestEnd() throws InterruptedException { Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterService clusterService = new ClusterService(settings, clusterSettings, null); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); - SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, listenerManager, logger); + SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, logger); final List searchListenersList = new ArrayList<>(List.of(searchRequestSlowLog)); when(searchRequestContext.getSearchRequestOperationsListener()).thenReturn( @@ -160,8 +158,7 @@ public void testConcurrentOnRequestEnd() throws InterruptedException { Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterService clusterService = new ClusterService(settings, clusterSettings, null); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); - SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, listenerManager, logger); + SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, logger); final List searchListenersList = new ArrayList<>(List.of(searchRequestSlowLog)); when(searchPhaseContext.getRequest()).thenReturn(searchRequest); @@ -312,8 +309,7 @@ public void testLevelSettingWarn() { Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterService clusterService = new ClusterService(settings, clusterSettings, null); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); - SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, listenerManager); + SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); assertEquals(level, searchRequestSlowLog.getLevel()); } @@ -324,8 +320,7 @@ public void testLevelSettingDebug() { Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterService clusterService = new ClusterService(settings, clusterSettings, null); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); - SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, listenerManager); + SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); assertEquals(level, searchRequestSlowLog.getLevel().toString()); } @@ -336,10 +331,9 @@ public void testLevelSettingFail() { Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterService clusterService = new ClusterService(settings, clusterSettings, null); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); try { - new SearchRequestSlowLog(clusterService, listenerManager); + new SearchRequestSlowLog(clusterService); fail(); } catch (IllegalArgumentException ex) { final String expected = "No enum constant org.opensearch.common.logging.SlowLogLevel.NOT A LEVEL"; @@ -357,8 +351,7 @@ public void testSetThresholds() { Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterService clusterService = new ClusterService(settings, clusterSettings, null); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); - SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, listenerManager); + SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); assertEquals(TimeValue.timeValueMillis(400).nanos(), searchRequestSlowLog.getWarnThreshold()); assertEquals(TimeValue.timeValueMillis(300).nanos(), searchRequestSlowLog.getInfoThreshold()); assertEquals(TimeValue.timeValueMillis(200).nanos(), searchRequestSlowLog.getDebugThreshold()); @@ -375,8 +368,7 @@ public void testSetThresholdsUnits() { Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterService clusterService = new ClusterService(settings, clusterSettings, null); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); - SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, listenerManager); + SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); assertEquals(TimeValue.timeValueSeconds(400).nanos(), searchRequestSlowLog.getWarnThreshold()); assertEquals(TimeValue.timeValueMillis(300).nanos(), searchRequestSlowLog.getInfoThreshold()); assertEquals(TimeValue.timeValueNanos(200000).nanos(), searchRequestSlowLog.getDebugThreshold()); @@ -391,8 +383,7 @@ public void testSetThresholdsDefaults() { Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterService clusterService = new ClusterService(settings, clusterSettings, null); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); - SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, listenerManager); + SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); assertEquals(TimeValue.timeValueMillis(400).nanos(), searchRequestSlowLog.getWarnThreshold()); assertEquals(TimeValue.timeValueMillis(-1).nanos(), searchRequestSlowLog.getInfoThreshold()); assertEquals(TimeValue.timeValueMillis(200).nanos(), searchRequestSlowLog.getDebugThreshold()); @@ -406,10 +397,9 @@ public void testSetThresholdsError() { Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterService clusterService = new ClusterService(settings, clusterSettings, null); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); try { - new SearchRequestSlowLog(clusterService, listenerManager); + new SearchRequestSlowLog(clusterService); fail(); } catch (IllegalArgumentException ex) { final String expected = diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java index 6ac7eb6040f13..e2b21689bdb03 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java @@ -30,8 +30,7 @@ public void testSearchRequestPhaseFailure() { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null ); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); - SearchRequestStats testRequestStats = new SearchRequestStats(clusterService, listenerManager); + SearchRequestStats testRequestStats = new SearchRequestStats(clusterService); SearchPhaseContext ctx = mock(SearchPhaseContext.class); SearchPhase mockSearchPhase = mock(SearchPhase.class); when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); @@ -51,8 +50,7 @@ public void testSearchRequestStats() { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null ); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); - SearchRequestStats testRequestStats = new SearchRequestStats(clusterService, listenerManager); + SearchRequestStats testRequestStats = new SearchRequestStats(clusterService); SearchPhaseContext ctx = mock(SearchPhaseContext.class); SearchPhase mockSearchPhase = mock(SearchPhase.class); @@ -78,8 +76,7 @@ public void testSearchRequestStatsOnPhaseStartConcurrently() throws InterruptedE new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null ); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); - SearchRequestStats testRequestStats = new SearchRequestStats(clusterService, listenerManager); + SearchRequestStats testRequestStats = new SearchRequestStats(clusterService); int numTasks = randomIntBetween(5, 50); Thread[] threads = new Thread[numTasks * SearchPhaseName.values().length]; Phaser phaser = new Phaser(numTasks * SearchPhaseName.values().length + 1); @@ -111,8 +108,7 @@ public void testSearchRequestStatsOnPhaseEndConcurrently() throws InterruptedExc new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null ); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); - SearchRequestStats testRequestStats = new SearchRequestStats(clusterService, listenerManager); + SearchRequestStats testRequestStats = new SearchRequestStats(clusterService); int numTasks = randomIntBetween(5, 50); Thread[] threads = new Thread[numTasks * SearchPhaseName.values().length]; Phaser phaser = new Phaser(numTasks * SearchPhaseName.values().length + 1); @@ -153,8 +149,7 @@ public void testSearchRequestStatsOnPhaseFailureConcurrently() throws Interrupte new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null ); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); - SearchRequestStats testRequestStats = new SearchRequestStats(clusterService, listenerManager); + SearchRequestStats testRequestStats = new SearchRequestStats(clusterService); int numTasks = randomIntBetween(5, 50); Thread[] threads = new Thread[numTasks * SearchPhaseName.values().length]; Phaser phaser = new Phaser(numTasks * SearchPhaseName.values().length + 1); diff --git a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java index 612d3a39a1149..ea370ebc6cf76 100644 --- a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java +++ b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java @@ -32,7 +32,11 @@ package org.opensearch.index.search.stats; -import org.opensearch.action.search.*; +import org.opensearch.action.search.SearchPhase; +import org.opensearch.action.search.SearchPhaseContext; +import org.opensearch.action.search.SearchPhaseName; +import org.opensearch.action.search.SearchRequestOperationsListenerSupport; +import org.opensearch.action.search.SearchRequestStats; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -81,8 +85,7 @@ public void testShardLevelSearchGroupStats() throws Exception { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null ); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); - SearchRequestStats testRequestStats = new SearchRequestStats(clusterService, listenerManager); + SearchRequestStats testRequestStats = new SearchRequestStats(clusterService); SearchPhaseContext ctx = mock(SearchPhaseContext.class); for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { SearchPhase mockSearchPhase = mock(SearchPhase.class); diff --git a/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java b/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java index 174dbc895206b..b07dc6d298045 100644 --- a/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java +++ b/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java @@ -33,7 +33,6 @@ package org.opensearch.indices; import org.opensearch.action.admin.indices.stats.CommonStats; -import org.opensearch.action.search.SearchRequestListenerManager; import org.opensearch.action.search.SearchRequestStats; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; @@ -55,8 +54,7 @@ public void testInvalidLevel() { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null ); - SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService); - SearchRequestStats requestStats = new SearchRequestStats(clusterService, listenerManager); + SearchRequestStats requestStats = new SearchRequestStats(clusterService); final NodeIndicesStats stats = new NodeIndicesStats(oldStats, Collections.emptyMap(), requestStats); final String level = randomAlphaOfLength(16); final ToXContent.Params params = new ToXContent.MapParams(Collections.singletonMap("level", level)); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 519250d40cae6..bf1a6e1593f4c 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -90,7 +90,6 @@ import org.opensearch.action.search.SearchExecutionStatsCollector; import org.opensearch.action.search.SearchPhaseController; import org.opensearch.action.search.SearchRequest; -import org.opensearch.action.search.SearchRequestSlowLog; import org.opensearch.action.search.SearchRequestListenerManager; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchTransportService;