Skip to content

Commit

Permalink
move resource usages interactions into TaskResourceTrackingService
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <[email protected]>
  • Loading branch information
ansjcy committed Jun 7, 2024
1 parent d671dd2 commit 69629ff
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 110 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027))
- [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982))
- [Query Insights] Add X-Opaque-Id to search request metadata for top n queries ([#13374](https://github.com/opensearch-project/OpenSearch/pull/13374))
- Add support for query-level resource usage tracking ([#13172](https://github.com/opensearch-project/OpenSearch/pull/13172))
- Add support for query level resource usage tracking ([#13172](https://github.com/opensearch-project/OpenSearch/pull/13172))

### Dependencies
- Bump `com.github.spullara.mustache.java:compiler` from 0.9.10 to 0.9.13 ([#13329](https://github.com/opensearch-project/OpenSearch/pull/13329), [#13559](https://github.com/opensearch-project/OpenSearch/pull/13559))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ShardOperationFailedException;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.SearchShardTarget;
import org.opensearch.search.internal.AliasFilter;
Expand Down Expand Up @@ -628,7 +629,7 @@ protected void onShardResult(Result result, SearchShardIterator shardIt) {
}

public void setPhaseResourceUsages() {
String taskResourceUsage = searchRequestContext.getTaskResourceUsageSupplier().get();
TaskResourceInfo taskResourceUsage = searchRequestContext.getTaskResourceUsageSupplier().get();
searchRequestContext.recordPhaseResourceUsage(taskResourceUsage);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,15 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.TotalHits;
import org.opensearch.common.annotation.InternalApi;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;
import org.opensearch.core.xcontent.DeprecationHandler;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

/**
Expand All @@ -44,20 +38,20 @@ public class SearchRequestContext {
private final EnumMap<ShardStatsFieldNames, Integer> shardStats;

private final SearchRequest searchRequest;
private final List<TaskResourceInfo> phaseResourceUsage;
private final Supplier<String> taskResourceUsageSupplier;
private final LinkedBlockingQueue<TaskResourceInfo> phaseResourceUsage;
private final Supplier<TaskResourceInfo> taskResourceUsageSupplier;

SearchRequestContext(
final SearchRequestOperationsListener searchRequestOperationsListener,
final SearchRequest searchRequest,
final Supplier<String> taskResourceUsageSupplier
final Supplier<TaskResourceInfo> taskResourceUsageSupplier
) {
this.searchRequestOperationsListener = searchRequestOperationsListener;
this.absoluteStartNanos = System.nanoTime();
this.phaseTookMap = new HashMap<>();
this.shardStats = new EnumMap<>(ShardStatsFieldNames.class);
this.searchRequest = searchRequest;
this.phaseResourceUsage = new ArrayList<>();
this.phaseResourceUsage = new LinkedBlockingQueue<>();
this.taskResourceUsageSupplier = taskResourceUsageSupplier;
}

Expand Down Expand Up @@ -130,32 +124,22 @@ String formattedShardStats() {
}
}

public Supplier<String> getTaskResourceUsageSupplier() {
public Supplier<TaskResourceInfo> getTaskResourceUsageSupplier() {
return taskResourceUsageSupplier;
}

public SearchRequest getRequest() {
return searchRequest;
}

public void recordPhaseResourceUsage(String usage) {
try {
if (usage != null && !usage.isEmpty()) {
XContentParser parser = XContentHelper.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
new BytesArray(usage),
MediaTypeRegistry.JSON
);
this.phaseResourceUsage.add(TaskResourceInfo.PARSER.apply(parser, null));
}
} catch (IOException e) {
logger.debug("fail to parse phase resource usages: ", e);
public void recordPhaseResourceUsage(TaskResourceInfo usage) {
if (usage != null) {
this.phaseResourceUsage.add(usage);
}
}

public List<TaskResourceInfo> getPhaseResourceUsage() {
return phaseResourceUsage;
return new ArrayList<>(phaseResourceUsage);

Check warning on line 138 in server/src/main/java/org/opensearch/action/search/SearchRequestContext.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/search/SearchRequestContext.java#L138

Added line #L138 was not covered by tests
}

/**
 * Returns the search request held by this context.
 *
 * @return the {@link SearchRequest} that was passed to the constructor
 */
public SearchRequest getRequest() {
return searchRequest;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.opensearch.search.profile.SearchProfileShardResults;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
Expand Down Expand Up @@ -125,7 +126,6 @@
import static org.opensearch.action.search.SearchType.DFS_QUERY_THEN_FETCH;
import static org.opensearch.action.search.SearchType.QUERY_THEN_FETCH;
import static org.opensearch.search.sort.FieldSortBuilder.hasPrimaryFieldSort;
import static org.opensearch.tasks.TaskResourceTrackingService.TASK_RESOURCE_USAGE;

/**
* Perform search action
Expand Down Expand Up @@ -187,6 +187,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
private final MetricsRegistry metricsRegistry;

private SearchQueryCategorizer searchQueryCategorizer;
private TaskResourceTrackingService taskResourceTrackingService;

@Inject
public TransportSearchAction(
Expand All @@ -204,7 +205,8 @@ public TransportSearchAction(
SearchPipelineService searchPipelineService,
MetricsRegistry metricsRegistry,
SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory,
Tracer tracer
Tracer tracer,
TaskResourceTrackingService taskResourceTrackingService
) {
super(SearchAction.NAME, transportService, actionFilters, (Writeable.Reader<SearchRequest>) SearchRequest::new);
this.client = client;
Expand All @@ -225,6 +227,7 @@ public TransportSearchAction(
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, this::setSearchQueryMetricsEnabled);
this.tracer = tracer;
this.taskResourceTrackingService = taskResourceTrackingService;
}

private void setSearchQueryMetricsEnabled(boolean searchQueryMetricsEnabled) {
Expand Down Expand Up @@ -452,14 +455,10 @@ private void executeRequest(
logger,
TraceableSearchRequestOperationsListener.create(tracer, requestSpan)
);
SearchRequestContext searchRequestContext = new SearchRequestContext(requestOperationsListeners, originalSearchRequest, () -> {
List<String> taskResourceUsages = threadPool.getThreadContext().getResponseHeaders().get(TASK_RESOURCE_USAGE);
if (taskResourceUsages != null && taskResourceUsages.size() > 0) {
return taskResourceUsages.get(0);
}
return null;
}

SearchRequestContext searchRequestContext = new SearchRequestContext(
requestOperationsListeners,
originalSearchRequest,
taskResourceTrackingService::getTaskResourceUsageFromThreadContext
);
searchRequestContext.getSearchRequestOperationsListener().onRequestStart(searchRequestContext);

Expand Down
70 changes: 6 additions & 64 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,6 @@
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.core.tasks.resourcetracker.ResourceStats;
import org.opensearch.core.tasks.resourcetracker.ResourceStatsType;
import org.opensearch.core.tasks.resourcetracker.ResourceUsageInfo;
import org.opensearch.core.tasks.resourcetracker.ResourceUsageMetric;
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;
import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage;
import org.opensearch.core.tasks.resourcetracker.ThreadResourceInfo;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
Expand Down Expand Up @@ -168,7 +161,6 @@
import static org.opensearch.common.unit.TimeValue.timeValueHours;
import static org.opensearch.common.unit.TimeValue.timeValueMillis;
import static org.opensearch.common.unit.TimeValue.timeValueMinutes;
import static org.opensearch.tasks.TaskResourceTrackingService.TASK_RESOURCE_USAGE;

/**
* The main search service
Expand Down Expand Up @@ -571,7 +563,7 @@ private DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchShardT
processFailure(readerContext, e);
throw e;
} finally {
writeTaskResourceUsage(task);
taskResourceTrackingService.writeTaskResourceUsage(task, clusterService.localNode().getId());
}
}

Expand Down Expand Up @@ -675,7 +667,7 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchSh
processFailure(readerContext, e);
throw e;
} finally {
writeTaskResourceUsage(task);
taskResourceTrackingService.writeTaskResourceUsage(task, clusterService.localNode().getId());
}
}

Expand Down Expand Up @@ -722,7 +714,7 @@ public void executeQueryPhase(
// we handle the failure in the failure listener below
throw e;
} finally {
writeTaskResourceUsage(task);
taskResourceTrackingService.writeTaskResourceUsage(task, clusterService.localNode().getId());
}
}, wrapFailureListener(listener, readerContext, markAsUsed));
}
Expand Down Expand Up @@ -756,7 +748,7 @@ public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task,
// we handle the failure in the failure listener below
throw e;
} finally {
writeTaskResourceUsage(task);
taskResourceTrackingService.writeTaskResourceUsage(task, clusterService.localNode().getId());
}
}, wrapFailureListener(listener, readerContext, markAsUsed));
}
Expand Down Expand Up @@ -807,7 +799,7 @@ public void executeFetchPhase(
// we handle the failure in the failure listener below
throw e;
} finally {
writeTaskResourceUsage(task);
taskResourceTrackingService.writeTaskResourceUsage(task, clusterService.localNode().getId());
}
}, wrapFailureListener(listener, readerContext, markAsUsed));
}
Expand Down Expand Up @@ -839,7 +831,7 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A
// we handle the failure in the failure listener below
throw e;
} finally {
writeTaskResourceUsage(task);
taskResourceTrackingService.writeTaskResourceUsage(task, clusterService.localNode().getId());
}
}, wrapFailureListener(listener, readerContext, markAsUsed));
}
Expand Down Expand Up @@ -1139,56 +1131,6 @@ private DefaultSearchContext createSearchContext(ReaderContext reader, ShardSear
return searchContext;
}

/**
 * Computes the CPU and memory consumed by the current thread on behalf of the given task and
 * publishes the result as the {@code TASK_RESOURCE_USAGE} response header on the thread context.
 * <p>
 * The method returns without writing a header when the task has no active worker-stats entry for
 * the current thread, when the recorded start values lack CPU or MEMORY, or when the current
 * readings for either metric are missing. Any exception is caught and logged at debug level, so
 * this method does not propagate failures to its caller.
 *
 * @param task the shard-level search task whose per-thread resource usage should be reported
 */
private void writeTaskResourceUsage(SearchShardTask task) {
try {
// Get resource usages from when the task started
ThreadResourceInfo threadResourceInfo = task.getActiveThreadResourceInfo(
Thread.currentThread().getId(),
ResourceStatsType.WORKER_STATS
);
// No active worker-stats entry for this thread: nothing to report.
if (threadResourceInfo == null) {
return;
}
Map<ResourceStats, ResourceUsageInfo.ResourceStatsInfo> startValues = threadResourceInfo.getResourceUsageInfo().getStatsInfo();
// Both start values are needed to compute the deltas below.
if (!(startValues.containsKey(ResourceStats.CPU) && startValues.containsKey(ResourceStats.MEMORY))) {
return;
}
// Get current resource usages
ResourceUsageMetric[] endValues = taskResourceTrackingService.getResourceUsageMetricsForThread(Thread.currentThread().getId());
// -1 is a sentinel meaning "metric not present in endValues".
long cpu = -1, mem = -1;
for (ResourceUsageMetric endValue : endValues) {
if (endValue.getStats() == ResourceStats.MEMORY) {
mem = endValue.getValue();
} else if (endValue.getStats() == ResourceStats.CPU) {
cpu = endValue.getValue();
}
}
// Bail out if either metric was not found in the current readings.
if (cpu == -1 || mem == -1) {
logger.debug("Invalid resource usage value, cpu [{}], memory [{}]: ", cpu, mem);
return;
}

// Build task resource usage info
// The reported usage is the difference between the current reading and the recorded start value.
TaskResourceInfo taskResourceInfo = new TaskResourceInfo.Builder().setAction(task.getAction())
.setTaskId(task.getId())
.setParentTaskId(task.getParentTaskId().getId())
.setNodeId(clusterService.localNode().getId())
.setTaskResourceUsage(
new TaskResourceUsage(
cpu - startValues.get(ResourceStats.CPU).getStartValue(),
mem - startValues.get(ResourceStats.MEMORY).getStartValue()
)
)
.build();

// Remove the existing TASK_RESOURCE_USAGE header since it would have come from an earlier phase in the same request.
threadPool.getThreadContext().removeResponseHeader(TASK_RESOURCE_USAGE);
// The header value is the string form produced by TaskResourceInfo#toString().
threadPool.getThreadContext().addResponseHeader(TASK_RESOURCE_USAGE, taskResourceInfo.toString());
} catch (Exception e) {
// Best effort: a failure here is only logged and never rethrown.
logger.debug("Error during writing task resource usage: ", e);
}
}

private void freeAllContextForIndex(Index index) {
assert index != null;
for (ReaderContext ctx : activeReaders.values()) {
Expand Down
Loading

0 comments on commit 69629ff

Please sign in to comment.