Skip to content

Commit

Permalink
improve support for multithreading
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <[email protected]>
  • Loading branch information
ansjcy committed Dec 12, 2023
1 parent 4c11760 commit 3448338
Show file tree
Hide file tree
Showing 15 changed files with 142 additions and 110 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Interpret byte array as primitive using VarHandles ([#11362](https://github.com/opensearch-project/OpenSearch/pull/11362))
- Change error message when per shard document limit is breached ([#11312](https://github.com/opensearch-project/OpenSearch/pull/11312))
- Restore support for Java 8 for RestClient ([#11562](https://github.com/opensearch-project/OpenSearch/pull/11562))
- Added Support for dynamically adding SearchRequestOperationsListeners ([#11526](https://github.com/opensearch-project/OpenSearch/pull/11526))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@

import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Setting;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;


/**
Expand All @@ -38,7 +38,6 @@ public class SearchRequestListenerManager {
);
private final List<SearchRequestOperationsListener> searchRequestListenersList;

@Inject
public SearchRequestListenerManager(
ClusterService clusterService
) {
Expand All @@ -47,20 +46,22 @@ public SearchRequestListenerManager(
}

/**
* Add a {@link SearchRequestOperationsListener} to the searchRequestListenersList,
* which will be executed during each search request.
* Add multiple {@link SearchRequestOperationsListener} to the searchRequestListenersList.
* Those enabled listeners will be executed during each search request.
*
* @param listener A SearchRequestOperationsListener object to add.
* @throws IllegalArgumentException if the input listener is null or already exists in the list.
* @param listeners Multiple SearchRequestOperationsListener object to add.
* @throws IllegalArgumentException if any input listener is null or already exists in the list.
*/
public void addListener(SearchRequestOperationsListener listener) {
if (listener == null) {
throw new IllegalArgumentException("listener must not be null");
}
if (searchRequestListenersList.contains(listener)) {
throw new IllegalArgumentException("listener already added");
public void addListeners(SearchRequestOperationsListener... listeners) {
for (SearchRequestOperationsListener listener : listeners) {
if (listener == null) {
throw new IllegalArgumentException("listener must not be null");
}
if (searchRequestListenersList.contains(listener)) {
throw new IllegalArgumentException("listener already added");
}
searchRequestListenersList.add(listener);
}
searchRequestListenersList.add(listener);
}

/**
Expand Down Expand Up @@ -104,9 +105,9 @@ public SearchRequestOperationsListener.CompositeListener buildCompositeListener(
Logger logger,
SearchRequestOperationsListener... perRequestListeners
) {
final List<SearchRequestOperationsListener> searchListenersList = new ArrayList<>(searchRequestListenersList);
final List<SearchRequestOperationsListener> searchListenersList = searchRequestListenersList.stream().filter(SearchRequestOperationsListener::getEnabled).collect(Collectors.toList());

Arrays.stream(perRequestListeners).parallel().forEach((listener) -> {
Arrays.stream(perRequestListeners).forEach((listener) -> {
if (listener != null && listener.getClass() == TransportSearchAction.SearchTimeProvider.class) {
TransportSearchAction.SearchTimeProvider timeProvider = (TransportSearchAction.SearchTimeProvider) listener;
// phase_took is enabled with request param and/or cluster setting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,15 @@
* @opensearch.internal
*/
@InternalApi
abstract class SearchRequestOperationsListener {
protected SearchRequestListenerManager searchRequestListenerManager;
public abstract class SearchRequestOperationsListener {
private volatile boolean enabled;

protected SearchRequestOperationsListener() {
this.enabled = false;
}
protected SearchRequestOperationsListener(boolean enabled) {
this.enabled = enabled;
}

abstract void onPhaseStart(SearchPhaseContext context);

Expand All @@ -33,33 +40,13 @@ void onRequestStart(SearchRequestContext searchRequestContext) {}

void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

public void setEnabled(boolean enabled) {
if (enabled) {
register();
} else {
deregister();
}
}


/**
* Handler function to register this listener to certain components
* This function will be called when the listener is enabled.
*/
protected void register() {
if (this.searchRequestListenerManager != null) {
this.searchRequestListenerManager.addListener(this);
}
boolean getEnabled() {
return enabled;
}

/**
* Handler function to deregister this listener from certain components
* This function will be called when the listener is disabled.
*/
protected void deregister() {
if (this.searchRequestListenerManager != null) {
this.searchRequestListenerManager.removeListener(this);
}
void setEnabled(boolean enabled) {
this.enabled = enabled;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.logging.OpenSearchLogMessage;
import org.opensearch.common.logging.SlowLogLevel;
Expand Down Expand Up @@ -109,18 +108,14 @@ public final class SearchRequestSlowLog extends SearchRequestOperationsListener

private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));

@Inject
public SearchRequestSlowLog(
ClusterService clusterService,
SearchRequestListenerManager searchRequestListenerManager
ClusterService clusterService
) {
this(clusterService, searchRequestListenerManager, LogManager.getLogger(CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX)); // logger configured in log4j2.properties
this(clusterService, LogManager.getLogger(CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX)); // logger configured in log4j2.properties
}

@Inject
SearchRequestSlowLog(ClusterService clusterService, SearchRequestListenerManager searchRequestListenerManager, Logger logger) {
SearchRequestSlowLog(ClusterService clusterService, Logger logger) {
this.logger = logger;
this.searchRequestListenerManager = searchRequestListenerManager;
Loggers.setLevel(this.logger, SlowLogLevel.TRACE.name());

this.warnThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING).nanos();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,12 @@ public final class SearchRequestStats extends SearchRequestOperationsListener {

@Inject
public SearchRequestStats(
ClusterService clusterService,
SearchRequestListenerManager searchRequestListenerManager
ClusterService clusterService
) {
clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_REQUEST_STATS_ENABLED, this::setEnabled);
for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
phaseStatsMap.put(searchPhaseName, new StatsHolder());
}
this.searchRequestListenerManager = searchRequestListenerManager;
}

public long getPhaseCurrent(SearchPhaseName searchPhaseName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@
import org.apache.logging.log4j.LogManager;
import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.opensearch.action.admin.indices.close.TransportCloseIndexAction;
import org.opensearch.action.search.*;
import org.opensearch.action.search.CreatePitController;
import org.opensearch.action.search.SearchRequestListenerManager;
import org.opensearch.action.search.SearchRequestSlowLog;
import org.opensearch.action.search.SearchRequestStats;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.support.AutoCreateIndex;
import org.opensearch.action.support.DestructiveOperations;
import org.opensearch.action.support.replication.TransportReplicationAction;
Expand Down
12 changes: 9 additions & 3 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -783,10 +783,16 @@ protected Node(
repositoriesServiceReference::get,
threadPool
);
final SearchRequestListenerManager searchRequestListenerManager = new SearchRequestListenerManager(clusterService);

final SearchRequestStats searchRequestStats = new SearchRequestStats(clusterService, searchRequestListenerManager);
final SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, searchRequestListenerManager);
final SearchRequestStats searchRequestStats = new SearchRequestStats(clusterService);
final SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService);

// register all standard SearchRequestOperationsListeners to the SearchRequestListenerManager
final SearchRequestListenerManager searchRequestListenerManager = new SearchRequestListenerManager(clusterService);
searchRequestListenerManager.addListeners(
searchRequestStats,
searchRequestSlowLog
);

remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, settings);
final IndicesService indicesService = new IndicesService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

import org.opensearch.action.admin.indices.stats.CommonStats;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.action.search.SearchRequestListenerManager;
import org.opensearch.action.search.SearchRequestStats;
import org.opensearch.cluster.coordination.PendingClusterStateStats;
import org.opensearch.cluster.coordination.PersistedStateStats;
Expand Down Expand Up @@ -970,8 +969,7 @@ private static NodeIndicesStats getNodeIndicesStats(boolean remoteStoreStats) {
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService);
indicesStats = new NodeIndicesStats(new CommonStats(CommonStatsFlags.ALL), new HashMap<>(), new SearchRequestStats(clusterService, listenerManager));
indicesStats = new NodeIndicesStats(new CommonStats(CommonStatsFlags.ALL), new HashMap<>(), new SearchRequestStats(clusterService));
RemoteSegmentStats remoteSegmentStats = indicesStats.getSegments().getRemoteSegmentStats();
remoteSegmentStats.addUploadBytesStarted(10L);
remoteSegmentStats.addUploadBytesSucceeded(10L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,7 @@ public void testOnPhaseFailureAndVerifyListeners() {
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService);
SearchRequestStats testListener = new SearchRequestStats(clusterService, listenerManager);
SearchRequestStats testListener = new SearchRequestStats(clusterService);

final List<SearchRequestOperationsListener> requestOperationListeners = new ArrayList<>(List.of(testListener));
SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(requestOperationListeners);
Expand Down Expand Up @@ -605,8 +604,7 @@ public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedEx
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService);
SearchRequestStats testListener = new SearchRequestStats(clusterService, listenerManager);
SearchRequestStats testListener = new SearchRequestStats(clusterService);
final List<SearchRequestOperationsListener> requestOperationListeners = new ArrayList<>(List.of(testListener));

long delay = (randomIntBetween(1, 5));
Expand Down Expand Up @@ -660,8 +658,7 @@ public void testOnPhaseListenersWithDfsType() throws InterruptedException {
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null
);
SearchRequestListenerManager listenerManager = new SearchRequestListenerManager(clusterService);
SearchRequestStats testListener = new SearchRequestStats(clusterService, listenerManager);
SearchRequestStats testListener = new SearchRequestStats(clusterService);
final List<SearchRequestOperationsListener> requestOperationListeners = new ArrayList<>(List.of(testListener));

SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction(
Expand Down
Loading

0 comments on commit 3448338

Please sign in to comment.