Skip to content

Commit

Permalink
Increase code coverage and minor refactoring
Browse files Browse the repository at this point in the history
Signed-off-by: Sandesh Kumar <[email protected]>
  • Loading branch information
sandeshkr419 committed Mar 8, 2024
1 parent 6683679 commit 72668a9
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public void setWeight(Weight weight) {
}

/**
Collects term frequencies for a given field from a LeafReaderContext.
Collects term frequencies for a given field from a LeafReaderContext directly from stored segment terms
@param ctx The LeafReaderContext to collect terms from
@param globalOrds The SortedSetDocValues for the field's ordinals
@param ordCountConsumer A consumer to accept collected term frequencies
Expand All @@ -170,15 +170,18 @@ LeafBucketCollector termDocFreqCollector(
SortedSetDocValues globalOrds,
BiConsumer<Long, Integer> ordCountConsumer
) throws IOException {
if (weight == null || weight.count(ctx) != ctx.reader().maxDoc()) {
// weight.count(ctx) == ctx.reader().maxDoc() implies there are no deleted documents and
// top-level query matches all docs in the segment
if (weight == null) {
// Weight not assigned - cannot use this optimization
return null;
}

if (weight.count(ctx) == 0) {
// No documents matches top level query on this segment, we can skip the segment
return LeafBucketCollector.NO_OP_COLLECTOR;
} else {
if (weight.count(ctx) == 0) {
// No documents matches top level query on this segment, we can skip the segment entirely
return LeafBucketCollector.NO_OP_COLLECTOR;
} else if (weight.count(ctx) != ctx.reader().maxDoc()) {
// weight.count(ctx) == ctx.reader().maxDoc() implies there are no deleted documents and
// top-level query matches all docs in the segment
return null;
}
}

Terms segmentTerms = ctx.reader().terms(this.fieldName);
Expand All @@ -198,6 +201,8 @@ LeafBucketCollector termDocFreqCollector(
TermsEnum globalOrdinalTermsEnum = globalOrds.termsEnum();
BytesRef ordinalTerm = globalOrdinalTermsEnum.next();

// Iterate over the terms in the segment, look for matches in the global ordinal terms,
// and increment bucket count when segment terms match global ordinal terms.
while (indexTerm != null && ordinalTerm != null) {
int compare = indexTerm.compareTo(ordinalTerm);
if (compare == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.apache.lucene.util.BytesRef;
import org.opensearch.common.TriConsumer;
import org.opensearch.index.mapper.KeywordFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorTestCase;
import org.opensearch.search.aggregations.support.ValueType;

Expand All @@ -68,61 +70,103 @@ public class KeywordTermsAggregatorTests extends AggregatorTestCase {
dataset = d;
}

private static Consumer<InternalMappedTerms> VERIFY_MATCH_ALL_DOCS = agg -> {
assertEquals(9, agg.getBuckets().size());
for (int i = 0; i < 9; i++) {
StringTerms.Bucket bucket = (StringTerms.Bucket) agg.getBuckets().get(i);
assertThat(bucket.getKey(), equalTo(String.valueOf(9L - i)));
assertThat(bucket.getDocCount(), equalTo(9L - i));
}
};

private static Query MATCH_ALL_DOCS_QUERY = new MatchAllDocsQuery();

private static Query MATCH_NO_DOCS_QUERY = new MatchNoDocsQuery();

public void testMatchNoDocs() throws IOException {
testSearchCase(
new MatchNoDocsQuery(),
ADD_SORTED_FIELD_NO_STORE,
MATCH_NO_DOCS_QUERY,
dataset,
aggregation -> aggregation.field(KEYWORD_FIELD),
agg -> assertEquals(0, agg.getBuckets().size()),
null // without type hint
null, // without type hint
DEFAULT_POST_COLLECTION
);

testSearchCase(
new MatchNoDocsQuery(),
ADD_SORTED_FIELD_NO_STORE,
MATCH_NO_DOCS_QUERY,
dataset,
aggregation -> aggregation.field(KEYWORD_FIELD),
agg -> assertEquals(0, agg.getBuckets().size()),
ValueType.STRING // with type hint
ValueType.STRING, // with type hint
DEFAULT_POST_COLLECTION
);
}

public void testMatchAllDocs() throws IOException {
Query query = new MatchAllDocsQuery();

testSearchCase(query, dataset, aggregation -> aggregation.field(KEYWORD_FIELD), agg -> {
assertEquals(9, agg.getBuckets().size());
for (int i = 0; i < 9; i++) {
StringTerms.Bucket bucket = (StringTerms.Bucket) agg.getBuckets().get(i);
assertThat(bucket.getKey(), equalTo(String.valueOf(9L - i)));
assertThat(bucket.getDocCount(), equalTo(9L - i));
}
},
null // without type hint
testSearchCase(
ADD_SORTED_FIELD_NO_STORE,
MATCH_ALL_DOCS_QUERY,
dataset,
aggregation -> aggregation.field(KEYWORD_FIELD),
VERIFY_MATCH_ALL_DOCS,
null, // without type hint
DEFAULT_POST_COLLECTION
);

testSearchCase(query, dataset, aggregation -> aggregation.field(KEYWORD_FIELD), agg -> {
assertEquals(9, agg.getBuckets().size());
for (int i = 0; i < 9; i++) {
StringTerms.Bucket bucket = (StringTerms.Bucket) agg.getBuckets().get(i);
assertThat(bucket.getKey(), equalTo(String.valueOf(9L - i)));
assertThat(bucket.getDocCount(), equalTo(9L - i));
}
},
ValueType.STRING // with type hint
testSearchCase(
ADD_SORTED_FIELD_NO_STORE,
MATCH_ALL_DOCS_QUERY,
dataset,
aggregation -> aggregation.field(KEYWORD_FIELD),
VERIFY_MATCH_ALL_DOCS,
ValueType.STRING, // with type hint
DEFAULT_POST_COLLECTION
);
}

public void testMatchAllDocsWithStoredValues() throws IOException {
// aggregator.postCollection() is not required when LeafBucketCollector#termDocFreqCollector optimization is used,
// therefore using NOOP_POST_COLLECTION
// This also verifies that the bucket count is completed without running postCollection()

testSearchCase(
ADD_SORTED_FIELD_STORE,
MATCH_ALL_DOCS_QUERY,
dataset,
aggregation -> aggregation.field(KEYWORD_FIELD),
VERIFY_MATCH_ALL_DOCS,
null, // without type hint
NOOP_POST_COLLECTION
);

testSearchCase(
ADD_SORTED_FIELD_STORE,
MATCH_ALL_DOCS_QUERY,
dataset,
aggregation -> aggregation.field(KEYWORD_FIELD),
VERIFY_MATCH_ALL_DOCS,
ValueType.STRING, // with type hint
NOOP_POST_COLLECTION
);
}

private void testSearchCase(
TriConsumer<Document, String, String> addField,
Query query,
List<String> dataset,
Consumer<TermsAggregationBuilder> configure,
Consumer<InternalMappedTerms> verify,
ValueType valueType
ValueType valueType,
Consumer<Aggregator> postCollectionConsumer
) throws IOException {
try (Directory directory = newDirectory()) {
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
Document document = new Document();
for (String value : dataset) {
addField.apply(document, KEYWORD_FIELD, value);
document.add(new SortedSetDocValuesField(KEYWORD_FIELD, new BytesRef(value)));
indexWriter.addDocument(document);
document.clear();
Expand All @@ -147,5 +191,4 @@ private void testSearchCase(
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,6 @@ public class TermsAggregatorTests extends AggregatorTestCase {
private static final String STRING_SCRIPT_NAME = "string_script";
private static final String STRING_SCRIPT_OUTPUT = "Orange";

private static final Consumer<TermsAggregator> DEFAULT_POST_COLLECTION = termsAggregator -> {
try {
termsAggregator.postCollection();
} catch (IOException e) {
throw new RuntimeException(e);
}
};

// aggregator.postCollection() is not required when LeafBucketCollector#termDocFreqCollector optimization is used.
// using NOOP_POST_COLLECTION_CONSUMER ensures that the bucket count in aggregation is completed before/without running postCollection()
private static final Consumer<TermsAggregator> NOOP_POST_COLLECTION = termsAggregator -> {};

@Override
protected MapperService mapperServiceMock() {
MapperService mapperService = mock(MapperService.class);
Expand Down Expand Up @@ -275,31 +263,25 @@ public void testUsesGlobalOrdinalsByDefault() throws Exception {
* In this case, the segment terms will not get initialized and will run without LeafBucketCollector#termDocFreqCollector optimization
*/
public void testSimpleAggregation() throws Exception {
testSimple(
(document, field, value) -> document.add(new SortedSetDocValuesField(field, new BytesRef(value))),
DEFAULT_POST_COLLECTION
);
testSimple(ADD_SORTED_FIELD_NO_STORE, DEFAULT_POST_COLLECTION);
}

