diff --git a/CHANGELOG.md b/CHANGELOG.md index 2165fcb4700fb..68f093781a693 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -95,6 +95,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.x] ### Added - Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351)) +- Request level coordinator slow logs ([#10650](https://github.com/opensearch-project/OpenSearch/pull/10650)) - [Remote Store] Add repository stats for remote store([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567)) - Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255)) - Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight ([10352](https://github.com/opensearch-project/OpenSearch/pull/10352)) diff --git a/distribution/docker/src/docker/config/log4j2.properties b/distribution/docker/src/docker/config/log4j2.properties index 761478a9fdc6e..8edd6a7354a16 100644 --- a/distribution/docker/src/docker/config/log4j2.properties +++ b/distribution/docker/src/docker/config/log4j2.properties @@ -34,6 +34,16 @@ logger.deprecation.appenderRef.deprecation_rolling.ref = deprecation_rolling logger.deprecation.appenderRef.header_warning.ref = header_warning logger.deprecation.additivity = false +appender.search_request_slowlog_json_appender.type = Console +appender.search_request_slowlog_json_appender.name = search_request_slowlog_json_appender +appender.search_request_slowlog_json_appender.layout.type = OpenSearchJsonLayout +appender.search_request_slowlog_json_appender.layout.type_name = search_request_slowlog + +logger.search_request_slowlog_logger.name = cluster.search.request.slowlog +logger.search_request_slowlog_logger.level = trace +logger.search_request_slowlog_logger.appenderRef.search_request_slowlog_json_appender.ref = search_request_slowlog_json_appender +logger.search_request_slowlog_logger.additivity = false + appender.index_search_slowlog_rolling.type = Console appender.index_search_slowlog_rolling.name = index_search_slowlog_rolling appender.index_search_slowlog_rolling.layout.type = OpenSearchJsonLayout diff --git a/distribution/src/config/log4j2.properties b/distribution/src/config/log4j2.properties index bb27aaf2e22e6..d040afae82e53 100644 --- a/distribution/src/config/log4j2.properties +++ b/distribution/src/config/log4j2.properties @@ -113,6 +113,47 @@ logger.deprecation.appenderRef.deprecation_rolling_old.ref = deprecation_rolling logger.deprecation.appenderRef.header_warning.ref = header_warning logger.deprecation.additivity = false +######## Search Request Slowlog JSON #################### +appender.search_request_slowlog_json_appender.type = RollingFile +appender.search_request_slowlog_json_appender.name = search_request_slowlog_json_appender +appender.search_request_slowlog_json_appender.fileName = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs\ + .cluster_name}_index_search_slowlog.json +appender.search_request_slowlog_json_appender.filePermissions = rw-r----- +appender.search_request_slowlog_json_appender.layout.type = OpenSearchJsonLayout +appender.search_request_slowlog_json_appender.layout.type_name = search_request_slowlog +appender.search_request_slowlog_json_appender.layout.opensearchmessagefields=message,took,took_millis,phase_took,total_hits,search_type,shards,source,id + +appender.search_request_slowlog_json_appender.filePattern = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs\ + .cluster_name}_index_search_slowlog-%i.json.gz +appender.search_request_slowlog_json_appender.policies.type = Policies +appender.search_request_slowlog_json_appender.policies.size.type = SizeBasedTriggeringPolicy +appender.search_request_slowlog_json_appender.policies.size.size = 1GB +appender.search_request_slowlog_json_appender.strategy.type = DefaultRolloverStrategy +appender.search_request_slowlog_json_appender.strategy.max = 4 +################################################# +######## Search Request Slowlog Log File - old style pattern #### +appender.search_request_slowlog_log_appender.type = RollingFile +appender.search_request_slowlog_log_appender.name = search_request_slowlog_log_appender +appender.search_request_slowlog_log_appender.fileName = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}\ + _index_search_slowlog.log +appender.search_request_slowlog_log_appender.filePermissions = rw-r----- +appender.search_request_slowlog_log_appender.layout.type = PatternLayout +appender.search_request_slowlog_log_appender.layout.pattern = [%d{ISO8601}][%-5p][%c{1.}] [%node_name]%marker %m%n + +appender.search_request_slowlog_log_appender.filePattern = ${sys:opensearch.logs.base_path}${sys:file.separator}${sys:opensearch.logs.cluster_name}\ + _index_search_slowlog-%i.log.gz +appender.search_request_slowlog_log_appender.policies.type = Policies +appender.search_request_slowlog_log_appender.policies.size.type = SizeBasedTriggeringPolicy +appender.search_request_slowlog_log_appender.policies.size.size = 1GB +appender.search_request_slowlog_log_appender.strategy.type = DefaultRolloverStrategy +appender.search_request_slowlog_log_appender.strategy.max = 4 +################################################# +logger.search_request_slowlog_logger.name = cluster.search.request.slowlog +logger.search_request_slowlog_logger.level = trace +logger.search_request_slowlog_logger.appenderRef.search_request_slowlog_json_appender.ref = search_request_slowlog_json_appender +logger.search_request_slowlog_logger.appenderRef.search_request_slowlog_log_appender.ref = search_request_slowlog_log_appender +logger.search_request_slowlog_logger.additivity = false + ######## Search slowlog JSON #################### appender.index_search_slowlog_rolling.type = RollingFile appender.index_search_slowlog_rolling.name = index_search_slowlog_rolling diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index 14f57218ae1dc..f18bbb8a1cc13 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -65,10 +65,10 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; @@ -115,13 +115,12 @@ abstract class AbstractSearchAsyncAction exten private final int maxConcurrentRequestsPerNode; private final Map pendingExecutionsPerNode = new ConcurrentHashMap<>(); private final boolean throttleConcurrentRequests; + private final SearchRequestContext searchRequestContext; private SearchPhase currentPhase; private final List releasables = new ArrayList<>(); - private Optional searchRequestOperationsListener; - AbstractSearchAsyncAction( String name, Logger logger, @@ -140,7 +139,7 @@ abstract class AbstractSearchAsyncAction exten SearchPhaseResults resultConsumer, int maxConcurrentRequestsPerNode, SearchResponse.Clusters clusters, - SearchRequestOperationsListener searchRequestOperationsListener + SearchRequestContext searchRequestContext ) { super(name); final List toSkipIterators = new ArrayList<>(); @@ -176,7 +175,7 @@ abstract class AbstractSearchAsyncAction exten this.indexRoutings = indexRoutings; this.results = resultConsumer; this.clusters = clusters; - this.searchRequestOperationsListener = Optional.ofNullable(searchRequestOperationsListener); + this.searchRequestContext = searchRequestContext; } @Override @@ -427,18 +426,26 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha clusterState.version() ); } - onPhaseEnd(); + onPhaseEnd(searchRequestContext); executePhase(nextPhase); } } - private void onPhaseEnd() { - this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> { searchRequestOperations.onPhaseEnd(this); }); + private void onPhaseEnd(SearchRequestContext searchRequestContext) { + if (getCurrentPhase() != null) { + long tookInNanos = System.nanoTime() - getCurrentPhase().getStartTimeInNanos(); + searchRequestContext.updatePhaseTookMap(getCurrentPhase().getName(), TimeUnit.NANOSECONDS.toMillis(tookInNanos)); + } + this.searchRequestContext.getSearchRequestOperationsListener().onPhaseEnd(this, searchRequestContext); } private void onPhaseStart(SearchPhase phase) { setCurrentPhase(phase); - this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> { searchRequestOperations.onPhaseStart(this); }); + this.searchRequestContext.getSearchRequestOperationsListener().onPhaseStart(this); + } + + private void onRequestEnd(SearchRequestContext searchRequestContext) { + this.searchRequestContext.getSearchRequestOperationsListener().onRequestEnd(this, searchRequestContext); } private void executePhase(SearchPhase phase) { @@ -696,15 +703,18 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At searchContextId = null; } } + searchRequestContext.setTotalHits(internalSearchResponse.hits().getTotalHits()); + searchRequestContext.setShardStats(results.getNumShards(), successfulOps.get(), skippedOps.get(), failures.length); + onPhaseEnd(searchRequestContext); + onRequestEnd(searchRequestContext); listener.onResponse(buildSearchResponse(internalSearchResponse, failures, scrollId, searchContextId)); } - onPhaseEnd(); setCurrentPhase(null); } @Override public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) { - this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> searchRequestOperations.onPhaseFailure(this)); + this.searchRequestContext.getSearchRequestOperationsListener().onPhaseFailure(this); raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures())); } diff --git a/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java index ae481736ad0aa..c693eea4a2c33 100644 --- a/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java @@ -91,7 +91,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction, SearchPhase> phaseFactory, SearchResponse.Clusters clusters, - SearchRequestOperationsListener searchRequestOperationsListener + SearchRequestContext searchRequestContext ) { // We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests super( @@ -112,7 +112,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction(shardsIts.size()), request.getMaxConcurrentShardRequests(), clusters, - searchRequestOperationsListener + searchRequestContext ); this.queryPhaseResultConsumer = queryPhaseResultConsumer; this.searchPhaseController = searchPhaseController; diff --git a/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java index ca5ad087d3089..c26bd5eef8c15 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -82,7 +82,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction phaseTookMap; + private TotalHits totalHits; + private final EnumMap shardStats; + + public SearchRequestContext() { + this(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger())); + } + + public SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener) { + this.searchRequestOperationsListener = searchRequestOperationsListener; + this.absoluteStartNanos = System.nanoTime(); + this.phaseTookMap = new HashMap<>(); + this.shardStats = new EnumMap<>(ShardStatsFieldNames.class); + } + + SearchRequestOperationsListener getSearchRequestOperationsListener() { + return searchRequestOperationsListener; + } + + void updatePhaseTookMap(String phaseName, Long tookTime) { + this.phaseTookMap.put(phaseName, tookTime); + } + + Map phaseTookMap() { + return phaseTookMap; + } + + /** + * Override absoluteStartNanos set in constructor. + * For testing only + */ + void setAbsoluteStartNanos(long absoluteStartNanos) { + this.absoluteStartNanos = absoluteStartNanos; + } + + /** + * Request start time in nanos + */ + long getAbsoluteStartNanos() { + return absoluteStartNanos; + } + + void setTotalHits(TotalHits totalHits) { + this.totalHits = totalHits; + } + + TotalHits totalHits() { + return totalHits; + } + + void setShardStats(int total, int successful, int skipped, int failed) { + this.shardStats.put(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_TOTAL, total); + this.shardStats.put(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_SUCCESSFUL, successful); + this.shardStats.put(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_SKIPPED, skipped); + this.shardStats.put(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_FAILED, failed); + } + + String formattedShardStats() { + if (shardStats.isEmpty()) { + return ""; + } else { + return String.format( + Locale.ROOT, + "{%s:%s, %s:%s, %s:%s, %s:%s}", + ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_TOTAL.toString(), + shardStats.get(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_TOTAL), + ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_SUCCESSFUL.toString(), + shardStats.get(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_SUCCESSFUL), + ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_SKIPPED.toString(), + shardStats.get(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_SKIPPED), + ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_FAILED.toString(), + shardStats.get(ShardStatsFieldNames.SEARCH_REQUEST_SLOWLOG_SHARD_FAILED) + ); + } + } +} + +enum ShardStatsFieldNames { + SEARCH_REQUEST_SLOWLOG_SHARD_TOTAL("total"), + SEARCH_REQUEST_SLOWLOG_SHARD_SUCCESSFUL("successful"), + SEARCH_REQUEST_SLOWLOG_SHARD_SKIPPED("skipped"), + SEARCH_REQUEST_SLOWLOG_SHARD_FAILED("failed"); + + private final String name; + + ShardStatsFieldNames(String name) { + this.name = name; + } + + @Override + public String toString() { + return this.name; + } +} diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java index 89d725b56bded..c12b659207fbe 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestOperationsListener.java @@ -22,10 +22,14 @@ public interface SearchRequestOperationsListener { void onPhaseStart(SearchPhaseContext context); - void onPhaseEnd(SearchPhaseContext context); + void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext); void onPhaseFailure(SearchPhaseContext context); + default void onRequestStart(SearchRequestContext searchRequestContext) {} + + default void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} + /** * Holder of Composite Listeners * @@ -53,10 +57,10 @@ public void onPhaseStart(SearchPhaseContext context) { } @Override - public void onPhaseEnd(SearchPhaseContext context) { + public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { for (SearchRequestOperationsListener listener : listeners) { try { - listener.onPhaseEnd(context); + listener.onPhaseEnd(context, searchRequestContext); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onPhaseEnd listener [{}] failed", listener), e); } @@ -73,5 +77,27 @@ public void onPhaseFailure(SearchPhaseContext context) { } } } + + @Override + public void onRequestStart(SearchRequestContext searchRequestContext) { + for (SearchRequestOperationsListener listener : listeners) { + try { + listener.onRequestStart(searchRequestContext); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onRequestStart listener [{}] failed", listener), e); + } + } + } + + @Override + public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + for (SearchRequestOperationsListener listener : listeners) { + try { + listener.onRequestEnd(context, searchRequestContext); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onRequestEnd listener [{}] failed", listener), e); + } + } + } } } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java b/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java new file mode 100644 index 0000000000000..6a0d60ffd3984 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestSlowLog.java @@ -0,0 +1,273 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.action.search; + +import com.fasterxml.jackson.core.io.JsonStringEncoder; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.logging.Loggers; +import org.opensearch.common.logging.OpenSearchLogMessage; +import org.opensearch.common.logging.SlowLogLevel; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.tasks.Task; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * The request-level search slow log implementation + * + * @opensearch.internal + */ +public final class SearchRequestSlowLog implements SearchRequestOperationsListener { + private static final Charset UTF_8 = StandardCharsets.UTF_8; + + private long warnThreshold; + private long infoThreshold; + private long debugThreshold; + private long traceThreshold; + private SlowLogLevel level; + + private final Logger logger; + + static final String CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX = "cluster.search.request.slowlog"; + + public static final Setting CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING = Setting.timeSetting( + CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX + ".threshold.warn", + TimeValue.timeValueNanos(-1), + TimeValue.timeValueMillis(-1), + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + public static final Setting CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_INFO_SETTING = Setting.timeSetting( + CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX + ".threshold.info", + TimeValue.timeValueNanos(-1), + TimeValue.timeValueMillis(-1), + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + public static final Setting CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_DEBUG_SETTING = Setting.timeSetting( + CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX + ".threshold.debug", + TimeValue.timeValueNanos(-1), + TimeValue.timeValueMillis(-1), + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + public static final Setting CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_TRACE_SETTING = Setting.timeSetting( + CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX + ".threshold.trace", + TimeValue.timeValueNanos(-1), + TimeValue.timeValueMillis(-1), + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + public static final Setting CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL = new Setting<>( + CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX + ".level", + SlowLogLevel.TRACE.name(), + SlowLogLevel::parse, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false")); + + public SearchRequestSlowLog(ClusterService clusterService) { + this(clusterService, LogManager.getLogger(CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX)); // logger configured in log4j2.properties + } + + SearchRequestSlowLog(ClusterService clusterService, Logger logger) { + this.logger = logger; + Loggers.setLevel(this.logger, SlowLogLevel.TRACE.name()); + + this.warnThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING).nanos(); + this.infoThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_INFO_SETTING).nanos(); + this.debugThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_DEBUG_SETTING).nanos(); + this.traceThreshold = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_TRACE_SETTING).nanos(); + this.level = clusterService.getClusterSettings().get(CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL); + + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING, this::setWarnThreshold); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_INFO_SETTING, this::setInfoThreshold); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_DEBUG_SETTING, this::setDebugThreshold); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_TRACE_SETTING, this::setTraceThreshold); + clusterService.getClusterSettings().addSettingsUpdateConsumer(CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL, this::setLevel); + } + + @Override + public void onPhaseStart(SearchPhaseContext context) {} + + @Override + public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} + + @Override + public void onPhaseFailure(SearchPhaseContext context) {} + + @Override + public void onRequestStart(SearchRequestContext searchRequestContext) {} + + @Override + public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + long tookInNanos = System.nanoTime() - searchRequestContext.getAbsoluteStartNanos(); + + if (warnThreshold >= 0 && tookInNanos > warnThreshold && level.isLevelEnabledFor(SlowLogLevel.WARN)) { + logger.warn(new SearchRequestSlowLogMessage(context, tookInNanos, searchRequestContext)); + } else if (infoThreshold >= 0 && tookInNanos > infoThreshold && level.isLevelEnabledFor(SlowLogLevel.INFO)) { + logger.info(new SearchRequestSlowLogMessage(context, tookInNanos, searchRequestContext)); + } else if (debugThreshold >= 0 && tookInNanos > debugThreshold && level.isLevelEnabledFor(SlowLogLevel.DEBUG)) { + logger.debug(new SearchRequestSlowLogMessage(context, tookInNanos, searchRequestContext)); + } else if (traceThreshold >= 0 && tookInNanos > traceThreshold && level.isLevelEnabledFor(SlowLogLevel.TRACE)) { + logger.trace(new SearchRequestSlowLogMessage(context, tookInNanos, searchRequestContext)); + } + } + + /** + * Search request slow log message + * + * @opensearch.internal + */ + static final class SearchRequestSlowLogMessage extends OpenSearchLogMessage { + + SearchRequestSlowLogMessage(SearchPhaseContext context, long tookInNanos, SearchRequestContext searchRequestContext) { + super(prepareMap(context, tookInNanos, searchRequestContext), message(context, tookInNanos, searchRequestContext)); + } + + private static Map prepareMap( + SearchPhaseContext context, + long tookInNanos, + SearchRequestContext searchRequestContext + ) { + final Map messageFields = new HashMap<>(); + messageFields.put("took", TimeValue.timeValueNanos(tookInNanos)); + messageFields.put("took_millis", TimeUnit.NANOSECONDS.toMillis(tookInNanos)); + messageFields.put("phase_took", searchRequestContext.phaseTookMap().toString()); + if (searchRequestContext.totalHits() != null) { + messageFields.put("total_hits", searchRequestContext.totalHits()); + } else { + messageFields.put("total_hits", "-1"); + } + messageFields.put("search_type", context.getRequest().searchType()); + messageFields.put("shards", searchRequestContext.formattedShardStats()); + + if (context.getRequest().source() != null) { + String source = escapeJson(context.getRequest().source().toString(FORMAT_PARAMS)); + messageFields.put("source", source); + } else { + messageFields.put("source", "{}"); + } + + messageFields.put("id", context.getTask().getHeader(Task.X_OPAQUE_ID)); + return messageFields; + } + + // Message will be used in plaintext logs + private static String message(SearchPhaseContext context, long tookInNanos, SearchRequestContext searchRequestContext) { + final StringBuilder sb = new StringBuilder(); + sb.append("took[").append(TimeValue.timeValueNanos(tookInNanos)).append("], "); + sb.append("took_millis[").append(TimeUnit.NANOSECONDS.toMillis(tookInNanos)).append("], "); + sb.append("phase_took_millis[").append(searchRequestContext.phaseTookMap().toString()).append("], "); + if (searchRequestContext.totalHits() != null) { + sb.append("total_hits[").append(searchRequestContext.totalHits()).append("], "); + } else { + sb.append("total_hits[-1]"); + } + sb.append("search_type[").append(context.getRequest().searchType()).append("], "); + sb.append("shards[").append(searchRequestContext.formattedShardStats()).append("], "); + if (context.getRequest().source() != null) { + sb.append("source[").append(context.getRequest().source().toString(FORMAT_PARAMS)).append("], "); + } else { + sb.append("source[], "); + } + if (context.getTask().getHeader(Task.X_OPAQUE_ID) != null) { + sb.append("id[").append(context.getTask().getHeader(Task.X_OPAQUE_ID)).append("]"); + } else { + sb.append("id[]"); + } + return sb.toString(); + } + + private static String escapeJson(String text) { + byte[] sourceEscaped = JsonStringEncoder.getInstance().quoteAsUTF8(text); + return new String(sourceEscaped, UTF_8); + } + } + + void setWarnThreshold(TimeValue warnThreshold) { + this.warnThreshold = warnThreshold.nanos(); + } + + void setInfoThreshold(TimeValue infoThreshold) { + this.infoThreshold = infoThreshold.nanos(); + } + + void setDebugThreshold(TimeValue debugThreshold) { + this.debugThreshold = debugThreshold.nanos(); + } + + void setTraceThreshold(TimeValue traceThreshold) { + this.traceThreshold = traceThreshold.nanos(); + } + + void setLevel(SlowLogLevel level) { + this.level = level; + } + + protected long getWarnThreshold() { + return warnThreshold; + } + + protected long getInfoThreshold() { + return infoThreshold; + } + + protected long getDebugThreshold() { + return debugThreshold; + } + + protected long getTraceThreshold() { + return traceThreshold; + } + + SlowLogLevel getLevel() { + return level; + } +} diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java index 6b7c94ec3037a..2813c41e043ee 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestStats.java @@ -51,7 +51,7 @@ public void onPhaseStart(SearchPhaseContext context) { } @Override - public void onPhaseEnd(SearchPhaseContext context) { + public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { StatsHolder phaseStats = phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()); phaseStats.current.dec(); phaseStats.total.inc(); diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 16b7e4810b130..62886f7e9d981 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -67,7 +67,6 @@ import org.opensearch.core.common.breaker.CircuitBreaker; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.io.stream.Writeable; -import org.opensearch.core.common.util.CollectionUtils; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.indices.breaker.CircuitBreakerService; @@ -188,6 +187,7 @@ public class TransportSearchAction extends HandledTransportAction) SearchRequest::new); @@ -226,6 +227,7 @@ public TransportSearchAction( this.isRequestStatsEnabled = clusterService.getClusterSettings().get(SEARCH_REQUEST_STATS_ENABLED); clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_REQUEST_STATS_ENABLED, this::setIsRequestStatsEnabled); this.searchRequestStats = searchRequestStats; + this.searchRequestSlowLog = searchRequestSlowLog; this.metricsRegistry = metricsRegistry; this.searchQueryMetricsEnabled = clusterService.getClusterSettings().get(SEARCH_QUERY_METRICS_ENABLED_SETTING); clusterService.getClusterSettings() @@ -334,10 +336,6 @@ public void setPhaseTook(boolean phaseTook) { this.phaseTook = phaseTook; } - public boolean isPhaseTook() { - return phaseTook; - } - SearchResponse.PhaseTook getPhaseTook() { if (phaseTook) { Map phaseTookMap = new HashMap<>(); @@ -357,7 +355,7 @@ SearchResponse.PhaseTook getPhaseTook() { public void onPhaseStart(SearchPhaseContext context) {} @Override - public void onPhaseEnd(SearchPhaseContext context) { + public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { phaseStatsMap.put( context.getCurrentPhase().getSearchPhaseName(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - context.getCurrentPhase().getStartTimeInNanos()) @@ -428,7 +426,7 @@ public AbstractSearchAsyncAction asyncSearchAction( boolean preFilter, ThreadPool threadPool, SearchResponse.Clusters clusters, - SearchRequestOperationsListener searchRequestOperationsListener + SearchRequestContext searchRequestContext ) { return new AbstractSearchAsyncAction( actionName, @@ -448,7 +446,7 @@ public AbstractSearchAsyncAction asyncSearchAction( new ArraySearchPhaseResults<>(shardsIts.size()), searchRequest.getMaxConcurrentShardRequests(), clusters, - searchRequestOperationsListener + searchRequestContext ) { @Override protected void executePhaseOnShard( @@ -494,13 +492,10 @@ private void executeRequest( ); final List searchListenersList = createSearchListenerList(originalSearchRequest, timeProvider); - - final SearchRequestOperationsListener searchRequestOperationsListener; - if (!CollectionUtils.isEmpty(searchListenersList)) { - searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger); - } else { - searchRequestOperationsListener = null; - } + SearchRequestContext searchRequestContext = new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger) + ); + searchRequestContext.getSearchRequestOperationsListener().onRequestStart(searchRequestContext); PipelinedRequest searchRequest; ActionListener listener; @@ -527,7 +522,7 @@ private void executeRequest( timeProvider, searchAsyncActionProvider, listener, - searchRequestOperationsListener + searchRequestContext ); if (sr.source() == null) { rewriteListener.onResponse(sr.source()); @@ -548,7 +543,7 @@ private ActionListener buildRewriteListener( SearchTimeProvider timeProvider, SearchAsyncActionProvider searchAsyncActionProvider, ActionListener listener, - SearchRequestOperationsListener searchRequestOperationsListener + SearchRequestContext searchRequestContext ) { return ActionListener.wrap(source -> { if (source != searchRequest.source()) { @@ -581,7 +576,7 @@ private ActionListener buildRewriteListener( listener, searchContext, searchAsyncActionProvider, - searchRequestOperationsListener + searchRequestContext ); } else { if (shouldMinimizeRoundtrips(searchRequest)) { @@ -603,7 +598,7 @@ private ActionListener buildRewriteListener( l, searchContext, searchAsyncActionProvider, - searchRequestOperationsListener + searchRequestContext ) ); } else { @@ -654,7 +649,7 @@ private ActionListener buildRewriteListener( new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()), searchContext, searchAsyncActionProvider, - searchRequestOperationsListener + searchRequestContext ); }, listener::onFailure) ); @@ -925,7 +920,7 @@ private void executeLocalSearch( ActionListener listener, SearchContextId searchContext, SearchAsyncActionProvider searchAsyncActionProvider, - SearchRequestOperationsListener searchRequestOperationsListener + SearchRequestContext searchRequestContext ) { executeSearch( (SearchTask) task, @@ -940,7 +935,7 @@ private void executeLocalSearch( SearchResponse.Clusters.EMPTY, searchContext, searchAsyncActionProvider, - searchRequestOperationsListener + searchRequestContext ); } @@ -1059,7 +1054,7 @@ private void executeSearch( SearchResponse.Clusters clusters, @Nullable SearchContextId searchContext, SearchAsyncActionProvider searchAsyncActionProvider, - SearchRequestOperationsListener searchRequestOperationsListener + SearchRequestContext searchRequestContext ) { clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); // TODO: I think startTime() should become part of ActionRequest and that should be used both for index name @@ -1161,7 +1156,7 @@ private void executeSearch( preFilterSearchShards, threadPool, clusters, - searchRequestOperationsListener + searchRequestContext ).start(); } @@ -1245,7 +1240,7 @@ AbstractSearchAsyncAction asyncSearchAction( boolean preFilter, ThreadPool threadPool, SearchResponse.Clusters clusters, - SearchRequestOperationsListener searchRequestOperationsListener + SearchRequestContext searchRequestContext ); } @@ -1268,6 +1263,13 @@ private List createSearchListenerList(SearchReq searchListenersList.add(timeProvider); } + if (searchRequestSlowLog.getWarnThreshold() >= 0 + || searchRequestSlowLog.getInfoThreshold() >= 0 + || searchRequestSlowLog.getDebugThreshold() >= 0 + || searchRequestSlowLog.getTraceThreshold() >= 0) { + searchListenersList.add(searchRequestSlowLog); + } + return searchListenersList; } @@ -1286,7 +1288,7 @@ private AbstractSearchAsyncAction searchAsyncAction boolean preFilter, ThreadPool threadPool, SearchResponse.Clusters clusters, - SearchRequestOperationsListener searchRequestOperationsListener + SearchRequestContext searchRequestContext ) { if (preFilter) { return new CanMatchPreFilterSearchPhase( @@ -1319,7 +1321,7 @@ private AbstractSearchAsyncAction searchAsyncAction false, threadPool, clusters, - searchRequestOperationsListener + searchRequestContext ); return new SearchPhase(action.getName()) { @Override @@ -1329,7 +1331,7 @@ public void run() { }; }, clusters, - searchRequestOperationsListener + searchRequestContext ); } else { final QueryPhaseResultConsumer queryResultConsumer = searchPhaseController.newSearchPhaseResults( @@ -1360,7 +1362,7 @@ public void run() { clusterState, task, clusters, - searchRequestOperationsListener + searchRequestContext ); break; case QUERY_THEN_FETCH: @@ -1381,7 +1383,7 @@ public void run() { clusterState, task, clusters, - searchRequestOperationsListener + searchRequestContext ); break; default: diff --git a/server/src/main/java/org/opensearch/index/SlowLogLevel.java b/server/src/main/java/org/opensearch/common/logging/SlowLogLevel.java similarity index 94% rename from server/src/main/java/org/opensearch/index/SlowLogLevel.java rename to server/src/main/java/org/opensearch/common/logging/SlowLogLevel.java index 0a28edd59d491..9f744cceaa14d 100644 --- a/server/src/main/java/org/opensearch/index/SlowLogLevel.java +++ b/server/src/main/java/org/opensearch/common/logging/SlowLogLevel.java @@ -29,7 +29,7 @@ * GitHub history for details. */ -package org.opensearch.index; +package org.opensearch.common.logging; import java.util.Locale; @@ -54,7 +54,7 @@ public static SlowLogLevel parse(String level) { return valueOf(level.toUpperCase(Locale.ROOT)); } - boolean isLevelEnabledFor(SlowLogLevel levelToBeUsed) { + public boolean isLevelEnabledFor(SlowLogLevel levelToBeUsed) { // example: this.info(2) tries to log with levelToBeUsed.warn(3) - should allow return this.specificity <= levelToBeUsed.specificity; } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 2bb81064c9c71..a817e63328110 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -35,6 +35,7 @@ import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; import org.opensearch.action.admin.indices.close.TransportCloseIndexAction; import org.opensearch.action.search.CreatePitController; +import org.opensearch.action.search.SearchRequestSlowLog; import org.opensearch.action.search.TransportSearchAction; import org.opensearch.action.support.AutoCreateIndex; import org.opensearch.action.support.DestructiveOperations; @@ -681,6 +682,13 @@ public void apply(Settings value, Settings current, Settings previous) { TaskCancellationMonitoringSettings.IS_ENABLED_SETTING, TaskCancellationMonitoringSettings.DURATION_MILLIS_SETTING, + // Search request slow log settings + SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING, + SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_INFO_SETTING, + SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_DEBUG_SETTING, + SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_TRACE_SETTING, + SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL, + // Remote cluster state settings RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING, RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, diff --git a/server/src/main/java/org/opensearch/index/IndexingSlowLog.java b/server/src/main/java/org/opensearch/index/IndexingSlowLog.java index 2ef4552103008..bc42fa304e2c5 100644 --- a/server/src/main/java/org/opensearch/index/IndexingSlowLog.java +++ b/server/src/main/java/org/opensearch/index/IndexingSlowLog.java @@ -38,6 +38,7 @@ import org.opensearch.common.Booleans; import org.opensearch.common.logging.Loggers; import org.opensearch.common.logging.OpenSearchLogMessage; +import org.opensearch.common.logging.SlowLogLevel; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.unit.TimeValue; diff --git a/server/src/main/java/org/opensearch/index/SearchSlowLog.java b/server/src/main/java/org/opensearch/index/SearchSlowLog.java index d8bcd6c848cef..cfdc2cf348d4d 100644 --- a/server/src/main/java/org/opensearch/index/SearchSlowLog.java +++ b/server/src/main/java/org/opensearch/index/SearchSlowLog.java @@ -38,6 +38,7 @@ import org.apache.logging.log4j.Logger; import org.opensearch.common.logging.Loggers; import org.opensearch.common.logging.OpenSearchLogMessage; +import org.opensearch.common.logging.SlowLogLevel; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.unit.TimeValue; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index e80b768074fc7..ca11ef3b1c261 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -46,6 +46,7 @@ import org.opensearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus; import org.opensearch.action.search.SearchExecutionStatsCollector; import org.opensearch.action.search.SearchPhaseController; +import org.opensearch.action.search.SearchRequestSlowLog; import org.opensearch.action.search.SearchRequestStats; import org.opensearch.action.search.SearchTransportService; import org.opensearch.action.support.TransportAction; @@ -792,6 +793,7 @@ protected Node( ); final SearchRequestStats searchRequestStats = new SearchRequestStats(); + final SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, settings); final IndicesService indicesService = new IndicesService( @@ -1263,6 +1265,7 @@ protected Node( b.bind(IdentityService.class).toInstance(identityService); b.bind(Tracer.class).toInstance(tracer); b.bind(SearchRequestStats.class).toInstance(searchRequestStats); + b.bind(SearchRequestSlowLog.class).toInstance(searchRequestSlowLog); b.bind(MetricsRegistry.class).toInstance(metricsRegistry); b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService); b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry); diff --git a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java index edac50813e191..e17fbab32a12e 100644 --- a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java @@ -175,7 +175,7 @@ private AbstractSearchAsyncAction createAction( results, request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - null + new SearchRequestContext() ) { @Override protected SearchPhase getNextPhase(final SearchPhaseResults results, SearchPhaseContext context) { @@ -710,7 +710,7 @@ private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAct null, task, SearchResponse.Clusters.EMPTY, - new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger) + new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger)) ); } @@ -760,7 +760,7 @@ private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction( null, task, SearchResponse.Clusters.EMPTY, - new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger) + new SearchRequestContext(new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger)) ) { @Override ShardSearchFailure[] buildShardFailures() { diff --git a/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java index 43029fe57d5dd..4ed4797efe604 100644 --- a/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -137,7 +137,7 @@ public void run() throws IOException { } }, SearchResponse.Clusters.EMPTY, - null + new SearchRequestContext() ); canMatchPhase.start(); @@ -229,7 +229,7 @@ public void run() throws IOException { } }, SearchResponse.Clusters.EMPTY, - null + new SearchRequestContext() ); canMatchPhase.start(); @@ -320,7 +320,7 @@ public void sendCanMatch( new ArraySearchPhaseResults<>(iter.size()), randomIntBetween(1, 32), SearchResponse.Clusters.EMPTY, - null + new SearchRequestContext() ) { @Override @@ -348,7 +348,7 @@ protected void executePhaseOnShard( } }, SearchResponse.Clusters.EMPTY, - null + new SearchRequestContext() ); canMatchPhase.start(); @@ -433,7 +433,7 @@ public void run() { } }, SearchResponse.Clusters.EMPTY, - null + new SearchRequestContext() ); canMatchPhase.start(); @@ -533,7 +533,7 @@ public void run() { } }, SearchResponse.Clusters.EMPTY, - null + new SearchRequestContext() ); canMatchPhase.start(); diff --git a/server/src/test/java/org/opensearch/action/search/MockSearchPhaseContext.java b/server/src/test/java/org/opensearch/action/search/MockSearchPhaseContext.java index b5e1050b968ee..04a00a09dcbc4 100644 --- a/server/src/test/java/org/opensearch/action/search/MockSearchPhaseContext.java +++ b/server/src/test/java/org/opensearch/action/search/MockSearchPhaseContext.java @@ -65,11 +65,16 @@ public final class MockSearchPhaseContext implements SearchPhaseContext { final List failures = Collections.synchronizedList(new ArrayList<>()); SearchTransportService searchTransport; final Set releasedSearchContexts = new HashSet<>(); - final SearchRequest searchRequest = new SearchRequest(); + final SearchRequest searchRequest; final AtomicReference searchResponse = new AtomicReference<>(); public MockSearchPhaseContext(int numShards) { + this(numShards, new SearchRequest()); + } + + public MockSearchPhaseContext(int numShards, SearchRequest searchRequest) { this.numShards = numShards; + this.searchRequest = searchRequest; numSuccess = new AtomicInteger(numShards); } diff --git a/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java index 4b94b6589c6c8..7b4fa1d8387df 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java @@ -136,7 +136,7 @@ public void testSkipSearchShards() throws InterruptedException { new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - null + new SearchRequestContext() ) { @Override @@ -255,7 +255,7 @@ public void testLimitConcurrentShardRequests() throws InterruptedException { new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - null + new SearchRequestContext() ) { @Override @@ -373,7 +373,7 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - null + new SearchRequestContext() ) { TestSearchResponse response = new TestSearchResponse(); @@ -496,7 +496,7 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - null + new SearchRequestContext() ) { TestSearchResponse response = new TestSearchResponse(); @@ -610,7 +610,7 @@ public void testAllowPartialResults() throws InterruptedException { new ArraySearchPhaseResults<>(shardsIter.size()), request.getMaxConcurrentShardRequests(), SearchResponse.Clusters.EMPTY, - null + new SearchRequestContext() ) { @Override protected void executePhaseOnShard( diff --git a/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java index 6a22a7ea2b5e4..a8c0c43ac5080 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -215,7 +215,7 @@ public void sendExecuteQuery( null, task, SearchResponse.Clusters.EMPTY, - null + new SearchRequestContext() ) { @Override protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java index ef880043e863c..a53c35a8401b3 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java @@ -34,7 +34,7 @@ public void onPhaseStart(SearchPhaseContext context) { } @Override - public void onPhaseEnd(SearchPhaseContext context) { + public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec(); searchPhaseMap.get(context.getCurrentPhase().getSearchPhaseName()).total.inc(); } diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java new file mode 100644 index 0000000000000..e23f08c9415eb --- /dev/null +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestSlowLogTests.java @@ -0,0 +1,410 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.action.search; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.lucene.search.TotalHits; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.logging.Loggers; +import org.opensearch.common.logging.MockAppender; +import org.opensearch.common.logging.SlowLogLevel; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.endsWith; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.startsWith; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class SearchRequestSlowLogTests extends OpenSearchTestCase { + static MockAppender appender; + static Logger logger = LogManager.getLogger(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_PREFIX + ".SearchRequestSlowLog"); + + @BeforeClass + public static void init() throws IllegalAccessException { + appender = new MockAppender("trace_appender"); + appender.start(); + Loggers.addAppender(logger, appender); + } + + @AfterClass + public static void cleanup() { + Loggers.removeAppender(logger, appender); + appender.stop(); + } + + public void testMultipleSlowLoggersUseSingleLog4jLogger() { + LoggerContext context = (LoggerContext) LogManager.getContext(false); + + SearchPhaseContext searchPhaseContext1 = new MockSearchPhaseContext(1); + ClusterService clusterService1 = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null + ); + SearchRequestSlowLog searchRequestSlowLog1 = new SearchRequestSlowLog(clusterService1); + int numberOfLoggersBefore = context.getLoggers().size(); + + SearchPhaseContext searchPhaseContext2 = new MockSearchPhaseContext(1); + ClusterService clusterService2 = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null + ); + SearchRequestSlowLog searchRequestSlowLog2 = new SearchRequestSlowLog(clusterService2); + + int numberOfLoggersAfter = context.getLoggers().size(); + assertThat(numberOfLoggersAfter, equalTo(numberOfLoggersBefore)); + } + + public void testOnRequestEnd() throws InterruptedException { + final Logger logger = mock(Logger.class); + final SearchRequestContext searchRequestContext = mock(SearchRequestContext.class); + final SearchPhaseContext searchPhaseContext = mock(SearchPhaseContext.class); + final SearchRequest searchRequest = mock(SearchRequest.class); + final SearchTask searchTask = mock(SearchTask.class); + + Settings.Builder settingsBuilder = Settings.builder(); + settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_INFO_SETTING.getKey(), "0ms"); + settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_DEBUG_SETTING.getKey(), "0ms"); + settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_TRACE_SETTING.getKey(), "0ms"); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, logger); + final List searchListenersList = new ArrayList<>(List.of(searchRequestSlowLog)); + + when(searchRequestContext.getSearchRequestOperationsListener()).thenReturn( + new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger) + ); + when(searchRequestContext.getAbsoluteStartNanos()).thenReturn(System.nanoTime() - 1L); + when(searchPhaseContext.getRequest()).thenReturn(searchRequest); + when(searchPhaseContext.getTask()).thenReturn(searchTask); + when(searchRequest.searchType()).thenReturn(SearchType.QUERY_THEN_FETCH); + + searchRequestContext.getSearchRequestOperationsListener().onRequestEnd(searchPhaseContext, searchRequestContext); + + verify(logger, never()).warn(any(SearchRequestSlowLog.SearchRequestSlowLogMessage.class)); + verify(logger, times(1)).info(any(SearchRequestSlowLog.SearchRequestSlowLogMessage.class)); + verify(logger, never()).debug(any(SearchRequestSlowLog.SearchRequestSlowLogMessage.class)); + verify(logger, never()).trace(any(SearchRequestSlowLog.SearchRequestSlowLogMessage.class)); + } + + public void testConcurrentOnRequestEnd() throws InterruptedException { + final Logger logger = mock(Logger.class); + final SearchPhaseContext searchPhaseContext = mock(SearchPhaseContext.class); + final SearchRequest searchRequest = mock(SearchRequest.class); + final SearchTask searchTask = mock(SearchTask.class); + + Settings.Builder settingsBuilder = Settings.builder(); + settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING.getKey(), "-1"); + settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_INFO_SETTING.getKey(), "10s"); + settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_DEBUG_SETTING.getKey(), "-1"); + settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_TRACE_SETTING.getKey(), "-1"); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService, logger); + final List searchListenersList = new ArrayList<>(List.of(searchRequestSlowLog)); + + when(searchPhaseContext.getRequest()).thenReturn(searchRequest); + when(searchPhaseContext.getTask()).thenReturn(searchTask); + when(searchRequest.searchType()).thenReturn(SearchType.QUERY_THEN_FETCH); + + int numRequests = 50; + int numRequestsLogged = randomIntBetween(0, 50); + Thread[] threads = new Thread[numRequests]; + Phaser phaser = new Phaser(numRequests + 1); + CountDownLatch countDownLatch = new CountDownLatch(numRequests); + + // create a list of SearchRequestContexts + // each SearchRequestContext contains unique composite SearchRequestOperationsListener + ArrayList searchRequestContexts = new ArrayList<>(); + for (int i = 0; i < numRequests; i++) { + SearchRequestContext searchRequestContext = new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger) + ); + searchRequestContext.setAbsoluteStartNanos((i < numRequestsLogged) ? 0 : System.nanoTime()); + searchRequestContexts.add(searchRequestContext); + } + + for (int i = 0; i < numRequests; i++) { + int finalI = i; + threads[i] = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + SearchRequestContext thisContext = searchRequestContexts.get(finalI); + thisContext.getSearchRequestOperationsListener().onRequestEnd(searchPhaseContext, thisContext); + countDownLatch.countDown(); + }); + threads[i].start(); + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); + + verify(logger, never()).warn(any(SearchRequestSlowLog.SearchRequestSlowLogMessage.class)); + verify(logger, times(numRequestsLogged)).info(any(SearchRequestSlowLog.SearchRequestSlowLogMessage.class)); + verify(logger, never()).debug(any(SearchRequestSlowLog.SearchRequestSlowLogMessage.class)); + verify(logger, never()).trace(any(SearchRequestSlowLog.SearchRequestSlowLogMessage.class)); + } + + public void testSearchRequestSlowLogHasJsonFields_EmptySearchRequestContext() throws IOException { + SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); + SearchRequest searchRequest = new SearchRequest().source(source); + SearchPhaseContext searchPhaseContext = new MockSearchPhaseContext(1, searchRequest); + SearchRequestContext searchRequestContext = new SearchRequestContext(); + SearchRequestSlowLog.SearchRequestSlowLogMessage p = new SearchRequestSlowLog.SearchRequestSlowLogMessage( + searchPhaseContext, + 10, + searchRequestContext + ); + + assertThat(p.getValueFor("took"), equalTo("10nanos")); + assertThat(p.getValueFor("took_millis"), equalTo("0")); + assertThat(p.getValueFor("phase_took"), equalTo("{}")); + assertThat(p.getValueFor("total_hits"), equalTo("-1")); + assertThat(p.getValueFor("search_type"), equalTo("QUERY_THEN_FETCH")); + assertThat(p.getValueFor("shards"), equalTo("")); + assertThat(p.getValueFor("source"), equalTo("{\\\"query\\\":{\\\"match_all\\\":{\\\"boost\\\":1.0}}}")); + assertThat(p.getValueFor("id"), equalTo(null)); + } + + public void testSearchRequestSlowLogHasJsonFields_NotEmptySearchRequestContext() throws IOException { + SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); + SearchRequest searchRequest = new SearchRequest().source(source); + SearchPhaseContext searchPhaseContext = new MockSearchPhaseContext(1, searchRequest); + SearchRequestContext searchRequestContext = new SearchRequestContext(); + searchRequestContext.updatePhaseTookMap(SearchPhaseName.FETCH.getName(), 10L); + searchRequestContext.updatePhaseTookMap(SearchPhaseName.QUERY.getName(), 50L); + searchRequestContext.updatePhaseTookMap(SearchPhaseName.EXPAND.getName(), 5L); + searchRequestContext.setTotalHits(new TotalHits(3L, TotalHits.Relation.EQUAL_TO)); + searchRequestContext.setShardStats(10, 8, 1, 1); + SearchRequestSlowLog.SearchRequestSlowLogMessage p = new SearchRequestSlowLog.SearchRequestSlowLogMessage( + searchPhaseContext, + 10, + searchRequestContext + ); + + assertThat(p.getValueFor("took"), equalTo("10nanos")); + assertThat(p.getValueFor("took_millis"), equalTo("0")); + assertThat(p.getValueFor("phase_took"), equalTo("{expand=5, fetch=10, query=50}")); + assertThat(p.getValueFor("total_hits"), equalTo("3 hits")); + assertThat(p.getValueFor("search_type"), equalTo("QUERY_THEN_FETCH")); + assertThat(p.getValueFor("shards"), equalTo("{total:10, successful:8, skipped:1, failed:1}")); + assertThat(p.getValueFor("source"), equalTo("{\\\"query\\\":{\\\"match_all\\\":{\\\"boost\\\":1.0}}}")); + assertThat(p.getValueFor("id"), equalTo(null)); + } + + public void testSearchRequestSlowLogHasJsonFields_PartialContext() throws IOException { + SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); + SearchRequest searchRequest = new SearchRequest().source(source); + SearchPhaseContext searchPhaseContext = new MockSearchPhaseContext(1, searchRequest); + SearchRequestContext searchRequestContext = new SearchRequestContext(); + searchRequestContext.updatePhaseTookMap(SearchPhaseName.FETCH.getName(), 10L); + searchRequestContext.updatePhaseTookMap(SearchPhaseName.QUERY.getName(), 50L); + searchRequestContext.updatePhaseTookMap(SearchPhaseName.EXPAND.getName(), 5L); + searchRequestContext.setTotalHits(new TotalHits(3L, TotalHits.Relation.EQUAL_TO)); + searchRequestContext.setShardStats(5, 3, 1, 1); + SearchRequestSlowLog.SearchRequestSlowLogMessage p = new SearchRequestSlowLog.SearchRequestSlowLogMessage( + searchPhaseContext, + 10000000000L, + searchRequestContext + ); + + assertThat(p.getValueFor("took"), equalTo("10s")); + assertThat(p.getValueFor("took_millis"), equalTo("10000")); + assertThat(p.getValueFor("phase_took"), equalTo("{expand=5, fetch=10, query=50}")); + assertThat(p.getValueFor("total_hits"), equalTo("3 hits")); + assertThat(p.getValueFor("search_type"), equalTo("QUERY_THEN_FETCH")); + assertThat(p.getValueFor("shards"), equalTo("{total:5, successful:3, skipped:1, failed:1}")); + assertThat(p.getValueFor("source"), equalTo("{\\\"query\\\":{\\\"match_all\\\":{\\\"boost\\\":1.0}}}")); + assertThat(p.getValueFor("id"), equalTo(null)); + } + + public void testSearchRequestSlowLogSearchContextPrinterToLog() throws IOException { + SearchSourceBuilder source = SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()); + SearchRequest searchRequest = new SearchRequest().source(source); + SearchPhaseContext searchPhaseContext = new MockSearchPhaseContext(1, searchRequest); + SearchRequestContext searchRequestContext = new SearchRequestContext(); + searchRequestContext.updatePhaseTookMap(SearchPhaseName.FETCH.getName(), 10L); + searchRequestContext.updatePhaseTookMap(SearchPhaseName.QUERY.getName(), 50L); + searchRequestContext.updatePhaseTookMap(SearchPhaseName.EXPAND.getName(), 5L); + searchRequestContext.setTotalHits(new TotalHits(3L, TotalHits.Relation.EQUAL_TO)); + searchRequestContext.setShardStats(10, 8, 1, 1); + SearchRequestSlowLog.SearchRequestSlowLogMessage p = new SearchRequestSlowLog.SearchRequestSlowLogMessage( + searchPhaseContext, + 100000, + searchRequestContext + ); + + assertThat(p.getFormattedMessage(), startsWith("took[100micros]")); + assertThat(p.getFormattedMessage(), containsString("took_millis[0]")); + assertThat(p.getFormattedMessage(), containsString("phase_took_millis[{expand=5, fetch=10, query=50}]")); + assertThat(p.getFormattedMessage(), containsString("total_hits[3 hits]")); + assertThat(p.getFormattedMessage(), containsString("search_type[QUERY_THEN_FETCH]")); + assertThat(p.getFormattedMessage(), containsString("shards[{total:10, successful:8, skipped:1, failed:1}]")); + assertThat(p.getFormattedMessage(), containsString("source[{\"query\":{\"match_all\":{\"boost\":1.0}}}]")); + // Makes sure that output doesn't contain any new lines + assertThat(p.getFormattedMessage(), not(containsString("\n"))); + assertThat(p.getFormattedMessage(), endsWith("id[]")); + } + + public void testLevelSettingWarn() { + SlowLogLevel level = SlowLogLevel.WARN; + Settings.Builder settingsBuilder = Settings.builder(); + settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL.getKey(), level); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); + assertEquals(level, searchRequestSlowLog.getLevel()); + } + + public void testLevelSettingDebug() { + String level = "DEBUG"; + Settings.Builder settingsBuilder = Settings.builder(); + settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL.getKey(), level); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); + assertEquals(level, searchRequestSlowLog.getLevel().toString()); + } + + public void testLevelSettingFail() { + String level = "NOT A LEVEL"; + Settings.Builder settingsBuilder = Settings.builder(); + settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL.getKey(), level); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + + try { + new SearchRequestSlowLog(clusterService); + fail(); + } catch (IllegalArgumentException ex) { + final String expected = "No enum constant org.opensearch.common.logging.SlowLogLevel.NOT A LEVEL"; + assertThat(ex, hasToString(containsString(expected))); + assertThat(ex, instanceOf(IllegalArgumentException.class)); + } + } + + public void testSetThresholds() { + Settings.Builder settingsBuilder = Settings.builder(); + settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING.getKey(), "400ms"); + settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_INFO_SETTING.getKey(), "300ms"); + settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_DEBUG_SETTING.getKey(), "200ms"); + settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_TRACE_SETTING.getKey(), "100ms"); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); + assertEquals(TimeValue.timeValueMillis(400).nanos(), searchRequestSlowLog.getWarnThreshold()); + assertEquals(TimeValue.timeValueMillis(300).nanos(), searchRequestSlowLog.getInfoThreshold()); + assertEquals(TimeValue.timeValueMillis(200).nanos(), searchRequestSlowLog.getDebugThreshold()); + assertEquals(TimeValue.timeValueMillis(100).nanos(), searchRequestSlowLog.getTraceThreshold()); + assertEquals(SlowLogLevel.TRACE, searchRequestSlowLog.getLevel()); + } + + public void testSetThresholdsUnits() { + Settings.Builder settingsBuilder = Settings.builder(); + settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING.getKey(), "400s"); + settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_INFO_SETTING.getKey(), "300ms"); + settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_DEBUG_SETTING.getKey(), "200micros"); + settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_TRACE_SETTING.getKey(), "100nanos"); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); + assertEquals(TimeValue.timeValueSeconds(400).nanos(), searchRequestSlowLog.getWarnThreshold()); + assertEquals(TimeValue.timeValueMillis(300).nanos(), searchRequestSlowLog.getInfoThreshold()); + assertEquals(TimeValue.timeValueNanos(200000).nanos(), searchRequestSlowLog.getDebugThreshold()); + assertEquals(TimeValue.timeValueNanos(100).nanos(), searchRequestSlowLog.getTraceThreshold()); + assertEquals(SlowLogLevel.TRACE, searchRequestSlowLog.getLevel()); + } + + public void testSetThresholdsDefaults() { + Settings.Builder settingsBuilder = Settings.builder(); + settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING.getKey(), "400ms"); + settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_DEBUG_SETTING.getKey(), "200ms"); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); + assertEquals(TimeValue.timeValueMillis(400).nanos(), searchRequestSlowLog.getWarnThreshold()); + assertEquals(TimeValue.timeValueMillis(-1).nanos(), searchRequestSlowLog.getInfoThreshold()); + assertEquals(TimeValue.timeValueMillis(200).nanos(), searchRequestSlowLog.getDebugThreshold()); + assertEquals(TimeValue.timeValueMillis(-1).nanos(), searchRequestSlowLog.getTraceThreshold()); + assertEquals(SlowLogLevel.TRACE, searchRequestSlowLog.getLevel()); + } + + public void testSetThresholdsError() { + Settings.Builder settingsBuilder = Settings.builder(); + settingsBuilder.put(SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_THRESHOLD_WARN_SETTING.getKey(), "NOT A TIME VALUE"); + Settings settings = settingsBuilder.build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = new ClusterService(settings, clusterSettings, null); + + try { + new SearchRequestSlowLog(clusterService); + fail(); + } catch (IllegalArgumentException ex) { + final String expected = + "failed to parse setting [cluster.search.request.slowlog.threshold.warn] with value [NOT A TIME VALUE] as a time value"; + assertThat(ex, hasToString(containsString(expected))); + assertThat(ex, instanceOf(IllegalArgumentException.class)); + } + } +} diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java index f24147a8194b4..93cf77933fdd5 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java @@ -50,7 +50,7 @@ public void testSearchRequestStats() { long startTime = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(tookTimeInMillis); when(mockSearchPhase.getStartTimeInNanos()).thenReturn(startTime); assertEquals(1, testRequestStats.getPhaseCurrent(searchPhaseName)); - testRequestStats.onPhaseEnd(ctx); + testRequestStats.onPhaseEnd(ctx, new SearchRequestContext()); assertEquals(0, testRequestStats.getPhaseCurrent(searchPhaseName)); assertEquals(1, testRequestStats.getPhaseTotal(searchPhaseName)); assertThat(testRequestStats.getPhaseMetric(searchPhaseName), greaterThanOrEqualTo(tookTimeInMillis)); @@ -102,7 +102,7 @@ public void testSearchRequestStatsOnPhaseEndConcurrently() throws InterruptedExc for (int i = 0; i < numTasks; i++) { threads[i] = new Thread(() -> { phaser.arriveAndAwaitAdvance(); - testRequestStats.onPhaseEnd(ctx); + testRequestStats.onPhaseEnd(ctx, new SearchRequestContext()); countDownLatch.countDown(); }); threads[i].start(); diff --git a/server/src/test/java/org/opensearch/action/search/SearchTimeProviderTests.java b/server/src/test/java/org/opensearch/action/search/SearchTimeProviderTests.java index f0f1a43e6c21e..4d8a44417a3ee 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchTimeProviderTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchTimeProviderTests.java @@ -47,7 +47,7 @@ public void testSearchTimeProviderPhaseEnd() { long startTime = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(tookTimeInMillis); when(mockSearchPhase.getStartTimeInNanos()).thenReturn(startTime); assertNull(testTimeProvider.getPhaseTookTime(searchPhaseName)); - testTimeProvider.onPhaseEnd(ctx); + testTimeProvider.onPhaseEnd(ctx, new SearchRequestContext()); assertThat(testTimeProvider.getPhaseTookTime(searchPhaseName), greaterThanOrEqualTo(tookTimeInMillis)); } } diff --git a/server/src/test/java/org/opensearch/index/IndexingSlowLogTests.java b/server/src/test/java/org/opensearch/index/IndexingSlowLogTests.java index b2df78b473785..2d1c633f95ccb 100644 --- a/server/src/test/java/org/opensearch/index/IndexingSlowLogTests.java +++ b/server/src/test/java/org/opensearch/index/IndexingSlowLogTests.java @@ -44,6 +44,7 @@ import org.opensearch.common.UUIDs; import org.opensearch.common.logging.Loggers; import org.opensearch.common.logging.MockAppender; +import org.opensearch.common.logging.SlowLogLevel; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.json.JsonXContent; @@ -412,7 +413,7 @@ public void testLevelSetting() { assertNotNull(ex.getCause()); assertThat(ex.getCause(), instanceOf(IllegalArgumentException.class)); final IllegalArgumentException cause = (IllegalArgumentException) ex.getCause(); - assertThat(cause, hasToString(containsString("No enum constant org.opensearch.index.SlowLogLevel.NOT A LEVEL"))); + assertThat(cause, hasToString(containsString("No enum constant org.opensearch.common.logging.SlowLogLevel.NOT A LEVEL"))); } assertEquals(SlowLogLevel.TRACE, log.getLevel()); diff --git a/server/src/test/java/org/opensearch/index/SearchSlowLogTests.java b/server/src/test/java/org/opensearch/index/SearchSlowLogTests.java index 7835916e14581..0c0dec29c9dbf 100644 --- a/server/src/test/java/org/opensearch/index/SearchSlowLogTests.java +++ b/server/src/test/java/org/opensearch/index/SearchSlowLogTests.java @@ -41,6 +41,7 @@ import org.opensearch.common.UUIDs; import org.opensearch.common.logging.Loggers; import org.opensearch.common.logging.MockAppender; +import org.opensearch.common.logging.SlowLogLevel; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; @@ -337,7 +338,7 @@ public void testLevelSetting() { assertNotNull(ex.getCause()); assertThat(ex.getCause(), instanceOf(IllegalArgumentException.class)); final IllegalArgumentException cause = (IllegalArgumentException) ex.getCause(); - assertThat(cause, hasToString(containsString("No enum constant org.opensearch.index.SlowLogLevel.NOT A LEVEL"))); + assertThat(cause, hasToString(containsString("No enum constant org.opensearch.common.logging.SlowLogLevel.NOT A LEVEL"))); } assertEquals(SlowLogLevel.TRACE, log.getLevel()); diff --git a/server/src/test/java/org/opensearch/index/SlowLogLevelTests.java b/server/src/test/java/org/opensearch/index/SlowLogLevelTests.java index 290af7360ce52..d0da07f7a7018 100644 --- a/server/src/test/java/org/opensearch/index/SlowLogLevelTests.java +++ b/server/src/test/java/org/opensearch/index/SlowLogLevelTests.java @@ -32,6 +32,7 @@ package org.opensearch.index; +import org.opensearch.common.logging.SlowLogLevel; import org.opensearch.test.OpenSearchTestCase; public class SlowLogLevelTests extends OpenSearchTestCase { diff --git a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java index c27e4bf27327a..2ebb033899698 100644 --- a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java +++ b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java @@ -35,6 +35,7 @@ import org.opensearch.action.search.SearchPhase; import org.opensearch.action.search.SearchPhaseContext; import org.opensearch.action.search.SearchPhaseName; +import org.opensearch.action.search.SearchRequestContext; import org.opensearch.action.search.SearchRequestStats; import org.opensearch.index.search.stats.SearchStats.Stats; import org.opensearch.test.OpenSearchTestCase; @@ -85,7 +86,7 @@ public void testShardLevelSearchGroupStats() throws Exception { when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); for (int iterator = 0; iterator < paramValue; iterator++) { testRequestStats.onPhaseStart(ctx); - testRequestStats.onPhaseEnd(ctx); + testRequestStats.onPhaseEnd(ctx, new SearchRequestContext()); } } searchStats1.setSearchRequestStats(testRequestStats); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index b7a2baacba611..3d30c30e4e55f 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -90,6 +90,7 @@ import org.opensearch.action.search.SearchExecutionStatsCollector; import org.opensearch.action.search.SearchPhaseController; import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchRequestSlowLog; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchTransportService; import org.opensearch.action.search.TransportSearchAction; @@ -2309,6 +2310,7 @@ public void onFailure(final Exception e) { client ), null, + new SearchRequestSlowLog(clusterService), NoopMetricsRegistry.INSTANCE ) );