Skip to content

Commit

Permalink
For sort request on timeseries field use non concurrent search path (o…
Browse files Browse the repository at this point in the history
…pensearch-project#9562)

* For sort request on timeseries field use non concurrent search path

Signed-off-by: Sorabh Hamirwasia <[email protected]>

* Address review feedback

Signed-off-by: Sorabh Hamirwasia <[email protected]>

---------

Signed-off-by: Sorabh Hamirwasia <[email protected]>
  • Loading branch information
sohami authored Aug 28, 2023
1 parent a08d588 commit e5c4f9d
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 32 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Separate request-based and settings-based concurrent segment search controls and introduce AggregatorFactory method to determine concurrent search support ([#9469](https://github.com/opensearch-project/OpenSearch/pull/9469))
- [Remote Store] Rate limiter integration for remote store uploads and downloads([#9448](https://github.com/opensearch-project/OpenSearch/pull/9448/))
- [Remote Store] Implicitly use replication type SEGMENT for remote store clusters ([#9264](https://github.com/opensearch-project/OpenSearch/pull/9264))
- Use non-concurrent path for sort request on timeseries index and field([#9562](https://github.com/opensearch-project/OpenSearch/pull/9562))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -890,11 +890,15 @@ public boolean shouldUseConcurrentSearch() {
* Evaluate if parsed request supports concurrent segment search
*/
public void evaluateRequestShouldUseConcurrentSearch() {
if (aggregations() != null && aggregations().factories() != null) {
requestShouldUseConcurrentSearch.set(aggregations().factories().allFactoriesSupportConcurrentSearch());
} else {
requestShouldUseConcurrentSearch.set(true);
}
if (sort != null && sort.isSortOnTimeSeriesField()) {
requestShouldUseConcurrentSearch.set(false);
} else if (aggregations() != null
&& aggregations().factories() != null
&& !aggregations().factories().allFactoriesSupportConcurrentSearch()) {
requestShouldUseConcurrentSearch.set(false);
} else {
requestShouldUseConcurrentSearch.set(true);
}
}

public void setProfilers(Profilers profilers) {
Expand Down Expand Up @@ -965,4 +969,12 @@ public int getTargetMaxSliceCount() {
}
return clusterService.getClusterSettings().get(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING);
}

@Override
public boolean shouldUseTimeSeriesDescSortOptimization() {
return indexShard.isTimeSeriesDescSortOptimizationEnabled()
&& sort != null
&& sort.isSortOnTimeSeriesField()
&& sort.sort.getSort()[0].getReverse() == false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.CombinedBitSet;
import org.apache.lucene.util.SparseFixedBitSet;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
import org.opensearch.search.DocValueFormat;
Expand Down Expand Up @@ -268,10 +267,11 @@ public void search(

@Override
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
if (shouldReverseLeafReaderContexts()) {
// reverse the segment search order if this flag is true.
// Certain queries can benefit if we reverse the segment read order,
// for example time series based queries if searched for desc sort order.
// Time series based workload by default traverses segments in desc order i.e. latest to the oldest order.
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
// reader order here.
if (searchContext.shouldUseTimeSeriesDescSortOptimization()) {
for (int i = leaves.size() - 1; i >= 0; i--) {
searchLeaf(leaves.get(i), weight, collector);
}
Expand Down Expand Up @@ -517,26 +517,6 @@ private boolean canMatchSearchAfter(LeafReaderContext ctx) throws IOException {
return true;
}

private boolean shouldReverseLeafReaderContexts() {
// Time series based workload by default traverses segments in desc order i.e. latest to the oldest order.
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
// reader order here.
if (searchContext.indexShard().isTimeSeriesDescSortOptimizationEnabled()) {
// Only reverse order for asc order sort queries
if (searchContext.sort() != null
&& searchContext.sort().sort != null
&& searchContext.sort().sort.getSort() != null
&& searchContext.sort().sort.getSort().length > 0
&& searchContext.sort().sort.getSort()[0].getReverse() == false
&& searchContext.sort().sort.getSort()[0].getField() != null
&& searchContext.sort().sort.getSort()[0].getField().equals(DataStream.TIMESERIES_FIELDNAME)) {
return true;
}
}
return false;
}

// package-private for testing
LeafSlice[] slicesInternal(List<LeafReaderContext> leaves, int targetMaxSlice) {
LeafSlice[] leafSlices;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,4 +569,9 @@ public boolean shouldUseConcurrentSearch() {
public int getTargetMaxSliceCount() {
return in.getTargetMaxSliceCount();
}

@Override
public boolean shouldUseTimeSeriesDescSortOptimization() {
return in.shouldUseTimeSeriesDescSortOptimization();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -487,4 +487,6 @@ public String toString() {
public abstract BucketCollectorProcessor bucketCollectorProcessor();

public abstract int getTargetMaxSliceCount();

public abstract boolean shouldUseTimeSeriesDescSortOptimization();
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ public boolean searchWith(
boolean hasTimeout
) throws IOException {
if (searchContext.shouldUseConcurrentSearch()) {
LOGGER.info("Using concurrent search over segments (experimental)");
LOGGER.debug("Using concurrent search over segments (experimental) for request with context id {}", searchContext.id());
return concurrentQueryPhaseSearcher.searchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
} else {
LOGGER.debug("Using non-concurrent search over segments for request with context id {}", searchContext.id());
return defaultQueryPhaseSearcher.searchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
}
}
Expand All @@ -73,9 +74,13 @@ public boolean searchWith(
@Override
public AggregationProcessor aggregationProcessor(SearchContext searchContext) {
if (searchContext.shouldUseConcurrentSearch()) {
LOGGER.info("Using concurrent search over segments (experimental)");
LOGGER.debug(
"Using concurrent aggregation processor over segments (experimental) for request with context id {}",
searchContext.id()
);
return concurrentQueryPhaseSearcher.aggregationProcessor(searchContext);
} else {
LOGGER.debug("Using non-concurrent aggregation processor over segments for request with context id {}", searchContext.id());
return defaultQueryPhaseSearcher.aggregationProcessor(searchContext);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
package org.opensearch.search.sort;

import org.apache.lucene.search.Sort;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.search.DocValueFormat;

/**
Expand All @@ -52,4 +53,13 @@ public SortAndFormats(Sort sort, DocValueFormat[] formats) {
this.formats = formats;
}

/**
* @return true: if sort is on timestamp field, false: otherwise
*/
public boolean isSortOnTimeSeriesField() {
return sort.getSort().length > 0
&& sort.getSort()[0].getField() != null
&& sort.getSort()[0].getField().equals(DataStream.TIMESERIES_FIELDNAME);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,21 @@
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.opensearch.Version;
import org.opensearch.action.OriginalIndices;
import org.opensearch.action.search.SearchType;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.SetOnce;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.MockBigArrays;
import org.opensearch.common.util.MockPageCacheRecycler;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
Expand All @@ -75,6 +80,7 @@
import org.opensearch.search.rescore.RescoreContext;
import org.opensearch.search.slice.SliceBuilder;
import org.opensearch.search.sort.SortAndFormats;
import org.opensearch.test.FeatureFlagSetter;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -547,6 +553,159 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
}
}

public void testSearchPathEvaluationUsingSortField() throws Exception {
// enable the concurrent set FeatureFlag
FeatureFlagSetter.set(FeatureFlags.CONCURRENT_SEGMENT_SEARCH);
ShardSearchRequest shardSearchRequest = mock(ShardSearchRequest.class);
when(shardSearchRequest.searchType()).thenReturn(SearchType.DEFAULT);
ShardId shardId = new ShardId("index", UUID.randomUUID().toString(), 1);
when(shardSearchRequest.shardId()).thenReturn(shardId);

ThreadPool threadPool = new TestThreadPool(this.getClass().getName());
IndexShard indexShard = mock(IndexShard.class);
QueryCachingPolicy queryCachingPolicy = mock(QueryCachingPolicy.class);
when(indexShard.getQueryCachingPolicy()).thenReturn(queryCachingPolicy);
when(indexShard.getThreadPool()).thenReturn(threadPool);

Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
.build();

IndexService indexService = mock(IndexService.class);
QueryShardContext queryShardContext = mock(QueryShardContext.class);
when(indexService.newQueryShardContext(eq(shardId.id()), any(), any(), nullable(String.class), anyBoolean())).thenReturn(
queryShardContext
);

IndexMetadata indexMetadata = IndexMetadata.builder("index").settings(settings).build();
IndexSettings indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY);
when(indexService.getIndexSettings()).thenReturn(indexSettings);

BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());

try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir)) {

final Supplier<Engine.SearcherSupplier> searcherSupplier = () -> new Engine.SearcherSupplier(Function.identity()) {
@Override
protected void doClose() {}

@Override
protected Engine.Searcher acquireSearcherInternal(String source) {
try {
IndexReader reader = w.getReader();
return new Engine.Searcher(
"test",
reader,
IndexSearcher.getDefaultSimilarity(),
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
reader
);
} catch (IOException exc) {
throw new AssertionError(exc);
}
}
};

SearchShardTarget target = new SearchShardTarget("node", shardId, null, OriginalIndices.NONE);
ReaderContext readerContext = new ReaderContext(
newContextId(),
indexService,
indexShard,
searcherSupplier.get(),
randomNonNegativeLong(),
false
);

final ClusterService clusterService = mock(ClusterService.class);
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
clusterSettings.registerSetting(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING);
clusterSettings.applySettings(
Settings.builder().put(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build()
);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
DefaultSearchContext context = new DefaultSearchContext(
readerContext,
shardSearchRequest,
target,
null,
bigArrays,
null,
null,
null,
false,
Version.CURRENT,
false,
executor,
null
);

// Case1: if sort is on timestamp field, non-concurrent path is used
context.sort(
new SortAndFormats(new Sort(new SortField("@timestamp", SortField.Type.INT)), new DocValueFormat[] { DocValueFormat.RAW })
);
context.evaluateRequestShouldUseConcurrentSearch();
assertFalse(context.shouldUseConcurrentSearch());
assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch);

// Case2: if sort is on other field, concurrent path is used
context = new DefaultSearchContext(
readerContext,
shardSearchRequest,
target,
clusterService,
bigArrays,
null,
null,
null,
false,
Version.CURRENT,
false,
executor,
null
);
context.sort(
new SortAndFormats(new Sort(new SortField("test2", SortField.Type.INT)), new DocValueFormat[] { DocValueFormat.RAW })
);
context.evaluateRequestShouldUseConcurrentSearch();
if (executor == null) {
assertFalse(context.shouldUseConcurrentSearch());
} else {
assertTrue(context.shouldUseConcurrentSearch());
}
assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch);

// Case 3: With no sort, concurrent path is used
context = new DefaultSearchContext(
readerContext,
shardSearchRequest,
target,
clusterService,
bigArrays,
null,
null,
null,
false,
Version.CURRENT,
false,
executor,
null
);
context.evaluateRequestShouldUseConcurrentSearch();
if (executor == null) {
assertFalse(context.shouldUseConcurrentSearch());
} else {
assertTrue(context.shouldUseConcurrentSearch());
}
assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch);

// shutdown the threadpool
threadPool.shutdown();
}
}

private ShardSearchContextId newContextId() {
return new ShardSearchContextId(UUIDs.randomBase64UUID(), randomNonNegativeLong());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,15 @@ public int getTargetMaxSliceCount() {
return maxSliceCount;
}

@Override
public boolean shouldUseTimeSeriesDescSortOptimization() {
return indexShard != null
&& indexShard.isTimeSeriesDescSortOptimizationEnabled()
&& sort != null
&& sort.isSortOnTimeSeriesField()
&& sort.sort.getSort()[0].getReverse() == false;
}

/**
* Clean the query results by consuming all of it
*/
Expand Down

0 comments on commit e5c4f9d

Please sign in to comment.