Skip to content

Commit

Permalink
Refactor query grouper
Browse files Browse the repository at this point in the history
Signed-off-by: Siddhant Deshmukh <[email protected]>
  • Loading branch information
deshsidd committed Sep 4, 2024
1 parent d891909 commit 3493e65
Showing 1 changed file with 20 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,10 @@ public SearchQueryRecord add(SearchQueryRecord searchQueryRecord) {
// 2) Existing group being updated to the grouping service
// a. If present in min PQ
// - remove the record from the min PQ
// - update the aggregate record (aggregate measurement could increase or decrease)
// - If max PQ contains elements, add to max PQ and promote any records to min PQ
// - If max PQ is empty, add to min PQ and overflow any records to max PQ
// b. If present in max PQ
// - remove the record from the max PQ
// - update the aggregate record (aggregate measurement could increase or decrease)
// - If min PQ is full, add to min PQ and overflow any records to max PQ
// - else, add to max PQ and promote any records to min PQ
// Add to min PQ and promote to max
// If max PQ is empty return else try to promote record from max to min
if (!groupIdToAggSearchQueryRecord.containsKey(groupId)) {
boolean maxGroupsLimitReached = checkMaxGroupsLimitReached(groupId);
if (maxGroupsLimitReached) {
Expand All @@ -131,15 +127,16 @@ public SearchQueryRecord add(SearchQueryRecord searchQueryRecord) {
aggregateSearchQueryRecord = searchQueryRecord;
aggregateSearchQueryRecord.setGroupingId(groupId);
aggregateSearchQueryRecord.setMeasurementAggregation(metricType, aggregationType);
addToMinPQOverflowToMaxPQ(aggregateSearchQueryRecord, groupId);
addToMinPQ(aggregateSearchQueryRecord, groupId);
} else {
aggregateSearchQueryRecord = groupIdToAggSearchQueryRecord.get(groupId).v1();
boolean isPresentInMinPQ = groupIdToAggSearchQueryRecord.get(groupId).v2();
if (isPresentInMinPQ) {
updateToMinPQ(searchQueryRecord, aggregateSearchQueryRecord, groupId);
minHeapTopQueriesStore.remove(aggregateSearchQueryRecord);
} else {
updateToMaxPQ(searchQueryRecord, aggregateSearchQueryRecord, groupId);
maxHeapQueryStore.remove(aggregateSearchQueryRecord);
}
addAndPromote(searchQueryRecord, aggregateSearchQueryRecord, groupId);
}
return aggregateSearchQueryRecord;
}
Expand Down Expand Up @@ -204,49 +201,30 @@ public void updateTopNSize(int newSize) {
this.topNSize = newSize;
}

private void addToMinPQOverflowToMaxPQ(SearchQueryRecord searchQueryRecord, String groupId) {
private void addToMinPQ(SearchQueryRecord searchQueryRecord, String groupId) {
minHeapTopQueriesStore.add(searchQueryRecord);
groupIdToAggSearchQueryRecord.put(groupId, new Tuple<>(searchQueryRecord, true));

while (minHeapTopQueriesStore.size() > topNSize) {
SearchQueryRecord recordMovedFromMinToMax = minHeapTopQueriesStore.poll();
maxHeapQueryStore.add(recordMovedFromMinToMax);
groupIdToAggSearchQueryRecord.put(recordMovedFromMinToMax.getGroupingId(), new Tuple<>(recordMovedFromMinToMax, false));
}
overflow();
}

private void updateToMaxPQ(SearchQueryRecord searchQueryRecord, SearchQueryRecord aggregateSearchQueryRecord, String groupId) {
maxHeapQueryStore.remove(aggregateSearchQueryRecord);
private void addAndPromote(SearchQueryRecord searchQueryRecord, SearchQueryRecord aggregateSearchQueryRecord, String groupId) {
Number measurementToAdd = searchQueryRecord.getMeasurement(metricType);
aggregateSearchQueryRecord.addMeasurement(metricType, measurementToAdd);

if (minHeapTopQueriesStore.size() >= topNSize) {
addToMinPQOverflowToMaxPQ(aggregateSearchQueryRecord, groupId);
} else {
addToMaxPQPromoteToMinPQ(aggregateSearchQueryRecord, groupId);
addToMinPQ(aggregateSearchQueryRecord, groupId);
if (maxHeapQueryStore.isEmpty()) {
return;
}
}

private void updateToMinPQ(SearchQueryRecord searchQueryRecord, SearchQueryRecord aggregateSearchQueryRecord, String groupId) {
minHeapTopQueriesStore.remove(aggregateSearchQueryRecord);
Number measurementToAdd = searchQueryRecord.getMeasurement(metricType);
aggregateSearchQueryRecord.addMeasurement(metricType, measurementToAdd);

if (maxHeapQueryStore.size() > 0) {
addToMaxPQPromoteToMinPQ(aggregateSearchQueryRecord, groupId);
} else {
addToMinPQOverflowToMaxPQ(aggregateSearchQueryRecord, groupId);
if (SearchQueryRecord.compare(maxHeapQueryStore.peek(), minHeapTopQueriesStore.peek(), metricType) > 0) {
SearchQueryRecord recordMovedFromMaxToMin = maxHeapQueryStore.poll();
addToMinPQ(recordMovedFromMaxToMin, recordMovedFromMaxToMin.getGroupingId());
}
}

private void addToMaxPQPromoteToMinPQ(SearchQueryRecord aggregateSearchQueryRecord, String groupId) {
maxHeapQueryStore.add(aggregateSearchQueryRecord);
groupIdToAggSearchQueryRecord.put(groupId, new Tuple<>(aggregateSearchQueryRecord, false));

while (minHeapTopQueriesStore.size() < topNSize && !maxHeapQueryStore.isEmpty()) {
SearchQueryRecord recordMovedFromMaxToMin = maxHeapQueryStore.poll();
minHeapTopQueriesStore.add(recordMovedFromMaxToMin);
groupIdToAggSearchQueryRecord.put(recordMovedFromMaxToMin.getGroupingId(), new Tuple<>(recordMovedFromMaxToMin, true));
private void overflow() {
if (minHeapTopQueriesStore.size() > topNSize) {
SearchQueryRecord recordMovedFromMinToMax = minHeapTopQueriesStore.poll();
maxHeapQueryStore.add(recordMovedFromMinToMax);
groupIdToAggSearchQueryRecord.put(recordMovedFromMinToMax.getGroupingId(), new Tuple<>(recordMovedFromMinToMax, false));
}
}

Expand Down

0 comments on commit 3493e65

Please sign in to comment.