Apply the fast filter optimization to composite aggregation #11505

Merged · 21 commits · Jan 17, 2024
Changes from all commits
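For context, this change lets a composite aggregation whose only source is a date histogram be answered from pre-computed range filters (counting documents per bucket) instead of per-document collection. A minimal sketch of the request shape it targets, built with OpenSearch's Java builders — the index, field, and aggregation names here are illustrative, not taken from this PR:

import java.util.List;

import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.opensearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
import org.opensearch.search.aggregations.bucket.composite.DateHistogramValuesSourceBuilder;
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.opensearch.search.builder.SearchSourceBuilder;

public class CompositeDateHistogramRequest {
    public static void main(String[] args) {
        // A single date_histogram source and no sub-aggregations -- the shape
        // the fast filter rewrite can optimize.
        DateHistogramValuesSourceBuilder dateSource = new DateHistogramValuesSourceBuilder("date")
            .field("@timestamp")
            .calendarInterval(DateHistogramInterval.HOUR);
        List<CompositeValuesSourceBuilder<?>> sources = List.of(dateSource);
        CompositeAggregationBuilder composite = new CompositeAggregationBuilder("by_hour", sources).size(100);
        SearchSourceBuilder searchSource = new SearchSourceBuilder().size(0).aggregation(composite);
        System.out.println(searchSource); // prints the JSON request body
    }
}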
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -185,6 +185,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Performance improvement for MultiTerm Queries on Keyword fields ([#7057](https://github.com/opensearch-project/OpenSearch/issues/7057))
- Refactor common parts from the Rounding class into a separate 'round' package ([#11023](https://github.com/opensearch-project/OpenSearch/issues/11023))
- Performance improvement for date histogram aggregations without sub-aggregations ([#11083](https://github.com/opensearch-project/OpenSearch/pull/11083))
- Apply the fast filter optimization to composite aggregation of date histogram source ([#11505](https://github.com/opensearch-project/OpenSearch/pull/11505))
- Disable concurrent aggs for Diversified Sampler and Sampler aggs ([#11087](https://github.com/opensearch-project/OpenSearch/issues/11087))
- Made leader/follower check timeout setting dynamic ([#10528](https://github.com/opensearch-project/OpenSearch/pull/10528))
- Improved performance of numeric exact-match queries ([#11209](https://github.com/opensearch-project/OpenSearch/pull/11209))

Large diffs are not rendered by default.

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java
@@ -56,7 +56,9 @@
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.comparators.LongComparator;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.RoaringDocIdSet;
import org.opensearch.common.Rounding;
import org.opensearch.common.lease.Releasables;
import org.opensearch.index.IndexSortConfig;
import org.opensearch.lucene.queries.SearchAfterSortedDocQuery;
@@ -71,7 +73,9 @@
import org.opensearch.search.aggregations.MultiBucketCollector;
import org.opensearch.search.aggregations.MultiBucketConsumerService;
import org.opensearch.search.aggregations.bucket.BucketsAggregator;
import org.opensearch.search.aggregations.bucket.FastFilterRewriteHelper;
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.searchafter.SearchAfterBuilder;
import org.opensearch.search.sort.SortAndFormats;
@@ -80,6 +84,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongUnaryOperator;
@@ -111,6 +116,10 @@

private boolean earlyTerminated;

private final FastFilterRewriteHelper.FastFilterContext fastFilterContext;
private LongKeyedBucketOrds bucketOrds = null;
private Rounding.Prepared preparedRounding = null;

CompositeAggregator(
String name,
AggregatorFactories factories,
@@ -154,12 +163,33 @@
}
this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
this.rawAfterKey = rawAfterKey;

fastFilterContext = new FastFilterRewriteHelper.FastFilterContext();
if (!FastFilterRewriteHelper.isCompositeAggRewriteable(sourceConfigs)) return;
fastFilterContext.setAggregationType(
new FastFilterRewriteHelper.CompositeAggregationType(sourceConfigs, rawAfterKey, formats, size)
);
if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
// bucketOrds is the data structure for saving date histogram results
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
// Currently the filter rewrite is only supported for date histograms
FastFilterRewriteHelper.CompositeAggregationType aggregationType =
(FastFilterRewriteHelper.CompositeAggregationType) fastFilterContext.aggregationType;
preparedRounding = aggregationType.getRoundingPreparer();
fastFilterContext.buildFastFilter(
context,
fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()),
x -> aggregationType.getRounding(),
() -> preparedRounding
);
}
}
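// (editor sketch, not part of the diff) FastFilterRewriteHelper's source is not rendered
// on this page, so the exact gating is an assumption, but isCompositeAggRewriteable plus
// isRewriteable above amount to roughly:
//
//   parent == null                                                      // top-level aggregation
//   && subAggregators.length == 0                                       // no sub-aggregations
//   && sourceConfigs.length == 1                                        // a single source...
//   && sourceConfigs[0].valuesSource() instanceof RoundingValuesSource  // ...that is a date histogram
//   && sourceConfigs[0].hasScript() == false                            // with no script (see hasScript() below)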

@Override
protected void doClose() {
try {
Releasables.close(queue);
Releasables.close(bucketOrds);
} finally {
Releasables.close(sources);
}
@@ -187,12 +217,14 @@
}

int num = Math.min(size, queue.size());
InternalComposite.InternalBucket[] buckets = new InternalComposite.InternalBucket[num];

long[] bucketOrdsToCollect = new long[queue.size()];
for (int i = 0; i < queue.size(); i++) {
bucketOrdsToCollect[i] = i;
}
InternalAggregations[] subAggsForBuckets = buildSubAggsForBuckets(bucketOrdsToCollect);

while (queue.size() > 0) {
int slot = queue.pop();
CompositeKey key = queue.toCompositeKey(slot);
@@ -208,6 +240,43 @@
aggs
);
}

// Build results from the fast filter optimization
if (bucketOrds != null) {
// CompositeKey is the value of the bucket key
final Map<CompositeKey, InternalComposite.InternalBucket> bucketMap = new HashMap<>();
// Some segments may not be optimized, so buckets may contain results from the queue.
for (InternalComposite.InternalBucket internalBucket : buckets) {
bucketMap.put(internalBucket.getRawKey(), internalBucket);
}
// Loop over the buckets in the bucketOrds, and populate the map accordingly
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(0);
while (ordsEnum.next()) {
Long bucketKeyValue = ordsEnum.value();
CompositeKey key = new CompositeKey(bucketKeyValue);
if (bucketMap.containsKey(key)) {
long docCount = bucketDocCount(ordsEnum.ord()) + bucketMap.get(key).getDocCount();
bucketMap.get(key).setDocCount(docCount);
} else {
InternalComposite.InternalBucket bucket = new InternalComposite.InternalBucket(
sourceNames,
formats,
key,
reverseMuls,
missingOrders,
bucketDocCount(ordsEnum.ord()),
buildEmptySubAggregations()
);
bucketMap.put(key, bucket);
}
}
// since a map is not a sorted structure, sort it before transforming it back to buckets
List<InternalComposite.InternalBucket> bucketList = new ArrayList<>(bucketMap.values());
CollectionUtil.introSort(bucketList, InternalComposite.InternalBucket::compareKey);
buckets = bucketList.subList(0, Math.min(size, bucketList.size())).toArray(InternalComposite.InternalBucket[]::new);
num = buckets.length;
}
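// (editor illustration) With made-up keys and doc counts, the merge above behaves like:
//   from the queue (non-optimized segments):   {3600000 -> 10, 7200000 -> 4}
//   from bucketOrds (fast-filtered segments):  {3600000 -> 6, 10800000 -> 2}
//   merged, sorted by key, trimmed to `size`:  {3600000 -> 16, 7200000 -> 4, 10800000 -> 2}
// Shared keys sum their counts via setDocCount; new keys become buckets with empty
// sub-aggregations, and introSort restores key order before trimming.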

CompositeKey lastBucket = num > 0 ? buckets[num - 1].getRawKey() : null;
return new InternalAggregation[] {
new InternalComposite(
@@ -296,16 +365,16 @@

if (indexSortField.getReverse() != (source.reverseMul == -1)) {
if (i == 0) {
// the leading index sort matches the leading source field, but the order is reversed,
// so we don't check the other sources.
return new Sort(indexSortField);
}
break;
}
sortFields.add(indexSortField);
if (sourceConfig.valuesSource() instanceof RoundingValuesSource) {
// the rounding "squashes" many values together, that breaks the ordering of sub-values,
// so we ignore the subsequent sources even if they match the index sort.
break;
}
}
@@ -448,6 +517,16 @@

@Override
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation(
ctx,
fastFilterContext,
(key, count) -> incrementBucketDocCount(
FastFilterRewriteHelper.getBucketOrd(bucketOrds.add(0, preparedRounding.round(key))),
count
)
);
if (optimized) throw new CollectionTerminatedException();
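// (editor sketch) For each (key, count) the helper emits, the callback above rounds the
// key down to its bucket start, registers it in bucketOrds, and bumps the doc count.
// LongKeyedBucketOrds.add returns the new ordinal, or -1 - existingOrd when the key was
// already added; getBucketOrd presumably undoes that encoding (assumption -- the
// helper's source is not rendered here):
//   long ord = bucketOrds.add(0, preparedRounding.round(key));
//   if (ord < 0) ord = -1 - ord; // the bucket already existed
//   incrementBucketDocCount(ord, count);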

finishLeaf();

boolean fillDocIdSet = deferredCollectors != NO_OP_COLLECTOR;
@@ -477,9 +556,10 @@
docIdSetBuilder = new RoaringDocIdSet.Builder(ctx.reader().maxDoc());
}
if (rawAfterKey != null && sortPrefixLen > 0) {
// We have an after key and index sort is applicable, so we jump directly to the doc
// after the index sort prefix using the rawAfterKey and we start collecting
// documents from there.
assert indexSortPrefix != null;
processLeafFromQuery(ctx, indexSortPrefix);
throw new CollectionTerminatedException();
} else {
@@ -507,6 +587,8 @@
try {
long docCount = docCountProvider.getDocCount(doc);
if (queue.addIfCompetitive(indexSortPrefix, docCount)) {
// one doc may contain multiple values; we iterate over them and collect them one by one,
// so the same doc can appear multiple times here
if (builder != null && lastDoc != doc) {
builder.add(doc);
lastDoc = doc;
@@ -569,7 +651,7 @@
@Override
public void collect(int doc, long zeroBucket) throws IOException {
assert zeroBucket == 0;
Integer slot = queue.getCurrentSlot();
if (slot != null) {
// The candidate key is a top bucket.
// We can defer the collection of this document/bucket to the sub collector
server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeKey.java
@@ -44,7 +44,7 @@
*
* @opensearch.internal
*/
public class CompositeKey implements Writeable {
private final Comparable[] values;

CompositeKey(Comparable... values) {
@@ -64,11 +64,11 @@ Comparable[] values() {
return values;
}

public int size() {
return values.length;
}

public Comparable get(int pos) {
assert pos < values.length;
return values[pos];
}
server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java
@@ -47,6 +47,8 @@

/**
* A specialized {@link PriorityQueue} implementation for composite buckets.
* Think of this as a max heap that keeps the slots of the current top (smallest) buckets in order.
* Each slot holds the values of the composite bucket key it represents.
*
* @opensearch.internal
*/
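As a self-contained toy (plain JDK, not OpenSearch code), the bounded max-heap pattern this class implements looks like this: keep the N smallest keys seen so far, evicting the current largest whenever a smaller candidate arrives.

import java.util.Comparator;
import java.util.PriorityQueue;

public class TopNSmallest {
    public static void main(String[] args) {
        int n = 3;
        PriorityQueue<Long> heap = new PriorityQueue<>(Comparator.reverseOrder()); // max heap
        for (long key : new long[] { 9, 2, 7, 4, 1, 8 }) {
            if (heap.size() < n) {
                heap.add(key);
            } else if (key < heap.peek()) {
                heap.poll(); // evict the largest of the current top N
                heap.add(key);
            }
        }
        // heap now holds the three smallest keys {1, 2, 4}, with the largest (4) at the root
        System.out.println(heap);
    }
}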
@@ -77,7 +79,7 @@ public int hashCode() {

private final BigArrays bigArrays;
private final int maxSize;
private final Map<Slot, Integer> map; // to quickly find the slot for a value
private final SingleDimensionValuesSource<?>[] arrays;

private LongArray docCounts;
@@ -108,7 +110,7 @@ public int hashCode() {

@Override
protected boolean lessThan(Integer a, Integer b) {
return compare(a, b) > 0; // max heap
}

/**
@@ -119,10 +121,10 @@ boolean isFull() {
}

/**
* Tries to get the slot of the current candidate values in the queue and returns
* the slot if the candidate is already in the queue or null if the candidate is not present.
*/
Integer getCurrentSlot() {
return map.get(new Slot(CANDIDATE_SLOT));
}

@@ -281,32 +283,34 @@ boolean addIfCompetitive(long inc) {
*/
boolean addIfCompetitive(int indexSortSourcePrefix, long inc) {
// checks if the candidate key is competitive
Integer curSlot = getCurrentSlot();
if (curSlot != null) {
// this key is already in the top N, skip it
docCounts.increment(curSlot, inc);
return true;
}

if (afterKeyIsSet) {
int cmp = compareCurrentWithAfter();
if (cmp <= 0) {
if (indexSortSourcePrefix < 0 && cmp == indexSortSourcePrefix) {
// the leading index sort and the leading source order are both reversed,
// so we can early terminate when we reach a document that is smaller
// than the after key (collected on a previous page).
throw new CollectionTerminatedException();
}
// the key was collected on a previous page, skip it.
return false;
}
}

// the heap is full, check whether the candidate key is larger than the max heap top
if (size() >= maxSize) {
int cmp = compare(CANDIDATE_SLOT, top());
if (cmp > 0) {
if (cmp <= indexSortSourcePrefix) {
// index sort guarantees that there is no key greater or equal than the
// current one in the subsequent documents so we can early terminate.
// index sort guarantees the following documents will have a key larger than the current candidate,
// so we can early terminate.
throw new CollectionTerminatedException();
}
// the candidate key is not competitive, skip it.
@@ -324,7 +328,7 @@ boolean addIfCompetitive(int indexSortSourcePrefix, long inc) {
} else {
newSlot = size();
}
// move the candidate key to its new slot by copying its values
copyCurrent(newSlot, inc);
map.put(new Slot(newSlot), newSlot);
add(newSlot);
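A hedged toy (again plain JDK, not the real collector) for the early-termination branches above: when a segment's documents arrive in index-sort order and the queue is already full, the first non-competitive key proves that nothing later in the segment can compete, so collection can stop.

public class IndexSortEarlyTermination {
    public static void main(String[] args) {
        long[] keysInIndexSortOrder = { 1, 3, 5, 7, 9, 11 }; // segment sorted ascending on the key
        int n = 3;
        long[] top = new long[n];
        int filled = 0;
        for (long key : keysInIndexSortOrder) {
            if (filled < n) {
                top[filled++] = key; // sorted input, so these are the n smallest so far
            } else if (key >= top[n - 1]) {
                // every later key is >= this one, so none can beat the current top n;
                // the real code throws CollectionTerminatedException at this point
                System.out.println("early terminated at key " + key);
                break;
            }
        }
    }
}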
server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesSourceConfig.java
@@ -156,7 +156,7 @@ public MissingOrder missingOrder() {
/**
* Returns true if the source contains a script that can change the value.
*/
public boolean hasScript() {
return hasScript;
}

server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java
@@ -298,7 +298,7 @@ public static void register(ValuesSourceRegistry.Builder builder) {
// TODO once composite is plugged in to the values source registry or at least understands Date values source types use it
// here
Rounding.Prepared preparedRounding = rounding.prepareForUnknown();
RoundingValuesSource vs = new RoundingValuesSource(numeric, preparedRounding, rounding);
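// (editor note) Passing the raw Rounding through alongside the prepared one is presumably
// what lets the fast filter rewrite recover bucket boundaries -- see
// aggregationType.getRounding() in CompositeAggregator above.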
// is specified in the builder.
final DocValueFormat docValueFormat = format == null ? DocValueFormat.RAW : valuesSourceConfig.format();
final MappedFieldType fieldType = valuesSourceConfig.fieldType();
server/src/main/java/org/opensearch/search/aggregations/bucket/composite/InternalComposite.java
@@ -339,7 +339,7 @@
KeyComparable<InternalBucket> {

private final CompositeKey key;
private long docCount;
private final InternalAggregations aggregations;
private final transient int[] reverseMuls;
private final transient MissingOrder[] missingOrders;
@@ -436,6 +436,10 @@
return docCount;
}

public void setDocCount(long docCount) {
this.docCount = docCount;
}
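// (editor note) docCount is no longer final (see the field change above) because the
// fast-filter merge in CompositeAggregator adds per-segment counts, via setDocCount,
// into buckets that were already built from the queue.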

@Override
public Aggregations getAggregations() {
return aggregations;