Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Search pipelines] Add Global Ignore_failure options for Processors #8373

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Adds mock implementation for TelemetryPlugin ([#7545](https://github.com/opensearch-project/OpenSearch/issues/7545))
- Support transport action names when registering NamedRoutes ([#7957](https://github.com/opensearch-project/OpenSearch/pull/7957))
- Create concept of persistent ThreadContext headers that are unstashable ([#8291]()https://github.com/opensearch-project/OpenSearch/pull/8291)
- [Search pipelines] Add Global Ignore_failure options for Processors ([#8373](https://github.com/opensearch-project/OpenSearch/pull/8373))
- Enable Partial Flat Object ([#7997](https://github.com/opensearch-project/OpenSearch/pull/7997))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.AbstractProcessor;
import org.opensearch.search.pipeline.SearchRequestProcessor;

import java.io.InputStream;
Expand Down Expand Up @@ -53,12 +54,13 @@ public String getType() {
/**
* Constructor that takes a filter query.
*
* @param tag processor tag
* @param description processor description
* @param tag processor tag
* @param description processor description
* @param ignoreFailure option to ignore failure
* @param filterQuery the query that will be added as a filter to incoming queries
*/
public FilterQueryRequestProcessor(String tag, String description, QueryBuilder filterQuery) {
super(tag, description);
FilterQueryRequestProcessor(String tag, String description, boolean ignoreFailure, QueryBuilder filterQuery) {
super(tag, description, ignoreFailure);
this.filterQuery = filterQuery;
}

Expand Down Expand Up @@ -101,6 +103,7 @@ public FilterQueryRequestProcessor create(
Map<String, Processor.Factory<SearchRequestProcessor>> processorFactories,
String tag,
String description,
boolean ignoreFailure,
Map<String, Object> config,
PipelineContext pipelineContext
) throws Exception {
Expand All @@ -114,7 +117,7 @@ public FilterQueryRequestProcessor create(
XContentParser parser = XContentType.JSON.xContent()
.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)
) {
return new FilterQueryRequestProcessor(tag, description, parseInnerQueryBuilder(parser));
return new FilterQueryRequestProcessor(tag, description, ignoreFailure, parseInnerQueryBuilder(parser));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.search.SearchHit;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.AbstractProcessor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.SearchResponseProcessor;

Expand All @@ -41,14 +42,22 @@ public class RenameFieldResponseProcessor extends AbstractProcessor implements S
/**
* Constructor that takes a target field to rename and the new name
*
* @param tag processor tag
* @param description processor description
* @param oldField name of field to be renamed
* @param newField name of field that will replace the old field
* @param tag processor tag
* @param description processor description
* @param ignoreFailure option to ignore failure
* @param oldField name of field to be renamed
* @param newField name of field that will replace the old field
* @param ignoreMissing if true, do not throw error if oldField does not exist within search response
*/
public RenameFieldResponseProcessor(String tag, String description, String oldField, String newField, boolean ignoreMissing) {
super(tag, description);
public RenameFieldResponseProcessor(
String tag,
String description,
boolean ignoreFailure,
String oldField,
String newField,
boolean ignoreMissing
) {
super(tag, description, ignoreFailure);
this.oldField = oldField;
this.newField = newField;
this.ignoreMissing = ignoreMissing;
Expand Down Expand Up @@ -140,13 +149,14 @@ public RenameFieldResponseProcessor create(
Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories,
String tag,
String description,
boolean ignoreFailure,
Map<String, Object> config,
PipelineContext pipelineContext
) throws Exception {
String oldField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
String newField = ConfigurationUtils.readStringProperty(TYPE, tag, config, "target_field");
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, "ignore_missing", false);
return new RenameFieldResponseProcessor(tag, description, oldField, newField, ignoreMissing);
return new RenameFieldResponseProcessor(tag, description, ignoreFailure, oldField, newField, ignoreMissing);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.script.ScriptType;
import org.opensearch.script.SearchScript;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.AbstractProcessor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.common.helpers.SearchRequestMap;

Expand Down Expand Up @@ -54,18 +55,20 @@ public final class ScriptRequestProcessor extends AbstractProcessor implements S
*
* @param tag The processor's tag.
* @param description The processor's description.
* @param ignoreFailure The option to ignore failure
* @param script The {@link Script} to execute.
* @param precompiledSearchScript The {@link Script} precompiled
* @param scriptService The {@link ScriptService} used to execute the script.
*/
ScriptRequestProcessor(
String tag,
String description,
boolean ignoreFailure,
Script script,
@Nullable SearchScript precompiledSearchScript,
ScriptService scriptService
) {
super(tag, description);
super(tag, description, ignoreFailure);
this.script = script;
this.precompiledSearchScript = precompiledSearchScript;
this.scriptService = scriptService;
Expand Down Expand Up @@ -146,6 +149,7 @@ public ScriptRequestProcessor create(
Map<String, Processor.Factory<SearchRequestProcessor>> registry,
String processorTag,
String description,
boolean ignoreFailure,
Map<String, Object> config,
PipelineContext pipelineContext
) throws Exception {
Expand Down Expand Up @@ -174,7 +178,7 @@ public ScriptRequestProcessor create(
} catch (ScriptException e) {
throw newConfigurationException(TYPE, processorTag, null, e);
}
return new ScriptRequestProcessor(processorTag, description, script, searchScript, scriptService);
return new ScriptRequestProcessor(processorTag, description, ignoreFailure, script, searchScript, scriptService);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class FilterQueryRequestProcessorTests extends AbstractBuilderTestCase {

public void testFilterQuery() throws Exception {
QueryBuilder filterQuery = new TermQueryBuilder("field", "value");
FilterQueryRequestProcessor filterQueryRequestProcessor = new FilterQueryRequestProcessor(null, null, filterQuery);
FilterQueryRequestProcessor filterQueryRequestProcessor = new FilterQueryRequestProcessor(null, null, false, filterQuery);
QueryBuilder incomingQuery = new TermQueryBuilder("text", "foo");
SearchSourceBuilder source = new SearchSourceBuilder().query(incomingQuery);
SearchRequest request = new SearchRequest().source(source);
Expand All @@ -39,13 +39,13 @@ public void testFilterQuery() throws Exception {
public void testFactory() throws Exception {
FilterQueryRequestProcessor.Factory factory = new FilterQueryRequestProcessor.Factory(this.xContentRegistry());
Map<String, Object> configMap = new HashMap<>(Map.of("query", Map.of("term", Map.of("field", "value"))));
FilterQueryRequestProcessor processor = factory.create(Collections.emptyMap(), null, null, configMap, null);
FilterQueryRequestProcessor processor = factory.create(Collections.emptyMap(), null, null, false, configMap, null);
assertEquals(new TermQueryBuilder("field", "value"), processor.filterQuery);

// Missing "query" parameter:
expectThrows(
IllegalArgumentException.class,
() -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap(), null)
() -> factory.create(Collections.emptyMap(), null, null, false, Collections.emptyMap(), null)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public void testRenameResponse() throws Exception {
RenameFieldResponseProcessor renameFieldResponseProcessor = new RenameFieldResponseProcessor(
null,
null,
false,
"field 0",
"new field",
false
Expand All @@ -74,6 +75,7 @@ public void testRenameResponseWithMapping() throws Exception {
RenameFieldResponseProcessor renameFieldResponseProcessor = new RenameFieldResponseProcessor(
null,
null,
false,
"field 0",
"new field",
true
Expand All @@ -97,6 +99,7 @@ public void testMissingField() throws Exception {
RenameFieldResponseProcessor renameFieldResponseProcessor = new RenameFieldResponseProcessor(
null,
null,
false,
"field",
"new field",
false
Expand All @@ -115,15 +118,15 @@ public void testFactory() throws Exception {
config.put("target_field", newField);

RenameFieldResponseProcessor.Factory factory = new RenameFieldResponseProcessor.Factory();
RenameFieldResponseProcessor processor = factory.create(Collections.emptyMap(), null, null, config, null);
RenameFieldResponseProcessor processor = factory.create(Collections.emptyMap(), null, null, false, config, null);
assertEquals(processor.getType(), "rename_field");
assertEquals(processor.getOldField(), oldField);
assertEquals(processor.getNewField(), newField);
assertFalse(processor.isIgnoreMissing());

expectThrows(
OpenSearchParseException.class,
() -> factory.create(Collections.emptyMap(), null, null, Collections.emptyMap(), null)
() -> factory.create(Collections.emptyMap(), null, null, false, Collections.emptyMap(), null)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void setupScripting() {
}

public void testScriptingWithoutPrecompiledScriptFactory() throws Exception {
ScriptRequestProcessor processor = new ScriptRequestProcessor(randomAlphaOfLength(10), null, script, null, scriptService);
ScriptRequestProcessor processor = new ScriptRequestProcessor(randomAlphaOfLength(10), null, false, script, null, scriptService);
SearchRequest searchRequest = new SearchRequest();
searchRequest.source(createSearchSourceBuilder());

Expand All @@ -92,7 +92,14 @@ public void testScriptingWithoutPrecompiledScriptFactory() throws Exception {
}

public void testScriptingWithPrecompiledIngestScript() throws Exception {
ScriptRequestProcessor processor = new ScriptRequestProcessor(randomAlphaOfLength(10), null, script, searchScript, scriptService);
ScriptRequestProcessor processor = new ScriptRequestProcessor(
randomAlphaOfLength(10),
null,
false,
script,
searchScript,
scriptService
);
SearchRequest searchRequest = new SearchRequest();
searchRequest.source(createSearchSourceBuilder());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,26 @@ teardown:
}
- match: { acknowledged: true }

- do:
search_pipeline.put:
id: "my_pipeline_4"
body: >
{
"description": "test pipeline with ignore missing false and ignore failure true",
"response_processors": [
{
"rename_field":
{
"field": "aa",
"target_field": "b",
"ignore_missing": false,
"ignore_failure": true
}
}
]
}
- match: { acknowledged: true }

- do:
indices.create:
index: test
Expand Down Expand Up @@ -119,15 +139,24 @@ teardown:
- match: { hits.total.value: 1 }
- match: {hits.hits.0._source: { "a": "foo" } }

# Pipeline with ignore_missing set to true
# Should still pass even though index does not contain field
# Pipeline with ignore_missing set to false
# Should throw illegal_argument_exception
- do:
catch: bad_request
search:
index: test
search_pipeline: "my_pipeline_3"
- match: { error.type: "illegal_argument_exception" }

# Pipeline with ignore_missing set to false and ignore_failure set to true
# Should return while catching error
- do:
search:
index: test
search_pipeline: "my_pipeline_4"
- match: { hits.total.value: 1 }
- match: {hits.hits.0._source: { "a": "foo" } }

# No source, using stored_fields
- do:
search:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public final class ConfigurationUtils {

public static final String TAG_KEY = "tag";
public static final String DESCRIPTION_KEY = "description";
public static final String IGNORE_FAILURE_KEY = "ignore_failure";
mingshl marked this conversation as resolved.
Show resolved Hide resolved

private ConfigurationUtils() {}

Expand Down Expand Up @@ -194,7 +195,7 @@ public static String readOptionalStringOrIntProperty(
return readStringOrInt(processorType, processorTag, propertyName, value);
}

public static Boolean readBooleanProperty(
public static boolean readBooleanProperty(
String processorType,
String processorTag,
Map<String, Object> configuration,
Expand All @@ -214,7 +215,7 @@ private static Boolean readBoolean(String processorType, String processorTag, St
return null;
}
if (value instanceof Boolean) {
return (Boolean) value;
return (boolean) value;
}
throw newConfigurationException(
processorType,
Expand Down Expand Up @@ -530,10 +531,11 @@ public static Processor readProcessor(
) throws Exception {
String tag = ConfigurationUtils.readOptionalStringProperty(null, null, config, TAG_KEY);
String description = ConfigurationUtils.readOptionalStringProperty(null, tag, config, DESCRIPTION_KEY);
boolean ignoreFailure = ConfigurationUtils.readBooleanProperty(null, null, config, IGNORE_FAILURE_KEY, false);
Script conditionalScript = extractConditional(config);
Processor.Factory factory = processorFactories.get(type);

if (factory != null) {
boolean ignoreFailure = ConfigurationUtils.readBooleanProperty(null, null, config, "ignore_failure", false);
List<Map<String, Object>> onFailureProcessorConfigs = ConfigurationUtils.readOptionalList(
null,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import org.opensearch.search.pipeline.Processor;
package org.opensearch.search.pipeline;

/**
* Base class for common processor behavior.
*/
abstract class AbstractProcessor implements Processor {
public abstract class AbstractProcessor implements Processor {
mingshl marked this conversation as resolved.
Show resolved Hide resolved
private final String tag;
private final String description;
private final boolean ignoreFailure;

protected AbstractProcessor(String tag, String description) {
protected AbstractProcessor(String tag, String description, boolean ignoreFailure) {
this.tag = tag;
this.description = description;
this.ignoreFailure = ignoreFailure;
}

@Override
Expand All @@ -31,4 +31,9 @@ public String getTag() {
public String getDescription() {
return description;
}

@Override
public boolean isIgnoreFailure() {
return ignoreFailure;
}
}
Loading