Skip to content

Commit

Permalink
rename SearchRequestListenerManager to SearchRequestOperationsListeners
Browse files Browse the repository at this point in the history
  • Loading branch information
ansjcy committed Jan 2, 2024
1 parent a486a17 commit 5a516d9
Show file tree
Hide file tree
Showing 13 changed files with 72 additions and 122 deletions.
6 changes: 5 additions & 1 deletion .idea/runConfigurations/Debug_OpenSearch.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,30 @@
import java.util.stream.Stream;

/**
* SearchRequestListenerManager manages listeners registered to search requests,
* SearchRequestOperationsListeners contains listeners registered to search requests,
* and is responsible for creating the {@link SearchRequestOperationsListener.CompositeListener}
* with the all listeners enabled at cluster-level and request-level.
*
*
* @opensearch.internal
*/
public class SearchRequestListenerManager {
public class SearchRequestOperationsListeners {
private final List<SearchRequestOperationsListener> searchRequestListenersList;

/**
* Create the SearchRequestListenerManager and add multiple {@link SearchRequestOperationsListener}
* Create the SearchRequestOperationsListeners and add multiple {@link SearchRequestOperationsListener}
* to the searchRequestListenersList.
* Those enabled listeners will be executed during each search request.
*
* @param listeners Multiple SearchRequestOperationsListener object to add.
* @throws IllegalArgumentException if any input listener is null or already exists in the list.
* @throws IllegalArgumentException if any input listener is null.
*/
public SearchRequestListenerManager(SearchRequestOperationsListener... listeners) {
public SearchRequestOperationsListeners(SearchRequestOperationsListener... listeners) {
searchRequestListenersList = new ArrayList<>();
for (SearchRequestOperationsListener listener : listeners) {
if (listener == null) {
throw new IllegalArgumentException("listener must not be null");
}
if (searchRequestListenersList.contains(listener)) {
throw new IllegalArgumentException("listener already added");
}
searchRequestListenersList.add(listener);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@

package org.opensearch.action.search;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.metrics.MeanMetric;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;

import java.util.EnumMap;
Expand All @@ -37,9 +37,9 @@ public final class SearchRequestStats extends SearchRequestOperationsListener {
);

@Inject
public SearchRequestStats(ClusterService clusterService) {
this.setEnabled(clusterService.getClusterSettings().get(SEARCH_REQUEST_STATS_ENABLED));
clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_REQUEST_STATS_ENABLED, this::setEnabled);
public SearchRequestStats(ClusterSettings clusterSettings) {
this.setEnabled(clusterSettings.get(SEARCH_REQUEST_STATS_ENABLED));
clusterSettings.addSettingsUpdateConsumer(SEARCH_REQUEST_STATS_ENABLED, this::setEnabled);
for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
phaseStatsMap.put(searchPhaseName, new StatsHolder());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
private final NamedWriteableRegistry namedWriteableRegistry;
private final CircuitBreaker circuitBreaker;
private final SearchPipelineService searchPipelineService;
private final SearchRequestListenerManager searchRequestListenerManager;
private final SearchRequestOperationsListeners searchRequestOperationsListeners;

private volatile boolean searchQueryMetricsEnabled;

Expand All @@ -196,7 +196,7 @@ public TransportSearchAction(
NamedWriteableRegistry namedWriteableRegistry,
SearchPipelineService searchPipelineService,
MetricsRegistry metricsRegistry,
SearchRequestListenerManager searchRequestListenerManager
SearchRequestOperationsListeners searchRequestOperationsListeners
) {
super(SearchAction.NAME, transportService, actionFilters, (Writeable.Reader<SearchRequest>) SearchRequest::new);
this.client = client;
Expand All @@ -213,7 +213,7 @@ public TransportSearchAction(
this.searchPipelineService = searchPipelineService;
this.metricsRegistry = metricsRegistry;
this.searchQueryMetricsEnabled = clusterService.getClusterSettings().get(SEARCH_QUERY_METRICS_ENABLED_SETTING);
this.searchRequestListenerManager = searchRequestListenerManager;
this.searchRequestOperationsListeners = searchRequestOperationsListeners;
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, this::setSearchQueryMetricsEnabled);
}
Expand Down Expand Up @@ -480,10 +480,8 @@ private void executeRequest(
System::nanoTime
);
timeProvider.setEnabled(clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED), originalSearchRequest);
SearchRequestOperationsListener.CompositeListener requestOperationsListeners = searchRequestListenerManager.buildCompositeListener(
logger,
timeProvider
);
SearchRequestOperationsListener.CompositeListener requestOperationsListeners = searchRequestOperationsListeners
.buildCompositeListener(logger, timeProvider);
SearchRequestContext searchRequestContext = new SearchRequestContext(requestOperationsListeners);
searchRequestContext.getSearchRequestOperationsListener().onRequestStart(searchRequestContext);

Expand Down
10 changes: 5 additions & 5 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@
import org.opensearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus;
import org.opensearch.action.search.SearchExecutionStatsCollector;
import org.opensearch.action.search.SearchPhaseController;
import org.opensearch.action.search.SearchRequestListenerManager;
import org.opensearch.action.search.SearchRequestOperationsListener;
import org.opensearch.action.search.SearchRequestOperationsListeners;
import org.opensearch.action.search.SearchRequestSlowLog;
import org.opensearch.action.search.SearchRequestStats;
import org.opensearch.action.search.SearchTransportService;
Expand Down Expand Up @@ -785,7 +785,7 @@ protected Node(
threadPool
);

final SearchRequestStats searchRequestStats = new SearchRequestStats(clusterService);
final SearchRequestStats searchRequestStats = new SearchRequestStats(clusterService.getClusterSettings());
final SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService);

remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, settings);
Expand Down Expand Up @@ -881,8 +881,8 @@ protected Node(
)
.collect(Collectors.toList());

// register all standard SearchRequestOperationsListeners to the SearchRequestListenerManager
final SearchRequestListenerManager searchRequestListenerManager = new SearchRequestListenerManager(
// register all standard SearchRequestOperationsListeners to the SearchRequestOperationsListeners
final SearchRequestOperationsListeners searchRequestOperationsListeners = new SearchRequestOperationsListeners(
Stream.concat(
Stream.of(searchRequestStats, searchRequestSlowLog),
pluginComponents.stream()
Expand Down Expand Up @@ -1287,7 +1287,7 @@ protected Node(
b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService);
b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry);
b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker);
b.bind(SearchRequestListenerManager.class).toInstance(searchRequestListenerManager);
b.bind(SearchRequestOperationsListeners.class).toInstance(searchRequestOperationsListeners);
});
injector = modules.createInjector();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.WeightedRoutingStats;
import org.opensearch.cluster.service.ClusterManagerThrottlingStats;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.cluster.service.ClusterStateStats;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.metrics.OperationStats;
Expand Down Expand Up @@ -964,15 +963,11 @@ public void apply(String action, AdmissionControlActionType admissionControlActi
private static NodeIndicesStats getNodeIndicesStats(boolean remoteStoreStats) {
NodeIndicesStats indicesStats = null;
if (remoteStoreStats) {
ClusterService clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
indicesStats = new NodeIndicesStats(
new CommonStats(CommonStatsFlags.ALL),
new HashMap<>(),
new SearchRequestStats(clusterService)
new SearchRequestStats(clusterSettings)
);
RemoteSegmentStats remoteSegmentStats = indicesStats.getSegments().getRemoteSegmentStats();
remoteSegmentStats.addUploadBytesStarted(10L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.UUIDs;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -331,12 +330,8 @@ public void testSendSearchResponseDisallowPartialFailures() {
}

public void testOnPhaseFailureAndVerifyListeners() {
ClusterService clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
SearchRequestStats testListener = new SearchRequestStats(clusterService);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
SearchRequestStats testListener = new SearchRequestStats(clusterSettings);

final List<SearchRequestOperationsListener> requestOperationListeners = new ArrayList<>(List.of(testListener));
SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(requestOperationListeners);
Expand Down Expand Up @@ -599,12 +594,8 @@ public void onFailure(Exception e) {
}

public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedException {
ClusterService clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
SearchRequestStats testListener = new SearchRequestStats(clusterService);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
SearchRequestStats testListener = new SearchRequestStats(clusterSettings);
final List<SearchRequestOperationsListener> requestOperationListeners = new ArrayList<>(List.of(testListener));

long delay = (randomIntBetween(1, 5));
Expand Down Expand Up @@ -653,12 +644,8 @@ public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedEx
}

public void testOnPhaseListenersWithDfsType() throws InterruptedException {
ClusterService clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
SearchRequestStats testListener = new SearchRequestStats(clusterService);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
SearchRequestStats testListener = new SearchRequestStats(clusterSettings);
final List<SearchRequestOperationsListener> requestOperationListeners = new ArrayList<>(List.of(testListener));

SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,31 @@

import java.util.List;

public class SearchRequestListenerManagerTests extends OpenSearchTestCase {
public class SearchRequestOperationsListenersTests extends OpenSearchTestCase {
public void testAddAndGetListeners() {
SearchRequestOperationsListener testListener = createTestSearchRequestOperationsListener();
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(testListener);
assertEquals(1, listenerManager.getListeners().size());
assertEquals(testListener, listenerManager.getListeners().get(0));
SearchRequestOperationsListeners requestListeners = new SearchRequestOperationsListeners(testListener);
assertEquals(1, requestListeners.getListeners().size());
assertEquals(testListener, requestListeners.getListeners().get(0));
}

public void testStandardListenersEnabled() {
SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener();
SearchRequestOperationsListener testListener2 = createTestSearchRequestOperationsListener();
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(testListener1, testListener2);
SearchRequestOperationsListeners requestListeners = new SearchRequestOperationsListeners(testListener1, testListener2);
testListener2.setEnabled(true);
SearchRequestOperationsListener.CompositeListener compositeListener = listenerManager.buildCompositeListener(logger);
SearchRequestOperationsListener.CompositeListener compositeListener = requestListeners.buildCompositeListener(logger);
List<SearchRequestOperationsListener> listeners = compositeListener.getListeners();
assertEquals(1, listeners.size());
assertEquals(testListener2, listeners.get(0));
assertEquals(2, listenerManager.getListeners().size());
assertEquals(testListener1, listenerManager.getListeners().get(0));
assertEquals(testListener2, listenerManager.getListeners().get(1));
assertEquals(2, requestListeners.getListeners().size());
assertEquals(testListener1, requestListeners.getListeners().get(0));
assertEquals(testListener2, requestListeners.getListeners().get(1));
}

public void testStandardListenersAndTimeProvider() {
SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener();
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(testListener1);
SearchRequestOperationsListeners requestListeners = new SearchRequestOperationsListeners(testListener1);

testListener1.setEnabled(true);
TransportSearchAction.SearchTimeProvider timeProviderListener = new TransportSearchAction.SearchTimeProvider(
Expand All @@ -50,21 +50,21 @@ public void testStandardListenersAndTimeProvider() {
SearchRequest searchRequest = new SearchRequest().source(source);
searchRequest.setPhaseTook(true);
timeProviderListener.setEnabled(false, searchRequest);
SearchRequestOperationsListener.CompositeListener compositeListener = listenerManager.buildCompositeListener(
SearchRequestOperationsListener.CompositeListener compositeListener = requestListeners.buildCompositeListener(
logger,
timeProviderListener
);
List<SearchRequestOperationsListener> listeners = compositeListener.getListeners();
assertEquals(2, listeners.size());
assertEquals(testListener1, listeners.get(0));
assertEquals(timeProviderListener, listeners.get(1));
assertEquals(1, listenerManager.getListeners().size());
assertEquals(testListener1, listenerManager.getListeners().get(0));
assertEquals(1, requestListeners.getListeners().size());
assertEquals(testListener1, requestListeners.getListeners().get(0));
}

public void testStandardListenersDisabledAndTimeProvider() {
SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener();
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(testListener1);
SearchRequestOperationsListeners requestListeners = new SearchRequestOperationsListeners(testListener1);
TransportSearchAction.SearchTimeProvider timeProviderListener = new TransportSearchAction.SearchTimeProvider(
0,
System.nanoTime(),
Expand All @@ -74,21 +74,21 @@ public void testStandardListenersDisabledAndTimeProvider() {
SearchRequest searchRequest = new SearchRequest().source(source);
searchRequest.setPhaseTook(true);
timeProviderListener.setEnabled(false, searchRequest);
SearchRequestOperationsListener.CompositeListener compositeListener = listenerManager.buildCompositeListener(
SearchRequestOperationsListener.CompositeListener compositeListener = requestListeners.buildCompositeListener(
logger,
timeProviderListener
);
List<SearchRequestOperationsListener> listeners = compositeListener.getListeners();
assertEquals(1, listeners.size());
assertEquals(timeProviderListener, listeners.get(0));
assertEquals(1, listenerManager.getListeners().size());
assertEquals(testListener1, listenerManager.getListeners().get(0));
assertFalse(listenerManager.getListeners().get(0).getEnabled());
assertEquals(1, requestListeners.getListeners().size());
assertEquals(testListener1, requestListeners.getListeners().get(0));
assertFalse(requestListeners.getListeners().get(0).getEnabled());
}

public void testStandardListenerAndTimeProviderDisabled() {
SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener();
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(testListener1);
SearchRequestOperationsListeners requestListeners = new SearchRequestOperationsListeners(testListener1);

testListener1.setEnabled(true);
SearchRequestOperationsListener timeProviderListener = new TransportSearchAction.SearchTimeProvider(
Expand All @@ -99,15 +99,15 @@ public void testStandardListenerAndTimeProviderDisabled() {
SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery());
SearchRequest searchRequest = new SearchRequest().source(source);
searchRequest.setPhaseTook(false);
SearchRequestOperationsListener.CompositeListener compositeListener = listenerManager.buildCompositeListener(
SearchRequestOperationsListener.CompositeListener compositeListener = requestListeners.buildCompositeListener(
logger,
timeProviderListener
);
List<SearchRequestOperationsListener> listeners = compositeListener.getListeners();
assertEquals(1, listeners.size());
assertEquals(testListener1, listeners.get(0));
assertEquals(1, listenerManager.getListeners().size());
assertEquals(testListener1, listenerManager.getListeners().get(0));
assertEquals(1, requestListeners.getListeners().size());
assertEquals(testListener1, requestListeners.getListeners().get(0));
}

public SearchRequestOperationsListener createTestSearchRequestOperationsListener() {
Expand Down
Loading

0 comments on commit 5a516d9

Please sign in to comment.