Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Query grouping framework for Top N queries and group by query similarity #86

Merged
merged 1 commit into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ public List<Setting<?>> getSettings() {
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE,
QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS,
QueryInsightsSettings.TOP_N_QUERIES_GROUP_BY,
QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N,
QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package org.opensearch.plugin.insights.core.listener;

import static org.opensearch.plugin.insights.settings.QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_QUERIES_GROUP_BY;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNEnabledSetting;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNSizeSetting;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNWindowSizeSetting;
Expand All @@ -31,7 +33,9 @@
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
import org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator;
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.Measurement;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.tasks.Task;
Expand Down Expand Up @@ -101,6 +105,26 @@ public QueryInsightsListener(
this.queryInsightsService.setWindowSize(type, clusterService.getClusterSettings().get(getTopNWindowSizeSetting(type)));
}

// Settings endpoints set for grouping top n queries
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_QUERIES_GROUP_BY,
v -> this.queryInsightsService.setGrouping(v),
v -> this.queryInsightsService.validateGrouping(v)
);
this.queryInsightsService.validateGrouping(clusterService.getClusterSettings().get(TOP_N_QUERIES_GROUP_BY));
this.queryInsightsService.setGrouping(clusterService.getClusterSettings().get(TOP_N_QUERIES_GROUP_BY));

clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N,
v -> this.queryInsightsService.setMaximumGroups(v),
v -> this.queryInsightsService.validateMaximumGroups(v)
);
this.queryInsightsService.validateMaximumGroups(clusterService.getClusterSettings().get(TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N));
this.queryInsightsService.setMaximumGroups(clusterService.getClusterSettings().get(TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N));

// Settings endpoints set for search query metrics
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, v -> setSearchQueryMetricsEnabled(v));
setSearchQueryMetricsEnabled(clusterService.getClusterSettings().get(SEARCH_QUERY_METRICS_ENABLED_SETTING));
Expand Down Expand Up @@ -191,32 +215,40 @@ private void constructSearchQueryRecord(final SearchPhaseContext context, final

final SearchRequest request = context.getRequest();
try {
Map<MetricType, Number> measurements = new HashMap<>();
Map<MetricType, Measurement> measurements = new HashMap<>();
if (shouldCollect(MetricType.LATENCY)) {
measurements.put(
MetricType.LATENCY,
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos())
new Measurement(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos()))
);
}
if (shouldCollect(MetricType.CPU)) {
measurements.put(
MetricType.CPU,
tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getCpuTimeInNanos()).mapToLong(Long::longValue).sum()
new Measurement(
tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getCpuTimeInNanos()).mapToLong(Long::longValue).sum()
)
);
}
if (shouldCollect(MetricType.MEMORY)) {
measurements.put(
MetricType.MEMORY,
tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getMemoryInBytes()).mapToLong(Long::longValue).sum()
new Measurement(
tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getMemoryInBytes()).mapToLong(Long::longValue).sum()
)
);
}

String hashcode = QueryShapeGenerator.getShapeHashCodeAsString(request.source(), false);

Map<Attribute, Object> attributes = new HashMap<>();
attributes.put(Attribute.SEARCH_TYPE, request.searchType().toString().toLowerCase(Locale.ROOT));
attributes.put(Attribute.SOURCE, request.source());
attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards());
attributes.put(Attribute.INDICES, request.indices());
attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap());
attributes.put(Attribute.TASK_RESOURCE_USAGES, tasksResourceUsages);
attributes.put(Attribute.QUERY_HASHCODE, hashcode);

Map<String, Object> labels = new HashMap<>();
// Retrieve user provided label if exists
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.plugin.insights.core.service;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_GROUPING_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getExporterSettings;

import java.io.IOException;
Expand All @@ -27,6 +28,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
import org.opensearch.plugin.insights.core.service.categorizer.SearchQueryCategorizer;
import org.opensearch.plugin.insights.rules.model.GroupingType;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
Expand Down Expand Up @@ -73,6 +75,11 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
*/
final QueryInsightsExporterFactory queryInsightsExporterFactory;

/**
* Flags for enabling insight data grouping for different metric types
*/
private GroupingType groupingType;

private volatile boolean searchQueryMetricsEnabled;

