Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tracing for deep search path #12099

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow to pass the list settings through environment variables (like [], ["a", "b", "c"], ...) ([#10625](https://github.com/opensearch-project/OpenSearch/pull/10625))
- [Admission Control] Integrate CPU AC with ResourceUsageCollector and add CPU AC stats to nodes/stats ([#10887](https://github.com/opensearch-project/OpenSearch/pull/10887))
- [S3 Repository] Add setting to control connection count for sync client ([#12028](https://github.com/opensearch-project/OpenSearch/pull/12028))
- Tracing for deep search path ([#12099](https://github.com/opensearch-project/OpenSearch/pull/12099))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.pipeline.PipelinedRequest;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.transport.Transport;

import java.util.ArrayDeque;
Expand Down Expand Up @@ -220,6 +221,7 @@ public final void start() {
null
)
);
searchRequestContext.getSearchRequestOperationsListener().onRequestEnd(searchRequestContext);
return;
}
executePhase(this);
Expand Down Expand Up @@ -439,21 +441,23 @@ private void onPhaseEnd(SearchRequestContext searchRequestContext) {
}
}

void onPhaseStart(SearchPhase phase) {
void onPhaseStart(SearchPhase phase, SearchRequestContext searchRequestContext) {
setCurrentPhase(phase);
if (SearchPhaseName.isValidName(phase.getName())) {
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseStart(this);
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseStart(this, searchRequestContext);
}
}

private void onRequestEnd(SearchRequestContext searchRequestContext) {
this.searchRequestContext.getSearchRequestOperationsListener().onRequestEnd(this, searchRequestContext);
this.searchRequestContext.getSearchRequestOperationsListener().onRequestEnd(searchRequestContext);
}

private void executePhase(SearchPhase phase) {
try {
onPhaseStart(phase);
phase.recordAndRun();
onPhaseStart(phase, searchRequestContext);
try (SpanScope spanScope = searchRequestContext.getTracer().withSpanInScope(searchRequestContext.getPhaseSpan())) {
phase.recordAndRun();
}
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e);
Expand Down Expand Up @@ -705,6 +709,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
searchContextId = null;
}
}
searchRequestContext.setSearchTask(getTask());
searchRequestContext.setTotalHits(internalSearchResponse.hits().getTotalHits());
searchRequestContext.setShardStats(results.getNumShards(), successfulOps.get(), skippedOps.get(), failures.length);
onPhaseEnd(searchRequestContext);
Expand All @@ -717,7 +722,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
@Override
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
if (SearchPhaseName.isValidName(phase.getName())) {
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this);
this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this, searchRequestContext);
}
raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@

import org.apache.lucene.search.TotalHits;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.noop.NoopSpan;
import org.opensearch.telemetry.tracing.noop.NoopTracer;

import java.util.EnumMap;
import java.util.HashMap;
Expand All @@ -23,20 +27,45 @@
*/
@InternalApi
class SearchRequestContext {
private final SearchRequest searchRequest;
private SearchTask searchTask;
private final SearchRequestOperationsListener searchRequestOperationsListener;
private long absoluteStartNanos;
private final Map<String, Long> phaseTookMap;
private TotalHits totalHits;
private final EnumMap<ShardStatsFieldNames, Integer> shardStats;
private final Tracer tracer;
private Span requestSpan;
private Span phaseSpan;

private final SearchRequest searchRequest;
/**
* This constructor is for testing only
*/
SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener, SearchRequest searchRequest) {
this(searchRequestOperationsListener, searchRequest, NoopTracer.INSTANCE);
}

SearchRequestContext(final SearchRequestOperationsListener searchRequestOperationsListener, final SearchRequest searchRequest) {
SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener, SearchRequest searchRequest, Tracer tracer) {
this.searchRequestOperationsListener = searchRequestOperationsListener;
this.searchRequest = searchRequest;
this.tracer = tracer;
this.absoluteStartNanos = System.nanoTime();
this.phaseTookMap = new HashMap<>();
this.shardStats = new EnumMap<>(ShardStatsFieldNames.class);
this.searchRequest = searchRequest;
this.requestSpan = NoopSpan.INSTANCE;
this.phaseSpan = NoopSpan.INSTANCE;
}

SearchRequest getSearchRequest() {
return searchRequest;
}

void setSearchTask(SearchTask searchTask) {
this.searchTask = searchTask;
}

SearchTask getSearchTask() {
return searchTask;
}

SearchRequestOperationsListener getSearchRequestOperationsListener() {
Expand Down Expand Up @@ -107,6 +136,26 @@ String formattedShardStats() {
);
}
}

Tracer getTracer() {
return tracer;
}

void setRequestSpan(Span requestSpan) {
this.requestSpan = requestSpan;
}

Span getRequestSpan() {
return requestSpan;
}

void setPhaseSpan(Span phaseSpan) {
this.phaseSpan = phaseSpan;
}

Span getPhaseSpan() {
return phaseSpan;
}
}

enum ShardStatsFieldNames {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.telemetry.tracing.AttributeNames;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanContext;
import org.opensearch.telemetry.tracing.Tracer;

import static org.opensearch.core.common.Strings.capitalize;

/**
* Listener for search request tracing on the coordinator node
*
* @opensearch.internal
*/
public final class SearchRequestCoordinatorTrace extends SearchRequestOperationsListener {
private final Tracer tracer;

public SearchRequestCoordinatorTrace(Tracer tracer) {
this.tracer = tracer;
}

@Override
void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
searchRequestContext.setPhaseSpan(
tracer.startSpan(
SpanBuilder.from(
"coord" + capitalize(context.getCurrentPhase().getName()),
new SpanContext(searchRequestContext.getRequestSpan())
)
)
);
}

@Override
void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
searchRequestContext.getPhaseSpan().endSpan();
}

@Override
void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
searchRequestContext.getPhaseSpan().endSpan();
}

