From 8c2ad21ab15bfea05e7ebaf2f3cc2fa562669a65 Mon Sep 17 00:00:00 2001 From: Ticheng Lin Date: Thu, 12 Oct 2023 06:55:18 +0000 Subject: [PATCH] Refactor and work on the PR comments (#10352) Signed-off-by: Ticheng Lin --- .../search/profile/query/QueryProfilerIT.java | 83 +++++++++++ .../search/internal/ContextIndexSearcher.java | 30 +--- .../search/profile/AbstractProfiler.java | 22 +-- .../opensearch/search/profile/Profilers.java | 7 +- .../ConcurrentQueryProfileBreakdown.java | 27 +++- .../query/ConcurrentQueryProfileTree.java | 10 -- .../query/ConcurrentQueryProfiler.java | 134 ++++++++++++++++++ .../search/profile/query/QueryProfiler.java | 68 +-------- .../ConcurrentQueryProfileBreakdownTests.java | 52 +++++++ .../query/ConcurrentQueryProfilerTests.java | 36 +++++ .../profile/query/QueryProfilerTests.java | 34 ++--- 11 files changed, 356 insertions(+), 147 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfiler.java create mode 100644 server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfilerTests.java diff --git a/server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java b/server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java index 24f2b1bbdabfc..ef73438114079 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/profile/query/QueryProfilerIT.java @@ -460,6 +460,89 @@ public void testBoosting() throws Exception { } } + public void testSearchLeafForItsLeavesAndRewriteQuery() throws Exception { + createIndex("test"); + ensureGreen(); + + int numDocs = 122; + IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs]; + for (int i = 0; i < numDocs; i++) { + docs[i] = client().prepareIndex("test").setId(String.valueOf(i)).setSource("field1", English.intToEnglish(i), "field2", i); + } + + List terms = Arrays.asList("zero", "zero", "one"); + + indexRandom(true, docs); + + refresh(); + + QueryBuilder q = QueryBuilders.boostingQuery( + QueryBuilders.idsQuery().addIds(String.valueOf(randomInt()), String.valueOf(randomInt())), + QueryBuilders.termsQuery("field1", terms) + ).boost(randomFloat()).negativeBoost(randomFloat()); + logger.info("Query: {}", q); + + SearchResponse resp = client().prepareSearch() + .setQuery(q) + .setTrackTotalHits(true) + .setProfile(true) + .setSearchType(SearchType.QUERY_THEN_FETCH) + .get(); + + assertNotNull("Profile response element should not be null", resp.getProfileResults()); + assertThat("Profile response should not be an empty array", resp.getProfileResults().size(), not(0)); + + for (Map.Entry shardResult : resp.getProfileResults().entrySet()) { + assertThat(shardResult.getValue().getNetworkTime().getInboundNetworkTime(), greaterThanOrEqualTo(0L)); + assertThat(shardResult.getValue().getNetworkTime().getOutboundNetworkTime(), greaterThanOrEqualTo(0L)); + for (QueryProfileShardResult searchProfiles : shardResult.getValue().getQueryProfileResults()) { + List results = searchProfiles.getQueryResults(); + for (ProfileResult result : results) { + assertNotNull(result.getQueryName()); + assertNotNull(result.getLuceneDescription()); + assertThat(result.getTime(), greaterThan(0L)); + Map breakdown = result.getTimeBreakdown(); + Long maxSliceTime = result.getMaxSliceTime(); + Long minSliceTime = result.getMinSliceTime(); + Long avgSliceTime = result.getAvgSliceTime(); + if (concurrentSearchEnabled && results.get(0).equals(result)) { + assertNotNull(maxSliceTime); + assertNotNull(minSliceTime); + assertNotNull(avgSliceTime); + assertThat(breakdown.size(), equalTo(66)); + for (QueryTimingType queryTimingType : QueryTimingType.values()) { + if (queryTimingType != QueryTimingType.CREATE_WEIGHT) { + String maxTimingType = MAX_PREFIX + queryTimingType; + String minTimingType = MIN_PREFIX + queryTimingType; + String avgTimingType = AVG_PREFIX + queryTimingType; + assertNotNull(breakdown.get(maxTimingType)); + assertNotNull(breakdown.get(minTimingType)); + assertNotNull(breakdown.get(avgTimingType)); + assertNotNull(breakdown.get(maxTimingType + TIMING_TYPE_COUNT_SUFFIX)); + assertNotNull(breakdown.get(minTimingType + TIMING_TYPE_COUNT_SUFFIX)); + assertNotNull(breakdown.get(avgTimingType + TIMING_TYPE_COUNT_SUFFIX)); + } + } + } else if (concurrentSearchEnabled) { + assertThat(maxSliceTime, equalTo(0L)); + assertThat(minSliceTime, equalTo(0L)); + assertThat(avgSliceTime, equalTo(0L)); + assertThat(breakdown.size(), equalTo(27)); + } else { + assertThat(maxSliceTime, is(nullValue())); + assertThat(minSliceTime, is(nullValue())); + assertThat(avgSliceTime, is(nullValue())); + assertThat(breakdown.size(), equalTo(27)); + } + } + + CollectorResult result = searchProfiles.getCollectorResult(); + assertThat(result.getName(), is(not(emptyOrNullString()))); + assertThat(result.getTime(), greaterThan(0L)); + } + } + } + public void testDisMaxRange() throws Exception { createIndex("test"); ensureGreen(); diff --git a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java index afa28d04db9cf..a520086de8051 100644 --- a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java @@ -71,7 +71,6 @@ import org.opensearch.search.dfs.AggregatedDfs; import org.opensearch.search.profile.ContextualProfileBreakdown; import org.opensearch.search.profile.Timer; -import org.opensearch.search.profile.query.ConcurrentQueryProfileTree; import org.opensearch.search.profile.query.ProfileWeight; import org.opensearch.search.profile.query.QueryProfiler; import org.opensearch.search.profile.query.QueryTimingType; @@ -107,7 +106,6 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable { private QueryProfiler profiler; private MutableQueryTimeout cancellable; private SearchContext searchContext; - private boolean fromConcurrentPath = false; public ContextIndexSearcher( IndexReader reader, @@ -187,26 +185,15 @@ public void setAggregatedDfs(AggregatedDfs aggregatedDfs) { @Override public Query rewrite(Query original) throws IOException { - Timer concurrentPathRewriteTimer = null; if (profiler != null) { - if (fromConcurrentPath) { - concurrentPathRewriteTimer = new Timer(); - profiler.getConcurrentPathRewriteTimers().add(concurrentPathRewriteTimer); - concurrentPathRewriteTimer.start(); - } else { - profiler.startRewriteTime(); - } + profiler.startRewriteTime(); } try { return super.rewrite(original); } finally { if (profiler != null) { - if (fromConcurrentPath) { - concurrentPathRewriteTimer.stop(); - } else { - profiler.stopAndAddRewriteTime(); - } + profiler.stopAndAddRewriteTime(); } } } @@ -217,15 +204,7 @@ public Weight createWeight(Query query, ScoreMode scoreMode, float boost) throws // createWeight() is called for each query in the tree, so we tell the queryProfiler // each invocation so that it can build an internal representation of the query // tree - ContextualProfileBreakdown profile; - if (searchContext.shouldUseConcurrentSearch()) { - long threadId = Thread.currentThread().getId(); - ConcurrentQueryProfileTree profileTree = profiler.getThreadToProfileTree() - .computeIfAbsent(threadId, k -> new ConcurrentQueryProfileTree()); - profile = profileTree.getProfileBreakdown(query); - } else { - profile = profiler.getQueryBreakdown(query); - } + ContextualProfileBreakdown profile = profiler.getQueryBreakdown(query); Timer timer = profile.getTimer(QueryTimingType.CREATE_WEIGHT); timer.start(); final Weight weight; @@ -288,9 +267,6 @@ public void search( @Override protected void search(List leaves, Weight weight, Collector collector) throws IOException { - if (searchContext.shouldUseConcurrentSearch()) { - fromConcurrentPath = true; - } // Time series based workload by default traverses segments in desc order i.e. latest to the oldest order. // This is actually beneficial for search queries to start search on latest segments first for time series workload. // That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf diff --git a/server/src/main/java/org/opensearch/search/profile/AbstractProfiler.java b/server/src/main/java/org/opensearch/search/profile/AbstractProfiler.java index 9150c5ed212e5..4db1cb87a231d 100644 --- a/server/src/main/java/org/opensearch/search/profile/AbstractProfiler.java +++ b/server/src/main/java/org/opensearch/search/profile/AbstractProfiler.java @@ -32,11 +32,7 @@ package org.opensearch.search.profile; -import org.opensearch.search.profile.query.ConcurrentQueryProfileTree; - -import java.util.ArrayList; import java.util.List; -import java.util.Map; /** * Base class for a profiler @@ -46,7 +42,6 @@ public class AbstractProfiler, E> { protected final AbstractInternalProfileTree profileTree; - protected Map threadToProfileTree; public AbstractProfiler(AbstractInternalProfileTree profileTree) { this.profileTree = profileTree; @@ -64,27 +59,14 @@ public PB getQueryBreakdown(E query) { * Removes the last (e.g. most recent) element on the stack. */ public void pollLastElement() { - if (threadToProfileTree == null) { - profileTree.pollLast(); - } else { - long threadId = Thread.currentThread().getId(); - ConcurrentQueryProfileTree concurrentProfileTree = threadToProfileTree.get(threadId); - concurrentProfileTree.pollLast(); - } + profileTree.pollLast(); } /** * @return a hierarchical representation of the profiled tree */ public List getTree() { - if (threadToProfileTree == null) { - return profileTree.getTree(); - } - List profileResults = new ArrayList<>(); - for (Map.Entry profile : threadToProfileTree.entrySet()) { - profileResults.addAll(profile.getValue().getTree()); - } - return profileResults; + return profileTree.getTree(); } } diff --git a/server/src/main/java/org/opensearch/search/profile/Profilers.java b/server/src/main/java/org/opensearch/search/profile/Profilers.java index 8e87c7ff4acd4..68cf05c988b5b 100644 --- a/server/src/main/java/org/opensearch/search/profile/Profilers.java +++ b/server/src/main/java/org/opensearch/search/profile/Profilers.java @@ -35,6 +35,9 @@ import org.opensearch.search.internal.ContextIndexSearcher; import org.opensearch.search.profile.aggregation.AggregationProfiler; import org.opensearch.search.profile.aggregation.ConcurrentAggregationProfiler; +import org.opensearch.search.profile.query.ConcurrentQueryProfileTree; +import org.opensearch.search.profile.query.ConcurrentQueryProfiler; +import org.opensearch.search.profile.query.InternalQueryProfileTree; import org.opensearch.search.profile.query.QueryProfiler; import java.util.ArrayList; @@ -64,7 +67,9 @@ public Profilers(ContextIndexSearcher searcher, boolean isConcurrentSegmentSearc /** Switch to a new profile. */ public QueryProfiler addQueryProfiler() { - QueryProfiler profiler = new QueryProfiler(isConcurrentSegmentSearchEnabled); + QueryProfiler profiler = isConcurrentSegmentSearchEnabled + ? new ConcurrentQueryProfiler(new ConcurrentQueryProfileTree()) + : new QueryProfiler(new InternalQueryProfileTree()); searcher.setProfiler(profiler); queryProfilers.add(profiler); return profiler; diff --git a/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java index e567fdd2d436c..59ef01f9f947a 100644 --- a/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java +++ b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java @@ -70,7 +70,7 @@ public Map toBreakdownMap() { ); final long createWeightTime = topLevelBreakdownMapWithWeightTime.get(QueryTimingType.CREATE_WEIGHT.toString()); - if (sliceCollectorsToLeaves.isEmpty() || contexts.isEmpty()) { + if (contexts.isEmpty()) { // If there are no leaf contexts, then return the default concurrent query level breakdown, which will include the // create_weight time/count queryNodeTime = createWeightTime; @@ -78,6 +78,21 @@ public Map toBreakdownMap() { minSliceNodeTime = 0L; avgSliceNodeTime = 0L; return buildDefaultQueryBreakdownMap(createWeightTime); + } else if (sliceCollectorsToLeaves.isEmpty()) { + // This will happen when each slice executes search leaf for its leaves and query is rewritten for the leaf being searched. It + // creates a new weight and breakdown map for each rewritten query. This new breakdown map captures the timing information for + // the new rewritten query. The sliceCollectorsToLeaves is empty because this breakdown for rewritten query gets created later + // in search leaf path which doesn't have collector. Also, this is not needed since this breakdown is per leaf and there is no + // concurrency involved. An empty sliceCollectorsToLeaves could also happen in the case of early termination. + AbstractProfileBreakdown breakdown = contexts.values().iterator().next(); + queryNodeTime = breakdown.toNodeTime() + createWeightTime; + maxSliceNodeTime = 0L; + minSliceNodeTime = 0L; + avgSliceNodeTime = 0L; + Map queryBreakdownMap = new HashMap<>(breakdown.toBreakdownMap()); + queryBreakdownMap.put(QueryTimingType.CREATE_WEIGHT.toString(), createWeightTime); + queryBreakdownMap.put(QueryTimingType.CREATE_WEIGHT + TIMING_TYPE_COUNT_SUFFIX, 1L); + return queryBreakdownMap; } // first create the slice level breakdowns @@ -191,10 +206,12 @@ Map> buildSliceLevelBreakdown() { } // compute sliceMaxEndTime as max of sliceEndTime across all timing types sliceMaxEndTime = Math.max(sliceMaxEndTime, currentSliceBreakdown.getOrDefault(timingTypeSliceEndTimeKey, Long.MIN_VALUE)); - sliceMinStartTime = Math.min( - sliceMinStartTime, - currentSliceBreakdown.getOrDefault(timingTypeSliceStartTimeKey, Long.MAX_VALUE) - ); + long currentSliceStartTime = currentSliceBreakdown.getOrDefault(timingTypeSliceStartTimeKey, Long.MAX_VALUE); + if (currentSliceStartTime == 0L) { + // The timer for the current timing type never starts, so we continue here + continue; + } + sliceMinStartTime = Math.min(sliceMinStartTime, currentSliceStartTime); // compute total time for each timing type at slice level using sliceEndTime and sliceStartTime currentSliceBreakdown.put( timingType.toString(), diff --git a/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileTree.java b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileTree.java index d6e5b79743b82..4e54178c3b4fb 100644 --- a/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileTree.java +++ b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfileTree.java @@ -12,9 +12,7 @@ import org.apache.lucene.search.Collector; import org.opensearch.search.profile.ContextualProfileBreakdown; import org.opensearch.search.profile.ProfileResult; -import org.opensearch.search.profile.Timer; -import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -24,7 +22,6 @@ * @opensearch.internal */ public class ConcurrentQueryProfileTree extends AbstractQueryProfileTree { - protected List concurrentPathRewriteTimers = new ArrayList<>(); @Override protected ContextualProfileBreakdown createProfileBreakdown() { @@ -91,11 +88,4 @@ private void updateCollectorToLeavesForChildBreakdowns(Integer parentToken, Map< } } } - - /** - * @return the concurrent path rewrite timer list for this profile tree - */ - public List getConcurrentPathRewriteTimers() { - return concurrentPathRewriteTimers; - } } diff --git a/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfiler.java b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfiler.java new file mode 100644 index 0000000000000..42bf23bb13fbe --- /dev/null +++ b/server/src/main/java/org/opensearch/search/profile/query/ConcurrentQueryProfiler.java @@ -0,0 +1,134 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.profile.query; + +import org.apache.lucene.search.Query; +import org.opensearch.search.profile.ContextualProfileBreakdown; +import org.opensearch.search.profile.ProfileResult; +import org.opensearch.search.profile.Timer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This class acts as a thread-local storage for profiling a query with concurrent execution + * + * @opensearch.internal + */ +public final class ConcurrentQueryProfiler extends QueryProfiler { + + private final Map threadToProfileTree; + // The LinkedList does not need to be thread safe, as the map associates thread IDs with LinkedList, and only + // one thread will access the LinkedList at a time. + private final Map> threadToRewriteTimers; + + public ConcurrentQueryProfiler(AbstractQueryProfileTree profileTree) { + super(profileTree); + long threadId = getCurrentThreadId(); + // We utilize LinkedHashMap to preserve the insertion order of the profiled queries + threadToProfileTree = Collections.synchronizedMap(new LinkedHashMap<>()); + threadToProfileTree.put(threadId, (ConcurrentQueryProfileTree) profileTree); + threadToRewriteTimers = new ConcurrentHashMap<>(); + threadToRewriteTimers.put(threadId, new LinkedList<>()); + } + + @Override + public ContextualProfileBreakdown getQueryBreakdown(Query query) { + ConcurrentQueryProfileTree profileTree = threadToProfileTree.computeIfAbsent( + getCurrentThreadId(), + k -> new ConcurrentQueryProfileTree() + ); + return profileTree.getProfileBreakdown(query); + } + + /** + * Removes the last (e.g. most recent) element on ConcurrentQueryProfileTree stack. + */ + @Override + public void pollLastElement() { + ConcurrentQueryProfileTree concurrentProfileTree = threadToProfileTree.get(getCurrentThreadId()); + if (concurrentProfileTree != null) { + concurrentProfileTree.pollLast(); + } + } + + /** + * @return a hierarchical representation of the profiled tree + */ + @Override + public List getTree() { + List profileResults = new ArrayList<>(); + for (Map.Entry profile : threadToProfileTree.entrySet()) { + profileResults.addAll(profile.getValue().getTree()); + } + return profileResults; + } + + /** + * Begin timing the rewrite phase of a request + */ + @Override + public void startRewriteTime() { + Timer rewriteTimer = new Timer(); + threadToRewriteTimers.computeIfAbsent(getCurrentThreadId(), k -> new LinkedList<>()).add(rewriteTimer); + rewriteTimer.start(); + } + + /** + * Stop recording the current rewrite timer + */ + public void stopAndAddRewriteTime() { + Timer rewriteTimer = threadToRewriteTimers.get(getCurrentThreadId()).getLast(); + rewriteTimer.stop(); + } + + /** + * @return total time taken to rewrite all queries in this concurrent query profiler + */ + @Override + public long getRewriteTime() { + long totalRewriteTime = 0L; + List rewriteTimers = new LinkedList<>(); + threadToRewriteTimers.values().forEach(rewriteTimers::addAll); + LinkedList mergedIntervals = mergeRewriteTimeIntervals(rewriteTimers); + for (long[] interval : mergedIntervals) { + totalRewriteTime += interval[1] - interval[0]; + } + return totalRewriteTime; + } + + // package private for unit testing + LinkedList mergeRewriteTimeIntervals(List timers) { + LinkedList mergedIntervals = new LinkedList<>(); + timers.sort(Comparator.comparingLong(Timer::getEarliestTimerStartTime)); + for (Timer timer : timers) { + long startTime = timer.getEarliestTimerStartTime(); + long endTime = startTime + timer.getApproximateTiming(); + if (mergedIntervals.isEmpty() || mergedIntervals.getLast()[1] < startTime) { + long[] interval = new long[2]; + interval[0] = startTime; + interval[1] = endTime; + mergedIntervals.add(interval); + } else { + mergedIntervals.getLast()[1] = Math.max(mergedIntervals.getLast()[1], endTime); + } + } + return mergedIntervals; + } + + private long getCurrentThreadId() { + return Thread.currentThread().getId(); + } +} diff --git a/server/src/main/java/org/opensearch/search/profile/query/QueryProfiler.java b/server/src/main/java/org/opensearch/search/profile/query/QueryProfiler.java index af8852355bebe..332c4b3551450 100644 --- a/server/src/main/java/org/opensearch/search/profile/query/QueryProfiler.java +++ b/server/src/main/java/org/opensearch/search/profile/query/QueryProfiler.java @@ -35,14 +35,8 @@ import org.apache.lucene.search.Query; import org.opensearch.search.profile.AbstractProfiler; import org.opensearch.search.profile.ContextualProfileBreakdown; -import org.opensearch.search.profile.Timer; -import java.util.Comparator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; /** * This class acts as a thread-local storage for profiling a query. It also @@ -57,20 +51,15 @@ * * @opensearch.internal */ -public final class QueryProfiler extends AbstractProfiler, Query> { +public class QueryProfiler extends AbstractProfiler, Query> { /** * The root Collector used in the search */ private InternalProfileComponent collector; - public QueryProfiler(boolean concurrent) { - super(concurrent ? new ConcurrentQueryProfileTree() : new InternalQueryProfileTree()); - if (concurrent) { - long threadId = Thread.currentThread().getId(); - threadToProfileTree = new ConcurrentHashMap<>(); - threadToProfileTree.put(threadId, (ConcurrentQueryProfileTree) profileTree); - } + public QueryProfiler(AbstractQueryProfileTree profileTree) { + super(profileTree); } /** Set the collector that is associated with this profiler. */ @@ -92,63 +81,18 @@ public void startRewriteTime() { /** * Stop recording the current rewrite and add it's time to the total tally, returning the * cumulative time so far. - * - * @return cumulative rewrite time */ - public long stopAndAddRewriteTime() { - return ((AbstractQueryProfileTree) profileTree).stopAndAddRewriteTime(); + public void stopAndAddRewriteTime() { + ((AbstractQueryProfileTree) profileTree).stopAndAddRewriteTime(); } /** * The rewriting process is complex and hard to display because queries can undergo significant changes. * Instead of showing intermediate results, we display the cumulative time for the non-concurrent search case. - * In the concurrent search case, we compute the sum of the non-overlapping time across all slices and add - * the rewrite time from the non-concurrent path. * @return total time taken to rewrite all queries in this profile */ public long getRewriteTime() { - long rewriteTime = ((AbstractQueryProfileTree) profileTree).getRewriteTime(); - if (profileTree instanceof ConcurrentQueryProfileTree) { - List timers = getConcurrentPathRewriteTimers(); - LinkedList mergedIntervals = mergeRewriteTimeIntervals(timers); - for (long[] interval : mergedIntervals) { - rewriteTime += interval[1] - interval[0]; - } - } - return rewriteTime; - } - - // package private for unit testing - LinkedList mergeRewriteTimeIntervals(List timers) { - LinkedList mergedIntervals = new LinkedList<>(); - timers.sort(Comparator.comparingLong(Timer::getEarliestTimerStartTime)); - for (Timer timer : timers) { - long startTime = timer.getEarliestTimerStartTime(); - long endTime = timer.getEarliestTimerStartTime() + timer.getApproximateTiming(); - if (mergedIntervals.isEmpty() || mergedIntervals.getLast()[1] < timer.getEarliestTimerStartTime()) { - long[] interval = new long[2]; - interval[0] = startTime; - interval[1] = endTime; - mergedIntervals.add(interval); - } else { - mergedIntervals.getLast()[1] = Math.max(mergedIntervals.getLast()[1], endTime); - } - } - return mergedIntervals; - } - - /** - * @return a list of concurrent path rewrite timers for this concurrent search - */ - public List getConcurrentPathRewriteTimers() { - return ((ConcurrentQueryProfileTree) profileTree).getConcurrentPathRewriteTimers(); - } - - /** - * @return the thread to profile tree map for this concurrent search - */ - public Map getThreadToProfileTree() { - return threadToProfileTree; + return ((AbstractQueryProfileTree) profileTree).getRewriteTime(); } /** diff --git a/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdownTests.java b/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdownTests.java index f29ba3b0cea07..db14eb90ef839 100644 --- a/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdownTests.java +++ b/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdownTests.java @@ -333,6 +333,58 @@ public void testBreakDownMapWithMultipleSlicesAndOneSliceWithNoLeafContext() thr directory.close(); } + public void testOneLeafContextWithEmptySliceCollectorsToLeaves() throws Exception { + final DirectoryReader directoryReader = getDirectoryReader(1); + final Directory directory = directoryReader.directory(); + final long createWeightEarliestStartTime = createWeightTimer.getEarliestTimerStartTime(); + final long createWeightEndTime = createWeightEarliestStartTime + createWeightTimer.getApproximateTiming(); + final Map leafProfileBreakdownMap_1 = getLeafBreakdownMap(createWeightEndTime + 10, 10, 1); + final AbstractProfileBreakdown leafProfileBreakdown_1 = new TestQueryProfileBreakdown( + QueryTimingType.class, + leafProfileBreakdownMap_1 + ); + testQueryProfileBreakdown.getContexts().put(directoryReader.leaves().get(0), leafProfileBreakdown_1); + final Map queryBreakDownMap = testQueryProfileBreakdown.toBreakdownMap(); + assertFalse(queryBreakDownMap == null || queryBreakDownMap.isEmpty()); + assertEquals(26, queryBreakDownMap.size()); + for (QueryTimingType queryTimingType : QueryTimingType.values()) { + String timingTypeKey = queryTimingType.toString(); + String timingTypeCountKey = queryTimingType + TIMING_TYPE_COUNT_SUFFIX; + + if (queryTimingType.equals(QueryTimingType.CREATE_WEIGHT)) { + final long createWeightTime = queryBreakDownMap.get(timingTypeKey); + assertEquals(createWeightTimer.getApproximateTiming(), createWeightTime); + assertEquals(1, (long) queryBreakDownMap.get(timingTypeCountKey)); + // verify there is no min/max/avg for weight type stats + assertFalse( + queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.MAX_PREFIX + timingTypeKey) + || queryBreakDownMap.containsKey(MIN_PREFIX + timingTypeKey) + || queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.AVG_PREFIX + timingTypeKey) + || queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.MAX_PREFIX + timingTypeCountKey) + || queryBreakDownMap.containsKey(MIN_PREFIX + timingTypeCountKey) + || queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.AVG_PREFIX + timingTypeCountKey) + ); + continue; + } + assertNotNull(queryBreakDownMap.get(timingTypeKey)); + assertNotNull(queryBreakDownMap.get(timingTypeCountKey)); + // verify there is no min/max/avg for current breakdown type stats + assertFalse( + queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.MAX_PREFIX + timingTypeKey) + || queryBreakDownMap.containsKey(MIN_PREFIX + timingTypeKey) + || queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.AVG_PREFIX + timingTypeKey) + || queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.MAX_PREFIX + timingTypeCountKey) + || queryBreakDownMap.containsKey(MIN_PREFIX + timingTypeCountKey) + || queryBreakDownMap.containsKey(ConcurrentQueryProfileBreakdown.AVG_PREFIX + timingTypeCountKey) + ); + } + assertEquals(0, testQueryProfileBreakdown.getMaxSliceNodeTime()); + assertEquals(0, testQueryProfileBreakdown.getMinSliceNodeTime()); + assertEquals(0, testQueryProfileBreakdown.getAvgSliceNodeTime()); + directoryReader.close(); + directory.close(); + } + private Map getLeafBreakdownMap(long startTime, long timeTaken, long count) { Map leafBreakDownMap = new HashMap<>(); for (QueryTimingType timingType : QueryTimingType.values()) { diff --git a/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfilerTests.java b/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfilerTests.java new file mode 100644 index 0000000000000..736bbcdd9e8dd --- /dev/null +++ b/server/src/test/java/org/opensearch/search/profile/query/ConcurrentQueryProfilerTests.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.profile.query; + +import org.opensearch.search.profile.Timer; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.LinkedList; +import java.util.List; + +import static org.hamcrest.Matchers.equalTo; + +public class ConcurrentQueryProfilerTests extends OpenSearchTestCase { + + public void testMergeRewriteTimeIntervals() { + ConcurrentQueryProfiler profiler = new ConcurrentQueryProfiler(new ConcurrentQueryProfileTree()); + List timers = new LinkedList<>(); + timers.add(new Timer(217134L, 1L, 1L, 0L, 553074511206907L)); + timers.add(new Timer(228954L, 1L, 1L, 0L, 553074509287335L)); + timers.add(new Timer(228954L, 1L, 1L, 0L, 553074509287336L)); + LinkedList mergedIntervals = profiler.mergeRewriteTimeIntervals(timers); + assertThat(mergedIntervals.size(), equalTo(2)); + long[] interval = mergedIntervals.get(0); + assertThat(interval[0], equalTo(553074509287335L)); + assertThat(interval[1], equalTo(553074509516290L)); + interval = mergedIntervals.get(1); + assertThat(interval[0], equalTo(553074511206907L)); + assertThat(interval[1], equalTo(553074511424041L)); + } +} diff --git a/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java b/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java index 23bb99b8282b3..481a224f2ff0e 100644 --- a/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java +++ b/server/src/test/java/org/opensearch/search/profile/query/QueryProfilerTests.java @@ -65,7 +65,6 @@ import org.opensearch.search.internal.ContextIndexSearcher; import org.opensearch.search.internal.SearchContext; import org.opensearch.search.profile.ProfileResult; -import org.opensearch.search.profile.Timer; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ThreadPool; import org.junit.After; @@ -74,7 +73,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -163,7 +161,9 @@ public void tearDown() throws Exception { } public void testBasic() throws IOException { - QueryProfiler profiler = new QueryProfiler(executor != null); + QueryProfiler profiler = executor != null + ? new ConcurrentQueryProfiler(new ConcurrentQueryProfileTree()) + : new QueryProfiler(new InternalQueryProfileTree()); searcher.setProfiler(profiler); Query query = new TermQuery(new Term("foo", "bar")); searcher.search(query, 1); @@ -230,7 +230,9 @@ public void testBasic() throws IOException { } public void testNoScoring() throws IOException { - QueryProfiler profiler = new QueryProfiler(executor != null); + QueryProfiler profiler = executor != null + ? new ConcurrentQueryProfiler(new ConcurrentQueryProfileTree()) + : new QueryProfiler(new InternalQueryProfileTree()); searcher.setProfiler(profiler); Query query = new TermQuery(new Term("foo", "bar")); searcher.search(query, 1, Sort.INDEXORDER); // scores are not needed @@ -297,7 +299,9 @@ public void testNoScoring() throws IOException { } public void testUseIndexStats() throws IOException { - QueryProfiler profiler = new QueryProfiler(executor != null); + QueryProfiler profiler = executor != null + ? new ConcurrentQueryProfiler(new ConcurrentQueryProfileTree()) + : new QueryProfiler(new InternalQueryProfileTree()); searcher.setProfiler(profiler); Query query = new TermQuery(new Term("foo", "bar")); searcher.count(query); // will use index stats @@ -311,7 +315,9 @@ public void testUseIndexStats() throws IOException { } public void testApproximations() throws IOException { - QueryProfiler profiler = new QueryProfiler(executor != null); + QueryProfiler profiler = executor != null + ? new ConcurrentQueryProfiler(new ConcurrentQueryProfileTree()) + : new QueryProfiler(new InternalQueryProfileTree()); searcher.setProfiler(profiler); Query query = new RandomApproximationQuery(new TermQuery(new Term("foo", "bar")), random()); searcher.count(query); @@ -484,20 +490,4 @@ public boolean shouldCache(Query query) throws IOException { } }; - - public void testMergeRewriteTimeIntervals() { - QueryProfiler profiler = new QueryProfiler(executor != null); - List timers = new LinkedList<>(); - timers.add(new Timer(217134L, 1L, 1L, 0L, 553074511206907L)); - timers.add(new Timer(228954L, 1L, 1L, 0L, 553074509287335L)); - timers.add(new Timer(228954L, 1L, 1L, 0L, 553074509287336L)); - LinkedList mergedIntervals = profiler.mergeRewriteTimeIntervals(timers); - assertThat(mergedIntervals.size(), equalTo(2)); - long[] interval = mergedIntervals.get(0); - assertThat(interval[0], equalTo(553074509287335L)); - assertThat(interval[1], equalTo(553074509516290L)); - interval = mergedIntervals.get(1); - assertThat(interval[0], equalTo(553074511206907L)); - assertThat(interval[1], equalTo(553074511424041L)); - } }