From cb2b7a4f05f13bd8a677e8f54933ef3cb41f7eb7 Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Thu, 28 Mar 2024 15:02:58 -0700 Subject: [PATCH] Disable concurrent segment search for system indices and throttled search requests (#12954) Signed-off-by: Jay Deng --- CHANGELOG.md | 1 + .../search/DefaultSearchContext.java | 9 +- .../search/DefaultSearchContextTests.java | 83 ++++++++++++++++++- 3 files changed, 90 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f04f8641396f..2f6b7be438080 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697)) - Convert ingest processor supports ip type ([#12818](https://github.com/opensearch-project/OpenSearch/pull/12818)) - Allow setting KEYSTORE_PASSWORD through env variable ([#12865](https://github.com/opensearch-project/OpenSearch/pull/12865)) +- [Concurrent Segment Search] Disable concurrent segment search for system indices and throttled requests ([#12954](https://github.com/opensearch-project/OpenSearch/pull/12954)) ### Dependencies - Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896)) diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java index 4195cc67f7af1..b49cdc1b823e0 100644 --- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java @@ -962,6 +962,12 @@ public BucketCollectorProcessor bucketCollectorProcessor() { * false: otherwise */ private boolean evaluateConcurrentSegmentSearchSettings(Executor concurrentSearchExecutor) { + // Do not use concurrent segment search for system indices or throttled requests. See: + // https://github.com/opensearch-project/OpenSearch/issues/12951 + if (indexShard.isSystem() || indexShard.indexSettings().isSearchThrottled()) { + return false; + } + if ((clusterService != null) && (concurrentSearchExecutor != null)) { return indexService.getIndexSettings() .getSettings() @@ -969,9 +975,8 @@ private boolean evaluateConcurrentSegmentSearchSettings(Executor concurrentSearc IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), clusterService.getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING) ); - } else { - return false; } + return false; } @Override diff --git a/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java index 3793249d569f0..a1a808c9faa9b 100644 --- a/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java @@ -93,6 +93,7 @@ import java.util.function.Function; import java.util.function.Supplier; +import static org.opensearch.index.IndexSettings.INDEX_SEARCH_THROTTLED; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.mockito.Mockito.any; @@ -168,6 +169,7 @@ public void testPreProcess() throws Exception { IndexSettings indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY); when(indexService.getIndexSettings()).thenReturn(indexSettings); when(mapperService.getIndexSettings()).thenReturn(indexSettings); + when(indexShard.indexSettings()).thenReturn(indexSettings); BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); @@ -486,6 +488,14 @@ public void testClearQueryCancellationsOnClose() throws IOException { when(indexService.newQueryShardContext(eq(shardId.id()), any(), any(), nullable(String.class), anyBoolean())).thenReturn( queryShardContext ); + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) + .build(); + IndexMetadata indexMetadata = IndexMetadata.builder("index").settings(settings).build(); + IndexSettings indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY); + when(indexShard.indexSettings()).thenReturn(indexSettings); BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); @@ -551,7 +561,7 @@ protected Engine.Searcher acquireSearcherInternal(String source) { } } - public void testSearchPathEvaluationUsingSortField() throws Exception { + public void testSearchPathEvaluation() throws Exception { ShardSearchRequest shardSearchRequest = mock(ShardSearchRequest.class); when(shardSearchRequest.searchType()).thenReturn(SearchType.DEFAULT); ShardId shardId = new ShardId("index", UUID.randomUUID().toString(), 1); @@ -578,9 +588,24 @@ public void testSearchPathEvaluationUsingSortField() throws Exception { IndexMetadata indexMetadata = IndexMetadata.builder("index").settings(settings).build(); IndexSettings indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY); when(indexService.getIndexSettings()).thenReturn(indexSettings); + when(indexShard.indexSettings()).thenReturn(indexSettings); BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + IndexShard systemIndexShard = mock(IndexShard.class); + when(systemIndexShard.getQueryCachingPolicy()).thenReturn(queryCachingPolicy); + when(systemIndexShard.getThreadPool()).thenReturn(threadPool); + when(systemIndexShard.isSystem()).thenReturn(true); + + IndexShard throttledIndexShard = mock(IndexShard.class); + when(throttledIndexShard.getQueryCachingPolicy()).thenReturn(queryCachingPolicy); + when(throttledIndexShard.getThreadPool()).thenReturn(threadPool); + IndexSettings throttledIndexSettings = new IndexSettings( + indexMetadata, + Settings.builder().put(INDEX_SEARCH_THROTTLED.getKey(), true).build() + ); + when(throttledIndexShard.indexSettings()).thenReturn(throttledIndexSettings); + try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir)) { final Supplier searcherSupplier = () -> new Engine.SearcherSupplier(Function.identity()) { @@ -697,6 +722,62 @@ protected Engine.Searcher acquireSearcherInternal(String source) { } assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch); + // Case 4: With a system index concurrent segment search is not used + readerContext = new ReaderContext( + newContextId(), + indexService, + systemIndexShard, + searcherSupplier.get(), + randomNonNegativeLong(), + false + ); + context = new DefaultSearchContext( + readerContext, + shardSearchRequest, + target, + null, + bigArrays, + null, + null, + null, + false, + Version.CURRENT, + false, + executor, + null + ); + context.evaluateRequestShouldUseConcurrentSearch(); + assertFalse(context.shouldUseConcurrentSearch()); + assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch); + + // Case 5: When search is throttled concurrent segment search is not used + readerContext = new ReaderContext( + newContextId(), + indexService, + throttledIndexShard, + searcherSupplier.get(), + randomNonNegativeLong(), + false + ); + context = new DefaultSearchContext( + readerContext, + shardSearchRequest, + target, + null, + bigArrays, + null, + null, + null, + false, + Version.CURRENT, + false, + executor, + null + ); + context.evaluateRequestShouldUseConcurrentSearch(); + assertFalse(context.shouldUseConcurrentSearch()); + assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch); + // shutdown the threadpool threadPool.shutdown(); }