From 73888af72d41faee8ac9522fa582b41bba9c25ba Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Wed, 22 Nov 2023 17:14:43 -0800 Subject: [PATCH] reading Signed-off-by: bowenlan-amzn --- .../bucket/composite/CompositeAggregator.java | 22 ++++++++++++------- .../CompositeValuesCollectorQueue.java | 17 ++++++++------ .../DateHistogramValuesSourceBuilder.java | 3 ++- .../bucket/composite/LongValuesSource.java | 5 ++--- .../composite/PointsSortedDocsProducer.java | 9 +++++--- .../SingleDimensionValuesSource.java | 3 ++- .../bucket/composite/SortedDocsProducer.java | 6 ++--- 7 files changed, 39 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index f2a4d5cd46127..58128f290bcb5 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -189,9 +189,10 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I final InternalComposite.InternalBucket[] buckets = new InternalComposite.InternalBucket[num]; long[] bucketOrdsToCollect = new long[queue.size()]; for (int i = 0; i < queue.size(); i++) { - bucketOrdsToCollect[i] = i; + bucketOrdsToCollect[i] = i; // TODO reading meaning queue is indexed with bucket key, and contains doccount } InternalAggregations[] subAggsForBuckets = buildSubAggsForBuckets(bucketOrdsToCollect); + while (queue.size() > 0) { int slot = queue.pop(); CompositeKey key = queue.toCompositeKey(slot); @@ -207,6 +208,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I aggs ); } + CompositeKey lastBucket = num > 0 ? buckets[num - 1].getRawKey() : null; return new InternalAggregation[] { new InternalComposite( @@ -283,7 +285,7 @@ private Sort buildIndexSortPrefix(LeafReaderContext context) throws IOException for (int i = 0; i < end; i++) { CompositeValuesSourceConfig sourceConfig = sourceConfigs[i]; SingleDimensionValuesSource source = sources[i]; - SortField indexSortField = indexSort.getSort()[i]; + SortField indexSortField = indexSort.getSort()[i]; // TODO reading requiring the order should match if (source.fieldType == null // TODO: can we handle missing bucket when using index sort optimization ? || source.missingBucket @@ -449,13 +451,14 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException { finishLeaf(); - boolean fillDocIdSet = deferredCollectors != NO_OP_COLLECTOR; + boolean fillDocIdSet = deferredCollectors != NO_OP_COLLECTOR; // TODO reading subAggs are deferred Sort indexSortPrefix = buildIndexSortPrefix(ctx); - int sortPrefixLen = computeSortPrefixLen(indexSortPrefix); + int sortPrefixLen = computeSortPrefixLen(indexSortPrefix); // TODO reading asc index sort exists + // are there index sort enabled? sortPrefixLen SortedDocsProducer sortedDocsProducer = sortPrefixLen == 0 - ? sources[0].createSortedDocsProducerOrNull(ctx.reader(), context.query()) + ? sources[0].createSortedDocsProducerOrNull(ctx.reader(), context.query()) // TODO reading only using the first field : null; if (sortedDocsProducer != null) { // Visit documents sorted by the leading source of the composite definition and terminates @@ -463,7 +466,7 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket // in the queue. DocIdSet docIdSet = sortedDocsProducer.processLeaf(context.query(), queue, ctx, fillDocIdSet); if (fillDocIdSet) { - entries.add(new Entry(ctx, docIdSet)); + entries.add(new Entry(ctx, docIdSet)); // TODO reading add entries } // We can bypass search entirely for this segment, the processing is done in the previous call. // Throwing this exception will terminate the execution of the search for this root aggregation, @@ -472,7 +475,7 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket throw new CollectionTerminatedException(); } else { if (fillDocIdSet) { - currentLeaf = ctx; + currentLeaf = ctx; // TODO reading add entries docIdSetBuilder = new RoaringDocIdSet.Builder(ctx.reader().maxDoc()); } if (rawAfterKey != null && sortPrefixLen > 0) { @@ -482,6 +485,7 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket processLeafFromQuery(ctx, indexSortPrefix); throw new CollectionTerminatedException(); } else { + // rawAfterKey == null || sort order is reversed final LeafBucketCollector inner = queue.getLeafCollector(ctx, getFirstPassCollector(docIdSetBuilder, sortPrefixLen)); return new LeafBucketCollector() { @Override @@ -506,7 +510,7 @@ public void collect(int doc, long bucket) throws IOException { try { long docCount = docCountProvider.getDocCount(doc); if (queue.addIfCompetitive(indexSortPrefix, docCount)) { - if (builder != null && lastDoc != doc) { + if (builder != null && lastDoc != doc) { // TODO reading how can lastDoc == doc? builder.add(doc); lastDoc = doc; } @@ -530,6 +534,7 @@ private void runDeferredCollections() throws IOException { Query query = context.query(); weight = context.searcher().createWeight(context.searcher().rewrite(query), ScoreMode.COMPLETE, 1f); } + deferredCollectors.preCollection(); for (Entry entry : entries) { DocIdSetIterator docIdSetIterator = entry.docIdSet.iterator(); @@ -538,6 +543,7 @@ private void runDeferredCollections() throws IOException { } final LeafBucketCollector subCollector = deferredCollectors.getLeafCollector(entry.context); final LeafBucketCollector collector = queue.getLeafCollector(entry.context, getSecondPassCollector(subCollector)); + DocIdSetIterator scorerIt = null; if (needsScores) { Scorer scorer = weight.scorer(entry.context); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java index 6ee1682a7b196..37317fafd56e4 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeValuesCollectorQueue.java @@ -123,7 +123,7 @@ boolean isFull() { * the slot if the candidate is already in the queue or null if the candidate is not present. */ Integer compareCurrent() { - return map.get(new Slot(CANDIDATE_SLOT)); + return map.get(new Slot(CANDIDATE_SLOT)); // TODO reading this check the slot/bucket? of the current value } /** @@ -152,7 +152,7 @@ long getDocCount(int slot) { */ private void copyCurrent(int slot, long value) { for (int i = 0; i < arrays.length; i++) { - arrays[i].copyCurrent(slot); + arrays[i].copyCurrent(slot); // TODO reading valueSource knows current value, set the value to this slot/index } docCounts = bigArrays.grow(docCounts, slot + 1); docCounts.set(slot, value); @@ -202,7 +202,10 @@ boolean equals(int slot1, int slot2) { int hashCode(int slot) { int result = 1; for (int i = 0; i < arrays.length; i++) { - result = 31 * result + (slot == CANDIDATE_SLOT ? arrays[i].hashCodeCurrent() : arrays[i].hashCode(slot)); + result = 31 * result + + (slot == CANDIDATE_SLOT ? + arrays[i].hashCodeCurrent() : + arrays[i].hashCode(slot)); } return result; } @@ -281,10 +284,10 @@ boolean addIfCompetitive(long inc) { */ boolean addIfCompetitive(int indexSortSourcePrefix, long inc) { // checks if the candidate key is competitive - Integer topSlot = compareCurrent(); - if (topSlot != null) { + Integer curSlot = compareCurrent(); + if (curSlot != null) { // this key is already in the top N, skip it - docCounts.increment(topSlot, inc); + docCounts.increment(curSlot, inc); return true; } if (afterKeyIsSet) { @@ -300,7 +303,7 @@ boolean addIfCompetitive(int indexSortSourcePrefix, long inc) { return false; } } - if (size() >= maxSize) { + if (size() >= maxSize) { // TODO reading queue full // the tree map is full, check if the candidate key should be kept int cmp = compare(CANDIDATE_SLOT, top()); if (cmp > 0) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java index fd94ba355238a..3f6aca2ad9c07 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/DateHistogramValuesSourceBuilder.java @@ -316,7 +316,8 @@ public static void register(ValuesSourceRegistry.Builder builder) { IndexReader reader, int size, LongConsumer addRequestCircuitBreakerBytes, - CompositeValuesSourceConfig compositeValuesSourceConfig) -> { + CompositeValuesSourceConfig compositeValuesSourceConfig + ) -> { final RoundingValuesSource roundingValuesSource = (RoundingValuesSource) compositeValuesSourceConfig.valuesSource(); return new LongValuesSource( bigArrays, diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/LongValuesSource.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/LongValuesSource.java index 48e080c1576dd..ee3b1d252fa25 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/LongValuesSource.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/LongValuesSource.java @@ -253,7 +253,8 @@ static boolean checkMatchAllOrRangeQuery(Query query, String fieldName) { @Override SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query query) { query = extractQuery(query); - if (checkIfSortedDocsIsApplicable(reader, fieldType) == false || checkMatchAllOrRangeQuery(query, fieldType.name()) == false) { + if (checkIfSortedDocsIsApplicable(reader, fieldType) == false + || checkMatchAllOrRangeQuery(query, fieldType.name()) == false) { return null; } final byte[] lowerPoint; @@ -275,13 +276,11 @@ SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query quer case "long": toBucketFunction = (value) -> rounding.applyAsLong(LongPoint.decodeDimension(value, 0)); break; - case "int": case "short": case "byte": toBucketFunction = (value) -> rounding.applyAsLong(IntPoint.decodeDimension(value, 0)); break; - default: return null; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java index 3d6730203b6ae..b3a34df17dc90 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/PointsSortedDocsProducer.java @@ -68,6 +68,7 @@ DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue, LeafReade // no value for the field return DocIdSet.EMPTY; } + long lowerBucket = Long.MIN_VALUE; Comparable lowerValue = queue.getLowerValueLeadSource(); if (lowerValue != null) { @@ -85,6 +86,7 @@ DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue, LeafReade } upperBucket = (Long) upperValue; } + DocIdSetBuilder builder = fillDocIdSet ? new DocIdSetBuilder(context.reader().maxDoc(), values, field) : null; Visitor visitor = new Visitor(context, queue, builder, values.getBytesPerDimension(), lowerBucket, upperBucket); try { @@ -147,7 +149,7 @@ public void visit(int docID, byte[] packedValue) throws IOException { long bucket = bucketFunction.applyAsLong(packedValue); if (first == false && bucket != lastBucket) { - final DocIdSet docIdSet = bucketDocsBuilder.build(); + final DocIdSet docIdSet = bucketDocsBuilder.build(); // TODO reading need to grasp how to use docIdSet if (processBucket(queue, context, docIdSet.iterator(), lastBucket, builder) && // lower bucket is inclusive lowerBucket != lastBucket) { @@ -168,7 +170,8 @@ public void visit(int docID, byte[] packedValue) throws IOException { @Override public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) { - if ((upperPointQuery != null && Arrays.compareUnsigned(minPackedValue, 0, bytesPerDim, upperPointQuery, 0, bytesPerDim) > 0) + if ((upperPointQuery != null + && Arrays.compareUnsigned(minPackedValue, 0, bytesPerDim, upperPointQuery, 0, bytesPerDim) > 0) || (lowerPointQuery != null && Arrays.compareUnsigned(maxPackedValue, 0, bytesPerDim, lowerPointQuery, 0, bytesPerDim) < 0)) { // does not match the query @@ -182,13 +185,13 @@ public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue return PointValues.Relation.CELL_OUTSIDE_QUERY; } } - if (upperBucket != Long.MAX_VALUE) { long minBucket = bucketFunction.applyAsLong(minPackedValue); if (minBucket > upperBucket) { return PointValues.Relation.CELL_OUTSIDE_QUERY; } } + return PointValues.Relation.CELL_CROSSES_QUERY; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java index fe0801d6d230e..8eabd6e552f10 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SingleDimensionValuesSource.java @@ -185,7 +185,8 @@ protected boolean checkIfSortedDocsIsApplicable(IndexReader reader, MappedFieldT return false; } - if (reader.hasDeletions() && (reader.numDocs() == 0 || (double) reader.numDocs() / (double) reader.maxDoc() < 0.5)) { + if (reader.hasDeletions() + && (reader.numDocs() == 0 || (double) reader.numDocs() / (double) reader.maxDoc() < 0.5)) { // do not use the index if it has more than 50% of deleted docs return false; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SortedDocsProducer.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SortedDocsProducer.java index 9442529bf9342..38b0bd83d58fa 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SortedDocsProducer.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/SortedDocsProducer.java @@ -89,7 +89,7 @@ protected boolean processBucket( @Override public void collect(int doc, long bucket) throws IOException { hasCollected[0] = true; - long docCount = docCountProvider.getDocCount(doc); + long docCount = docCountProvider.getDocCount(doc); // TODO reading _doc_count can be >1 if (queue.addIfCompetitive(docCount)) { topCompositeCollected[0]++; if (adder != null && doc != lastDoc) { @@ -106,10 +106,10 @@ public void collect(int doc, long bucket) throws IOException { } } }; - final Bits liveDocs = context.reader().getLiveDocs(); final LeafBucketCollector collector = queue.getLeafCollector(leadSourceBucket, context, queueCollector); + final Bits liveDocs = context.reader().getLiveDocs(); while (iterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { - if (liveDocs == null || liveDocs.get(iterator.docID())) { + if (liveDocs == null || liveDocs.get(iterator.docID())) { // TODO reading doc exists collector.collect(iterator.docID()); } }