Skip to content

Commit

Permalink
rename SearchRequestListenerManager to SearchRequestOperationsListeners
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <[email protected]>
  • Loading branch information
ansjcy committed Jan 5, 2024
1 parent 152e6db commit 74aa960
Show file tree
Hide file tree
Showing 12 changed files with 67 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,30 @@
import java.util.stream.Stream;

/**
* SearchRequestListenerManager manages listeners registered to search requests,
* SearchRequestOperationsListeners contains listeners registered to search requests,
* and is responsible for creating the {@link SearchRequestOperationsListener.CompositeListener}
* with the all listeners enabled at cluster-level and request-level.
*
*
* @opensearch.internal
*/
public class SearchRequestListenerManager {
public class SearchRequestOperationsListeners {
private final List<SearchRequestOperationsListener> searchRequestListenersList;

/**
* Create the SearchRequestListenerManager and add multiple {@link SearchRequestOperationsListener}
* Create the SearchRequestOperationsListeners and add multiple {@link SearchRequestOperationsListener}
* to the searchRequestListenersList.
* Those enabled listeners will be executed during each search request.
*
* @param listeners Multiple SearchRequestOperationsListener object to add.
* @throws IllegalArgumentException if any input listener is null or already exists in the list.
* @throws IllegalArgumentException if any input listener is null.
*/
public SearchRequestListenerManager(SearchRequestOperationsListener... listeners) {
public SearchRequestOperationsListeners(SearchRequestOperationsListener... listeners) {
searchRequestListenersList = new ArrayList<>();
for (SearchRequestOperationsListener listener : listeners) {
if (listener == null) {
throw new IllegalArgumentException("listener must not be null");
}
if (searchRequestListenersList.contains(listener)) {
throw new IllegalArgumentException("listener already added");
}
searchRequestListenersList.add(listener);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@

package org.opensearch.action.search;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.metrics.MeanMetric;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;

import java.util.EnumMap;
Expand All @@ -37,9 +37,9 @@ public final class SearchRequestStats extends SearchRequestOperationsListener {
);

@Inject
public SearchRequestStats(ClusterService clusterService) {
this.setEnabled(clusterService.getClusterSettings().get(SEARCH_REQUEST_STATS_ENABLED));
clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_REQUEST_STATS_ENABLED, this::setEnabled);
public SearchRequestStats(ClusterSettings clusterSettings) {
this.setEnabled(clusterSettings.get(SEARCH_REQUEST_STATS_ENABLED));
clusterSettings.addSettingsUpdateConsumer(SEARCH_REQUEST_STATS_ENABLED, this::setEnabled);
for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
phaseStatsMap.put(searchPhaseName, new StatsHolder());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
private final NamedWriteableRegistry namedWriteableRegistry;
private final CircuitBreaker circuitBreaker;
private final SearchPipelineService searchPipelineService;
private final SearchRequestListenerManager searchRequestListenerManager;
private final SearchRequestOperationsListeners searchRequestOperationsListeners;

private volatile boolean searchQueryMetricsEnabled;

Expand All @@ -196,7 +196,7 @@ public TransportSearchAction(
NamedWriteableRegistry namedWriteableRegistry,
SearchPipelineService searchPipelineService,
MetricsRegistry metricsRegistry,
SearchRequestListenerManager searchRequestListenerManager
SearchRequestOperationsListeners searchRequestOperationsListeners
) {
super(SearchAction.NAME, transportService, actionFilters, (Writeable.Reader<SearchRequest>) SearchRequest::new);
this.client = client;
Expand All @@ -213,7 +213,7 @@ public TransportSearchAction(
this.searchPipelineService = searchPipelineService;
this.metricsRegistry = metricsRegistry;
this.searchQueryMetricsEnabled = clusterService.getClusterSettings().get(SEARCH_QUERY_METRICS_ENABLED_SETTING);
this.searchRequestListenerManager = searchRequestListenerManager;
this.searchRequestOperationsListeners = searchRequestOperationsListeners;
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, this::setSearchQueryMetricsEnabled);
}
Expand Down Expand Up @@ -480,10 +480,8 @@ private void executeRequest(
System::nanoTime
);
timeProvider.setEnabled(clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED), originalSearchRequest);
SearchRequestOperationsListener.CompositeListener requestOperationsListeners = searchRequestListenerManager.buildCompositeListener(
logger,
timeProvider
);
SearchRequestOperationsListener.CompositeListener requestOperationsListeners = searchRequestOperationsListeners
.buildCompositeListener(logger, timeProvider);
SearchRequestContext searchRequestContext = new SearchRequestContext(requestOperationsListeners);
searchRequestContext.getSearchRequestOperationsListener().onRequestStart(searchRequestContext);

Expand Down
10 changes: 5 additions & 5 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@
import org.opensearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus;
import org.opensearch.action.search.SearchExecutionStatsCollector;
import org.opensearch.action.search.SearchPhaseController;
import org.opensearch.action.search.SearchRequestListenerManager;
import org.opensearch.action.search.SearchRequestOperationsListener;
import org.opensearch.action.search.SearchRequestOperationsListeners;
import org.opensearch.action.search.SearchRequestSlowLog;
import org.opensearch.action.search.SearchRequestStats;
import org.opensearch.action.search.SearchTransportService;
Expand Down Expand Up @@ -785,7 +785,7 @@ protected Node(
threadPool
);

final SearchRequestStats searchRequestStats = new SearchRequestStats(clusterService);
final SearchRequestStats searchRequestStats = new SearchRequestStats(clusterService.getClusterSettings());
final SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService);

remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, settings);
Expand Down Expand Up @@ -881,8 +881,8 @@ protected Node(
)
.collect(Collectors.toList());

// register all standard SearchRequestOperationsListeners to the SearchRequestListenerManager
final SearchRequestListenerManager searchRequestListenerManager = new SearchRequestListenerManager(
// register all standard SearchRequestOperationsListeners to the SearchRequestOperationsListeners
final SearchRequestOperationsListeners searchRequestOperationsListeners = new SearchRequestOperationsListeners(
Stream.concat(
Stream.of(searchRequestStats, searchRequestSlowLog),
pluginComponents.stream()
Expand Down Expand Up @@ -1287,7 +1287,7 @@ protected Node(
b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService);
b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry);
b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker);
b.bind(SearchRequestListenerManager.class).toInstance(searchRequestListenerManager);
b.bind(SearchRequestOperationsListeners.class).toInstance(searchRequestOperationsListeners);
});
injector = modules.createInjector();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.WeightedRoutingStats;
import org.opensearch.cluster.service.ClusterManagerThrottlingStats;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.cluster.service.ClusterStateStats;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.metrics.OperationStats;
Expand Down Expand Up @@ -964,15 +963,11 @@ public void apply(String action, AdmissionControlActionType admissionControlActi
private static NodeIndicesStats getNodeIndicesStats(boolean remoteStoreStats) {
NodeIndicesStats indicesStats = null;
if (remoteStoreStats) {
ClusterService clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
indicesStats = new NodeIndicesStats(
new CommonStats(CommonStatsFlags.ALL),
new HashMap<>(),
new SearchRequestStats(clusterService)
new SearchRequestStats(clusterSettings)
);
RemoteSegmentStats remoteSegmentStats = indicesStats.getSegments().getRemoteSegmentStats();
remoteSegmentStats.addUploadBytesStarted(10L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.UUIDs;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -331,12 +330,8 @@ public void testSendSearchResponseDisallowPartialFailures() {
}

public void testOnPhaseFailureAndVerifyListeners() {
ClusterService clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
SearchRequestStats testListener = new SearchRequestStats(clusterService);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
SearchRequestStats testListener = new SearchRequestStats(clusterSettings);

final List<SearchRequestOperationsListener> requestOperationListeners = new ArrayList<>(List.of(testListener));
SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(requestOperationListeners);
Expand Down Expand Up @@ -599,12 +594,8 @@ public void onFailure(Exception e) {
}

public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedException {
ClusterService clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
SearchRequestStats testListener = new SearchRequestStats(clusterService);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
SearchRequestStats testListener = new SearchRequestStats(clusterSettings);
final List<SearchRequestOperationsListener> requestOperationListeners = new ArrayList<>(List.of(testListener));

long delay = (randomIntBetween(1, 5));
Expand Down Expand Up @@ -653,12 +644,8 @@ public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedEx
}

public void testOnPhaseListenersWithDfsType() throws InterruptedException {
ClusterService clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
SearchRequestStats testListener = new SearchRequestStats(clusterService);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
SearchRequestStats testListener = new SearchRequestStats(clusterSettings);
final List<SearchRequestOperationsListener> requestOperationListeners = new ArrayList<>(List.of(testListener));

SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,31 @@

import java.util.List;

public class SearchRequestListenerManagerTests extends OpenSearchTestCase {
public class SearchRequestOperationsListenersTests extends OpenSearchTestCase {
public void testAddAndGetListeners() {
SearchRequestOperationsListener testListener = createTestSearchRequestOperationsListener();
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(testListener);
assertEquals(1, listenerManager.getListeners().size());
assertEquals(testListener, listenerManager.getListeners().get(0));
SearchRequestOperationsListeners requestListeners = new SearchRequestOperationsListeners(testListener);
assertEquals(1, requestListeners.getListeners().size());
assertEquals(testListener, requestListeners.getListeners().get(0));
}

public void testStandardListenersEnabled() {
SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener();
SearchRequestOperationsListener testListener2 = createTestSearchRequestOperationsListener();
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(testListener1, testListener2);
SearchRequestOperationsListeners requestListeners = new SearchRequestOperationsListeners(testListener1, testListener2);
testListener2.setEnabled(true);
SearchRequestOperationsListener.CompositeListener compositeListener = listenerManager.buildCompositeListener(logger);
SearchRequestOperationsListener.CompositeListener compositeListener = requestListeners.buildCompositeListener(logger);
List<SearchRequestOperationsListener> listeners = compositeListener.getListeners();
assertEquals(1, listeners.size());
assertEquals(testListener2, listeners.get(0));
assertEquals(2, listenerManager.getListeners().size());
assertEquals(testListener1, listenerManager.getListeners().get(0));
assertEquals(testListener2, listenerManager.getListeners().get(1));
assertEquals(2, requestListeners.getListeners().size());
assertEquals(testListener1, requestListeners.getListeners().get(0));
assertEquals(testListener2, requestListeners.getListeners().get(1));
}

public void testStandardListenersAndTimeProvider() {
SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener();
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(testListener1);
SearchRequestOperationsListeners requestListeners = new SearchRequestOperationsListeners(testListener1);

testListener1.setEnabled(true);
TransportSearchAction.SearchTimeProvider timeProviderListener = new TransportSearchAction.SearchTimeProvider(
Expand All @@ -50,21 +50,21 @@ public void testStandardListenersAndTimeProvider() {
SearchRequest searchRequest = new SearchRequest().source(source);
searchRequest.setPhaseTook(true);
timeProviderListener.setEnabled(false, searchRequest);
SearchRequestOperationsListener.CompositeListener compositeListener = listenerManager.buildCompositeListener(
SearchRequestOperationsListener.CompositeListener compositeListener = requestListeners.buildCompositeListener(
logger,
timeProviderListener
);
List<SearchRequestOperationsListener> listeners = compositeListener.getListeners();
assertEquals(2, listeners.size());
assertEquals(testListener1, listeners.get(0));
assertEquals(timeProviderListener, listeners.get(1));
assertEquals(1, listenerManager.getListeners().size());
assertEquals(testListener1, listenerManager.getListeners().get(0));
assertEquals(1, requestListeners.getListeners().size());
assertEquals(testListener1, requestListeners.getListeners().get(0));
}

public void testStandardListenersDisabledAndTimeProvider() {
SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener();
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(testListener1);
SearchRequestOperationsListeners requestListeners = new SearchRequestOperationsListeners(testListener1);
TransportSearchAction.SearchTimeProvider timeProviderListener = new TransportSearchAction.SearchTimeProvider(
0,
System.nanoTime(),
Expand All @@ -74,21 +74,21 @@ public void testStandardListenersDisabledAndTimeProvider() {
SearchRequest searchRequest = new SearchRequest().source(source);
searchRequest.setPhaseTook(true);
timeProviderListener.setEnabled(false, searchRequest);
SearchRequestOperationsListener.CompositeListener compositeListener = listenerManager.buildCompositeListener(
SearchRequestOperationsListener.CompositeListener compositeListener = requestListeners.buildCompositeListener(
logger,
timeProviderListener
);
List<SearchRequestOperationsListener> listeners = compositeListener.getListeners();
assertEquals(1, listeners.size());
assertEquals(timeProviderListener, listeners.get(0));
assertEquals(1, listenerManager.getListeners().size());
assertEquals(testListener1, listenerManager.getListeners().get(0));
assertFalse(listenerManager.getListeners().get(0).getEnabled());
assertEquals(1, requestListeners.getListeners().size());
assertEquals(testListener1, requestListeners.getListeners().get(0));
assertFalse(requestListeners.getListeners().get(0).getEnabled());
}

public void testStandardListenerAndTimeProviderDisabled() {
SearchRequestOperationsListener testListener1 = createTestSearchRequestOperationsListener();
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(testListener1);
SearchRequestOperationsListeners requestListeners = new SearchRequestOperationsListeners(testListener1);

testListener1.setEnabled(true);
SearchRequestOperationsListener timeProviderListener = new TransportSearchAction.SearchTimeProvider(
Expand All @@ -99,15 +99,15 @@ public void testStandardListenerAndTimeProviderDisabled() {
SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery());
SearchRequest searchRequest = new SearchRequest().source(source);
searchRequest.setPhaseTook(false);
SearchRequestOperationsListener.CompositeListener compositeListener = listenerManager.buildCompositeListener(
SearchRequestOperationsListener.CompositeListener compositeListener = requestListeners.buildCompositeListener(
logger,
timeProviderListener
);
List<SearchRequestOperationsListener> listeners = compositeListener.getListeners();
assertEquals(1, listeners.size());
assertEquals(testListener1, listeners.get(0));
assertEquals(1, listenerManager.getListeners().size());
assertEquals(testListener1, listenerManager.getListeners().get(0));
assertEquals(1, requestListeners.getListeners().size());
assertEquals(testListener1, requestListeners.getListeners().get(0));
}

public SearchRequestOperationsListener createTestSearchRequestOperationsListener() {
Expand Down
Loading

0 comments on commit 74aa960

Please sign in to comment.