Skip to content

Commit

Permalink
spotless
Browse files Browse the repository at this point in the history
Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn committed Dec 7, 2023
1 parent 2dc0271 commit 36f98e7
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,30 +165,22 @@ private static Weight[] createFilterForAggregations(
while (i < bucketCount) {
// Calculate the lower bucket bound
final byte[] lower = new byte[8];
NumericUtils.longToSortableBytes(
i == 0 ? low : fieldType.convertRoundedMillisToNanos(roundedLow),
lower, 0
);
NumericUtils.longToSortableBytes(i == 0 ? low : fieldType.convertRoundedMillisToNanos(roundedLow), lower, 0);

// Calculate the upper bucket bound
roundedLow = preparedRounding.round(roundedLow + interval);
final byte[] upper = new byte[8];
NumericUtils.longToSortableBytes(
i + 1 == bucketCount ? high :
// Subtract -1 if the minimum is roundedLow as roundedLow itself
// is included in the next bucket
fieldType.convertRoundedMillisToNanos(roundedLow) - 1,
upper,
0
);

filters[i++] = context.searcher().createWeight(
new PointRangeQuery(field, lower, upper, 1) {
@Override
protected String toString(int dimension, byte[] value) {
return null;
}
}, ScoreMode.COMPLETE_NO_SCORES, 1);
NumericUtils.longToSortableBytes(i + 1 == bucketCount ? high :
// Subtract -1 if the minimum is roundedLow as roundedLow itself
// is included in the next bucket
fieldType.convertRoundedMillisToNanos(roundedLow) - 1, upper, 0);

filters[i++] = context.searcher().createWeight(new PointRangeQuery(field, lower, upper, 1) {
@Override
protected String toString(int dimension, byte[] value) {
return null;
}
}, ScoreMode.COMPLETE_NO_SCORES, 1);
}
}

