Skip to content

Commit

Permalink
Add dynamic index and cluster setting for concurrent segment search (o…
Browse files Browse the repository at this point in the history
…pensearch-project#7956)

* Add dynamic index and cluster setting for concurrent segment search

Signed-off-by: Jay Deng <[email protected]>

* Use feature flagged settings map

Signed-off-by: Jay Deng <[email protected]>

---------

Signed-off-by: Jay Deng <[email protected]>
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
jed326 authored and shiv0408 committed Apr 25, 2024
1 parent 6ecf823 commit e4f5504
Show file tree
Hide file tree
Showing 17 changed files with 334 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854))
- Add support for ignoring missing Javadoc on generated code using annotation ([#7604](https://github.com/opensearch-project/OpenSearch/pull/7604))
- Implement concurrent aggregations support without profile option ([#7514](https://github.com/opensearch-project/OpenSearch/pull/7514))
- Add dynamic index and cluster setting for concurrent segment search ([#7956](https://github.com/opensearch-project/OpenSearch/pull/7956))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
7 changes: 7 additions & 0 deletions distribution/src/config/opensearch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,11 @@ ${path.logs}
#
# Gates the search pipeline feature. This feature enables configurable processors
# for search requests and search responses, similar to ingest pipelines.
#
#opensearch.experimental.feature.search_pipeline.enabled: false
#
#
# Gates the concurrent segment search feature. This feature enables concurrent segment search in a separate
# index searcher threadpool.
#
#opensearch.experimental.feature.concurrent_segment_search.enabled: false
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,8 @@ public void apply(Settings value, Settings current, Settings previous) {
IndicesService.CLUSTER_REMOTE_STORE_REPOSITORY_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_STORE_ENABLED_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING
)
),
List.of(FeatureFlags.CONCURRENT_SEGMENT_SEARCH),
List.of(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,9 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexMetadata.INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING,
IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING,
IndexMetadata.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING
)
),
FeatureFlags.CONCURRENT_SEGMENT_SEARCH,
List.of(IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING)
);

public static final IndexScopedSettings DEFAULT_SCOPED_SETTINGS = new IndexScopedSettings(Settings.EMPTY, BUILT_IN_INDEX_SETTINGS);
Expand Down
15 changes: 14 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,13 @@ public final class IndexSettings {
Property.IndexScope
);

public static final Setting<Boolean> INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING = Setting.boolSetting(
"index.search.concurrent_segment_search.enabled",
false,
Property.IndexScope,
Property.Dynamic
);

private final Index index;
private final Version version;
private final Logger logger;
Expand Down Expand Up @@ -1602,7 +1609,13 @@ public void setDefaultSearchPipeline(String defaultSearchPipeline) {
if (FeatureFlags.isEnabled(SEARCH_PIPELINE)) {
this.defaultSearchPipeline = defaultSearchPipeline;
} else {
throw new SettingsException("Unsupported setting: " + DEFAULT_SEARCH_PIPELINE.getKey());
throw new SettingsException(
"Unable to update setting: "
+ DEFAULT_SEARCH_PIPELINE.getKey()
+ ". This is an experimental feature that is currently disabled, please enable the "
+ SEARCH_PIPELINE
+ " feature flag first."
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.lease.Releasables;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.index.IndexService;
Expand Down Expand Up @@ -104,6 +105,8 @@
import java.util.function.Function;
import java.util.function.LongSupplier;

import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;

/**
* The main search context used during search phase
*
Expand Down Expand Up @@ -869,6 +872,25 @@ public Profilers getProfilers() {
return profilers;
}

/**
* Returns concurrent segment search status for the search context
*/
@Override
public boolean isConcurrentSegmentSearchEnabled() {
if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)
&& (clusterService != null)
&& (searcher().getExecutor() != null)) {
return indexService.getIndexSettings()
.getSettings()
.getAsBoolean(
IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(),
clusterService.getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
} else {
return false;
}
}

public void setProfilers(Profilers profilers) {
this.profilers = profilers;
}
Expand Down
15 changes: 4 additions & 11 deletions server/src/main/java/org/opensearch/search/SearchModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,9 @@
import org.opensearch.search.fetch.subphase.highlight.Highlighter;
import org.opensearch.search.fetch.subphase.highlight.PlainHighlighter;
import org.opensearch.search.fetch.subphase.highlight.UnifiedHighlighter;
import org.opensearch.search.query.ConcurrentQueryPhaseSearcher;
import org.opensearch.search.query.QueryPhase;
import org.opensearch.search.query.QueryPhaseSearcher;
import org.opensearch.search.query.QueryPhaseSearcherWrapper;
import org.opensearch.search.rescore.QueryRescorerBuilder;
import org.opensearch.search.rescore.RescorerBuilder;
import org.opensearch.search.sort.FieldSortBuilder;
Expand Down Expand Up @@ -1258,8 +1258,8 @@ private QueryPhaseSearcher registerQueryPhaseSearcher(List<SearchPlugin> plugins
}
}

if (searcher == null && FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) {
searcher = new ConcurrentQueryPhaseSearcher();
if (searcher == null) {
searcher = new QueryPhaseSearcherWrapper();
}
return searcher;
}
Expand Down Expand Up @@ -1290,14 +1290,7 @@ public FetchPhase getFetchPhase() {
}

public QueryPhase getQueryPhase() {
QueryPhase queryPhase;
if (queryPhaseSearcher == null) {
// use the defaults
queryPhase = new QueryPhase();
} else {
queryPhase = new QueryPhase(queryPhaseSearcher);
}
return queryPhase;
return new QueryPhase(queryPhaseSearcher);
}

public @Nullable ExecutorService getIndexSearcherExecutor(ThreadPool pool) {
Expand Down
7 changes: 7 additions & 0 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,13 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.NodeScope
);

public static final Setting<Boolean> CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING = Setting.boolSetting(
"search.concurrent_segment_search.enabled",
true,
Property.Dynamic,
Property.NodeScope
);

public static final int DEFAULT_SIZE = 10;
public static final int DEFAULT_FROM = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,13 @@ public final void assignRescoreDocIds(RescoreDocIds rescoreDocIds) {
*/
public abstract Profilers getProfilers();

/**
* Returns concurrent segment search status for the search context
*/
public boolean isConcurrentSegmentSearchEnabled() {
return false;
}

/**
* Adds a releasable that will be freed when this context is closed.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,7 @@ protected boolean searchWithCollector(
boolean hasFilterCollector,
boolean hasTimeout
) throws IOException {
boolean couldUseConcurrentSegmentSearch = allowConcurrentSegmentSearch(searcher);

if (couldUseConcurrentSegmentSearch) {
LOGGER.debug("Using concurrent search over index segments (experimental)");
return searchWithCollectorManager(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
} else {
return super.searchWithCollector(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
}
return searchWithCollectorManager(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
}

private static boolean searchWithCollectorManager(
Expand Down Expand Up @@ -108,9 +101,4 @@ private static boolean searchWithCollectorManager(
public AggregationProcessor aggregationProcessor(SearchContext searchContext) {
return aggregationProcessor;
}

private static boolean allowConcurrentSegmentSearch(final ContextIndexSearcher searcher) {
return (searcher.getExecutor() != null);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.query;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.Query;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.search.aggregations.AggregationProcessor;
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.internal.SearchContext;
import org.apache.lucene.search.CollectorManager;

import java.io.IOException;
import java.util.LinkedList;

/**
* Wrapper class for QueryPhaseSearcher that handles path selection for concurrent vs
* non-concurrent search query phase and aggregation processor.
*
* @opensearch.internal
*/
public class QueryPhaseSearcherWrapper implements QueryPhaseSearcher {
private static final Logger LOGGER = LogManager.getLogger(QueryPhaseSearcherWrapper.class);
private final QueryPhaseSearcher defaultQueryPhaseSearcher;
private final QueryPhaseSearcher concurrentQueryPhaseSearcher;

public QueryPhaseSearcherWrapper() {
this.defaultQueryPhaseSearcher = new QueryPhase.DefaultQueryPhaseSearcher();
this.concurrentQueryPhaseSearcher = FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)
? new ConcurrentQueryPhaseSearcher()
: null;
}

/**
* Perform search using {@link CollectorManager}
*
* @param searchContext search context
* @param searcher context index searcher
* @param query query
* @param hasTimeout "true" if timeout was set, "false" otherwise
* @return is rescoring required or not
* @throws java.io.IOException IOException
*/
@Override
public boolean searchWith(
SearchContext searchContext,
ContextIndexSearcher searcher,
Query query,
LinkedList<QueryCollectorContext> collectors,
boolean hasFilterCollector,
boolean hasTimeout
) throws IOException {
if (searchContext.isConcurrentSegmentSearchEnabled()) {
LOGGER.info("Using concurrent search over segments (experimental)");
return concurrentQueryPhaseSearcher.searchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
} else {
return defaultQueryPhaseSearcher.searchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
}
}

/**
* {@link AggregationProcessor} to use to setup and post process aggregation related collectors during search request
* @param searchContext search context
* @return {@link AggregationProcessor} to use
*/
@Override
public AggregationProcessor aggregationProcessor(SearchContext searchContext) {
if (searchContext.isConcurrentSegmentSearchEnabled()) {
LOGGER.info("Using concurrent search over segments (experimental)");
return concurrentQueryPhaseSearcher.aggregationProcessor(searchContext);
} else {
return defaultQueryPhaseSearcher.aggregationProcessor(searchContext);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.opensearch.common.settings.Setting.Property;
import org.hamcrest.Matchers;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexSettings;
import org.opensearch.search.SearchService;
import org.opensearch.test.FeatureFlagSetter;

import java.util.Arrays;
Expand Down Expand Up @@ -282,4 +284,55 @@ public void testDynamicIndexSettingsRegistration() {
() -> module.registerDynamicSetting(Setting.floatSetting("index.custom.setting2", 1.0f, Property.IndexScope))
);
}

public void testConcurrentSegmentSearchClusterSettings() {
// Test that we throw an exception without the feature flag
Settings settings = Settings.builder().put(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build();
SettingsException ex = expectThrows(SettingsException.class, () -> new SettingsModule(settings));
assertEquals(
"unknown setting ["
+ SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey()
+ "] please check that any required plugins are installed, or check the breaking "
+ "changes documentation for removed settings",
ex.getMessage()
);

// Test that the settings updates correctly with the feature flag
FeatureFlagSetter.set(FeatureFlags.CONCURRENT_SEGMENT_SEARCH);
boolean settingValue = randomBoolean();
Settings settingsWithFeatureFlag = Settings.builder()
.put(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), settingValue)
.build();
SettingsModule settingsModule = new SettingsModule(settingsWithFeatureFlag);
assertEquals(settingValue, SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.get(settingsModule.getSettings()));
}

public void testConcurrentSegmentSearchIndexSettings() {
Settings.Builder target = Settings.builder().put(Settings.EMPTY);
Settings.Builder update = Settings.builder();

// Test that we throw an exception without the feature flag
SettingsModule module = new SettingsModule(Settings.EMPTY);
IndexScopedSettings indexScopedSettings = module.getIndexScopedSettings();
expectThrows(
SettingsException.class,
() -> indexScopedSettings.updateDynamicSettings(
Settings.builder().put(IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build(),
target,
update,
"node"
)
);

// Test that the settings updates correctly with the feature flag
FeatureFlagSetter.set(FeatureFlags.CONCURRENT_SEGMENT_SEARCH);
SettingsModule moduleWithFeatureFlag = new SettingsModule(Settings.EMPTY);
IndexScopedSettings indexScopedSettingsWithFeatureFlag = moduleWithFeatureFlag.getIndexScopedSettings();
indexScopedSettingsWithFeatureFlag.updateDynamicSettings(
Settings.builder().put(IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build(),
target,
update,
"node"
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.opensearch.search.query.ConcurrentQueryPhaseSearcher;
import org.opensearch.search.query.QueryPhase;
import org.opensearch.search.query.QueryPhaseSearcher;
import org.opensearch.search.query.QueryPhaseSearcherWrapper;
import org.opensearch.search.rescore.QueryRescorerBuilder;
import org.opensearch.search.rescore.RescoreContext;
import org.opensearch.search.rescore.RescorerBuilder;
Expand Down Expand Up @@ -425,7 +426,7 @@ public void testDefaultQueryPhaseSearcher() {
SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.emptyList());
TestSearchContext searchContext = new TestSearchContext(null);
QueryPhase queryPhase = searchModule.getQueryPhase();
assertTrue(queryPhase.getQueryPhaseSearcher() instanceof QueryPhase.DefaultQueryPhaseSearcher);
assertTrue(queryPhase.getQueryPhaseSearcher() instanceof QueryPhaseSearcherWrapper);
assertTrue(queryPhase.getQueryPhaseSearcher().aggregationProcessor(searchContext) instanceof DefaultAggregationProcessor);
}

Expand All @@ -434,8 +435,9 @@ public void testConcurrentQueryPhaseSearcher() {
FeatureFlags.initializeFeatureFlags(settings);
SearchModule searchModule = new SearchModule(settings, Collections.emptyList());
TestSearchContext searchContext = new TestSearchContext(null);
searchContext.setConcurrentSegmentSearchEnabled(true);
QueryPhase queryPhase = searchModule.getQueryPhase();
assertTrue(queryPhase.getQueryPhaseSearcher() instanceof ConcurrentQueryPhaseSearcher);
assertTrue(queryPhase.getQueryPhaseSearcher() instanceof QueryPhaseSearcherWrapper);
assertTrue(queryPhase.getQueryPhaseSearcher().aggregationProcessor(searchContext) instanceof ConcurrentAggregationProcessor);
FeatureFlags.initializeFeatureFlags(Settings.EMPTY);
}
Expand Down
Loading

0 comments on commit e4f5504

Please sign in to comment.