Skip to content

Commit

Permalink
Backport of PRs for profile related changes in concurrent search path (
Browse files Browse the repository at this point in the history
…opensearch-project#10898)

* Add support for query profiler with concurrent aggregation (opensearch-project#9248)

* Add support for query profiler with concurrent aggregation (opensearch-project#9248)

Signed-off-by: Ticheng Lin <[email protected]>

* Refactor and work on the PR comments

Signed-off-by: Ticheng Lin <[email protected]>

* Update collectorToLeaves mapping for children breakdowns post profile metric collection and before creating the results

Signed-off-by: Sorabh Hamirwasia <[email protected]>

* Refactor logic to compute the slice level breakdown stats and query level breakdown stats for concurrent search case

Signed-off-by: Sorabh Hamirwasia <[email protected]>

* Fix QueryProfilePhaseTests and QueryProfileTests, and parameterize QueryProfilerIT with concurrent search enabled

Signed-off-by: Ticheng Lin <[email protected]>

* Handle the case when there are no leaf context to compute the profile stats to return default stats
for all breakdown type along with min/max/avg values. Replace queryStart and queryEnd time with queryNodeTime

Signed-off-by: Sorabh Hamirwasia <[email protected]>

* Add UTs for ConcurrentQueryProfileBreakdown

Signed-off-by: Sorabh Hamirwasia <[email protected]>

* Add concurrent search stats test into the QueryProfilerIT

Signed-off-by: Ticheng Lin <[email protected]>

* Address review comments

Signed-off-by: Sorabh Hamirwasia <[email protected]>

---------

Signed-off-by: Ticheng Lin <[email protected]>
Signed-off-by: Sorabh Hamirwasia <[email protected]>
Co-authored-by: Sorabh Hamirwasia <[email protected]>

* Fix NPE in ConcurrentQueryProfile while computing the breakdown map for slices (opensearch-project#10111)

* Fix NPE in ConcurrentQueryProfile while computing the breakdown map for slices.

There can be cases where one or more slice may not have timing related information for its leaves in
contexts map. During creation of slice and query level breakdown map it needs to handle such cases by using
default values correctly. Also updating the min/max/avg sliceNodeTime to not include time to create
weight and wait times by slice threads. It will reflect the min/max/avg execution time of each slice
whereas totalNodeTime will reflect the total query time.

Signed-off-by: Sorabh Hamirwasia <[email protected]>

* Address review comments

Signed-off-by: Sorabh Hamirwasia <[email protected]>

---------

Signed-off-by: Sorabh Hamirwasia <[email protected]>

* Fix flaky query profile phase tests with concurrent search enabled (opensearch-project#10547) (opensearch-project#10547)

Signed-off-by: Ticheng Lin <[email protected]>

* Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight (opensearch-project#10352)

* Fix timer race condition in profile rewrite and create weight for concurrent segment search (opensearch-project#10352)

Signed-off-by: Ticheng Lin <[email protected]>

* Refactor and work on the PR comments (opensearch-project#10352)

Signed-off-by: Ticheng Lin <[email protected]>

---------

Signed-off-by: Ticheng Lin <[email protected]>

---------

Signed-off-by: Ticheng Lin <[email protected]>
Signed-off-by: Sorabh Hamirwasia <[email protected]>
Co-authored-by: Ticheng Lin <[email protected]>
  • Loading branch information
sohami and ticheng-aws authored Oct 31, 2023
1 parent 7922295 commit 397c9ef
Show file tree
Hide file tree
Showing 20 changed files with 1,576 additions and 118 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670))
- [Remote cluster state] Restore cluster state version during remote state auto restore ([#10853](https://github.com/opensearch-project/OpenSearch/pull/10853))
- Update the indexRandom function to create more segments for concurrent search tests ([10247](https://github.com/opensearch-project/OpenSearch/pull/10247))
- Add support for query profiler with concurrent aggregation ([#9248](https://github.com/opensearch-project/OpenSearch/pull/9248))
- Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight ([10352](https://github.com/opensearch-project/OpenSearch/pull/10352))

### Dependencies
- Bumps jetty version to 9.4.52.v20230823 to fix GMS-2023-1857 ([#9822](https://github.com/opensearch-project/OpenSearch/pull/9822))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.search.profile.query;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.lucene.tests.util.English;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.MultiSearchResponse;
Expand All @@ -40,29 +42,56 @@
import org.opensearch.action.search.SearchType;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
import org.opensearch.search.profile.ProfileResult;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedOpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.profile.query.RandomQueryGenerator.randomQueryBuilder;
import static org.hamcrest.Matchers.emptyOrNullString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;

public class QueryProfilerIT extends ParameterizedOpenSearchIntegTestCase {
private final boolean concurrentSearchEnabled;
private static final String MAX_PREFIX = "max_";
private static final String MIN_PREFIX = "min_";
private static final String AVG_PREFIX = "avg_";
private static final String TIMING_TYPE_COUNT_SUFFIX = "_count";

public QueryProfilerIT(Settings settings, boolean concurrentSearchEnabled) {
super(settings);
this.concurrentSearchEnabled = concurrentSearchEnabled;
}

public class QueryProfilerIT extends OpenSearchIntegTestCase {
@ParametersFactory
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build(), false },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build(), true }
);
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true").build();
}

/**
* This test simply checks to make sure nothing crashes. Test indexes 100-150 documents,
Expand Down Expand Up @@ -229,6 +258,7 @@ public void testSimpleMatch() throws Exception {
assertEquals(result.getLuceneDescription(), "field1:one");
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -271,6 +301,7 @@ public void testBool() throws Exception {
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertEquals(result.getProfiledChildren().size(), 2);
assertQueryProfileResult(result);

// Check the children
List<ProfileResult> children = result.getProfiledChildren();
Expand All @@ -282,12 +313,14 @@ public void testBool() throws Exception {
assertThat(childProfile.getTime(), greaterThan(0L));
assertNotNull(childProfile.getTimeBreakdown());
assertEquals(childProfile.getProfiledChildren().size(), 0);
assertQueryProfileResult(childProfile);

childProfile = children.get(1);
assertEquals(childProfile.getQueryName(), "TermQuery");
assertEquals(childProfile.getLuceneDescription(), "field1:two");
assertThat(childProfile.getTime(), greaterThan(0L));
assertNotNull(childProfile.getTimeBreakdown());
assertQueryProfileResult(childProfile);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -330,6 +363,7 @@ public void testEmptyBool() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -375,6 +409,7 @@ public void testCollapsingBool() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -415,6 +450,90 @@ public void testBoosting() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
assertThat(result.getName(), is(not(emptyOrNullString())));
assertThat(result.getTime(), greaterThan(0L));
}
}
}

public void testSearchLeafForItsLeavesAndRewriteQuery() throws Exception {
createIndex("test");
ensureGreen();

int numDocs = 122;
IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs];
for (int i = 0; i < numDocs; i++) {
docs[i] = client().prepareIndex("test").setId(String.valueOf(i)).setSource("field1", English.intToEnglish(i), "field2", i);
}

List<String> terms = Arrays.asList("zero", "zero", "one");

indexRandom(true, docs);

refresh();

QueryBuilder q = QueryBuilders.boostingQuery(
QueryBuilders.idsQuery().addIds(String.valueOf(randomInt()), String.valueOf(randomInt())),
QueryBuilders.termsQuery("field1", terms)
).boost(randomFloat()).negativeBoost(randomFloat());
logger.info("Query: {}", q);

SearchResponse resp = client().prepareSearch()
.setQuery(q)
.setTrackTotalHits(true)
.setProfile(true)
.setSearchType(SearchType.QUERY_THEN_FETCH)
.get();

assertNotNull("Profile response element should not be null", resp.getProfileResults());
assertThat("Profile response should not be an empty array", resp.getProfileResults().size(), not(0));

for (Map.Entry<String, ProfileShardResult> shardResult : resp.getProfileResults().entrySet()) {
assertThat(shardResult.getValue().getNetworkTime().getInboundNetworkTime(), greaterThanOrEqualTo(0L));
assertThat(shardResult.getValue().getNetworkTime().getOutboundNetworkTime(), greaterThanOrEqualTo(0L));
for (QueryProfileShardResult searchProfiles : shardResult.getValue().getQueryProfileResults()) {
List<ProfileResult> results = searchProfiles.getQueryResults();
for (ProfileResult result : results) {
assertNotNull(result.getQueryName());
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
Map<String, Long> breakdown = result.getTimeBreakdown();
Long maxSliceTime = result.getMaxSliceTime();
Long minSliceTime = result.getMinSliceTime();
Long avgSliceTime = result.getAvgSliceTime();
if (concurrentSearchEnabled && results.get(0).equals(result)) {
assertNotNull(maxSliceTime);
assertNotNull(minSliceTime);
assertNotNull(avgSliceTime);
assertThat(breakdown.size(), equalTo(66));
for (QueryTimingType queryTimingType : QueryTimingType.values()) {
if (queryTimingType != QueryTimingType.CREATE_WEIGHT) {
String maxTimingType = MAX_PREFIX + queryTimingType;
String minTimingType = MIN_PREFIX + queryTimingType;
String avgTimingType = AVG_PREFIX + queryTimingType;
assertNotNull(breakdown.get(maxTimingType));
assertNotNull(breakdown.get(minTimingType));
assertNotNull(breakdown.get(avgTimingType));
assertNotNull(breakdown.get(maxTimingType + TIMING_TYPE_COUNT_SUFFIX));
assertNotNull(breakdown.get(minTimingType + TIMING_TYPE_COUNT_SUFFIX));
assertNotNull(breakdown.get(avgTimingType + TIMING_TYPE_COUNT_SUFFIX));
}
}
} else if (concurrentSearchEnabled) {
assertThat(maxSliceTime, equalTo(0L));
assertThat(minSliceTime, equalTo(0L));
assertThat(avgSliceTime, equalTo(0L));
assertThat(breakdown.size(), equalTo(27));
} else {
assertThat(maxSliceTime, is(nullValue()));
assertThat(minSliceTime, is(nullValue()));
assertThat(avgSliceTime, is(nullValue()));
assertThat(breakdown.size(), equalTo(27));
}
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -455,6 +574,7 @@ public void testDisMaxRange() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -494,6 +614,7 @@ public void testRange() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -547,6 +668,7 @@ public void testPhrase() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -579,4 +701,35 @@ public void testNoProfile() throws Exception {
assertThat("Profile response element should be an empty map", resp.getProfileResults().size(), equalTo(0));
}

private void assertQueryProfileResult(ProfileResult result) {
Map<String, Long> breakdown = result.getTimeBreakdown();
Long maxSliceTime = result.getMaxSliceTime();
Long minSliceTime = result.getMinSliceTime();
Long avgSliceTime = result.getAvgSliceTime();
if (concurrentSearchEnabled) {
assertNotNull(maxSliceTime);
assertNotNull(minSliceTime);
assertNotNull(avgSliceTime);
assertThat(breakdown.size(), equalTo(66));
for (QueryTimingType queryTimingType : QueryTimingType.values()) {
if (queryTimingType != QueryTimingType.CREATE_WEIGHT) {
String maxTimingType = MAX_PREFIX + queryTimingType;
String minTimingType = MIN_PREFIX + queryTimingType;
String avgTimingType = AVG_PREFIX + queryTimingType;
assertNotNull(breakdown.get(maxTimingType));
assertNotNull(breakdown.get(minTimingType));
assertNotNull(breakdown.get(avgTimingType));
assertNotNull(breakdown.get(maxTimingType + TIMING_TYPE_COUNT_SUFFIX));
assertNotNull(breakdown.get(minTimingType + TIMING_TYPE_COUNT_SUFFIX));
assertNotNull(breakdown.get(avgTimingType + TIMING_TYPE_COUNT_SUFFIX));
}
}
} else {
assertThat(maxSliceTime, is(nullValue()));
assertThat(minSliceTime, is(nullValue()));
assertThat(avgSliceTime, is(nullValue()));
assertThat(breakdown.size(), equalTo(27));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
final LeafCollector leafCollector;
try {
cancellable.checkCancelled();
if (weight instanceof ProfileWeight) {
((ProfileWeight) weight).associateCollectorToLeaves(ctx, collector);
}
weight = wrapWeight(weight);
// See please https://github.com/apache/lucene/pull/964
collector.setWeight(weight);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ private ProfileResult doGetTree(int token) {
// calculating the same times over and over...but worth the effort?
String type = getTypeFromElement(element);
String description = getDescriptionFromElement(element);
return createProfileResult(type, description, breakdown, childrenProfileResults);
}

protected ProfileResult createProfileResult(String type, String description, PB breakdown, List<ProfileResult> childrenProfileResults) {
return new ProfileResult(
type,
description,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,19 @@ public Map<String, Long> toBreakdownMap() {
for (T timingType : this.timingTypes) {
map.put(timingType.toString(), this.timings[timingType.ordinal()].getApproximateTiming());
map.put(timingType + TIMING_TYPE_COUNT_SUFFIX, this.timings[timingType.ordinal()].getCount());
map.put(timingType + TIMING_TYPE_START_TIME_SUFFIX, this.timings[timingType.ordinal()].getEarliestTimerStartTime());
}
return Collections.unmodifiableMap(map);
}

/**
* Fetch extra debugging information.
*/
protected Map<String, Object> toDebugMap() {
public Map<String, Object> toDebugMap() {
return emptyMap();
}

public final long toNodeTime() {
public long toNodeTime() {
long total = 0;
for (T timingType : timingTypes) {
total += timings[timingType.ordinal()].getApproximateTiming();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@

package org.opensearch.search.profile;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Collector;

import java.util.List;
import java.util.Map;

/**
* Provide contextual profile breakdowns which are associated with freestyle context. Used when concurrent
* search over segments is activated and each collector needs own non-shareable profile breakdown instance.
Expand All @@ -25,4 +31,8 @@ public ContextualProfileBreakdown(Class<T> clazz) {
* @return contextual profile breakdown instance
*/
public abstract AbstractProfileBreakdown<T> context(Object context);

public void associateCollectorToLeaves(Collector collector, LeafReaderContext leaf) {}

public void associateCollectorsToLeaves(Map<Collector, List<LeafReaderContext>> collectorToLeaves) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.profile.aggregation.AggregationProfiler;
import org.opensearch.search.profile.aggregation.ConcurrentAggregationProfiler;
import org.opensearch.search.profile.query.ConcurrentQueryProfileTree;
import org.opensearch.search.profile.query.ConcurrentQueryProfiler;
import org.opensearch.search.profile.query.InternalQueryProfileTree;
import org.opensearch.search.profile.query.QueryProfiler;

import java.util.ArrayList;
Expand Down Expand Up @@ -64,7 +67,9 @@ public Profilers(ContextIndexSearcher searcher, boolean isConcurrentSegmentSearc

/** Switch to a new profile. */
public QueryProfiler addQueryProfiler() {
QueryProfiler profiler = new QueryProfiler(isConcurrentSegmentSearchEnabled);
QueryProfiler profiler = isConcurrentSegmentSearchEnabled
? new ConcurrentQueryProfiler(new ConcurrentQueryProfileTree())
: new QueryProfiler(new InternalQueryProfileTree());
searcher.setProfiler(profiler);
queryProfilers.add(profiler);
return profiler;
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/search/profile/Timer.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,18 @@ public class Timer {
private boolean doTiming;
private long timing, count, lastCount, start, earliestTimerStartTime;

public Timer() {
this(0, 0, 0, 0, 0);
}

public Timer(long timing, long count, long lastCount, long start, long earliestTimerStartTime) {
this.timing = timing;
this.count = count;
this.lastCount = lastCount;
this.start = start;
this.earliestTimerStartTime = earliestTimerStartTime;
}

/** pkg-private for testing */
long nanoTime() {
return System.nanoTime();
Expand Down
Loading

0 comments on commit 397c9ef

Please sign in to comment.