@Override
void onRequestEnd(SearchRequestContext searchRequestContext) {
Span requestSpan = searchRequestContext.getRequestSpan();

// add response-related attributes on request end
requestSpan.addAttribute(
AttributeNames.TOTAL_HITS,
searchRequestContext.totalHits() == null ? "0" : searchRequestContext.totalHits().toString()
);
requestSpan.addAttribute(
AttributeNames.SHARDS,
searchRequestContext.formattedShardStats().isEmpty() ? "null" : searchRequestContext.formattedShardStats()
);
requestSpan.addAttribute(
AttributeNames.SOURCE,
searchRequestContext.getSearchRequest().source() == null ? "null" : searchRequestContext.getSearchRequest().source().toString()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ protected SearchRequestOperationsListener(final boolean enabled) {
this.enabled = enabled;
}

abstract void onPhaseStart(SearchPhaseContext context);
abstract void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext);

abstract void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext);

abstract void onPhaseFailure(SearchPhaseContext context);
abstract void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext);

void onRequestStart(SearchRequestContext searchRequestContext) {}

void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}
void onRequestEnd(SearchRequestContext searchRequestContext) {}

boolean isEnabled(SearchRequest searchRequest) {
return isEnabled();
Expand Down Expand Up @@ -69,10 +69,10 @@ static final class CompositeListener extends SearchRequestOperationsListener {
}

@Override
void onPhaseStart(SearchPhaseContext context) {
void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
for (SearchRequestOperationsListener listener : listeners) {
try {
listener.onPhaseStart(context);
listener.onPhaseStart(context, searchRequestContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onPhaseStart listener [{}] failed", listener), e);
}
Expand All @@ -91,10 +91,10 @@ void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestCo
}

@Override
void onPhaseFailure(SearchPhaseContext context) {
void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
for (SearchRequestOperationsListener listener : listeners) {
try {
listener.onPhaseFailure(context);
listener.onPhaseFailure(context, searchRequestContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onPhaseFailure listener [{}] failed", listener), e);
}
Expand All @@ -113,10 +113,10 @@ void onRequestStart(SearchRequestContext searchRequestContext) {
}

@Override
public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
public void onRequestEnd(SearchRequestContext searchRequestContext) {
for (SearchRequestOperationsListener listener : listeners) {
try {
listener.onRequestEnd(context, searchRequestContext);
listener.onRequestEnd(searchRequestContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onRequestEnd listener [{}] failed", listener), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,29 +134,29 @@ public SearchRequestSlowLog(ClusterService clusterService) {
}

@Override
void onPhaseStart(SearchPhaseContext context) {}
void onPhaseStart(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
void onPhaseFailure(SearchPhaseContext context) {}
void onPhaseFailure(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

@Override
void onRequestStart(SearchRequestContext searchRequestContext) {}

@Override
void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
void onRequestEnd(SearchRequestContext searchRequestContext) {
long tookInNanos = System.nanoTime() - searchRequestContext.getAbsoluteStartNanos();

if (warnThreshold >= 0 && tookInNanos > warnThreshold && level.isLevelEnabledFor(SlowLogLevel.WARN)) {
logger.warn(new SearchRequestSlowLogMessage(context, tookInNanos, searchRequestContext));
logger.warn(new SearchRequestSlowLogMessage(tookInNanos, searchRequestContext));
} else if (infoThreshold >= 0 && tookInNanos > infoThreshold && level.isLevelEnabledFor(SlowLogLevel.INFO)) {
logger.info(new SearchRequestSlowLogMessage(context, tookInNanos, searchRequestContext));
logger.info(new SearchRequestSlowLogMessage(tookInNanos, searchRequestContext));
} else if (debugThreshold >= 0 && tookInNanos > debugThreshold && level.isLevelEnabledFor(SlowLogLevel.DEBUG)) {
logger.debug(new SearchRequestSlowLogMessage(context, tookInNanos, searchRequestContext));
logger.debug(new SearchRequestSlowLogMessage(tookInNanos, searchRequestContext));
} else if (traceThreshold >= 0 && tookInNanos > traceThreshold && level.isLevelEnabledFor(SlowLogLevel.TRACE)) {
logger.trace(new SearchRequestSlowLogMessage(context, tookInNanos, searchRequestContext));
logger.trace(new SearchRequestSlowLogMessage(tookInNanos, searchRequestContext));
}
}

Expand All @@ -167,15 +167,11 @@ void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequest
*/
static final class SearchRequestSlowLogMessage extends OpenSearchLogMessage {

SearchRequestSlowLogMessage(SearchPhaseContext context, long tookInNanos, SearchRequestContext searchRequestContext) {
super(prepareMap(context, tookInNanos, searchRequestContext), message(context, tookInNanos, searchRequestContext));
SearchRequestSlowLogMessage(long tookInNanos, SearchRequestContext searchRequestContext) {
super(prepareMap(tookInNanos, searchRequestContext), message(tookInNanos, searchRequestContext));
}

private static Map<String, Object> prepareMap(
SearchPhaseContext context,
long tookInNanos,
SearchRequestContext searchRequestContext
) {
private static Map<String, Object> prepareMap(long tookInNanos, SearchRequestContext searchRequestContext) {
final Map<String, Object> messageFields = new HashMap<>();
messageFields.put("took", TimeValue.timeValueNanos(tookInNanos));
messageFields.put("took_millis", TimeUnit.NANOSECONDS.toMillis(tookInNanos));
Expand All @@ -185,22 +181,24 @@ private static Map<String, Object> prepareMap(
} else {
messageFields.put("total_hits", "-1");
}
messageFields.put("search_type", context.getRequest().searchType());
messageFields.put("search_type", searchRequestContext.getSearchRequest().searchType());
messageFields.put("shards", searchRequestContext.formattedShardStats());

if (context.getRequest().source() != null) {
String source = escapeJson(context.getRequest().source().toString(FORMAT_PARAMS));
if (searchRequestContext.getSearchRequest().source() != null) {
String source = escapeJson(searchRequestContext.getSearchRequest().source().toString(FORMAT_PARAMS));
messageFields.put("source", source);
} else {
messageFields.put("source", "{}");
}

messageFields.put("id", context.getTask().getHeader(Task.X_OPAQUE_ID));
if (searchRequestContext.getSearchTask() != null) {
messageFields.put("id", searchRequestContext.getSearchTask().getHeader(Task.X_OPAQUE_ID));
} else {
messageFields.put("id", "");
}
return messageFields;
}

// Message will be used in plaintext logs
private static String message(SearchPhaseContext context, long tookInNanos, SearchRequestContext searchRequestContext) {
private static String message(long tookInNanos, SearchRequestContext searchRequestContext) {
final StringBuilder sb = new StringBuilder();
sb.append("took[").append(TimeValue.timeValueNanos(tookInNanos)).append("], ");
sb.append("took_millis[").append(TimeUnit.NANOSECONDS.toMillis(tookInNanos)).append("], ");
Expand All @@ -210,15 +208,15 @@ private static String message(SearchPhaseContext context, long tookInNanos, Sear
} else {
sb.append("total_hits[-1]");
}
sb.append("search_type[").append(context.getRequest().searchType()).append("], ");
sb.append("search_type[").append(searchRequestContext.getSearchRequest().searchType()).append("], ");
sb.append("shards[").append(searchRequestContext.formattedShardStats()).append("], ");
if (context.getRequest().source() != null) {
sb.append("source[").append(context.getRequest().source().toString(FORMAT_PARAMS)).append("], ");
if (searchRequestContext.getSearchRequest().source() != null) {
sb.append("source[").append(searchRequestContext.getSearchRequest().source().toString(FORMAT_PARAMS)).append("], ");
} else {
sb.append("source[], ");
}
if (context.getTask().getHeader(Task.X_OPAQUE_ID) != null) {
sb.append("id[").append(context.getTask().getHeader(Task.X_OPAQUE_ID)).append("]");
if (searchRequestContext.getSearchTask() != null) {
sb.append("id[").append(searchRequestContext.getSearchTask().getHeader(Task.X_OPAQUE_ID)).append("]");
} else {
sb.append("id[]");
}
Expand Down
Loading
Loading