/**
* This test case utilizes the low cardinality implementation of GlobalOrdinalsStringTermsAggregator.
* In this case, the segment terms will get initialized and will use LeafBucketCollector#termDocFreqCollector optimization
*/
public void testSimpleAggregationWithStoredValues() throws Exception {
// aggregator.postCollection() is not required when LeafBucketCollector#termDocFreqCollector optimization is used.
// aggregator.postCollection() is not required when LeafBucketCollector#termDocFreqCollector optimization is used,
// therefore using NOOP_POST_COLLECTION
// This also verifies that the bucket count is completed without running postCollection()
testSimple((document, field, value) -> {
document.add(new SortedSetDocValuesField(field, new BytesRef(value)));
document.add(new StringField(field, value, Field.Store.NO));
}, NOOP_POST_COLLECTION);

testSimple(ADD_SORTED_FIELD_STORE, NOOP_POST_COLLECTION);
}

/**
* This is a utility method to test out string terms aggregation
* @param addFieldConsumer a function that determines how a field is added to the document
*/
private void testSimple(TriConsumer<Document, String, String> addFieldConsumer, Consumer<TermsAggregator> postCollectionConsumer)
private void testSimple(TriConsumer<Document, String, String> addFieldConsumer, Consumer<Aggregator> postCollectionConsumer)
throws Exception {
try (Directory directory = newDirectory()) {
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
Expand Down Expand Up @@ -374,7 +356,7 @@ public void testStringIncludeExcludeWithStoredValues() throws Exception {
}, NOOP_POST_COLLECTION);
}

