Skip to content

Commit

Permalink
Replaced stream.findFirst by for loop for hybrid query (#706) (#707)
Browse files Browse the repository at this point in the history
* Change stream.findFirst to for loop

Signed-off-by: Martin Gaievski <[email protected]>
Co-authored-by: Navneet Verma <[email protected]>
(cherry picked from commit b277b07)

Co-authored-by: Martin Gaievski <[email protected]>
  • Loading branch information
1 parent 40782d8 commit c7d7ded
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Enhancements
- Allowing execution of hybrid query on index alias with filters ([#670](https://github.com/opensearch-project/neural-search/pull/670))
- Allowing query by raw tokens in neural_sparse query ([#693](https://github.com/opensearch-project/neural-search/pull/693))
- Removed stream.findFirst implementation to use more native iteration implement to improve hybrid query latencies by 35% ([#706](https://github.com/opensearch-project/neural-search/pull/706))
### Bug Fixes
- Add support for request_cache flag in hybrid query ([#663](https://github.com/opensearch-project/neural-search/pull/663))
### Infrastructure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.Map;
import java.util.Objects;

import com.google.common.primitives.Ints;
import org.apache.lucene.search.DisiPriorityQueue;
import org.apache.lucene.search.DisiWrapper;
import org.apache.lucene.search.DisjunctionDISIApproximation;
Expand Down Expand Up @@ -42,7 +43,7 @@ public final class HybridQueryScorer extends Scorer {

private final float[] subScores;

private final Map<Query, List<Integer>> queryToIndex;
private final Map<Query, int[]> queryToIndex;

private final DocIdSetIterator approximation;
private final HybridScoreBlockBoundaryPropagator disjunctionBlockPropagator;
Expand Down Expand Up @@ -201,48 +202,55 @@ public float[] hybridScores() throws IOException {
continue;
}
Query query = scorer.getWeight().getQuery();
List<Integer> indexes = queryToIndex.get(query);
int[] indexes = queryToIndex.get(query);
// we need to find the index of first sub-query that hasn't been set yet. Such score will have initial value of "0.0"
int index = indexes.stream()
.mapToInt(idx -> idx)
.filter(idx -> Float.compare(scores[idx], 0.0f) == 0)
.findFirst()
.orElseThrow(
() -> new IllegalStateException(
String.format(
Locale.ROOT,
"cannot set score for one of hybrid search subquery [%s] and document [%d]",
query.toString(),
scorer.docID()
)
int index = -1;
for (int idx : indexes) {
if (Float.compare(scores[idx], 0.0f) == 0) {
index = idx;
break;
}
}
if (index == -1) {
throw new IllegalStateException(
String.format(
Locale.ROOT,
"cannot set score for one of hybrid search subquery [%s] and document [%d]",
query.toString(),
scorer.docID()
)
);
}
scores[index] = scorer.score();
}
return scores;
}

private Map<Query, List<Integer>> mapQueryToIndex() {
Map<Query, List<Integer>> queryToIndex = new HashMap<>();
private Map<Query, int[]> mapQueryToIndex() {
// we need list as number of identical queries is unknown
Map<Query, List<Integer>> queryToListOfIndexes = new HashMap<>();
int idx = 0;
for (Scorer scorer : subScorers) {
if (scorer == null) {
idx++;
continue;
}
Query query = scorer.getWeight().getQuery();
queryToIndex.putIfAbsent(query, new ArrayList<>());
queryToIndex.get(query).add(idx);
queryToListOfIndexes.putIfAbsent(query, new ArrayList<>());
queryToListOfIndexes.get(query).add(idx);
idx++;
}
// convert to the int array for better performance
Map<Query, int[]> queryToIndex = new HashMap<>();
queryToListOfIndexes.forEach((key, value) -> queryToIndex.put(key, Ints.toArray(value)));
return queryToIndex;
}

private DisiPriorityQueue initializeSubScorersPQ() {
Objects.requireNonNull(queryToIndex, "should not be null");
Objects.requireNonNull(subScorers, "should not be null");
// we need to count this way in order to include all identical sub-queries
int numOfSubQueries = queryToIndex.values().stream().map(List::size).reduce(0, Integer::sum);
int numOfSubQueries = queryToIndex.values().stream().map(array -> array.length).reduce(0, Integer::sum);
DisiPriorityQueue subScorersPQ = new DisiPriorityQueue(numOfSubQueries);
for (Scorer scorer : subScorers) {
if (scorer == null) {
Expand Down

0 comments on commit c7d7ded

Please sign in to comment.