Skip to content

Commit

Permalink
Tracing for deep search path
Browse files Browse the repository at this point in the history
Signed-off-by: David Zane <[email protected]>
  • Loading branch information
dzane17 committed Feb 7, 2024
1 parent 52b27f4 commit 4657a39
Show file tree
Hide file tree
Showing 16 changed files with 219 additions and 52 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow to pass the list settings through environment variables (like [], ["a", "b", "c"], ...) ([#10625](https://github.com/opensearch-project/OpenSearch/pull/10625))
- [Admission Control] Integrate CPU AC with ResourceUsageCollector and add CPU AC stats to nodes/stats ([#10887](https://github.com/opensearch-project/OpenSearch/pull/10887))
- [S3 Repository] Add setting to control connection count for sync client ([#12028](https://github.com/opensearch-project/OpenSearch/pull/12028))
- Tracing for deep search path ([#12103](https://github.com/opensearch-project/OpenSearch/pull/12103))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void onPhaseStart(SearchPhaseContext context) {}
public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
public void onPhaseFailure(SearchPhaseContext context) {}
public void onPhaseFailure(SearchPhaseContext context, Throwable cause) {}

@Override
public void onRequestStart(SearchRequestContext searchRequestContext) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ public final void start() {
null
)
);
onRequestEnd(searchRequestContext);
return;
}
executePhase(this);
Expand Down Expand Up @@ -717,7 +718,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
@Override
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
if (SearchPhaseName.isValidName(phase.getName())) {
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this);
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this, cause);
}
raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ void setTotalHits(TotalHits totalHits) {
this.totalHits = totalHits;
}

TotalHits totalHits() {
public TotalHits totalHits() {
return totalHits;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ protected SearchRequestOperationsListener(final boolean enabled) {

protected abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext);

protected abstract void onPhaseFailure(SearchPhaseContext context);
protected abstract void onPhaseFailure(SearchPhaseContext context, Throwable cause);

protected void onRequestStart(SearchRequestContext searchRequestContext) {}

Expand Down Expand Up @@ -91,10 +91,10 @@ protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searc
}

@Override
protected void onPhaseFailure(SearchPhaseContext context) {
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {
for (SearchRequestOperationsListener listener : listeners) {
try {
listener.onPhaseFailure(context);
listener.onPhaseFailure(context, cause);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onPhaseFailure listener [{}] failed", listener), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ protected void onPhaseStart(SearchPhaseContext context) {}
protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
protected void onPhaseFailure(SearchPhaseContext context) {}
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {}

@Override
protected void onRequestStart(SearchRequestContext searchRequestContext) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searc
}

@Override
protected void onPhaseFailure(SearchPhaseContext context) {
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {
phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.listener.TraceableActionListener;
import org.opensearch.telemetry.tracing.listener.TraceableSearchRequestOperationsListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.RemoteClusterAware;
import org.opensearch.transport.RemoteClusterService;
Expand Down Expand Up @@ -173,6 +179,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
private final CircuitBreaker circuitBreaker;
private final SearchPipelineService searchPipelineService;
private final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory;
private final Tracer tracer;

private volatile boolean searchQueryMetricsEnabled;

Expand All @@ -195,7 +202,8 @@ public TransportSearchAction(
NamedWriteableRegistry namedWriteableRegistry,
SearchPipelineService searchPipelineService,
MetricsRegistry metricsRegistry,
SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory
SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory,
Tracer tracer
) {
super(SearchAction.NAME, transportService, actionFilters, (Writeable.Reader<SearchRequest>) SearchRequest::new);
this.client = client;
Expand All @@ -215,6 +223,7 @@ public TransportSearchAction(
this.searchRequestOperationsCompositeListenerFactory = searchRequestOperationsCompositeListenerFactory;
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, this::setSearchQueryMetricsEnabled);
this.tracer = tracer;
}

private void setSearchQueryMetricsEnabled(boolean searchQueryMetricsEnabled) {
Expand Down Expand Up @@ -431,49 +440,68 @@ private void executeRequest(
if (originalSearchRequest.isPhaseTook() == null) {
originalSearchRequest.setPhaseTook(clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED));
}
SearchRequestOperationsListener.CompositeListener requestOperationsListeners = searchRequestOperationsCompositeListenerFactory
.buildCompositeListener(originalSearchRequest, logger);
SearchRequestContext searchRequestContext = new SearchRequestContext(requestOperationsListeners, originalSearchRequest);
searchRequestContext.getSearchRequestOperationsListener().onRequestStart(searchRequestContext);

PipelinedRequest searchRequest;
ActionListener<SearchResponse> listener;
try {
searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest);
listener = searchRequest.transformResponseListener(originalListener);
} catch (Exception e) {
originalListener.onFailure(e);
return;
}

ActionListener<SearchRequest> requestTransformListener = ActionListener.wrap(sr -> {
if (searchQueryMetricsEnabled) {
try {
searchQueryCategorizer.categorize(sr.source());
} catch (Exception e) {
logger.error("Error while trying to categorize the query.", e);
}
}
final Span requestSpan = tracer.startSpan(SpanBuilder.from(task, actionName));
try (final SpanScope spanScope = tracer.withSpanInScope(requestSpan)) {

ActionListener<SearchSourceBuilder> rewriteListener = buildRewriteListener(
sr,
task,
timeProvider,
searchAsyncActionProvider,
listener,
searchRequestContext
);
if (sr.source() == null) {
rewriteListener.onResponse(sr.source());
SearchRequestOperationsListener.CompositeListener requestOperationsListeners;
if (tracer.isRecording()) {
originalListener = TraceableActionListener.create(originalListener, requestSpan, tracer);
TraceableSearchRequestOperationsListener traceableSearchRequestOperationsListener =
new TraceableSearchRequestOperationsListener(tracer, requestSpan);
requestOperationsListeners = searchRequestOperationsCompositeListenerFactory.buildCompositeListener(
originalSearchRequest,
logger,
traceableSearchRequestOperationsListener
);
} else {
Rewriteable.rewriteAndFetch(
sr.source(),
searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
rewriteListener
requestOperationsListeners = searchRequestOperationsCompositeListenerFactory.buildCompositeListener(
originalSearchRequest,
logger
);
}
}, listener::onFailure);
searchRequest.transformRequest(requestTransformListener);
SearchRequestContext searchRequestContext = new SearchRequestContext(requestOperationsListeners, originalSearchRequest);
searchRequestContext.getSearchRequestOperationsListener().onRequestStart(searchRequestContext);

PipelinedRequest searchRequest;
ActionListener<SearchResponse> listener;
try {
searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest);
listener = searchRequest.transformResponseListener(originalListener);
} catch (Exception e) {
originalListener.onFailure(e);
return;
}

ActionListener<SearchRequest> requestTransformListener = ActionListener.wrap(sr -> {
if (searchQueryMetricsEnabled) {
try {
searchQueryCategorizer.categorize(sr.source());
} catch (Exception e) {
logger.error("Error while trying to categorize the query.", e);
}
}

ActionListener<SearchSourceBuilder> rewriteListener = buildRewriteListener(
sr,
task,
timeProvider,
searchAsyncActionProvider,
listener,
searchRequestContext
);
if (sr.source() == null) {
rewriteListener.onResponse(sr.source());
} else {
Rewriteable.rewriteAndFetch(
sr.source(),
searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
rewriteListener
);
}
}, listener::onFailure);
searchRequest.transformRequest(requestTransformListener);
}
}

private ActionListener<SearchSourceBuilder> buildRewriteListener(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ private AttributeNames() {
*/
public static final String TRANSPORT_ACTION = "action";

/**
* Task id
*/
public static final String TASK_ID = "task_id";

/**
* Parent task id
*/
public static final String PARENT_TASK_ID = "parent_task_id";

/**
* Index Name
*/
Expand All @@ -94,4 +104,9 @@ private AttributeNames() {
* Refresh Policy
*/
public static final String REFRESH_POLICY = "refresh_policy";

/**
* Search Response Total Hits
*/
public static final String TOTAL_HITS = "total_hits";
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.core.common.Strings;
import org.opensearch.http.HttpRequest;
import org.opensearch.rest.RestRequest;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.attributes.Attributes;
import org.opensearch.transport.TcpChannel;
import org.opensearch.transport.Transport;
Expand Down Expand Up @@ -170,4 +171,43 @@ private static Attributes buildSpanAttributes(String nodeId, ReplicatedWriteRequ
return attributes;
}

/**
* Creates {@link SpanCreationContext} with parent set to specified SpanContext.
* @param spanName name of span.
* @param parentSpan target parent span.
* @return context
*/
public static SpanCreationContext from(String spanName, SpanContext parentSpan) {
return SpanCreationContext.server().name(spanName).parent(parentSpan);
}

/**
* Creates {@link SpanCreationContext} with parent set to specified SpanContext.
* @param task search task.
* @param actionName action.
* @return context
*/
public static SpanCreationContext from(Task task, String actionName) {
return SpanCreationContext.server().name(createSpanName(task, actionName)).attributes(buildSpanAttributes(task, actionName));
}

private static Attributes buildSpanAttributes(Task task, String actionName) {
Attributes attributes = Attributes.create().addAttribute(AttributeNames.TRANSPORT_ACTION, actionName);
if (task != null) {
attributes.addAttribute(AttributeNames.TASK_ID, task.getId());
if (task.getParentTaskId() != null && task.getParentTaskId().isSet()) {
attributes.addAttribute(AttributeNames.PARENT_TASK_ID, task.getParentTaskId().getId());
}
}
return attributes;

}

private static String createSpanName(Task task, String actionName) {
if (task != null) {
return task.getType() + SEPARATOR + task.getAction();
} else {
return actionName;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.listener;

import org.opensearch.action.search.SearchPhaseContext;
import org.opensearch.action.search.SearchRequestContext;
import org.opensearch.action.search.SearchRequestOperationsListener;
import org.opensearch.telemetry.tracing.AttributeNames;
import org.opensearch.telemetry.tracing.ScopedSpan;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanContext;
import org.opensearch.telemetry.tracing.Tracer;

import static org.opensearch.core.common.Strings.capitalize;

/**
* SearchRequestOperationsListener subscriber for search request tracing
*
* @opensearch.internal
*/
public final class TraceableSearchRequestOperationsListener extends SearchRequestOperationsListener {
private final Tracer tracer;
private final Span requestSpan;
private ScopedSpan phaseScopedSpan;

public TraceableSearchRequestOperationsListener(final Tracer tracer, final Span requestSpan) {
this.tracer = tracer;
this.requestSpan = requestSpan;
this.phaseScopedSpan = null;
}

@Override
protected void onPhaseStart(SearchPhaseContext context) {
phaseScopedSpan = tracer.startScopedSpan(
SpanBuilder.from(capitalize(context.getCurrentPhase().getName()), new SpanContext(requestSpan))
);
}

@Override
protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
phaseScopedSpan.close();
phaseScopedSpan = null;
}

@Override
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {
phaseScopedSpan.setError(new Exception(cause));
phaseScopedSpan.close();
phaseScopedSpan = null;
}

@Override
public void onRequestStart(SearchRequestContext searchRequestContext) {}

@Override
public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
// add response-related attributes on request end
requestSpan.addAttribute(
AttributeNames.TOTAL_HITS,
searchRequestContext.totalHits() == null ? 0 : searchRequestContext.totalHits().value
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ protected void onPhaseStart(SearchPhaseContext context) {}
protected void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
protected void onPhaseFailure(SearchPhaseContext context) {}
protected void onPhaseFailure(SearchPhaseContext context, Throwable cause) {}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRe
}

@Override
public void onPhaseFailure(SearchPhaseContext context) {
public void onPhaseFailure(SearchPhaseContext context, Throwable cause) {
searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec();
}
};
Expand Down
Loading

0 comments on commit 4657a39

Please sign in to comment.