Skip to content

Commit

Permalink
Remove ingest processor supports excluding fields (#10967)
Browse files Browse the repository at this point in the history
* Remove ingest processor supports field patterns and excluding fields

Signed-off-by: Gao Binlong <[email protected]>

* Format some code

Signed-off-by: Gao Binlong <[email protected]>

* Fix test failure

Signed-off-by: Gao Binlong <[email protected]>

* Remove the code of field pattern

Signed-off-by: Gao Binlong <[email protected]>

* Add skip version in rest test yml

Signed-off-by: Gao Binlong <[email protected]>

* Make fields and exclude_fields mutually exclusive when constructing RemoveProcessor

Signed-off-by: Gao Binlong <[email protected]>

---------

Signed-off-by: Gao Binlong <[email protected]>
  • Loading branch information
gaobinlong authored Jan 17, 2024
1 parent 517f091 commit 5dd4b61
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 54 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255))
- Per request phase latency ([#10351](https://github.com/opensearch-project/OpenSearch/issues/10351))
- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670))
- Remove ingest processor supports excluding fields ([#10967](https://github.com/opensearch-project/OpenSearch/pull/10967))
- [Tiered caching] Enabling serialization for IndicesRequestCache key object ([#10275](https://github.com/opensearch-project/OpenSearch/pull/10275))
- [Tiered caching] Defining interfaces, listeners and extending IndicesRequestCache with Tiered cache support ([#10753](https://github.com/opensearch-project/OpenSearch/pull/10753))
- [Remote cluster state] Restore cluster state version during remote state auto restore ([#10853](https://github.com/opensearch-project/OpenSearch/pull/10853))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.ingest.common;

import org.opensearch.common.Nullable;
import org.opensearch.core.common.Strings;
import org.opensearch.index.VersionType;
import org.opensearch.ingest.AbstractProcessor;
Expand All @@ -42,11 +43,15 @@
import org.opensearch.script.TemplateScript;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException;

/**
* Processor that removes existing fields. Nothing happens if the field is not present.
*/
Expand All @@ -55,54 +60,105 @@ public final class RemoveProcessor extends AbstractProcessor {
public static final String TYPE = "remove";

private final List<TemplateScript.Factory> fields;
private final List<TemplateScript.Factory> excludeFields;
private final boolean ignoreMissing;

RemoveProcessor(String tag, String description, List<TemplateScript.Factory> fields, boolean ignoreMissing) {
RemoveProcessor(
String tag,
String description,
@Nullable List<TemplateScript.Factory> fields,
@Nullable List<TemplateScript.Factory> excludeFields,
boolean ignoreMissing
) {
super(tag, description);
this.fields = new ArrayList<>(fields);
if (fields == null && excludeFields == null || fields != null && excludeFields != null) {
throw new IllegalArgumentException("ether fields and excludeFields must be set");
}
if (fields != null) {
this.fields = new ArrayList<>(fields);
this.excludeFields = null;
} else {
this.fields = null;
this.excludeFields = new ArrayList<>(excludeFields);
}

this.ignoreMissing = ignoreMissing;
}

public List<TemplateScript.Factory> getFields() {
return fields;
}

public List<TemplateScript.Factory> getExcludeFields() {
return excludeFields;
}

@Override
public IngestDocument execute(IngestDocument document) {
fields.forEach(field -> {
String path = document.renderTemplate(field);
final boolean fieldPathIsNullOrEmpty = Strings.isNullOrEmpty(path);
if (fieldPathIsNullOrEmpty || document.hasField(path) == false) {
if (ignoreMissing) {
return;
} else if (fieldPathIsNullOrEmpty) {
throw new IllegalArgumentException("field path cannot be null nor empty");
} else {
throw new IllegalArgumentException("field [" + path + "] doesn't exist");
if (fields != null && !fields.isEmpty()) {
fields.forEach(field -> {
String path = document.renderTemplate(field);
final boolean fieldPathIsNullOrEmpty = Strings.isNullOrEmpty(path);
if (fieldPathIsNullOrEmpty || document.hasField(path) == false) {
if (ignoreMissing) {
return;
} else if (fieldPathIsNullOrEmpty) {
throw new IllegalArgumentException("field path cannot be null nor empty");
} else {
throw new IllegalArgumentException("field [" + path + "] doesn't exist");
}
}
}
// cannot remove _index, _version and _version_type.
if (path.equals(IngestDocument.Metadata.INDEX.getFieldName())
|| path.equals(IngestDocument.Metadata.VERSION.getFieldName())
|| path.equals(IngestDocument.Metadata.VERSION_TYPE.getFieldName())) {
throw new IllegalArgumentException("cannot remove metadata field [" + path + "]");
}
// removing _id is disallowed when there's an external version specified in the request
if (path.equals(IngestDocument.Metadata.ID.getFieldName())
&& document.hasField(IngestDocument.Metadata.VERSION_TYPE.getFieldName())) {
String versionType = document.getFieldValue(IngestDocument.Metadata.VERSION_TYPE.getFieldName(), String.class);
if (!Objects.equals(versionType, VersionType.toString(VersionType.INTERNAL))) {
Long version = document.getFieldValue(IngestDocument.Metadata.VERSION.getFieldName(), Long.class, true);
throw new IllegalArgumentException(
"cannot remove metadata field [_id] when specifying external version for the document, version: "
+ version
+ ", version_type: "
+ versionType
);

// cannot remove _index, _version and _version_type.
if (path.equals(IngestDocument.Metadata.INDEX.getFieldName())
|| path.equals(IngestDocument.Metadata.VERSION.getFieldName())
|| path.equals(IngestDocument.Metadata.VERSION_TYPE.getFieldName())) {
throw new IllegalArgumentException("cannot remove metadata field [" + path + "]");
}
// removing _id is disallowed when there's an external version specified in the request
if (path.equals(IngestDocument.Metadata.ID.getFieldName())
&& document.hasField(IngestDocument.Metadata.VERSION_TYPE.getFieldName())) {
String versionType = document.getFieldValue(IngestDocument.Metadata.VERSION_TYPE.getFieldName(), String.class);
if (!Objects.equals(versionType, VersionType.toString(VersionType.INTERNAL))) {
Long version = document.getFieldValue(IngestDocument.Metadata.VERSION.getFieldName(), Long.class, true);
throw new IllegalArgumentException(
"cannot remove metadata field [_id] when specifying external version for the document, version: "
+ version
+ ", version_type: "
+ versionType
);
}
}
document.removeField(path);
});
}

if (excludeFields != null && !excludeFields.isEmpty()) {
Set<String> excludeFieldSet = new HashSet<>();
excludeFields.forEach(field -> {
String path = document.renderTemplate(field);
// ignore the empty or null field path
if (!Strings.isNullOrEmpty(path)) {
excludeFieldSet.add(path);
}
});

if (!excludeFieldSet.isEmpty()) {
Set<String> existingFields = new HashSet<>(document.getSourceAndMetadata().keySet());
Set<String> metadataFields = document.getMetadata()
.keySet()
.stream()
.map(IngestDocument.Metadata::getFieldName)
.collect(Collectors.toSet());
existingFields.forEach(field -> {
// ignore metadata fields such as _index, _id, etc.
if (!metadataFields.contains(field) && !excludeFieldSet.contains(field)) {
document.removeField(field);
}
});
}
document.removeField(path);
});
}

return document;
}

Expand All @@ -127,20 +183,41 @@ public RemoveProcessor create(
Map<String, Object> config
) throws Exception {
final List<String> fields = new ArrayList<>();
final Object field = ConfigurationUtils.readObject(TYPE, processorTag, config, "field");
if (field instanceof List) {
@SuppressWarnings("unchecked")
List<String> stringList = (List<String>) field;
fields.addAll(stringList);
} else {
fields.add((String) field);
final List<String> excludeFields = new ArrayList<>();
final Object field = ConfigurationUtils.readOptionalObject(config, "field");
final Object excludeField = ConfigurationUtils.readOptionalObject(config, "exclude_field");

if (field == null && excludeField == null || field != null && excludeField != null) {
throw newConfigurationException(TYPE, processorTag, "field", "ether field or exclude_field must be set");
}

final List<TemplateScript.Factory> compiledTemplates = fields.stream()
.map(f -> ConfigurationUtils.compileTemplate(TYPE, processorTag, "field", f, scriptService))
.collect(Collectors.toList());
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
return new RemoveProcessor(processorTag, description, compiledTemplates, ignoreMissing);

if (field != null) {
if (field instanceof List) {
@SuppressWarnings("unchecked")
List<String> stringList = (List<String>) field;
fields.addAll(stringList);
} else {
fields.add((String) field);
}
List<TemplateScript.Factory> fieldCompiledTemplates = fields.stream()
.map(f -> ConfigurationUtils.compileTemplate(TYPE, processorTag, "field", f, scriptService))
.collect(Collectors.toList());
return new RemoveProcessor(processorTag, description, fieldCompiledTemplates, null, ignoreMissing);
} else {
if (excludeField instanceof List) {
@SuppressWarnings("unchecked")
List<String> stringList = (List<String>) excludeField;
excludeFields.addAll(stringList);
} else {
excludeFields.add((String) excludeField);
}
List<TemplateScript.Factory> excludeFieldCompiledTemplates = excludeFields.stream()
.map(f -> ConfigurationUtils.compileTemplate(TYPE, processorTag, "exclude_field", f, scriptService))
.collect(Collectors.toList());
return new RemoveProcessor(processorTag, description, null, excludeFieldCompiledTemplates, ignoreMissing);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -79,16 +80,6 @@ public void testCreateMultipleFields() throws Exception {
);
}

public void testCreateMissingField() throws Exception {
Map<String, Object> config = new HashMap<>();
try {
factory.create(null, null, null, config);
fail("factory create should have failed");
} catch (OpenSearchParseException e) {
assertThat(e.getMessage(), equalTo("[field] required property is missing"));
}
}

public void testInvalidMustacheTemplate() throws Exception {
RemoveProcessor.Factory factory = new RemoveProcessor.Factory(TestTemplateService.instance(true));
Map<String, Object> config = new HashMap<>();
Expand All @@ -98,4 +89,31 @@ public void testInvalidMustacheTemplate() throws Exception {
assertThat(exception.getMessage(), equalTo("java.lang.RuntimeException: could not compile script"));
assertThat(exception.getMetadata("opensearch.processor_tag").get(0), equalTo(processorTag));
}

public void testCreateWithExcludeField() throws Exception {
Map<String, Object> config = new HashMap<>();
String processorTag = randomAlphaOfLength(10);
OpenSearchException exception = expectThrows(
OpenSearchParseException.class,
() -> factory.create(null, processorTag, null, config)
);
assertThat(exception.getMessage(), equalTo("[field] ether field or exclude_field must be set"));

Map<String, Object> config2 = new HashMap<>();
config2.put("field", "field1");
config2.put("exclude_field", "field2");
exception = expectThrows(OpenSearchParseException.class, () -> factory.create(null, processorTag, null, config2));
assertThat(exception.getMessage(), equalTo("[field] ether field or exclude_field must be set"));

Map<String, Object> config6 = new HashMap<>();
config6.put("exclude_field", "exclude_field");
RemoveProcessor removeProcessor = factory.create(null, processorTag, null, config6);
assertThat(
removeProcessor.getExcludeFields()
.stream()
.map(template -> template.newInstance(Collections.emptyMap()).execute())
.collect(Collectors.toList()),
equalTo(List.of("exclude_field"))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@
import org.opensearch.ingest.Processor;
import org.opensearch.ingest.RandomDocumentPicks;
import org.opensearch.ingest.TestTemplateService;
import org.opensearch.script.TemplateScript;
import org.opensearch.test.OpenSearchTestCase;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -57,12 +59,28 @@ public void testRemoveFields() throws Exception {
randomAlphaOfLength(10),
null,
Collections.singletonList(new TestTemplateService.MockTemplateScript.Factory(field)),
null,
false
);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField(field), equalTo(false));
}

public void testRemoveByExcludeFields() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
ingestDocument.setFieldValue("foo_1", "value");
ingestDocument.setFieldValue("foo_2", "value");
ingestDocument.setFieldValue("foo_3", "value");
List<TemplateScript.Factory> excludeFields = new ArrayList<>();
excludeFields.add(new TestTemplateService.MockTemplateScript.Factory("foo_1"));
excludeFields.add(new TestTemplateService.MockTemplateScript.Factory("foo_2"));
Processor processor = new RemoveProcessor(randomAlphaOfLength(10), null, null, excludeFields, false);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField("foo_1"), equalTo(true));
assertThat(ingestDocument.hasField("foo_2"), equalTo(true));
assertThat(ingestDocument.hasField("foo_3"), equalTo(false));
}

public void testRemoveNonExistingField() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
Expand Down Expand Up @@ -183,6 +201,34 @@ public void testRemoveMetadataField() throws Exception {
}
}

public void testCreateRemoveProcessorWithBothFieldsAndExcludeFields() throws Exception {
assertThrows(
"ether fields and excludeFields must be set",
IllegalArgumentException.class,
() -> new RemoveProcessor(randomAlphaOfLength(10), null, null, null, false)
);

final List<TemplateScript.Factory> fields;
if (randomBoolean()) {
fields = new ArrayList<>();
} else {
fields = List.of(new TestTemplateService.MockTemplateScript.Factory("foo_1"));
}

final List<TemplateScript.Factory> excludeFields;
if (randomBoolean()) {
excludeFields = new ArrayList<>();
} else {
excludeFields = List.of(new TestTemplateService.MockTemplateScript.Factory("foo_2"));
}

assertThrows(
"ether fields and excludeFields must be set",
IllegalArgumentException.class,
() -> new RemoveProcessor(randomAlphaOfLength(10), null, fields, excludeFields, false)
);
}

public void testRemoveDocumentId() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("field", IngestDocument.Metadata.ID.getFieldName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,3 +319,43 @@ teardown:
}
- match: { docs.0.error.type: "illegal_argument_exception" }
- match: { docs.0.error.reason: "cannot remove metadata field [_id] when specifying external version for the document, version: 1, version_type: external_gte" }

# Related issue: https://github.com/opensearch-project/OpenSearch/issues/1578
---
"Test remove processor with exclude_field":
- skip:
version: " - 2.11.99"
reason: "exclude_field is introduced in 2.12"
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [
{
"remove" : {
"exclude_field": "bar"
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: 1
pipeline: "my_pipeline"
body: {
foo1: "bar",
foo2: "bar",
bar: "zoo",
zoo: "bar"
}

- do:
get:
index: test
id: 1
- match: { _source: { bar: "zoo"}}
Loading

0 comments on commit 5dd4b61

Please sign in to comment.