Skip to content

Commit

Permalink
[Search pipeline] Add script processor (opensearch-project#7607)
Browse files Browse the repository at this point in the history
Be able to insert a painless or mustache script to manipulate search requests.

Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger authored May 22, 2023
1 parent 47d4f38 commit 959910b
Show file tree
Hide file tree
Showing 16 changed files with 1,098 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Search Pipelines] Add `default_search_pipeline` index setting ([#7470](https://github.com/opensearch-project/OpenSearch/pull/7470))
- [Search Pipelines] Add RenameFieldResponseProcessor for Search Pipelines ([#7377](https://github.com/opensearch-project/OpenSearch/pull/7377))
- [Search Pipelines] Split search pipeline processor factories by type ([#7597](https://github.com/opensearch-project/OpenSearch/pull/7597))
- [Search Pipelines] Add script processor ([#7607](https://github.com/opensearch-project/OpenSearch/pull/7607))
- Add descending order search optimization through reverse segment read. ([#7244](https://github.com/opensearch-project/OpenSearch/pull/7244))
- Add 'unsigned_long' numeric field type ([#6237](https://github.com/opensearch-project/OpenSearch/pull/6237))
- Add back primary shard preference for queries ([#7375](https://github.com/opensearch-project/OpenSearch/pull/7375))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
- do:
scripts_painless_context: {}
- match: { contexts.0: aggregation_selector}
- match: { contexts.22: update}
- match: { contexts.23: update}
---

"Action to get all API values for score context":
Expand Down
2 changes: 2 additions & 0 deletions modules/search-pipeline-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ apply plugin: 'opensearch.internal-cluster-test'
opensearchplugin {
description 'Module for search pipeline processors that do not require additional security permissions or have large dependencies and resources'
classname 'org.opensearch.search.pipeline.common.SearchPipelineCommonModulePlugin'
extendedPlugins = ['lang-painless']
}

dependencies {
compileOnly project(':modules:lang-painless')
}

restResources {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public class FilterQueryRequestProcessor extends AbstractProcessor implements Se

final QueryBuilder filterQuery;

/**
* Returns the type of the processor.
*
* @return The processor type.
*/
@Override
public String getType() {
return TYPE;
Expand All @@ -57,6 +62,14 @@ public FilterQueryRequestProcessor(String tag, String description, QueryBuilder
this.filterQuery = filterQuery;
}

/**
* Modifies the search request by adding a filtered query to the existing query, if any, and sets it as the new query
* in the search request's SearchSourceBuilder.
*
* @param request The search request to be processed.
* @return The modified search request.
* @throws Exception if an error occurs while processing the request.
*/
@Override
public SearchRequest processRequest(SearchRequest request) throws Exception {
QueryBuilder originalQuery = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import org.opensearch.action.search.SearchRequest;

import org.opensearch.common.Nullable;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.common.xcontent.json.JsonXContent;

import org.opensearch.script.Script;
import org.opensearch.script.ScriptException;
import org.opensearch.script.ScriptService;
import org.opensearch.script.ScriptType;
import org.opensearch.script.SearchScript;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.common.helpers.SearchRequestMap;

import java.io.InputStream;
import java.util.Arrays;
import java.util.Map;

import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException;

/**
* Processor that evaluates a script with a search request in its context
* and then returns the modified search request.
*/
public final class ScriptRequestProcessor extends AbstractProcessor implements SearchRequestProcessor {
/**
* Key to reference this processor type from a search pipeline.
*/
public static final String TYPE = "script";

private final Script script;
private final ScriptService scriptService;
private final SearchScript precompiledSearchScript;

/**
* Processor that evaluates a script with a search request in its context
*
* @param tag The processor's tag.
* @param description The processor's description.
* @param script The {@link Script} to execute.
* @param precompiledSearchScript The {@link Script} precompiled
* @param scriptService The {@link ScriptService} used to execute the script.
*/
ScriptRequestProcessor(
String tag,
String description,
Script script,
@Nullable SearchScript precompiledSearchScript,
ScriptService scriptService
) {
super(tag, description);
this.script = script;
this.precompiledSearchScript = precompiledSearchScript;
this.scriptService = scriptService;
}

/**
* Executes the script with the search request in context.
*
* @param request The search request passed into the script context.
* @return The modified search request.
* @throws Exception if an error occurs while processing the request.
*/
@Override
public SearchRequest processRequest(SearchRequest request) throws Exception {
// assert request is not null and source is not null
if (request == null || request.source() == null) {
throw new IllegalArgumentException("search request must not be null");
}
final SearchScript searchScript;
if (precompiledSearchScript == null) {
SearchScript.Factory factory = scriptService.compile(script, SearchScript.CONTEXT);
searchScript = factory.newInstance(script.getParams());
} else {
searchScript = precompiledSearchScript;
}
// execute the script with the search request in context
searchScript.execute(Map.of("_source", new SearchRequestMap(request)));
return request;
}

/**
* Returns the type of the processor.
*
* @return The processor type.
*/
@Override
public String getType() {
return TYPE;
}

/**
* Returns the script used by the processor.
*
* @return The script.
*/
Script getScript() {
return script;
}

/**
* Returns the precompiled search script used by the processor.
*
* @return The precompiled search script.
*/
SearchScript getPrecompiledSearchScript() {
return precompiledSearchScript;
}

/**
* Factory class for creating {@link ScriptRequestProcessor}.
*/
public static final class Factory implements Processor.Factory<SearchRequestProcessor> {
private final ScriptService scriptService;

/**
* Constructs a new Factory instance with the specified {@link ScriptService}.
*
* @param scriptService The {@link ScriptService} used to execute scripts.
*/
public Factory(ScriptService scriptService) {
this.scriptService = scriptService;
}

/**
* Creates a new instance of {@link ScriptRequestProcessor}.
*
* @param registry The registry of processor factories.
* @param processorTag The processor's tag.
* @param description The processor's description.
* @param config The configuration options for the processor.
* @return The created {@link ScriptRequestProcessor} instance.
* @throws Exception if an error occurs during the creation process.
*/
@Override
public ScriptRequestProcessor create(
Map<String, Processor.Factory<SearchRequestProcessor>> registry,
String processorTag,
String description,
Map<String, Object> config
) throws Exception {
try (
XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(config);
InputStream stream = BytesReference.bytes(builder).streamInput();
XContentParser parser = XContentType.JSON.xContent()
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)
) {
Script script = Script.parse(parser);

Arrays.asList("id", "source", "inline", "lang", "params", "options").forEach(config::remove);

// verify script is able to be compiled before successfully creating processor.
SearchScript searchScript = null;
try {
final SearchScript.Factory factory = scriptService.compile(script, SearchScript.CONTEXT);
if (ScriptType.INLINE.equals(script.getType())) {
searchScript = factory.newInstance(script.getParams());
}
} catch (ScriptException e) {
throw newConfigurationException(TYPE, processorTag, null, e);
}
return new ScriptRequestProcessor(processorTag, description, script, searchScript, scriptService);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,20 @@ public class SearchPipelineCommonModulePlugin extends Plugin implements SearchPi
*/
public SearchPipelineCommonModulePlugin() {}

/**
* Returns a map of processor factories.
*
* @param parameters The parameters required for creating the processor factories.
* @return A map of processor factories, where the keys are the processor types and the values are the corresponding factory instances.
*/
@Override
public Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessors(Processor.Parameters parameters) {
return Map.of(FilterQueryRequestProcessor.TYPE, new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry));
return Map.of(
FilterQueryRequestProcessor.TYPE,
new FilterQueryRequestProcessor.Factory(parameters.namedXContentRegistry),
ScriptRequestProcessor.TYPE,
new ScriptRequestProcessor.Factory(parameters.scriptService)
);
}

@Override
Expand Down
Loading

0 comments on commit 959910b

Please sign in to comment.