Skip to content

Commit

Permalink
Disable concurrent segment search for system indices and throttled se…
Browse files Browse the repository at this point in the history
…arch requests

Signed-off-by: Jay Deng <[email protected]>
  • Loading branch information
jed326 authored and Jay Deng committed Mar 28, 2024
1 parent 6ddbdcd commit 68450d8
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add a counter to node stat api to track shard going from idle to non-idle ([#12768](https://github.com/opensearch-project/OpenSearch/pull/12768))
- Allow setting KEYSTORE_PASSWORD through env variable ([#12865](https://github.com/opensearch-project/OpenSearch/pull/12865))
- [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697))
- [Concurrent Segment Search] Disable concurrent segment search for system indices and throttled requests ([#12954](https://github.com/opensearch-project/OpenSearch/pull/12954))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,11 @@ public boolean shouldUseConcurrentSearch() {
* Evaluate if parsed request supports concurrent segment search
*/
public void evaluateRequestShouldUseConcurrentSearch() {
if (sort != null && sort.isSortOnTimeSeriesField()) {
// Do not use concurrent segment search for system indices or throttled requests. See:
// https://github.com/opensearch-project/OpenSearch/issues/12951
if (indexShard.isSystem() || indexShard.indexSettings().isSearchThrottled()) {
requestShouldUseConcurrentSearch.set(false);
} else if (sort != null && sort.isSortOnTimeSeriesField()) {
requestShouldUseConcurrentSearch.set(false);
} else if (aggregations() != null
&& aggregations().factories() != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import java.util.function.Function;
import java.util.function.Supplier;

import static org.opensearch.index.IndexSettings.INDEX_SEARCH_THROTTLED;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.any;
Expand Down Expand Up @@ -551,7 +552,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
}
}

public void testSearchPathEvaluationUsingSortField() throws Exception {
public void testSearchPathEvaluation() throws Exception {
ShardSearchRequest shardSearchRequest = mock(ShardSearchRequest.class);
when(shardSearchRequest.searchType()).thenReturn(SearchType.DEFAULT);
ShardId shardId = new ShardId("index", UUID.randomUUID().toString(), 1);
Expand All @@ -578,9 +579,24 @@ public void testSearchPathEvaluationUsingSortField() throws Exception {
IndexMetadata indexMetadata = IndexMetadata.builder("index").settings(settings).build();
IndexSettings indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY);
when(indexService.getIndexSettings()).thenReturn(indexSettings);
when(indexShard.indexSettings()).thenReturn(indexSettings);

BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());

IndexShard systemIndexShard = mock(IndexShard.class);
when(systemIndexShard.getQueryCachingPolicy()).thenReturn(queryCachingPolicy);
when(systemIndexShard.getThreadPool()).thenReturn(threadPool);
when(systemIndexShard.isSystem()).thenReturn(true);

IndexShard throttledIndexShard = mock(IndexShard.class);
when(throttledIndexShard.getQueryCachingPolicy()).thenReturn(queryCachingPolicy);
when(throttledIndexShard.getThreadPool()).thenReturn(threadPool);
IndexSettings throttledIndexSettings = new IndexSettings(
indexMetadata,
Settings.builder().put(INDEX_SEARCH_THROTTLED.getKey(), true).build()
);
when(throttledIndexShard.indexSettings()).thenReturn(throttledIndexSettings);

try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir)) {

final Supplier<Engine.SearcherSupplier> searcherSupplier = () -> new Engine.SearcherSupplier(Function.identity()) {
Expand Down Expand Up @@ -697,6 +713,62 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
}
assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch);

// Case 4: With a system index concurrent segment search is not used
readerContext = new ReaderContext(
newContextId(),
indexService,
systemIndexShard,
searcherSupplier.get(),
randomNonNegativeLong(),
false
);
context = new DefaultSearchContext(
readerContext,
shardSearchRequest,
target,
null,
bigArrays,
null,
null,
null,
false,
Version.CURRENT,
false,
executor,
null
);
context.evaluateRequestShouldUseConcurrentSearch();
assertFalse(context.shouldUseConcurrentSearch());
assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch);

// Case 5: When search is throttled concurrent segment search is not used
readerContext = new ReaderContext(
newContextId(),
indexService,
throttledIndexShard,
searcherSupplier.get(),
randomNonNegativeLong(),
false
);
context = new DefaultSearchContext(
readerContext,
shardSearchRequest,
target,
null,
bigArrays,
null,
null,
null,
false,
Version.CURRENT,
false,
executor,
null
);
context.evaluateRequestShouldUseConcurrentSearch();
assertFalse(context.shouldUseConcurrentSearch());
assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch);

// shutdown the threadpool
threadPool.shutdown();
}
Expand Down

0 comments on commit 68450d8

Please sign in to comment.