From d74adb3d0c6f11232dfc78b0f9796dc8ab710d7f Mon Sep 17 00:00:00 2001 From: Chenyang Ji Date: Mon, 23 Sep 2024 12:14:03 -0700 Subject: [PATCH] Add data models for health stats API (#120) * Add data models for health stats api Signed-off-by: Chenyang Ji * update PR based on comments Signed-off-by: Chenyang Ji --------- Signed-off-by: Chenyang Ji --- .../core/service/QueryInsightsService.java | 20 +++ .../core/service/TopQueriesService.java | 10 ++ .../grouper/MinMaxHeapQueryGrouper.java | 55 ++++--- .../core/service/grouper/QueryGrouper.java | 8 ++ .../insights/rules/model/MetricType.java | 2 +- .../healthStats/QueryGrouperHealthStats.java | 92 ++++++++++++ .../healthStats/QueryInsightsHealthStats.java | 136 ++++++++++++++++++ .../healthStats/TopQueriesHealthStats.java | 92 ++++++++++++ .../rules/model/healthStats/package-info.java | 12 ++ .../service/QueryInsightsServiceTests.java | 34 ++++- .../core/service/TopQueriesServiceTests.java | 22 +++ .../grouper/MinMaxHeapQueryGrouperTests.java | 18 +++ .../QueryGrouperHealthStatsTests.java | 59 ++++++++ .../QueryInsightsHealthStatsTests.java | 107 ++++++++++++++ .../TopQueriesHealthStatsTests.java | 68 +++++++++ 15 files changed, 712 insertions(+), 23 deletions(-) create mode 100644 src/main/java/org/opensearch/plugin/insights/rules/model/healthStats/QueryGrouperHealthStats.java create mode 100644 src/main/java/org/opensearch/plugin/insights/rules/model/healthStats/QueryInsightsHealthStats.java create mode 100644 src/main/java/org/opensearch/plugin/insights/rules/model/healthStats/TopQueriesHealthStats.java create mode 100644 src/main/java/org/opensearch/plugin/insights/rules/model/healthStats/package-info.java create mode 100644 src/test/java/org/opensearch/plugin/insights/rules/model/healthStats/QueryGrouperHealthStatsTests.java create mode 100644 src/test/java/org/opensearch/plugin/insights/rules/model/healthStats/QueryInsightsHealthStatsTests.java create mode 100644 src/test/java/org/opensearch/plugin/insights/rules/model/healthStats/TopQueriesHealthStatsTests.java diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java index 2055719..bb215b8 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/QueryInsightsService.java @@ -9,6 +9,7 @@ package org.opensearch.plugin.insights.core.service; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_GROUPING_TYPE; +import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR; import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getExporterSettings; import java.io.IOException; @@ -18,6 +19,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.client.Client; @@ -33,6 +35,8 @@ import org.opensearch.plugin.insights.rules.model.GroupingType; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.rules.model.healthStats.QueryInsightsHealthStats; +import org.opensearch.plugin.insights.rules.model.healthStats.TopQueriesHealthStats; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.threadpool.Scheduler; @@ -439,4 +443,20 @@ protected void doClose() throws IOException { queryInsightsExporterFactory.closeAllExporters(); queryInsightsReaderFactory.closeAllReaders(); } + + /** + * Get health stats for query insights services + * + * @return QueryInsightsHealthStats + */ + public QueryInsightsHealthStats getHealthStats() { + Map topQueriesHealthStatsMap = topQueriesServices.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getHealthStats())); + return new QueryInsightsHealthStats( + threadPool.info(QUERY_INSIGHTS_EXECUTOR), + this.queryRecordsQueue.size(), + topQueriesHealthStatsMap + ); + } } diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java b/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java index af3b6a8..fdc9fb3 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/TopQueriesService.java @@ -49,6 +49,7 @@ import org.opensearch.plugin.insights.rules.model.GroupingType; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.rules.model.healthStats.TopQueriesHealthStats; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; import org.opensearch.threadpool.ThreadPool; @@ -516,4 +517,13 @@ private void drain() { topQueriesHistorySnapshot.set(new ArrayList<>()); topQueriesCurrentSnapshot.set(new ArrayList<>()); } + + /** + * Get top queries service health stats + * + * @return TopQueriesHealthStats + */ + public TopQueriesHealthStats getHealthStats() { + return new TopQueriesHealthStats(this.topQueriesStore.size(), this.queryGrouper.getHealthStats()); + } } diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouper.java b/src/main/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouper.java index 9b87f2d..d9c4774 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouper.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouper.java @@ -20,6 +20,7 @@ import org.opensearch.plugin.insights.rules.model.GroupingType; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.rules.model.healthStats.QueryGrouperHealthStats; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; /** @@ -39,12 +40,12 @@ public class MinMaxHeapQueryGrouper implements QueryGrouper { /** * Metric type for the current grouping service */ - private MetricType metricType; + private final MetricType metricType; /** * Aggregation type for the current grouping service */ - private AggregationType aggregationType; + private final AggregationType aggregationType; /** * Map storing groupingId to Tuple containing Aggregate search query record and boolean. * SearchQueryRecord: Aggregate search query record to store the aggregate of a metric type based on the aggregation type.. @@ -53,18 +54,18 @@ public class MinMaxHeapQueryGrouper implements QueryGrouper { * boolean: True if the aggregate record is in the Top N queries priority query (min heap) and False if the aggregate * record is in the Max Heap */ - private ConcurrentHashMap> groupIdToAggSearchQueryRecord; + private final ConcurrentHashMap> groupIdToAggSearchQueryRecord; /** * Min heap to keep track of the Top N query groups and is passed from TopQueriesService as the topQueriesStore */ - private PriorityBlockingQueue minHeapTopQueriesStore; + private final PriorityBlockingQueue minHeapTopQueriesStore; /** * The Max heap is an overflow data structure used to manage records that exceed the capacity of the Min heap. * It stores all records not included in the Top N query results. When the aggregate measurement for one of these * records is updated and it now qualifies as part of the Top N, the record is moved from the Max heap to the Min heap, * and the records are rearranged accordingly. */ - private PriorityBlockingQueue maxHeapQueryStore; + private final PriorityBlockingQueue maxHeapQueryStore; /** * Top N size based on the configuration set @@ -80,11 +81,11 @@ public class MinMaxHeapQueryGrouper implements QueryGrouper { private int maxGroups; public MinMaxHeapQueryGrouper( - MetricType metricType, - GroupingType groupingType, - AggregationType aggregationType, - PriorityBlockingQueue topQueriesStore, - int topNSize + final MetricType metricType, + final GroupingType groupingType, + final AggregationType aggregationType, + final PriorityBlockingQueue topQueriesStore, + final int topNSize ) { this.groupingType = groupingType; this.metricType = metricType; @@ -103,7 +104,7 @@ public MinMaxHeapQueryGrouper( * @return return the search query record that represents the group */ @Override - public SearchQueryRecord add(SearchQueryRecord searchQueryRecord) { + public SearchQueryRecord add(final SearchQueryRecord searchQueryRecord) { if (groupingType == GroupingType.NONE) { throw new IllegalArgumentException("Do not use addQueryToGroup when GroupingType is None"); } @@ -120,8 +121,7 @@ public SearchQueryRecord add(SearchQueryRecord searchQueryRecord) { // Add to min PQ and promote to max // If max PQ is empty return else try to promote record from max to min if (!groupIdToAggSearchQueryRecord.containsKey(groupId)) { - boolean maxGroupsLimitReached = checkMaxGroupsLimitReached(groupId); - if (maxGroupsLimitReached) { + if (checkMaxGroupsLimitReached(groupId)) { return null; } aggregateSearchQueryRecord = searchQueryRecord; @@ -158,7 +158,7 @@ public void drain() { * @return grouping type changed */ @Override - public boolean setGroupingType(GroupingType newGroupingType) { + public boolean setGroupingType(final GroupingType newGroupingType) { if (this.groupingType != newGroupingType) { this.groupingType = newGroupingType; drain(); @@ -183,7 +183,7 @@ public GroupingType getGroupingType() { * @return max groups changed */ @Override - public boolean setMaxGroups(int maxGroups) { + public boolean setMaxGroups(final int maxGroups) { if (this.maxGroups != maxGroups) { this.maxGroups = maxGroups; drain(); @@ -197,17 +197,21 @@ public boolean setMaxGroups(int maxGroups) { * @param newSize new size */ @Override - public void updateTopNSize(int newSize) { + public void updateTopNSize(final int newSize) { this.topNSize = newSize; } - private void addToMinPQ(SearchQueryRecord searchQueryRecord, String groupId) { + private void addToMinPQ(final SearchQueryRecord searchQueryRecord, final String groupId) { minHeapTopQueriesStore.add(searchQueryRecord); groupIdToAggSearchQueryRecord.put(groupId, new Tuple<>(searchQueryRecord, true)); overflow(); } - private void addAndPromote(SearchQueryRecord searchQueryRecord, SearchQueryRecord aggregateSearchQueryRecord, String groupId) { + private void addAndPromote( + final SearchQueryRecord searchQueryRecord, + final SearchQueryRecord aggregateSearchQueryRecord, + final String groupId + ) { Number measurementToAdd = searchQueryRecord.getMeasurement(metricType); aggregateSearchQueryRecord.addMeasurement(metricType, measurementToAdd); addToMinPQ(aggregateSearchQueryRecord, groupId); @@ -228,7 +232,7 @@ private void overflow() { } } - private boolean checkMaxGroupsLimitReached(String groupId) { + private boolean checkMaxGroupsLimitReached(final String groupId) { if (maxGroups <= maxHeapQueryStore.size() && minHeapTopQueriesStore.size() >= topNSize) { log.warn( "Exceeded [{}] setting threshold which is set at {}. Discarding new group with id {}.", @@ -259,11 +263,11 @@ int numberOfTopGroups() { } /** - * Get groupingId. This should be query hashcode for SIMILARITY grouping and user_id for USER_ID grouping. + * Get groupingId. This should be the query hashcode for SIMILARITY grouping and user_id for USER_ID grouping. * @param searchQueryRecord record * @return Grouping Id */ - private String getGroupingId(SearchQueryRecord searchQueryRecord) { + private String getGroupingId(final SearchQueryRecord searchQueryRecord) { switch (groupingType) { case SIMILARITY: return searchQueryRecord.getAttributes().get(Attribute.QUERY_HASHCODE).toString(); @@ -273,4 +277,13 @@ private String getGroupingId(SearchQueryRecord searchQueryRecord) { throw new IllegalArgumentException("The following grouping type is not supported : " + groupingType); } } + + /** + * Get health stats of the MinMaxHeapQueryGrouperService + * + * @return QueryGrouperHealthStats + */ + public QueryGrouperHealthStats getHealthStats() { + return new QueryGrouperHealthStats(this.groupIdToAggSearchQueryRecord.size(), this.maxHeapQueryStore.size()); + } } diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/grouper/QueryGrouper.java b/src/main/java/org/opensearch/plugin/insights/core/service/grouper/QueryGrouper.java index 07ba769..bcab5ba 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/grouper/QueryGrouper.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/grouper/QueryGrouper.java @@ -10,6 +10,7 @@ import org.opensearch.plugin.insights.rules.model.GroupingType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.rules.model.healthStats.QueryGrouperHealthStats; /** * Interface for grouping search queries based on grouping type for the metric type. @@ -57,4 +58,11 @@ public interface QueryGrouper { * @param topNSize the new top N size */ void updateTopNSize(int topNSize); + + /** + * Get health stats of the QueryGrouperService + * + * @return QueryGrouperHealthStats + */ + QueryGrouperHealthStats getHealthStats(); } diff --git a/src/main/java/org/opensearch/plugin/insights/rules/model/MetricType.java b/src/main/java/org/opensearch/plugin/insights/rules/model/MetricType.java index 52b8331..54b6a7e 100644 --- a/src/main/java/org/opensearch/plugin/insights/rules/model/MetricType.java +++ b/src/main/java/org/opensearch/plugin/insights/rules/model/MetricType.java @@ -62,7 +62,7 @@ public static MetricType fromString(final String metricType) { * @param metricType the MetricType to write * @throws IOException IOException */ - static void writeTo(final StreamOutput out, final MetricType metricType) throws IOException { + public static void writeTo(final StreamOutput out, final MetricType metricType) throws IOException { out.writeString(metricType.toString()); } diff --git a/src/main/java/org/opensearch/plugin/insights/rules/model/healthStats/QueryGrouperHealthStats.java b/src/main/java/org/opensearch/plugin/insights/rules/model/healthStats/QueryGrouperHealthStats.java new file mode 100644 index 0000000..b908dd7 --- /dev/null +++ b/src/main/java/org/opensearch/plugin/insights/rules/model/healthStats/QueryGrouperHealthStats.java @@ -0,0 +1,92 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.model.healthStats; + +import java.io.IOException; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +/** + * Represents the health statistics of the query grouper. + */ +public class QueryGrouperHealthStats implements ToXContentFragment, Writeable { + private final int queryGroupCount; + private final int queryGroupHeapSize; + private static final String QUERY_GROUP_COUNT_TOTAL = "QueryGroupCount_Total"; + private static final String QUERY_GROUP_COUNT_MAX_HEAP = "QueryGroupCount_MaxHeap"; + + /** + * Constructor to read QueryGrouperHealthStats from a StreamInput. + * + * @param in the StreamInput to read the QueryGrouperHealthStats from + * @throws IOException IOException + */ + public QueryGrouperHealthStats(final StreamInput in) throws IOException { + this.queryGroupCount = in.readInt(); + this.queryGroupHeapSize = in.readInt(); + } + + /** + * Constructor of QueryGrouperHealthStats + * + * @param queryGroupCount Number of groups in the grouper + * @param queryGroupHeapSize Heap size of the grouper + */ + public QueryGrouperHealthStats(final int queryGroupCount, final int queryGroupHeapSize) { + this.queryGroupCount = queryGroupCount; + this.queryGroupHeapSize = queryGroupHeapSize; + } + + /** + * Write QueryGrouperHealthStats Object to output stream + * @param out streamOutput + * @throws IOException IOException + */ + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeInt(queryGroupCount); + out.writeInt(queryGroupHeapSize); + } + + /** + * Write QueryGrouperHealthStats object to XContent + * + * @param builder XContentBuilder + * @param params Parameters + * @return XContentBuilder + * @throws IOException IOException + */ + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(QUERY_GROUP_COUNT_TOTAL, queryGroupCount); + builder.field(QUERY_GROUP_COUNT_MAX_HEAP, queryGroupHeapSize); + return builder; + } + + /** + * Gets the number of query groups. + * + * @return the query group count + */ + public int getQueryGroupCount() { + return queryGroupCount; + } + + /** + * Gets the query group heap size. + * + * @return the query group heap size + */ + public int getQueryGroupHeapSize() { + return queryGroupHeapSize; + } +} diff --git a/src/main/java/org/opensearch/plugin/insights/rules/model/healthStats/QueryInsightsHealthStats.java b/src/main/java/org/opensearch/plugin/insights/rules/model/healthStats/QueryInsightsHealthStats.java new file mode 100644 index 0000000..1fcd94e --- /dev/null +++ b/src/main/java/org/opensearch/plugin/insights/rules/model/healthStats/QueryInsightsHealthStats.java @@ -0,0 +1,136 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.model.healthStats; + +import java.io.IOException; +import java.util.Map; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.threadpool.ThreadPoolInfo; + +/** + * QueryInsightsHealthStats holds the stats on one node to indicate the health of the Query Insights plugin. + */ +public class QueryInsightsHealthStats implements ToXContentFragment, Writeable { + private final ThreadPool.Info threadPoolInfo; + private final int queryRecordsQueueSize; + private final Map topQueriesHealthStats; + + private static final String THREAD_POOL_INFO = "ThreadPoolInfo"; + private static final String QUERY_RECORDS_QUEUE_SIZE = "QueryRecordsQueueSize"; + private static final String TOP_QUERIES_HEALTH_STATS = "TopQueriesHealthStats"; + + /** + * Constructor to read QueryInsightsHealthStats from a StreamInput. + * + * @param in the StreamInput to read the QueryInsightsHealthStats from + * @throws IOException if an I/O error occurs + */ + public QueryInsightsHealthStats(final StreamInput in) throws IOException { + this.threadPoolInfo = new ThreadPool.Info(in); + this.queryRecordsQueueSize = in.readInt(); + this.topQueriesHealthStats = in.readMap(MetricType::readFromStream, TopQueriesHealthStats::new); + } + + /** + * Constructor of QueryInsightsHealthStats + * + * @param threadPoolInfo the {@link ThreadPoolInfo} of the internal Query Insights threadPool + * @param queryRecordsQueueSize The generic Query Record Queue size + * @param topQueriesHealthStats Top Queries health stats + */ + public QueryInsightsHealthStats( + final ThreadPool.Info threadPoolInfo, + final int queryRecordsQueueSize, + final Map topQueriesHealthStats + ) { + if (threadPoolInfo == null || topQueriesHealthStats == null) { + throw new IllegalArgumentException("Parameters cannot be null"); + } + this.threadPoolInfo = threadPoolInfo; + this.queryRecordsQueueSize = queryRecordsQueueSize; + this.topQueriesHealthStats = topQueriesHealthStats; + } + + /** + * Write QueryInsightsHealthStats object to XContent + * + * @param builder XContentBuilder + * @param params Parameters for build xContent + * @return XContentBuilder + * @throws IOException if an I/O error occurs + */ + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + // Write thread pool info object + builder.startObject(THREAD_POOL_INFO); + threadPoolInfo.toXContent(builder, params); + builder.endObject(); + // Write query records queue size + builder.field(QUERY_RECORDS_QUEUE_SIZE, queryRecordsQueueSize); + // Write Top Queries health stats object + builder.startObject(TOP_QUERIES_HEALTH_STATS); + for (Map.Entry entry : topQueriesHealthStats.entrySet()) { + builder.startObject(entry.getKey().toString()); + entry.getValue().toXContent(builder, params); + builder.endObject(); + } + builder.endObject(); + return builder; + } + + /** + * Write QueryInsightsHealthStats Object to output stream + * + * @param out streamOutput + * @throws IOException if an I/O error occurs + */ + @Override + public void writeTo(final StreamOutput out) throws IOException { + threadPoolInfo.writeTo(out); + out.writeInt(queryRecordsQueueSize); + out.writeMap( + topQueriesHealthStats, + MetricType::writeTo, + (streamOutput, topQueriesHealthStats) -> topQueriesHealthStats.writeTo(out) + ); + } + + /** + * Get the thread pool info. + * + * @return the thread pool info + */ + public ThreadPool.Info getThreadPoolInfo() { + return threadPoolInfo; + } + + /** + * Get the query records queue size. + * + * @return the query records queue size + */ + public int getQueryRecordsQueueSize() { + return queryRecordsQueueSize; + } + + /** + * Get the top queries health stats. + * + * @return the top queries health stats + */ + public Map getTopQueriesHealthStats() { + return topQueriesHealthStats; + } +} diff --git a/src/main/java/org/opensearch/plugin/insights/rules/model/healthStats/TopQueriesHealthStats.java b/src/main/java/org/opensearch/plugin/insights/rules/model/healthStats/TopQueriesHealthStats.java new file mode 100644 index 0000000..8fcba56 --- /dev/null +++ b/src/main/java/org/opensearch/plugin/insights/rules/model/healthStats/TopQueriesHealthStats.java @@ -0,0 +1,92 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.model.healthStats; + +import java.io.IOException; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +/** + * Represents the health statistics of top queries, including heap sizes and query grouper health stats. + */ +public class TopQueriesHealthStats implements ToXContentFragment, Writeable { + private final int topQueriesHeapSize; + private final QueryGrouperHealthStats queryGrouperHealthStats; + private static final String TOP_QUERIES_HEAP_SIZE = "TopQueriesHeapSize"; + + /** + * Constructor to read TopQueriesHealthStats from a StreamInput. + * + * @param in the StreamInput to read the TopQueriesHealthStats from + * @throws IOException if an I/O error occurs + */ + public TopQueriesHealthStats(final StreamInput in) throws IOException { + this.topQueriesHeapSize = in.readInt(); + this.queryGrouperHealthStats = new QueryGrouperHealthStats(in); + } + + /** + * Constructor of TopQueriesHealthStats + * + * @param topQueriesHeapSize Top Queries heap size + * @param queryGrouperHealthStats Health stats for query grouper + */ + public TopQueriesHealthStats(final int topQueriesHeapSize, final QueryGrouperHealthStats queryGrouperHealthStats) { + this.topQueriesHeapSize = topQueriesHeapSize; + this.queryGrouperHealthStats = queryGrouperHealthStats; + } + + /** + * Write TopQueriesHealthStats Object to output stream + * + * @param out streamOutput + * @throws IOException if an I/O error occurs + */ + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeInt(topQueriesHeapSize); + queryGrouperHealthStats.writeTo(out); + } + + /** + * Write TopQueriesHealthStats object to XContent + * + * @param builder XContentBuilder + * @param params Parameters + * @return XContentBuilder + * @throws IOException if an I/O error occurs + */ + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(TOP_QUERIES_HEAP_SIZE, topQueriesHeapSize); + queryGrouperHealthStats.toXContent(builder, params); + return builder; + } + + /** + * Gets the top queries heap size. + * + * @return the top queries heap size + */ + public int getTopQueriesHeapSize() { + return topQueriesHeapSize; + } + + /** + * Gets the query grouper health stats. + * + * @return the query grouper health stats + */ + public QueryGrouperHealthStats getQueryGrouperHealthStats() { + return queryGrouperHealthStats; + } +} diff --git a/src/main/java/org/opensearch/plugin/insights/rules/model/healthStats/package-info.java b/src/main/java/org/opensearch/plugin/insights/rules/model/healthStats/package-info.java new file mode 100644 index 0000000..d8034cc --- /dev/null +++ b/src/main/java/org/opensearch/plugin/insights/rules/model/healthStats/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * health stats models + */ +package org.opensearch.plugin.insights.rules.model.healthStats; diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java index cff48d6..274257a 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/service/QueryInsightsServiceTests.java @@ -12,25 +12,32 @@ import static org.mockito.Mockito.spy; import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; import org.junit.Before; import org.opensearch.client.Client; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.plugin.insights.QueryInsightsTestUtils; import org.opensearch.plugin.insights.rules.model.GroupingType; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.rules.model.healthStats.QueryInsightsHealthStats; +import org.opensearch.plugin.insights.rules.model.healthStats.TopQueriesHealthStats; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ScalingExecutorBuilder; +import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; /** * Unit Tests for {@link QueryInsightsService}. */ public class QueryInsightsServiceTests extends OpenSearchTestCase { - private final ThreadPool threadPool = mock(ThreadPool.class); + private ThreadPool threadPool; private final Client client = mock(Client.class); private final NamedXContentRegistry namedXContentRegistry = mock(NamedXContentRegistry.class); private QueryInsightsService queryInsightsService; @@ -42,6 +49,10 @@ public void setup() { Settings settings = settingsBuilder.build(); ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); QueryInsightsTestUtils.registerAllQueryInsightsSettings(clusterSettings); + this.threadPool = new TestThreadPool( + "QueryInsightsHealthStatsTests", + new ScalingExecutorBuilder(QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR, 1, 5, TimeValue.timeValueMinutes(5)) + ); queryInsightsService = new QueryInsightsService( clusterSettings, threadPool, @@ -55,6 +66,12 @@ public void setup() { queryInsightsServiceSpy = spy(queryInsightsService); } + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + public void testAddRecordToLimitAndDrain() { SearchQueryRecord record = QueryInsightsTestUtils.generateQueryInsightRecords(1, 1, System.currentTimeMillis(), 0).get(0); for (int i = 0; i < QueryInsightsSettings.QUERY_RECORD_QUEUE_CAPACITY; i++) { @@ -164,4 +181,19 @@ public void testAddRecordGroupBySimilarityWithTwoGroups() { queryInsightsService.drainRecords(); assertEquals(2, queryInsightsService.getTopQueriesService(MetricType.LATENCY).getTopQueriesRecords(false, null, null).size()); } + + public void testGetHealthStats() { + List records = QueryInsightsTestUtils.generateQueryInsightRecords(2); + queryInsightsService.addRecord(records.get(0)); + QueryInsightsHealthStats healthStats = queryInsightsService.getHealthStats(); + assertNotNull(healthStats); + assertEquals(threadPool.info(QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR), healthStats.getThreadPoolInfo()); + assertEquals(1, healthStats.getQueryRecordsQueueSize()); + Map topQueriesHealthStatsMap = healthStats.getTopQueriesHealthStats(); + assertNotNull(topQueriesHealthStatsMap); + assertEquals(3, topQueriesHealthStatsMap.size()); + assertTrue(topQueriesHealthStatsMap.containsKey(MetricType.LATENCY)); + assertTrue(topQueriesHealthStatsMap.containsKey(MetricType.CPU)); + assertTrue(topQueriesHealthStatsMap.containsKey(MetricType.MEMORY)); + } } diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java index 58854f9..5ec6f64 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/service/TopQueriesServiceTests.java @@ -21,6 +21,7 @@ import org.opensearch.plugin.insights.rules.model.GroupingType; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.rules.model.healthStats.TopQueriesHealthStats; import org.opensearch.plugin.insights.settings.QueryInsightsSettings; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ThreadPool; @@ -146,4 +147,25 @@ public void testRollingWindowsWithDifferentGroup() { topQueriesService.consumeRecords(records); assertEquals(1, topQueriesService.getTopQueriesRecords(true, null, null).size()); } + + public void testGetHealthStats_EmptyService() { + TopQueriesHealthStats healthStats = topQueriesService.getHealthStats(); + // Validate the health stats + assertNotNull(healthStats); + assertEquals(0, healthStats.getTopQueriesHeapSize()); + assertNotNull(healthStats.getQueryGrouperHealthStats()); + assertEquals(0, healthStats.getQueryGrouperHealthStats().getQueryGroupCount()); + assertEquals(0, healthStats.getQueryGrouperHealthStats().getQueryGroupHeapSize()); + } + + public void testGetHealthStats_WithData() { + List records = QueryInsightsTestUtils.generateQueryInsightRecords(2); + topQueriesService.consumeRecords(records); + TopQueriesHealthStats healthStats = topQueriesService.getHealthStats(); + assertNotNull(healthStats); + assertEquals(2, healthStats.getTopQueriesHeapSize()); // Since we added two records + assertNotNull(healthStats.getQueryGrouperHealthStats()); + // Assuming no grouping by default, expect QueryGroupCount to be 0 + assertEquals(0, healthStats.getQueryGrouperHealthStats().getQueryGroupCount()); + } } diff --git a/src/test/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouperTests.java b/src/test/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouperTests.java index f92f49a..ec7ea30 100644 --- a/src/test/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouperTests.java +++ b/src/test/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouperTests.java @@ -19,6 +19,7 @@ import org.opensearch.plugin.insights.rules.model.GroupingType; import org.opensearch.plugin.insights.rules.model.MetricType; import org.opensearch.plugin.insights.rules.model.SearchQueryRecord; +import org.opensearch.plugin.insights.rules.model.healthStats.QueryGrouperHealthStats; import org.opensearch.test.OpenSearchTestCase; /** @@ -679,6 +680,23 @@ public void testMaxGroupLimitReached() { assertEquals(2, minMaxHeapQueryGrouper.numberOfGroups()); } + public void testGetHealthStatsWithEmptyGrouper() { + QueryGrouperHealthStats healthStats = minMaxHeapQueryGrouper.getHealthStats(); + // Expecting 0 group count and 0 heap size since no groups have been added + assertEquals(0, healthStats.getQueryGroupCount()); + assertEquals(0, healthStats.getQueryGroupHeapSize()); + } + + public void testGetHealthStatsWithGroups() { + List records = QueryInsightsTestUtils.generateQueryInsightRecords(2); + minMaxHeapQueryGrouper.add(records.get(0)); + minMaxHeapQueryGrouper.add(records.get(1)); + QueryGrouperHealthStats healthStats = minMaxHeapQueryGrouper.getHealthStats(); + // Verify that group count stats reflect the correct number of total groups + assertEquals(2, healthStats.getQueryGroupCount()); + assertEquals(0, healthStats.getQueryGroupHeapSize()); + } + private MinMaxHeapQueryGrouper getQueryGroupingService(AggregationType aggregationType, int topNSize) { return new MinMaxHeapQueryGrouper(MetricType.LATENCY, GroupingType.SIMILARITY, aggregationType, topQueriesStore, topNSize); } diff --git a/src/test/java/org/opensearch/plugin/insights/rules/model/healthStats/QueryGrouperHealthStatsTests.java b/src/test/java/org/opensearch/plugin/insights/rules/model/healthStats/QueryGrouperHealthStatsTests.java new file mode 100644 index 0000000..cd03a71 --- /dev/null +++ b/src/test/java/org/opensearch/plugin/insights/rules/model/healthStats/QueryGrouperHealthStatsTests.java @@ -0,0 +1,59 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.model.healthStats; + +import java.io.IOException; +import java.util.Locale; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.test.OpenSearchTestCase; + +/** + * Unit tests for the {@link QueryGrouperHealthStats} class. + */ +public class QueryGrouperHealthStatsTests extends OpenSearchTestCase { + private final int queryGroupCount = 10; + private final int queryGroupHeapSize = 5; + + public void testConstructorAndGetters() { + QueryGrouperHealthStats stats = new QueryGrouperHealthStats(queryGroupCount, queryGroupHeapSize); + assertEquals(queryGroupCount, stats.getQueryGroupCount()); + assertEquals(queryGroupHeapSize, stats.getQueryGroupHeapSize()); + } + + public void testSerialization() throws IOException { + QueryGrouperHealthStats stats = new QueryGrouperHealthStats(queryGroupCount, queryGroupHeapSize); + // Write to StreamOutput + BytesStreamOutput out = new BytesStreamOutput(); + stats.writeTo(out); + // Read from StreamInput + StreamInput in = StreamInput.wrap(out.bytes().toBytesRef().bytes); + QueryGrouperHealthStats deserializedStats = new QueryGrouperHealthStats(in); + assertEquals(stats.getQueryGroupCount(), deserializedStats.getQueryGroupCount()); + assertEquals(stats.getQueryGroupHeapSize(), deserializedStats.getQueryGroupHeapSize()); + } + + public void testToXContent() throws IOException { + QueryGrouperHealthStats stats = new QueryGrouperHealthStats(queryGroupCount, queryGroupHeapSize); + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.startObject(); + stats.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + String expectedJson = String.format( + Locale.ROOT, + "{\"QueryGroupCount_Total\":%d,\"QueryGroupCount_MaxHeap\":%d}", + queryGroupCount, + queryGroupHeapSize + ); + assertEquals(expectedJson, builder.toString()); + } +} diff --git a/src/test/java/org/opensearch/plugin/insights/rules/model/healthStats/QueryInsightsHealthStatsTests.java b/src/test/java/org/opensearch/plugin/insights/rules/model/healthStats/QueryInsightsHealthStatsTests.java new file mode 100644 index 0000000..ac4e822 --- /dev/null +++ b/src/test/java/org/opensearch/plugin/insights/rules/model/healthStats/QueryInsightsHealthStatsTests.java @@ -0,0 +1,107 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.model.healthStats; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.plugin.insights.rules.model.MetricType; +import org.opensearch.plugin.insights.settings.QueryInsightsSettings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ScalingExecutorBuilder; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +/** + * Unit tests for the {@link QueryInsightsHealthStats} class. + */ +public class QueryInsightsHealthStatsTests extends OpenSearchTestCase { + private ThreadPool threadPool; + private ThreadPool.Info threadPoolInfo; + private int queryRecordsQueueSize; + private Map topQueriesHealthStats; + + @Before + public void setUpQueryInsightsHealthStats() { + this.threadPool = new TestThreadPool( + "QueryInsightsHealthStatsTests", + new ScalingExecutorBuilder(QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR, 1, 5, TimeValue.timeValueMinutes(5)) + ); + threadPoolInfo = threadPool.info(QueryInsightsSettings.QUERY_INSIGHTS_EXECUTOR); + queryRecordsQueueSize = 100; + topQueriesHealthStats = new HashMap<>(); + topQueriesHealthStats.put(MetricType.LATENCY, new TopQueriesHealthStats(10, new QueryGrouperHealthStats(20, 15))); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + + public void testConstructorAndGetters() { + QueryInsightsHealthStats healthStats = new QueryInsightsHealthStats(threadPoolInfo, queryRecordsQueueSize, topQueriesHealthStats); + assertNotNull(healthStats); + assertEquals(threadPoolInfo, healthStats.getThreadPoolInfo()); + assertEquals(queryRecordsQueueSize, healthStats.getQueryRecordsQueueSize()); + assertEquals(topQueriesHealthStats, healthStats.getTopQueriesHealthStats()); + } + + public void testSerialization() throws IOException { + QueryInsightsHealthStats healthStats = new QueryInsightsHealthStats(threadPoolInfo, queryRecordsQueueSize, topQueriesHealthStats); + // Write to StreamOutput + BytesStreamOutput out = new BytesStreamOutput(); + healthStats.writeTo(out); + // Read from StreamInput + StreamInput in = StreamInput.wrap(out.bytes().toBytesRef().bytes); + QueryInsightsHealthStats deserializedHealthStats = new QueryInsightsHealthStats(in); + assertEquals(healthStats.getQueryRecordsQueueSize(), deserializedHealthStats.getQueryRecordsQueueSize()); + assertNotNull(deserializedHealthStats.getThreadPoolInfo()); + assertNotNull(deserializedHealthStats.getTopQueriesHealthStats()); + } + + public void testToXContent() throws IOException { + QueryInsightsHealthStats healthStats = new QueryInsightsHealthStats(threadPoolInfo, queryRecordsQueueSize, topQueriesHealthStats); + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.startObject(); + + healthStats.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + String jsonOutput = builder.prettyPrint().toString(); + // Expected JSON output + String expectedJson = "{\n" + + " \"ThreadPoolInfo\": {\n" + + " \"query_insights_executor\": {\n" + + " \"type\": \"scaling\",\n" + + " \"core\": 1,\n" + + " \"max\": 5,\n" + + " \"keep_alive\": \"5m\",\n" + + " \"queue_size\": -1\n" + + " }\n" + + " },\n" + + " \"QueryRecordsQueueSize\": 100,\n" + + " \"TopQueriesHealthStats\": {\n" + + " \"latency\": {\n" + + " \"TopQueriesHeapSize\": 10,\n" + + " \"QueryGroupCount_Total\": 20,\n" + + " \"QueryGroupCount_MaxHeap\": 15\n" + + " }\n" + + " }\n" + + "}"; + assertEquals(expectedJson.replaceAll("\\s", ""), jsonOutput.replaceAll("\\s", "")); + } +} diff --git a/src/test/java/org/opensearch/plugin/insights/rules/model/healthStats/TopQueriesHealthStatsTests.java b/src/test/java/org/opensearch/plugin/insights/rules/model/healthStats/TopQueriesHealthStatsTests.java new file mode 100644 index 0000000..4a426e4 --- /dev/null +++ b/src/test/java/org/opensearch/plugin/insights/rules/model/healthStats/TopQueriesHealthStatsTests.java @@ -0,0 +1,68 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.insights.rules.model.healthStats; + +import java.io.IOException; +import java.util.Locale; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.test.OpenSearchTestCase; + +/** + * Unit tests for the {@link TopQueriesHealthStats} class. + */ +public class TopQueriesHealthStatsTests extends OpenSearchTestCase { + private final int topQueriesHeapSize = 15; + private final QueryGrouperHealthStats queryGrouperHealthStats = new QueryGrouperHealthStats(10, 5); + + public void testConstructorAndGetters() { + TopQueriesHealthStats healthStats = new TopQueriesHealthStats(topQueriesHeapSize, queryGrouperHealthStats); + assertEquals(topQueriesHeapSize, healthStats.getTopQueriesHeapSize()); + assertEquals(queryGrouperHealthStats, healthStats.getQueryGrouperHealthStats()); + } + + public void testSerialization() throws IOException { + TopQueriesHealthStats healthStats = new TopQueriesHealthStats(topQueriesHeapSize, queryGrouperHealthStats); + // Write to StreamOutput + BytesStreamOutput out = new BytesStreamOutput(); + healthStats.writeTo(out); + // Read from StreamInput + StreamInput in = StreamInput.wrap(out.bytes().toBytesRef().bytes); + TopQueriesHealthStats deserializedHealthStats = new TopQueriesHealthStats(in); + assertEquals(healthStats.getTopQueriesHeapSize(), deserializedHealthStats.getTopQueriesHeapSize()); + assertNotNull(deserializedHealthStats.getQueryGrouperHealthStats()); + assertEquals( + healthStats.getQueryGrouperHealthStats().getQueryGroupCount(), + deserializedHealthStats.getQueryGrouperHealthStats().getQueryGroupCount() + ); + assertEquals( + healthStats.getQueryGrouperHealthStats().getQueryGroupHeapSize(), + deserializedHealthStats.getQueryGrouperHealthStats().getQueryGroupHeapSize() + ); + } + + public void testToXContent() throws IOException { + TopQueriesHealthStats healthStats = new TopQueriesHealthStats(topQueriesHeapSize, queryGrouperHealthStats); + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.startObject(); + healthStats.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + String expectedJson = String.format( + Locale.ROOT, + "{\"TopQueriesHeapSize\":%d,\"QueryGroupCount_Total\":%d,\"QueryGroupCount_MaxHeap\":%d}", + topQueriesHeapSize, + queryGrouperHealthStats.getQueryGroupCount(), + queryGrouperHealthStats.getQueryGroupHeapSize() + ); + assertEquals(expectedJson, builder.toString()); + } +}