diff --git a/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java b/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java index b6db551..b129c97 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java +++ b/src/main/java/org/opensearch/plugin/insights/core/reader/LocalIndexReader.java @@ -18,7 +18,10 @@ import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; import org.opensearch.client.Client; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.query.MatchQueryBuilder; import org.opensearch.index.query.QueryBuilder; @@ -104,7 +107,9 @@ public List read(final String from, final String to) { try { SearchResponse searchResponse = client.search(searchRequest).actionGet(); for (SearchHit hit : searchResponse.getHits()) { - SearchQueryRecord record = SearchQueryRecord.getRecord(hit, namedXContentRegistry); + XContentParser parser = XContentType.JSON.xContent() + .createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, hit.getSourceAsString()); + SearchQueryRecord record = SearchQueryRecord.fromXContent(parser); records.add(record); } } catch (IndexNotFoundException ignored) {} catch (Exception e) { diff --git a/src/main/java/org/opensearch/plugin/insights/rules/model/Measurement.java b/src/main/java/org/opensearch/plugin/insights/rules/model/Measurement.java index e7e5349..bd024d1 100644 --- a/src/main/java/org/opensearch/plugin/insights/rules/model/Measurement.java +++ b/src/main/java/org/opensearch/plugin/insights/rules/model/Measurement.java @@ -10,18 +10,25 @@ import java.io.IOException; import java.util.Objects; +import org.opensearch.core.common.ParsingException; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; /** * Measurement that is stored in the SearchQueryRecord. Measurement can be of a specific AggregationType */ public class Measurement implements ToXContentObject, Writeable { private static int DEFAULT_COUNT = 1; + + private static final String NUMBER = "number"; + private static final String COUNT = "count"; + private static final String AGGREGATION_TYPE = "aggregationType"; + private AggregationType aggregationType; private Number number; private int count; @@ -55,6 +62,21 @@ public Measurement(Number number) { this(number, DEFAULT_COUNT, AggregationType.DEFAULT_AGGREGATION_TYPE); } + private Measurement() {} + + /** + * Construct a measurement from {@link XContentParser} + * + * @param parser {@link XContentParser} + * @return {@link Measurement} + * @throws IOException IOException + */ + public static Measurement fromXContent(XContentParser parser) throws IOException { + Measurement builder = new Measurement(); + builder.parseXContent(parser); + return builder; + } + /** * Add measurement number to the current number based on the aggregationType. * If aggregateType is NONE, replace the number since we are not aggregating in this case. @@ -150,13 +172,45 @@ public void setAggregationType(AggregationType aggregationType) { @Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { builder.startObject(); - builder.field("number", number); - builder.field("count", count); - builder.field("aggregationType", aggregationType.toString()); + builder.field(NUMBER, number); + builder.field(COUNT, count); + builder.field(AGGREGATION_TYPE, aggregationType.toString()); builder.endObject(); return builder; } + /** + * Parse a measurement from {@link XContentParser} + * + * @param parser {@link XContentParser} + * @throws IOException IOException + */ + private void parseXContent(XContentParser parser) throws IOException { + XContentParser.Token token = parser.currentToken(); + if (token != XContentParser.Token.START_OBJECT) { + throw new ParsingException( + parser.getTokenLocation(), + "Expected [" + XContentParser.Token.START_OBJECT + "] but found [" + token + "]", + parser.getTokenLocation() + ); + } else { + String currentFieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token.isValue()) { + if (NUMBER.equals(currentFieldName)) { + this.number = parser.numberValue(); + } else if (COUNT.equals(currentFieldName)) { + this.count = parser.intValue(); + } else if (AGGREGATION_TYPE.equals(currentFieldName)) { + this.aggregationType = AggregationType.valueOf(parser.text()); + } + } + } + } + } + @Override public void writeTo(StreamOutput out) throws IOException { writeNumber(out, number); diff --git a/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java b/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java index a81bbc8..5877f67 100644 --- a/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java +++ b/src/main/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecord.java @@ -18,8 +18,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.Version; -import org.opensearch.common.xcontent.LoggingDeprecationHandler; -import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.common.Strings; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -27,12 +25,10 @@ import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo; import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage; import org.opensearch.core.xcontent.MediaTypeRegistry; -import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.core.xcontent.XContentParserUtils; -import org.opensearch.search.SearchHit; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.tasks.Task; @@ -141,18 +137,17 @@ public SearchQueryRecord(final long timestamp, Map meas } /** - * Returns a SearchQueryRecord from a SearchHit + * Construct a SearchQueryRecord from {@link XContentParser} * - * @param hit SearchHit to parse into SearchQueryRecord - * @param namedXContentRegistry NamedXContentRegistry for parsing purposes - * @return SearchQueryRecord + * @param parser {@link XContentParser} + * @return {@link SearchQueryRecord} + * @throws IOException IOException */ - public static SearchQueryRecord getRecord(SearchHit hit, NamedXContentRegistry namedXContentRegistry) throws IOException { + public static SearchQueryRecord fromXContent(XContentParser parser) throws IOException { long timestamp = 0L; Map measurements = new HashMap<>(); Map attributes = new HashMap<>(); - XContentParser parser = XContentType.JSON.xContent() - .createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, hit.getSourceAsString()); + parser.nextToken(); XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); while (parser.nextToken() != XContentParser.Token.END_OBJECT) { @@ -167,7 +162,7 @@ public static SearchQueryRecord getRecord(SearchHit hit, NamedXContentRegistry n case CPU: case MEMORY: MetricType metric = MetricType.fromString(fieldName); - measurements.put(metric, new Measurement(metric.parseValue(parser.longValue()))); + measurements.put(metric, Measurement.fromXContent(parser)); break; case SEARCH_TYPE: attributes.put(Attribute.SEARCH_TYPE, parser.text()); diff --git a/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java b/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java index 6fb9dff..8c2a6e4 100644 --- a/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java +++ b/src/test/java/org/opensearch/plugin/insights/QueryInsightsTestUtils.java @@ -240,8 +240,19 @@ public static SearchQueryRecord createFixedSearchQueryRecord() { Map measurements = Map.of(MetricType.LATENCY, new Measurement(1L)); Map phaseLatencyMap = new HashMap<>(); + phaseLatencyMap.put("expand", 1L); + phaseLatencyMap.put("query", 10L); + phaseLatencyMap.put("fetch", 1L); Map attributes = new HashMap<>(); attributes.put(Attribute.SEARCH_TYPE, SearchType.QUERY_THEN_FETCH.toString().toLowerCase(Locale.ROOT)); + attributes.put(Attribute.PHASE_LATENCY_MAP, phaseLatencyMap); + attributes.put( + Attribute.TASK_RESOURCE_USAGES, + List.of( + new TaskResourceInfo("action", 2L, 1L, "id", new TaskResourceUsage(1000L, 2000L)), + new TaskResourceInfo("action2", 3L, 1L, "id2", new TaskResourceUsage(2000L, 1000L)) + ) + ); return new SearchQueryRecord(timestamp, measurements, attributes); } diff --git a/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponseTests.java b/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponseTests.java index dbce9f6..e8b5ca9 100644 --- a/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponseTests.java +++ b/src/test/java/org/opensearch/plugin/insights/rules/action/top_queries/TopQueriesResponseTests.java @@ -41,7 +41,7 @@ public void testSerialize() throws Exception { public void testToXContent() throws IOException { char[] expectedXcontent = - "{\"top_queries\":[{\"timestamp\":1706574180000,\"node_id\":\"node_for_top_queries_test\",\"search_type\":\"query_then_fetch\",\"measurements\":{\"latency\":{\"number\":1,\"count\":1,\"aggregationType\":\"NONE\"}}}]}" + "{\"top_queries\":[{\"timestamp\":1706574180000,\"node_id\":\"node_for_top_queries_test\",\"phase_latency_map\":{\"expand\":1,\"query\":10,\"fetch\":1},\"task_resource_usages\":[{\"action\":\"action\",\"taskId\":2,\"parentTaskId\":1,\"nodeId\":\"id\",\"taskResourceUsage\":{\"cpu_time_in_nanos\":1000,\"memory_in_bytes\":2000}},{\"action\":\"action2\",\"taskId\":3,\"parentTaskId\":1,\"nodeId\":\"id2\",\"taskResourceUsage\":{\"cpu_time_in_nanos\":2000,\"memory_in_bytes\":1000}}],\"search_type\":\"query_then_fetch\",\"measurements\":{\"latency\":{\"number\":1,\"count\":1,\"aggregationType\":\"NONE\"}}}]}" .toCharArray(); TopQueries topQueries = QueryInsightsTestUtils.createFixedTopQueries(); ClusterName clusterName = new ClusterName("test-cluster"); diff --git a/src/test/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecordTests.java b/src/test/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecordTests.java index a452b73..17b442f 100644 --- a/src/test/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecordTests.java +++ b/src/test/java/org/opensearch/plugin/insights/rules/model/SearchQueryRecordTests.java @@ -14,7 +14,9 @@ import java.util.List; import java.util.Set; import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.xcontent.XContentParser; import org.opensearch.plugin.insights.QueryInsightsTestUtils; import org.opensearch.test.OpenSearchTestCase; @@ -54,6 +56,16 @@ public void testEqual() { assertEquals(record1, record2); } + public void testFromXContent() { + SearchQueryRecord record = QueryInsightsTestUtils.createFixedSearchQueryRecord(); + try (XContentParser recordParser = createParser(JsonXContent.jsonXContent, record.toString())) { + SearchQueryRecord parsedRecord = SearchQueryRecord.fromXContent(recordParser); + QueryInsightsTestUtils.checkRecordsEquals(List.of(record), List.of(parsedRecord)); + } catch (Exception e) { + fail("Test should not throw exceptions when parsing search query record"); + } + } + /** * Serialize and deserialize a SearchQueryRecord. * @param record A SearchQueryRecord to serialize.