Skip to content

Commit

Permalink
Refactor record and service to make them generic
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <[email protected]>
  • Loading branch information
ansjcy committed Feb 1, 2024
1 parent 8e83e01 commit 021b1cd
Show file tree
Hide file tree
Showing 18 changed files with 306 additions and 933 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Capture information for additional query types and aggregation types ([#11582](https://github.com/opensearch-project/OpenSearch/pull/11582))
- Use slice_size == shard_size heuristic in terms aggs for concurrent segment search and properly calculate the doc_count_error ([#11732](https://github.com/opensearch-project/OpenSearch/pull/11732))
- Added Support for dynamically adding SearchRequestOperationsListeners with SearchRequestOperationsCompositeListenerFactory ([#11526](https://github.com/opensearch-project/OpenSearch/pull/11526))
- [Query Insights] Implement Top N Queries Feature in Query Insights Framework([#11904](https://github.com/opensearch-project/OpenSearch/pull/11904))
- [Query Insights] Implement Top N Queries feature to collect and gather information about high latency queries in a window ([#11904](https://github.com/opensearch-project/OpenSearch/pull/11904))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesAction;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesRequest;
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.PluginInfo;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -74,11 +75,11 @@ public void testQueryInsightPluginInstalled() {
* Test get top queries when feature disabled
*/
public void testGetTopQueriesWhenFeatureDisabled() {
TopQueriesRequest request = new TopQueriesRequest();
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertNotEquals(0, response.failures().size());
Assert.assertEquals(
"Cannot get query data when query insight feature is not enabled.",
"Cannot get query data when query insight feature is not enabled for MetricType [latency].",
response.failures().get(0).getCause().getCause().getMessage()
);
}
Expand All @@ -93,7 +94,7 @@ public void testGetTopQueriesWhenFeatureEnabled() {
.put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "600s")
.build();

logger.info("--> starting 2 nodes for query insight testing");
logger.info("--> starting nodes for query insight testing");
List<String> nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build());

logger.info("--> waiting for nodes to form a cluster");
Expand All @@ -118,11 +119,11 @@ public void testGetTopQueriesWhenFeatureEnabled() {
assertEquals(searchResponse.getFailedShards(), 0);
}

TopQueriesRequest request = new TopQueriesRequest();
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertEquals(0, response.failures().size());
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size());
Assert.assertEquals(TOTAL_SEARCH_REQUESTS, response.getNodes().stream().mapToInt(o -> o.getLatencyRecords().size()).sum());
Assert.assertEquals(TOTAL_SEARCH_REQUESTS, response.getNodes().stream().mapToInt(o -> o.getTopQueriesRecord().size()).sum());

internalCluster().stopAllNodes();
}
Expand All @@ -137,7 +138,7 @@ public void testGetTopQueriesWithSmallTopN() {
.put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "600s")
.build();

logger.info("--> starting 2 nodes for query insight testing");
logger.info("--> starting nodes for query insight testing");
List<String> nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build());

logger.info("--> waiting for nodes to form a cluster");
Expand All @@ -162,12 +163,11 @@ public void testGetTopQueriesWithSmallTopN() {
assertEquals(searchResponse.getFailedShards(), 0);
}

TopQueriesRequest request = new TopQueriesRequest();
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertEquals(0, response.failures().size());
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size());
// TODO: this should be 1 after changing to cluster level top N.
Assert.assertEquals(2, response.getNodes().stream().mapToInt(o -> o.getLatencyRecords().size()).sum());
Assert.assertEquals(2, response.getNodes().stream().mapToInt(o -> o.getTopQueriesRecord().size()).sum());

internalCluster().stopAllNodes();
}
Expand All @@ -179,10 +179,10 @@ public void testGetTopQueriesWithSmallWindowSize() {
Settings commonSettings = Settings.builder()
.put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true")
.put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100")
.put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "0ms")
.put(TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey(), "1m")
.build();

logger.info("--> starting 2 nodes for query insight testing");
logger.info("--> starting nodes for query insight testing");
List<String> nodes = internalCluster().startNodes(TOTAL_NUMBER_OF_NODES, Settings.builder().put(commonSettings).build());

logger.info("--> waiting for nodes to form a cluster");
Expand All @@ -207,11 +207,10 @@ public void testGetTopQueriesWithSmallWindowSize() {
assertEquals(searchResponse.getFailedShards(), 0);
}

TopQueriesRequest request = new TopQueriesRequest();
TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY);
TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet();
Assert.assertEquals(0, response.failures().size());
Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size());
Assert.assertEquals(0, response.getNodes().stream().mapToInt(o -> o.getLatencyRecords().size()).sum());

internalCluster().stopAllNodes();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.insights.core.listener;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.search.SearchPhaseContext;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchRequestContext;
import org.opensearch.action.search.SearchRequestOperationsListener;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.Measurement;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;

import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE;

/**
 * Search request listener that builds a {@link SearchQueryRecord} at the end of every
 * search request and hands it to the {@link QueryInsightsService}.
 * <p>
 * Collection is toggled per {@link MetricType}; the listener itself is switched off only
 * when collection is disabled for every metric type.
 *
 * @opensearch.internal
 */
public final class QueryInsightsListener extends SearchRequestOperationsListener {
    private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));

    private static final Logger log = LogManager.getLogger(QueryInsightsListener.class);

    private final QueryInsightsService queryInsightsService;

    /**
     * Constructor for QueryInsightsListener.
     * <p>
     * Registers settings update consumers for the top N latency settings (enabled flag,
     * top N size, window size) and then applies the current value of each setting.
     *
     * @param clusterService The Node's cluster service.
     * @param queryInsightsService The query insights service that receives the records produced by this listener
     */
    @Inject
    public QueryInsightsListener(ClusterService clusterService, QueryInsightsService queryInsightsService) {
        this.queryInsightsService = queryInsightsService;
        clusterService.getClusterSettings()
            .addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, v -> this.setEnabled(MetricType.LATENCY, v));
        clusterService.getClusterSettings()
            .addSettingsUpdateConsumer(
                TOP_N_LATENCY_QUERIES_SIZE,
                this.queryInsightsService::setTopNSize,
                this.queryInsightsService::validateTopNSize
            );
        clusterService.getClusterSettings()
            .addSettingsUpdateConsumer(
                TOP_N_LATENCY_QUERIES_WINDOW_SIZE,
                this.queryInsightsService::setWindowSize,
                this.queryInsightsService::validateWindowSize
            );
        // Apply the current setting values so the listener and service start in a consistent state.
        this.setEnabled(MetricType.LATENCY, clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_ENABLED));
        this.queryInsightsService.setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE));
        this.queryInsightsService.setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE));
    }

    /**
     * Enable or disable metric collection for {@link MetricType}
     * <p>
     * Enabling any metric type enables the listener. Disabling a metric type disables the
     * listener only if no other metric type is still being collected.
     *
     * @param metricType {@link MetricType}
     * @param enabled boolean
     */
    public void setEnabled(MetricType metricType, boolean enabled) {
        this.queryInsightsService.enableCollection(metricType, enabled);

        if (enabled) {
            super.setEnabled(true);
            return;
        }

        // disable QueryInsightsListener only if collection for all metrics are disabled.
        for (MetricType t : MetricType.allMetricTypes()) {
            if (this.queryInsightsService.isCollectionEnabled(t)) {
                return;
            }
        }
        super.setEnabled(false);
    }

    /**
     * @return whether this listener is currently enabled, as reported by the superclass
     */
    @Override
    public boolean isEnabled() {
        return super.isEnabled();
    }

    @Override
    public void onPhaseStart(SearchPhaseContext context) {}

    @Override
    public void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

    @Override
    public void onPhaseFailure(SearchPhaseContext context) {}

    @Override
    public void onRequestStart(SearchRequestContext searchRequestContext) {}

    /**
     * Builds a {@link SearchQueryRecord} for the finished request and passes it to the
     * {@link QueryInsightsService}. Any exception raised while building or adding the
     * record is logged and not propagated to the caller.
     *
     * @param context the search phase context of the finished request
     * @param searchRequestContext the request-level context carrying start time and phase timings
     */
    @Override
    public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
        SearchRequest request = context.getRequest();
        try {
            Map<MetricType, Measurement<? extends Number>> measurements = collectMeasurements(searchRequestContext);
            Map<Attribute, Object> attributes = collectAttributes(request, context, searchRequestContext);
            SearchQueryRecord record = new SearchQueryRecord(request.getOrCreateAbsoluteStartMillis(), measurements, attributes);
            queryInsightsService.addRecord(record);
        } catch (Exception e) {
            // Pass the exception as the throwable argument so the stack trace is not lost.
            log.error(String.format(Locale.ROOT, "fail to ingest query insight data, error: %s", e), e);
        }
    }

    /**
     * Collects the measurements for every metric type whose collection is currently enabled.
     */
    private Map<MetricType, Measurement<? extends Number>> collectMeasurements(SearchRequestContext searchRequestContext) {
        Map<MetricType, Measurement<? extends Number>> measurements = new HashMap<>();
        if (queryInsightsService.isCollectionEnabled(MetricType.LATENCY)) {
            // Latency is the elapsed time since the request's absolute start, in milliseconds.
            long elapsedNanos = System.nanoTime() - searchRequestContext.getAbsoluteStartNanos();
            measurements.put(
                MetricType.LATENCY,
                new Measurement<>(MetricType.LATENCY.name(), TimeUnit.NANOSECONDS.toMillis(elapsedNanos))
            );
        }
        return measurements;
    }

    /**
     * Collects the descriptive attributes of the finished request.
     */
    private Map<Attribute, Object> collectAttributes(
        SearchRequest request,
        SearchPhaseContext context,
        SearchRequestContext searchRequestContext
    ) {
        Map<Attribute, Object> attributes = new HashMap<>();
        attributes.put(Attribute.SEARCH_TYPE, request.searchType().toString().toLowerCase(Locale.ROOT));
        // A search request may carry no source builder; skip the attribute instead of
        // failing with a NullPointerException and dropping the whole record.
        if (request.source() != null) {
            attributes.put(Attribute.SOURCE, request.source().toString(FORMAT_PARAMS));
        }
        attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards());
        attributes.put(Attribute.INDICES, request.indices());
        attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap());
        return attributes;
    }
}

This file was deleted.

Loading

0 comments on commit 021b1cd

Please sign in to comment.