private void testStringIncludeExclude(TriConsumer<Document, String, String> addField, Consumer<TermsAggregator> postCollectionConsumer)
private void testStringIncludeExclude(TriConsumer<Document, String, String> addField, Consumer<Aggregator> postCollectionConsumer)
throws Exception {
try (Directory directory = newDirectory()) {
try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.BinaryDocValuesField;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.InetAddressPoint;
import org.apache.lucene.document.LatLonDocValuesField;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.CompositeReaderContext;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
Expand All @@ -62,6 +64,7 @@
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.TriConsumer;
import org.opensearch.common.TriFunction;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
Expand Down Expand Up @@ -121,6 +124,7 @@
import org.opensearch.search.aggregations.AggregatorFactories.Builder;
import org.opensearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
import org.opensearch.search.aggregations.bucket.nested.NestedAggregationBuilder;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregator;
import org.opensearch.search.aggregations.metrics.MetricsAggregator;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
Expand Down Expand Up @@ -178,6 +182,26 @@ public abstract class AggregatorTestCase extends OpenSearchTestCase {
// A list of field types that should not be tested, or are not currently supported
private static List<String> TYPE_TEST_DENYLIST;

protected static final Consumer<Aggregator> DEFAULT_POST_COLLECTION = termsAggregator -> {
try {
termsAggregator.postCollection();
} catch (IOException e) {
throw new RuntimeException(e);
}
};

// aggregator.postCollection() is not required when LeafBucketCollector#termDocFreqCollector optimization is used.
// using NOOP_POST_COLLECTION_CONSUMER ensures that the bucket count in aggregation is completed before/without running postCollection()
protected static final Consumer<Aggregator> NOOP_POST_COLLECTION = termsAggregator -> {};

protected static final TriConsumer<Document, String, String> ADD_SORTED_FIELD_NO_STORE = (document, field, value) ->
document.add(new SortedSetDocValuesField(field, new BytesRef(value)));

protected static final TriConsumer<Document, String, String> ADD_SORTED_FIELD_STORE = (document, field, value) -> {
document.add(new SortedSetDocValuesField(field, new BytesRef(value)));
document.add(new StringField(field, value, Field.Store.NO));
};

static {
List<String> denylist = new ArrayList<>();
denylist.add(ObjectMapper.CONTENT_TYPE); // Cannot aggregate objects
Expand Down Expand Up @@ -484,6 +508,16 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
return searchAndReduce(createIndexSettings(), searcher, query, builder, DEFAULT_MAX_BUCKETS, fieldTypes);
}

protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
IndexSearcher searcher,
Query query,
AggregationBuilder builder,
Consumer<Aggregator> postCollectionConsumer,
MappedFieldType... fieldTypes
) throws IOException {
return searchAndReduce(createIndexSettings(), searcher, query, builder, DEFAULT_MAX_BUCKETS, postCollectionConsumer, fieldTypes);
}

protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
IndexSettings indexSettings,
IndexSearcher searcher,
Expand All @@ -504,6 +538,17 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
return searchAndReduce(createIndexSettings(), searcher, query, builder, maxBucket, fieldTypes);
}

protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
IndexSettings indexSettings,
IndexSearcher searcher,
Query query,
AggregationBuilder builder,
int maxBucket,
MappedFieldType... fieldTypes
) throws IOException {
return searchAndReduce(indexSettings, searcher, query, builder, maxBucket, DEFAULT_POST_COLLECTION, fieldTypes);
}

/**
* Collects all documents that match the provided query {@link Query} and
* returns the reduced {@link InternalAggregation}.
Expand All @@ -518,6 +563,7 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
Query query,
AggregationBuilder builder,
int maxBucket,
Consumer<Aggregator> postCollectionConsumer,
MappedFieldType... fieldTypes
) throws IOException {
final IndexReaderContext ctx = searcher.getTopReaderContext();
Expand Down Expand Up @@ -548,13 +594,13 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
a.preCollection();
Weight weight = subSearcher.createWeight(rewritten, ScoreMode.COMPLETE, 1f);
subSearcher.search(weight, a);
a.postCollection();
postCollectionConsumer.accept(a);
aggs.add(a.buildTopLevel());
}
} else {
root.preCollection();
searcher.search(rewritten, root);
root.postCollection();
postCollectionConsumer.accept(root);
aggs.add(root.buildTopLevel());
}

Expand Down

0 comments on commit 72668a9

Please sign in to comment.