Skip to content

Commit

Permalink
[Search Pipelines] Add request-scoped state shared between processors (
Browse files Browse the repository at this point in the history
…#9405)

To handle cases where multiple search pipeline processors need to share
information, we will allocate a context holder for the lifetime of
the request and pass it to each processor to get/set values.

To explain this behavior and benefit from it, this change also introduces
three new processors:
1. The "oversample" request processor that increases "size", storing the
original size in the context.
2. The "truncate" response processor that discards results after some
number, by default using the original size before oversampling.
3. The "collapse" response processor offers similar behavior to a collapse
query, discarding results that have a field value in common with a 
higher-scoring result.

Signed-off-by: Michael Froh <[email protected]>
  • Loading branch information
msfroh authored Dec 5, 2023
1 parent d875558 commit c204585
Show file tree
Hide file tree
Showing 30 changed files with 1,444 additions and 464 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Admission control] Add Resource usage collector service and resource usage tracker ([#9890](https://github.com/opensearch-project/OpenSearch/pull/9890))
- [Admission control] Add enhancements to FS stats to include read/write time, queue size and IO time ([#10541](https://github.com/opensearch-project/OpenSearch/pull/10541))
- [Remote cluster state] Change file names for remote cluster state ([#10557](https://github.com/opensearch-project/OpenSearch/pull/10557))
- [Search Pipelines] Add request-scoped state shared between processors (and three new processors) ([#9405](https://github.com/opensearch-project/OpenSearch/pull/9405))
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
- [Remote Store] Add repository stats for remote store([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567))
- [Remote cluster state] Upload global metadata in cluster state to remote store([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404))
- [Remote cluster state] Download functionality of global metadata from remote store ([#10535](https://github.com/opensearch-project/OpenSearch/pull/10535))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
* Helper for map abstractions passed to scripting processors. Throws {@link UnsupportedOperationException} for almost
* all methods. Subclasses just need to implement get and put.
*/
abstract class BasicMap implements Map<String, Object> {

/**
* No-args constructor.
*/
protected BasicMap() {}

private static final String UNSUPPORTED_OP_ERR = " Method not supported in Search pipeline script";

@Override
public boolean isEmpty() {
throw new UnsupportedOperationException("isEmpty" + UNSUPPORTED_OP_ERR);
}

public int size() {
throw new UnsupportedOperationException("size" + UNSUPPORTED_OP_ERR);
}

public boolean containsKey(Object key) {
return get(key) != null;
}

public boolean containsValue(Object value) {
throw new UnsupportedOperationException("containsValue" + UNSUPPORTED_OP_ERR);
}

public Object remove(Object key) {
throw new UnsupportedOperationException("remove" + UNSUPPORTED_OP_ERR);
}

public void putAll(Map<? extends String, ?> m) {
throw new UnsupportedOperationException("putAll" + UNSUPPORTED_OP_ERR);
}

public void clear() {
throw new UnsupportedOperationException("clear" + UNSUPPORTED_OP_ERR);
}

public Set<String> keySet() {
throw new UnsupportedOperationException("keySet" + UNSUPPORTED_OP_ERR);
}

public Collection<Object> values() {
throw new UnsupportedOperationException("values" + UNSUPPORTED_OP_ERR);
}

public Set<Map.Entry<String, Object>> entrySet() {
throw new UnsupportedOperationException("entrySet" + UNSUPPORTED_OP_ERR);
}

@Override
public Object getOrDefault(Object key, Object defaultValue) {
throw new UnsupportedOperationException("getOrDefault" + UNSUPPORTED_OP_ERR);
}

@Override
public void forEach(BiConsumer<? super String, ? super Object> action) {
throw new UnsupportedOperationException("forEach" + UNSUPPORTED_OP_ERR);
}

@Override
public void replaceAll(BiFunction<? super String, ? super Object, ?> function) {
throw new UnsupportedOperationException("replaceAll" + UNSUPPORTED_OP_ERR);
}

@Override
public Object putIfAbsent(String key, Object value) {
throw new UnsupportedOperationException("putIfAbsent" + UNSUPPORTED_OP_ERR);
}

@Override
public boolean remove(Object key, Object value) {
throw new UnsupportedOperationException("remove" + UNSUPPORTED_OP_ERR);
}

@Override
public boolean replace(String key, Object oldValue, Object newValue) {
throw new UnsupportedOperationException("replace" + UNSUPPORTED_OP_ERR);
}

@Override
public Object replace(String key, Object value) {
throw new UnsupportedOperationException("replace" + UNSUPPORTED_OP_ERR);
}

@Override
public Object computeIfAbsent(String key, Function<? super String, ?> mappingFunction) {
throw new UnsupportedOperationException("computeIfAbsent" + UNSUPPORTED_OP_ERR);
}

@Override
public Object computeIfPresent(String key, BiFunction<? super String, ? super Object, ?> remappingFunction) {
throw new UnsupportedOperationException("computeIfPresent" + UNSUPPORTED_OP_ERR);
}

@Override
public Object compute(String key, BiFunction<? super String, ? super Object, ?> remappingFunction) {
throw new UnsupportedOperationException("compute" + UNSUPPORTED_OP_ERR);
}

@Override
public Object merge(String key, Object value, BiFunction<? super Object, ? super Object, ?> remappingFunction) {
throw new UnsupportedOperationException("merge" + UNSUPPORTED_OP_ERR);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.document.DocumentField;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.search.pipeline.AbstractProcessor;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchResponseProcessor;
import org.opensearch.search.pipeline.common.helpers.SearchResponseUtil;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* A simple implementation of field collapsing on search responses. Note that this is not going to work as well as
* field collapsing at the shard level, as implemented with the "collapse" parameter in a search request. Mostly
* just using this to demo the oversample / truncate_hits processors.
*/
public class CollapseResponseProcessor extends AbstractProcessor implements SearchResponseProcessor {
/**
* Key to reference this processor type from a search pipeline.
*/
public static final String TYPE = "collapse";
static final String COLLAPSE_FIELD = "field";
private final String collapseField;

private CollapseResponseProcessor(String tag, String description, boolean ignoreFailure, String collapseField) {
super(tag, description, ignoreFailure);
this.collapseField = Objects.requireNonNull(collapseField);
}

@Override
public String getType() {
return TYPE;
}

@Override
public SearchResponse processResponse(SearchRequest request, SearchResponse response) {

if (response.getHits() != null) {
if (response.getHits().getCollapseField() != null) {
throw new IllegalStateException(
"Cannot collapse on " + collapseField + ". Results already collapsed on " + response.getHits().getCollapseField()
);
}
Map<String, SearchHit> collapsedHits = new LinkedHashMap<>();
List<Object> collapseValues = new ArrayList<>();
for (SearchHit hit : response.getHits()) {
Object fieldValue = null;
DocumentField docField = hit.getFields().get(collapseField);
if (docField != null) {
if (docField.getValues().size() > 1) {
throw new IllegalStateException(
"Failed to collapse " + hit.getId() + ": doc has multiple values for field " + collapseField
);
}
fieldValue = docField.getValues().get(0);
} else if (hit.getSourceAsMap() != null) {
fieldValue = hit.getSourceAsMap().get(collapseField);
}
String fieldValueString;
if (fieldValue == null) {
fieldValueString = "__missing__";
} else {
fieldValueString = fieldValue.toString();
}

// Results are already sorted by sort criterion. Only keep the first hit for each field.
if (collapsedHits.containsKey(fieldValueString) == false) {
collapsedHits.put(fieldValueString, hit);
collapseValues.add(fieldValue);
}
}
SearchHit[] newHits = new SearchHit[collapsedHits.size()];
int i = 0;
for (SearchHit collapsedHit : collapsedHits.values()) {
newHits[i++] = collapsedHit;
}
SearchHits searchHits = new SearchHits(
newHits,
response.getHits().getTotalHits(),
response.getHits().getMaxScore(),
response.getHits().getSortFields(),
collapseField,
collapseValues.toArray()
);
return SearchResponseUtil.replaceHits(searchHits, response);
}
return response;
}

static class Factory implements Processor.Factory<SearchResponseProcessor> {

@Override
public CollapseResponseProcessor create(
Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories,
String tag,
String description,
boolean ignoreFailure,
Map<String, Object> config,
PipelineContext pipelineContext
) {
String collapseField = ConfigurationUtils.readStringProperty(TYPE, tag, config, COLLAPSE_FIELD);
return new CollapseResponseProcessor(tag, description, ignoreFailure, collapseField);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.search.SearchService;
import org.opensearch.search.pipeline.AbstractProcessor;
import org.opensearch.search.pipeline.PipelineProcessingContext;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.StatefulSearchRequestProcessor;
import org.opensearch.search.pipeline.common.helpers.ContextUtils;

import java.util.Map;

import static org.opensearch.search.pipeline.common.helpers.ContextUtils.applyContextPrefix;

/**
* Multiplies the "size" parameter on the {@link SearchRequest} by the given scaling factor, storing the original value
* in the request context as "original_size".
*/
public class OversampleRequestProcessor extends AbstractProcessor implements StatefulSearchRequestProcessor {

/**
* Key to reference this processor type from a search pipeline.
*/
public static final String TYPE = "oversample";
static final String SAMPLE_FACTOR = "sample_factor";
static final String ORIGINAL_SIZE = "original_size";
private final double sampleFactor;
private final String contextPrefix;

private OversampleRequestProcessor(String tag, String description, boolean ignoreFailure, double sampleFactor, String contextPrefix) {
super(tag, description, ignoreFailure);
this.sampleFactor = sampleFactor;
this.contextPrefix = contextPrefix;
}

@Override
public SearchRequest processRequest(SearchRequest request, PipelineProcessingContext requestContext) {
if (request.source() != null) {
int originalSize = request.source().size();
if (originalSize == -1) {
originalSize = SearchService.DEFAULT_SIZE;
}
requestContext.setAttribute(applyContextPrefix(contextPrefix, ORIGINAL_SIZE), originalSize);
int newSize = (int) Math.ceil(originalSize * sampleFactor);
request.source().size(newSize);
}
return request;
}

@Override
public String getType() {
return TYPE;
}

static class Factory implements Processor.Factory<SearchRequestProcessor> {
@Override
public OversampleRequestProcessor create(
Map<String, Processor.Factory<SearchRequestProcessor>> processorFactories,
String tag,
String description,
boolean ignoreFailure,
Map<String, Object> config,
PipelineContext pipelineContext
) {
double sampleFactor = ConfigurationUtils.readDoubleProperty(TYPE, tag, config, SAMPLE_FACTOR);
if (sampleFactor < 1.0) {
throw ConfigurationUtils.newConfigurationException(TYPE, tag, SAMPLE_FACTOR, "Value must be >= 1.0");
}
String contextPrefix = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, ContextUtils.CONTEXT_PREFIX_PARAMETER);
return new OversampleRequestProcessor(tag, description, ignoreFailure, sampleFactor, contextPrefix);
}
}
}
Loading

0 comments on commit c204585

Please sign in to comment.