diff --git a/src/main/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouper.java b/src/main/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouper.java index be2f60ff..9b87f2dd 100644 --- a/src/main/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouper.java +++ b/src/main/java/org/opensearch/plugin/insights/core/service/grouper/MinMaxHeapQueryGrouper.java @@ -115,14 +115,10 @@ public SearchQueryRecord add(SearchQueryRecord searchQueryRecord) { // 2) Existing group being updated to the grouping service // a. If present in min PQ // - remove the record from the min PQ - // - update the aggregate record (aggregate measurement could increase or decrease) - // - If max PQ contains elements, add to max PQ and promote any records to min PQ - // - If max PQ is empty, add to min PQ and overflow any records to max PQ // b. If present in max PQ // - remove the record from the max PQ - // - update the aggregate record (aggregate measurement could increase or decrease) - // - If min PQ is full, add to min PQ and overflow any records to max PQ - // - else, add to max PQ and promote any records to min PQ + // Add to min PQ and promote to max + // If max PQ is empty return else try to promote record from max to min if (!groupIdToAggSearchQueryRecord.containsKey(groupId)) { boolean maxGroupsLimitReached = checkMaxGroupsLimitReached(groupId); if (maxGroupsLimitReached) { @@ -131,15 +127,16 @@ public SearchQueryRecord add(SearchQueryRecord searchQueryRecord) { aggregateSearchQueryRecord = searchQueryRecord; aggregateSearchQueryRecord.setGroupingId(groupId); aggregateSearchQueryRecord.setMeasurementAggregation(metricType, aggregationType); - addToMinPQOverflowToMaxPQ(aggregateSearchQueryRecord, groupId); + addToMinPQ(aggregateSearchQueryRecord, groupId); } else { aggregateSearchQueryRecord = groupIdToAggSearchQueryRecord.get(groupId).v1(); boolean isPresentInMinPQ = groupIdToAggSearchQueryRecord.get(groupId).v2(); if (isPresentInMinPQ) { - updateToMinPQ(searchQueryRecord, aggregateSearchQueryRecord, groupId); + minHeapTopQueriesStore.remove(aggregateSearchQueryRecord); } else { - updateToMaxPQ(searchQueryRecord, aggregateSearchQueryRecord, groupId); + maxHeapQueryStore.remove(aggregateSearchQueryRecord); } + addAndPromote(searchQueryRecord, aggregateSearchQueryRecord, groupId); } return aggregateSearchQueryRecord; } @@ -204,49 +201,30 @@ public void updateTopNSize(int newSize) { this.topNSize = newSize; } - private void addToMinPQOverflowToMaxPQ(SearchQueryRecord searchQueryRecord, String groupId) { + private void addToMinPQ(SearchQueryRecord searchQueryRecord, String groupId) { minHeapTopQueriesStore.add(searchQueryRecord); groupIdToAggSearchQueryRecord.put(groupId, new Tuple<>(searchQueryRecord, true)); - - while (minHeapTopQueriesStore.size() > topNSize) { - SearchQueryRecord recordMovedFromMinToMax = minHeapTopQueriesStore.poll(); - maxHeapQueryStore.add(recordMovedFromMinToMax); - groupIdToAggSearchQueryRecord.put(recordMovedFromMinToMax.getGroupingId(), new Tuple<>(recordMovedFromMinToMax, false)); - } + overflow(); } - private void updateToMaxPQ(SearchQueryRecord searchQueryRecord, SearchQueryRecord aggregateSearchQueryRecord, String groupId) { - maxHeapQueryStore.remove(aggregateSearchQueryRecord); + private void addAndPromote(SearchQueryRecord searchQueryRecord, SearchQueryRecord aggregateSearchQueryRecord, String groupId) { Number measurementToAdd = searchQueryRecord.getMeasurement(metricType); aggregateSearchQueryRecord.addMeasurement(metricType, measurementToAdd); - - if (minHeapTopQueriesStore.size() >= topNSize) { - addToMinPQOverflowToMaxPQ(aggregateSearchQueryRecord, groupId); - } else { - addToMaxPQPromoteToMinPQ(aggregateSearchQueryRecord, groupId); + addToMinPQ(aggregateSearchQueryRecord, groupId); + if (maxHeapQueryStore.isEmpty()) { + return; } - } - - private void updateToMinPQ(SearchQueryRecord searchQueryRecord, SearchQueryRecord aggregateSearchQueryRecord, String groupId) { - minHeapTopQueriesStore.remove(aggregateSearchQueryRecord); - Number measurementToAdd = searchQueryRecord.getMeasurement(metricType); - aggregateSearchQueryRecord.addMeasurement(metricType, measurementToAdd); - - if (maxHeapQueryStore.size() > 0) { - addToMaxPQPromoteToMinPQ(aggregateSearchQueryRecord, groupId); - } else { - addToMinPQOverflowToMaxPQ(aggregateSearchQueryRecord, groupId); + if (SearchQueryRecord.compare(maxHeapQueryStore.peek(), minHeapTopQueriesStore.peek(), metricType) > 0) { + SearchQueryRecord recordMovedFromMaxToMin = maxHeapQueryStore.poll(); + addToMinPQ(recordMovedFromMaxToMin, recordMovedFromMaxToMin.getGroupingId()); } } - private void addToMaxPQPromoteToMinPQ(SearchQueryRecord aggregateSearchQueryRecord, String groupId) { - maxHeapQueryStore.add(aggregateSearchQueryRecord); - groupIdToAggSearchQueryRecord.put(groupId, new Tuple<>(aggregateSearchQueryRecord, false)); - - while (minHeapTopQueriesStore.size() < topNSize && !maxHeapQueryStore.isEmpty()) { - SearchQueryRecord recordMovedFromMaxToMin = maxHeapQueryStore.poll(); - minHeapTopQueriesStore.add(recordMovedFromMaxToMin); - groupIdToAggSearchQueryRecord.put(recordMovedFromMaxToMin.getGroupingId(), new Tuple<>(recordMovedFromMaxToMin, true)); + private void overflow() { + if (minHeapTopQueriesStore.size() > topNSize) { + SearchQueryRecord recordMovedFromMinToMax = minHeapTopQueriesStore.poll(); + maxHeapQueryStore.add(recordMovedFromMinToMax); + groupIdToAggSearchQueryRecord.put(recordMovedFromMinToMax.getGroupingId(), new Tuple<>(recordMovedFromMinToMax, false)); } }