Expand Down Expand Up @@ -305,8 +297,7 @@ public static boolean tryFastFilterAggregation(
if (counts[i] > 0) {
incrementDocCount.accept(
fieldType.convertNanosToMillis(
NumericUtils.sortableBytesToLong(
((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0)
NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0)
),
counts[i]
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@
import org.opensearch.search.aggregations.MultiBucketCollector;
import org.opensearch.search.aggregations.MultiBucketConsumerService;
import org.opensearch.search.aggregations.bucket.BucketsAggregator;
import org.opensearch.search.aggregations.bucket.FilterRewriteHelper;
import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.searchafter.SearchAfterBuilder;
import org.opensearch.search.sort.SortAndFormats;
import org.opensearch.search.aggregations.bucket.FilterRewriteHelper;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -519,7 +519,10 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t

if (scorer != null) {
DocIdSetIterator docIt = scorer.iterator();
final LeafBucketCollector inner = queue.getLeafCollector(ctx, getFirstPassCollector(docIdSetBuilder, indexSortPrefix.getSort().length));
final LeafBucketCollector inner = queue.getLeafCollector(
ctx,
getFirstPassCollector(docIdSetBuilder, indexSortPrefix.getSort().length)
);
inner.setScorer(scorer);

final Bits liveDocs = ctx.reader().getLiveDocs();
Expand All @@ -533,15 +536,9 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t

@Override
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType,
(key, count) -> {
incrementBucketDocCount(
FilterRewriteHelper.getBucketOrd(
bucketOrds.add(0, preparedRounding.round(key))
),
count
);
}, size);
boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, (key, count) -> {
incrementBucketDocCount(FilterRewriteHelper.getBucketOrd(bucketOrds.add(0, preparedRounding.round(key))), count);
}, size);
if (optimized) throw new CollectionTerminatedException();

finishLeaf();
Expand Down Expand Up @@ -673,7 +670,8 @@ private LeafBucketCollector getSecondPassCollector(LeafBucketCollector subCollec
@Override
public void collect(int doc, long zeroBucket) throws IOException {
assert zeroBucket == 0;
Integer slot = queue.compareCurrent(); // TODO reading queue will make sure current value presents through collection mechanism
Integer slot = queue.compareCurrent(); // TODO reading queue will make sure current value presents through collection
// mechanism
if (slot != null) {
// The candidate key is a top bucket.
// We can defer the collection of this document/bucket to the sub collector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,7 @@ boolean equals(int slot1, int slot2) {
int hashCode(int slot) {
int result = 1;
for (int i = 0; i < arrays.length; i++) {
result = 31 * result +
(slot == CANDIDATE_SLOT ?
arrays[i].hashCodeCurrent() :
arrays[i].hashCode(slot));
result = 31 * result + (slot == CANDIDATE_SLOT ? arrays[i].hashCodeCurrent() : arrays[i].hashCode(slot));
}
return result;
}
Expand Down Expand Up @@ -310,12 +307,14 @@ boolean addIfCompetitive(int indexSortSourcePrefix, long inc) { // TODO reading
}

if (size() >= maxSize) { // TODO reading when queue is full, can check competitiveness
// the tree map is full, check if the candidate key should be kept // TODO reading queue contain topN largest composite key/bucket/slot
// the tree map is full, check if the candidate key should be kept // TODO reading queue contain topN largest composite
// key/bucket/slot
int cmp = compare(CANDIDATE_SLOT, top());
if (cmp > 0) { // TODO reading current large than queue
if (cmp <= indexSortSourcePrefix) { // TODO reading the way of comparing current and queue uses sorted fields
// index sort guarantees that there is no key greater or equal than the
// current one in the subsequent documents so we can early terminate. // TODO reading how to get the topN smallest items using heap?
// current one in the subsequent documents so we can early terminate. // TODO reading how to get the topN smallest items
// using heap?
throw new CollectionTerminatedException();
}
// the candidate key is not competitive, skip it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,7 @@ public static void register(ValuesSourceRegistry.Builder builder) {
IndexReader reader,
int size,
LongConsumer addRequestCircuitBreakerBytes,
CompositeValuesSourceConfig compositeValuesSourceConfig
) -> {
CompositeValuesSourceConfig compositeValuesSourceConfig) -> {
final RoundingValuesSource roundingValuesSource = (RoundingValuesSource) compositeValuesSourceConfig.valuesSource();
return new LongValuesSource(
bigArrays,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,13 @@ SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query quer
case "long":
toBucketFunction = (value) -> rounding.applyAsLong(LongPoint.decodeDimension(value, 0));
break;

case "int":
case "short":
case "byte":
toBucketFunction = (value) -> rounding.applyAsLong(IntPoint.decodeDimension(value, 0));
break;

default:
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public void visit(int docID, byte[] packedValue) throws IOException {
if (first == false && bucket != lastBucket) { // process previous bucket when new bucket appears
final DocIdSet docIdSet = bucketDocsBuilder.build();
if (processBucket(queue, context, docIdSet.iterator(), lastBucket, builder) &&
// lower bucket is inclusive
// lower bucket is inclusive
lowerBucket != lastBucket) {
// this bucket does not have any competitive composite buckets,
// we can early terminate the collection because the remaining buckets are guaranteed
Expand All @@ -170,8 +170,7 @@ public void visit(int docID, byte[] packedValue) throws IOException {

@Override
public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
if ((upperPointQuery != null
&& Arrays.compareUnsigned(minPackedValue, 0, bytesPerDim, upperPointQuery, 0, bytesPerDim) > 0)
if ((upperPointQuery != null && Arrays.compareUnsigned(minPackedValue, 0, bytesPerDim, upperPointQuery, 0, bytesPerDim) > 0)
|| (lowerPointQuery != null
&& Arrays.compareUnsigned(maxPackedValue, 0, bytesPerDim, lowerPointQuery, 0, bytesPerDim) < 0)) {
// does not match the query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ private AutoDateHistogramAggregator(
valuesSourceConfig.missing() != null,
valuesSourceConfig.script() != null,
valuesSourceConfig.fieldType(),
0);
0
);
FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
parent(),
subAggregators.length,
Expand Down Expand Up @@ -233,15 +234,9 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBuc
return LeafBucketCollector.NO_OP_COLLECTOR;
}

boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType,
(key, count) -> {
incrementBucketDocCount(
FilterRewriteHelper.getBucketOrd(
getBucketOrds().add(0, preparedRounding.round(key))
),
count
);
}, Integer.MAX_VALUE);
boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, (key, count) -> {
incrementBucketDocCount(FilterRewriteHelper.getBucketOrd(getBucketOrds().add(0, preparedRounding.round(key))), count);
}, Integer.MAX_VALUE);
if (optimized) throw new CollectionTerminatedException();

final SortedNumericDocValues values = valuesSource.longValues(ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
valuesSourceConfig.missing() != null,
valuesSourceConfig.script() != null,
valuesSourceConfig.fieldType(),
0);
0
);
FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
parent,
subAggregators.length,
Expand Down Expand Up @@ -166,15 +167,9 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol
return LeafBucketCollector.NO_OP_COLLECTOR;
}

boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType,
(key, count) -> {
incrementBucketDocCount(
FilterRewriteHelper.getBucketOrd(
bucketOrds.add(0, preparedRounding.round(key))
),
count
);
}, Integer.MAX_VALUE);
boolean optimized = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, (key, count) -> {
incrementBucketDocCount(FilterRewriteHelper.getBucketOrd(bucketOrds.add(0, preparedRounding.round(key))), count);
}, Integer.MAX_VALUE);
if (optimized) throw new CollectionTerminatedException();

SortedNumericDocValues values = valuesSource.longValues(ctx);
Expand Down Expand Up @@ -210,33 +205,29 @@ public void collect(int doc, long owningBucketOrd) throws IOException {

@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return buildAggregationsForVariableBuckets(
owningBucketOrds,
bucketOrds,
(bucketValue, docCount, subAggregationResults) -> {
return new InternalDateHistogram.Bucket(bucketValue, docCount, keyed, formatter, subAggregationResults);
},
(owningBucketOrd, buckets) -> {
// the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds, (bucketValue, docCount, subAggregationResults) -> {
return new InternalDateHistogram.Bucket(bucketValue, docCount, keyed, formatter, subAggregationResults);
}, (owningBucketOrd, buckets) -> {
// the contract of the histogram aggregation is that shards must return buckets ordered by key in ascending order
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());

// value source will be null for unmapped fields
// Important: use `rounding` here, not `shardRounding`
InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0
? new InternalDateHistogram.EmptyBucketInfo(rounding.withoutOffset(), buildEmptySubAggregations(), extendedBounds)
: null;
return new InternalDateHistogram(
name,
buckets,
order,
minDocCount,
rounding.offset(),
emptyBucketInfo,
formatter,
keyed,
metadata()
);
});
// value source will be null for unmapped fields
// Important: use `rounding` here, not `shardRounding`
InternalDateHistogram.EmptyBucketInfo emptyBucketInfo = minDocCount == 0
? new InternalDateHistogram.EmptyBucketInfo(rounding.withoutOffset(), buildEmptySubAggregations(), extendedBounds)
: null;
return new InternalDateHistogram(
name,
buckets,
order,
minDocCount,
rounding.offset(),
emptyBucketInfo,
formatter,
keyed,
metadata()
);
});
}

@Override
Expand Down

0 comments on commit 36f98e7

Please sign in to comment.