Skip to content

Commit

Permalink
Query grouping framework for Top N queries and group by query similar…
Browse files Browse the repository at this point in the history
…ity (#66) (#86)

(cherry picked from commit 65e4489)

Signed-off-by: Siddhant Deshmukh <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent f94be16 commit 9c142a7
Show file tree
Hide file tree
Showing 23 changed files with 1,808 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ public List<Setting<?>> getSettings() {
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_SIZE,
QueryInsightsSettings.TOP_N_MEMORY_QUERIES_WINDOW_SIZE,
QueryInsightsSettings.TOP_N_MEMORY_EXPORTER_SETTINGS,
QueryInsightsSettings.TOP_N_QUERIES_GROUP_BY,
QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N,
QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package org.opensearch.plugin.insights.core.listener;

import static org.opensearch.plugin.insights.settings.QueryCategorizationSettings.SEARCH_QUERY_METRICS_ENABLED_SETTING;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_QUERIES_GROUP_BY;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNEnabledSetting;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNSizeSetting;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getTopNWindowSizeSetting;
Expand All @@ -31,7 +33,9 @@
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.plugin.insights.core.service.QueryInsightsService;
import org.opensearch.plugin.insights.core.service.categorizer.QueryShapeGenerator;
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.Measurement;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.tasks.Task;
Expand Down Expand Up @@ -101,6 +105,26 @@ public QueryInsightsListener(
this.queryInsightsService.setWindowSize(type, clusterService.getClusterSettings().get(getTopNWindowSizeSetting(type)));
}

// Settings endpoints set for grouping top n queries
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_QUERIES_GROUP_BY,
v -> this.queryInsightsService.setGrouping(v),
v -> this.queryInsightsService.validateGrouping(v)
);
this.queryInsightsService.validateGrouping(clusterService.getClusterSettings().get(TOP_N_QUERIES_GROUP_BY));
this.queryInsightsService.setGrouping(clusterService.getClusterSettings().get(TOP_N_QUERIES_GROUP_BY));

clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N,
v -> this.queryInsightsService.setMaximumGroups(v),
v -> this.queryInsightsService.validateMaximumGroups(v)
);
this.queryInsightsService.validateMaximumGroups(clusterService.getClusterSettings().get(TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N));
this.queryInsightsService.setMaximumGroups(clusterService.getClusterSettings().get(TOP_N_QUERIES_MAX_GROUPS_EXCLUDING_N));

// Settings endpoints set for search query metrics
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(SEARCH_QUERY_METRICS_ENABLED_SETTING, v -> setSearchQueryMetricsEnabled(v));
setSearchQueryMetricsEnabled(clusterService.getClusterSettings().get(SEARCH_QUERY_METRICS_ENABLED_SETTING));
Expand Down Expand Up @@ -191,32 +215,40 @@ private void constructSearchQueryRecord(final SearchPhaseContext context, final

final SearchRequest request = context.getRequest();
try {
Map<MetricType, Number> measurements = new HashMap<>();
Map<MetricType, Measurement> measurements = new HashMap<>();
if (shouldCollect(MetricType.LATENCY)) {
measurements.put(
MetricType.LATENCY,
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos())
new Measurement(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchRequestContext.getAbsoluteStartNanos()))
);
}
if (shouldCollect(MetricType.CPU)) {
measurements.put(
MetricType.CPU,
tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getCpuTimeInNanos()).mapToLong(Long::longValue).sum()
new Measurement(
tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getCpuTimeInNanos()).mapToLong(Long::longValue).sum()
)
);
}
if (shouldCollect(MetricType.MEMORY)) {
measurements.put(
MetricType.MEMORY,
tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getMemoryInBytes()).mapToLong(Long::longValue).sum()
new Measurement(
tasksResourceUsages.stream().map(a -> a.getTaskResourceUsage().getMemoryInBytes()).mapToLong(Long::longValue).sum()
)
);
}

String hashcode = QueryShapeGenerator.getShapeHashCodeAsString(request.source(), false);

Map<Attribute, Object> attributes = new HashMap<>();
attributes.put(Attribute.SEARCH_TYPE, request.searchType().toString().toLowerCase(Locale.ROOT));
attributes.put(Attribute.SOURCE, request.source());
attributes.put(Attribute.TOTAL_SHARDS, context.getNumShards());
attributes.put(Attribute.INDICES, request.indices());
attributes.put(Attribute.PHASE_LATENCY_MAP, searchRequestContext.phaseTookMap());
attributes.put(Attribute.TASK_RESOURCE_USAGES, tasksResourceUsages);
attributes.put(Attribute.QUERY_HASHCODE, hashcode);

Map<String, Object> labels = new HashMap<>();
// Retrieve user provided label if exists
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.plugin.insights.core.service;

