reading
Signed-off-by: bowenlan-amzn <[email protected]>
bowenlan-amzn committed Nov 23, 2023
1 parent 2f18078 commit 73888af
Showing 7 changed files with 39 additions and 26 deletions.
@@ -189,9 +189,10 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
final InternalComposite.InternalBucket[] buckets = new InternalComposite.InternalBucket[num];
long[] bucketOrdsToCollect = new long[queue.size()];
for (int i = 0; i < queue.size(); i++) {
-bucketOrdsToCollect[i] = i;
+bucketOrdsToCollect[i] = i; // TODO reading: queue slots double as bucket ordinals here; the queue maps each composite key to a slot and tracks its doc count
}
InternalAggregations[] subAggsForBuckets = buildSubAggsForBuckets(bucketOrdsToCollect);

while (queue.size() > 0) {
int slot = queue.pop();
CompositeKey key = queue.toCompositeKey(slot);
@@ -207,6 +208,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
aggs
);
}

CompositeKey lastBucket = num > 0 ? buckets[num - 1].getRawKey() : null;
return new InternalAggregation[] {
new InternalComposite(
@@ -283,7 +285,7 @@ private Sort buildIndexSortPrefix(LeafReaderContext context) throws IOException
for (int i = 0; i < end; i++) {
CompositeValuesSourceConfig sourceConfig = sourceConfigs[i];
SingleDimensionValuesSource<?> source = sources[i];
-SortField indexSortField = indexSort.getSort()[i];
+SortField indexSortField = indexSort.getSort()[i]; // TODO reading: the index sort fields must line up with the composite sources, in order
if (source.fieldType == null
// TODO: can we handle missing bucket when using index sort optimization ?
|| source.missingBucket
Expand Down Expand Up @@ -449,21 +451,22 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
finishLeaf();

-boolean fillDocIdSet = deferredCollectors != NO_OP_COLLECTOR;
+boolean fillDocIdSet = deferredCollectors != NO_OP_COLLECTOR; // TODO reading: sub-aggregations are deferred, so matching docs must be recorded for a second pass

Sort indexSortPrefix = buildIndexSortPrefix(ctx);
-int sortPrefixLen = computeSortPrefixLen(indexSortPrefix);
+int sortPrefixLen = computeSortPrefixLen(indexSortPrefix); // TODO reading: non-zero when an ascending index sort prefix matches the sources

+// is an index sort prefix enabled? only when there is none do we try a sorted docs producer
SortedDocsProducer sortedDocsProducer = sortPrefixLen == 0
-? sources[0].createSortedDocsProducerOrNull(ctx.reader(), context.query())
+? sources[0].createSortedDocsProducerOrNull(ctx.reader(), context.query()) // TODO reading: only the leading (first) source can drive sorted collection
: null;
if (sortedDocsProducer != null) {
// Visit documents sorted by the leading source of the composite definition and terminates
// when the leading source value is guaranteed to be greater than the lowest composite bucket
// in the queue.
DocIdSet docIdSet = sortedDocsProducer.processLeaf(context.query(), queue, ctx, fillDocIdSet);
if (fillDocIdSet) {
-entries.add(new Entry(ctx, docIdSet));
+entries.add(new Entry(ctx, docIdSet)); // TODO reading: record this leaf's doc id set so the deferred sub-aggs can replay it
}
// We can bypass search entirely for this segment, the processing is done in the previous call.
// Throwing this exception will terminate the execution of the search for this root aggregation,
@@ -472,7 +475,7 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket
throw new CollectionTerminatedException();
} else {
if (fillDocIdSet) {
-currentLeaf = ctx;
+currentLeaf = ctx; // TODO reading: same bookkeeping as above; finishLeaf() later turns this leaf and its doc id set into an Entry
docIdSetBuilder = new RoaringDocIdSet.Builder(ctx.reader().maxDoc());
}
if (rawAfterKey != null && sortPrefixLen > 0) {
@@ -482,6 +485,7 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket
processLeafFromQuery(ctx, indexSortPrefix);
throw new CollectionTerminatedException();
} else {
+// rawAfterKey == null, or the sort order is reversed: no early termination, collect this leaf normally
final LeafBucketCollector inner = queue.getLeafCollector(ctx, getFirstPassCollector(docIdSetBuilder, sortPrefixLen));
return new LeafBucketCollector() {
@Override
@@ -506,7 +510,7 @@ public void collect(int doc, long bucket) throws IOException {
try {
long docCount = docCountProvider.getDocCount(doc);
if (queue.addIfCompetitive(indexSortPrefix, docCount)) {
-if (builder != null && lastDoc != doc) {
+if (builder != null && lastDoc != doc) { // TODO reading: how can lastDoc == doc? presumably multi-valued sources, where one doc emits several competitive keys
builder.add(doc);
lastDoc = doc;
}
@@ -530,6 +534,7 @@ private void runDeferredCollections() throws IOException {
Query query = context.query();
weight = context.searcher().createWeight(context.searcher().rewrite(query), ScoreMode.COMPLETE, 1f);
}

deferredCollectors.preCollection();
for (Entry entry : entries) {
DocIdSetIterator docIdSetIterator = entry.docIdSet.iterator();
@@ -538,6 +543,7 @@ private void runDeferredCollections() throws IOException {
}
final LeafBucketCollector subCollector = deferredCollectors.getLeafCollector(entry.context);
final LeafBucketCollector collector = queue.getLeafCollector(entry.context, getSecondPassCollector(subCollector));

DocIdSetIterator scorerIt = null;
if (needsScores) {
Scorer scorer = weight.scorer(entry.context);
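Reading note: taken together, these hunks trace the two-pass design of this aggregator. Pass 1 keeps only the top-N composite keys in the queue and records each leaf's matching docs as an Entry; runDeferredCollections then replays exactly those docs into the deferred sub-aggregators. A minimal sketch of that flow follows; every name in it is invented for these notes, none of it is OpenSearch API.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the two-pass composite collection; the TreeMap stands in
// for CompositeValuesCollectorQueue and keys are plain Strings for simplicity.
class TwoPassSketch {
    // mirrors Entry(ctx, docIdSet): a leaf plus the docs it matched in pass 1
    record Entry(int leafId, List<Integer> docIds) {}

    static void run(Map<Integer, List<String>> keyPerDocPerLeaf, int topN) {
        TreeMap<String, Long> queue = new TreeMap<>(); // composite key -> doc count
        List<Entry> entries = new ArrayList<>();

        // pass 1: keep at most topN keys; remember matched docs per leaf (finishLeaf analogue)
        for (var leaf : keyPerDocPerLeaf.entrySet()) {
            List<Integer> matched = new ArrayList<>();
            for (int doc = 0; doc < leaf.getValue().size(); doc++) {
                queue.merge(leaf.getValue().get(doc), 1L, Long::sum); // addIfCompetitive, much simplified
                if (queue.size() > topN) queue.pollLastEntry();       // evict the key that sorts last
                matched.add(doc); // the real code records a doc only when its key was competitive
            }
            entries.add(new Entry(leaf.getKey(), matched));
        }

        // pass 2 (runDeferredCollections analogue): replay the recorded docs and hand
        // only those whose key survived in the queue to the deferred sub-collectors
        for (Entry e : entries) {
            for (int doc : e.docIds()) {
                if (queue.containsKey(keyPerDocPerLeaf.get(e.leafId()).get(doc))) {
                    // a deferred sub-collector would collect(doc) here
                }
            }
        }
    }
}

The payoff is that sub-aggregations are built for at most topN buckets, at the cost of one extra pass over only the docs recorded in pass 1.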
@@ -123,7 +123,7 @@ boolean isFull() {
* the slot if the candidate is already in the queue or null if the candidate is not present.
*/
Integer compareCurrent() {
-return map.get(new Slot(CANDIDATE_SLOT));
+return map.get(new Slot(CANDIDATE_SLOT)); // TODO reading: looks up which slot, if any, already holds the current candidate key
}

/**
@@ -152,7 +152,7 @@ long getDocCount(int slot) {
*/
private void copyCurrent(int slot, long value) {
for (int i = 0; i < arrays.length; i++) {
-arrays[i].copyCurrent(slot);
+arrays[i].copyCurrent(slot); // TODO reading: each values source holds its current value; copy it into this slot
}
docCounts = bigArrays.grow(docCounts, slot + 1);
docCounts.set(slot, value);
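Reading note: the grow-then-set pair above is the recurring BigArrays idiom: grow returns a (possibly reallocated) array of at least the requested size, so the set that follows is always in bounds. A plain-array restatement, with an invented helper and none of the circuit-breaker accounting the real BigArrays does:

import java.util.Arrays;

class GrowSketch {
    // like: docCounts = bigArrays.grow(docCounts, slot + 1); docCounts.set(slot, value);
    static long[] growThenSet(long[] counts, int slot, long value) {
        if (counts.length <= slot) {
            counts = Arrays.copyOf(counts, Math.max(slot + 1, counts.length * 2)); // reallocate only when needed
        }
        counts[slot] = value;
        return counts;
    }
}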
@@ -202,7 +202,10 @@ boolean equals(int slot1, int slot2) {
int hashCode(int slot) {
int result = 1;
for (int i = 0; i < arrays.length; i++) {
-result = 31 * result + (slot == CANDIDATE_SLOT ? arrays[i].hashCodeCurrent() : arrays[i].hashCode(slot));
+result = 31 * result +
+    (slot == CANDIDATE_SLOT ?
+        arrays[i].hashCodeCurrent() :
+        arrays[i].hashCode(slot));
}
return result;
}
@@ -281,10 +284,10 @@ boolean addIfCompetitive(long inc) {
*/
boolean addIfCompetitive(int indexSortSourcePrefix, long inc) {
// checks if the candidate key is competitive
-Integer topSlot = compareCurrent();
-if (topSlot != null) {
+Integer curSlot = compareCurrent();
+if (curSlot != null) {
// this key is already in the top N, skip it
-docCounts.increment(topSlot, inc);
+docCounts.increment(curSlot, inc);
return true;
}
if (afterKeyIsSet) {
@@ -300,7 +303,7 @@ boolean addIfCompetitive(int indexSortSourcePrefix, long inc) {
return false;
}
}
-if (size() >= maxSize) {
+if (size() >= maxSize) { // TODO reading: queue is full; the candidate must beat the current worst key to get in
// the tree map is full, check if the candidate key should be kept
int cmp = compare(CANDIDATE_SLOT, top());
if (cmp > 0) {
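Reading note: compareCurrent() and hashCodeCurrent() hinge on the CANDIDATE_SLOT sentinel: the map stores only slot numbers, and Slot.equals/hashCode dereference either a stored slot or the not-yet-stored candidate value. A toy model of the trick, with a single String key standing in for the per-source big arrays; all types here are invented.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class QueueSketch {
    static final int CANDIDATE_SLOT = Integer.MAX_VALUE;

    final List<String> storedKeys = new ArrayList<>(); // per-slot keys, the "arrays" analogue
    String current;                                    // candidate key, not stored anywhere yet

    class Slot {
        final int slot;
        Slot(int slot) { this.slot = slot; }
        // the sentinel slot reads the candidate; real slots read the stored key
        String key() { return slot == CANDIDATE_SLOT ? current : storedKeys.get(slot); }
        @Override public boolean equals(Object o) { return o instanceof Slot && key().equals(((Slot) o).key()); }
        @Override public int hashCode() { return key().hashCode(); }
    }

    final Map<Slot, Integer> map = new HashMap<>();

    Integer compareCurrent() {            // slot of the candidate key, or null if absent
        return map.get(new Slot(CANDIDATE_SLOT));
    }

    void copyCurrent(int slot) {          // persist the candidate into a real slot
        while (storedKeys.size() <= slot) storedKeys.add(null);
        storedKeys.set(slot, current);
        map.put(new Slot(slot), slot);
    }
}

This is why addIfCompetitive can ask "is this key already buffered?" for the cost of one hash over the current values, and only pays for copyCurrent once the key is known to be kept.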
@@ -316,7 +316,8 @@ public static void register(ValuesSourceRegistry.Builder builder) {
IndexReader reader,
int size,
LongConsumer addRequestCircuitBreakerBytes,
-CompositeValuesSourceConfig compositeValuesSourceConfig) -> {
+CompositeValuesSourceConfig compositeValuesSourceConfig
+) -> {
final RoundingValuesSource roundingValuesSource = (RoundingValuesSource) compositeValuesSourceConfig.valuesSource();
return new LongValuesSource(
bigArrays,
@@ -253,7 +253,8 @@ static boolean checkMatchAllOrRangeQuery(Query query, String fieldName) {
@Override
SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query query) {
query = extractQuery(query);
-if (checkIfSortedDocsIsApplicable(reader, fieldType) == false || checkMatchAllOrRangeQuery(query, fieldType.name()) == false) {
+if (checkIfSortedDocsIsApplicable(reader, fieldType) == false
+    || checkMatchAllOrRangeQuery(query, fieldType.name()) == false) {
return null;
}
final byte[] lowerPoint;
@@ -275,13 +276,11 @@ SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query quer
case "long":
toBucketFunction = (value) -> rounding.applyAsLong(LongPoint.decodeDimension(value, 0));
break;

case "int":
case "short":
case "byte":
toBucketFunction = (value) -> rounding.applyAsLong(IntPoint.decodeDimension(value, 0));
break;

default:
return null;
}
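Reading note: the switch above picks a decoder that matches the width the field was indexed at, then rounds the decoded value into its histogram bucket; int, short and byte all share the 4-byte int-point encoding. A sketch of the same selection; the two decodeDimension calls are real Lucene API, the rest is invented for illustration.

import java.util.function.LongUnaryOperator;
import java.util.function.ToLongFunction;

import org.apache.lucene.document.IntPoint;
import org.apache.lucene.document.LongPoint;

class ToBucketSketch {
    static ToLongFunction<byte[]> pick(String fieldType, LongUnaryOperator rounding) {
        switch (fieldType) {
            case "long":        // longs (and dates) index 8-byte points
                return value -> rounding.applyAsLong(LongPoint.decodeDimension(value, 0));
            case "int":
            case "short":
            case "byte":        // all three index 4-byte int points
                return value -> rounding.applyAsLong(IntPoint.decodeDimension(value, 0));
            default:
                return null;    // unsupported type: no sorted docs producer
        }
    }
}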
@@ -68,6 +68,7 @@ DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue, LeafReade
// no value for the field
return DocIdSet.EMPTY;
}

long lowerBucket = Long.MIN_VALUE;
Comparable lowerValue = queue.getLowerValueLeadSource();
if (lowerValue != null) {
@@ -85,6 +86,7 @@ DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue, LeafReade
}
upperBucket = (Long) upperValue;
}

DocIdSetBuilder builder = fillDocIdSet ? new DocIdSetBuilder(context.reader().maxDoc(), values, field) : null;
Visitor visitor = new Visitor(context, queue, builder, values.getBytesPerDimension(), lowerBucket, upperBucket);
try {
@@ -147,7 +149,7 @@ public void visit(int docID, byte[] packedValue) throws IOException {

long bucket = bucketFunction.applyAsLong(packedValue);
if (first == false && bucket != lastBucket) {
-final DocIdSet docIdSet = bucketDocsBuilder.build();
+final DocIdSet docIdSet = bucketDocsBuilder.build(); // TODO reading: still need to grasp docIdSet usage; this one holds the finished bucket's docs and is replayed through processBucket below
if (processBucket(queue, context, docIdSet.iterator(), lastBucket, builder) &&
// lower bucket is inclusive
lowerBucket != lastBucket) {
@@ -168,7 +170,8 @@ public void visit(int docID, byte[] packedValue) throws IOException {

@Override
public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
-if ((upperPointQuery != null && Arrays.compareUnsigned(minPackedValue, 0, bytesPerDim, upperPointQuery, 0, bytesPerDim) > 0)
+if ((upperPointQuery != null
+    && Arrays.compareUnsigned(minPackedValue, 0, bytesPerDim, upperPointQuery, 0, bytesPerDim) > 0)
|| (lowerPointQuery != null
&& Arrays.compareUnsigned(maxPackedValue, 0, bytesPerDim, lowerPointQuery, 0, bytesPerDim) < 0)) {
// does not match the query
@@ -182,13 +185,13 @@ public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue
return PointValues.Relation.CELL_OUTSIDE_QUERY;
}
}

if (upperBucket != Long.MAX_VALUE) {
long minBucket = bucketFunction.applyAsLong(minPackedValue);
if (minBucket > upperBucket) {
return PointValues.Relation.CELL_OUTSIDE_QUERY;
}
}

return PointValues.Relation.CELL_CROSSES_QUERY;
}

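Reading note: compare() is what lets the BKD visitor prune whole cells: a cell is skipped when it lies entirely outside the query's point range, or when even its smallest value rounds to a bucket past the queue's current upper bound. A long-valued restatement of the rule; hypothetical, since the real code compares packed bytes with Arrays.compareUnsigned.

import java.util.function.LongUnaryOperator;

import org.apache.lucene.index.PointValues.Relation;

class PruneSketch {
    static Relation compare(long cellMin, long cellMax,
                            Long queryLower, Long queryUpper,
                            long upperBucket, LongUnaryOperator bucketOf) {
        if ((queryUpper != null && cellMin > queryUpper)
            || (queryLower != null && cellMax < queryLower)) {
            return Relation.CELL_OUTSIDE_QUERY;      // no doc in the cell can match the query
        }
        if (upperBucket != Long.MAX_VALUE && bucketOf.applyAsLong(cellMin) > upperBucket) {
            return Relation.CELL_OUTSIDE_QUERY;      // every bucket in the cell sorts after the queue's worst key
        }
        return Relation.CELL_CROSSES_QUERY;          // cannot decide cell-wide; visit docs one by one
    }
}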
@@ -185,7 +185,8 @@ protected boolean checkIfSortedDocsIsApplicable(IndexReader reader, MappedFieldT
return false;
}

-if (reader.hasDeletions() && (reader.numDocs() == 0 || (double) reader.numDocs() / (double) reader.maxDoc() < 0.5)) {
+if (reader.hasDeletions()
+    && (reader.numDocs() == 0 || (double) reader.numDocs() / (double) reader.maxDoc() < 0.5)) {
// do not use the index if it has more than 50% of deleted docs
return false;
}
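Reading note: the reformatted guard above is a cheap segment-level heuristic: when deletions are present, skip the sorted-docs optimization if fewer than half the docs are live, since the early-terminating walk would mostly visit dead doc ids. Restated as a tiny predicate with an invented name:

class DeletionGuardSketch {
    // true when the segment has no live docs at all, or is more than 50% deleted
    static boolean tooManyDeletions(boolean hasDeletions, int numDocs, int maxDoc) {
        return hasDeletions && (numDocs == 0 || (double) numDocs / (double) maxDoc < 0.5);
    }
}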
@@ -89,7 +89,7 @@ protected boolean processBucket(
@Override
public void collect(int doc, long bucket) throws IOException {
hasCollected[0] = true;
-long docCount = docCountProvider.getDocCount(doc);
+long docCount = docCountProvider.getDocCount(doc); // TODO reading: with a _doc_count field one doc can stand for many (pre-aggregated data), so the count may be > 1
if (queue.addIfCompetitive(docCount)) {
topCompositeCollected[0]++;
if (adder != null && doc != lastDoc) {
@@ -106,10 +106,10 @@ public void collect(int doc, long bucket) throws IOException {
}
}
};
-final Bits liveDocs = context.reader().getLiveDocs();
final LeafBucketCollector collector = queue.getLeafCollector(leadSourceBucket, context, queueCollector);
+final Bits liveDocs = context.reader().getLiveDocs();
while (iterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
-if (liveDocs == null || liveDocs.get(iterator.docID())) {
+if (liveDocs == null || liveDocs.get(iterator.docID())) { // TODO reading: skip deleted docs; null liveDocs means the segment has no deletions
collector.collect(iterator.docID());
}
}
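Reading note: the second pass walks each bucket's recorded doc ids with the standard Lucene live-docs idiom, so deleted docs never reach the queue's collector. The idiom extracted into a standalone helper; DocIdSetIterator and Bits are real Lucene API, the wrapper is invented.

import java.io.IOException;
import java.util.function.IntConsumer;

import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.Bits;

class LiveDocsSketch {
    static void forEachLiveDoc(DocIdSetIterator it, Bits liveDocs, IntConsumer collect) throws IOException {
        while (it.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
            if (liveDocs == null || liveDocs.get(it.docID())) { // null liveDocs means no deletions in this segment
                collect.accept(it.docID());
            }
        }
    }
}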
