From d864f1476a8cba587da2e7baba8aab62cfbdf04f Mon Sep 17 00:00:00 2001 From: Gao Binlong Date: Thu, 21 Sep 2023 23:01:08 +0800 Subject: [PATCH 1/2] Set ingest processor supports copying from one field to another Signed-off-by: Gao Binlong --- CHANGELOG.md | 1 + .../ingest/common/SetProcessor.java | 50 ++++-- .../ingest/common/ForEachProcessorTests.java | 3 +- .../common/SetProcessorFactoryTests.java | 12 ++ .../ingest/common/SetProcessorTests.java | 78 +++++++-- .../test/ingest/270_set_processor.yml | 155 ++++++++++++++++++ .../org/opensearch/ingest/IngestDocument.java | 2 +- 7 files changed, 273 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4a039526d3664..bff4f7d16a830 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -93,6 +93,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add unreferenced file cleanup count to merge stats ([#10204](https://github.com/opensearch-project/OpenSearch/pull/10204)) - [Remote Store] Add support to restrict creation & deletion if system repository and mutation of immutable settings of system repository ([#9839](https://github.com/opensearch-project/OpenSearch/pull/9839)) - Improve compressed request handling ([#10261](https://github.com/opensearch-project/OpenSearch/pull/10261)) +- Set ingest processor supports copying from one field to another ### Dependencies - Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575)) diff --git a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/SetProcessor.java b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/SetProcessor.java index 949aef5c2d254..308914e0604ec 100644 --- a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/SetProcessor.java +++ b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/SetProcessor.java @@ -42,6 +42,8 @@ import java.util.Map; +import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException; + /** * Processor that adds new fields with their corresponding values. If the field is already present, its value * will be replaced with the provided one. @@ -54,9 +56,10 @@ public final class SetProcessor extends AbstractProcessor { private final TemplateScript.Factory field; private final ValueSource value; private final boolean ignoreEmptyValue; + private final String copyFrom; - SetProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value) { - this(tag, description, field, value, true, false); + SetProcessor(String tag, String description, TemplateScript.Factory field, ValueSource value, String copyFrom) { + this(tag, description, field, value, true, false, copyFrom); } SetProcessor( @@ -65,13 +68,15 @@ public final class SetProcessor extends AbstractProcessor { TemplateScript.Factory field, ValueSource value, boolean overrideEnabled, - boolean ignoreEmptyValue + boolean ignoreEmptyValue, + String copyFrom ) { super(tag, description); this.overrideEnabled = overrideEnabled; this.field = field; this.value = value; this.ignoreEmptyValue = ignoreEmptyValue; + this.copyFrom = copyFrom; } public boolean isOverrideEnabled() { @@ -90,10 +95,27 @@ public boolean isIgnoreEmptyValue() { return ignoreEmptyValue; } + public String getCopyFrom() { + return copyFrom; + } + @Override public IngestDocument execute(IngestDocument document) { if (overrideEnabled || document.hasField(field) == false || document.getFieldValue(field, Object.class) == null) { - document.setFieldValue(field, value, ignoreEmptyValue); + if (copyFrom != null) { + String path = document.renderTemplate(field); + if (copyFrom.isEmpty()) { + throw new IllegalArgumentException("copy_from cannot be empty"); + } + Object sourceFieldValue = document.getFieldValue(copyFrom, Object.class, ignoreEmptyValue); + if (ignoreEmptyValue + && (sourceFieldValue == null || sourceFieldValue instanceof String && ((String) sourceFieldValue).isEmpty())) { + return document; + } + document.setFieldValue(path, IngestDocument.deepCopy(sourceFieldValue)); + } else { + document.setFieldValue(field, value, ignoreEmptyValue); + } } return document; } @@ -119,18 +141,20 @@ public SetProcessor create( Map config ) throws Exception { String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); - Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value"); boolean overrideEnabled = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "override", true); TemplateScript.Factory compiledTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag, "field", field, scriptService); boolean ignoreEmptyValue = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_empty_value", false); - return new SetProcessor( - processorTag, - description, - compiledTemplate, - ValueSource.wrap(value, scriptService), - overrideEnabled, - ignoreEmptyValue - ); + String copyFrom = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "copy_from"); + + ValueSource valueSource = null; + if (copyFrom == null) { + Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value"); + valueSource = ValueSource.wrap(value, scriptService); + } else if (config.get("value") != null) { + throw newConfigurationException(TYPE, processorTag, "copy_from", "either copy_from or value can be set"); + } + + return new SetProcessor(processorTag, description, compiledTemplate, valueSource, overrideEnabled, ignoreEmptyValue, copyFrom); } } } diff --git a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/ForEachProcessorTests.java b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/ForEachProcessorTests.java index 241945e58fa06..6a1cba81d95b9 100644 --- a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/ForEachProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/ForEachProcessorTests.java @@ -160,7 +160,8 @@ public void testRestOfTheDocumentIsAvailable() throws Exception { "_tag", null, new TestTemplateService.MockTemplateScript.Factory("_ingest._value.new_field"), - (model) -> model.get("other") + (model) -> model.get("other"), + null ), false ); diff --git a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/SetProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/SetProcessorFactoryTests.java index bb0f319a61b00..85636507789db 100644 --- a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/SetProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/SetProcessorFactoryTests.java @@ -124,4 +124,16 @@ public void testInvalidMustacheTemplate() throws Exception { assertThat(exception.getMetadata("opensearch.processor_tag").get(0), equalTo(processorTag)); } + public void testCopyFrom() throws Exception { + Map config = new HashMap<>(); + config.put("field", "field1"); + config.put("value", "value1"); + config.put("copy_from", "field2"); + String processorTag = randomAlphaOfLength(10); + assertThrows( + "either copy_from or value can be set", + OpenSearchParseException.class, + () -> factory.create(null, processorTag, null, config) + ); + } } diff --git a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/SetProcessorTests.java b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/SetProcessorTests.java index 923757b605108..8823a8cebe85f 100644 --- a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/SetProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/SetProcessorTests.java @@ -42,6 +42,8 @@ import org.hamcrest.Matchers; import java.util.HashMap; +import java.util.List; +import java.util.Map; import static org.hamcrest.Matchers.equalTo; @@ -51,7 +53,7 @@ public void testSetExistingFields() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); String fieldName = RandomDocumentPicks.randomExistingFieldName(random(), ingestDocument); Object fieldValue = RandomDocumentPicks.randomFieldValue(random()); - Processor processor = createSetProcessor(fieldName, fieldValue, true, false); + Processor processor = createSetProcessor(fieldName, fieldValue, true, false, null); processor.execute(ingestDocument); assertThat(ingestDocument.hasField(fieldName), equalTo(true)); assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue)); @@ -63,7 +65,7 @@ public void testSetNewFields() throws Exception { IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); Object fieldValue = RandomDocumentPicks.randomFieldValue(random()); String fieldName = RandomDocumentPicks.addRandomField(random(), testIngestDocument, fieldValue); - Processor processor = createSetProcessor(fieldName, fieldValue, true, false); + Processor processor = createSetProcessor(fieldName, fieldValue, true, false, null); processor.execute(ingestDocument); assertThat(ingestDocument.hasField(fieldName), equalTo(true)); assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue)); @@ -72,7 +74,7 @@ public void testSetNewFields() throws Exception { public void testSetFieldsTypeMismatch() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); ingestDocument.setFieldValue("field", "value"); - Processor processor = createSetProcessor("field.inner", "value", true, false); + Processor processor = createSetProcessor("field.inner", "value", true, false, null); try { processor.execute(ingestDocument); fail("processor execute should have failed"); @@ -88,7 +90,7 @@ public void testSetNewFieldWithOverrideDisabled() throws Exception { IngestDocument ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>()); String fieldName = RandomDocumentPicks.randomFieldName(random()); Object fieldValue = RandomDocumentPicks.randomFieldValue(random()); - Processor processor = createSetProcessor(fieldName, fieldValue, false, false); + Processor processor = createSetProcessor(fieldName, fieldValue, false, false, null); processor.execute(ingestDocument); assertThat(ingestDocument.hasField(fieldName), equalTo(true)); assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue)); @@ -98,7 +100,7 @@ public void testSetExistingFieldWithOverrideDisabled() throws Exception { IngestDocument ingestDocument = new IngestDocument(new HashMap<>(), new HashMap<>()); Object fieldValue = "foo"; String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue); - Processor processor = createSetProcessor(fieldName, "bar", false, false); + Processor processor = createSetProcessor(fieldName, "bar", false, false, null); processor.execute(ingestDocument); assertThat(ingestDocument.hasField(fieldName), equalTo(true)); assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(fieldValue)); @@ -109,7 +111,7 @@ public void testSetExistingNullFieldWithOverrideDisabled() throws Exception { Object fieldValue = null; Object newValue = "bar"; String fieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, fieldValue); - Processor processor = createSetProcessor(fieldName, newValue, false, false); + Processor processor = createSetProcessor(fieldName, newValue, false, false, null); processor.execute(ingestDocument); assertThat(ingestDocument.hasField(fieldName), equalTo(true)); assertThat(ingestDocument.getFieldValue(fieldName, Object.class), equalTo(newValue)); @@ -117,7 +119,7 @@ public void testSetExistingNullFieldWithOverrideDisabled() throws Exception { public void testSetMetadataExceptVersion() throws Exception { Metadata randomMetadata = randomFrom(Metadata.INDEX, Metadata.ID, Metadata.ROUTING); - Processor processor = createSetProcessor(randomMetadata.getFieldName(), "_value", true, false); + Processor processor = createSetProcessor(randomMetadata.getFieldName(), "_value", true, false, null); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); processor.execute(ingestDocument); assertThat(ingestDocument.getFieldValue(randomMetadata.getFieldName(), String.class), Matchers.equalTo("_value")); @@ -125,7 +127,7 @@ public void testSetMetadataExceptVersion() throws Exception { public void testSetMetadataVersion() throws Exception { long version = randomNonNegativeLong(); - Processor processor = createSetProcessor(Metadata.VERSION.getFieldName(), version, true, false); + Processor processor = createSetProcessor(Metadata.VERSION.getFieldName(), version, true, false, null); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); processor.execute(ingestDocument); assertThat(ingestDocument.getFieldValue(Metadata.VERSION.getFieldName(), Long.class), Matchers.equalTo(version)); @@ -133,7 +135,7 @@ public void testSetMetadataVersion() throws Exception { public void testSetMetadataVersionType() throws Exception { String versionType = randomFrom("internal", "external", "external_gte"); - Processor processor = createSetProcessor(Metadata.VERSION_TYPE.getFieldName(), versionType, true, false); + Processor processor = createSetProcessor(Metadata.VERSION_TYPE.getFieldName(), versionType, true, false, null); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); processor.execute(ingestDocument); assertThat(ingestDocument.getFieldValue(Metadata.VERSION_TYPE.getFieldName(), String.class), Matchers.equalTo(versionType)); @@ -141,7 +143,7 @@ public void testSetMetadataVersionType() throws Exception { public void testSetMetadataIfSeqNo() throws Exception { long ifSeqNo = randomNonNegativeLong(); - Processor processor = createSetProcessor(Metadata.IF_SEQ_NO.getFieldName(), ifSeqNo, true, false); + Processor processor = createSetProcessor(Metadata.IF_SEQ_NO.getFieldName(), ifSeqNo, true, false, null); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); processor.execute(ingestDocument); assertThat(ingestDocument.getFieldValue(Metadata.IF_SEQ_NO.getFieldName(), Long.class), Matchers.equalTo(ifSeqNo)); @@ -149,20 +151,70 @@ public void testSetMetadataIfSeqNo() throws Exception { public void testSetMetadataIfPrimaryTerm() throws Exception { long ifPrimaryTerm = randomNonNegativeLong(); - Processor processor = createSetProcessor(Metadata.IF_PRIMARY_TERM.getFieldName(), ifPrimaryTerm, true, false); + Processor processor = createSetProcessor(Metadata.IF_PRIMARY_TERM.getFieldName(), ifPrimaryTerm, true, false, null); IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); processor.execute(ingestDocument); assertThat(ingestDocument.getFieldValue(Metadata.IF_PRIMARY_TERM.getFieldName(), Long.class), Matchers.equalTo(ifPrimaryTerm)); } - private static Processor createSetProcessor(String fieldName, Object fieldValue, boolean overrideEnabled, boolean ignoreEmptyValue) { + public void testCopyFromWithIgnoreEmptyValue() throws Exception { + // do nothing if copy_from field does not exist + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); + String newFieldName = RandomDocumentPicks.randomFieldName(random()); + Processor processor = createSetProcessor(newFieldName, null, true, true, RandomDocumentPicks.randomFieldName(random())); + processor.execute(ingestDocument); + assertFalse(ingestDocument.hasField(newFieldName)); + + // throw illegalArgumentException if copy_from is empty string + Processor processorWithEmptyCopyFrom = createSetProcessor(newFieldName, null, true, true, ""); + assertThrows("copy_from cannot be empty", IllegalArgumentException.class, () -> processorWithEmptyCopyFrom.execute(ingestDocument)); + } + + public void testCopyFromOtherField() throws Exception { + // can copy different types of data from one field to another + IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); + Object existingFieldValue = RandomDocumentPicks.randomFieldValue(random()); + String existingFieldName = RandomDocumentPicks.addRandomField(random(), ingestDocument, existingFieldValue); + String newFieldName = RandomDocumentPicks.randomFieldName(random()); + Processor processor = createSetProcessor(newFieldName, null, true, false, existingFieldName); + processor.execute(ingestDocument); + assertTrue(ingestDocument.hasField(newFieldName)); + assertDeepCopiedObjectEquals(ingestDocument.getFieldValue(newFieldName, Object.class), existingFieldValue); + } + + @SuppressWarnings("unchecked") + private static void assertDeepCopiedObjectEquals(Object expected, Object actual) { + if (expected instanceof Map) { + Map expectedMap = (Map) expected; + Map actualMap = (Map) actual; + assertEquals(expectedMap.size(), actualMap.size()); + for (Map.Entry expectedEntry : expectedMap.entrySet()) { + assertDeepCopiedObjectEquals(expectedEntry.getValue(), actualMap.get(expectedEntry.getKey())); + } + } else if (expected instanceof List) { + assertArrayEquals(((List) expected).toArray(), ((List) actual).toArray()); + } else if (expected instanceof byte[]) { + assertArrayEquals((byte[]) expected, (byte[]) actual); + } else { + assertEquals(expected, actual); + } + } + + private static Processor createSetProcessor( + String fieldName, + Object fieldValue, + boolean overrideEnabled, + boolean ignoreEmptyValue, + String copyFrom + ) { return new SetProcessor( randomAlphaOfLength(10), null, new TestTemplateService.MockTemplateScript.Factory(fieldName), ValueSource.wrap(fieldValue, TestTemplateService.instance()), overrideEnabled, - ignoreEmptyValue + ignoreEmptyValue, + copyFrom ); } } diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/270_set_processor.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/270_set_processor.yml index bcd9e23864609..ac2b5d4230f0e 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/270_set_processor.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/270_set_processor.yml @@ -101,3 +101,158 @@ teardown: pipeline: 1 require_alias: true body: { foo: bar } + +--- +"Test set processor with both value and copy_from": + - do: + catch: '/either copy_from or value can be set/' + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "set" : { + "field" : "foo", + "value" : "bar", + "copy_from" : "zoo" + } + } + ] + } + +--- +"Test set processor with copy_from and ignore_empty_value": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "set" : { + "field" : "foo", + "copy_from" : "zoo" + } + } + ] + } + - match: { acknowledged: true } + - do: + catch: '/field \[zoo\] not present as part of path \[zoo\]/' + index: + index: test + id: 1 + pipeline: "my_pipeline" + body: { + foo: "foo" + } + + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "set" : { + "field" : "foo", + "copy_from" : "zoo", + "ignore_empty_value" : true + } + } + ] + } + - match: { acknowledged: true } + # field zoo does not exist + - do: + index: + index: test + id: 1 + pipeline: "my_pipeline" + body: { + foo: "foo" + } + + - do: + get: + index: test + id: 1 + - match: { _source: {foo: "foo"} } + + # field zoo is empty string + - do: + index: + index: test + id: 1 + pipeline: "my_pipeline" + body: { + foo: "foo", + zoo: "" + } + + - do: + get: + index: test + id: 1 + - match: { _source: { foo: "foo", zoo: "" } } + +--- +"Test set processor with copy_from": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [ + { + "set" : { + "field" : "foo", + "copy_from" : "zoo" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "my_pipeline" + body: { + foo: "bar", + zoo: { + stringField: "string", + intField: 1, + floatField: 1.0, + booleanField: true, + stringArrayField: ["a", "b", "c"], + intArrayField: [1, 2, 3], + booleanArrayField: [true, false, true], + floatArrayField: [1.0, 2.0, 3.0], + mapField: { + field_a: "value_a", + field_b: { + field_c: "value_c" + } + } + } + } + - do: + get: + index: test + id: 1 + - match: { _source.foo.stringField: "string" } + - match: { _source.foo.intField: 1 } + - match: { _source.foo.floatField: 1.0 } + - match: { _source.foo.booleanField: true } + - match: { _source.foo.stringArrayField: ["a", "b", "c"] } + - match: { _source.foo.intArrayField: [1, 2, 3] } + - match: { _source.foo.booleanArrayField: [true, false, true] } + - match: { _source.foo.floatArrayField: [1.0, 2.0, 3.0] } + - match: { _source.foo.mapField: { field_a: "value_a", field_b: { field_c: "value_c"}}} diff --git a/server/src/main/java/org/opensearch/ingest/IngestDocument.java b/server/src/main/java/org/opensearch/ingest/IngestDocument.java index e0de0a9488ad9..3510bfa936b10 100644 --- a/server/src/main/java/org/opensearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/opensearch/ingest/IngestDocument.java @@ -755,7 +755,7 @@ public static Map deepCopyMap(Map source) { return (Map) deepCopy(source); } - private static Object deepCopy(Object value) { + public static Object deepCopy(Object value) { if (value instanceof Map) { Map mapValue = (Map) value; Map copy = new HashMap<>(mapValue.size()); From 15069d6091d1d6e41330933eb826cdc0e28587af Mon Sep 17 00:00:00 2001 From: Gao Binlong Date: Tue, 10 Oct 2023 17:49:35 +0800 Subject: [PATCH 2/2] Modify changelog Signed-off-by: Gao Binlong --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bff4f7d16a830..3cc3d2887f3d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -93,7 +93,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add unreferenced file cleanup count to merge stats ([#10204](https://github.com/opensearch-project/OpenSearch/pull/10204)) - [Remote Store] Add support to restrict creation & deletion if system repository and mutation of immutable settings of system repository ([#9839](https://github.com/opensearch-project/OpenSearch/pull/9839)) - Improve compressed request handling ([#10261](https://github.com/opensearch-project/OpenSearch/pull/10261)) -- Set ingest processor supports copying from one field to another +- Set ingest processor supports copying from one field to another ([#10529](https://github.com/opensearch-project/OpenSearch/pull/10529)) ### Dependencies - Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575))