diff --git a/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java b/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java index c16a2edbdf3de..04e715444f50a 100644 --- a/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java +++ b/plugins/query-insights/src/internalClusterTest/java/org/opensearch/plugin/insights/QueryInsightsPluginTransportIT.java @@ -81,7 +81,7 @@ public void testGetTopQueriesWhenFeatureDisabled() { TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); Assert.assertNotEquals(0, response.failures().size()); Assert.assertEquals( - "Cannot get query data when query insight feature is not enabled for MetricType [latency].", + "Cannot get top n queries for [latency] when it is not enabled.", response.failures().get(0).getCause().getCause().getMessage() ); } @@ -89,7 +89,7 @@ public void testGetTopQueriesWhenFeatureDisabled() { /** * Test update top query record when feature enabled */ - public void testUpdateRecordWhenFeatureEnabled() throws ExecutionException, InterruptedException { + public void testUpdateRecordWhenFeatureDisabledThenEnabled() throws ExecutionException, InterruptedException { Settings commonSettings = Settings.builder().put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "false").build(); logger.info("--> starting nodes for query insight testing"); @@ -121,7 +121,7 @@ public void testUpdateRecordWhenFeatureEnabled() throws ExecutionException, Inte TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); Assert.assertNotEquals(0, response.failures().size()); Assert.assertEquals( - "Cannot get query data when query insight feature is not enabled for MetricType [latency].", + "Cannot get top n queries for [latency] when it is not enabled.", response.failures().get(0).getCause().getCause().getMessage() ); @@ -143,7 +143,7 @@ public void testUpdateRecordWhenFeatureEnabled() throws ExecutionException, Inte /** * Test get top queries when feature enabled */ - public void testGetTopQueriesWhenFeatureEnabled() { + public void testGetTopQueriesWhenFeatureEnabled() throws InterruptedException { Settings commonSettings = Settings.builder() .put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true") .put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100") @@ -174,7 +174,8 @@ public void testGetTopQueriesWhenFeatureEnabled() { .get(); assertEquals(searchResponse.getFailedShards(), 0); } - + // Sleep to wait for queue drained to top queries store + Thread.sleep(6000); TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY); TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); Assert.assertEquals(0, response.failures().size()); @@ -187,7 +188,7 @@ public void testGetTopQueriesWhenFeatureEnabled() { /** * Test get top queries with small top n size */ - public void testGetTopQueriesWithSmallTopN() { + public void testGetTopQueriesWithSmallTopN() throws InterruptedException { Settings commonSettings = Settings.builder() .put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true") .put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "1") @@ -218,7 +219,7 @@ public void testGetTopQueriesWithSmallTopN() { .get(); assertEquals(searchResponse.getFailedShards(), 0); } - + Thread.sleep(6000); TopQueriesRequest request = new TopQueriesRequest(MetricType.LATENCY); TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); Assert.assertEquals(0, response.failures().size()); @@ -231,7 +232,7 @@ public void testGetTopQueriesWithSmallTopN() { /** * Test get top queries with small window size */ - public void testGetTopQueriesWithSmallWindowSize() { + public void testGetTopQueriesWithSmallWindowSize() throws InterruptedException { Settings commonSettings = Settings.builder() .put(TOP_N_LATENCY_QUERIES_ENABLED.getKey(), "true") .put(TOP_N_LATENCY_QUERIES_SIZE.getKey(), "100") @@ -267,7 +268,7 @@ public void testGetTopQueriesWithSmallWindowSize() { TopQueriesResponse response = OpenSearchIntegTestCase.client().execute(TopQueriesAction.INSTANCE, request).actionGet(); Assert.assertEquals(0, response.failures().size()); Assert.assertEquals(TOTAL_NUMBER_OF_NODES, response.getNodes().size()); - + Thread.sleep(6000); internalCluster().stopAllNodes(); } } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java index e35c90e86f41b..4d7e0d486068a 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/QueryInsightsPlugin.java @@ -57,25 +57,25 @@ public QueryInsightsPlugin() {} @Override public Collection createComponents( - Client client, - ClusterService clusterService, - ThreadPool threadPool, - ResourceWatcherService resourceWatcherService, - ScriptService scriptService, - NamedXContentRegistry xContentRegistry, - Environment environment, - NodeEnvironment nodeEnvironment, - NamedWriteableRegistry namedWriteableRegistry, - IndexNameExpressionResolver indexNameExpressionResolver, - Supplier repositoriesServiceSupplier + final Client client, + final ClusterService clusterService, + final ThreadPool threadPool, + final ResourceWatcherService resourceWatcherService, + final ScriptService scriptService, + final NamedXContentRegistry xContentRegistry, + final Environment environment, + final NodeEnvironment nodeEnvironment, + final NamedWriteableRegistry namedWriteableRegistry, + final IndexNameExpressionResolver indexNameExpressionResolver, + final Supplier repositoriesServiceSupplier ) { // create top n queries service - QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool); + final QueryInsightsService queryInsightsService = new QueryInsightsService(threadPool); return List.of(queryInsightsService, new QueryInsightsListener(clusterService, queryInsightsService)); } @Override - public List> getExecutorBuilders(Settings settings) { + public List> getExecutorBuilders(final Settings settings) { return List.of( new ScalingExecutorBuilder( QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR, @@ -88,13 +88,13 @@ public List> getExecutorBuilders(Settings settings) { @Override public List getRestHandlers( - Settings settings, - RestController restController, - ClusterSettings clusterSettings, - IndexScopedSettings indexScopedSettings, - SettingsFilter settingsFilter, - IndexNameExpressionResolver indexNameExpressionResolver, - Supplier nodesInCluster + final Settings settings, + final RestController restController, + final ClusterSettings clusterSettings, + final IndexScopedSettings indexScopedSettings, + final SettingsFilter settingsFilter, + final IndexNameExpressionResolver indexNameExpressionResolver, + final Supplier nodesInCluster ) { return List.of(new RestTopQueriesAction()); } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java index 3bc8215ec19e5..93eccfb534401 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListener.java @@ -19,7 +19,6 @@ import org.opensearch.core.xcontent.ToXContent; import org.opensearch.plugin.insights.core.service.QueryInsightsService; import org.opensearch.plugin.insights.rules.model.Attribute; -import org.opensearch.plugin.insights.rules.model.Measurement; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; @@ -34,7 +33,9 @@ import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE; /** - * The listener for top N queries by latency + * The listener for query insights services. + * It forwards query-related data to the appropriate query insights stores, + * either for each request or for each phase. * * @opensearch.internal */ @@ -52,47 +53,53 @@ public final class QueryInsightsListener extends SearchRequestOperationsListener * @param queryInsightsService The topQueriesByLatencyService associated with this listener */ @Inject - public QueryInsightsListener(ClusterService clusterService, QueryInsightsService queryInsightsService) { + public QueryInsightsListener(final ClusterService clusterService, final QueryInsightsService queryInsightsService) { this.queryInsightsService = queryInsightsService; clusterService.getClusterSettings() - .addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, v -> this.setEnabled(MetricType.LATENCY, v)); + .addSettingsUpdateConsumer(TOP_N_LATENCY_QUERIES_ENABLED, v -> this.setEnableTopQueries(MetricType.LATENCY, v)); clusterService.getClusterSettings() .addSettingsUpdateConsumer( TOP_N_LATENCY_QUERIES_SIZE, - this.queryInsightsService::setTopNSize, - this.queryInsightsService::validateTopNSize + v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).setTopNSize(v), + v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).validateTopNSize(v) ); clusterService.getClusterSettings() .addSettingsUpdateConsumer( TOP_N_LATENCY_QUERIES_WINDOW_SIZE, - this.queryInsightsService::setWindowSize, - this.queryInsightsService::validateWindowSize + v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).setWindowSize(v), + v -> this.queryInsightsService.getTopQueriesService(MetricType.LATENCY).validateWindowSize(v) ); - this.setEnabled(MetricType.LATENCY, clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_ENABLED)); - this.queryInsightsService.setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE)); - this.queryInsightsService.setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE)); + this.setEnableTopQueries(MetricType.LATENCY, clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_ENABLED)); + this.queryInsightsService.getTopQueriesService(MetricType.LATENCY) + .setTopNSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_SIZE)); + this.queryInsightsService.getTopQueriesService(MetricType.LATENCY) + .setWindowSize(clusterService.getClusterSettings().get(TOP_N_LATENCY_QUERIES_WINDOW_SIZE)); } /** - * Enable or disable metric collection for {@link MetricType} + * Enable or disable top queries insights collection for {@link MetricType} + * This function will enable or disable the corresponding listeners + * and query insights services. * * @param metricType {@link MetricType} * @param enabled boolean */ - public void setEnabled(MetricType metricType, boolean enabled) { - this.queryInsightsService.enableCollection(metricType, enabled); - - // disable QueryInsightsListener only if collection for all metrics are disabled. + public void setEnableTopQueries(final MetricType metricType, final boolean enabled) { if (!enabled) { + // disable QueryInsightsListener only if all metrics collections are disabled. for (MetricType t : MetricType.allMetricTypes()) { if (this.queryInsightsService.isCollectionEnabled(t)) { + this.queryInsightsService.getTopQueriesService(metricType).setEnabled(false); return; } } super.setEnabled(false); + this.queryInsightsService.stop(); } else { super.setEnabled(true); + this.queryInsightsService.start(); } + this.queryInsightsService.enableCollection(metricType, enabled); } @Override @@ -113,17 +120,14 @@ public void onPhaseFailure(SearchPhaseContext context) {} public void onRequestStart(SearchRequestContext searchRequestContext) {} @Override - public void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) { - SearchRequest request = context.getRequest(); + public void onRequestEnd(final SearchPhaseContext context, final SearchRequestContext searchRequestContext) { + final SearchRequest request = context.getRequest(); try { - Map> measurements = new HashMap<>(); + Map measurements = new HashMap<>(); if (queryInsightsService.isCollectionEnabled(MetricType.LATENCY)) { measurements.put( MetricType.LATENCY, - new Measurement<>( - MetricType.LATENCY.name(), - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos()) - ) + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos()) ); } Map attributes = new HashMap<>(); diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java index 579130d2cfefc..4d1286d1df615 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java @@ -8,81 +8,67 @@ package org.opensearch.plugin.insights.core.service; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.opensearch.common.inject.Inject; -import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.time.ZoneOffset; -import java.time.temporal.ChronoUnit; import java.util.ArrayList; -import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; import java.util.List; -import java.util.Locale; import java.util.Map; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.stream.Collectors; -import java.util.stream.Stream; +import java.util.concurrent.LinkedBlockingQueue; /** * Service responsible for gathering, analyzing, storing and exporting - * top N queries with high latency data for search queries + * information related to search queries * * @opensearch.internal */ -public class QueryInsightsService { - private static final Logger log = LogManager.getLogger(QueryInsightsService.class); - - private static final TimeValue delay = TimeValue.ZERO; +public class QueryInsightsService extends AbstractLifecycleComponent { /** * The internal OpenSearch thread pool that execute async processing and exporting tasks */ private final ThreadPool threadPool; /** - * enable insight data collection + * Services to capture top n queries for different metric types */ - private final Map enableCollect = new HashMap<>(); - - private int topNSize = QueryInsightsSettings.DEFAULT_TOP_N_SIZE; + private final Map topQueriesServices; - private TimeValue windowSize = TimeValue.timeValueSeconds(QueryInsightsSettings.DEFAULT_WINDOW_SIZE); /** - * The internal thread-safe store that holds the top n queries insight data, by different MetricType + * Flags for enabling insight data collection for different metric types */ - private final Map> topQueriesStores; + private final Map enableCollect; /** - * The internal store that holds historical top n queries insight data by different MetricType in the last window + * The internal thread-safe queue to ingest the search query data and subsequently forward to processors */ - private final Map> topQueriesHistoryStores; + private final LinkedBlockingQueue queryRecordsQueue; + /** - * window start timestamp for each top queries collector + * Holds a reference to delayed operation {@link Scheduler.Cancellable} so it can be cancelled when + * the service closed concurrently. */ - private final Map topQueriesWindowStart; + protected volatile Scheduler.Cancellable scheduledFuture; /** - * Create the TopQueriesByLatencyService Object + * Constructor of the QueryInsightsService * * @param threadPool The OpenSearch thread pool to run async tasks */ @Inject - public QueryInsightsService(ThreadPool threadPool) { - topQueriesStores = new HashMap<>(); - topQueriesHistoryStores = new HashMap<>(); - topQueriesWindowStart = new HashMap<>(); + public QueryInsightsService(final ThreadPool threadPool) { + enableCollect = new HashMap<>(); + queryRecordsQueue = new LinkedBlockingQueue<>(QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY); + topQueriesServices = new HashMap<>(); for (MetricType metricType : MetricType.allMetricTypes()) { - topQueriesStores.put(metricType, new PriorityBlockingQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType))); - topQueriesHistoryStores.put(metricType, new ArrayList<>()); - topQueriesWindowStart.put(metricType, -1L); + enableCollect.put(metricType, false); + topQueriesServices.put(metricType, new TopQueriesService(metricType)); } this.threadPool = threadPool; } @@ -92,71 +78,32 @@ public QueryInsightsService(ThreadPool threadPool) { * * @param record the record to ingest */ - public void addRecord(SearchQueryRecord record) { - for (MetricType metricType : record.getMeasurements().keySet()) { - this.threadPool.schedule(() -> { - if (!topQueriesStores.containsKey(metricType)) { - return; - } - // add the record to corresponding priority queues to calculate top n queries insights - PriorityBlockingQueue store = topQueriesStores.get(metricType); - checkAndResetWindow(metricType, record.getTimestamp()); - if (record.getTimestamp() > topQueriesWindowStart.get(metricType)) { - store.add(record); - // remove top elements for fix sizing priority queue - if (store.size() > this.getTopNSize()) { - store.poll(); - } - } - }, delay, QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR); - } + public boolean addRecord(final SearchQueryRecord record) { + return queryRecordsQueue.offer(record); } - private synchronized void checkAndResetWindow(MetricType metricType, Long timestamp) { - Long windowStart = calculateWindowStart(timestamp); - // reset window if the current window is outdated - if (topQueriesWindowStart.get(metricType) < windowStart) { - // rotate the current window to history store only if the data belongs to the last window - if (topQueriesWindowStart.get(metricType) == windowStart - windowSize.getMillis()) { - topQueriesHistoryStores.put(metricType, new ArrayList<>(topQueriesStores.get(metricType))); - } else { - topQueriesHistoryStores.get(metricType).clear(); + /** + * Drain the queryRecordsQueue into internal stores and services + */ + public void drainRecords() { + final List records = new ArrayList<>(); + queryRecordsQueue.drainTo(records); + records.sort(Comparator.comparingLong(SearchQueryRecord::getTimestamp)); + for (MetricType metricType : MetricType.allMetricTypes()) { + if (enableCollect.get(metricType)) { + // ingest the records into topQueriesService + topQueriesServices.get(metricType).consumeRecords(records); } - topQueriesStores.get(metricType).clear(); - topQueriesWindowStart.put(metricType, windowStart); } } /** - * Get all top queries records that are in the current query insight store, based on the input MetricType - * Optionally include top N records from the last window. - * - * By default, return the records in sorted order. - * + * Get the top queries service based on metricType * @param metricType {@link MetricType} - * @param includeLastWindow if the top N queries from the last window should be included - * @return List of the records that are in the query insight store - * @throws IllegalArgumentException if query insight is disabled in the cluster + * @return {@link TopQueriesService} */ - public List getTopNRecords(MetricType metricType, boolean includeLastWindow) throws IllegalArgumentException { - if (!enableCollect.containsKey(metricType) || !enableCollect.get(metricType)) { - throw new IllegalArgumentException( - String.format( - Locale.ROOT, - "Cannot get query data when query insight feature is not enabled for MetricType [%s].", - metricType - ) - ); - } - checkAndResetWindow(metricType, System.currentTimeMillis()); - List queries = new ArrayList<>(topQueriesStores.get(metricType)); - if (includeLastWindow) { - queries.addAll(topQueriesHistoryStores.get(metricType)); - } - return Stream.of(queries) - .flatMap(Collection::stream) - .sorted((a, b) -> SearchQueryRecord.compare(a, b, metricType) * -1) - .collect(Collectors.toList()); + public TopQueriesService getTopQueriesService(final MetricType metricType) { + return topQueriesServices.get(metricType); } /** @@ -165,12 +112,9 @@ public List getTopNRecords(MetricType metricType, boolean inc * @param metricType {@link MetricType} * @param enable Flag to enable or disable Query Insights data collection */ - public void enableCollection(MetricType metricType, boolean enable) { + public void enableCollection(final MetricType metricType, final boolean enable) { this.enableCollect.put(metricType, enable); - // set topQueriesWindowStart to enable top n queries collection - if (enable) { - topQueriesWindowStart.put(metricType, calculateWindowStart(System.currentTimeMillis())); - } + this.topQueriesServices.get(metricType).setEnabled(enable); } /** @@ -179,122 +123,42 @@ public void enableCollection(MetricType metricType, boolean enable) { * @param metricType {@link MetricType} * @return if the Query Insights data collection is enabled */ - public boolean isCollectionEnabled(MetricType metricType) { - return this.enableCollect.containsKey(metricType) && this.enableCollect.get(metricType); - } - - /** - * Set the top N size for TopQueriesByLatencyService service. - * - * @param size the top N size to set - */ - public void setTopNSize(int size) { - this.topNSize = size; - } - - /** - * Validate the top N size based on the internal constrains - * - * @param size the wanted top N size - */ - public void validateTopNSize(int size) { - if (size > QueryInsightsSettings.MAX_N_SIZE) { - throw new IllegalArgumentException( - "Top N size setting [" - + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE.getKey() - + "]" - + " should be smaller than max top N size [" - + QueryInsightsSettings.MAX_N_SIZE - + "was (" - + size - + " > " - + QueryInsightsSettings.MAX_N_SIZE - + ")" - ); - } - } - - /** - * Get the top N size set for TopQueriesByLatencyService - * - * @return the top N size - */ - public int getTopNSize() { - return this.topNSize; + public boolean isCollectionEnabled(final MetricType metricType) { + return this.enableCollect.get(metricType); } /** - * Set the window size for TopQueriesByLatencyService + * Check if query insights service is enabled * - * @param windowSize window size to set + * @return if query insights service is enabled */ - public void setWindowSize(TimeValue windowSize) { - this.windowSize = windowSize; - for (MetricType metricType : MetricType.allMetricTypes()) { - topQueriesWindowStart.put(metricType, -1L); + public boolean isEnabled() { + for (MetricType t : MetricType.allMetricTypes()) { + if (isCollectionEnabled(t)) { + return true; + } } + return false; } - /** - * Validate if the window size is valid, based on internal constrains. - * - * @param windowSize the window size to validate - */ - public void validateWindowSize(TimeValue windowSize) { - if (windowSize.compareTo(QueryInsightsSettings.MAX_WINDOW_SIZE) > 0 - || windowSize.compareTo(QueryInsightsSettings.MIN_WINDOW_SIZE) < 0) { - throw new IllegalArgumentException( - "Window size setting [" - + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey() - + "]" - + " should be between [" - + QueryInsightsSettings.MAX_WINDOW_SIZE - + "," - + QueryInsightsSettings.MAX_WINDOW_SIZE - + "]" - + "was (" - + windowSize - + ")" - ); - } - if (!(QueryInsightsSettings.VALID_WINDOW_SIZES_IN_MINUTES.contains(windowSize) || windowSize.getMinutes() % 60 == 0)) { - throw new IllegalArgumentException( - "Window size setting [" - + QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE.getKey() - + "]" - + " should be multiple of 1 hour, or one of " - + QueryInsightsSettings.VALID_WINDOW_SIZES_IN_MINUTES - + ", was (" - + windowSize - + ")" + @Override + protected void doStart() { + if (isEnabled()) { + scheduledFuture = threadPool.scheduleWithFixedDelay( + this::drainRecords, + QueryInsightsSettings.QUERY_RECORD_QUEUE_DRAIN_INTERVAL, + QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR ); } } - /** - * Get the size of top N queries store - * @param metricType {@link MetricType} - * @return top N queries store size - */ - public int getTopNStoreSize(MetricType metricType) { - return topQueriesStores.get(metricType).size(); - } - - /** - * Get the size of top N queries history store - * @param metricType {@link MetricType} - * @return top N queries history store size - */ - public int getTopNHistoryStoreSize(MetricType metricType) { - return topQueriesHistoryStores.get(metricType).size(); - } - - private Long calculateWindowStart(Long timestamp) { - LocalDateTime currentTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.of("UTC")); - LocalDateTime windowStartTime = currentTime.truncatedTo(ChronoUnit.HOURS); - while (!windowStartTime.plusMinutes(windowSize.getMinutes()).isAfter(currentTime)) { - windowStartTime = windowStartTime.plusMinutes(windowSize.getMinutes()); + @Override + protected void doStop() { + if (scheduledFuture != null) { + scheduledFuture.cancel(); } - return windowStartTime.toInstant(ZoneOffset.UTC).getEpochSecond() * 1000; } + + @Override + protected void doClose() {} } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java new file mode 100644 index 0000000000000..0d677498dc654 --- /dev/null +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java @@ -0,0 +1,269 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.service; + +import org.opensearch.common.unit.TimeValue; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Service responsible for gathering and storing top N queries + * with high latency or resource usage + * + * @opensearch.internal + */ +public class TopQueriesService { + private boolean enabled; + /** + * The metric type to measure top n queries + */ + private final MetricType metricType; + private int topNSize; + /** + * The window size to keep the top n queries + */ + private TimeValue windowSize; + /** + * The current window start timestamp + */ + private long windowStart; + /** + * The internal thread-safe store that holds the top n queries insight data + */ + private final PriorityBlockingQueue topQueriesStore; + + /** + * The AtomicReference of a snapshot of the current window top queries for getters to consume + */ + private final AtomicReference> topQueriesCurrentSnapshot; + + /** + * The AtomicReference of a snapshot of the last window top queries for getters to consume + */ + private final AtomicReference> topQueriesHistorySnapshot; + + TopQueriesService(final MetricType metricType) { + this.enabled = false; + this.metricType = metricType; + this.topNSize = QueryInsightsSettings.DEFAULT_TOP_N_SIZE; + this.windowSize = QueryInsightsSettings.DEFAULT_WINDOW_SIZE; + this.windowStart = -1L; + topQueriesStore = new PriorityBlockingQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType)); + topQueriesCurrentSnapshot = new AtomicReference<>(new ArrayList<>()); + topQueriesHistorySnapshot = new AtomicReference<>(new ArrayList<>()); + } + + /** + * Set the top N size for TopQueriesService service. + * + * @param topNSize the top N size to set + */ + public void setTopNSize(final int topNSize) { + this.topNSize = topNSize; + } + + /** + * Validate the top N size based on the internal constrains + * + * @param size the wanted top N size + */ + public void validateTopNSize(final int size) { + if (size > QueryInsightsSettings.MAX_N_SIZE) { + throw new IllegalArgumentException( + "Top N size setting for [" + + metricType + + "]" + + " should be smaller than max top N size [" + + QueryInsightsSettings.MAX_N_SIZE + + "was (" + + size + + " > " + + QueryInsightsSettings.MAX_N_SIZE + + ")" + ); + } + } + + /** + * Set enable flag for the service + * @param enabled boolean + */ + public void setEnabled(final boolean enabled) { + this.enabled = enabled; + } + + /** + * Set the window size for top N queries service + * + * @param windowSize window size to set + */ + public void setWindowSize(final TimeValue windowSize) { + this.windowSize = windowSize; + // reset the window start time since the window size has changed + this.windowStart = -1L; + } + + /** + * Validate if the window size is valid, based on internal constrains. + * + * @param windowSize the window size to validate + */ + public void validateWindowSize(final TimeValue windowSize) { + if (windowSize.compareTo(QueryInsightsSettings.MAX_WINDOW_SIZE) > 0 + || windowSize.compareTo(QueryInsightsSettings.MIN_WINDOW_SIZE) < 0) { + throw new IllegalArgumentException( + "Window size setting for [" + + metricType + + "]" + + " should be between [" + + QueryInsightsSettings.MIN_WINDOW_SIZE + + "," + + QueryInsightsSettings.MAX_WINDOW_SIZE + + "]" + + "was (" + + windowSize + + ")" + ); + } + if (!(QueryInsightsSettings.VALID_WINDOW_SIZES_IN_MINUTES.contains(windowSize) || windowSize.getMinutes() % 60 == 0)) { + throw new IllegalArgumentException( + "Window size setting for [" + + metricType + + "]" + + " should be multiple of 1 hour, or one of " + + QueryInsightsSettings.VALID_WINDOW_SIZES_IN_MINUTES + + ", was (" + + windowSize + + ")" + ); + } + } + + /** + * Get all top queries records that are in the current top n queries store + * Optionally include top N records from the last window. + * + * By default, return the records in sorted order. + * + * @param includeLastWindow if the top N queries from the last window should be included + * @return List of the records that are in the query insight store + * @throws IllegalArgumentException if query insight is disabled in the cluster + */ + public List getTopQueriesRecords(final boolean includeLastWindow) throws IllegalArgumentException { + if (!enabled) { + throw new IllegalArgumentException( + String.format(Locale.ROOT, "Cannot get top n queries for [%s] when it is not enabled.", metricType.toString()) + ); + } + // read from window snapshots + final List queries = new ArrayList<>(topQueriesCurrentSnapshot.get()); + if (includeLastWindow) { + queries.addAll(topQueriesHistorySnapshot.get()); + } + return Stream.of(queries) + .flatMap(Collection::stream) + .sorted((a, b) -> SearchQueryRecord.compare(a, b, metricType) * -1) + .collect(Collectors.toList()); + } + + /** + * Consume records to top queries stores + * + * @param records a list of {@link SearchQueryRecord} + */ + void consumeRecords(final List records) { + final long currentWindowStart = calculateWindowStart(System.currentTimeMillis()); + List recordsInLastWindow = new ArrayList<>(); + List recordsInThisWindow = new ArrayList<>(); + List currentSnapshot = topQueriesCurrentSnapshot.get(); + for (SearchQueryRecord record : records) { + // skip the records that does not have the corresponding measurement + if (!record.getMeasurements().containsKey(metricType)) { + continue; + } + // skip add to top N queries store if the incoming record is smaller than the Nth record + if (currentSnapshot.size() >= topNSize && SearchQueryRecord.compare(record, currentSnapshot.get(0), metricType) < 0) { + continue; + } + if (record.getTimestamp() < currentWindowStart) { + recordsInLastWindow.add(record); + } else { + recordsInThisWindow.add(record); + } + } + // add records in last window, if there are any, to the top n store + addToTopNStore(recordsInLastWindow); + // rotate window and reset window start if necessary + rotateWindowIfNecessary(currentWindowStart); + // add records in current window, if there are any, to the top n store + addToTopNStore(recordsInThisWindow); + // update the current window snapshot for getters to consume + final List newSnapShot = new ArrayList<>(topQueriesStore); + newSnapShot.sort((a, b) -> SearchQueryRecord.compare(a, b, metricType)); + topQueriesCurrentSnapshot.set(newSnapShot); + } + + private void addToTopNStore(final List records) { + topQueriesStore.addAll(records); + // remove top elements for fix sizing priority queue + while (topQueriesStore.size() > topNSize) { + topQueriesStore.poll(); + } + } + + /** + * Reset the current window and rotate the data to history snapshot for top n queries, + * This function would be invoked zero time or only once in each consumeRecords call + * + * @param newWindowStart the new windowStart to set to + */ + private void rotateWindowIfNecessary(final long newWindowStart) { + // reset window if the current window is outdated + if (windowStart < newWindowStart) { + final List history = new ArrayList<>(); + // rotate the current window to history store only if the data belongs to the last window + if (windowStart == newWindowStart - windowSize.getMillis()) { + history.addAll(topQueriesStore); + } + topQueriesHistorySnapshot.set(history); + topQueriesStore.clear(); + topQueriesCurrentSnapshot.set(new ArrayList<>()); + windowStart = newWindowStart; + } + } + + /** + * Calculate the window start for the given timestamp + * + * @param timestamp the given timestamp to calculate window start + */ + private long calculateWindowStart(final long timestamp) { + final LocalDateTime currentTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.of("UTC")); + LocalDateTime windowStartTime = currentTime.truncatedTo(ChronoUnit.HOURS); + while (!windowStartTime.plusMinutes(windowSize.getMinutes()).isAfter(currentTime)) { + windowStartTime = windowStartTime.plusMinutes(windowSize.getMinutes()); + } + return windowStartTime.toInstant(ZoneOffset.UTC).getEpochSecond() * 1000; + } +} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueries.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueries.java index 640a0b82260b5..26cff82aae52e 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueries.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueries.java @@ -20,14 +20,13 @@ import java.util.List; /** - * Top Queries by resource usage / latency on a node - *

+ * Holds all top queries records by resource usage or latency on a node * Mainly used in the top N queries node response workflow. * * @opensearch.internal */ public class TopQueries extends BaseNodeResponse implements ToXContentObject { - /** The store to keep the top N queries records */ + /** The store to keep the top queries records */ private final List topQueriesRecords; /** @@ -35,7 +34,7 @@ public class TopQueries extends BaseNodeResponse implements ToXContentObject { * @param in A {@link StreamInput} object. * @throws IOException IOException */ - public TopQueries(StreamInput in) throws IOException { + public TopQueries(final StreamInput in) throws IOException { super(in); topQueriesRecords = in.readList(SearchQueryRecord::new); } @@ -45,13 +44,13 @@ public TopQueries(StreamInput in) throws IOException { * @param node A node that is part of the cluster. * @param searchQueryRecords A list of SearchQueryRecord associated in this TopQueries. */ - public TopQueries(DiscoveryNode node, List searchQueryRecords) { + public TopQueries(final DiscoveryNode node, final List searchQueryRecords) { super(node); topQueriesRecords = searchQueryRecords; } @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { if (topQueriesRecords != null) { for (SearchQueryRecord record : topQueriesRecords) { record.toXContent(builder, params); @@ -61,7 +60,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } @Override - public void writeTo(StreamOutput out) throws IOException { + public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); out.writeList(topQueriesRecords); diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequest.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequest.java index 27177fef25bea..3bdff2c403161 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequest.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesRequest.java @@ -9,7 +9,6 @@ package org.opensearch.plugin.insights.rules.action.top_queries; import org.opensearch.action.support.nodes.BaseNodesRequest; -import org.opensearch.common.annotation.PublicApi; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.plugin.insights.rules.model.MetricType; @@ -21,10 +20,9 @@ * * @opensearch.internal */ -@PublicApi(since = "1.0.0") public class TopQueriesRequest extends BaseNodesRequest { - MetricType metricType; + final MetricType metricType; /** * Constructor for TopQueriesRequest @@ -32,13 +30,9 @@ public class TopQueriesRequest extends BaseNodesRequest { * @param in A {@link StreamInput} object. * @throws IOException if the stream cannot be deserialized. */ - public TopQueriesRequest(StreamInput in) throws IOException { + public TopQueriesRequest(final StreamInput in) throws IOException { super(in); - MetricType metricType = MetricType.readFromStream(in); - if (false == MetricType.allMetricTypes().contains(metricType)) { - throw new IllegalStateException("Invalid metric used in top queries request: " + metricType); - } - this.metricType = metricType; + this.metricType = MetricType.readFromStream(in); } /** @@ -48,7 +42,7 @@ public TopQueriesRequest(StreamInput in) throws IOException { * @param metricType {@link MetricType} * @param nodesIds the nodeIds specified in the request */ - public TopQueriesRequest(MetricType metricType, String... nodesIds) { + public TopQueriesRequest(final MetricType metricType, final String... nodesIds) { super(nodesIds); this.metricType = metricType; } @@ -61,7 +55,7 @@ public MetricType getMetricType() { } @Override - public void writeTo(StreamOutput out) throws IOException { + public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); out.writeString(metricType.toString()); } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java index fe7644de5629f..2e66bb7f77baf 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponse.java @@ -11,7 +11,6 @@ import org.opensearch.action.FailedNodeException; import org.opensearch.action.support.nodes.BaseNodesResponse; import org.opensearch.cluster.ClusterName; -import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -31,7 +30,6 @@ * * @opensearch.internal */ -@PublicApi(since = "1.0.0") public class TopQueriesResponse extends BaseNodesResponse implements ToXContentFragment { private static final String CLUSTER_LEVEL_RESULTS_KEY = "top_queries"; @@ -44,7 +42,7 @@ public class TopQueriesResponse extends BaseNodesResponse implements * @param in A {@link StreamInput} object. * @throws IOException if the stream cannot be deserialized. */ - public TopQueriesResponse(StreamInput in) throws IOException { + public TopQueriesResponse(final StreamInput in) throws IOException { super(in); top_n_size = in.readInt(); metricType = in.readEnum(MetricType.class); @@ -60,11 +58,11 @@ public TopQueriesResponse(StreamInput in) throws IOException { * @param metricType the {@link MetricType} to be returned in this response */ public TopQueriesResponse( - ClusterName clusterName, - List nodes, - List failures, - int top_n_size, - MetricType metricType + final ClusterName clusterName, + final List nodes, + final List failures, + final int top_n_size, + final MetricType metricType ) { super(clusterName, nodes, failures); this.top_n_size = top_n_size; @@ -72,20 +70,20 @@ public TopQueriesResponse( } @Override - protected List readNodesFrom(StreamInput in) throws IOException { + protected List readNodesFrom(final StreamInput in) throws IOException { return in.readList(TopQueries::new); } @Override - protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + protected void writeNodesTo(final StreamOutput out, final List nodes) throws IOException { out.writeList(nodes); out.writeLong(top_n_size); out.writeEnum(metricType); } @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - List results = getNodes(); + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + final List results = getNodes(); postProcess(results); builder.startObject(); toClusterLevelResult(builder, params, results); @@ -95,7 +93,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws @Override public String toString() { try { - XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); + final XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); builder.startObject(); this.toXContent(builder, EMPTY_PARAMS); builder.endObject(); @@ -110,9 +108,9 @@ public String toString() { * * @param results the top queries results */ - private void postProcess(List results) { + private void postProcess(final List results) { for (TopQueries topQueries : results) { - String nodeId = topQueries.getNode().getId(); + final String nodeId = topQueries.getNode().getId(); for (SearchQueryRecord record : topQueries.getTopQueriesRecord()) { record.addAttribute(Attribute.NODE_ID, nodeId); } @@ -127,8 +125,9 @@ private void postProcess(List results) { * @param results top queries results from all nodes * @throws IOException if an error occurs */ - private void toClusterLevelResult(XContentBuilder builder, Params params, List results) throws IOException { - List all_records = results.stream() + private void toClusterLevelResult(final XContentBuilder builder, final Params params, final List results) + throws IOException { + final List all_records = results.stream() .map(TopQueries::getTopQueriesRecord) .flatMap(Collection::stream) .sorted((a, b) -> SearchQueryRecord.compare(a, b, metricType) * -1) diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java index cb798dd6ed1f4..c1d17edf9ff14 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Attribute.java @@ -16,6 +16,8 @@ /** * Valid attributes for a search query record + * + * @opensearch.internal */ public enum Attribute { /** @@ -45,21 +47,23 @@ public enum Attribute { /** * Read an Attribute from a StreamInput + * * @param in the StreamInput to read from * @return Attribute * @throws IOException IOException */ - public static Attribute readFromStream(StreamInput in) throws IOException { + static Attribute readFromStream(final StreamInput in) throws IOException { return Attribute.valueOf(in.readString().toUpperCase(Locale.ROOT)); } /** * Write Attribute to a StreamOutput + * * @param out the StreamOutput to write * @param attribute the Attribute to write * @throws IOException IOException */ - public static void writeTo(StreamOutput out, Attribute attribute) throws IOException { + static void writeTo(final StreamOutput out, final Attribute attribute) throws IOException { out.writeString(attribute.toString()); } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Measurement.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Measurement.java deleted file mode 100644 index 5fc27ac745146..0000000000000 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/Measurement.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.plugin.insights.rules.model; - -import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.core.common.io.stream.StreamOutput; - -import java.io.IOException; -import java.util.Locale; -import java.util.Objects; - -/** - * Represents a measurement with a name and a corresponding value. - * - * @param the type of the value - */ -public class Measurement> { - String name; - T value; - - /** - * Create a Measurement from a StreamInput - * @param in the StreamInput to read from - * @throws IOException IOException - */ - @SuppressWarnings("unchecked") - public Measurement(StreamInput in) throws IOException { - this.name = in.readString(); - this.value = (T) in.readGenericValue(); - } - - /** - * Write Measurement to a StreamOutput - * @param out the StreamOutput to write - * @param measurement the Measurement to write - * @throws IOException IOException - */ - public static void writeTo(StreamOutput out, Measurement measurement) throws IOException { - out.writeString(measurement.getName()); - out.writeGenericValue(measurement.getValue()); - } - - /** - * Constructs a new Measurement with input name and value. - * - * @param name the name of the measurement - * @param value the value of the measurement - */ - public Measurement(String name, T value) { - this.name = name; - this.value = value; - } - - /** - * Returns the name of the measurement. - * - * @return the name of the measurement - */ - public String getName() { - return name; - } - - /** - * Returns the value of the measurement. - * - * @return the value of the measurement - */ - public T getValue() { - return value; - } - - /** - * Compare two measurements on the value - * @param other the other measure to compare - * @return 0 if the value that the two measurements holds are the same - * -1 if the value of the measurement is smaller than the other one - * 1 if the value of the measurement is greater than the other one - */ - @SuppressWarnings("unchecked") - public int compareTo(Measurement other) { - Number otherValue = other.getValue(); - if (value.getClass().equals(otherValue.getClass())) { - return value.compareTo((T) otherValue); - } else { - throw new UnsupportedOperationException( - String.format( - Locale.ROOT, - "comparison between different types are not supported : %s, %s", - value.getClass(), - otherValue.getClass() - ) - ); - } - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Measurement other = (Measurement) o; - return Objects.equals(name, other.name) && Objects.equals(value, other.value); - } - - @Override - public int hashCode() { - return Objects.hash(name, value); - } -} diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/MetricType.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/MetricType.java index 2b03bec7a722f..cdd090fbf4804 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/MetricType.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/MetricType.java @@ -13,14 +13,17 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Comparator; import java.util.Locale; import java.util.Set; import java.util.stream.Collectors; /** * Valid metric types for a search query record + * + * @opensearch.internal */ -public enum MetricType { +public enum MetricType implements Comparator { /** * Latency metric type */ @@ -36,30 +39,33 @@ public enum MetricType { /** * Read a MetricType from a StreamInput + * * @param in the StreamInput to read from * @return MetricType * @throws IOException IOException */ - public static MetricType readFromStream(StreamInput in) throws IOException { + public static MetricType readFromStream(final StreamInput in) throws IOException { return fromString(in.readString()); } /** * Create MetricType from String + * * @param metricType the String representation of MetricType * @return MetricType */ - public static MetricType fromString(String metricType) { + public static MetricType fromString(final String metricType) { return MetricType.valueOf(metricType.toUpperCase(Locale.ROOT)); } /** * Write MetricType to a StreamOutput + * * @param out the StreamOutput to write * @param metricType the MetricType to write * @throws IOException IOException */ - public static void writeTo(StreamOutput out, MetricType metricType) throws IOException { + static void writeTo(final StreamOutput out, final MetricType metricType) throws IOException { out.writeString(metricType.toString()); } @@ -70,9 +76,46 @@ public String toString() { /** * Get all valid metrics + * * @return A set of String that contains all valid metrics */ public static Set allMetricTypes() { return Arrays.stream(values()).collect(Collectors.toSet()); } + + /** + * Compare two numbers based on the metric type + * + * @param a the first Number to be compared. + * @param b the second Number to be compared. + * @return a negative integer, zero, or a positive integer as the first argument is less than, equal to, or greater than the second + */ + public int compare(final Number a, final Number b) { + switch (this) { + case LATENCY: + return Long.compare(a.longValue(), b.longValue()); + case JVM: + case CPU: + return Double.compare(a.doubleValue(), b.doubleValue()); + } + return -1; + } + + /** + * Parse a value with the correct type based on MetricType + * + * @param o the generic object to parse + * @return {@link Number} + */ + Number parseValue(final Object o) { + switch (this) { + case LATENCY: + return (Long) o; + case JVM: + case CPU: + return (Double) o; + default: + return (Number) o; + } + } } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java index 04392d54af42f..060711edb5580 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java @@ -16,28 +16,33 @@ import org.opensearch.core.xcontent.XContentBuilder; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.Objects; /** - * Simple abstract class that represent record stored in the Query Insight Framework + * SearchQueryRecord represents a minimal atomic record stored in the Query Insight Framework, + * which contains extensive information related to a search query. * * @opensearch.internal */ public class SearchQueryRecord implements ToXContentObject, Writeable { - private final Long timestamp; - private final Map> measurements; + private final long timestamp; + private final Map measurements; private final Map attributes; /** * Constructor of SearchQueryRecord + * * @param in the StreamInput to read the SearchQueryRecord from * @throws IOException IOException * @throws ClassCastException ClassCastException */ public SearchQueryRecord(final StreamInput in) throws IOException, ClassCastException { this.timestamp = in.readLong(); - this.measurements = in.readMap(MetricType::readFromStream, Measurement::new); + measurements = new HashMap<>(); + in.readMap(MetricType::readFromStream, StreamInput::readGenericValue) + .forEach(((metricType, o) -> measurements.put(metricType, metricType.parseValue(o)))); this.attributes = in.readMap(Attribute::readFromStream, StreamInput::readGenericValue); } @@ -48,15 +53,10 @@ public SearchQueryRecord(final StreamInput in) throws IOException, ClassCastExce * @param measurements A list of Measurement associated with this query * @param attributes A list of Attributes associated with this query */ - public SearchQueryRecord( - final Long timestamp, - Map> measurements, - Map attributes - ) { + public SearchQueryRecord(final long timestamp, Map measurements, final Map attributes) { if (measurements == null) { throw new IllegalArgumentException("Measurements cannot be null"); } - this.measurements = measurements; this.attributes = attributes; this.timestamp = timestamp; @@ -77,23 +77,23 @@ public long getTimestamp() { * @param name the name of the measurement * @return the measurement object, or null if not found */ - public Measurement getMeasurement(MetricType name) { + public Number getMeasurement(final MetricType name) { return measurements.get(name); } /** - * Returns an unmodifiable map of all the measurements associated with the metric. + * Returns a map of all the measurements associated with the metric. * - * @return an unmodifiable map of measurement names to measurement objects + * @return a map of measurement names to measurement objects */ - public Map> getMeasurements() { + public Map getMeasurements() { return measurements; } /** - * Returns an unmodifiable map of the attributes associated with the metric. + * Returns a map of the attributes associated with the metric. * - * @return an unmodifiable map of attribute keys to attribute values + * @return a map of attribute keys to attribute values */ public Map getAttributes() { return attributes; @@ -101,39 +101,37 @@ public Map getAttributes() { /** * Add an attribute to this record + * * @param attribute attribute to add * @param value the value associated with the attribute */ - public void addAttribute(Attribute attribute, Object value) { + public void addAttribute(final Attribute attribute, final Object value) { attributes.put(attribute, value); } @Override - public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + public XContentBuilder toXContent(final XContentBuilder builder, final ToXContent.Params params) throws IOException { builder.startObject(); builder.field("timestamp", timestamp); for (Map.Entry entry : attributes.entrySet()) { builder.field(entry.getKey().toString(), entry.getValue()); } - for (Map.Entry> entry : measurements.entrySet()) { - builder.field(entry.getKey().toString(), entry.getValue().getValue()); + for (Map.Entry entry : measurements.entrySet()) { + builder.field(entry.getKey().toString(), entry.getValue()); } return builder.endObject(); } /** * Write a SearchQueryRecord to a StreamOutput + * * @param out the StreamOutput to write * @throws IOException IOException */ @Override - public void writeTo(StreamOutput out) throws IOException { + public void writeTo(final StreamOutput out) throws IOException { out.writeLong(timestamp); - out.writeMap( - measurements, - (stream, metricType) -> MetricType.writeTo(out, metricType), - (stream, measurement) -> Measurement.writeTo(out, measurement) - ); + out.writeMap(measurements, (stream, metricType) -> MetricType.writeTo(out, metricType), StreamOutput::writeGenericValue); out.writeMap(attributes, (stream, attribute) -> Attribute.writeTo(out, attribute), StreamOutput::writeGenericValue); } @@ -147,24 +145,25 @@ public void writeTo(StreamOutput out) throws IOException { * -1 if the first SearchQueryRecord is numerically less than the second SearchQueryRecord; * 1 if the first SearchQueryRecord is numerically greater than the second SearchQueryRecord. */ - public static int compare(SearchQueryRecord a, SearchQueryRecord b, MetricType metricType) { - return a.getMeasurement(metricType).compareTo(b.getMeasurement(metricType)); + public static int compare(final SearchQueryRecord a, final SearchQueryRecord b, final MetricType metricType) { + return metricType.compare(a.getMeasurement(metricType), b.getMeasurement(metricType)); } /** * Check if a SearchQueryRecord is deep equal to another record + * * @param o the other SearchQueryRecord record * @return true if two records are deep equal, false otherwise. */ @Override - public boolean equals(Object o) { + public boolean equals(final Object o) { if (this == o) { return true; } if (!(o instanceof SearchQueryRecord)) { return false; } - SearchQueryRecord other = (SearchQueryRecord) o; + final SearchQueryRecord other = (SearchQueryRecord) o; return timestamp == other.getTimestamp() && measurements.equals(other.getMeasurements()) && attributes.size() == other.getAttributes().size(); diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java index 654aab6eb1563..6aa511c626ab1 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/resthandler/top_queries/RestTopQueriesAction.java @@ -33,7 +33,7 @@ import static org.opensearch.rest.RestRequest.Method.GET; /** - * Transport action to get Top N queries by certain metric type + * Rest action to get Top N queries by certain metric type * * @opensearch.api */ @@ -64,17 +64,12 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC final TopQueriesRequest topQueriesRequest = prepareRequest(request); topQueriesRequest.timeout(request.param("timeout")); - return channel -> client.execute( - TopQueriesAction.INSTANCE, - topQueriesRequest, - topQueriesResponse(channel) - - ); + return channel -> client.execute(TopQueriesAction.INSTANCE, topQueriesRequest, topQueriesResponse(channel)); } static TopQueriesRequest prepareRequest(final RestRequest request) { - String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId")); - String metricType = request.param("type", MetricType.LATENCY.toString()); + final String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId")); + final String metricType = request.param("type", MetricType.LATENCY.toString()); if (!ALLOWED_METRICS.contains(metricType)) { throw new IllegalArgumentException( String.format(Locale.ROOT, "request [%s] contains invalid metric type [%s]", request.path(), metricType) @@ -93,10 +88,10 @@ public boolean canTripCircuitBreaker() { return false; } - private RestResponseListener topQueriesResponse(RestChannel channel) { + private RestResponseListener topQueriesResponse(final RestChannel channel) { return new RestResponseListener<>(channel) { @Override - public RestResponse buildResponse(TopQueriesResponse response) throws Exception { + public RestResponse buildResponse(final TopQueriesResponse response) throws Exception { return new BytesRestResponse(RestStatus.OK, response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS)); } }; diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java index a4ddaca0a6cdc..ddf614211bc41 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/rules/transport/top_queries/TransportTopQueriesAction.java @@ -55,11 +55,11 @@ public class TransportTopQueriesAction extends TransportNodesAction< */ @Inject public TransportTopQueriesAction( - ThreadPool threadPool, - ClusterService clusterService, - TransportService transportService, - QueryInsightsService queryInsightsService, - ActionFilters actionFilters + final ThreadPool threadPool, + final ClusterService clusterService, + final TransportService transportService, + final QueryInsightsService queryInsightsService, + final ActionFilters actionFilters ) { super( TopQueriesAction.NAME, @@ -77,9 +77,9 @@ public TransportTopQueriesAction( @Override protected TopQueriesResponse newResponse( - TopQueriesRequest topQueriesRequest, - List responses, - List failures + final TopQueriesRequest topQueriesRequest, + final List responses, + final List failures ) { if (topQueriesRequest.getMetricType() == MetricType.LATENCY) { return new TopQueriesResponse( @@ -95,20 +95,23 @@ protected TopQueriesResponse newResponse( } @Override - protected NodeRequest newNodeRequest(TopQueriesRequest request) { + protected NodeRequest newNodeRequest(final TopQueriesRequest request) { return new NodeRequest(request); } @Override - protected TopQueries newNodeResponse(StreamInput in) throws IOException { + protected TopQueries newNodeResponse(final StreamInput in) throws IOException { return new TopQueries(in); } @Override - protected TopQueries nodeOperation(NodeRequest nodeRequest) { - TopQueriesRequest request = nodeRequest.request; + protected TopQueries nodeOperation(final NodeRequest nodeRequest) { + final TopQueriesRequest request = nodeRequest.request; if (request.getMetricType() == MetricType.LATENCY) { - return new TopQueries(clusterService.localNode(), queryInsightsService.getTopNRecords(MetricType.LATENCY, true)); + return new TopQueries( + clusterService.localNode(), + queryInsightsService.getTopQueriesService(MetricType.LATENCY).getTopQueriesRecords(true) + ); } else { throw new OpenSearchException(String.format(Locale.ROOT, "invalid metric type %s", request.getMetricType())); } @@ -122,10 +125,11 @@ protected TopQueries nodeOperation(NodeRequest nodeRequest) { */ public static class NodeRequest extends TransportRequest { - TopQueriesRequest request; + final TopQueriesRequest request; /** * Create the NodeResponse object from StreamInput + * * @param in the StreamInput to read the object * @throws IOException IOException */ @@ -138,12 +142,12 @@ public NodeRequest(StreamInput in) throws IOException { * Create the NodeResponse object from a TopQueriesRequest * @param request the TopQueriesRequest object */ - public NodeRequest(TopQueriesRequest request) { + public NodeRequest(final TopQueriesRequest request) { this.request = request; } @Override - public void writeTo(StreamOutput out) throws IOException { + public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); request.writeTo(out); } diff --git a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java index 978185cda1268..52cc1fbde790f 100644 --- a/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java +++ b/plugins/query-insights/src/main/java/org/opensearch/plugin/insights/settings/QueryInsightsSettings.java @@ -31,6 +31,14 @@ public class QueryInsightsSettings { * Max number of thread */ public static final int MAX_THREAD_COUNT = 5; + /** + * Max number of requests for the consumer to collect at one time + */ + public static final int QUERY_RECORD_QUEUE_CAPACITY = 1000; + /** + * Time interval for record queue consumer to run + */ + public static final TimeValue QUERY_RECORD_QUEUE_DRAIN_INTERVAL = new TimeValue(5, TimeUnit.SECONDS); /** * Default Values and Settings */ @@ -54,7 +62,7 @@ public class QueryInsightsSettings { /** Default N size for top N queries */ public static final int MAX_N_SIZE = 100; /** Default window size in seconds to keep the top N queries with latency data in query insight store */ - public static final int DEFAULT_WINDOW_SIZE = 60; + public static final TimeValue DEFAULT_WINDOW_SIZE = new TimeValue(60, TimeUnit.SECONDS); /** Default top N size to keep the data in query insight store */ public static final int DEFAULT_TOP_N_SIZE = 3; /** @@ -68,7 +76,7 @@ public class QueryInsightsSettings { */ public static final String TOP_QUERIES_BASE_URI = PLUGINS_BASE_URI + "/top_queries"; /** Default prefix for top N queries feature */ - public static final String TOP_N_QUERIES_SETTING_PREFIX = "search.top_n_queries"; + public static final String TOP_N_QUERIES_SETTING_PREFIX = "search.insights.top_queries"; /** Default prefix for top N queries by latency feature */ public static final String TOP_N_LATENCY_QUERIES_PREFIX = TOP_N_QUERIES_SETTING_PREFIX + ".latency"; /** @@ -96,7 +104,7 @@ public class QueryInsightsSettings { */ public static final Setting TOP_N_LATENCY_QUERIES_WINDOW_SIZE = Setting.positiveTimeSetting( TOP_N_LATENCY_QUERIES_PREFIX + ".window_size", - new TimeValue(DEFAULT_WINDOW_SIZE, TimeUnit.SECONDS), + DEFAULT_WINDOW_SIZE, Setting.Property.NodeScope, Setting.Property.Dynamic ); diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java index 84c4331e7ec1a..870ef5b9c8be9 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java @@ -15,7 +15,6 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.plugin.insights.rules.action.top_queries.TopQueries; import org.opensearch.plugin.insights.rules.model.Attribute; -import org.opensearch.plugin.insights.rules.model.Measurement; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; import org.opensearch.test.VersionUtils; @@ -60,13 +59,13 @@ public static List generateQueryInsightRecords(int lower, int int countOfRecords = randomIntBetween(lower, upper); long timestamp = startTimeStamp; for (int i = 0; i < countOfRecords; ++i) { - Map> measurements = Map.of( + Map measurements = Map.of( MetricType.LATENCY, - new Measurement<>(MetricType.LATENCY.name(), randomLongBetween(1000, 10000)), + randomLongBetween(1000, 10000), MetricType.CPU, - new Measurement<>(MetricType.CPU.name(), randomDouble()), + randomDouble(), MetricType.JVM, - new Measurement<>(MetricType.JVM.name(), randomDouble()) + randomDouble() ); Map phaseLatencyMap = new HashMap<>(); @@ -116,10 +115,7 @@ public static TopQueries createFixedTopQueries() { public static SearchQueryRecord createFixedSearchQueryRecord() { long timestamp = 1706574180000L; - Map> measurements = Map.of( - MetricType.LATENCY, - new Measurement<>(MetricType.LATENCY.name(), 1L) - ); + Map measurements = Map.of(MetricType.LATENCY, 1L); Map phaseLatencyMap = new HashMap<>(); Map attributes = new HashMap<>(); diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java index a6368bb0e180f..f340950017a5c 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/listener/QueryInsightsListenerTests.java @@ -16,6 +16,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.plugin.insights.core.service.QueryInsightsService; +import org.opensearch.plugin.insights.core.service.TopQueriesService; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder; @@ -45,6 +46,7 @@ public class QueryInsightsListenerTests extends OpenSearchTestCase { private final SearchPhaseContext searchPhaseContext = mock(SearchPhaseContext.class); private final SearchRequest searchRequest = mock(SearchRequest.class); private final QueryInsightsService queryInsightsService = mock(QueryInsightsService.class); + private final TopQueriesService topQueriesService = mock(TopQueriesService.class); private ClusterService clusterService; @Before @@ -57,9 +59,10 @@ public void setup() { clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE); clusterService = new ClusterService(settings, clusterSettings, null); when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); + when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService); } - public void testOnRequestEnd() { + public void testOnRequestEnd() throws InterruptedException { Long timestamp = System.currentTimeMillis() - 100L; SearchType searchType = SearchType.QUERY_THEN_FETCH; @@ -146,13 +149,13 @@ public void testConcurrentOnRequestEnd() throws InterruptedException { public void testSetEnabled() { when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true); QueryInsightsListener queryInsightsListener = new QueryInsightsListener(clusterService, queryInsightsService); - queryInsightsListener.setEnabled(MetricType.LATENCY, true); + queryInsightsListener.setEnableTopQueries(MetricType.LATENCY, true); assertTrue(queryInsightsListener.isEnabled()); when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(false); when(queryInsightsService.isCollectionEnabled(MetricType.CPU)).thenReturn(false); when(queryInsightsService.isCollectionEnabled(MetricType.JVM)).thenReturn(false); - queryInsightsListener.setEnabled(MetricType.LATENCY, false); + queryInsightsListener.setEnableTopQueries(MetricType.LATENCY, false); assertFalse(queryInsightsListener.isEnabled()); } } diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java index 2d875375eddc1..c29b48b9690d1 100644 --- a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java @@ -8,10 +8,6 @@ package org.opensearch.plugin.insights.core.service; -import org.opensearch.cluster.coordination.DeterministicTaskQueue; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.node.Node; import org.opensearch.plugin.insights.QueryInsightsTestUtils; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; @@ -20,185 +16,34 @@ import org.opensearch.threadpool.ThreadPool; import org.junit.Before; -import java.util.List; -import java.util.concurrent.TimeUnit; +import static org.mockito.Mockito.mock; /** * Unit Tests for {@link QueryInsightsService}. */ public class QueryInsightsServiceTests extends OpenSearchTestCase { - - private DeterministicTaskQueue deterministicTaskQueue; - private ThreadPool threadPool; + private final ThreadPool threadPool = mock(ThreadPool.class); private QueryInsightsService queryInsightsService; @Before public void setup() { - final Settings settings = Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "top n queries tests").build(); - deterministicTaskQueue = new DeterministicTaskQueue(settings, random()); - threadPool = deterministicTaskQueue.getThreadPool(); queryInsightsService = new QueryInsightsService(threadPool); queryInsightsService.enableCollection(MetricType.LATENCY, true); queryInsightsService.enableCollection(MetricType.CPU, true); queryInsightsService.enableCollection(MetricType.JVM, true); - queryInsightsService.setTopNSize(Integer.MAX_VALUE); - queryInsightsService.setWindowSize(new TimeValue(Long.MAX_VALUE)); - } - - public void testIngestQueryDataWithLargeWindow() { - final List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); - for (SearchQueryRecord record : records) { - queryInsightsService.addRecord(record); - } - runUntilTimeoutOrFinish(deterministicTaskQueue, 5000); - assertTrue( - QueryInsightsTestUtils.checkRecordsEqualsWithoutOrder( - queryInsightsService.getTopNRecords(MetricType.LATENCY, false), - records, - MetricType.LATENCY - ) - ); - } - - public void testConcurrentIngestQueryDataWithLargeWindow() { - final List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); - - int numRequests = records.size(); - for (int i = 0; i < numRequests; i++) { - int finalI = i; - threadPool.schedule(() -> { queryInsightsService.addRecord(records.get(finalI)); }, TimeValue.ZERO, ThreadPool.Names.GENERIC); - } - runUntilTimeoutOrFinish(deterministicTaskQueue, 5000); - assertTrue( - QueryInsightsTestUtils.checkRecordsEqualsWithoutOrder( - queryInsightsService.getTopNRecords(MetricType.LATENCY, false), - records, - MetricType.LATENCY - ) - ); } - public void testRollingWindow() { - // Create records with starting timestamp Monday, January 1, 2024 8:13:23 PM, with interval 10 minutes - final List records = QueryInsightsTestUtils.generateQueryInsightRecords(10, 10, 1704140003000L, 1000 * 60 * 10); - queryInsightsService.setWindowSize(TimeValue.timeValueMinutes(10)); - queryInsightsService.addRecord(records.get(0)); - runUntilTimeoutOrFinish(deterministicTaskQueue, 5000); - assertEquals(1, queryInsightsService.getTopNStoreSize(MetricType.LATENCY)); - assertEquals(0, queryInsightsService.getTopNHistoryStoreSize(MetricType.LATENCY)); - for (SearchQueryRecord record : records.subList(1, records.size())) { - queryInsightsService.addRecord(record); - runUntilTimeoutOrFinish(deterministicTaskQueue, 5000); - assertEquals(1, queryInsightsService.getTopNStoreSize(MetricType.LATENCY)); - assertEquals(1, queryInsightsService.getTopNHistoryStoreSize(MetricType.LATENCY)); + public void testAddRecordToLimitAndDrain() { + SearchQueryRecord record = QueryInsightsTestUtils.generateQueryInsightRecords(1, 1, System.currentTimeMillis(), 0).get(0); + for (int i = 0; i < QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY; i++) { + assertTrue(queryInsightsService.addRecord(record)); } - assertEquals(0, queryInsightsService.getTopNRecords(MetricType.LATENCY, false).size()); - } - - public void testRollingWindowWithHistory() { - // Create 2 records with starting Now and last 10 minutes - final List records = QueryInsightsTestUtils.generateQueryInsightRecords( - 2, - 2, - System.currentTimeMillis() - 1000 * 60 * 10, - 1000 * 60 * 10 - ); - queryInsightsService.setWindowSize(TimeValue.timeValueMinutes(3)); - queryInsightsService.addRecord(records.get(0)); - runUntilTimeoutOrFinish(deterministicTaskQueue, 5000); - assertEquals(1, queryInsightsService.getTopNStoreSize(MetricType.LATENCY)); - assertEquals(0, queryInsightsService.getTopNHistoryStoreSize(MetricType.LATENCY)); - queryInsightsService.addRecord(records.get(1)); - runUntilTimeoutOrFinish(deterministicTaskQueue, 5000); - assertEquals(1, queryInsightsService.getTopNStoreSize(MetricType.LATENCY)); - assertEquals(0, queryInsightsService.getTopNHistoryStoreSize(MetricType.LATENCY)); - assertEquals(1, queryInsightsService.getTopNRecords(MetricType.LATENCY, true).size()); - } - - public void testSmallWindowClearOutdatedData() { - final List records = QueryInsightsTestUtils.generateQueryInsightRecords( - 2, - 2, - System.currentTimeMillis(), - 1000 * 60 * 20 + // exceed capacity + assertFalse(queryInsightsService.addRecord(record)); + queryInsightsService.drainRecords(); + assertEquals( + QueryInsightsSettings.DEFAULT_TOP_N_SIZE, + queryInsightsService.getTopQueriesService(MetricType.LATENCY).getTopQueriesRecords(false).size() ); - queryInsightsService.setWindowSize(TimeValue.timeValueMinutes(10)); - - for (SearchQueryRecord record : records) { - queryInsightsService.addRecord(record); - } - runUntilTimeoutOrFinish(deterministicTaskQueue, 5000); - assertTrue(queryInsightsService.getTopNRecords(MetricType.LATENCY, false).size() <= 1); - } - - public void testSmallNSize() { - final List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); - queryInsightsService.setTopNSize(1); - - for (SearchQueryRecord record : records) { - queryInsightsService.addRecord(record); - } - runUntilTimeoutOrFinish(deterministicTaskQueue, 5000); - assertEquals(1, queryInsightsService.getTopNRecords(MetricType.LATENCY, false).size()); - } - - public void testSmallNSizeWithCPU() { - final List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); - queryInsightsService.setTopNSize(1); - - for (SearchQueryRecord record : records) { - queryInsightsService.addRecord(record); - } - runUntilTimeoutOrFinish(deterministicTaskQueue, 5000); - assertEquals(1, queryInsightsService.getTopNRecords(MetricType.CPU, false).size()); - } - - public void testSmallNSizeWithJVM() { - final List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); - queryInsightsService.setTopNSize(1); - - for (SearchQueryRecord record : records) { - queryInsightsService.addRecord(record); - } - runUntilTimeoutOrFinish(deterministicTaskQueue, 5000); - assertEquals(1, queryInsightsService.getTopNRecords(MetricType.JVM, false).size()); - } - - public void testValidateTopNSize() { - assertThrows( - IllegalArgumentException.class, - () -> { queryInsightsService.validateTopNSize(QueryInsightsSettings.MAX_N_SIZE + 1); } - ); - } - - public void testGetTopQueriesWhenNotEnabled() { - queryInsightsService.enableCollection(MetricType.LATENCY, false); - assertThrows(IllegalArgumentException.class, () -> { queryInsightsService.getTopNRecords(MetricType.LATENCY, false); }); - } - - public void testValidateWindowSize() { - assertThrows(IllegalArgumentException.class, () -> { - queryInsightsService.validateWindowSize( - new TimeValue(QueryInsightsSettings.MAX_WINDOW_SIZE.getSeconds() + 1, TimeUnit.SECONDS) - ); - }); - assertThrows(IllegalArgumentException.class, () -> { - queryInsightsService.validateWindowSize( - new TimeValue(QueryInsightsSettings.MIN_WINDOW_SIZE.getSeconds() - 1, TimeUnit.SECONDS) - ); - }); - assertThrows(IllegalArgumentException.class, () -> { queryInsightsService.validateWindowSize(new TimeValue(2, TimeUnit.DAYS)); }); - } - - private static void runUntilTimeoutOrFinish(DeterministicTaskQueue deterministicTaskQueue, long duration) { - final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + duration; - while (deterministicTaskQueue.getCurrentTimeMillis() < endTime - && (deterministicTaskQueue.hasRunnableTasks() || deterministicTaskQueue.hasDeferredTasks())) { - if (deterministicTaskQueue.hasDeferredTasks() && randomBoolean()) { - deterministicTaskQueue.advanceTime(); - } else if (deterministicTaskQueue.hasRunnableTasks()) { - deterministicTaskQueue.runRandomTask(); - } - } } } diff --git a/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java new file mode 100644 index 0000000000000..060df84a89485 --- /dev/null +++ b/plugins/query-insights/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java @@ -0,0 +1,102 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.core.service; + +import org.opensearch.cluster.coordination.DeterministicTaskQueue; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.plugin.insights.QueryInsightsTestUtils; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Unit Tests for {@link QueryInsightsService}. + */ +public class TopQueriesServiceTests extends OpenSearchTestCase { + private TopQueriesService topQueriesService; + + @Before + public void setup() { + topQueriesService = new TopQueriesService(MetricType.LATENCY); + topQueriesService.setTopNSize(Integer.MAX_VALUE); + topQueriesService.setWindowSize(new TimeValue(Long.MAX_VALUE)); + topQueriesService.setEnabled(true); + } + + public void testIngestQueryDataWithLargeWindow() { + final List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); + topQueriesService.consumeRecords(records); + assertTrue( + QueryInsightsTestUtils.checkRecordsEqualsWithoutOrder( + topQueriesService.getTopQueriesRecords(false), + records, + MetricType.LATENCY + ) + ); + } + + public void testRollingWindows() { + List records; + // Create 5 records at Now - 10 minutes to make sure they belong to the last window + records = QueryInsightsTestUtils.generateQueryInsightRecords(5, 5, System.currentTimeMillis() - 1000 * 60 * 10, 0); + topQueriesService.setWindowSize(TimeValue.timeValueMinutes(10)); + topQueriesService.consumeRecords(records); + assertEquals(0, topQueriesService.getTopQueriesRecords(true).size()); + + // Create 10 records at now + 1 minute, to make sure they belong to the current window + records = QueryInsightsTestUtils.generateQueryInsightRecords(10, 10, System.currentTimeMillis() + 1000 * 60, 0); + topQueriesService.setWindowSize(TimeValue.timeValueMinutes(10)); + topQueriesService.consumeRecords(records); + assertEquals(10, topQueriesService.getTopQueriesRecords(true).size()); + } + + public void testSmallNSize() { + final List records = QueryInsightsTestUtils.generateQueryInsightRecords(10); + topQueriesService.setTopNSize(1); + topQueriesService.consumeRecords(records); + assertEquals(1, topQueriesService.getTopQueriesRecords(false).size()); + } + + public void testValidateTopNSize() { + assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateTopNSize(QueryInsightsSettings.MAX_N_SIZE + 1); }); + } + + public void testGetTopQueriesWhenNotEnabled() { + topQueriesService.setEnabled(false); + assertThrows(IllegalArgumentException.class, () -> { topQueriesService.getTopQueriesRecords(false); }); + } + + public void testValidateWindowSize() { + assertThrows(IllegalArgumentException.class, () -> { + topQueriesService.validateWindowSize(new TimeValue(QueryInsightsSettings.MAX_WINDOW_SIZE.getSeconds() + 1, TimeUnit.SECONDS)); + }); + assertThrows(IllegalArgumentException.class, () -> { + topQueriesService.validateWindowSize(new TimeValue(QueryInsightsSettings.MIN_WINDOW_SIZE.getSeconds() - 1, TimeUnit.SECONDS)); + }); + assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateWindowSize(new TimeValue(2, TimeUnit.DAYS)); }); + assertThrows(IllegalArgumentException.class, () -> { topQueriesService.validateWindowSize(new TimeValue(7, TimeUnit.MINUTES)); }); + } + + private static void runUntilTimeoutOrFinish(DeterministicTaskQueue deterministicTaskQueue, long duration) { + final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + duration; + while (deterministicTaskQueue.getCurrentTimeMillis() < endTime + && (deterministicTaskQueue.hasRunnableTasks() || deterministicTaskQueue.hasDeferredTasks())) { + if (deterministicTaskQueue.hasDeferredTasks() && randomBoolean()) { + deterministicTaskQueue.advanceTime(); + } else if (deterministicTaskQueue.hasRunnableTasks()) { + deterministicTaskQueue.runRandomTask(); + } + } + } +}