Skip to content

Commit

Permalink
Refactor and work on the PR comments (opensearch-project#10352)
Browse files Browse the repository at this point in the history
Signed-off-by: Ticheng Lin <[email protected]>
  • Loading branch information
ticheng-aws committed Oct 12, 2023
1 parent 45c8f78 commit 8e540e1
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import org.opensearch.search.dfs.AggregatedDfs;
import org.opensearch.search.profile.ContextualProfileBreakdown;
import org.opensearch.search.profile.Timer;
import org.opensearch.search.profile.query.ConcurrentQueryProfileTree;
import org.opensearch.search.profile.query.ProfileWeight;
import org.opensearch.search.profile.query.QueryProfiler;
import org.opensearch.search.profile.query.QueryTimingType;
Expand Down Expand Up @@ -107,7 +106,6 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
private QueryProfiler profiler;
private MutableQueryTimeout cancellable;
private SearchContext searchContext;
private boolean fromConcurrentPath = false;

public ContextIndexSearcher(
IndexReader reader,
Expand Down Expand Up @@ -187,26 +185,15 @@ public void setAggregatedDfs(AggregatedDfs aggregatedDfs) {

@Override
public Query rewrite(Query original) throws IOException {
Timer concurrentPathRewriteTimer = null;
if (profiler != null) {
if (fromConcurrentPath) {
concurrentPathRewriteTimer = new Timer();
profiler.getConcurrentPathRewriteTimers().add(concurrentPathRewriteTimer);
concurrentPathRewriteTimer.start();
} else {
profiler.startRewriteTime();
}
profiler.startRewriteTime();
}

try {
return super.rewrite(original);
} finally {
if (profiler != null) {
if (fromConcurrentPath) {
concurrentPathRewriteTimer.stop();
} else {
profiler.stopAndAddRewriteTime();
}
profiler.stopAndAddRewriteTime();
}
}
}
Expand All @@ -217,15 +204,7 @@ public Weight createWeight(Query query, ScoreMode scoreMode, float boost) throws
// createWeight() is called for each query in the tree, so we tell the queryProfiler
// each invocation so that it can build an internal representation of the query
// tree
ContextualProfileBreakdown<QueryTimingType> profile;
if (searchContext.shouldUseConcurrentSearch()) {
long threadId = Thread.currentThread().getId();
ConcurrentQueryProfileTree profileTree = profiler.getThreadToProfileTree()
.computeIfAbsent(threadId, k -> new ConcurrentQueryProfileTree());
profile = profileTree.getProfileBreakdown(query);
} else {
profile = profiler.getQueryBreakdown(query);
}
ContextualProfileBreakdown<QueryTimingType> profile = profiler.getQueryBreakdown(query);
Timer timer = profile.getTimer(QueryTimingType.CREATE_WEIGHT);
timer.start();
final Weight weight;
Expand Down Expand Up @@ -288,9 +267,6 @@ public void search(

@Override
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
if (searchContext.shouldUseConcurrentSearch()) {
fromConcurrentPath = true;
}
// Time series based workload by default traverses segments in desc order i.e. latest to the oldest order.
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,7 @@

package org.opensearch.search.profile;

import org.opensearch.search.profile.query.ConcurrentQueryProfileTree;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* Base class for a profiler
Expand All @@ -46,7 +42,6 @@
public class AbstractProfiler<PB extends AbstractProfileBreakdown<?>, E> {

protected final AbstractInternalProfileTree<PB, E> profileTree;
protected Map<Long, ConcurrentQueryProfileTree> threadToProfileTree;

public AbstractProfiler(AbstractInternalProfileTree<PB, E> profileTree) {
this.profileTree = profileTree;
Expand All @@ -64,27 +59,14 @@ public PB getQueryBreakdown(E query) {
* Removes the last (e.g. most recent) element on the stack.
*/
public void pollLastElement() {
if (threadToProfileTree == null) {
profileTree.pollLast();
} else {
long threadId = Thread.currentThread().getId();
ConcurrentQueryProfileTree concurrentProfileTree = threadToProfileTree.get(threadId);
concurrentProfileTree.pollLast();
}
profileTree.pollLast();
}

/**
* @return a hierarchical representation of the profiled tree
*/
public List<ProfileResult> getTree() {
if (threadToProfileTree == null) {
return profileTree.getTree();
}
List<ProfileResult> profileResults = new ArrayList<>();
for (Map.Entry<Long, ConcurrentQueryProfileTree> profile : threadToProfileTree.entrySet()) {
profileResults.addAll(profile.getValue().getTree());
}
return profileResults;
return profileTree.getTree();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.profile.aggregation.AggregationProfiler;
import org.opensearch.search.profile.aggregation.ConcurrentAggregationProfiler;
import org.opensearch.search.profile.query.ConcurrentQueryProfiler;
import org.opensearch.search.profile.query.InternalQueryProfileTree;
import org.opensearch.search.profile.query.QueryProfiler;

import java.util.ArrayList;
Expand Down Expand Up @@ -64,7 +66,9 @@ public Profilers(ContextIndexSearcher searcher, boolean isConcurrentSegmentSearc

/** Switch to a new profile. */
public QueryProfiler addQueryProfiler() {
QueryProfiler profiler = new QueryProfiler(isConcurrentSegmentSearchEnabled);
QueryProfiler profiler = isConcurrentSegmentSearchEnabled
? new ConcurrentQueryProfiler()
: new QueryProfiler(new InternalQueryProfileTree());
searcher.setProfiler(profiler);
queryProfilers.add(profiler);
return profiler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
import org.apache.lucene.search.Collector;
import org.opensearch.search.profile.ContextualProfileBreakdown;
import org.opensearch.search.profile.ProfileResult;
import org.opensearch.search.profile.Timer;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

Expand All @@ -24,7 +22,6 @@
* @opensearch.internal
*/
public class ConcurrentQueryProfileTree extends AbstractQueryProfileTree {
protected List<Timer> concurrentPathRewriteTimers = new ArrayList<>();

@Override
protected ContextualProfileBreakdown<QueryTimingType> createProfileBreakdown() {
Expand Down Expand Up @@ -91,11 +88,4 @@ private void updateCollectorToLeavesForChildBreakdowns(Integer parentToken, Map<
}
}
}

/**
* @return the concurrent path rewrite timer list for this profile tree
*/
public List<Timer> getConcurrentPathRewriteTimers() {
return concurrentPathRewriteTimers;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.search.profile.query;

import org.apache.lucene.search.Query;
import org.opensearch.search.profile.ContextualProfileBreakdown;
import org.opensearch.search.profile.ProfileResult;
import org.opensearch.search.profile.Timer;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * This class acts as a thread-local storage for profiling a query with concurrent execution.
 * Each search thread gets its own {@link ConcurrentQueryProfileTree} and its own list of
 * rewrite {@link Timer}s, keyed by thread id in {@link ConcurrentHashMap}s, so profiling
 * calls from concurrently executing slices never contend on a shared mutable tree.
 *
 * @opensearch.internal
 */
public final class ConcurrentQueryProfiler extends QueryProfiler {

    /** Per-thread profile trees; the constructing (coordinating) thread's tree is the one held by the superclass. */
    private final Map<Long, ConcurrentQueryProfileTree> threadToProfileTree;

    /** Per-thread rewrite timers; timers on a thread are appended in start order, so the last one is the active one. */
    private final Map<Long, LinkedList<Timer>> threadToRewriteTimers;

    public ConcurrentQueryProfiler() {
        super(new ConcurrentQueryProfileTree());
        long threadId = getCurrentThreadId();
        // The superclass's profileTree doubles as this (constructing) thread's entry in the map.
        threadToProfileTree = new ConcurrentHashMap<>();
        threadToProfileTree.put(threadId, (ConcurrentQueryProfileTree) profileTree);
        threadToRewriteTimers = new ConcurrentHashMap<>();
        threadToRewriteTimers.put(threadId, new LinkedList<>());
    }

    /**
     * Returns the profile breakdown for {@code query} on the calling thread's tree,
     * lazily creating the tree on first use by that thread.
     */
    @Override
    public ContextualProfileBreakdown<QueryTimingType> getQueryBreakdown(Query query) {
        ConcurrentQueryProfileTree profileTree = threadToProfileTree.computeIfAbsent(
            getCurrentThreadId(),
            k -> new ConcurrentQueryProfileTree()
        );
        return profileTree.getProfileBreakdown(query);
    }

    /**
     * Removes the last (e.g. most recent) element on the calling thread's ConcurrentQueryProfileTree stack.
     */
    @Override
    public void pollLastElement() {
        ConcurrentQueryProfileTree concurrentProfileTree = threadToProfileTree.get(getCurrentThreadId());
        concurrentProfileTree.pollLast();
    }

    /**
     * @return a hierarchical representation of the profiled tree, merged across all threads
     */
    @Override
    public List<ProfileResult> getTree() {
        List<ProfileResult> profileResults = new ArrayList<>();
        for (Map.Entry<Long, ConcurrentQueryProfileTree> profile : threadToProfileTree.entrySet()) {
            profileResults.addAll(profile.getValue().getTree());
        }
        return profileResults;
    }

    /**
     * Begin timing the rewrite phase of a request on the calling thread.
     * Each call appends a fresh started {@link Timer} to this thread's list.
     */
    @Override
    public void startRewriteTime() {
        Timer rewriteTimer = new Timer();
        threadToRewriteTimers.computeIfAbsent(getCurrentThreadId(), k -> new LinkedList<>()).add(rewriteTimer);
        rewriteTimer.start();
    }

    /**
     * Stop recording the current rewrite timer on the calling thread.
     * Must be paired with a preceding {@link #startRewriteTime()} on the same thread;
     * an unpaired call throws (no timer list / empty list for this thread).
     */
    @Override
    public void stopAndAddRewriteTime() {
        // getLast(): the most recently started timer on this thread is the one being stopped.
        Timer rewriteTimer = threadToRewriteTimers.get(getCurrentThreadId()).getLast();
        rewriteTimer.stop();
    }

    /**
     * @return total time taken to rewrite all queries in this concurrent query profiler,
     * computed as the union of the per-thread rewrite intervals so overlapping
     * concurrent rewrites are not double-counted
     */
    @Override
    public long getRewriteTime() {
        long totalRewriteTime = 0L;
        List<Timer> rewriteTimers = new LinkedList<>();
        threadToRewriteTimers.values().forEach(rewriteTimers::addAll);
        LinkedList<long[]> mergedIntervals = mergeRewriteTimeIntervals(rewriteTimers);
        for (long[] interval : mergedIntervals) {
            totalRewriteTime += interval[1] - interval[0];
        }
        return totalRewriteTime;
    }

    /**
     * Merges overlapping [start, end] rewrite intervals into a disjoint list.
     * Sorts a defensive copy so the caller's list is never mutated.
     * Package private for unit testing.
     */
    LinkedList<long[]> mergeRewriteTimeIntervals(List<Timer> timers) {
        LinkedList<long[]> mergedIntervals = new LinkedList<>();
        // Defensive copy: don't reorder the caller's list as a side effect.
        List<Timer> sortedTimers = new ArrayList<>(timers);
        sortedTimers.sort(Comparator.comparingLong(Timer::getEarliestTimerStartTime));
        for (Timer timer : sortedTimers) {
            long startTime = timer.getEarliestTimerStartTime();
            long endTime = startTime + timer.getApproximateTiming();
            if (mergedIntervals.isEmpty() || mergedIntervals.getLast()[1] < startTime) {
                // Disjoint from the previous interval: open a new one.
                long[] interval = new long[2];
                interval[0] = startTime;
                interval[1] = endTime;
                mergedIntervals.add(interval);
            } else {
                // Overlaps the previous interval: extend its end point.
                mergedIntervals.getLast()[1] = Math.max(mergedIntervals.getLast()[1], endTime);
            }
        }
        return mergedIntervals;
    }

    private long getCurrentThreadId() {
        return Thread.currentThread().getId();
    }
}
Loading

0 comments on commit 8e540e1

Please sign in to comment.