-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Top N Queries by latency implementation
Signed-off-by: Chenyang Ji <[email protected]>
- Loading branch information
Showing
27 changed files
with
1,979 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
218 changes: 218 additions & 0 deletions
218
...ternalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,218 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.plugin.insights; | ||
|
||
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; | ||
import org.opensearch.action.admin.cluster.node.info.NodeInfo; | ||
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; | ||
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; | ||
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules; | ||
import org.opensearch.action.index.IndexResponse; | ||
import org.opensearch.action.search.SearchResponse; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.index.query.QueryBuilders; | ||
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction; | ||
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest; | ||
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse; | ||
import org.opensearch.plugins.Plugin; | ||
import org.opensearch.plugins.PluginInfo; | ||
import org.opensearch.test.OpenSearchIntegTestCase; | ||
import org.junit.Assert; | ||
|
||
import java.util.Arrays; | ||
import java.util.Collection; | ||
import java.util.List; | ||
import java.util.function.Function; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.Stream; | ||
|
||
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED; | ||
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE; | ||
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE; | ||
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; | ||
|
||
/** | ||
* Transport Action tests for Query Insights Plugin | ||
*/ | ||
|
||
@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0, scope = OpenSearchIntegTestCase.Scope.TEST) | ||
public class QueryInsightsPluginTransportIT extends OpenSearchIntegTestCase { | ||
|
||
private final int TOTAL_NUMBER_OF_NODES = 2; | ||
private final int TOTAL_SEARCH_REQUESTS = 5; | ||
|
||
@Override | ||
protected Collection<Class<? extends Plugin>> nodePlugins() { | ||
return Arrays.asList(QueryInsightsPlugin.class); | ||
} | ||
|
||
/** | ||
* Test Query Insights Plugin is installed | ||
*/ | ||
public void testQueryInsightPluginInstalled() { | ||
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); | ||
nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName()); | ||
NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet(); | ||
List<PluginInfo> pluginInfos = nodesInfoResponse.getNodes() | ||
.stream() | ||
.flatMap( | ||
(Function<NodeInfo, Stream<PluginInfo>>) nodeInfo -> nodeInfo.getInfo(PluginsAndModules.class).getPluginInfos().stream() | ||
) | ||
.collect(Collectors.toList()); | ||
Assert.assertTrue( | ||
pluginInfos.stream().anyMatch(pluginInfo -> pluginInfo.getName().equals("org.opensearch.plugin.insights.QueryInsightsPlugin")) | ||
); | ||
} | ||
|
||
/** | ||
* Test get top queries when feature disabled | ||
*/ | ||
public void testGetTopQueriesWhenFeatureDisabled() { | ||
TopQueriesRequest request = new TopQueriesRequest(); | ||
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); | ||
Assert.assertNotEquals(0, response.failures().size()); | ||
Assert.assertEquals( | ||
"Cannot get query data when query insight feature is not enabled.", | ||
response.failures().get(0).getCause().getCause().getMessage() | ||
); | ||
} | ||
|
||
/** | ||
* Test get top queries when feature enabled | ||
*/ | ||
public void testGetTopQueriesWhenFeatureEnabled() { | ||
Settings commonSettings = Settings.builder() | ||
.put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true") | ||
.put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100") | ||
.put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "600s") | ||
.build(); | ||
|
||
logger.info("--> starting 2 nodes for query insight testing"); | ||
List<String> nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build()); | ||
|
||
logger.info("--> waiting for nodes to form a cluster"); | ||
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet(); | ||
assertFalse(health.isTimedOut()); | ||
|
||
assertAcked( | ||
prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 2)) | ||
); | ||
ensureStableCluster(2); | ||
logger.info("--> creating indices for query insight testing"); | ||
for (int i = 0; i < 5; i++) { | ||
IndexResponse response = client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get(); | ||
assertEquals("CREATED", response.status().toString()); | ||
} | ||
// making search requests to get top queries | ||
for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) { | ||
SearchResponse searchResponse = internalCluster().client(randomFrom(nodes)) | ||
.prepareSearch() | ||
.setQuery(QueryBuilders.matchAllQuery()) | ||
.get(); | ||
assertEquals(searchResponse.getFailedShards(), 0); | ||
} | ||
|
||
TopQueriesRequest request = new TopQueriesRequest(); | ||
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); | ||
Assert.assertEquals(0, response.failures().size()); | ||
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size()); | ||
Assert.assertEquals(TOTAL_SEARCH_REQUESTS, response.getNodes().stream().mapToInt(o -> o.getLatencyRecords().size()).sum()); | ||
|
||
internalCluster().stopAllNodes(); | ||
} | ||
|
||
/** | ||
* Test get top queries with small top n size | ||
*/ | ||
public void testGetTopQueriesWithSmallTopN() { | ||
Settings commonSettings = Settings.builder() | ||
.put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true") | ||
.put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "1") | ||
.put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "600s") | ||
.build(); | ||
|
||
logger.info("--> starting 2 nodes for query insight testing"); | ||
List<String> nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build()); | ||
|
||
logger.info("--> waiting for nodes to form a cluster"); | ||
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet(); | ||
assertFalse(health.isTimedOut()); | ||
|
||
assertAcked( | ||
prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 2)) | ||
); | ||
ensureStableCluster(2); | ||
logger.info("--> creating indices for query insight testing"); | ||
for (int i = 0; i < 5; i++) { | ||
IndexResponse response = client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get(); | ||
assertEquals("CREATED", response.status().toString()); | ||
} | ||
// making search requests to get top queries | ||
for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) { | ||
SearchResponse searchResponse = internalCluster().client(randomFrom(nodes)) | ||
.prepareSearch() | ||
.setQuery(QueryBuilders.matchAllQuery()) | ||
.get(); | ||
assertEquals(searchResponse.getFailedShards(), 0); | ||
} | ||
|
||
TopQueriesRequest request = new TopQueriesRequest(); | ||
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); | ||
Assert.assertEquals(0, response.failures().size()); | ||
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size()); | ||
// TODO: this should be 1 after changing to cluster level top N. | ||
Assert.assertEquals(2, response.getNodes().stream().mapToInt(o -> o.getLatencyRecords().size()).sum()); | ||
|
||
internalCluster().stopAllNodes(); | ||
} | ||
|
||
/** | ||
* Test get top queries with small window size | ||
*/ | ||
public void testGetTopQueriesWithSmallWindowSize() { | ||
Settings commonSettings = Settings.builder() | ||
.put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true") | ||
.put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100") | ||
.put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "0ms") | ||
.build(); | ||
|
||
logger.info("--> starting 2 nodes for query insight testing"); | ||
List<String> nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build()); | ||
|
||
logger.info("--> waiting for nodes to form a cluster"); | ||
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet(); | ||
assertFalse(health.isTimedOut()); | ||
|
||
assertAcked( | ||
prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 2)) | ||
); | ||
ensureStableCluster(2); | ||
logger.info("--> creating indices for query insight testing"); | ||
for (int i = 0; i < 5; i++) { | ||
IndexResponse response = client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get(); | ||
assertEquals("CREATED", response.status().toString()); | ||
} | ||
// making search requests to get top queries | ||
for (int i = 0; i < TOTAL_SEARCH_REQUESTS; i++) { | ||
SearchResponse searchResponse = internalCluster().client(randomFrom(nodes)) | ||
.prepareSearch() | ||
.setQuery(QueryBuilders.matchAllQuery()) | ||
.get(); | ||
assertEquals(searchResponse.getFailedShards(), 0); | ||
} | ||
|
||
TopQueriesRequest request = new TopQueriesRequest(); | ||
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); | ||
Assert.assertEquals(0, response.failures().size()); | ||
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size()); | ||
Assert.assertEquals(0, response.getNodes().stream().mapToInt(o -> o.getLatencyRecords().size()).sum()); | ||
|
||
internalCluster().stopAllNodes(); | ||
} | ||
} |
107 changes: 107 additions & 0 deletions
107
...query-insights/src/javaRestTest/java/org/opensearch/plugin/insights/TopQueriesRestIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.plugin.insights; | ||
|
||
import org.opensearch.client.Request; | ||
import org.opensearch.client.Response; | ||
import org.opensearch.common.xcontent.LoggingDeprecationHandler; | ||
import org.opensearch.common.xcontent.json.JsonXContent; | ||
import org.opensearch.core.xcontent.NamedXContentRegistry; | ||
import org.opensearch.test.rest.OpenSearchRestTestCase; | ||
import org.junit.Assert; | ||
|
||
import java.io.IOException; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
/** | ||
* Rest Action tests for Query Insights | ||
*/ | ||
public class TopQueriesRestIT extends OpenSearchRestTestCase { | ||
|
||
/** | ||
* test Query Insights is installed | ||
* @throws IOException IOException | ||
*/ | ||
@SuppressWarnings("unchecked") | ||
public void testQueryInsightsPluginInstalled() throws IOException { | ||
Request request = new Request("GET", "/_cat/plugins?s=component&h=name,component,version,description&format=json"); | ||
Response response = client().performRequest(request); | ||
List<Object> pluginsList = JsonXContent.jsonXContent.createParser( | ||
NamedXContentRegistry.EMPTY, | ||
LoggingDeprecationHandler.INSTANCE, | ||
response.getEntity().getContent() | ||
).list(); | ||
Assert.assertTrue( | ||
pluginsList.stream().map(o -> (Map<String, Object>) o).anyMatch(plugin -> plugin.get("component").equals("query-insights")) | ||
); | ||
} | ||
|
||
/** | ||
* test enabling top queries | ||
* @throws IOException IOException | ||
*/ | ||
public void testTopQueriesResponses() throws IOException { | ||
// Enable Top N Queries feature | ||
Request request = new Request("PUT", "/_cluster/settings"); | ||
request.setJsonEntity(defaultTopQueriesSettings()); | ||
Response response = client().performRequest(request); | ||
|
||
Assert.assertEquals(200, response.getStatusLine().getStatusCode()); | ||
|
||
// Create documents for search | ||
request = new Request("POST", "/my-index-0/_doc"); | ||
request.setJsonEntity(createDocumentsBody()); | ||
response = client().performRequest(request); | ||
|
||
Assert.assertEquals(201, response.getStatusLine().getStatusCode()); | ||
|
||
// Do Search | ||
request = new Request("GET", "/my-index-0/_search?size=20&pretty"); | ||
request.setJsonEntity(searchBody()); | ||
response = client().performRequest(request); | ||
Assert.assertEquals(200, response.getStatusLine().getStatusCode()); | ||
response = client().performRequest(request); | ||
Assert.assertEquals(200, response.getStatusLine().getStatusCode()); | ||
|
||
// Get Top Queries | ||
request = new Request("GET", "/_insights/top_queries?pretty"); | ||
response = client().performRequest(request); | ||
|
||
Assert.assertEquals(200, response.getStatusLine().getStatusCode()); | ||
String top_requests = new String(response.getEntity().getContent().readAllBytes(), StandardCharsets.UTF_8); | ||
Assert.assertTrue(top_requests.contains("top_queries")); | ||
Assert.assertEquals(2, top_requests.split("searchType", -1).length - 1); | ||
} | ||
|
||
private String defaultTopQueriesSettings() { | ||
return "{\n" | ||
+ " \"persistent\" : {\n" | ||
+ " \"search.top_n_queries.latency.enabled\" : \"true\",\n" | ||
+ " \"search.top_n_queries.latency.window_size\" : \"600s\",\n" | ||
+ " \"search.top_n_queries.latency.top_n_size\" : 5\n" | ||
+ " }\n" | ||
+ "}"; | ||
} | ||
|
||
private String createDocumentsBody() { | ||
return "{\n" | ||
+ " \"@timestamp\": \"2099-11-15T13:12:00\",\n" | ||
+ " \"message\": \"this is document 1\",\n" | ||
+ " \"user\": {\n" | ||
+ " \"id\": \"cyji\"\n" | ||
+ " }\n" | ||
+ "}"; | ||
} | ||
|
||
private String searchBody() { | ||
return "{}"; | ||
} | ||
} |
Oops, something went wrong.