-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Commit
* Query insights plugin implementation Signed-off-by: Chenyang Ji <[email protected]> * Increase JavaDoc coverage and update PR based comments Signed-off-by: Chenyang Ji <[email protected]> * Refactor record and service to make them generic Signed-off-by: Chenyang Ji <[email protected]> * refactor service for improving multithreading efficiency Signed-off-by: Chenyang Ji <[email protected]> --------- Signed-off-by: Chenyang Ji <[email protected]> (cherry picked from commit 3cbf54e)
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
* | ||
* Modifications Copyright OpenSearch Contributors. See | ||
* GitHub history for details. | ||
*/ | ||
|
||
opensearchplugin { | ||
description 'OpenSearch Query Insights Plugin.' | ||
classname 'org.opensearch.plugin.insights.QueryInsightsPlugin' | ||
} | ||
|
||
dependencies { | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.plugin.insights; | ||
|
||
import org.opensearch.action.ActionRequest; | ||
import org.opensearch.client.Client; | ||
import org.opensearch.cluster.metadata.IndexNameExpressionResolver; | ||
import org.opensearch.cluster.node.DiscoveryNodes; | ||
import org.opensearch.cluster.service.ClusterService; | ||
import org.opensearch.common.settings.ClusterSettings; | ||
import org.opensearch.common.settings.IndexScopedSettings; | ||
import org.opensearch.common.settings.Setting; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.common.settings.SettingsFilter; | ||
import org.opensearch.common.unit.TimeValue; | ||
import org.opensearch.common.util.concurrent.OpenSearchExecutors; | ||
import org.opensearch.core.action.ActionResponse; | ||
import org.opensearch.core.common.io.stream.NamedWriteableRegistry; | ||
import org.opensearch.core.xcontent.NamedXContentRegistry; | ||
import org.opensearch.env.Environment; | ||
import org.opensearch.env.NodeEnvironment; | ||
import org.opensearch.plugin.insights.core.service.QueryInsightsService; | ||
import org.opensearch.plugin.insights.settings.QueryInsightsSettings; | ||
import org.opensearch.plugins.ActionPlugin; | ||
import org.opensearch.plugins.Plugin; | ||
import org.opensearch.repositories.RepositoriesService; | ||
import org.opensearch.rest.RestController; | ||
import org.opensearch.rest.RestHandler; | ||
import org.opensearch.script.ScriptService; | ||
import org.opensearch.threadpool.ExecutorBuilder; | ||
import org.opensearch.threadpool.ScalingExecutorBuilder; | ||
import org.opensearch.threadpool.ThreadPool; | ||
import org.opensearch.watcher.ResourceWatcherService; | ||
|
||
import java.util.Collection; | ||
import java.util.List; | ||
import java.util.function.Supplier; | ||
|
||
/** | ||
* Plugin class for Query Insights. | ||
*/ | ||
public class QueryInsightsPlugin extends Plugin implements ActionPlugin { | ||
/** | ||
* Default constructor | ||
*/ | ||
public QueryInsightsPlugin() {} | ||
|
||
@Override | ||
public Collection<Object> createComponents( | ||
final Client client, | ||
final ClusterService clusterService, | ||
final ThreadPool threadPool, | ||
final ResourceWatcherService resourceWatcherService, | ||
final ScriptService scriptService, | ||
final NamedXContentRegistry xContentRegistry, | ||
final Environment environment, | ||
final NodeEnvironment nodeEnvironment, | ||
final NamedWriteableRegistry namedWriteableRegistry, | ||
final IndexNameExpressionResolver indexNameExpressionResolver, | ||
final Supplier<RepositoriesService> repositoriesServiceSupplier | ||
) { | ||
// create top n queries service | ||
final QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool); | ||
return List.of(queryInsightsService); | ||
} | ||
|
||
@Override | ||
public List<ExecutorBuilder<?>> getExecutorBuilders(final Settings settings) { | ||
return List.of( | ||
Check warning on line 75 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java Codecov / codecov/patchplugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java#L75
|
||
new ScalingExecutorBuilder( | ||
QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR, | ||
1, | ||
Math.min((OpenSearchExecutors.allocatedProcessors(settings) + 1) / 2, QueryInsightsSettings.MAX_THREAD_COUNT), | ||
TimeValue.timeValueMinutes(5) | ||
Check warning on line 80 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java Codecov / codecov/patchplugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java#L79-L80
|
||
) | ||
); | ||
} | ||
|
||
@Override | ||
public List<RestHandler> getRestHandlers( | ||
final Settings settings, | ||
final RestController restController, | ||
final ClusterSettings clusterSettings, | ||
final IndexScopedSettings indexScopedSettings, | ||
final SettingsFilter settingsFilter, | ||
final IndexNameExpressionResolver indexNameExpressionResolver, | ||
final Supplier<DiscoveryNodes> nodesInCluster | ||
) { | ||
return List.of(); | ||
} | ||
|
||
@Override | ||
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() { | ||
return List.of(); | ||
} | ||
|
||
@Override | ||
public List<Setting<?>> getSettings() { | ||
return List.of( | ||
// Settings for top N queries | ||
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED, | ||
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE, | ||
QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE | ||
); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,180 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.plugin.insights.core.service; | ||
|
||
import org.opensearch.common.inject.Inject; | ||
import org.opensearch.common.lifecycle.AbstractLifecycleComponent; | ||
import org.opensearch.plugin.insights.rules.model.MetricType; | ||
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; | ||
import org.opensearch.plugin.insights.settings.QueryInsightsSettings; | ||
import org.opensearch.threadpool.Scheduler; | ||
import org.opensearch.threadpool.ThreadPool; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Comparator; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.LinkedBlockingQueue; | ||
|
||
/** | ||
* Service responsible for gathering, analyzing, storing and exporting | ||
* information related to search queries | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public class QueryInsightsService extends AbstractLifecycleComponent { | ||
/** | ||
* The internal OpenSearch thread pool that execute async processing and exporting tasks | ||
*/ | ||
private final ThreadPool threadPool; | ||
|
||
/** | ||
* Services to capture top n queries for different metric types | ||
*/ | ||
private final Map<MetricType, TopQueriesService> topQueriesServices; | ||
|
||
/** | ||
* Flags for enabling insight data collection for different metric types | ||
*/ | ||
private final Map<MetricType, Boolean> enableCollect; | ||
|
||
/** | ||
* The internal thread-safe queue to ingest the search query data and subsequently forward to processors | ||
*/ | ||
private final LinkedBlockingQueue<SearchQueryRecord> queryRecordsQueue; | ||
|
||
/** | ||
* Holds a reference to delayed operation {@link Scheduler.Cancellable} so it can be cancelled when | ||
* the service closed concurrently. | ||
*/ | ||
protected volatile Scheduler.Cancellable scheduledFuture; | ||
|
||
/** | ||
* Constructor of the QueryInsightsService | ||
* | ||
* @param threadPool The OpenSearch thread pool to run async tasks | ||
*/ | ||
@Inject | ||
public QueryInsightsService(final ThreadPool threadPool) { | ||
enableCollect = new HashMap<>(); | ||
queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY); | ||
topQueriesServices = new HashMap<>(); | ||
for (MetricType metricType : MetricType.allMetricTypes()) { | ||
enableCollect.put(metricType, false); | ||
topQueriesServices.put(metricType, new TopQueriesService(metricType)); | ||
} | ||
this.threadPool = threadPool; | ||
} | ||
|
||
/** | ||
* Ingest the query data into in-memory stores | ||
* | ||
* @param record the record to ingest | ||
*/ | ||
public boolean addRecord(final SearchQueryRecord record) { | ||
boolean shouldAdd = false; | ||
for (Map.Entry<MetricType, TopQueriesService> entry : topQueriesServices.entrySet()) { | ||
if (!enableCollect.get(entry.getKey())) { | ||
continue; | ||
Check warning on line 85 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java Codecov / codecov/patchplugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java#L85
|
||
} | ||
List<SearchQueryRecord> currentSnapshot = entry.getValue().getTopQueriesCurrentSnapshot(); | ||
// skip add to top N queries store if the incoming record is smaller than the Nth record | ||
if (currentSnapshot.size() < entry.getValue().getTopNSize() | ||
|| SearchQueryRecord.compare(record, currentSnapshot.get(0), entry.getKey()) > 0) { | ||
shouldAdd = true; | ||
break; | ||
} | ||
} | ||
Check warning on line 94 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java Codecov / codecov/patchplugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java#L94
|
||
if (shouldAdd) { | ||
return queryRecordsQueue.offer(record); | ||
} | ||
return false; | ||
Check warning on line 98 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java Codecov / codecov/patchplugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java#L98
|
||
} | ||
|
||
/** | ||
* Drain the queryRecordsQueue into internal stores and services | ||
*/ | ||
public void drainRecords() { | ||
final List<SearchQueryRecord> records = new ArrayList<>(); | ||
queryRecordsQueue.drainTo(records); | ||
records.sort(Comparator.comparingLong(SearchQueryRecord::getTimestamp)); | ||
for (MetricType metricType : MetricType.allMetricTypes()) { | ||
if (enableCollect.get(metricType)) { | ||
// ingest the records into topQueriesService | ||
topQueriesServices.get(metricType).consumeRecords(records); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Get the top queries service based on metricType | ||
* @param metricType {@link MetricType} | ||
* @return {@link TopQueriesService} | ||
*/ | ||
public TopQueriesService getTopQueriesService(final MetricType metricType) { | ||
return topQueriesServices.get(metricType); | ||
} | ||
|
||
/** | ||
* Set flag to enable or disable Query Insights data collection | ||
* | ||
* @param metricType {@link MetricType} | ||
* @param enable Flag to enable or disable Query Insights data collection | ||
*/ | ||
public void enableCollection(final MetricType metricType, final boolean enable) { | ||
this.enableCollect.put(metricType, enable); | ||
this.topQueriesServices.get(metricType).setEnabled(enable); | ||
} | ||
|
||
/** | ||
* Get if the Query Insights data collection is enabled for a MetricType | ||
* | ||
* @param metricType {@link MetricType} | ||
* @return if the Query Insights data collection is enabled | ||
*/ | ||
public boolean isCollectionEnabled(final MetricType metricType) { | ||
return this.enableCollect.get(metricType); | ||
Check warning on line 143 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java Codecov / codecov/patchplugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java#L143
|
||
} | ||
|
||
/** | ||
* Check if query insights service is enabled | ||
* | ||
* @return if query insights service is enabled | ||
*/ | ||
public boolean isEnabled() { | ||
for (MetricType t : MetricType.allMetricTypes()) { | ||
if (isCollectionEnabled(t)) { | ||
return true; | ||
Check warning on line 154 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java Codecov / codecov/patchplugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java#L154
|
||
} | ||
} | ||
return false; | ||
Check warning on line 157 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java Codecov / codecov/patchplugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java#L156-L157
|
||
} | ||
|
||
@Override | ||
protected void doStart() { | ||
if (isEnabled()) { | ||
scheduledFuture = threadPool.scheduleWithFixedDelay( | ||
Check warning on line 163 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java Codecov / codecov/patchplugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java#L163
|
||
this::drainRecords, | ||
QueryInsightsSettings.QUERY_RECORD_QUEUE_DRAIN_INTERVAL, | ||
QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR | ||
); | ||
} | ||
} | ||
Check warning on line 169 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java Codecov / codecov/patchplugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java#L169
|
||
|
||
@Override | ||
protected void doStop() { | ||
if (scheduledFuture != null) { | ||
scheduledFuture.cancel(); | ||
Check warning on line 174 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java Codecov / codecov/patchplugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java#L174
|
||
} | ||
} | ||
Check warning on line 176 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java Codecov / codecov/patchplugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java#L176
|
||
|
||
@Override | ||
protected void doClose() {} | ||
Check warning on line 179 in plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java Codecov / codecov/patchplugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java#L179
|
||
} |