[Search Pipelines] Support ad hoc pipelines (#7253)
* [Search Pipelines] Support ad hoc pipelines

This change allows a search pipeline to be defined within a search
request body. An inline pipeline takes precedence over any other search
pipeline and allows ad hoc testing of pipeline configurations before
persisting the pipeline definition in cluster state.
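
For illustration, the same thing can be done from Java through the new SearchSourceBuilder#searchPipelineSource setter added in this change. This is a minimal sketch only; the wrapper class, index name, and filter_query configuration are made up for the example:

import java.util.List;
import java.util.Map;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;

public class AdHocPipelineExample {
    public static SearchRequest buildRequest() {
        // The inline pipeline uses the same shape as the REST body: a
        // "request_processors" list of { processor-name : config } maps.
        Map<String, Object> pipelineSource = Map.of(
            "request_processors",
            List.of(Map.of("filter_query", Map.of("query", Map.of("term", Map.of("field", "foo")))))
        );
        SearchSourceBuilder source = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())
            .searchPipelineSource(pipelineSource); // setter introduced in this change
        return new SearchRequest("test").source(source);
    }
}

The pipeline definition travels with the request itself, so nothing has to be persisted in cluster state first.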

Signed-off-by: Michael Froh <[email protected]>

* Added changelog entry

Signed-off-by: Michael Froh <[email protected]>

* Incorporate feedback from @reta

Resolve + transform request into PipelinedRequest, then use that to
transform the search response.
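
Roughly, that flow looks like the following sketch, condensed from the TransportSearchAction change later in this commit; the wrapper class and the execute placeholder are illustrative, not part of the change:

import org.opensearch.action.ActionListener;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.search.pipeline.PipelinedRequest;
import org.opensearch.search.pipeline.SearchPipelineService;

class ResolveThenTransformSketch {
    void search(SearchPipelineService searchPipelineService, SearchRequest originalRequest,
                ActionListener<SearchResponse> originalListener) {
        // Resolve the pipeline once (whether referenced by name or defined inline in the
        // request body) and get back the request after its request processors have run.
        PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(originalRequest);
        SearchRequest transformedRequest = pipelinedRequest.transformedRequest();

        // Wrap the caller's listener so the same resolved pipeline transforms the response.
        ActionListener<SearchResponse> listener = ActionListener.wrap(
            response -> originalListener.onResponse(pipelinedRequest.transformResponse(response)),
            originalListener::onFailure
        );
        execute(transformedRequest, listener);
    }

    // Placeholder for the rest of TransportSearchAction#executeRequest.
    void execute(SearchRequest request, ActionListener<SearchResponse> listener) {}
}

One benefit of resolving once up front is that the request and the response are always transformed by the same resolved pipeline.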

Signed-off-by: Michael Froh <[email protected]>

* Incorporate more feedback from @reta

1. Create all streams in try-with-resources.
2. Keep methods in Pipeline package-private.
3. Remove spurious character in CHANGELOG.

Also, I remembered that in a future change, I'm going to call the no-op
pipeline "_none" (like when bypassing an ingest pipeline) so I should
name it "_none" from the start.

Signed-off-by: Michael Froh <[email protected]>

* Incorporate feedback from @andrross

- Added opensearch.internal annotation to PipelinedRequest.
- Made PipelinedRequest final.
- Removed TODO from PipelinedRequest.
- Moved changelog entry to 2.x.

Signed-off-by: Michael Froh <[email protected]>

---------

Signed-off-by: Michael Froh <[email protected]>
msfroh authored May 6, 2023
1 parent 9bf99b4 commit c43d713
Showing 9 changed files with 251 additions and 68 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -82,6 +82,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Extensions] Moving Extensions APIs to support cross versions via protobuf. ([#7402](https://github.com/opensearch-project/OpenSearch/issues/7402))
- [Extensions] Add IdentityPlugin into core to support Extension identities ([#7246](https://github.com/opensearch-project/OpenSearch/pull/7246))
- Add connectToNodeAsExtension in TransportService ([#6866](https://github.com/opensearch-project/OpenSearch/pull/6866))
- [Search Pipelines] Accept pipelines defined in search source ([#7253](https://github.com/opensearch-project/OpenSearch/pull/7253))
- Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244))
- Add 'unsigned_long' numeric field type ([#6237](https://github.com/opensearch-project/OpenSearch/pull/6237))
- Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375))
@@ -81,3 +81,23 @@ teardown:
}
- match: { hits.total.value: 1 }
- match: { hits.hits.0._id: "2" }

  # Should work with a search pipeline defined inline in the request body
  - do:
      search:
        index: test
        body: {
          "search_pipeline": {
            "request_processors": [
              {
                "filter_query": {
                  "query": {
                    "term": {
                      "field": "foo"
                    }
                  }
                }
              }
            ]
          }
        }
  - match: { hits.total.value: 1 }
  - match: { hits.hits.0._id: "1" }
@@ -80,6 +80,7 @@
import org.opensearch.search.internal.AliasFilter;
import org.opensearch.search.internal.InternalSearchResponse;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.pipeline.PipelinedRequest;
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.SearchProfileShardResults;
@@ -390,17 +391,18 @@ private void executeRequest(
System::nanoTime
);
SearchRequest searchRequest;
+ ActionListener<SearchResponse> listener;
try {
- searchRequest = searchPipelineService.transformRequest(originalSearchRequest);
+ PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(originalSearchRequest);
+ searchRequest = pipelinedRequest.transformedRequest();
+ listener = ActionListener.wrap(
+ r -> originalListener.onResponse(pipelinedRequest.transformResponse(r)),
+ originalListener::onFailure
+ );
} catch (Exception e) {
originalListener.onFailure(e);
throw new RuntimeException(e);
}
- ActionListener<SearchResponse> listener = ActionListener.wrap(
- // TODO: Should we transform responses with the original request or the transformed request? Or both?
- r -> originalListener.onResponse(searchPipelineService.transformResponse(originalSearchRequest, r)),
- originalListener::onFailure
- );

ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {
if (source != searchRequest.source()) {
@@ -33,8 +33,10 @@
package org.opensearch.search.builder;

import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.common.Booleans;
import org.opensearch.common.Nullable;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.ParseField;
import org.opensearch.common.ParsingException;
import org.opensearch.common.Strings;
@@ -76,6 +78,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.opensearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder;
@@ -127,6 +130,7 @@ public final class SearchSourceBuilder implements Writeable, ToXContentObject, R
public static final ParseField COLLAPSE = new ParseField("collapse");
public static final ParseField SLICE = new ParseField("slice");
public static final ParseField POINT_IN_TIME = new ParseField("pit");
public static final ParseField SEARCH_PIPELINE = new ParseField("search_pipeline");

public static SearchSourceBuilder fromXContent(XContentParser parser) throws IOException {
return fromXContent(parser, true);
@@ -207,6 +211,8 @@ public static HighlightBuilder highlight() {

private PointInTimeBuilder pointInTimeBuilder = null;

private Map<String, Object> searchPipelineSource = null;

/**
* Constructs a new search source builder.
*/
@@ -264,6 +270,11 @@ public SearchSourceBuilder(StreamInput in) throws IOException {
fetchFields = in.readList(FieldAndFormat::new);
}
pointInTimeBuilder = in.readOptionalWriteable(PointInTimeBuilder::new);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) { // TODO: Update if/when we backport to 2.x
if (in.readBoolean()) {
searchPipelineSource = in.readMap();
}
}
}

@Override
@@ -323,6 +334,12 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeList(fetchFields);
}
out.writeOptionalWriteable(pointInTimeBuilder);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) { // TODO: Update if/when we backport to 2.x
out.writeBoolean(searchPipelineSource != null);
if (searchPipelineSource != null) {
out.writeMap(searchPipelineSource);
}
}
}

/**
@@ -981,6 +998,21 @@ public SearchSourceBuilder pointInTimeBuilder(PointInTimeBuilder builder) {
return this;
}

/**
* @return a search pipeline defined within the search source (see {@link org.opensearch.search.pipeline.SearchPipelineService})
*/
public Map<String, Object> searchPipelineSource() {
return searchPipelineSource;
}

/**
* Define a search pipeline to process this search request and/or its response. See {@link org.opensearch.search.pipeline.SearchPipelineService}.
*/
public SearchSourceBuilder searchPipelineSource(Map<String, Object> searchPipelineSource) {
this.searchPipelineSource = searchPipelineSource;
return this;
}

/**
* Rewrites this search source builder into its primitive form. e.g. by
* rewriting the QueryBuilder. If the builder did not change the identity
@@ -1218,13 +1250,16 @@ public void parseXContent(XContentParser parser, boolean checkTrailingTokens) th
collapse = CollapseBuilder.fromXContent(parser);
} else if (POINT_IN_TIME.match(currentFieldName, parser.getDeprecationHandler())) {
pointInTimeBuilder = PointInTimeBuilder.fromXContent(parser);
- } else {
- throw new ParsingException(
- parser.getTokenLocation(),
- "Unknown key for a " + token + " in [" + currentFieldName + "].",
- parser.getTokenLocation()
- );
- }
+ } else if (FeatureFlags.isEnabled(FeatureFlags.SEARCH_PIPELINE)
+ && SEARCH_PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
+ searchPipelineSource = parser.mapOrdered();
+ } else {
+ throw new ParsingException(
+ parser.getTokenLocation(),
+ "Unknown key for a " + token + " in [" + currentFieldName + "].",
+ parser.getTokenLocation()
+ );
+ }
} else if (token == XContentParser.Token.START_ARRAY) {
if (STORED_FIELDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
storedFieldsContext = StoredFieldsContext.fromXContent(STORED_FIELDS_FIELD.getPreferredName(), parser);
@@ -1443,6 +1478,9 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
if (pointInTimeBuilder != null) {
pointInTimeBuilder.toXContent(builder, params);
}
if (searchPipelineSource != null) {
builder.field(SEARCH_PIPELINE.getPreferredName(), searchPipelineSource);
}
return builder;
}

44 changes: 38 additions & 6 deletions server/src/main/java/org/opensearch/search/pipeline/Pipeline.java
@@ -12,10 +12,15 @@
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.ingest.ConfigurationUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

@@ -39,22 +44,30 @@ class Pipeline {
private final List<SearchRequestProcessor> searchRequestProcessors;
private final List<SearchResponseProcessor> searchResponseProcessors;

private final NamedWriteableRegistry namedWriteableRegistry;

Pipeline(
String id,
@Nullable String description,
@Nullable Integer version,
List<SearchRequestProcessor> requestProcessors,
- List<SearchResponseProcessor> responseProcessors
+ List<SearchResponseProcessor> responseProcessors,
+ NamedWriteableRegistry namedWriteableRegistry
) {
this.id = id;
this.description = description;
this.version = version;
this.searchRequestProcessors = requestProcessors;
this.searchResponseProcessors = responseProcessors;
this.namedWriteableRegistry = namedWriteableRegistry;
}

- public static Pipeline create(String id, Map<String, Object> config, Map<String, Processor.Factory> processorFactories)
- throws Exception {
+ public static Pipeline create(
+ String id,
+ Map<String, Object> config,
+ Map<String, Processor.Factory> processorFactories,
+ NamedWriteableRegistry namedWriteableRegistry
+ ) throws Exception {
String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY);
Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null);
List<Map<String, Object>> requestProcessorConfigs = ConfigurationUtils.readOptionalList(null, null, config, REQUEST_PROCESSORS_KEY);
@@ -82,7 +95,7 @@ public static Pipeline create(String id, Map<String, Object> config, Map<String,
+ Arrays.toString(config.keySet().toArray())
);
}
- return new Pipeline(id, description, version, requestProcessors, responseProcessors);
+ return new Pipeline(id, description, version, requestProcessors, responseProcessors, namedWriteableRegistry);
}

@SuppressWarnings("unchecked") // Cast is checked using isInstance
@@ -143,8 +156,18 @@ List<SearchResponseProcessor> getSearchResponseProcessors() {
}

SearchRequest transformRequest(SearchRequest request) throws Exception {
- for (SearchRequestProcessor searchRequestProcessor : searchRequestProcessors) {
- request = searchRequestProcessor.processRequest(request);
+ if (searchRequestProcessors.isEmpty() == false) {
+ try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
+ request.writeTo(bytesStreamOutput);
+ try (StreamInput in = bytesStreamOutput.bytes().streamInput()) {
+ try (StreamInput input = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry)) {
+ request = new SearchRequest(input);
+ }
+ }
+ }
+ for (SearchRequestProcessor searchRequestProcessor : searchRequestProcessors) {
+ request = searchRequestProcessor.processRequest(request);
+ }
}
return request;
}
@@ -159,4 +182,13 @@ SearchResponse transformResponse(SearchRequest request, SearchResponse response)
throw new SearchPipelineProcessingException(e);
}
}

static final Pipeline NO_OP_PIPELINE = new Pipeline(
SearchPipelineService.NOOP_PIPELINE_ID,
"Pipeline that does not transform anything",
0,
Collections.emptyList(),
Collections.emptyList(),
null
);
}
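
One design note on the new transformRequest above: before any request processors run, the pipeline round-trips the SearchRequest through serialization, apparently so that processors transform an independent copy rather than mutating the caller's original request, with every stream opened in try-with-resources per the review feedback. Pulled out as a standalone helper, the copy idiom looks roughly like this (copyForPipeline is an illustrative name, not a method in this change):

import java.io.IOException;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.StreamInput;

final class RequestCopyIdiom {
    // Serialize the request and read it back to obtain an independent copy for the
    // request processors to transform; every stream is closed via try-with-resources.
    static SearchRequest copyForPipeline(SearchRequest original, NamedWriteableRegistry registry) throws IOException {
        try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
            original.writeTo(bytesStreamOutput);
            try (
                StreamInput in = bytesStreamOutput.bytes().streamInput();
                StreamInput input = new NamedWriteableAwareStreamInput(in, registry)
            ) {
                return new SearchRequest(input);
            }
        }
    }
}
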
@@ -0,0 +1,40 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;

/**
* Groups a search pipeline based on a request and the request after being transformed by the pipeline.
*
* @opensearch.internal
*/
public final class PipelinedRequest {
private final Pipeline pipeline;
private final SearchRequest transformedRequest;

PipelinedRequest(Pipeline pipeline, SearchRequest transformedRequest) {
this.pipeline = pipeline;
this.transformedRequest = transformedRequest;
}

public SearchResponse transformResponse(SearchResponse response) {
return pipeline.transformResponse(transformedRequest, response);
}

public SearchRequest transformedRequest() {
return transformedRequest;
}

// Visible for testing
Pipeline getPipeline() {
return pipeline;
}
}