Skip to content

Commit

Permalink
Adjust DateHistogram's bucket accounting to be iteratively (elastic#1…
Browse files Browse the repository at this point in the history
…01012)

Adjust DateHistogram's consumeBucketsAndMaybeBreak to be iterative during the reduce, instead of accounting for all buckets at the end of the reduce.

In the case of many non-empty buckets, accounting for the number of buckets only at the end of the reduce may be too late: Elasticsearch may already have failed with an OOME by then. This change makes the accounting happen iteratively during the reduce for non-empty buckets.

Note that for empty buckets accounting of the number of buckets already happens iteratively.
  • Loading branch information
martijnvg authored Oct 18, 2023
1 parent a545fe2 commit e3cb876
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/101012.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 101012
summary: Adjust `DateHistogram's` bucket accounting to be iteratively
area: Aggregations
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Buck
}
}

int consumeBucketCount = 0;
List<Bucket> reducedBuckets = new ArrayList<>();
if (pq.size() > 0) {
// list of buckets coming from different shards that have the same key
Expand All @@ -323,6 +324,10 @@ protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Buck
// the key changes, reduce what we already buffered and reset the buffer for current buckets
final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
if (consumeBucketCount++ >= REPORT_EMPTY_EVERY) {
reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount);
consumeBucketCount = 0;
}
reducedBuckets.add(reduced);
}
currentBuckets.clear();
Expand All @@ -344,10 +349,14 @@ protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Buck
final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
reducedBuckets.add(reduced);
if (consumeBucketCount++ >= REPORT_EMPTY_EVERY) {
reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount);
consumeBucketCount = 0;
}
}
}
}

reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount);
return reducedBuckets;
}

Expand Down Expand Up @@ -387,7 +396,7 @@ private void addEmptyBuckets(List<Bucket> list, AggregationReduceContext reduceC
* consumeBucketsAndMaybeBreak.
*/
class Counter implements LongConsumer {
private int size = list.size();
private int size;

@Override
public void accept(long key) {
Expand Down Expand Up @@ -490,11 +499,9 @@ private void iterateEmptyBuckets(List<Bucket> list, ListIterator<Bucket> iter, L
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
boolean alreadyAccountedForBuckets = false;
if (reduceContext.isFinalReduce()) {
if (minDocCount == 0) {
addEmptyBuckets(reducedBuckets, reduceContext);
alreadyAccountedForBuckets = true;
}
if (InternalOrder.isKeyDesc(order)) {
// we just need to reverse here...
Expand All @@ -508,9 +515,6 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Aggreg
CollectionUtil.introSort(reducedBuckets, order.comparator());
}
}
if (false == alreadyAccountedForBuckets) {
reduceContext.consumeBucketsAndMaybeBreak(reducedBuckets.size());
}
return new InternalDateHistogram(
getName(),
reducedBuckets,
Expand Down

0 comments on commit e3cb876

Please sign in to comment.