Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Top N Queries by latency implementation #12203

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Updates IpField to be searchable when only `doc_values` are enabled ([#11508](https://github.com/opensearch-project/OpenSearch/pull/11508))
- Added Support for dynamically adding SearchRequestOperationsListeners with SearchRequestOperationsCompositeListenerFactory ([#11526](https://github.com/opensearch-project/OpenSearch/pull/11526))
- [Query Insights] Query Insights Framework which currently supports retrieving the most time-consuming queries within the last configured time window ([#11903](https://github.com/opensearch-project/OpenSearch/pull/11903))
- [Query Insights] Implement Top N Queries feature to collect and gather information about high latency queries in a window ([#11904](https://github.com/opensearch-project/OpenSearch/pull/11904))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights;

import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.PluginInfo;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Assert;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

/**
* Transport Action tests for Query Insights Plugin
*/

@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0, scope = OpenSearchIntegTestCase.Scope.TEST)
public class QueryInsightsPluginTransportIT extends OpenSearchIntegTestCase {

private final int TOTAL_NUMBER_OF_NODES = 2;
private final int TOTAL_SEARCH_REQUESTS = 5;

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(QueryInsightsPlugin.class);
}

/**
* Test Query Insights Plugin is installed
*/
public void testQueryInsightPluginInstalled() {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName());
NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
List<PluginInfo> pluginInfos = nodesInfoResponse.getNodes()
.stream()
.flatMap(
(Function<NodeInfo, Stream<PluginInfo>>) nodeInfo -> nodeInfo.getInfo(PluginsAndModules.class).getPluginInfos().stream()
)
.collect(Collectors.toList());
Assert.assertTrue(
pluginInfos.stream().anyMatch(pluginInfo -> pluginInfo.getName().equals("org.opensearch.plugin.insights.QueryInsightsPlugin"))
);
}

/**
* Test get top queries when feature disabled
*/
public void testGetTopQueriesWhenFeatureDisabled() {
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertNotEquals(0, response.failures().size());
Assert.assertEquals(
"Cannot get top n queries for [latency] when it is not enabled.",
response.failures().get(0).getCause().getCause().getMessage()
);
}

/**
* Test update top query record when feature enabled
*/
public void testUpdateRecordWhenFeatureDisabledThenEnabled() throws ExecutionException, InterruptedException {
Settings commonSettings = Settings.builder().put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "false").build();

logger.info("--> starting nodes for query insight testing");
List<String> nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build());

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet();
assertFalse(health.isTimedOut());

assertAcked(
prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 2))
);
ensureStableCluster(2);
logger.info("--> creating indices for query insight testing");
for (int i = 0; i < 5; i++) {
IndexResponse response = client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get();
assertEquals("CREATED", response.status().toString());
}
// making search requests to get top queries
for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) {
SearchResponse searchResponse = internalCluster().client(randomFrom(nodes))
.prepareSearch()
.setQuery(QueryBuilders.matchAllQuery())
.get();
assertEquals(searchResponse.getFailedShards(), 0);
}

TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertNotEquals(0, response.failures().size());
Assert.assertEquals(
"Cannot get top n queries for [latency] when it is not enabled.",
response.failures().get(0).getCause().getCause().getMessage()
);

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().persistentSettings(
Settings.builder().put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true").build()
);
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get());
TopQueriesRequest request2 = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response2 = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request2).actionGet();
Assert.assertEquals(0, response2.failures().size());
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response2.getNodes().size());
for (int i = 0; i < TOTAL_NUMBER_OF_NODES; i++) {
Assert.assertEquals(0, response2.getNodes().get(i).getTopQueriesRecord().size());
}

internalCluster().stopAllNodes();
}

/**
* Test get top queries when feature enabled
*/
public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException {
Settings commonSettings = Settings.builder()
.put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true")
.put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100")
.put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "600s")
.build();

logger.info("--> starting nodes for query insight testing");
List<String> nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build());

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet();
assertFalse(health.isTimedOut());

assertAcked(
prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 2))
);
ensureStableCluster(2);
logger.info("--> creating indices for query insight testing");
for (int i = 0; i < 5; i++) {
IndexResponse response = client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get();
assertEquals("CREATED", response.status().toString());
}
// making search requests to get top queries
for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) {
SearchResponse searchResponse = internalCluster().client(randomFrom(nodes))
.prepareSearch()
.setQuery(QueryBuilders.matchAllQuery())
.get();
assertEquals(searchResponse.getFailedShards(), 0);
}
// Sleep to wait for queue drained to top queries store
Thread.sleep(6000);
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertEquals(0, response.failures().size());
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size());
Assert.assertEquals(TOTAL_SEARCH_REQUESTS, response.getNodes().stream().mapToInt(o -> o.getTopQueriesRecord().size()).sum());

internalCluster().stopAllNodes();
}