import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.DEFAULT_GROUPING_TYPE;
import static org.opensearch.plugin.insights.settings.QueryInsightsSettings.getExporterSettings;

import java.io.IOException;
Expand All @@ -27,6 +28,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
import org.opensearch.plugin.insights.core.service.categorizer.SearchQueryCategorizer;
import org.opensearch.plugin.insights.rules.model.GroupingType;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
Expand Down Expand Up @@ -73,6 +75,11 @@ public class QueryInsightsService extends AbstractLifecycleComponent {
*/
final QueryInsightsExporterFactory queryInsightsExporterFactory;

/**
* Flags for enabling insight data grouping for different metric types
*/
private GroupingType groupingType;

private volatile boolean searchQueryMetricsEnabled;

private SearchQueryCategorizer searchQueryCategorizer;
Expand Down Expand Up @@ -112,16 +119,17 @@ public QueryInsightsService(

this.searchQueryCategorizer = SearchQueryCategorizer.getInstance(metricsRegistry);
this.enableSearchQueryMetricsFeature(false);
this.groupingType = DEFAULT_GROUPING_TYPE;
}

/**
* Ingest the query data into in-memory stores
*
* @param record the record to ingest
* @return SearchQueryRecord
* @return true/false
*/
public boolean addRecord(final SearchQueryRecord record) {
boolean shouldAdd = searchQueryMetricsEnabled;
boolean shouldAdd = isSearchQueryMetricsFeatureEnabled() || isGroupingEnabled();
if (!shouldAdd) {
for (Map.Entry<MetricType, TopQueriesService> entry : topQueriesServices.entrySet()) {
if (!enableCollect.get(entry.getKey())) {
Expand Down Expand Up @@ -185,6 +193,67 @@ public void enableCollection(final MetricType metricType, final boolean enable)
this.topQueriesServices.get(metricType).setEnabled(enable);
}

/**
* Validate grouping given grouping type setting
* @param groupingTypeSetting grouping setting
*/
public void validateGrouping(final String groupingTypeSetting) {
GroupingType.getGroupingTypeFromSettingAndValidate(groupingTypeSetting);
}

/**
* Set grouping
* @param groupingTypeSetting grouping
*/
public void setGrouping(final String groupingTypeSetting) {
GroupingType newGroupingType = GroupingType.getGroupingTypeFromSettingAndValidate(groupingTypeSetting);
GroupingType oldGroupingType = groupingType;

if (oldGroupingType != newGroupingType) {
groupingType = newGroupingType;

for (MetricType metricType : MetricType.allMetricTypes()) {
this.topQueriesServices.get(metricType).setGrouping(newGroupingType);
}
}
}

/**
* Set max number of groups
* @param maxGroups maximum number of groups that should be tracked when calculating Top N groups
*/
public void setMaximumGroups(final int maxGroups) {
for (MetricType metricType : MetricType.allMetricTypes()) {
this.topQueriesServices.get(metricType).setMaxGroups(maxGroups);
}
}

/**
* Validate max number of groups. Should be between 1 and MAX_GROUPS_LIMIT
* @param maxGroups maximum number of groups that should be tracked when calculating Top N groups
*/
public void validateMaximumGroups(final int maxGroups) {
if (maxGroups < 0 || maxGroups > QueryInsightsSettings.MAX_GROUPS_EXCLUDING_TOPN_LIMIT) {
throw new IllegalArgumentException(
"Max groups setting"
+ " should be between 0 and "
+ QueryInsightsSettings.MAX_GROUPS_EXCLUDING_TOPN_LIMIT
+ ", was ("
+ maxGroups
+ ")"
);
}
}

/**
* Get the grouping type based on the metricType
* @return GroupingType
*/

public GroupingType getGrouping() {
return groupingType;
}

/**
* Get if the Query Insights data collection is enabled for a MetricType
*
Expand Down Expand Up @@ -226,9 +295,18 @@ public boolean isSearchQueryMetricsFeatureEnabled() {
return this.searchQueryMetricsEnabled;
}

/**
* Is grouping feature enabled and TopN feature enabled
* @return boolean
*/
public boolean isGroupingEnabled() {
return this.groupingType != GroupingType.NONE && isTopNFeatureEnabled();
}

/**
* Enable/Disable search query metrics feature.
* @param enable enable/disable search query metrics feature
* Stops query insights service if no features enabled
*/
public void enableSearchQueryMetricsFeature(boolean enable) {
searchQueryMetricsEnabled = enable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.PriorityQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -35,6 +35,10 @@
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporter;
import org.opensearch.plugin.insights.core.exporter.QueryInsightsExporterFactory;
import org.opensearch.plugin.insights.core.exporter.SinkType;
import org.opensearch.plugin.insights.core.service.grouper.MinMaxHeapQueryGrouper;
import org.opensearch.plugin.insights.core.service.grouper.QueryGrouper;
import org.opensearch.plugin.insights.rules.model.AggregationType;
import org.opensearch.plugin.insights.rules.model.GroupingType;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
Expand Down Expand Up @@ -66,7 +70,7 @@ public class TopQueriesService {
/**
* The internal thread-safe store that holds the top n queries insight data
*/
private final PriorityQueue<SearchQueryRecord> topQueriesStore;
private final PriorityBlockingQueue<SearchQueryRecord> topQueriesStore;

/**
* The AtomicReference of a snapshot of the current window top queries for getters to consume
Expand All @@ -93,6 +97,8 @@ public class TopQueriesService {
*/
private QueryInsightsExporter exporter;

private QueryGrouper queryGrouper;

TopQueriesService(
final MetricType metricType,
final ThreadPool threadPool,
Expand All @@ -106,9 +112,16 @@ public class TopQueriesService {
this.windowSize = QueryInsightsSettings.DEFAULT_WINDOW_SIZE;
this.windowStart = -1L;
this.exporter = null;
topQueriesStore = new PriorityQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType));
topQueriesStore = new PriorityBlockingQueue<>(topNSize, (a, b) -> SearchQueryRecord.compare(a, b, metricType));
topQueriesCurrentSnapshot = new AtomicReference<>(new ArrayList<>());
topQueriesHistorySnapshot = new AtomicReference<>(new ArrayList<>());
queryGrouper = new MinMaxHeapQueryGrouper(
metricType,
QueryInsightsSettings.DEFAULT_GROUPING_TYPE,
AggregationType.AVERAGE,
topQueriesStore,
topNSize
);
}

/**
Expand All @@ -118,6 +131,7 @@ public class TopQueriesService {
*/
public void setTopNSize(final int topNSize) {
this.topNSize = topNSize;
this.queryGrouper.updateTopNSize(topNSize);
}

/**
Expand Down Expand Up @@ -169,6 +183,20 @@ public void setWindowSize(final TimeValue windowSize) {
this.windowStart = -1L;
}

public void setGrouping(final GroupingType groupingType) {
boolean changed = queryGrouper.setGroupingType(groupingType);
if (changed) {
drain();
}
}

public void setMaxGroups(final int maxGroups) {
boolean changed = queryGrouper.setMaxGroups(maxGroups);
if (changed) {
drain();
}
}

/**
* Validate if the window size is valid, based on internal constrains.
*
Expand Down Expand Up @@ -306,10 +334,16 @@ void consumeRecords(final List<SearchQueryRecord> records) {
}

private void addToTopNStore(final List<SearchQueryRecord> records) {
topQueriesStore.addAll(records);
// remove top elements for fix sizing priority queue
while (topQueriesStore.size() > topNSize) {
topQueriesStore.poll();
if (queryGrouper.getGroupingType() != GroupingType.NONE) {
for (SearchQueryRecord record : records) {
queryGrouper.add(record);
}
} else {
topQueriesStore.addAll(records);
// remove top elements for fix sizing priority queue
while (topQueriesStore.size() > topNSize) {
topQueriesStore.poll();
}
}
}

Expand All @@ -329,6 +363,9 @@ private void rotateWindowIfNecessary(final long newWindowStart) {
}
topQueriesHistorySnapshot.set(history);
topQueriesStore.clear();
if (queryGrouper.getGroupingType() != GroupingType.NONE) {
queryGrouper.drain();
}
topQueriesCurrentSnapshot.set(new ArrayList<>());
windowStart = newWindowStart;
// export to the configured sink
Expand Down Expand Up @@ -368,4 +405,13 @@ public List<SearchQueryRecord> getTopQueriesCurrentSnapshot() {
public void close() throws IOException {
queryInsightsExporterFactory.closeExporter(this.exporter);
}

/**
* Drain internal stores.
*/
private void drain() {
topQueriesStore.clear();
topQueriesHistorySnapshot.set(new ArrayList<>());
topQueriesCurrentSnapshot.set(new ArrayList<>());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ public static MurmurHash3.Hash128 getShapeHashCode(SearchSourceBuilder source, B
return MurmurHash3.hash128(shapeBytes.bytes, 0, shapeBytes.length, 0, new MurmurHash3.Hash128());
}

public static String getShapeHashCodeAsString(SearchSourceBuilder source, Boolean showFields) {
MurmurHash3.Hash128 hashcode = getShapeHashCode(source, showFields);
String hashAsString = Long.toHexString(hashcode.h1) + Long.toHexString(hashcode.h2);
return hashAsString;
}

/**
* Method to build search query shape given a source
* @param source search request source
Expand Down
Loading

0 comments on commit 9c142a7

Please sign in to comment.