Skip to content

Commit

Permalink
Make sure listener is started when query metrics enabled (opensearch-…
Browse files Browse the repository at this point in the history
…project#74)

* Make sure listener is started when query metrics enabled

Signed-off-by: Siddhant Deshmukh <[email protected]>

* Remove logging

Signed-off-by: Siddhant Deshmukh <[email protected]>

* Update unit tests

Signed-off-by: Siddhant Deshmukh <[email protected]>

* Address review comments

Signed-off-by: Siddhant Deshmukh <[email protected]>

* Refactor enable/disable feature logic

Signed-off-by: Siddhant Deshmukh <[email protected]>

* Address review comments

Signed-off-by: Siddhant Deshmukh <[email protected]>

* Remove concurrency not required

Signed-off-by: Siddhant Deshmukh <[email protected]>

* Remove unnecessary call

Signed-off-by: Siddhant Deshmukh <[email protected]>

---------

Signed-off-by: Siddhant Deshmukh <[email protected]>
  • Loading branch information
deshsidd authored Aug 28, 2024
1 parent c6a46f5 commit 99ccbca
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public Collection<Object> createComponents(
client,
metricsRegistry
);
return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService));
return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService, false));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public QueryInsightsExporter updateExporter(QueryInsightsExporter exporter, Stri
* Close an exporter
*
* @param exporter the exporter to close
* @throws IOException exception
*/
public void closeExporter(QueryInsightsExporter exporter) throws IOException {
if (exporter != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.plugin.insights.core.listener;

import static org.opensearch.plugin.insights.settings.QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNEnabledSetting;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNSizeSetting;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNWindowSizeSetting;
Expand Down Expand Up @@ -56,10 +57,27 @@ public final class QueryInsightsListener extends SearchRequestOperationsListener
*/
@Inject
public QueryInsightsListener(final ClusterService clusterService, final QueryInsightsService queryInsightsService) {
this(clusterService, queryInsightsService, false);
}

/**
* Constructor for QueryInsightsListener
*
* @param clusterService The Node's cluster service.
* @param queryInsightsService The topQueriesByLatencyService associated with this listener
* @param initiallyEnabled Is the listener initially enabled/disabled
*/
public QueryInsightsListener(
final ClusterService clusterService,
final QueryInsightsService queryInsightsService,
boolean initiallyEnabled
) {
super(initiallyEnabled);
this.clusterService = clusterService;
this.queryInsightsService = queryInsightsService;
// Setting endpoints set up for top n queries, including enabling top n queries, window size and top n size
// Expected metricTypes are Latency, CPU and Memory.

// Setting endpoints set up for top n queries, including enabling top n queries, window size, and top n size
// Expected metricTypes are Latency, CPU, and Memory.
for (MetricType type : MetricType.allMetricTypes()) {
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(getTopNEnabledSetting(type), v -> this.setEnableTopQueries(type, v));
Expand All @@ -82,31 +100,48 @@ public QueryInsightsListener(final ClusterService clusterService, final QueryIns
this.queryInsightsService.validateWindowSize(type, clusterService.getClusterSettings().get(getTopNWindowSizeSetting(type)));
this.queryInsightsService.setWindowSize(type, clusterService.getClusterSettings().get(getTopNWindowSizeSetting(type)));
}

clusterService.getClusterSettings()
.addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, v -> setSearchQueryMetricsEnabled(v));
setSearchQueryMetricsEnabled(clusterService.getClusterSettings().get(SEARCH_QUERY_METRICS_ENABLED_SETTING));
}

/**
* Enable or disable top queries insights collection for {@link MetricType}
* Enable or disable top queries insights collection for {@link MetricType}.
* This function will enable or disable the corresponding listeners
* and query insights services.
*
* @param metricType {@link MetricType}
* @param isCurrentMetricEnabled boolean
*/
public void setEnableTopQueries(final MetricType metricType, final boolean isCurrentMetricEnabled) {
boolean isTopNFeaturePreviouslyDisabled = !queryInsightsService.isTopNFeatureEnabled();
this.queryInsightsService.enableCollection(metricType, isCurrentMetricEnabled);
boolean isTopNFeatureCurrentlyDisabled = !queryInsightsService.isTopNFeatureEnabled();
updateQueryInsightsState();
}

if (isTopNFeatureCurrentlyDisabled) {
super.setEnabled(false);
if (!isTopNFeaturePreviouslyDisabled) {
queryInsightsService.checkAndStopQueryInsights();
}
} else {
/**
* Set search query metrics enabled to enable collection of search query categorization metrics.
* @param searchQueryMetricsEnabled boolean flag
*/
public void setSearchQueryMetricsEnabled(boolean searchQueryMetricsEnabled) {
this.queryInsightsService.enableSearchQueryMetricsFeature(searchQueryMetricsEnabled);
updateQueryInsightsState();
}

/**
* Update the query insights service state based on the enabled features.
* If any feature is enabled, it starts the service. If no features are enabled, it stops the service.
*/
private void updateQueryInsightsState() {
boolean anyFeatureEnabled = queryInsightsService.isAnyFeatureEnabled();

if (anyFeatureEnabled && !super.isEnabled()) {
super.setEnabled(true);
if (isTopNFeaturePreviouslyDisabled) {
queryInsightsService.checkAndRestartQueryInsights();
}
queryInsightsService.stop(); // Ensures a clean restart
queryInsightsService.start();
} else if (!anyFeatureEnabled && super.isEnabled()) {
super.setEnabled(false);
queryInsightsService.stop();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.plugin.insights.core.service;

import static org.opensearch.plugin.insights.settings.QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getExporterSettings;

import java.io.IOException;
Expand Down Expand Up @@ -111,15 +110,15 @@ public QueryInsightsService(
);
}

this.searchQueryMetricsEnabled = clusterSettings.get(SEARCH_QUERY_METRICS_ENABLED_SETTING);
this.searchQueryCategorizer = SearchQueryCategorizer.getInstance(metricsRegistry);
clusterSettings.addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, this::setSearchQueryMetricsEnabled);
this.enableSearchQueryMetricsFeature(false);
}

/**
* Ingest the query data into in-memory stores
*
* @param record the record to ingest
* @return SearchQueryRecord
*/
public boolean addRecord(final SearchQueryRecord record) {
boolean shouldAdd = searchQueryMetricsEnabled;
Expand Down Expand Up @@ -228,22 +227,11 @@ public boolean isSearchQueryMetricsFeatureEnabled() {
}

/**
* Stops query insights service if no features enabled
* Enable/Disable search query metrics feature.
* @param enable enable/disable search query metrics feature
*/
public void checkAndStopQueryInsights() {
if (!isAnyFeatureEnabled()) {
this.stop();
}
}

/**
* Restarts query insights service if any feature enabled
*/
public void checkAndRestartQueryInsights() {
if (isAnyFeatureEnabled()) {
this.stop();
this.start();
}
public void enableSearchQueryMetricsFeature(boolean enable) {
searchQueryMetricsEnabled = enable;
}

/**
Expand Down Expand Up @@ -306,24 +294,6 @@ public void setExporter(final MetricType type, final Settings settings) {
}
}

/**
* Set search query metrics enabled to enable collection of search query categorization metrics
* @param searchQueryMetricsEnabled boolean flag
*/
public void setSearchQueryMetricsEnabled(boolean searchQueryMetricsEnabled) {
boolean oldSearchQueryMetricsEnabled = isSearchQueryMetricsFeatureEnabled();
this.searchQueryMetricsEnabled = searchQueryMetricsEnabled;
if (searchQueryMetricsEnabled) {
if (!oldSearchQueryMetricsEnabled) {
checkAndRestartQueryInsights();
}
} else {
if (oldSearchQueryMetricsEnabled) {
checkAndStopQueryInsights();
}
}
}

/**
* Get search query categorizer object
* @return SearchQueryCategorizer object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ public List<SearchQueryRecord> getTopQueriesCurrentSnapshot() {

/**
* Close the top n queries service
* @throws IOException exception
*/
public void close() throws IOException {
queryInsightsExporterFactory.closeExporter(this.exporter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public TopQueriesRequest(final MetricType metricType, final String... nodesIds)

/**
* Get the type of requested metrics
* @return MetricType for current top query service
*/
public MetricType getMetricType() {
return metricType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ static void writeTo(final StreamOutput out, final Attribute attribute) throws IO
*
* @param out the StreamOutput to write
* @param attributeValue the Attribute value to write
* @throws IOException exception
*/
@SuppressWarnings("unchecked")
public static void writeValueTo(StreamOutput out, Object attributeValue) throws IOException {
Expand Down
Loading

0 comments on commit 99ccbca

Please sign in to comment.