Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set ingest processor supports copying from one field to another #10529

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add template snippets support for field and target_field in KV ingest processor ([#10040](https://github.com/opensearch-project/OpenSearch/pull/10040))
- Allowing pipeline processors to access index mapping info by passing ingest service ref as part of the processor factory parameters ([#10307](https://github.com/opensearch-project/OpenSearch/pull/10307))
- Make number of segment metadata files in remote segment store configurable ([#11329](https://github.com/opensearch-project/OpenSearch/pull/11329))
- Set ingest processor supports copying from one field to another ([#10529](https://github.com/opensearch-project/OpenSearch/pull/10529))
- Allow changing number of replicas of searchable snapshot index ([#11317](https://github.com/opensearch-project/OpenSearch/pull/11317))
- Adding slf4j license header to LoggerMessageFormat.java ([#11069](https://github.com/opensearch-project/OpenSearch/pull/11069))
- [BWC and API enforcement] Introduce checks for enforcing the API restrictions ([#11175](https://github.com/opensearch-project/OpenSearch/pull/11175))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@

import java.util.Map;

import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException;

/**
* Processor that adds new fields with their corresponding values. If the field is already present, its value
* will be replaced with the provided one.
Expand All @@ -54,9 +56,10 @@
private final TemplateScript.Factory field;
private final ValueSource value;
private final boolean ignoreEmptyValue;
private final String copyFrom;

SetProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value) {
this(tag, description, field, value, true, false);
SetProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value, String copyFrom) {
this(tag, description, field, value, true, false, copyFrom);
}

SetProcessor(
Expand All @@ -65,13 +68,15 @@
TemplateScript.Factory field,
ValueSource value,
boolean overrideEnabled,
boolean ignoreEmptyValue
boolean ignoreEmptyValue,
String copyFrom
) {
super(tag, description);
this.overrideEnabled = overrideEnabled;
this.field = field;
this.value = value;
this.ignoreEmptyValue = ignoreEmptyValue;
this.copyFrom = copyFrom;
}

public boolean isOverrideEnabled() {
Expand All @@ -90,10 +95,27 @@
return ignoreEmptyValue;
}

public String getCopyFrom() {
return copyFrom;

Check warning on line 99 in modules/ingest-common/src/main/java/org/opensearch/ingest/common/SetProcessor.java

View check run for this annotation

Codecov / codecov/patch

modules/ingest-common/src/main/java/org/opensearch/ingest/common/SetProcessor.java#L99

Added line #L99 was not covered by tests
}

@Override
public IngestDocument execute(IngestDocument document) {
if (overrideEnabled || document.hasField(field) == false || document.getFieldValue(field, Object.class) == null) {
document.setFieldValue(field, value, ignoreEmptyValue);
if (copyFrom != null) {
String path = document.renderTemplate(field);
if (copyFrom.isEmpty()) {
throw new IllegalArgumentException("copy_from cannot be empty");
}
Object sourceFieldValue = document.getFieldValue(copyFrom, Object.class, ignoreEmptyValue);
if (ignoreEmptyValue
&& (sourceFieldValue == null || sourceFieldValue instanceof String && ((String) sourceFieldValue).isEmpty())) {
return document;
}
document.setFieldValue(path, IngestDocument.deepCopy(sourceFieldValue));
} else {
document.setFieldValue(field, value, ignoreEmptyValue);
}
}
return document;
}
Expand All @@ -119,18 +141,20 @@
Map<String, Object> config
) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "override", true);
TemplateScript.Factory compiledTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag, "field", field, scriptService);
boolean ignoreEmptyValue = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_empty_value", false);
return new SetProcessor(
processorTag,
description,
compiledTemplate,
ValueSource.wrap(value, scriptService),
overrideEnabled,
ignoreEmptyValue
);
String copyFrom = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "copy_from");

ValueSource valueSource = null;
if (copyFrom == null) {
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
valueSource = ValueSource.wrap(value, scriptService);
} else if (config.get("value") != null) {
throw newConfigurationException(TYPE, processorTag, "copy_from", "either copy_from or value can be set");
}

return new SetProcessor(processorTag, description, compiledTemplate, valueSource, overrideEnabled, ignoreEmptyValue, copyFrom);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ public void testRestOfTheDocumentIsAvailable() throws Exception {
"_tag",
null,
new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"),
(model) -> model.get("other")
(model) -> model.get("other"),
null
),
false
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,16 @@ public void testInvalidMustacheTemplate() throws Exception {
assertThat(exception.getMetadata("opensearch.processor_tag").get(0), equalTo(processorTag));
}

public void testCopyFrom() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
config.put("value", "value1");
config.put("copy_from", "field2");
String processorTag = randomAlphaOfLength(10);
assertThrows(
"either copy_from or value can be set",
OpenSearchParseException.class,
() -> factory.create(null, processorTag, null, config)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.hamcrest.Matchers;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;

Expand All @@ -51,7 +53,7 @@ public void testSetExistingFields() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
String fieldName = RandomDocumentPicks.randomExistingFieldName(random(), ingestDocument);
Object fieldValue = RandomDocumentPicks.randomFieldValue(random());
Processor processor = createSetProcessor(fieldName, fieldValue, true, false);
Processor processor = createSetProcessor(fieldName, fieldValue, true, false, null);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue));
Expand All @@ -63,7 +65,7 @@ public void testSetNewFields() throws Exception {
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
Object fieldValue = RandomDocumentPicks.randomFieldValue(random());
String fieldName = RandomDocumentPicks.addRandomField(random(), testIngestDocument, fieldValue);
Processor processor = createSetProcessor(fieldName, fieldValue, true, false);
Processor processor = createSetProcessor(fieldName, fieldValue, true, false, null);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue));
Expand All @@ -72,7 +74,7 @@ public void testSetNewFields() throws Exception {
public void testSetFieldsTypeMismatch() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
ingestDocument.setFieldValue("field", "value");
Processor processor = createSetProcessor("field.inner", "value", true, false);
Processor processor = createSetProcessor("field.inner", "value", true, false, null);
try {
processor.execute(ingestDocument);
fail("processor execute should have failed");
Expand All @@ -88,7 +90,7 @@ public void testSetNewFieldWithOverrideDisabled() throws Exception {
IngestDocument ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>());
String fieldName = RandomDocumentPicks.randomFieldName(random());
Object fieldValue = RandomDocumentPicks.randomFieldValue(random());
Processor processor = createSetProcessor(fieldName, fieldValue, false, false);
Processor processor = createSetProcessor(fieldName, fieldValue, false, false, null);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue));
Expand All @@ -98,7 +100,7 @@ public void testSetExistingFieldWithOverrideDisabled() throws Exception {
IngestDocument ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>());
Object fieldValue = "foo";
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue);
Processor processor = createSetProcessor(fieldName, "bar", false, false);
Processor processor = createSetProcessor(fieldName, "bar", false, false, null);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue));
Expand All @@ -109,60 +111,110 @@ public void testSetExistingNullFieldWithOverrideDisabled() throws Exception {
Object fieldValue = null;
Object newValue = "bar";
String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue);
Processor processor = createSetProcessor(fieldName, newValue, false, false);
Processor processor = createSetProcessor(fieldName, newValue, false, false, null);
processor.execute(ingestDocument);
assertThat(ingestDocument.hasField(fieldName), equalTo(true));
assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(newValue));
}

public void testSetMetadataExceptVersion() throws Exception {
Metadata randomMetadata = randomFrom(Metadata.INDEX, Metadata.ID, Metadata.ROUTING);
Processor processor = createSetProcessor(randomMetadata.getFieldName(), "_value", true, false);
Processor processor = createSetProcessor(randomMetadata.getFieldName(), "_value", true, false, null);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(randomMetadata.getFieldName(), String.class), Matchers.equalTo("_value"));
}

public void testSetMetadataVersion() throws Exception {
long version = randomNonNegativeLong();
Processor processor = createSetProcessor(Metadata.VERSION.getFieldName(), version, true, false);
Processor processor = createSetProcessor(Metadata.VERSION.getFieldName(), version, true, false, null);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(Metadata.VERSION.getFieldName(), Long.class), Matchers.equalTo(version));
}

public void testSetMetadataVersionType() throws Exception {
String versionType = randomFrom("internal", "external", "external_gte");
Processor processor = createSetProcessor(Metadata.VERSION_TYPE.getFieldName(), versionType, true, false);
Processor processor = createSetProcessor(Metadata.VERSION_TYPE.getFieldName(), versionType, true, false, null);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(Metadata.VERSION_TYPE.getFieldName(), String.class), Matchers.equalTo(versionType));
}

public void testSetMetadataIfSeqNo() throws Exception {
long ifSeqNo = randomNonNegativeLong();
Processor processor = createSetProcessor(Metadata.IF_SEQ_NO.getFieldName(), ifSeqNo, true, false);
Processor processor = createSetProcessor(Metadata.IF_SEQ_NO.getFieldName(), ifSeqNo, true, false, null);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(Metadata.IF_SEQ_NO.getFieldName(), Long.class), Matchers.equalTo(ifSeqNo));
}

public void testSetMetadataIfPrimaryTerm() throws Exception {
long ifPrimaryTerm = randomNonNegativeLong();
Processor processor = createSetProcessor(Metadata.IF_PRIMARY_TERM.getFieldName(), ifPrimaryTerm, true, false);
Processor processor = createSetProcessor(Metadata.IF_PRIMARY_TERM.getFieldName(), ifPrimaryTerm, true, false, null);
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(Metadata.IF_PRIMARY_TERM.getFieldName(), Long.class), Matchers.equalTo(ifPrimaryTerm));
}

private static Processor createSetProcessor(String fieldName, Object fieldValue, boolean overrideEnabled, boolean ignoreEmptyValue) {
public void testCopyFromWithIgnoreEmptyValue() throws Exception {
// do nothing if copy_from field does not exist
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
String newFieldName = RandomDocumentPicks.randomFieldName(random());
Processor processor = createSetProcessor(newFieldName, null, true, true, RandomDocumentPicks.randomFieldName(random()));
processor.execute(ingestDocument);
assertFalse(ingestDocument.hasField(newFieldName));

// throw illegalArgumentException if copy_from is empty string
Processor processorWithEmptyCopyFrom = createSetProcessor(newFieldName, null, true, true, "");
assertThrows("copy_from cannot be empty", IllegalArgumentException.class, () -> processorWithEmptyCopyFrom.execute(ingestDocument));
}

public void testCopyFromOtherField() throws Exception {
// can copy different types of data from one field to another
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
Object existingFieldValue = RandomDocumentPicks.randomFieldValue(random());
String existingFieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, existingFieldValue);
String newFieldName = RandomDocumentPicks.randomFieldName(random());
Processor processor = createSetProcessor(newFieldName, null, true, false, existingFieldName);
processor.execute(ingestDocument);
assertTrue(ingestDocument.hasField(newFieldName));
assertDeepCopiedObjectEquals(ingestDocument.getFieldValue(newFieldName, Object.class), existingFieldValue);
}

@SuppressWarnings("unchecked")
private static void assertDeepCopiedObjectEquals(Object expected, Object actual) {
if (expected instanceof Map) {
Map<String, Object> expectedMap = (Map<String, Object>) expected;
Map<String, Object> actualMap = (Map<String, Object>) actual;
assertEquals(expectedMap.size(), actualMap.size());
for (Map.Entry<String, Object> expectedEntry : expectedMap.entrySet()) {
assertDeepCopiedObjectEquals(expectedEntry.getValue(), actualMap.get(expectedEntry.getKey()));
}
} else if (expected instanceof List) {
assertArrayEquals(((List<?>) expected).toArray(), ((List<?>) actual).toArray());
} else if (expected instanceof byte[]) {
assertArrayEquals((byte[]) expected, (byte[]) actual);
} else {
assertEquals(expected, actual);
}
}

private static Processor createSetProcessor(
String fieldName,
Object fieldValue,
boolean overrideEnabled,
boolean ignoreEmptyValue,
String copyFrom
) {
return new SetProcessor(
randomAlphaOfLength(10),
null,
new TestTemplateService.MockTemplateScript.Factory(fieldName),
ValueSource.wrap(fieldValue, TestTemplateService.instance()),
overrideEnabled,
ignoreEmptyValue
ignoreEmptyValue,
copyFrom
);
}
}
Loading
Loading