Skip to content

Commit

Permalink
Add SplitResponseProcessor to Search Pipelines (#14800)
Browse files Browse the repository at this point in the history
* Add SplitResponseProcessor for search pipelines

Signed-off-by: Daniel Widdis <[email protected]>

* Register the split processor factory

Signed-off-by: Daniel Widdis <[email protected]>

* Address code review comments

Signed-off-by: Daniel Widdis <[email protected]>

* Avoid list copy by casting array

Signed-off-by: Daniel Widdis <[email protected]>

---------

Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis authored Jul 22, 2024
1 parent b35690c commit 45c5f8d
Show file tree
Hide file tree
Showing 5 changed files with 380 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add Plugin interface for loading application based configuration templates (([#14659](https://github.com/opensearch-project/OpenSearch/issues/14659)))
- Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668))
- Add prefix mode verification setting for repository verification (([#14790](https://github.com/opensearch-project/OpenSearch/pull/14790)))
- Add SplitResponseProcessor to Search Pipelines (([#14800](https://github.com/opensearch-project/OpenSearch/issues/14800)))
- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749))
- Reduce logging in DEBUG for MasterService:run ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))
- Enabling term version check on local state for all ClusterManager Read Transport Actions ([#14273](https://github.com/opensearch-project/OpenSearch/pull/14273))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProces
TruncateHitsResponseProcessor.TYPE,
new TruncateHitsResponseProcessor.Factory(),
CollapseResponseProcessor.TYPE,
new CollapseResponseProcessor.Factory()
new CollapseResponseProcessor.Factory(),
SplitResponseProcessor.TYPE,
new SplitResponseProcessor.Factory()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.document.DocumentField;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.MediaType;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.search.SearchHit;
import org.opensearch.search.pipeline.AbstractProcessor;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchResponseProcessor;

import java.util.Arrays;
import java.util.Map;
import java.util.Objects;

/**
* Processor that sorts an array of items.
* Throws exception is the specified field is not an array.
*/
public class SplitResponseProcessor extends AbstractProcessor implements SearchResponseProcessor {
/** Key to reference this processor type from a search pipeline. */
public static final String TYPE = "split";
/** Key defining the string field to be split. */
public static final String SPLIT_FIELD = "field";
/** Key defining the delimiter used to split the string. This can be a regular expression pattern. */
public static final String SEPARATOR = "separator";
/** Optional key for handling empty trailing fields. */
public static final String PRESERVE_TRAILING = "preserve_trailing";
/** Optional key to put the split values in a different field. */
public static final String TARGET_FIELD = "target_field";

private final String splitField;
private final String separator;
private final boolean preserveTrailing;
private final String targetField;

SplitResponseProcessor(
String tag,
String description,
boolean ignoreFailure,
String splitField,
String separator,
boolean preserveTrailing,
String targetField
) {
super(tag, description, ignoreFailure);
this.splitField = Objects.requireNonNull(splitField);
this.separator = Objects.requireNonNull(separator);
this.preserveTrailing = preserveTrailing;
this.targetField = targetField == null ? splitField : targetField;
}

/**
* Getter function for splitField
* @return sortField
*/
public String getSplitField() {
return splitField;
}

/**
* Getter function for separator
* @return separator
*/
public String getSeparator() {
return separator;
}

/**
* Getter function for preserveTrailing
* @return preserveTrailing;
*/
public boolean isPreserveTrailing() {
return preserveTrailing;
}

/**
* Getter function for targetField
* @return targetField
*/
public String getTargetField() {
return targetField;
}

@Override
public String getType() {
return TYPE;
}

@Override
public SearchResponse processResponse(SearchRequest request, SearchResponse response) throws Exception {
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
Map<String, DocumentField> fields = hit.getFields();
if (fields.containsKey(splitField)) {
DocumentField docField = hit.getFields().get(splitField);
if (docField == null) {
throw new IllegalArgumentException("field [" + splitField + "] is null, cannot split.");
}
Object val = docField.getValue();
if (val == null || !String.class.isAssignableFrom(val.getClass())) {
throw new IllegalArgumentException("field [" + splitField + "] is not a string, cannot split");
}
Object[] strings = ((String) val).split(separator, preserveTrailing ? -1 : 0);
hit.setDocumentField(targetField, new DocumentField(targetField, Arrays.asList(strings)));
}
if (hit.hasSource()) {
BytesReference sourceRef = hit.getSourceRef();
Tuple<? extends MediaType, Map<String, Object>> typeAndSourceMap = XContentHelper.convertToMap(
sourceRef,
false,
(MediaType) null
);

Map<String, Object> sourceAsMap = typeAndSourceMap.v2();
if (sourceAsMap.containsKey(splitField)) {
Object val = sourceAsMap.get(splitField);
if (val instanceof String) {
Object[] strings = ((String) val).split(separator, preserveTrailing ? -1 : 0);
sourceAsMap.put(targetField, Arrays.asList(strings));
}
XContentBuilder builder = XContentBuilder.builder(typeAndSourceMap.v1().xContent());
builder.map(sourceAsMap);
hit.sourceRef(BytesReference.bytes(builder));
}
}
}
return response;
}

static class Factory implements Processor.Factory<SearchResponseProcessor> {

@Override
public SplitResponseProcessor create(
Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories,
String tag,
String description,
boolean ignoreFailure,
Map<String, Object> config,
PipelineContext pipelineContext
) {
String splitField = ConfigurationUtils.readStringProperty(TYPE, tag, config, SPLIT_FIELD);
String separator = ConfigurationUtils.readStringProperty(TYPE, tag, config, SEPARATOR);
boolean preserveTrailing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, PRESERVE_TRAILING, false);
String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, TARGET_FIELD, splitField);
return new SplitResponseProcessor(tag, description, ignoreFailure, splitField, separator, preserveTrailing, targetField);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void testAllowlistNotSpecified() throws IOException {
try (SearchPipelineCommonModulePlugin plugin = new SearchPipelineCommonModulePlugin()) {
assertEquals(Set.of("oversample", "filter_query", "script"), plugin.getRequestProcessors(createParameters(settings)).keySet());
assertEquals(
Set.of("rename_field", "truncate_hits", "collapse"),
Set.of("rename_field", "truncate_hits", "collapse", "split"),
plugin.getResponseProcessors(createParameters(settings)).keySet()
);
assertEquals(Set.of(), plugin.getSearchPhaseResultsProcessors(createParameters(settings)).keySet());
Expand Down
Loading

0 comments on commit 45c5f8d

Please sign in to comment.