Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for query profiler with concurrent aggregation #9248

Merged
merged 9 commits into from
Sep 5, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Use non-concurrent path for sort request on timeseries index and field ([#9562](https://github.com/opensearch-project/OpenSearch/pull/9562))
- Added sampler based on `Blanket Probabilistic Sampling rate` and `Override for on demand` ([#9621](https://github.com/opensearch-project/OpenSearch/issues/9621))
- [Remote Store] Add support for Remote Translog Store stats in `_remotestore/stats/` API ([#9263](https://github.com/opensearch-project/OpenSearch/pull/9263))
- Add support for query profiler with concurrent aggregation ([#9248](https://github.com/opensearch-project/OpenSearch/pull/9248))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.search.profile.query;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.lucene.tests.util.English;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.MultiSearchResponse;
Expand All @@ -40,29 +42,56 @@
import org.opensearch.action.search.SearchType;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.SearchHit;
import org.opensearch.search.profile.ProfileResult;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedOpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.profile.query.RandomQueryGenerator.randomQueryBuilder;
import static org.hamcrest.Matchers.emptyOrNullString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;

public class QueryProfilerIT extends ParameterizedOpenSearchIntegTestCase {
private final boolean concurrentSearchEnabled;
private static final String MAX_PREFIX = "max_";
private static final String MIN_PREFIX = "min_";
private static final String AVG_PREFIX = "avg_";
private static final String TIMING_TYPE_COUNT_SUFFIX = "_count";

/**
 * Creates one parameterized run of this test suite.
 *
 * @param settings                settings handed to the parameterized base test case
 * @param concurrentSearchEnabled flag recorded for this run; the profile assertions in this
 *                                class branch on it
 */
public QueryProfilerIT(Settings settings, boolean concurrentSearchEnabled) {
super(settings);
this.concurrentSearchEnabled = concurrentSearchEnabled;
}

/**
 * Supplies the two parameter sets for this suite: one whose settings turn the cluster-level
 * concurrent segment search setting off and one whose settings turn it on. The boolean that
 * accompanies each settings object repeats the value that was put into it.
 *
 * @return the parameter sets, in the order (off, on)
 */
@ParametersFactory
public static Collection<Object[]> parameters() {
    final String settingKey = CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey();
    final Settings concurrentSearchOff = Settings.builder().put(settingKey, false).build();
    final Settings concurrentSearchOn = Settings.builder().put(settingKey, true).build();
    return Arrays.asList(new Object[] { concurrentSearchOff, false }, new Object[] { concurrentSearchOn, true });
}

public class QueryProfilerIT extends OpenSearchIntegTestCase {
/**
 * Extends the inherited feature flag settings with the concurrent segment search feature
 * flag set to {@code "true"}.
 *
 * @return the inherited feature flag settings plus the concurrent segment search flag
 */
@Override
protected Settings featureFlagSettings() {
    final Settings.Builder flags = Settings.builder();
    flags.put(super.featureFlagSettings());
    flags.put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true");
    return flags.build();
}

/**
* This test simply checks to make sure nothing crashes. Test indexes 100-150 documents,
Expand Down Expand Up @@ -229,6 +258,7 @@ public void testSimpleMatch() throws Exception {
assertEquals(result.getLuceneDescription(), "field1:one");
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -271,6 +301,7 @@ public void testBool() throws Exception {
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertEquals(result.getProfiledChildren().size(), 2);
assertQueryProfileResult(result);

// Check the children
List<ProfileResult> children = result.getProfiledChildren();
Expand All @@ -282,12 +313,14 @@ public void testBool() throws Exception {
assertThat(childProfile.getTime(), greaterThan(0L));
assertNotNull(childProfile.getTimeBreakdown());
assertEquals(childProfile.getProfiledChildren().size(), 0);
assertQueryProfileResult(childProfile);

childProfile = children.get(1);
assertEquals(childProfile.getQueryName(), "TermQuery");
assertEquals(childProfile.getLuceneDescription(), "field1:two");
assertThat(childProfile.getTime(), greaterThan(0L));
assertNotNull(childProfile.getTimeBreakdown());
assertQueryProfileResult(childProfile);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -330,6 +363,7 @@ public void testEmptyBool() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -375,6 +409,7 @@ public void testCollapsingBool() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -415,6 +450,7 @@ public void testBoosting() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -455,6 +491,7 @@ public void testDisMaxRange() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -494,6 +531,7 @@ public void testRange() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -547,6 +585,7 @@ public void testPhrase() throws Exception {
assertNotNull(result.getLuceneDescription());
assertThat(result.getTime(), greaterThan(0L));
assertNotNull(result.getTimeBreakdown());
assertQueryProfileResult(result);
}

CollectorResult result = searchProfiles.getCollectorResult();
Expand Down Expand Up @@ -579,4 +618,35 @@ public void testNoProfile() throws Exception {
assertThat("Profile response element should be an empty map", resp.getProfileResults().size(), equalTo(0));
}

/**
 * Verifies the slice statistics and the time breakdown of a single query profile result.
 * <p>
 * Without concurrent search the max/min/avg slice times must all be {@code null} and the
 * breakdown must hold exactly 27 entries. With concurrent search the three slice times must
 * be present, the breakdown must hold exactly 66 entries, and for every timing type other
 * than {@code CREATE_WEIGHT} the {@code max_}/{@code min_}/{@code avg_} keys and their
 * {@code _count} variants must be present.
 *
 * @param result the profile result to inspect
 */
private void assertQueryProfileResult(ProfileResult result) {
    final Map<String, Long> timings = result.getTimeBreakdown();
    final Long sliceMax = result.getMaxSliceTime();
    final Long sliceMin = result.getMinSliceTime();
    final Long sliceAvg = result.getAvgSliceTime();

    if (!concurrentSearchEnabled) {
        assertThat(sliceMax, is(nullValue()));
        assertThat(sliceMin, is(nullValue()));
        assertThat(sliceAvg, is(nullValue()));
        assertThat(timings.size(), equalTo(27));
        return;
    }

    assertNotNull(sliceMax);
    assertNotNull(sliceMin);
    assertNotNull(sliceAvg);
    assertThat(timings.size(), equalTo(66));

    final String[] prefixes = { MAX_PREFIX, MIN_PREFIX, AVG_PREFIX };
    for (QueryTimingType timingType : QueryTimingType.values()) {
        // NOTE(review): CREATE_WEIGHT is excluded, presumably because it has no per-slice statistics — confirm.
        if (timingType == QueryTimingType.CREATE_WEIGHT) {
            continue;
        }
        // Timing keys first, then the matching count keys, for max, min and avg in that order.
        for (String prefix : prefixes) {
            assertNotNull(timings.get(prefix + timingType));
        }
        for (String prefix : prefixes) {
            assertNotNull(timings.get(prefix + timingType + TIMING_TYPE_COUNT_SUFFIX));
        }
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
final LeafCollector leafCollector;
try {
cancellable.checkCancelled();
if (weight instanceof ProfileWeight) {
((ProfileWeight) weight).associateCollectorToLeaves(ctx, collector);
}
weight = wrapWeight(weight);
// See please https://github.com/apache/lucene/pull/964
collector.setWeight(weight);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ private ProfileResult doGetTree(int token) {
// calculating the same times over and over...but worth the effort?
String type = getTypeFromElement(element);
String description = getDescriptionFromElement(element);
return createProfileResult(type, description, breakdown, childrenProfileResults);
}

protected ProfileResult createProfileResult(String type, String description, PB breakdown, List<ProfileResult> childrenProfileResults) {
return new ProfileResult(
type,
description,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,19 @@ public Map<String, Long> toBreakdownMap() {
for (T timingType : this.timingTypes) {
map.put(timingType.toString(), this.timings[timingType.ordinal()].getApproximateTiming());
map.put(timingType + TIMING_TYPE_COUNT_SUFFIX, this.timings[timingType.ordinal()].getCount());
map.put(timingType + TIMING_TYPE_START_TIME_SUFFIX, this.timings[timingType.ordinal()].getEarliestTimerStartTime());
ticheng-aws marked this conversation as resolved.
Show resolved Hide resolved
}
return Collections.unmodifiableMap(map);
}

/**
* Fetch extra debugging information.
*/
protected Map<String, Object> toDebugMap() {
public Map<String, Object> toDebugMap() {
return emptyMap();
}

public final long toNodeTime() {
public long toNodeTime() {
long total = 0;
for (T timingType : timingTypes) {
total += timings[timingType.ordinal()].getApproximateTiming();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@

package org.opensearch.search.profile;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Collector;

import java.util.List;
import java.util.Map;

/**
* Provides contextual profile breakdowns, each associated with an arbitrary context object. Used when concurrent
* search over segments is activated and each collector needs its own non-shareable profile breakdown instance.
Expand All @@ -25,4 +31,8 @@ public ContextualProfileBreakdown(Class<T> clazz) {
* @return contextual profile breakdown instance
*/
public abstract AbstractProfileBreakdown<T> context(Object context);

/**
 * Hook for associating a single collector with a leaf. The implementation here does nothing;
 * subclasses that need the association are expected to override it.
 *
 * @param collector the collector to associate
 * @param leaf      the leaf reader context the collector is associated with
 */
public void associateCollectorToLeaves(Collector collector, LeafReaderContext leaf) {}

/**
 * Hook for associating several collectors with their leaves in one call. The implementation
 * here does nothing; subclasses that need the association are expected to override it.
 *
 * @param collectorToLeaves mapping from each collector to the leaf reader contexts associated with it
 */
public void associateCollectorsToLeaves(Map<Collector, List<LeafReaderContext>> collectorToLeaves) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

import org.opensearch.search.profile.AbstractProfileBreakdown;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -60,21 +59,7 @@ public void addDebugInfo(String key, Object value) {
}

@Override
protected Map<String, Object> toDebugMap() {
public Map<String, Object> toDebugMap() {
return unmodifiableMap(extra);
}

/**
 * Produce the breakdown for every aggregation timing type. Each type contributes three
 * entries: its approximate timing, its count and its earliest timer start time, the latter
 * two keyed by the type name plus the respective suffix.
 *
 * @return an unmodifiable map of the collected values
 */
@Override
public Map<String, Long> toBreakdownMap() {
    final Map<String, Long> breakdown = new HashMap<>(timings.length * 3);
    for (AggregationTimingType type : timingTypes) {
        final int slot = type.ordinal();
        final String name = type.toString();
        breakdown.put(name, timings[slot].getApproximateTiming());
        breakdown.put(name + TIMING_TYPE_COUNT_SUFFIX, timings[slot].getCount());
        breakdown.put(name + TIMING_TYPE_START_TIME_SUFFIX, timings[slot].getEarliestTimerStartTime());
    }
    return Collections.unmodifiableMap(breakdown);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.profile.query;

import org.apache.lucene.search.Query;
import org.opensearch.search.profile.AbstractInternalProfileTree;
import org.opensearch.search.profile.ContextualProfileBreakdown;
import org.opensearch.search.profile.ProfileResult;

/**
* This class tracks the dependency tree for queries (scoring and rewriting) and
* generates {@link QueryProfileBreakdown} for each node in the tree. It also finalizes the tree
* and returns a list of {@link ProfileResult} that can be serialized back to the client
*
* @opensearch.internal
*/
public abstract class AbstractQueryProfileTree extends AbstractInternalProfileTree<ContextualProfileBreakdown<QueryTimingType>, Query> {

    /** Total rewrite time accumulated over all completed start/stop pairs, in nanoseconds. */
    private long rewriteTime;

    /** {@link System#nanoTime()} reading of the pending rewrite measurement, or 0 when none is pending. */
    private long rewriteScratch;

    /**
     * Derive the profile type name of a query from its class. Classes with an empty simple
     * name (anonymous classes) are reported under the simple name of their superclass.
     */
    @Override
    protected String getTypeFromElement(Query query) {
        final Class<?> queryClass = query.getClass();
        final String simpleName = queryClass.getSimpleName();
        return simpleName.isEmpty() ? queryClass.getSuperclass().getSimpleName() : simpleName;
    }

    /**
     * Describe a query by its string representation.
     */
    @Override
    protected String getDescriptionFromElement(Query query) {
        return query.toString();
    }

    /**
     * Start measuring a rewrite by recording the current {@link System#nanoTime()} reading.
     * No other rewrite measurement may be pending (checked by assertion).
     */
    public void startRewriteTime() {
        assert rewriteScratch == 0;
        rewriteScratch = System.nanoTime();
    }

    /**
     * Finish the pending rewrite measurement and add the elapsed time to the running total.
     * The elapsed time is clamped to a minimum of 1 nanosecond. startRewriteTime() must have
     * been called beforehand; otherwise the value is measured against a scratch reading of 0
     * and is meaningless.
     *
     * @return The elapsed time
     */
    public long stopAndAddRewriteTime() {
        final long elapsed = Math.max(1, System.nanoTime() - rewriteScratch);
        rewriteScratch = 0;
        rewriteTime += elapsed;
        return elapsed;
    }

    /**
     * @return the total rewrite time accumulated so far, in nanoseconds
     */
    public long getRewriteTime() {
        return rewriteTime;
    }
}
Loading