Skip to content

Commit

Permalink
Fix double invocation of postCollection when MultiBucketCollector is …
Browse files Browse the repository at this point in the history
…present

Signed-off-by: Jay Deng <[email protected]>
  • Loading branch information
jed326 authored and Jay Deng committed Jun 6, 2024
1 parent ba0df74 commit 2d9e52d
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.aggregations.AggregationExecutionException;
import org.opensearch.search.aggregations.Aggregator.SubAggCollectionMode;
import org.opensearch.search.aggregations.BucketOrder;
import org.opensearch.search.aggregations.bucket.filter.Filter;
import org.opensearch.search.aggregations.bucket.filter.InternalFilters;
import org.opensearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.opensearch.search.aggregations.metrics.Avg;
import org.opensearch.search.aggregations.metrics.ExtendedStats;
Expand Down Expand Up @@ -999,6 +1001,72 @@ public void testOtherDocCount() {
testOtherDocCount(SINGLE_VALUED_FIELD_NAME, MULTI_VALUED_FIELD_NAME);
}

public void testDeferredSubAggs() {
// Tests subAgg doc count is the same with different collection modes and additional top level aggs
SearchResponse r1 = client().prepareSearch("idx")
.setSize(0)
.addAggregation(
terms("terms1").collectMode(SubAggCollectionMode.BREADTH_FIRST)
.field("s_value")
.size(2)
.subAggregation(AggregationBuilders.filters("filter", QueryBuilders.boolQuery()))
)
.addAggregation(AggregationBuilders.min("min").field("constant"))
.get();

SearchResponse r2 = client().prepareSearch("idx")
.setSize(0)
.addAggregation(
terms("terms1").collectMode(SubAggCollectionMode.DEPTH_FIRST)
.field("s_value")
.size(2)
.subAggregation(AggregationBuilders.filters("filter", QueryBuilders.boolQuery()))
)
.addAggregation(AggregationBuilders.min("min").field("constant"))
.get();

SearchResponse r3 = client().prepareSearch("idx")
.setSize(0)
.addAggregation(
terms("terms1").collectMode(SubAggCollectionMode.BREADTH_FIRST)
.field("s_value")
.size(2)
.subAggregation(AggregationBuilders.filters("filter", QueryBuilders.boolQuery()))
)
.get();

SearchResponse r4 = client().prepareSearch("idx")
.setSize(0)
.addAggregation(
terms("terms1").collectMode(SubAggCollectionMode.DEPTH_FIRST)
.field("s_value")
.size(2)
.subAggregation(AggregationBuilders.filters("filter", QueryBuilders.boolQuery()))
)
.get();

assertNotNull(r1.getAggregations().get("terms1"));
assertNotNull(r2.getAggregations().get("terms1"));
assertNotNull(r3.getAggregations().get("terms1"));
assertNotNull(r4.getAggregations().get("terms1"));

Terms terms = r1.getAggregations().get("terms1");
Bucket b1 = terms.getBucketByKey("val0");
InternalFilters f1 = b1.getAggregations().get("filter");
long docCount1 = f1.getBuckets().get(0).getDocCount();
Bucket b2 = terms.getBucketByKey("val1");
InternalFilters f2 = b2.getAggregations().get("filter");
long docCount2 = f1.getBuckets().get(0).getDocCount();

for (SearchResponse response : new SearchResponse[] { r2, r3, r4 }) {
terms = response.getAggregations().get("terms1");
f1 = terms.getBucketByKey(b1.getKeyAsString()).getAggregations().get("filter");
f2 = terms.getBucketByKey(b2.getKeyAsString()).getAggregations().get("filter");
assertEquals(docCount1, f1.getBuckets().get(0).getDocCount());
assertEquals(docCount2, f2.getBuckets().get(0).getDocCount());
}
}

/**
* Make sure that a request using a deterministic script or not using a script get cached.
* Ensure requests using nondeterministic scripts do not get cached.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ public void processPostCollection(Collector collectorTree) throws IOException {
collectors.offer(innerCollector);
}
} else if (currentCollector instanceof BucketCollector) {
((BucketCollector) currentCollector).postCollection();

// Perform build aggregation during post collection
if (currentCollector instanceof Aggregator) {
// Do not perform postCollection for MultiBucketCollector as we are unwrapping that below
((BucketCollector) currentCollector).postCollection();
((Aggregator) currentCollector).buildTopLevel();
} else if (currentCollector instanceof MultiBucketCollector) {
for (Collector innerCollector : ((MultiBucketCollector) currentCollector).getCollectors()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ private void finishLeaf() {
if (context != null) {
assert docDeltasBuilder != null && bucketsBuilder != null;
entries.add(new Entry(context, docDeltasBuilder.build(), bucketsBuilder.build()));
context = null;
}
}

Expand Down Expand Up @@ -161,6 +162,7 @@ public void preCollection() throws IOException {

@Override
public void postCollection() throws IOException {
assert searchContext.searcher().getLeafContexts().isEmpty() || finished != true;
finishLeaf();
finished = true;
}
Expand Down

0 comments on commit 2d9e52d

Please sign in to comment.