private SearchQueryCategorizer searchQueryCategorizer;
Expand Down Expand Up @@ -112,16 +119,17 @@ public QueryInsightsService(

this.searchQueryCategorizer = SearchQueryCategorizer.getInstance(metricsRegistry);
this.enableSearchQueryMetricsFeature(false);
this.groupingType = DEFAULT_GROUPING_TYPE;
}

/**
* Ingest the query data into in-memory stores
*
* @param record the record to ingest
* @return SearchQueryRecord
* @return true/false
*/
public boolean addRecord(final SearchQueryRecord record) {
boolean shouldAdd = searchQueryMetricsEnabled;
boolean shouldAdd = isSearchQueryMetricsFeatureEnabled() || isGroupingEnabled();
if (!shouldAdd) {
for (Map.Entry<MetricType, TopQueriesService> entry : topQueriesServices.entrySet()) {
if (!enableCollect.get(entry.getKey())) {
Expand Down Expand Up @@ -185,6 +193,67 @@ public void enableCollection(final MetricType metricType, final boolean enable)
this.topQueriesServices.get(metricType).setEnabled(enable);
}

/**
* Validate grouping given grouping type setting
* @param groupingTypeSetting grouping setting
*/
public void validateGrouping(final String groupingTypeSetting) {
GroupingType.getGroupingTypeFromSettingAndValidate(groupingTypeSetting);
}

/**
* Set grouping
* @param groupingTypeSetting grouping
*/
public void setGrouping(final String groupingTypeSetting) {
GroupingType newGroupingType = GroupingType.getGroupingTypeFromSettingAndValidate(groupingTypeSetting);
GroupingType oldGroupingType = groupingType;

if (oldGroupingType != newGroupingType) {
groupingType = newGroupingType;

for (MetricType metricType : MetricType.allMetricTypes()) {
this.topQueriesServices.get(metricType).setGrouping(newGroupingType);
}
}
}

/**
* Set max number of groups
* @param maxGroups maximum number of groups that should be tracked when calculating Top N groups
*/
public void setMaximumGroups(final int maxGroups) {
for (MetricType metricType : MetricType.allMetricTypes()) {
this.topQueriesServices.get(metricType).setMaxGroups(maxGroups);
}
}

/**
* Validate max number of groups. Should be between 1 and MAX_GROUPS_LIMIT
* @param maxGroups maximum number of groups that should be tracked when calculating Top N groups
*/
public void validateMaximumGroups(final int maxGroups) {
if (maxGroups < 0 || maxGroups > QueryInsightsSettings.MAX_GROUPS_EXCLUDING_TOPN_LIMIT) {
throw new IllegalArgumentException(
"Max groups setting"
+ " should be between 0 and "
+ QueryInsightsSettings.MAX_GROUPS_EXCLUDING_TOPN_LIMIT
+ ", was ("
+ maxGroups
+ ")"
);
}
}

/**
* Get the grouping type based on the metricType
* @return GroupingType
*/

public GroupingType getGrouping() {
return groupingType;
}

/**
* Get if the Query Insights data collection is enabled for a MetricType
*
Expand Down Expand Up @@ -226,9 +295,18 @@ public boolean isSearchQueryMetricsFeatureEnabled() {
return this.searchQueryMetricsEnabled;
}

/**
* Is grouping feature enabled and TopN feature enabled
* @return boolean
*/
public boolean isGroupingEnabled() {
return this.groupingType != GroupingType.NONE && isTopNFeatureEnabled();
}

/**
* Enable/Disable search query metrics feature.
* @param enable enable/disable search query metrics feature
* Stops query insights service if no features enabled
*/
public void enableSearchQueryMetricsFeature(boolean enable) {
searchQueryMetricsEnabled = enable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.PriorityQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -35,6 +35,10 @@
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter;
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
import org.opensearch.plugin.insights.core.exporter.SinkType;
import org.opensearch.plugin.insights.core.service.grouper.MinMaxHeapQueryGrouper;
import org.opensearch.plugin.insights.core.service.grouper.QueryGrouper;
import org.opensearch.plugin.insights.rules.model.AggregationType;
import org.opensearch.plugin.insights.rules.model.GroupingType;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
Expand Down Expand Up @@ -66,7 +70,7 @@ public class TopQueriesService {
/**
* The internal thread-safe store that holds the top n queries insight data
*/
private final PriorityQueue<SearchQueryRecord> topQueriesStore;
private final PriorityBlockingQueue<SearchQueryRecord> topQueriesStore;

/**
* The AtomicReference of a snapshot of the current window top queries for getters to consume
Expand All @@ -93,6 +97,8 @@ public class TopQueriesService {
*/
private QueryInsightsExporter exporter;

private QueryGrouper queryGrouper;

TopQueriesService(
final MetricType metricType,
final ThreadPool threadPool,
Expand All @@ -106,9 +112,16 @@ public class TopQueriesService {
this.windowSize = QueryInsightsSettings.DEFAULT_WINDOW_SIZE;
this.windowStart = -1L;
this.exporter = null;
topQueriesStore = new PriorityQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType));
topQueriesStore = new PriorityBlockingQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType));
topQueriesCurrentSnapshot = new AtomicReference<>(new ArrayList<>());
topQueriesHistorySnapshot = new AtomicReference<>(new ArrayList<>());
queryGrouper = new MinMaxHeapQueryGrouper(
metricType,
QueryInsightsSettings.DEFAULT_GROUPING_TYPE,
AggregationType.AVERAGE,
topQueriesStore,
topNSize
);
}

/**
Expand All @@ -118,6 +131,7 @@ public class TopQueriesService {
*/
public void setTopNSize(final int topNSize) {
this.topNSize = topNSize;
this.queryGrouper.updateTopNSize(topNSize);
}

/**
Expand Down Expand Up @@ -169,6 +183,20 @@ public void setWindowSize(final TimeValue windowSize) {
this.windowStart = -1L;
}

public void setGrouping(final GroupingType groupingType) {
boolean changed = queryGrouper.setGroupingType(groupingType);
if (changed) {
drain();
}
}

public void setMaxGroups(final int maxGroups) {
boolean changed = queryGrouper.setMaxGroups(maxGroups);
if (changed) {
drain();
}
}

/**
* Validate if the window size is valid, based on internal constrains.
*
Expand Down Expand Up @@ -306,10 +334,16 @@ void consumeRecords(final List<SearchQueryRecord> records) {
}

private void addToTopNStore(final List<SearchQueryRecord> records) {
topQueriesStore.addAll(records);
// remove top elements for fix sizing priority queue
while (topQueriesStore.size() > topNSize) {
topQueriesStore.poll();
if (queryGrouper.getGroupingType() != GroupingType.NONE) {
for (SearchQueryRecord record : records) {
queryGrouper.add(record);
}
} else {
topQueriesStore.addAll(records);
// remove top elements for fix sizing priority queue
while (topQueriesStore.size() > topNSize) {
topQueriesStore.poll();
}
}
}

Expand All @@ -329,6 +363,9 @@ private void rotateWindowIfNecessary(final long newWindowStart) {
}
topQueriesHistorySnapshot.set(history);
topQueriesStore.clear();
if (queryGrouper.getGroupingType() != GroupingType.NONE) {
queryGrouper.drain();
}
topQueriesCurrentSnapshot.set(new ArrayList<>());
windowStart = newWindowStart;
// export to the configured sink
Expand Down Expand Up @@ -368,4 +405,13 @@ public List<SearchQueryRecord> getTopQueriesCurrentSnapshot() {
public void close() throws IOException {
queryInsightsExporterFactory.closeExporter(this.exporter);
}

/**
* Drain internal stores.
*/
private void drain() {
topQueriesStore.clear();
topQueriesHistorySnapshot.set(new ArrayList<>());
topQueriesCurrentSnapshot.set(new ArrayList<>());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ public static MurmurHash3.Hash128 getShapeHashCode(SearchSourceBuilder source, B
return MurmurHash3.hash128(shapeBytes.bytes, 0, shapeBytes.length, 0, new MurmurHash3.Hash128());
}

public static String getShapeHashCodeAsString(SearchSourceBuilder source, Boolean showFields) {
MurmurHash3.Hash128 hashcode = getShapeHashCode(source, showFields);
String hashAsString = Long.toHexString(hashcode.h1) + Long.toHexString(hashcode.h2);
return hashAsString;
}

/**
* Method to build search query shape given a source
* @param source search request source
Expand Down
Loading
Loading