/**
* Test get top queries with small top n size
*/
public void testGetTopQueriesWithSmallTopN() throws InterruptedException {
Settings commonSettings = Settings.builder()
.put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true")
.put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "1")
.put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "600s")
.build();

logger.info("--> starting nodes for query insight testing");
List<String> nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build());

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet();
assertFalse(health.isTimedOut());

assertAcked(
prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 2))
);
ensureStableCluster(2);
logger.info("--> creating indices for query insight testing");
for (int i = 0; i < 5; i++) {
IndexResponse response = client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get();
assertEquals("CREATED", response.status().toString());
}
// making search requests to get top queries
for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) {
SearchResponse searchResponse = internalCluster().client(randomFrom(nodes))
.prepareSearch()
.setQuery(QueryBuilders.matchAllQuery())
.get();
assertEquals(searchResponse.getFailedShards(), 0);
}
Thread.sleep(6000);
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertEquals(0, response.failures().size());
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size());
Assert.assertEquals(2, response.getNodes().stream().mapToInt(o -> o.getTopQueriesRecord().size()).sum());

internalCluster().stopAllNodes();
}

/**
* Test get top queries with small window size
*/
public void testGetTopQueriesWithSmallWindowSize() throws InterruptedException {
Settings commonSettings = Settings.builder()
.put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true")
.put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100")
.put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "1m")
.build();

logger.info("--> starting nodes for query insight testing");
List<String> nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build());

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet();
assertFalse(health.isTimedOut());

assertAcked(
prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 2))
);
ensureStableCluster(2);
logger.info("--> creating indices for query insight testing");
for (int i = 0; i < 5; i++) {
IndexResponse response = client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get();
assertEquals("CREATED", response.status().toString());
}
// making search requests to get top queries
for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) {
SearchResponse searchResponse = internalCluster().client(randomFrom(nodes))
.prepareSearch()
.setQuery(QueryBuilders.matchAllQuery())
.get();
assertEquals(searchResponse.getFailedShards(), 0);
}

TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertEquals(0, response.failures().size());
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size());
Thread.sleep(6000);
internalCluster().stopAllNodes();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights;

import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.test.rest.OpenSearchRestTestCase;
import org.junit.Assert;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/**
* Rest Action tests for Query Insights
*/
public class TopQueriesRestIT extends OpenSearchRestTestCase {

/**
* test Query Insights is installed
* @throws IOException IOException
*/
@SuppressWarnings("unchecked")
public void testQueryInsightsPluginInstalled() throws IOException {
Request request = new Request("GET", "/_cat/plugins?s=component&h=name,component,version,description&format=json");
Response response = client().performRequest(request);
List<Object> pluginsList = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
response.getEntity().getContent()
).list();
Assert.assertTrue(
pluginsList.stream().map(o -> (Map<String, Object>) o).anyMatch(plugin -> plugin.get("component").equals("query-insights"))
);
}

/**
* test enabling top queries
* @throws IOException IOException
*/
public void testTopQueriesResponses() throws IOException {
// Enable Top N Queries feature
Request request = new Request("PUT", "/_cluster/settings");
request.setJsonEntity(defaultTopQueriesSettings());
Response response = client().performRequest(request);

Assert.assertEquals(200, response.getStatusLine().getStatusCode());

// Create documents for search
request = new Request("POST", "/my-index-0/_doc");
request.setJsonEntity(createDocumentsBody());
response = client().performRequest(request);

Assert.assertEquals(201, response.getStatusLine().getStatusCode());

// Do Search
request = new Request("GET", "/my-index-0/_search?size=20&pretty");
request.setJsonEntity(searchBody());
response = client().performRequest(request);
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
response = client().performRequest(request);
Assert.assertEquals(200, response.getStatusLine().getStatusCode());

// Get Top Queries
request = new Request("GET", "/_insights/top_queries?pretty");
response = client().performRequest(request);

Assert.assertEquals(200, response.getStatusLine().getStatusCode());
String top_requests = new String(response.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8);
Assert.assertTrue(top_requests.contains("top_queries"));
Assert.assertEquals(2, top_requests.split("searchType", -1).length - 1);
}

private String defaultTopQueriesSettings() {
return "{\n"
+ " \"persistent\" : {\n"
+ " \"search.top_n_queries.latency.enabled\" : \"true\",\n"
+ " \"search.top_n_queries.latency.window_size\" : \"600s\",\n"
+ " \"search.top_n_queries.latency.top_n_size\" : 5\n"
+ " }\n"
+ "}";
}

private String createDocumentsBody() {
return "{\n"
+ " \"@timestamp\": \"2099-11-15T13:12:00\",\n"
+ " \"message\": \"this is document 1\",\n"
+ " \"user\": {\n"
+ " \"id\": \"cyji\"\n"
+ " }\n"
+ "}";
}

private String searchBody() {
return "{}";
}
}
Loading
Loading