From 1f3092e9569d92a156e4b3fb9760496e2bd3496d Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Wed, 6 Dec 2023 16:18:28 -0800 Subject: [PATCH] Top N Queries Implementation and Basic Query Insight Framework Signed-off-by: Chenyang Ji --- .../org/opensearch/action/ActionModule.java | 9 + .../admin/cluster/insights/package-info.java | 10 + .../insights/top_queries/TopQueries.java | 76 ++++++ .../top_queries/TopQueriesAction.java | 27 +++ .../top_queries/TopQueriesRequest.java | 96 ++++++++ .../top_queries/TopQueriesRequestBuilder.java | 38 +++ .../top_queries/TopQueriesResponse.java | 113 +++++++++ .../TransportTopQueriesAction.java | 120 +++++++++ .../insights/top_queries/package-info.java | 10 + .../action/search/QueryInsightExporter.java | 61 +++++ .../QueryInsightLocalIndexExporter.java | 228 ++++++++++++++++++ .../action/search/QueryInsightService.java | 143 +++++++++++ .../search/QueryLatencyInsightService.java | 153 ++++++++++++ .../search/SearchQueryLatencyListener.java | 58 +++++ .../search/SearchQueryLatencyRecord.java | 78 ++++++ .../action/search/SearchQueryRecord.java | 180 ++++++++++++++ .../action/search/TransportSearchAction.java | 10 + .../opensearch/client/ClusterAdminClient.java | 26 ++ .../java/org/opensearch/client/Requests.java | 24 ++ .../client/support/AbstractClient.java | 19 ++ .../common/settings/ClusterSettings.java | 8 +- .../main/java/org/opensearch/node/Node.java | 12 + .../admin/cluster/RestTopQueriesAction.java | 90 +++++++ .../search/mappings/top_n_queries_record.json | 40 +++ .../top_queries/TopQueriesRequestTests.java | 45 ++++ .../insights/top_queries/TopQueriesTests.java | 101 ++++++++ .../SearchQueryLatencyListenerTests.java | 168 +++++++++++++ .../cluster/RestTopQueriesActionTests.java | 62 +++++ .../snapshots/SnapshotResiliencyTests.java | 7 + 29 files changed, 2011 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/insights/package-info.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueries.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueriesAction.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueriesRequest.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueriesRequestBuilder.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueriesResponse.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/TransportTopQueriesAction.java create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/package-info.java create mode 100644 server/src/main/java/org/opensearch/action/search/QueryInsightExporter.java create mode 100644 server/src/main/java/org/opensearch/action/search/QueryInsightLocalIndexExporter.java create mode 100644 server/src/main/java/org/opensearch/action/search/QueryInsightService.java create mode 100644 server/src/main/java/org/opensearch/action/search/QueryLatencyInsightService.java create mode 100644 server/src/main/java/org/opensearch/action/search/SearchQueryLatencyListener.java create mode 100644 server/src/main/java/org/opensearch/action/search/SearchQueryLatencyRecord.java create mode 100644 server/src/main/java/org/opensearch/action/search/SearchQueryRecord.java create mode 100644 server/src/main/java/org/opensearch/rest/action/admin/cluster/RestTopQueriesAction.java create mode 100644 server/src/main/resources/org/opensearch/action/search/mappings/top_n_queries_record.json create mode 100644 server/src/test/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueriesRequestTests.java create mode 100644 server/src/test/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueriesTests.java create mode 100644 server/src/test/java/org/opensearch/action/search/SearchQueryLatencyListenerTests.java create mode 100644 server/src/test/java/org/opensearch/rest/action/admin/cluster/RestTopQueriesActionTests.java diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 46775466aa615..a2817778b144a 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -48,6 +48,8 @@ import org.opensearch.action.admin.cluster.decommission.awareness.put.TransportDecommissionAction; import org.opensearch.action.admin.cluster.health.ClusterHealthAction; import org.opensearch.action.admin.cluster.health.TransportClusterHealthAction; +import org.opensearch.action.admin.cluster.insights.top_queries.TopQueriesAction; +import org.opensearch.action.admin.cluster.insights.top_queries.TransportTopQueriesAction; import org.opensearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction; import org.opensearch.action.admin.cluster.node.hotthreads.TransportNodesHotThreadsAction; import org.opensearch.action.admin.cluster.node.info.NodesInfoAction; @@ -359,6 +361,7 @@ import org.opensearch.rest.action.admin.cluster.RestRestoreRemoteStoreAction; import org.opensearch.rest.action.admin.cluster.RestRestoreSnapshotAction; import org.opensearch.rest.action.admin.cluster.RestSnapshotsStatusAction; +import org.opensearch.rest.action.admin.cluster.RestTopQueriesAction; import org.opensearch.rest.action.admin.cluster.RestVerifyRepositoryAction; import org.opensearch.rest.action.admin.cluster.dangling.RestDeleteDanglingIndexAction; import org.opensearch.rest.action.admin.cluster.dangling.RestImportDanglingIndexAction; @@ -762,6 +765,9 @@ public void reg actions.register(GetSearchPipelineAction.INSTANCE, GetSearchPipelineTransportAction.class); actions.register(DeleteSearchPipelineAction.INSTANCE, DeleteSearchPipelineTransportAction.class); + // Query Insight Actions + actions.register(TopQueriesAction.INSTANCE, TransportTopQueriesAction.class); + return unmodifiableMap(actions.getRegistry()); } @@ -974,6 +980,9 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestGetDecommissionStateAction()); registerHandler.accept(new RestRemoteStoreStatsAction()); registerHandler.accept(new RestRestoreRemoteStoreAction()); + + // Query insights API + registerHandler.accept(new RestTopQueriesAction()); } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/insights/package-info.java b/server/src/main/java/org/opensearch/action/admin/cluster/insights/package-info.java new file mode 100644 index 0000000000000..392958c22afe8 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/insights/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** insights transport handlers. */ +package org.opensearch.action.admin.cluster.insights; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueries.java b/server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueries.java new file mode 100644 index 0000000000000..44db5e30788fb --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueries.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + + +package org.opensearch.action.admin.cluster.insights.top_queries; + +import org.opensearch.action.search.SearchQueryLatencyRecord; +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.Nullable; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.List; + +/** + * Top Queries by resource usage / latency on a node + *

