Skip to content

Commit

Permalink
Support dynamically adding SearchRequestOperationsListener (opensearc…
Browse files Browse the repository at this point in the history
…h-project#11526)

Along the way, also refactored TransportSearchAction.TimeProvider,
so that it's no longer a (redundant) listener.

---------

Signed-off-by: Chenyang Ji <[email protected]>
  • Loading branch information
ansjcy authored Jan 11, 2024
1 parent 3cf1ce6 commit 6aab360
Show file tree
Hide file tree
Showing 28 changed files with 577 additions and 246 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add deleted doc count in _cat/shards ([#11678](https://github.com/opensearch-project/OpenSearch/pull/11678))
- Capture information for additional query types and aggregation types ([#11582](https://github.com/opensearch-project/OpenSearch/pull/11582))
- Use slice_size == shard_size heuristic in terms aggs for concurrent segment search and properly calculate the doc_count_error ([#11732](https://github.com/opensearch-project/OpenSearch/pull/11732))
- Added Support for dynamically adding SearchRequestOperationsListeners with SearchRequestOperationsCompositeListenerFactory ([#11526](https://github.com/opensearch-project/OpenSearch/pull/11526))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.action.search.TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED_KEY;
import static org.opensearch.action.search.SearchRequestStats.SEARCH_REQUEST_STATS_ENABLED_KEY;
import static org.opensearch.search.aggregations.AggregationBuilders.terms;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
import java.util.Set;
import java.util.function.Function;

import static org.opensearch.action.search.TransportSearchAction.SEARCH_REQUEST_STATS_ENABLED_KEY;
import static org.opensearch.action.search.SearchRequestStats.SEARCH_REQUEST_STATS_ENABLED_KEY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public final void start() {
0,
0,
buildTookInMillis(),
timeProvider.getPhaseTook(),
searchRequestContext.getPhaseTook(),
ShardSearchFailure.EMPTY_ARRAY,
clusters,
null
Expand Down Expand Up @@ -670,7 +670,7 @@ protected final SearchResponse buildSearchResponse(
successfulOps.get(),
skippedOps.get(),
buildTookInMillis(),
timeProvider.getPhaseTook(),
searchRequestContext.getPhaseTook(),
failures,
clusters,
searchContextId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@

package org.opensearch.action.search;

import org.apache.logging.log4j.LogManager;
import org.apache.lucene.search.TotalHits;
import org.opensearch.common.annotation.InternalApi;

import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

Expand All @@ -31,18 +29,14 @@ class SearchRequestContext {
private TotalHits totalHits;
private final EnumMap<ShardStatsFieldNames, Integer> shardStats;

/**
* This constructor is for testing only
*/
SearchRequestContext() {
this(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()));
}
private final SearchRequest searchRequest;

SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener) {
SearchRequestContext(final SearchRequestOperationsListener searchRequestOperationsListener, final SearchRequest searchRequest) {
this.searchRequestOperationsListener = searchRequestOperationsListener;
this.absoluteStartNanos = System.nanoTime();
this.phaseTookMap = new HashMap<>();
this.shardStats = new EnumMap<>(ShardStatsFieldNames.class);
this.searchRequest = searchRequest;
}

SearchRequestOperationsListener getSearchRequestOperationsListener() {
Expand All @@ -57,6 +51,14 @@ Map<String, Long> phaseTookMap() {
return phaseTookMap;
}

SearchResponse.PhaseTook getPhaseTook() {
if (searchRequest != null && searchRequest.isPhaseTook() != null && searchRequest.isPhaseTook()) {
return new SearchResponse.PhaseTook(phaseTookMap);
} else {
return null;
}
}

/**
* Override absoluteStartNanos set in constructor.
* For testing only
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* SearchRequestOperationsCompositeListenerFactory contains listeners registered to search requests,
* and is responsible for creating the {@link SearchRequestOperationsListener.CompositeListener}
* with the all listeners enabled at cluster-level and request-level.
*
*
* @opensearch.internal
*/
public final class SearchRequestOperationsCompositeListenerFactory {
private final List<SearchRequestOperationsListener> searchRequestListenersList;

/**
* Create the SearchRequestOperationsCompositeListenerFactory and add multiple {@link SearchRequestOperationsListener}
* to the searchRequestListenersList.
* Those enabled listeners will be executed during each search request.
*
* @param listeners Multiple SearchRequestOperationsListener object to add.
* @throws IllegalArgumentException if any input listener is null.
*/
public SearchRequestOperationsCompositeListenerFactory(final SearchRequestOperationsListener... listeners) {
searchRequestListenersList = new ArrayList<>();
for (SearchRequestOperationsListener listener : listeners) {
if (listener == null) {
throw new IllegalArgumentException("listener must not be null");
}
searchRequestListenersList.add(listener);
}
}

/**
* Get searchRequestListenersList,
*
* @return List of SearchRequestOperationsListener
*/
public List<SearchRequestOperationsListener> getListeners() {
return searchRequestListenersList;
}

/**
* Create the {@link SearchRequestOperationsListener.CompositeListener}
* with the all listeners enabled at cluster-level and request-level.
*
* @param searchRequest The SearchRequest object used to decide which request-level listeners to add based on states/flags
* @param logger Logger to be attached to the {@link SearchRequestOperationsListener.CompositeListener}
* @param perRequestListeners the per-request listeners that can be optionally added to the returned CompositeListener list.
* @return SearchRequestOperationsListener.CompositeListener
*/
public SearchRequestOperationsListener.CompositeListener buildCompositeListener(
final SearchRequest searchRequest,
final Logger logger,
final SearchRequestOperationsListener... perRequestListeners
) {
final List<SearchRequestOperationsListener> searchListenersList = Stream.concat(
searchRequestListenersList.stream(),
Arrays.stream(perRequestListeners)
)
.filter((searchRequestOperationsListener -> searchRequestOperationsListener.isEnabled(searchRequest)))
.collect(Collectors.toList());

return new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,16 @@
* @opensearch.internal
*/
@InternalApi
abstract class SearchRequestOperationsListener {
public abstract class SearchRequestOperationsListener {
private volatile boolean enabled;

protected SearchRequestOperationsListener() {
this.enabled = true;
}

protected SearchRequestOperationsListener(final boolean enabled) {
this.enabled = enabled;
}

abstract void onPhaseStart(SearchPhaseContext context);

Expand All @@ -32,6 +41,18 @@ void onRequestStart(SearchRequestContext searchRequestContext) {}

void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

boolean isEnabled(SearchRequest searchRequest) {
return isEnabled();
}

boolean isEnabled() {
return enabled;
}

protected void setEnabled(final boolean enabled) {
this.enabled = enabled;
}

/**
* Holder of Composite Listeners
*
Expand Down Expand Up @@ -101,5 +122,9 @@ public void onRequestEnd(SearchPhaseContext context, SearchRequestContext search
}
}
}

public List<SearchRequestOperationsListener> getListeners() {
return listeners;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,11 @@ public SearchRequestSlowLog(ClusterService clusterService) {
this.logger = logger;
Loggers.setLevel(this.logger, SlowLogLevel.TRACE.name());

this.warnThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING).nanos();
this.infoThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_INFO_SETTING).nanos();
this.debugThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_DEBUG_SETTING).nanos();
this.traceThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_TRACE_SETTING).nanos();
this.level = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL);
this.setWarnThreshold(clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING));
this.setInfoThreshold(clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_INFO_SETTING));
this.setDebugThreshold(clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_DEBUG_SETTING));
this.setTraceThreshold(clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_TRACE_SETTING));
this.setLevel(clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL));

clusterService.getClusterSettings()
.addSettingsUpdateConsumer(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING, this::setWarnThreshold);
Expand Down Expand Up @@ -233,18 +233,22 @@ private static String escapeJson(String text) {

void setWarnThreshold(TimeValue warnThreshold) {
this.warnThreshold = warnThreshold.nanos();
setEnabledIfThresholdExceed();
}

void setInfoThreshold(TimeValue infoThreshold) {
this.infoThreshold = infoThreshold.nanos();
setEnabledIfThresholdExceed();
}

void setDebugThreshold(TimeValue debugThreshold) {
this.debugThreshold = debugThreshold.nanos();
setEnabledIfThresholdExceed();
}

void setTraceThreshold(TimeValue traceThreshold) {
this.traceThreshold = traceThreshold.nanos();
setEnabledIfThresholdExceed();
}

void setLevel(SlowLogLevel level) {
Expand All @@ -270,4 +274,8 @@ protected long getTraceThreshold() {
SlowLogLevel getLevel() {
return level;
}

private void setEnabledIfThresholdExceed() {
super.setEnabled(this.warnThreshold >= 0 || this.debugThreshold >= 0 || this.infoThreshold >= 0 || this.traceThreshold >= 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.common.metrics.CounterMetric;
import org.opensearch.common.metrics.MeanMetric;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;

import java.util.EnumMap;
import java.util.Map;
Expand All @@ -26,8 +28,18 @@
public final class SearchRequestStats extends SearchRequestOperationsListener {
Map<SearchPhaseName, StatsHolder> phaseStatsMap = new EnumMap<>(SearchPhaseName.class);

public static final String SEARCH_REQUEST_STATS_ENABLED_KEY = "search.request_stats_enabled";
public static final Setting<Boolean> SEARCH_REQUEST_STATS_ENABLED = Setting.boolSetting(
SEARCH_REQUEST_STATS_ENABLED_KEY,
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

@Inject
public SearchRequestStats() {
public SearchRequestStats(ClusterSettings clusterSettings) {
this.setEnabled(clusterSettings.get(SEARCH_REQUEST_STATS_ENABLED));
clusterSettings.addSettingsUpdateConsumer(SEARCH_REQUEST_STATS_ENABLED, this::setEnabled);
for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
phaseStatsMap.put(searchPhaseName, new StatsHolder());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ final class SearchResponseMerger {
/**
* Add a search response to the list of responses to be merged together into one.
* Merges currently happen at once when all responses are available and
* {@link #getMergedResponse(SearchResponse.Clusters)} )} is called.
* {@link #getMergedResponse(SearchResponse.Clusters, SearchRequestContext)} )} is called.
* That may change in the future as it's possible to introduce incremental merges as responses come in if necessary.
*/
void add(SearchResponse searchResponse) {
Expand All @@ -126,7 +126,7 @@ int numResponses() {
* Returns the merged response. To be called once all responses have been added through {@link #add(SearchResponse)}
* so that all responses are merged into a single one.
*/
SearchResponse getMergedResponse(SearchResponse.Clusters clusters) {
SearchResponse getMergedResponse(SearchResponse.Clusters clusters, SearchRequestContext searchRequestContext) {
// if the search is only across remote clusters, none of them are available, and all of them have skip_unavailable set to true,
// we end up calling merge without anything to merge, we just return an empty search response
if (searchResponses.size() == 0) {
Expand Down Expand Up @@ -236,7 +236,7 @@ SearchResponse getMergedResponse(SearchResponse.Clusters clusters) {
successfulShards,
skippedShards,
tookInMillis,
searchTimeProvider.getPhaseTook(),
searchRequestContext.getPhaseTook(),
shardFailures,
clusters,
null
Expand Down
Loading

0 comments on commit 6aab360

Please sign in to comment.