Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.16] Increment latency, cpu and memory histograms for query/aggregation/sort query types #38

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@

package org.opensearch.plugin.insights.core.service.categorizer;

import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.PipelineAggregationBuilder;
import org.opensearch.telemetry.metrics.tags.Tags;

import java.util.Collection;
import java.util.Map;

/**
* Increments the counters related to Aggregation Search Queries.
Expand All @@ -32,32 +34,34 @@ public SearchQueryAggregationCategorizer(SearchQueryCounters searchQueryCounters

/**
* Increment aggregation related counters
*
* @param aggregatorFactories input aggregations
* @param measurements latency, cpu, memory measurements
*/
public void incrementSearchQueryAggregationCounters(Collection<AggregationBuilder> aggregatorFactories) {
public void incrementSearchQueryAggregationCounters(Collection<AggregationBuilder> aggregatorFactories, Map<MetricType, Number> measurements) {
for (AggregationBuilder aggregationBuilder : aggregatorFactories) {
incrementCountersRecursively(aggregationBuilder);
incrementCountersRecursively(aggregationBuilder, measurements);
}
}

private void incrementCountersRecursively(AggregationBuilder aggregationBuilder) {
private void incrementCountersRecursively(AggregationBuilder aggregationBuilder, Map<MetricType, Number> measurements) {
// Increment counters for the current aggregation
String aggregationType = aggregationBuilder.getType();
searchQueryCounters.incrementAggCounter(1, Tags.create().addTag(TYPE_TAG, aggregationType));
searchQueryCounters.incrementAggCounter(1, Tags.create().addTag(TYPE_TAG, aggregationType), measurements);

// Recursively process sub-aggregations if any
Collection<AggregationBuilder> subAggregations = aggregationBuilder.getSubAggregations();
if (subAggregations != null && !subAggregations.isEmpty()) {
for (AggregationBuilder subAggregation : subAggregations) {
incrementCountersRecursively(subAggregation);
incrementCountersRecursively(subAggregation, measurements);
}
}

// Process pipeline aggregations
Collection<PipelineAggregationBuilder> pipelineAggregations = aggregationBuilder.getPipelineAggregations();
for (PipelineAggregationBuilder pipelineAggregation : pipelineAggregations) {
String pipelineAggregationType = pipelineAggregation.getType();
searchQueryCounters.incrementAggCounter(1, Tags.create().addTag(TYPE_TAG, pipelineAggregationType));
searchQueryCounters.incrementAggCounter(1, Tags.create().addTag(TYPE_TAG, pipelineAggregationType), measurements);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilderVisitor;
import org.opensearch.plugin.insights.rules.model.Attribute;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.rules.model.SearchQueryRecord;
import org.opensearch.search.aggregations.AggregatorFactories;
import org.opensearch.search.builder.SearchSourceBuilder;
Expand All @@ -21,6 +22,7 @@
import org.opensearch.telemetry.metrics.tags.Tags;

import java.util.List;
import java.util.Map;

/**
* Class to categorize the search queries based on the type and increment the relevant counters.
Expand Down Expand Up @@ -64,50 +66,53 @@ public static SearchQueryCategorizer getInstance(MetricsRegistry metricsRegistry
}

/**
* Consume records and increment counters for the records
* Consume records and increment categorization counters and histograms for the records including latency, cpu and memory.
* @param records records to consume
*/
public void consumeRecords(List<SearchQueryRecord> records) {
for (SearchQueryRecord record : records) {
SearchSourceBuilder source = (SearchSourceBuilder) record.getAttributes().get(Attribute.SOURCE);
categorize(source);
categorize(record);
}
}

/**
* Increment categorizations counters for the given source search query
* @param source search query source
* Increment categorizations counters for the given search query record and
* also increment latency, cpu and memory related histograms.
* @param record search query record
*/
public void categorize(SearchSourceBuilder source) {
public void categorize(SearchQueryRecord record) {
SearchSourceBuilder source = (SearchSourceBuilder) record.getAttributes().get(Attribute.SOURCE);
Map<MetricType, Number> measurements = record.getMeasurements();

QueryBuilder topLevelQueryBuilder = source.query();
logQueryShape(topLevelQueryBuilder);
incrementQueryTypeCounters(topLevelQueryBuilder);
incrementQueryAggregationCounters(source.aggregations());
incrementQuerySortCounters(source.sorts());
incrementQueryTypeCounters(topLevelQueryBuilder, measurements);
incrementQueryAggregationCounters(source.aggregations(), measurements);
incrementQuerySortCounters(source.sorts(), measurements);
}

private void incrementQuerySortCounters(List<SortBuilder<?>> sorts) {
private void incrementQuerySortCounters(List<SortBuilder<?>> sorts, Map<MetricType, Number> measurements) {
if (sorts != null && sorts.size() > 0) {
for (SortBuilder<?> sortBuilder : sorts) {
String sortOrder = sortBuilder.order().toString();
searchQueryCounters.incrementSortCounter(1, Tags.create().addTag("sort_order", sortOrder));
searchQueryCounters.incrementSortCounter(1, Tags.create().addTag("sort_order", sortOrder), measurements);
}
}
}

private void incrementQueryAggregationCounters(AggregatorFactories.Builder aggregations) {
private void incrementQueryAggregationCounters(AggregatorFactories.Builder aggregations, Map<MetricType, Number> measurements) {
if (aggregations == null) {
return;
}

searchQueryAggregationCategorizer.incrementSearchQueryAggregationCounters(aggregations.getAggregatorFactories());
searchQueryAggregationCategorizer.incrementSearchQueryAggregationCounters(aggregations.getAggregatorFactories(), measurements);
}

private void incrementQueryTypeCounters(QueryBuilder topLevelQueryBuilder) {
private void incrementQueryTypeCounters(QueryBuilder topLevelQueryBuilder, Map<MetricType, Number> measurements) {
if (topLevelQueryBuilder == null) {
return;
}
QueryBuilderVisitor searchQueryVisitor = new SearchQueryCategorizingVisitor(searchQueryCounters);
QueryBuilderVisitor searchQueryVisitor = new SearchQueryCategorizingVisitor(searchQueryCounters, measurements);
topLevelQueryBuilder.visit(searchQueryVisitor);
}

Expand All @@ -134,6 +139,8 @@ public SearchQueryCounters getSearchQueryCounters() {
* Reset the search query categorizer and its counters
*/
public void reset() {
instance = null;
synchronized (SearchQueryCategorizer.class) {
instance = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import org.apache.lucene.search.BooleanClause;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilderVisitor;
import org.opensearch.plugin.insights.rules.model.MetricType;

import java.util.Map;

/**
* Class to visit the query builder tree and also track the level information.
Expand All @@ -19,21 +22,23 @@
final class SearchQueryCategorizingVisitor implements QueryBuilderVisitor {
private final int level;
private final SearchQueryCounters searchQueryCounters;
private final Map<MetricType, Number> measurements;

public SearchQueryCategorizingVisitor(SearchQueryCounters searchQueryCounters) {
this(searchQueryCounters, 0);
public SearchQueryCategorizingVisitor(SearchQueryCounters searchQueryCounters, Map<MetricType, Number> measurements) {
this(searchQueryCounters, 0, measurements);
}

private SearchQueryCategorizingVisitor(SearchQueryCounters counters, int level) {
private SearchQueryCategorizingVisitor(SearchQueryCounters counters, int level, Map<MetricType, Number> measurements) {
this.searchQueryCounters = counters;
this.level = level;
this.measurements = measurements;
}

public void accept(QueryBuilder qb) {
searchQueryCounters.incrementCounter(qb, level);
searchQueryCounters.incrementCounter(qb, level, measurements);
}

public QueryBuilderVisitor getChildVisitor(BooleanClause.Occur occur) {
return new SearchQueryCategorizingVisitor(searchQueryCounters, level + 1);
return new SearchQueryCategorizingVisitor(searchQueryCounters, level + 1, measurements);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
package org.opensearch.plugin.insights.core.service.categorizer;

import org.opensearch.index.query.QueryBuilder;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.telemetry.metrics.Counter;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;

Expand All @@ -22,6 +24,7 @@
*/
public final class SearchQueryCounters {
private static final String LEVEL_TAG = "level";
private static final String TYPE_TAG = "type";
private static final String UNIT = "1";
private final MetricsRegistry metricsRegistry;
/**
Expand All @@ -36,6 +39,20 @@ public final class SearchQueryCounters {
* Counter for sort
*/
private final Counter sortCounter;

/**
* Histogram for latency per query type
*/
private final Histogram queryTypeLatencyHistogram;
/**
* Histogram for cpu per query type
*/
private final Histogram queryTypeCpuHistogram;
/**
* Histogram for memory per query type
*/
private final Histogram queryTypeMemoryHistogram;

private final Map<Class<? extends QueryBuilder>, Counter> queryHandlers;
/**
* Counter name to Counter object map
Expand Down Expand Up @@ -64,38 +81,61 @@ public SearchQueryCounters(MetricsRegistry metricsRegistry) {
"Counter for the number of top level sort search queries",
UNIT
);
this.queryTypeLatencyHistogram = metricsRegistry.createHistogram(
"search.query.type.latency.histogram",
"Histogram for the latency per query type",
UNIT
);
this.queryTypeCpuHistogram = metricsRegistry.createHistogram(
"search.query.type.cpu.histogram",
"Histogram for the cpu per query type",
UNIT
);
this.queryTypeMemoryHistogram = metricsRegistry.createHistogram(
"search.query.type.memory.histogram",
"Histogram for the memory per query type",
UNIT
);
this.queryHandlers = new HashMap<>();

}

/**
* Increment counter
* @param queryBuilder query builder
* @param level level of query builder, 0 being highest level
*/
public void incrementCounter(QueryBuilder queryBuilder, int level) {
public void incrementCounter(QueryBuilder queryBuilder, int level, Map<MetricType, Number> measurements) {
String uniqueQueryCounterName = queryBuilder.getName();

Counter counter = nameToQueryTypeCounters.computeIfAbsent(uniqueQueryCounterName, k -> createQueryCounter(k));
counter.add(1, Tags.create().addTag(LEVEL_TAG, level));
incrementAllHistograms(Tags.create().addTag(LEVEL_TAG, level).addTag(TYPE_TAG, uniqueQueryCounterName), measurements);
}

/**
* Increment aggregate counter
* @param value value to increment
* @param tags tags
*/
public void incrementAggCounter(double value, Tags tags) {
public void incrementAggCounter(double value, Tags tags, Map<MetricType, Number> measurements) {
aggCounter.add(value, tags);
incrementAllHistograms(tags, measurements);
}

/**
* Increment sort counter
* @param value value to increment
* @param tags tags
*/
public void incrementSortCounter(double value, Tags tags) {
public void incrementSortCounter(double value, Tags tags, Map<MetricType, Number> measurements) {
sortCounter.add(value, tags);
incrementAllHistograms(tags, measurements);
}

private void incrementAllHistograms(Tags tags, Map<MetricType, Number> measurements) {
queryTypeLatencyHistogram.record(measurements.get(MetricType.LATENCY).doubleValue(), tags);
queryTypeCpuHistogram.record(measurements.get(MetricType.CPU).doubleValue(), tags);
queryTypeMemoryHistogram.record(measurements.get(MetricType.MEMORY).doubleValue(), tags);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,29 @@ final public class QueryInsightsTestUtils {

public QueryInsightsTestUtils() {}

/**
* Returns list of randomly generated search query records.
* @param count number of records
* @return List of records
*/
public static List<SearchQueryRecord> generateQueryInsightRecords(int count) {
return generateQueryInsightRecords(count, count, System.currentTimeMillis(), 0);
}

/**
* Returns list of randomly generated search query records.
* @param count number of records
* @param searchSourceBuilder source
* @return List of records
*/
public static List<SearchQueryRecord> generateQueryInsightRecords(int count, SearchSourceBuilder searchSourceBuilder) {
List<SearchQueryRecord> records = generateQueryInsightRecords(count, count, System.currentTimeMillis(), 0);
for (SearchQueryRecord record : records) {
record.getAttributes().put(Attribute.SOURCE, searchSourceBuilder);
}
return records;
}

/**
* Creates a List of random Query Insight Records for testing purpose
*/
Expand Down
Loading
Loading