+ * Mainly used in the top N queries node response workflow. + * + * @opensearch.internal + */ +public class TopQueries extends BaseNodeResponse implements ToXContentObject { + /** The store to keep the top N queries with latency records */ + @Nullable + private final List latencyRecords; + + public TopQueries(StreamInput in) throws IOException { + super(in); + latencyRecords = in.readList(SearchQueryLatencyRecord::new); + } + + public TopQueries( + DiscoveryNode node, + @Nullable List latencyRecords + ) { + super(node); + this.latencyRecords = latencyRecords; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + if (latencyRecords != null) { + for (SearchQueryLatencyRecord record : latencyRecords) { + record.toXContent(builder, params); + } + } + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + if (latencyRecords != null) { + out.writeList(latencyRecords); + } + } + + /** + * Get all latency records + * + * @return the latency records in this node response + */ + public List getLatencyRecords() { + return latencyRecords; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueriesAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueriesAction.java new file mode 100644 index 0000000000000..614abeb370ca8 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueriesAction.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + + +package org.opensearch.action.admin.cluster.insights.top_queries; + +import org.opensearch.action.ActionType; + +/** + * Transport action for cluster/node level top queries information. + * + * @opensearch.internal + */ +public class TopQueriesAction extends ActionType { + + public static final TopQueriesAction INSTANCE = new TopQueriesAction(); + public static final String NAME = "cluster:monitor/insights/top_queries"; + + private TopQueriesAction() { + super(NAME, TopQueriesResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueriesRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueriesRequest.java new file mode 100644 index 0000000000000..20b4987f4b54f --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueriesRequest.java @@ -0,0 +1,96 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + + +package org.opensearch.action.admin.cluster.insights.top_queries; + +import org.opensearch.action.support.nodes.BaseNodesRequest; +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.*; +import java.util.stream.Collectors; + +/** + * A request to get cluster/node level top queries information. + * + * @opensearch.api + */ +@PublicApi(since = "1.0.0") +public class TopQueriesRequest extends BaseNodesRequest { + + Metric metricType = Metric.LATENCY; + + /** + * Create a new TopQueriesRequest from a {@link StreamInput} object. + * + * @param in A stream input object. + * @throws IOException if the stream cannot be deserialized. + */ + public TopQueriesRequest(StreamInput in) throws IOException { + super(in); + setMetricType(in.readString()); + } + + /** + * Get top queries from nodes based on the nodes ids specified. + * If none are passed, cluster level top queries will be returned. + */ + public TopQueriesRequest(String... nodesIds) { + super(nodesIds); + } + + /** + * Get the type of requested metrics + */ + public Metric getMetricType() { + return metricType; + } + + /** + * Set the type of requested metrics + */ + public TopQueriesRequest setMetricType(String metricType) { + metricType = metricType.toUpperCase(); + if (Metric.allMetrics().contains(metricType) == false) { + throw new IllegalStateException("Invalid metric used in top queries request: " + metricType); + } + this.metricType = Metric.valueOf(metricType); + return this; + } + + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(metricType.metricName()); + } + + /** + * ALl supported metrics for top queries + */ + public enum Metric { + LATENCY("LATENCY"); + + private final String metricName; + + Metric(String name) { + this.metricName = name; + } + + public String metricName() { + return this.metricName; + } + + public static Set allMetrics() { + return Arrays.stream(values()).map(Metric::metricName).collect(Collectors.toSet()); + } + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueriesRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueriesRequestBuilder.java new file mode 100644 index 0000000000000..e88e9aacc9c9d --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueriesRequestBuilder.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + + +package org.opensearch.action.admin.cluster.insights.top_queries; + +import org.opensearch.action.support.nodes.NodesOperationRequestBuilder; +import org.opensearch.client.OpenSearchClient; +import org.opensearch.common.annotation.PublicApi; + +/** + * Builder class for requesting cluster/node level top queries information. + * + * @opensearch.api + */ +@PublicApi(since = "1.0.0") +public class TopQueriesRequestBuilder extends NodesOperationRequestBuilder { + + public TopQueriesRequestBuilder(OpenSearchClient client, TopQueriesAction action) { + super(client, action, new TopQueriesRequest()); + } + + /** + * set metric type for the request + * + * @param metric Name of metric as a string. + * @return This, for request chaining. + */ + public TopQueriesRequestBuilder setMetricType(String metric) { + request.setMetricType(metric); + return this; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueriesResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueriesResponse.java new file mode 100644 index 0000000000000..c9d76cf3a1dd0 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueriesResponse.java @@ -0,0 +1,113 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + + +package org.opensearch.action.admin.cluster.insights.top_queries; + +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.search.SearchQueryLatencyRecord; +import org.opensearch.action.support.nodes.BaseNodesResponse; +import org.opensearch.cluster.ClusterName; +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Transport response for cluster/node level top queries information. + * + * @opensearch.api + */ +@PublicApi(since = "1.0.0") +public class TopQueriesResponse extends BaseNodesResponse implements ToXContentFragment { + + private static final String CLUSTER_LEVEL_RESULTS_KEY = "top_queries"; + + public TopQueriesResponse(StreamInput in) throws IOException { + super(in); + } + + public TopQueriesResponse(ClusterName clusterName, List nodes, List failures) { + super(clusterName, nodes, failures); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readList(TopQueries::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeList(nodes); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + List results = getNodes(); + toClusterLevelResult(builder, params, results); + return builder; + } + + @Override + public String toString() { + try { + XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); + builder.startObject(); + this.toXContent(builder, EMPTY_PARAMS); + builder.endObject(); + return builder.toString(); + } catch (IOException e) { + return "{ \"error\" : \"" + e.getMessage() + "\"}"; + } + } + + /** + * Merge top n queries results from nodes into cluster level results in XContent format. + * + * @param builder XContent builder + * @param params serialization parameters + * @param results top queries results from all nodes + * @throws IOException if an error occurs + */ + private void toClusterLevelResult(XContentBuilder builder, Params params, List results) throws IOException { + List all_records = results.stream() + .map(TopQueries::getLatencyRecords) + .flatMap(Collection::stream).sorted(Collections.reverseOrder()).collect(Collectors.toList()); + builder.startArray(CLUSTER_LEVEL_RESULTS_KEY); + for (SearchQueryLatencyRecord record : all_records) { + record.toXContent(builder, params); + } + builder.endArray(); + } + + /** + * build node level top n queries results in XContent format. + * + * @param builder XContent builder + * @param params serialization parameters + * @param results top queries results from all nodes + * @throws IOException if an error occurs + */ + private void toNodeLevelResult(XContentBuilder builder, Params params, List results) throws IOException { + builder.startObject(CLUSTER_LEVEL_RESULTS_KEY); + for (TopQueries topQueries : results) { + builder.startArray(topQueries.getNode().getId()); + topQueries.toXContent(builder, params); + builder.endArray(); + } + builder.endObject(); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/TransportTopQueriesAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/TransportTopQueriesAction.java new file mode 100644 index 0000000000000..62f8750f6e0bb --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/TransportTopQueriesAction.java @@ -0,0 +1,120 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + + +package org.opensearch.action.admin.cluster.insights.top_queries; + +import org.opensearch.OpenSearchException; +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.search.QueryLatencyInsightService; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.nodes.TransportNodesAction; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; + +/** + * Transport action for cluster/node level top queries information. + * + * @opensearch.internal + */ +public class TransportTopQueriesAction extends TransportNodesAction< + TopQueriesRequest, + TopQueriesResponse, + TransportTopQueriesAction.NodeRequest, + TopQueries> { + + private final QueryLatencyInsightService queryLatencyInsightService; + + @Inject + public TransportTopQueriesAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + QueryLatencyInsightService queryLatencyInsightService, + ActionFilters actionFilters + ) { + super( + TopQueriesAction.NAME, + threadPool, + clusterService, + transportService, + actionFilters, + TopQueriesRequest::new, + NodeRequest::new, + ThreadPool.Names.GENERIC, + TopQueries.class + ); + this.queryLatencyInsightService = queryLatencyInsightService; + } + + @Override + protected TopQueriesResponse newResponse( + TopQueriesRequest topQueriesRequest, + List responses, + List failures + ) { + return new TopQueriesResponse(clusterService.getClusterName(), responses, failures); + } + + @Override + protected NodeRequest newNodeRequest(TopQueriesRequest request) { + return new NodeRequest(request); + } + + @Override + protected TopQueries newNodeResponse(StreamInput in) throws IOException { + return new TopQueries(in); + } + + @Override + protected TopQueries nodeOperation(NodeRequest nodeRequest) { + TopQueriesRequest request = nodeRequest.request; + if (request.getMetricType() == TopQueriesRequest.Metric.LATENCY) { + return new TopQueries( + clusterService.localNode(), + queryLatencyInsightService.getQueryData() + ); + } else { + throw new OpenSearchException(String.format("invalid metric type %s", request.getMetricType())); + } + + } + + /** + * Inner Node Top Queries Request + * + * @opensearch.internal + */ + public static class NodeRequest extends TransportRequest { + + TopQueriesRequest request; + + public NodeRequest(StreamInput in) throws IOException { + super(in); + request = new TopQueriesRequest(in); + } + + public NodeRequest(TopQueriesRequest request) { + this.request = request; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + request.writeTo(out); + } + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/package-info.java b/server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/package-info.java new file mode 100644 index 0000000000000..170b56f2c16a6 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/insights/top_queries/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Top n queries transport handlers. */ +package org.opensearch.action.admin.cluster.insights.top_queries; diff --git a/server/src/main/java/org/opensearch/action/search/QueryInsightExporter.java b/server/src/main/java/org/opensearch/action/search/QueryInsightExporter.java new file mode 100644 index 0000000000000..a323f27c9bbd7 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/QueryInsightExporter.java @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.common.unit.TimeValue; + +import java.util.List; + +/** + * Simple abstract class to export data collected by search query analyzers + *

+ * Mainly for use within the Query Insight framework + * + * @opensearch.internal + */ +abstract class QueryInsightExporter> { + + private boolean enabled = false; + + /** The export interval of this exporter, default to 60 seconds */ + private TimeValue exportInterval = TimeValue.timeValueSeconds(60); + + /** + * Initial set up the exporter + */ + abstract void setup() throws Exception; + + /** + * Export the data with the exporter. + * + * @param records the data to export + */ + abstract void export(List records) throws Exception; + + public boolean getEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public TimeValue getExportInterval() { + return exportInterval; + } + + /** + * Set the export interval for the exporter. + * + * @param interval export interval + */ + public void setExportInterval(TimeValue interval) { + this.exportInterval = interval; + } +} diff --git a/server/src/main/java/org/opensearch/action/search/QueryInsightLocalIndexExporter.java b/server/src/main/java/org/opensearch/action/search/QueryInsightLocalIndexExporter.java new file mode 100644 index 0000000000000..b65fba1e69e03 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/QueryInsightLocalIndexExporter.java @@ -0,0 +1,228 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContent; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.List; +import java.util.Objects; + +/** + * Class to export data collected by search query analyzers to a local OpenSearch index + *

+ * Mainly for use within the Query Insight framework + * + * @opensearch.internal + */ +public class QueryInsightLocalIndexExporter> extends QueryInsightExporter{ + + private static final Logger log = LogManager.getLogger(QueryInsightLocalIndexExporter.class); + + private final ClusterService clusterService; + private final Client client; + + /** The OpenSearch index name to export the data to */ + private final String localIndexName; + + /** The mapping for the local index that holds the data */ + private final InputStream localIndexMapping; + + public QueryInsightLocalIndexExporter( + boolean enabled, + ClusterService clusterService, + Client client, + String localIndexName, + InputStream localIndexMapping) { + this.setEnabled(enabled); + this.clusterService = clusterService; + this.client = client; + this.localIndexName = localIndexName; + this.localIndexMapping = localIndexMapping; + try { + setup(); + } catch (IOException e) { + log.error(String.format("failed to set up with error %s.", e)); + } + } + + @Override + public void setup() throws IOException {} + + + /** + * Export the data to the predefined local OpenSearch Index + * + * @param records the data to export + * @throws IOException if an error occurs + */ + @Override + public synchronized void export(List records) throws IOException { + if (records.size() == 0) { + return; + } + if (checkIfIndexExists()) { + bulkRecord(records); + } else { + // local index not exist + initLocalIndex(new ActionListener<>() { + @Override + public void onResponse(CreateIndexResponse response) { + if (response.isAcknowledged()) { + log.debug(String.format("successfully initialized local index %s for query insight.", localIndexName)); + try { + bulkRecord(records); + } catch (IOException e) { + log.error(String.format("fail to ingest query insight data to local index, error: %s", e)); + } + } + else { + log.error(String.format("request to created local index %s for query insight not acknowledged.", localIndexName)); + } + } + + @Override + public void onFailure(Exception e) { + log.error(String.format("error creating local index for query insight: %s", e)); + } + }); + } + } + + /** + * Util function to check if a local OpenSearch Index exists + * + * @return boolean + */ + private boolean checkIfIndexExists() { + ClusterState clusterState = clusterService.state(); + return clusterState.getRoutingTable().hasIndex(this.localIndexName); + } + + /** + * Initialize the local OpenSearch Index for the exporter + * + * @param listener the listener to be notified upon completion + * @throws IOException if an error occurs + */ + private synchronized void initLocalIndex(ActionListener listener) throws IOException { + CreateIndexRequest createIndexRequest = new CreateIndexRequest(this.localIndexName).mapping( + getIndexMappings() + ).settings(Settings.builder().put("index.hidden", false).build()); + client.admin().indices().create(createIndexRequest, listener); + } + + /** + * Drop the local OpenSearch Index created by the exporter + * + * @param listener the listener to be notified upon completion + * @throws IOException if an error occurs + */ + private synchronized void dropLocalIndex(ActionListener listener) throws IOException { + DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(this.localIndexName); + client.admin().indices().delete(deleteIndexRequest, listener); + } + + /** + * Get the index mapping of the local index + * + * @return String to represent the index mapping + * @throws IOException if an error occurs + */ + private String getIndexMappings() throws IOException { + return new String( + Objects.requireNonNull(this.localIndexMapping) + .readAllBytes(), + Charset.defaultCharset() + ); + } + + /** + * Bulk ingest the data into to the predefined local OpenSearch Index + * + * @param records the data to export + * @throws IOException if an error occurs + */ + private synchronized void bulkRecord(List records) throws IOException { + BulkRequest bulkRequest = new BulkRequest() + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).timeout(TimeValue.timeValueSeconds(60)); + for (T record : records) { + bulkRequest.add( + new IndexRequest(localIndexName) + .source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + ); + } + client.bulk(bulkRequest, new ActionListener<>() { + @Override + public void onResponse(BulkResponse response) { + if (response.status().equals(RestStatus.CREATED) || response.status().equals(RestStatus.OK)) { + log.debug(String.format("successfully ingest data for %s! ", localIndexName)); + } + else { + log.error(String.format("error when ingesting data for %s", localIndexName)); + } + } + @Override + public void onFailure(Exception e) { + log.error(String.format("failed to ingest data for %s, %s", localIndexName, e)); + } + }); + } + + /** + * Index one document to the predefined local OpenSearch Index + * + * @param record the document to export + * @throws IOException if an error occurs + */ + private synchronized void indexRecord(T record) throws IOException { + IndexRequest indexRequest = new IndexRequest(localIndexName) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .source(record.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .timeout(TimeValue.timeValueSeconds(60)); + + client.index(indexRequest, new ActionListener<>() { + @Override + public void onResponse(IndexResponse response) { + if (response.status().equals(RestStatus.CREATED) || response.status().equals(RestStatus.OK)) { + log.debug(String.format("successfully indexed data for %s ", localIndexName)); + } + else { + log.error(String.format("failed to index data for %s", localIndexName)); + } + } + + @Override + public void onFailure(Exception e) { + log.error(String.format("failed to index data for %s, error: %s", localIndexName, e)); + } + }); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/QueryInsightService.java b/server/src/main/java/org/opensearch/action/search/QueryInsightService.java new file mode 100644 index 0000000000000..dd26850dfb496 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/QueryInsightService.java @@ -0,0 +1,143 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.Nullable; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.threadpool.Scheduler; +import org.opensearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * Service responsible for gathering, analyzing, storing and exporting data related to + * search queries, based on certain dimensions. + * + * @param The type of record that stores in the service + * @param The type of Collection that holds the aggregated data + * @param The type of exporter that exports the aggregated and processed data + * + * @opensearch.internal + */ +public abstract class QueryInsightService, S extends Collection, E extends QueryInsightExporter> + extends AbstractLifecycleComponent { + private static final Logger log = LogManager.getLogger(QueryInsightService.class); + private boolean enabled; + + /** The internal store that holds the query insight data */ + @Nullable + protected S store; + + /** The exporter that exports the query insight data to certain sink */ + @Nullable + protected E exporter; + + /** The internal OpenSearch thread pool that execute async processing and exporting tasks*/ + private final ThreadPool threadPool; + private volatile Scheduler.Cancellable scheduledFuture; + + + public static final String TOP_N_QUERIES_PREFIX = "search.top_n_queries"; + @Inject + public QueryInsightService( + ThreadPool threadPool, + @Nullable S store, + @Nullable E exporter + ) { + this.threadPool = threadPool; + this.store = store; + this.exporter = exporter; + } + + /** + * Ingest one record to the query insight store + * + * @param record the record to ingest + */ + protected void ingestQueryData(R record) { + if (this.store != null) { + this.store.add(record); + } + } + + /** + * Get all records that are in the query insight store, + * By default, return the records in sorted order. + * + * @return List of the records that are in the query insight store + * @throws IllegalArgumentException if query insight is disabled in the cluster + */ + public List getQueryData() throws IllegalArgumentException { + if (!enabled) { + throw new IllegalArgumentException("Cannot get query data when query insight feature is not enabled."); + } + clearOutdatedData(); + List queries = new ArrayList<>(store); + queries.sort(Collections.reverseOrder()); + return queries; + } + + /** + * Clear all outdated data in the store + */ + public abstract void clearOutdatedData(); + + /** + * Clear all data in the store + */ + public void clearAllData() { + store.clear(); + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public boolean getEnabled() { + return this.enabled; + } + + /** + * Start the Query Insight Service. + */ + @Override + protected void doStart() { + if (exporter != null && exporter.getEnabled()) { + scheduledFuture = threadPool.scheduleWithFixedDelay(() -> { + List topQueries = getQueryData(); + try { + exporter.export(topQueries); + log.debug(String.format("finish exporting query insight data to sink %s", topQueries)); + } catch (Exception e) { + throw new RuntimeException(String.format("failed to export query insight data to sink, error: %s", e)); + } + }, exporter.getExportInterval(), ThreadPool.Names.GENERIC); + } + } + + /** + * Stop the Query Insight Service + */ + @Override + protected void doStop() { + if (scheduledFuture != null) { + scheduledFuture.cancel(); + } + } + + @Override + protected void doClose() { + } +} diff --git a/server/src/main/java/org/opensearch/action/search/QueryLatencyInsightService.java b/server/src/main/java/org/opensearch/action/search/QueryLatencyInsightService.java new file mode 100644 index 0000000000000..e87b0cd8d5d2f --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/QueryLatencyInsightService.java @@ -0,0 +1,153 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.threadpool.ThreadPool; + +import java.util.Map; +import java.util.concurrent.PriorityBlockingQueue; + +/** + * Service responsible for gathering, analyzing, storing and exporting + * top N queries with high latency data for search queries + * + * @opensearch.internal + */ +public class QueryLatencyInsightService extends QueryInsightService< + SearchQueryLatencyRecord, + PriorityBlockingQueue, + QueryInsightLocalIndexExporter + > { + private static final Logger log = LogManager.getLogger(QueryLatencyInsightService.class); + + /** Default window size in seconds to keep the top N queries with latency data in query insight store */ + private static final int DEFAULT_WINDOW_SIZE = 60; + + /** Default top N size to keep the data in query insight store */ + private static final int DEFAULT_TOP_N_SIZE = 3; + public static final String TOP_N_LATENCY_QUERIES_PREFIX = TOP_N_QUERIES_PREFIX + ".latency"; + public static final Setting TOP_N_LATENCY_QUERIES_ENABLED = Setting.boolSetting( + TOP_N_LATENCY_QUERIES_PREFIX + ".enabled", + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + public static final Setting TOP_N_LATENCY_QUERIES_SIZE = Setting.intSetting( + TOP_N_LATENCY_QUERIES_PREFIX + ".top_n_size", + DEFAULT_TOP_N_SIZE, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + public static final Setting TOP_N_LATENCY_QUERIES_WINDOW_SIZE = Setting.intSetting( + TOP_N_LATENCY_QUERIES_PREFIX + ".window_size", + DEFAULT_WINDOW_SIZE, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private int topNSize = DEFAULT_TOP_N_SIZE; + + private TimeValue windowSize = TimeValue.timeValueSeconds(DEFAULT_WINDOW_SIZE); + + + @Inject + public QueryLatencyInsightService( + ClusterService clusterService, + Client client, + ThreadPool threadPool + ) { + super( + threadPool, + new PriorityBlockingQueue<>(), + null + ); + + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, this::setEnabled); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_SIZE, this::setTopNSize); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_WINDOW_SIZE, this::setWindowSize); + } + + /** + * Ingest the query data into to the top N queries with latency store + * + * @param timestamp The timestamp of the query. + * @param searchType The manner at which the search operation is executed. see {@link SearchType} + * @param source The search source that was executed by the query. + * @param totalShards Total number of shards as part of the search query across all indices + * @param indices The indices involved in the search query + * @param propertyMap Extra attributes and information about a search query + * @param phaseLatencyMap Contains phase level latency information in a search query + */ + public void ingestQueryData( + final Long timestamp, + final SearchType searchType, + final String source, + final int totalShards, + final String[] indices, + final Map propertyMap, + final Map phaseLatencyMap + ) { + if (timestamp <= 0) { + log.error(String.format("Invalid timestamp %s when ingesting query data to compute top n queries with latency", timestamp)); + return; + } + if (totalShards <= 0) { + log.error(String.format("Invalid totalShards %s when ingesting query data to compute top n queries with latency", totalShards)); + return; + } + super.ingestQueryData(new SearchQueryLatencyRecord( + timestamp, + searchType, + source, + totalShards, + indices, + propertyMap, + phaseLatencyMap + )); + // remove top elements for fix sizing priority queue + if (this.store.size() > this.getTopNSize()) { + this.store.poll(); + } + log.debug(String.format("successfully ingested: %s", this.store)); + } + + @Override + public void clearOutdatedData() { + store.removeIf(record -> + record.getTimestamp() < System.currentTimeMillis() - windowSize.getMillis() + ); + } + + public void setTopNSize(int size) { + this.topNSize = size; + } + + public void setWindowSize(int windowSize) { + this.windowSize = TimeValue.timeValueSeconds(windowSize); + } + + public int getTopNSize() { + return this.topNSize; + } + + public TimeValue getWindowSize() { + return this.windowSize; + } + +} diff --git a/server/src/main/java/org/opensearch/action/search/SearchQueryLatencyListener.java b/server/src/main/java/org/opensearch/action/search/SearchQueryLatencyListener.java new file mode 100644 index 0000000000000..b403b12442234 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/SearchQueryLatencyListener.java @@ -0,0 +1,58 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.xcontent.ToXContent; + +import java.util.Collections; +import java.util.HashMap; + +/** + * The listener for top N queries by latency + * + * @opensearch.internal + */ +public final class SearchQueryLatencyListener extends SearchRequestOperationsListener { + private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false")); + private final QueryLatencyInsightService queryLatencyInsightService; + + @Inject + public SearchQueryLatencyListener(QueryLatencyInsightService queryLatencyInsightService) { + this.queryLatencyInsightService = queryLatencyInsightService; + } + + @Override + public void onPhaseStart(SearchPhaseContext context) {} + + @Override + public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {} + + @Override + public void onPhaseFailure(SearchPhaseContext context) {} + + @Override + public void onRequestStart(SearchRequestContext searchRequestContext) {} + + @Override + public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { + SearchRequest request = context.getRequest(); + queryLatencyInsightService.ingestQueryData( + request.getOrCreateAbsoluteStartMillis(), + request.searchType(), + request.source().toString(FORMAT_PARAMS), + context.getNumShards(), + request.indices(), + new HashMap<>(), + searchRequestContext.phaseTookMap() + ); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/SearchQueryLatencyRecord.java b/server/src/main/java/org/opensearch/action/search/SearchQueryLatencyRecord.java new file mode 100644 index 0000000000000..0357714c4d76e --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/SearchQueryLatencyRecord.java @@ -0,0 +1,78 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Map; + +/** + * The Latency record stored in the Query Insight Framework + * + * @opensearch.internal + */ +public final class SearchQueryLatencyRecord extends SearchQueryRecord { + + private static final String PHASE_LATENCY_MAP = "phaseLatencyMap"; + + // latency info for each search phase + private final Map phaseLatencyMap; + + public SearchQueryLatencyRecord(final StreamInput in) throws IOException { + super(in); + this.phaseLatencyMap = in.readMap(StreamInput::readString, StreamInput::readLong); + } + + public SearchQueryLatencyRecord( + final Long timestamp, + final SearchType searchType, + final String source, + final int totalShards, + final String[] indices, + final Map propertyMap, + final Map phaseLatencyMap + ) { + super( + timestamp, + searchType, + source, + totalShards, + indices, + propertyMap, + phaseLatencyMap.values().stream().mapToLong(x -> x).sum()); + + this.phaseLatencyMap = phaseLatencyMap; + } + + public Map getPhaseLatencyMap() { + return phaseLatencyMap; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(TIMESTAMP, this.getTimestamp()); + builder.field(SEARCH_TYPE, this.getSearchType()); + builder.field(SOURCE, this.getSource()); + builder.field(TOTAL_SHARDS, this.getTotalShards()); + builder.field(INDICES, this.getIndices()); + builder.field(PROPERTY_MAP, this.getPropertyMap()); + builder.field(PHASE_LATENCY_MAP, this.getPhaseLatencyMap()); + return builder.endObject(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeMap(phaseLatencyMap, StreamOutput::writeString, StreamOutput::writeLong); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/SearchQueryRecord.java b/server/src/main/java/org/opensearch/action/search/SearchQueryRecord.java new file mode 100644 index 0000000000000..e935918016c84 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/SearchQueryRecord.java @@ -0,0 +1,180 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Map; + +/** + * Simple abstract class that represent record stored in the Query Insight Framework + * + * @param The value type associated with the record + * @opensearch.internal + */ +public abstract class SearchQueryRecord > implements Comparable>, Writeable, ToXContentObject { + protected static final String TIMESTAMP = "timestamp"; + protected static final String SEARCH_TYPE = "searchType"; + protected static final String SOURCE = "source"; + protected static final String TOTAL_SHARDS = "totalShards"; + protected static final String INDICES = "indices"; + protected static final String PROPERTY_MAP = "propertyMap"; + protected static final String VALUE = "value"; + + protected final Long timestamp; + + private final SearchType searchType; + + private final String source; + + private final int totalShards; + + private final String[] indices; + + // TODO: add user-account which initialized the request in the future + private final Map propertyMap; + + private T value; + + public SearchQueryRecord(final StreamInput in) throws IOException { + this.timestamp = in.readLong(); + this.searchType = SearchType.fromString(in.readString().toLowerCase()); + this.source = in.readString(); + this.totalShards = in.readInt(); + this.indices = in.readStringArray(); + this.propertyMap = in.readMap(); + this.value = (T) in.readGenericValue(); + } + + public SearchQueryRecord( + final Long timestamp, + final SearchType searchType, + final String source, + final int totalShards, + final String[] indices, + final Map propertyMap, + final T value + ) { + this ( + timestamp, + searchType, + source, + totalShards, + indices, + propertyMap + ); + this.value = value; + } + + public SearchQueryRecord( + final Long timestamp, + final SearchType searchType, + final String source, + final int totalShards, + final String[] indices, + final Map propertyMap + ) { + this.timestamp = timestamp; + this.searchType = searchType; + this.source = source; + this.totalShards = totalShards; + this.indices = indices; + this.propertyMap = propertyMap; + } + + /** + * The timestamp of the top query. + */ + public Long getTimestamp() { + return timestamp; + } + + /** + * The manner at which the search operation is executed. + */ + public SearchType getSearchType() { + return searchType; + } + + /** + * The search source that was executed by the query. + */ + public String getSource() { + return source; + } + + /** + * Total number of shards as part of the search query across all indices + */ + public int getTotalShards() { + return totalShards; + } + + /** + * The indices involved in the search query + */ + public String[] getIndices() { + return indices; + } + + /** + * Get the value of the query metric record + */ + public T getValue() { + return value; + } + + /** + * Set the value of the query metric record + */ + public void setValue(T value) { + this.value = value; + } + + /** + * Extra attributes and information about a search query + */ + public Map getPropertyMap() { + return propertyMap; + } + + @Override + public int compareTo(SearchQueryRecord otherRecord) { + return value.compareTo(otherRecord.getValue()); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(TIMESTAMP, timestamp); + builder.field(SEARCH_TYPE, searchType); + builder.field(SOURCE, source); + builder.field(TOTAL_SHARDS, totalShards); + builder.field(INDICES, indices); + builder.field(PROPERTY_MAP, propertyMap); + builder.field(VALUE, value); + return builder.endObject(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(timestamp); + out.writeString(searchType.toString()); + out.writeString(source); + out.writeInt(totalShards); + out.writeStringArray(indices); + out.writeMap(propertyMap); + out.writeGenericValue(value); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 05f4308df74fa..1cad41315b90a 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -189,6 +189,8 @@ public class TransportSearchAction extends HandledTransportAction) SearchRequest::new); @@ -228,6 +231,7 @@ public TransportSearchAction( clusterService.getClusterSettings().addSettingsUpdateConsumer(SEARCH_REQUEST_STATS_ENABLED, this::setIsRequestStatsEnabled); this.searchRequestStats = searchRequestStats; this.searchRequestSlowLog = searchRequestSlowLog; + this.searchQueryLatencyListener = searchQueryLatencyListener; this.metricsRegistry = metricsRegistry; this.searchQueryMetricsEnabled = clusterService.getClusterSettings().get(SEARCH_QUERY_METRICS_ENABLED_SETTING); clusterService.getClusterSettings() @@ -1270,6 +1274,12 @@ private List createSearchListenerList(SearchReq searchListenersList.add(searchRequestSlowLog); } + // enable query insight listeners + if (clusterService.getClusterSettings().get(QueryLatencyInsightService.TOP_N_LATENCY_QUERIES_ENABLED)) { + searchListenersList.add(searchQueryLatencyListener); + } + + return searchListenersList; } diff --git a/server/src/main/java/org/opensearch/client/ClusterAdminClient.java b/server/src/main/java/org/opensearch/client/ClusterAdminClient.java index 05f09c1a6e661..e3b579b84f53d 100644 --- a/server/src/main/java/org/opensearch/client/ClusterAdminClient.java +++ b/server/src/main/java/org/opensearch/client/ClusterAdminClient.java @@ -47,6 +47,9 @@ import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; import org.opensearch.action.admin.cluster.health.ClusterHealthRequestBuilder; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.insights.top_queries.TopQueriesRequest; +import org.opensearch.action.admin.cluster.insights.top_queries.TopQueriesRequestBuilder; +import org.opensearch.action.admin.cluster.insights.top_queries.TopQueriesResponse; import org.opensearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequest; import org.opensearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequestBuilder; import org.opensearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse; @@ -942,4 +945,27 @@ public interface ClusterAdminClient extends OpenSearchClient { * Deletes a stored search pipeline */ ActionFuture deleteSearchPipeline(DeleteSearchPipelineRequest request); + + /** + * Top Queries of the cluster. + * + * @param request The top queries request + * @return The result future + * @see org.opensearch.client.Requests#topQueriesRequest(String...) + */ + ActionFuture topQueries(TopQueriesRequest request); + + /** + * Top Queries of the cluster. + * + * @param request The top queries request + * @param listener A listener to be notified with a result + * @see org.opensearch.client.Requests#topQueriesRequest(String...) + */ + void topQueries(TopQueriesRequest request, ActionListener listener); + + /** + * Top Queries of the cluster. + */ + TopQueriesRequestBuilder prepareTopQueries(String... nodesIds); } diff --git a/server/src/main/java/org/opensearch/client/Requests.java b/server/src/main/java/org/opensearch/client/Requests.java index 3607590826007..e0a8f3c583733 100644 --- a/server/src/main/java/org/opensearch/client/Requests.java +++ b/server/src/main/java/org/opensearch/client/Requests.java @@ -36,6 +36,7 @@ import org.opensearch.action.admin.cluster.decommission.awareness.get.GetDecommissionStateRequest; import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest; import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; +import org.opensearch.action.admin.cluster.insights.top_queries.TopQueriesRequest; import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; import org.opensearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; @@ -608,4 +609,27 @@ public static GetDecommissionStateRequest getDecommissionStateRequest() { public static DeleteDecommissionStateRequest deleteDecommissionStateRequest() { return new DeleteDecommissionStateRequest(); } + + + /** + * Creates a cluster level top queries request + * + * @return The top queries request + * @see org.opensearch.client.ClusterAdminClient#topQueries(TopQueriesRequest) + */ + public static TopQueriesRequest topQueriesRequest() { + return new TopQueriesRequest(); + } + + + /** + * Creates a top queries request against one or more nodes. Pass {@code null} or an empty array for all nodes. + * + * @param nodesIds The nodes ids to get top queries for + * @return The top queries request + * @see org.opensearch.client.ClusterAdminClient#topQueries(TopQueriesRequest) + */ + public static TopQueriesRequest topQueriesRequest(String... nodesIds) { + return new TopQueriesRequest(nodesIds); + } } diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index 786bfa38bb19c..b6f0755f9c4d0 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -56,6 +56,10 @@ import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; import org.opensearch.action.admin.cluster.health.ClusterHealthRequestBuilder; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.insights.top_queries.TopQueriesAction; +import org.opensearch.action.admin.cluster.insights.top_queries.TopQueriesRequest; +import org.opensearch.action.admin.cluster.insights.top_queries.TopQueriesRequestBuilder; +import org.opensearch.action.admin.cluster.insights.top_queries.TopQueriesResponse; import org.opensearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction; import org.opensearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequest; import org.opensearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequestBuilder; @@ -1510,6 +1514,21 @@ public void deleteSearchPipeline(DeleteSearchPipelineRequest request, ActionList public ActionFuture deleteSearchPipeline(DeleteSearchPipelineRequest request) { return execute(DeleteSearchPipelineAction.INSTANCE, request); } + + @Override + public ActionFuture topQueries(final TopQueriesRequest request) { + return execute(TopQueriesAction.INSTANCE, request); + } + + @Override + public void topQueries(final TopQueriesRequest request, final ActionListener listener) { + execute(TopQueriesAction.INSTANCE, request, listener); + } + + @Override + public TopQueriesRequestBuilder prepareTopQueries(String... nodesIds) { + return new TopQueriesRequestBuilder(this, TopQueriesAction.INSTANCE).setNodesIds(nodesIds); + } } static class IndicesAdmin implements IndicesAdminClient { diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index ab0ea89f4734d..10e4fe62e90b5 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -35,6 +35,7 @@ import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction; import org.opensearch.action.admin.indices.close.TransportCloseIndexAction; import org.opensearch.action.search.CreatePitController; +import org.opensearch.action.search.QueryLatencyInsightService; import org.opensearch.action.search.SearchRequestSlowLog; import org.opensearch.action.search.TransportSearchAction; import org.opensearch.action.support.AutoCreateIndex; @@ -701,7 +702,12 @@ public void apply(Settings value, Settings current, Settings previous) { AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE, CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT, - CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT + CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT, + + // Cluster insight settings + QueryLatencyInsightService.TOP_N_LATENCY_QUERIES_ENABLED, + QueryLatencyInsightService.TOP_N_LATENCY_QUERIES_WINDOW_SIZE, + QueryLatencyInsightService.TOP_N_LATENCY_QUERIES_SIZE ) ) ); diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 4cbf8dc191a9d..fbb5f32978832 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -44,11 +44,13 @@ import org.opensearch.action.ActionModule.DynamicActionRegistry; import org.opensearch.action.ActionType; import org.opensearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus; +import org.opensearch.action.search.QueryLatencyInsightService; import org.opensearch.action.search.SearchExecutionStatsCollector; import org.opensearch.action.search.SearchPhaseController; import org.opensearch.action.search.SearchRequestSlowLog; import org.opensearch.action.search.SearchRequestStats; import org.opensearch.action.search.SearchTransportService; +import org.opensearch.action.search.SearchQueryLatencyListener; import org.opensearch.action.support.TransportAction; import org.opensearch.action.update.UpdateHelper; import org.opensearch.bootstrap.BootstrapCheck; @@ -786,6 +788,13 @@ protected Node( final SearchRequestStats searchRequestStats = new SearchRequestStats(); final SearchRequestSlowLog searchRequestSlowLog = new SearchRequestSlowLog(clusterService); + final QueryLatencyInsightService queryLatencyInsightService = new QueryLatencyInsightService( + clusterService, + client, + threadPool + ); + final SearchQueryLatencyListener searchQueryLatencyListener = new SearchQueryLatencyListener(queryLatencyInsightService); + remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, settings); final IndicesService indicesService = new IndicesService( settings, @@ -1275,6 +1284,8 @@ protected Node( b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService); b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry); b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker); + b.bind(QueryLatencyInsightService.class).toInstance(queryLatencyInsightService); + b.bind(SearchQueryLatencyListener.class).toInstance(searchQueryLatencyListener); }); injector = modules.createInjector(); @@ -1386,6 +1397,7 @@ public Node start() throws NodeValidationException { injector.getInstance(FsHealthService.class).start(); injector.getInstance(NodeResourceUsageTracker.class).start(); injector.getInstance(ResourceUsageCollectorService.class).start(); + injector.getInstance(QueryLatencyInsightService.class).start(); nodeService.getMonitorService().start(); nodeService.getSearchBackpressureService().start(); nodeService.getTaskCancellationMonitoringService().start(); diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestTopQueriesAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestTopQueriesAction.java new file mode 100644 index 0000000000000..3c7029e982f70 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestTopQueriesAction.java @@ -0,0 +1,90 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + + +package org.opensearch.rest.action.admin.cluster; + +import org.opensearch.action.admin.cluster.insights.top_queries.TopQueriesRequest; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.Strings; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestActions.NodesResponseRestListener; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; +import java.util.Set; + +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; +import static org.opensearch.rest.RestRequest.Method.GET; + +/** + * Transport action to get Top N queries by certain metric type + * + * @opensearch.api + */ +public class RestTopQueriesAction extends BaseRestHandler { + /** The metric types that are allowed in top N queries */ + static final Set ALLOWED_METRICS = TopQueriesRequest.Metric.allMetrics(); + + public RestTopQueriesAction() {} + + @Override + public List routes() { + return unmodifiableList( + asList( + new Route(GET, "/_insights/top_queries"), + new Route(GET, "/_insights/top_queries/{nodeId}") + ) + ); + } + + @Override + public String getName() { + return "top_queries_action"; + } + + @Override + public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { + final TopQueriesRequest topQueriesRequest = prepareRequest(request); + topQueriesRequest.timeout(request.param("timeout")); + + return channel -> client.admin().cluster().topQueries(topQueriesRequest, new NodesResponseRestListener<>(channel)); + } + + static TopQueriesRequest prepareRequest(final RestRequest request) { + String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId")); + String metricType = request.param("type", TopQueriesRequest.Metric.LATENCY.metricName()).toUpperCase(); + if (!ALLOWED_METRICS.contains(metricType)) { + throw new IllegalArgumentException( + String.format( + Locale.ROOT, + "request [%s] contains invalid metric type [%s]", + request.path(), + metricType + ) + ); + } + TopQueriesRequest topQueriesRequest = new TopQueriesRequest(nodesIds); + topQueriesRequest.setMetricType(metricType); + return topQueriesRequest; + } + + @Override + protected Set responseParams() { + return Settings.FORMAT_PARAMS; + } + + @Override + public boolean canTripCircuitBreaker() { + return false; + } +} diff --git a/server/src/main/resources/org/opensearch/action/search/mappings/top_n_queries_record.json b/server/src/main/resources/org/opensearch/action/search/mappings/top_n_queries_record.json new file mode 100644 index 0000000000000..3bdfa835e813e --- /dev/null +++ b/server/src/main/resources/org/opensearch/action/search/mappings/top_n_queries_record.json @@ -0,0 +1,40 @@ +{ + "_meta" : { + "schema_version": 1 + }, + "properties" : { + "timestamp" : { + "type" : "date" + }, + "search_type" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + }, + "total_shards" : { + "type" : "integer" + }, + "indices" : { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword", + "ignore_above" : 256 + } + } + }, + "phase_latency_map" : { + "type" : "object" + }, + "property_map" : { + "type" : "object" + }, + "value" : { + "type" : "long" + } + } +} diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueriesRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueriesRequestTests.java new file mode 100644 index 0000000000000..ae63031fab835 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueriesRequestTests.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + + +package org.opensearch.action.admin.cluster.insights.top_queries; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.test.OpenSearchTestCase; + + +/** + * Granular tests for the {@link TopQueriesRequest} class. + */ +public class TopQueriesRequestTests extends OpenSearchTestCase { + + /** + * Check that we can set the metric type + */ + public void testSetMetricType() throws Exception { + TopQueriesRequest request = new TopQueriesRequest(randomAlphaOfLength(5)); + request.setMetricType(randomFrom(TopQueriesRequest.Metric.allMetrics())); + TopQueriesRequest deserializedRequest = roundTripRequest(request); + assertEquals(request.getMetricType(), deserializedRequest.getMetricType()); + } + + /** + * Serialize and deserialize a request. + * @param request A request to serialize. + * @return The deserialized, "round-tripped" request. + */ + private static TopQueriesRequest roundTripRequest(TopQueriesRequest request) throws Exception { + try (BytesStreamOutput out = new BytesStreamOutput()) { + request.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + return new TopQueriesRequest(in); + } + } + } +} diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueriesTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueriesTests.java new file mode 100644 index 0000000000000..4c04b7f5270ac --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/cluster/insights/top_queries/TopQueriesTests.java @@ -0,0 +1,101 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + + +package org.opensearch.action.admin.cluster.insights.top_queries; + +import org.opensearch.action.search.SearchQueryLatencyRecord; +import org.opensearch.action.search.SearchType; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.VersionUtils; + +import java.io.IOException; +import java.util.*; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; + +/** + * Tests for {@link TopQueries}. + */ +public class TopQueriesTests extends OpenSearchTestCase { + + public void testTopQueries() throws IOException { + TopQueries topQueries = createTopQueries(); + try (BytesStreamOutput out = new BytesStreamOutput()) { + topQueries.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + TopQueries readTopQueries = new TopQueries(in); + assertExpected(topQueries, readTopQueries); + } + } + } + + /** + * checks all properties that are expected to be unchanged. + */ + private void assertExpected(TopQueries topQueries, TopQueries readTopQueries) throws IOException { + for (int i = 0; i < topQueries.getLatencyRecords().size(); i ++) { + compareJson(topQueries.getLatencyRecords().get(i), readTopQueries.getLatencyRecords().get(i)); + } + } + + private void compareJson(ToXContent param1, ToXContent param2) throws IOException { + if (param1 == null || param2 == null) { + assertNull(param1); + assertNull(param2); + return; + } + + ToXContent.Params params = ToXContent.EMPTY_PARAMS; + XContentBuilder param1Builder = jsonBuilder(); + param1.toXContent(param1Builder, params); + + XContentBuilder param2Builder = jsonBuilder(); + param2.toXContent(param2Builder, params); + + assertEquals(param1Builder.toString(), param2Builder.toString()); + } + + private static TopQueries createTopQueries() { + DiscoveryNode node = new DiscoveryNode( + "node_for_top_queries_test", + buildNewFakeTransportAddress(), + emptyMap(), + emptySet(), + VersionUtils.randomVersion(random()) + ); + + Map propertyMap = new HashMap<>(); + propertyMap.put("userId", "user1"); + + Map phaseLatencyMap = new HashMap<>(); + phaseLatencyMap.put("expand", 0L); + phaseLatencyMap.put("query", 20L); + phaseLatencyMap.put("fetch", 1L); + + List records = new ArrayList<>(); + records.add(new SearchQueryLatencyRecord( + randomLong(), + SearchType.QUERY_THEN_FETCH, + "{\"size\":20}", + randomInt(), + randomArray(1, 3, String[]::new, () -> randomAlphaOfLengthBetween(5, 10)), + propertyMap, + phaseLatencyMap + )); + + return new TopQueries(node, records); + } +} diff --git a/server/src/test/java/org/opensearch/action/search/SearchQueryLatencyListenerTests.java b/server/src/test/java/org/opensearch/action/search/SearchQueryLatencyListenerTests.java new file mode 100644 index 0000000000000..afa97c8eda1e7 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/search/SearchQueryLatencyListenerTests.java @@ -0,0 +1,168 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.opensearch.search.aggregations.support.ValueType; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyMap; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class SearchQueryLatencyListenerTests extends OpenSearchTestCase { + + public void testOnRequestEnd() { + final SearchRequestContext searchRequestContext = mock(SearchRequestContext.class); + final SearchPhaseContext searchPhaseContext = mock(SearchPhaseContext.class); + final SearchRequest searchRequest = mock(SearchRequest.class); + final QueryLatencyInsightService queryLatencyInsightService = mock(QueryLatencyInsightService.class); + + Long timestamp = System.currentTimeMillis() - 100L; + SearchType searchType = SearchType.QUERY_THEN_FETCH; + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.aggregation( + new TermsAggregationBuilder("agg1") + .userValueTypeHint(ValueType.STRING) + .field("type.keyword") + ); + searchSourceBuilder.size(0); + + String[] indices = new String[]{"index-1", "index-2"}; + + Map phaseLatencyMap = new HashMap<>(); + phaseLatencyMap.put("expand", 0L); + phaseLatencyMap.put("query", 20L); + phaseLatencyMap.put("fetch", 1L); + + int numberOfShards = 10; + + + SearchQueryLatencyListener searchQueryLatencyListener = new SearchQueryLatencyListener(queryLatencyInsightService); + final List searchListenersList = new ArrayList<>( + List.of(searchQueryLatencyListener) + ); + + when(searchRequestContext.getSearchRequestOperationsListener()).thenReturn( + new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger) + ); + when(searchRequest.getOrCreateAbsoluteStartMillis()).thenReturn(timestamp); + when(searchRequest.searchType()).thenReturn(searchType); + when(searchRequest.source()).thenReturn(searchSourceBuilder); + when(searchRequest.indices()).thenReturn(indices); + when(searchRequestContext.phaseTookMap()).thenReturn(phaseLatencyMap); + when(searchPhaseContext.getRequest()).thenReturn(searchRequest); + when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards); + + + searchRequestContext.getSearchRequestOperationsListener().onRequestEnd(searchPhaseContext, searchRequestContext); + + verify(queryLatencyInsightService, times(1)).ingestQueryData( + eq(timestamp), + eq(searchType), + eq(searchSourceBuilder.toString()), + eq(numberOfShards), + eq(indices), + anyMap(), + eq(phaseLatencyMap) + ); + } + + public void testConcurrentOnRequestEnd() throws InterruptedException { + final SearchPhaseContext searchPhaseContext = mock(SearchPhaseContext.class); + final SearchRequest searchRequest = mock(SearchRequest.class); + final QueryLatencyInsightService queryLatencyInsightService = mock(QueryLatencyInsightService.class); + + Long timestamp = System.currentTimeMillis() - 100L; + SearchType searchType = SearchType.QUERY_THEN_FETCH; + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.aggregation( + new TermsAggregationBuilder("agg1") + .userValueTypeHint(ValueType.STRING) + .field("type.keyword") + ); + searchSourceBuilder.size(0); + + String[] indices = new String[]{"index-1", "index-2"}; + + Map phaseLatencyMap = new HashMap<>(); + phaseLatencyMap.put("expand", 0L); + phaseLatencyMap.put("query", 20L); + phaseLatencyMap.put("fetch", 1L); + + int numberOfShards = 10; + + + SearchQueryLatencyListener searchQueryLatencyListener = new SearchQueryLatencyListener(queryLatencyInsightService); + final List searchListenersList = new ArrayList<>( + List.of(searchQueryLatencyListener) + ); + + when(searchRequest.getOrCreateAbsoluteStartMillis()).thenReturn(timestamp); + when(searchRequest.searchType()).thenReturn(searchType); + when(searchRequest.source()).thenReturn(searchSourceBuilder); + when(searchRequest.indices()).thenReturn(indices); + when(searchPhaseContext.getRequest()).thenReturn(searchRequest); + when(searchPhaseContext.getNumShards()).thenReturn(numberOfShards); + + + int numRequests = 50; + Thread[] threads = new Thread[numRequests]; + Phaser phaser = new Phaser(numRequests + 1); + CountDownLatch countDownLatch = new CountDownLatch(numRequests); + + ArrayList searchRequestContexts = new ArrayList<>(); + for (int i = 0; i < numRequests; i++) { + SearchRequestContext searchRequestContext = new SearchRequestContext( + new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger) + ); + phaseLatencyMap.forEach(searchRequestContext::updatePhaseTookMap); + searchRequestContexts.add(searchRequestContext); + } + + for (int i = 0; i < numRequests; i++) { + int finalI = i; + threads[i] = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + SearchRequestContext thisContext = searchRequestContexts.get(finalI); + thisContext + .getSearchRequestOperationsListener() + .onRequestEnd(searchPhaseContext, thisContext); + countDownLatch.countDown(); + }); + threads[i].start(); + } + phaser.arriveAndAwaitAdvance(); + countDownLatch.await(); + + verify(queryLatencyInsightService, times(numRequests)).ingestQueryData( + eq(timestamp), + eq(searchType), + eq(searchSourceBuilder.toString()), + eq(numberOfShards), + eq(indices), + anyMap(), + eq(phaseLatencyMap) + ); + } +} diff --git a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestTopQueriesActionTests.java b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestTopQueriesActionTests.java new file mode 100644 index 0000000000000..583b58f03fe0c --- /dev/null +++ b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestTopQueriesActionTests.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + + +package org.opensearch.rest.action.admin.cluster; + +import org.opensearch.action.admin.cluster.insights.top_queries.TopQueriesRequest; +import org.opensearch.rest.RestRequest; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.rest.FakeRestRequest; + +import java.util.HashMap; +import java.util.Map; + +import static org.opensearch.rest.action.admin.cluster.RestTopQueriesAction.ALLOWED_METRICS; + + +public class RestTopQueriesActionTests extends OpenSearchTestCase { + + public void testEmptyNodeIdsValidType() { + Map params = new HashMap<>(); + params.put("type", randomFrom(ALLOWED_METRICS)); + + RestRequest restRequest = buildRestRequest(params); + TopQueriesRequest actual = RestTopQueriesAction.prepareRequest(restRequest); + assertEquals(0, actual.nodesIds().length); + } + + public void testNodeIdsValid() { + Map params = new HashMap<>(); + params.put("type", randomFrom(ALLOWED_METRICS)); + String[] nodes = randomArray(1, 10, String[]::new, () -> randomAlphaOfLengthBetween(5, 10)); + params.put("nodeId", String.join(",", nodes)); + + RestRequest restRequest = buildRestRequest(params); + TopQueriesRequest actual = RestTopQueriesAction.prepareRequest(restRequest); + assertArrayEquals(nodes, actual.nodesIds()); + } + + public void testInValidType() { + Map params = new HashMap<>(); + params.put("type", randomAlphaOfLengthBetween(5, 10).toUpperCase()); + + RestRequest restRequest = buildRestRequest(params); + Exception exception = assertThrows(IllegalArgumentException.class, () -> { + RestTopQueriesAction.prepareRequest(restRequest); + }); + assertEquals(String.format("request [/_insights/top_queries] contains invalid metric type [%s]", params.get("type")), exception.getMessage()); + } + + private FakeRestRequest buildRestRequest(Map params) { + return new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.GET) + .withPath("/_insights/top_queries") + .withParams(params) + .build(); + } +} diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 9fe1f8294fc74..ed1618fe6b6b5 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -86,9 +86,11 @@ import org.opensearch.action.bulk.TransportShardBulkAction; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.resync.TransportResyncReplicationAction; +import org.opensearch.action.search.QueryLatencyInsightService; import org.opensearch.action.search.SearchAction; import org.opensearch.action.search.SearchExecutionStatsCollector; import org.opensearch.action.search.SearchPhaseController; +import org.opensearch.action.search.SearchQueryLatencyListener; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchRequestSlowLog; import org.opensearch.action.search.SearchResponse; @@ -2312,6 +2314,11 @@ public void onFailure(final Exception e) { ), null, new SearchRequestSlowLog(clusterService), + new SearchQueryLatencyListener(new QueryLatencyInsightService( + clusterService, + client, + threadPool + )), NoopMetricsRegistry.INSTANCE ) );