diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/OversampleRequestProcessor.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/OversampleRequestProcessor.java index b28358569e5fb..ecd5902128f6c 100644 --- a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/OversampleRequestProcessor.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/OversampleRequestProcessor.java @@ -50,7 +50,7 @@ public SearchRequest processRequest(SearchRequest request, PipelinedRequestConte if (originalSize == -1) { originalSize = SearchService.DEFAULT_SIZE; } - requestContext.getGenericRequestContext().put(applyContextPrefix(contextPrefix, ORIGINAL_SIZE), originalSize); + requestContext.setAttribute(applyContextPrefix(contextPrefix, ORIGINAL_SIZE), originalSize); int newSize = (int) Math.ceil(originalSize * sampleFactor); request.source().size(newSize); } diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/ScriptRequestProcessor.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/ScriptRequestProcessor.java index 982458fabc7f2..a2f55d677f004 100644 --- a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/ScriptRequestProcessor.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/ScriptRequestProcessor.java @@ -27,6 +27,7 @@ import org.opensearch.search.pipeline.Processor; import org.opensearch.search.pipeline.SearchRequestProcessor; import org.opensearch.search.pipeline.StatefulSearchRequestProcessor; +import org.opensearch.search.pipeline.common.helpers.BasicMap; import org.opensearch.search.pipeline.common.helpers.SearchRequestMap; import java.io.InputStream; @@ -88,12 +89,33 @@ public SearchRequest processRequest(SearchRequest request, PipelinedRequestConte searchScript = precompiledSearchScript; } // execute the script with the search request in context - searchScript.execute( - Map.of("_source", new SearchRequestMap(request), "request_context", requestContext.getGenericRequestContext()) - ); + searchScript.execute(Map.of("_source", new SearchRequestMap(request), "request_context", new RequestContextMap(requestContext))); return request; } + private static class RequestContextMap extends BasicMap { + private final PipelinedRequestContext pipelinedRequestContext; + + private RequestContextMap(PipelinedRequestContext pipelinedRequestContext) { + this.pipelinedRequestContext = pipelinedRequestContext; + } + + @Override + public Object get(Object key) { + if (key instanceof String) { + return pipelinedRequestContext.getAttribute(key.toString()); + } + return null; + } + + @Override + public Object put(String key, Object value) { + Object originalValue = get(key); + pipelinedRequestContext.setAttribute(key, value); + return originalValue; + } + } + /** * Returns the type of the processor. * diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/TruncateHitsResponseProcessor.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/TruncateHitsResponseProcessor.java index 633fc55323a62..358112880548d 100644 --- a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/TruncateHitsResponseProcessor.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/TruncateHitsResponseProcessor.java @@ -53,7 +53,7 @@ public SearchResponse processResponse(SearchRequest request, SearchResponse resp int size; if (targetSize < 0) { // No value specified in processor config. Use context value instead. String key = applyContextPrefix(contextPrefix, OversampleRequestProcessor.ORIGINAL_SIZE); - Object o = requestContext.getGenericRequestContext().get(key); + Object o = requestContext.getAttribute(key); if (o == null) { throw new IllegalStateException("Must specify " + TARGET_SIZE + " unless an earlier processor set " + key); } diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/helpers/BasicMap.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/helpers/BasicMap.java new file mode 100644 index 0000000000000..7cbc0fcb132ef --- /dev/null +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/helpers/BasicMap.java @@ -0,0 +1,126 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pipeline.common.helpers; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Function; + +/** + * Helper for map abstractions passed to scripting processors. Throws {@link UnsupportedOperationException} for almost + * all methods. Subclasses just need to implement get and put. + */ +public abstract class BasicMap implements Map { + + /** + * No-args constructor. + */ + protected BasicMap() {} + + private static final String UNSUPPORTED_OP_ERR = " Method not supported in Search pipeline script"; + + @Override + public boolean isEmpty() { + throw new UnsupportedOperationException("isEmpty" + UNSUPPORTED_OP_ERR); + } + + public int size() { + throw new UnsupportedOperationException("size" + UNSUPPORTED_OP_ERR); + } + + public boolean containsKey(Object key) { + return get(key) != null; + } + + public boolean containsValue(Object value) { + throw new UnsupportedOperationException("containsValue" + UNSUPPORTED_OP_ERR); + } + + public Object remove(Object key) { + throw new UnsupportedOperationException("remove" + UNSUPPORTED_OP_ERR); + } + + public void putAll(Map m) { + throw new UnsupportedOperationException("putAll" + UNSUPPORTED_OP_ERR); + } + + public void clear() { + throw new UnsupportedOperationException("clear" + UNSUPPORTED_OP_ERR); + } + + public Set keySet() { + throw new UnsupportedOperationException("keySet" + UNSUPPORTED_OP_ERR); + } + + public Collection values() { + throw new UnsupportedOperationException("values" + UNSUPPORTED_OP_ERR); + } + + public Set> entrySet() { + throw new UnsupportedOperationException("entrySet" + UNSUPPORTED_OP_ERR); + } + + @Override + public Object getOrDefault(Object key, Object defaultValue) { + throw new UnsupportedOperationException("getOrDefault" + UNSUPPORTED_OP_ERR); + } + + @Override + public void forEach(BiConsumer action) { + throw new UnsupportedOperationException("forEach" + UNSUPPORTED_OP_ERR); + } + + @Override + public void replaceAll(BiFunction function) { + throw new UnsupportedOperationException("replaceAll" + UNSUPPORTED_OP_ERR); + } + + @Override + public Object putIfAbsent(String key, Object value) { + throw new UnsupportedOperationException("putIfAbsent" + UNSUPPORTED_OP_ERR); + } + + @Override + public boolean remove(Object key, Object value) { + throw new UnsupportedOperationException("remove" + UNSUPPORTED_OP_ERR); + } + + @Override + public boolean replace(String key, Object oldValue, Object newValue) { + throw new UnsupportedOperationException("replace" + UNSUPPORTED_OP_ERR); + } + + @Override + public Object replace(String key, Object value) { + throw new UnsupportedOperationException("replace" + UNSUPPORTED_OP_ERR); + } + + @Override + public Object computeIfAbsent(String key, Function mappingFunction) { + throw new UnsupportedOperationException("computeIfAbsent" + UNSUPPORTED_OP_ERR); + } + + @Override + public Object computeIfPresent(String key, BiFunction remappingFunction) { + throw new UnsupportedOperationException("computeIfPresent" + UNSUPPORTED_OP_ERR); + } + + @Override + public Object compute(String key, BiFunction remappingFunction) { + throw new UnsupportedOperationException("compute" + UNSUPPORTED_OP_ERR); + } + + @Override + public Object merge(String key, Object value, BiFunction remappingFunction) { + throw new UnsupportedOperationException("merge" + UNSUPPORTED_OP_ERR); + } +} diff --git a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/helpers/SearchRequestMap.java b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/helpers/SearchRequestMap.java index 7af3ac66be146..0e9185df95de6 100644 --- a/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/helpers/SearchRequestMap.java +++ b/modules/search-pipeline-common/src/main/java/org/opensearch/search/pipeline/common/helpers/SearchRequestMap.java @@ -11,19 +11,13 @@ import org.opensearch.action.search.SearchRequest; import org.opensearch.search.builder.SearchSourceBuilder; -import java.util.Collection; import java.util.Map; -import java.util.Set; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import java.util.function.Function; /** * A custom implementation of {@link Map} that provides access to the properties of a {@link SearchRequest}'s * {@link SearchSourceBuilder}. The class allows retrieving and modifying specific properties of the search request. */ -public class SearchRequestMap implements Map { - private static final String UNSUPPORTED_OP_ERR = " Method not supported in Search pipeline script"; +public class SearchRequestMap extends BasicMap implements Map { private final SearchSourceBuilder source; @@ -36,17 +30,6 @@ public SearchRequestMap(SearchRequest searchRequest) { source = searchRequest.source(); } - /** - * Retrieves the number of properties in the SearchSourceBuilder. - * - * @return The number of properties in the SearchSourceBuilder. - * @throws UnsupportedOperationException always, as the method is not supported. - */ - @Override - public int size() { - throw new UnsupportedOperationException("size" + UNSUPPORTED_OP_ERR); - } - /** * Checks if the SearchSourceBuilder is empty. * @@ -57,29 +40,6 @@ public boolean isEmpty() { return source == null; } - /** - * Checks if the SearchSourceBuilder contains the specified property. - * - * @param key The property to check for. - * @return {@code true} if the SearchSourceBuilder contains the specified property, {@code false} otherwise. - */ - @Override - public boolean containsKey(Object key) { - return get(key) != null; - } - - /** - * Checks if the SearchSourceBuilder contains the specified value. - * - * @param value The value to check for. - * @return {@code true} if the SearchSourceBuilder contains the specified value, {@code false} otherwise. - * @throws UnsupportedOperationException always, as the method is not supported. - */ - @Override - public boolean containsValue(Object value) { - throw new UnsupportedOperationException("containsValue" + UNSUPPORTED_OP_ERR); - } - /** * Retrieves the value associated with the specified property from the SearchSourceBuilder. * @@ -177,219 +137,4 @@ public Object put(String key, Object value) { } return originalValue; } - - /** - * Removes the specified property from the SearchSourceBuilder. - * - * @param key The name of the property that will be removed. - * @return The value associated with the property before it was removed, or null if the property was not found. - * @throws UnsupportedOperationException always, as the method is not supported. - */ - @Override - public Object remove(Object key) { - throw new UnsupportedOperationException("remove" + UNSUPPORTED_OP_ERR); - } - - /** - * Sets all the properties from the specified map to the SearchSourceBuilder. - * - * @param m The map containing the properties to be set. - * @throws UnsupportedOperationException always, as the method is not supported. - */ - @Override - public void putAll(Map m) { - throw new UnsupportedOperationException("putAll" + UNSUPPORTED_OP_ERR); - } - - /** - * Removes all properties from the SearchSourceBuilder. - * - * @throws UnsupportedOperationException always, as the method is not supported. - */ - @Override - public void clear() { - throw new UnsupportedOperationException("clear" + UNSUPPORTED_OP_ERR); - } - - /** - * Returns a set view of the property names in the SearchSourceBuilder. - * - * @return A set view of the property names in the SearchSourceBuilder. - * @throws UnsupportedOperationException always, as the method is not supported. - */ - @Override - public Set keySet() { - throw new UnsupportedOperationException("keySet" + UNSUPPORTED_OP_ERR); - } - - /** - * Returns a collection view of the property values in the SearchSourceBuilder. - * - * @return A collection view of the property values in the SearchSourceBuilder. - * @throws UnsupportedOperationException always, as the method is not supported. - */ - @Override - public Collection values() { - throw new UnsupportedOperationException("values" + UNSUPPORTED_OP_ERR); - } - - /** - * Returns a set view of the properties in the SearchSourceBuilder. - * - * @return A set view of the properties in the SearchSourceBuilder. - * @throws UnsupportedOperationException always, as the method is not supported. - */ - @Override - public Set> entrySet() { - throw new UnsupportedOperationException("entrySet" + UNSUPPORTED_OP_ERR); - } - - /** - * Returns the value to which the specified property has, or the defaultValue if the property is not present in the - * SearchSourceBuilder. - * - * @param key The property whose associated value is to be returned. - * @param defaultValue The default value to be returned if the property is not present. - * @return The value to which the specified property has, or the defaultValue if the property is not present. - * @throws UnsupportedOperationException always, as the method is not supported. - */ - @Override - public Object getOrDefault(Object key, Object defaultValue) { - throw new UnsupportedOperationException("getOrDefault" + UNSUPPORTED_OP_ERR); - } - - /** - * Performs the given action for each property in the SearchSourceBuilder until all properties have been processed or the - * action throws an exception - * - * @param action The action to be performed for each property. - * @throws UnsupportedOperationException always, as the method is not supported. - */ - @Override - public void forEach(BiConsumer action) { - throw new UnsupportedOperationException("forEach" + UNSUPPORTED_OP_ERR); - } - - /** - * Replaces each property's value with the result of invoking the given function on that property until all properties have - * been processed or the function throws an exception. - * - * @param function The function to apply to each property. - * @throws UnsupportedOperationException always, as the method is not supported. - */ - @Override - public void replaceAll(BiFunction function) { - throw new UnsupportedOperationException("replaceAll" + UNSUPPORTED_OP_ERR); - } - - /** - * If the specified property is not already associated with a value, associates it with the given value and returns null, - * else returns the current value. - * - * @param key The property whose value is to be set if absent. - * @param value The value to be associated with the specified property. - * @return The current value associated with the property, or null if the property is not present. - * @throws UnsupportedOperationException always, as the method is not supported. - */ - @Override - public Object putIfAbsent(String key, Object value) { - throw new UnsupportedOperationException("putIfAbsent" + UNSUPPORTED_OP_ERR); - } - - /** - * Removes the property only if it has the given value. - * - * @param key The property to be removed. - * @param value The value expected to be associated with the property. - * @return {@code true} if the entry was removed, {@code false} otherwise. - * @throws UnsupportedOperationException always, as the method is not supported. - */ - @Override - public boolean remove(Object key, Object value) { - throw new UnsupportedOperationException("remove" + UNSUPPORTED_OP_ERR); - } - - /** - * Replaces the specified property only if it has the given value. - * - * @param key The property to be replaced. - * @param oldValue The value expected to be associated with the property. - * @param newValue The value to be associated with the property. - * @return {@code true} if the property was replaced, {@code false} otherwise. - * @throws UnsupportedOperationException always, as the method is not supported. - */ - @Override - public boolean replace(String key, Object oldValue, Object newValue) { - throw new UnsupportedOperationException("replace" + UNSUPPORTED_OP_ERR); - } - - /** - * Replaces the specified property only if it has the given value. - * - * @param key The property to be replaced. - * @param value The value to be associated with the property. - * @return The previous value associated with the property, or null if the property was not found. - * @throws UnsupportedOperationException always, as the method is not supported. - */ - @Override - public Object replace(String key, Object value) { - throw new UnsupportedOperationException("replace" + UNSUPPORTED_OP_ERR); - } - - /** - * The computed value associated with the property, or null if the property is not present. - * - * @param key The property whose value is to be computed if absent. - * @param mappingFunction The function to compute a value based on the property. - * @return The computed value associated with the property, or null if the property is not present. - * @throws UnsupportedOperationException always, as the method is not supported. - */ - @Override - public Object computeIfAbsent(String key, Function mappingFunction) { - throw new UnsupportedOperationException("computeIfAbsent" + UNSUPPORTED_OP_ERR); - } - - /** - * If the value for the specified property is present, attempts to compute a new mapping given the property and its current - * mapped value. - * - * @param key The property for which the mapping is to be computed. - * @param remappingFunction The function to compute a new mapping. - * @return The new value associated with the property, or null if the property is not present. - * @throws UnsupportedOperationException always, as the method is not supported. - */ - @Override - public Object computeIfPresent(String key, BiFunction remappingFunction) { - throw new UnsupportedOperationException("computeIfPresent" + UNSUPPORTED_OP_ERR); - } - - /** - * If the value for the specified property is present, attempts to compute a new mapping given the property and its current - * mapped value, or removes the property if the computed value is null. - * - * @param key The property for which the mapping is to be computed. - * @param remappingFunction The function to compute a new mapping. - * @return The new value associated with the property, or null if the property is not present. - * @throws UnsupportedOperationException always, as the method is not supported. - */ - @Override - public Object compute(String key, BiFunction remappingFunction) { - throw new UnsupportedOperationException("compute" + UNSUPPORTED_OP_ERR); - } - - /** - * If the specified property is not already associated with a value or is associated with null, associates it with the - * given non-null value. Otherwise, replaces the associated value with the results of applying the given - * remapping function to the current and new values. - * - * @param key The property for which the mapping is to be merged. - * @param value The non-null value to be merged with the existing value. - * @param remappingFunction The function to merge the existing and new values. - * @return The new value associated with the property, or null if the property is not present. - * @throws UnsupportedOperationException always, as the method is not supported. - */ - @Override - public Object merge(String key, Object value, BiFunction remappingFunction) { - throw new UnsupportedOperationException("merge" + UNSUPPORTED_OP_ERR); - } } diff --git a/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/OversampleRequestProcessorTests.java b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/OversampleRequestProcessorTests.java index 61657cf09551a..7c1e21181a1b9 100644 --- a/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/OversampleRequestProcessorTests.java +++ b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/OversampleRequestProcessorTests.java @@ -29,7 +29,7 @@ public void testEmptySource() { PipelinedRequestContext context = new PipelinedRequestContext(); SearchRequest transformedRequest = processor.processRequest(request, context); assertEquals(request, transformedRequest); - assertTrue(context.getGenericRequestContext().isEmpty()); + assertNull(context.getAttribute("original_size")); } public void testBasicBehavior() { @@ -42,8 +42,7 @@ public void testBasicBehavior() { PipelinedRequestContext context = new PipelinedRequestContext(); SearchRequest transformedRequest = processor.processRequest(request, context); assertEquals(30, transformedRequest.source().size()); - assertEquals(1, context.getGenericRequestContext().size()); - assertEquals(10, context.getGenericRequestContext().get("original_size")); + assertEquals(10, context.getAttribute("original_size")); } public void testContextPrefix() { @@ -58,7 +57,6 @@ public void testContextPrefix() { PipelinedRequestContext context = new PipelinedRequestContext(); SearchRequest transformedRequest = processor.processRequest(request, context); assertEquals(30, transformedRequest.source().size()); - assertEquals(1, context.getGenericRequestContext().size()); - assertEquals(10, context.getGenericRequestContext().get("foo.original_size")); + assertEquals(10, context.getAttribute("foo.original_size")); } } diff --git a/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/TruncateHitsResponseProcessorTests.java b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/TruncateHitsResponseProcessorTests.java index 85c23cca73651..5009135cbd190 100644 --- a/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/TruncateHitsResponseProcessorTests.java +++ b/modules/search-pipeline-common/src/test/java/org/opensearch/search/pipeline/common/TruncateHitsResponseProcessorTests.java @@ -44,7 +44,7 @@ public void testTargetSizePassedViaContext() { int numHits = randomInt(100); SearchResponse response = constructResponse(numHits); PipelinedRequestContext requestContext = new PipelinedRequestContext(); - requestContext.getGenericRequestContext().put("original_size", targetSize); + requestContext.setAttribute("original_size", targetSize); SearchResponse transformedResponse = processor.processResponse(new SearchRequest(), response, requestContext); assertEquals(Math.min(targetSize, numHits), transformedResponse.getHits().getHits().length); } @@ -58,7 +58,7 @@ public void testTargetSizePassedViaContextWithPrefix() { int numHits = randomInt(100); SearchResponse response = constructResponse(numHits); PipelinedRequestContext requestContext = new PipelinedRequestContext(); - requestContext.getGenericRequestContext().put("foo.original_size", targetSize); + requestContext.setAttribute("foo.original_size", targetSize); SearchResponse transformedResponse = processor.processResponse(new SearchRequest(), response, requestContext); assertEquals(Math.min(targetSize, numHits), transformedResponse.getHits().getHits().length); } diff --git a/server/src/main/java/org/opensearch/search/pipeline/PipelinedRequestContext.java b/server/src/main/java/org/opensearch/search/pipeline/PipelinedRequestContext.java index 3b3b1d3095a52..107ee4dc201c3 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/PipelinedRequestContext.java +++ b/server/src/main/java/org/opensearch/search/pipeline/PipelinedRequestContext.java @@ -15,9 +15,24 @@ * A holder for state that is passed through each processor in the pipeline. */ public class PipelinedRequestContext { - private final Map genericRequestContext = new HashMap<>(); + private final Map attributes = new HashMap<>(); - public Map getGenericRequestContext() { - return genericRequestContext; + /** + * Set a generic attribute in the state for this request. Overwrites any existing value. + * + * @param name the name of the attribute to set + * @param value the value to set on the attribute + */ + public void setAttribute(String name, Object value) { + attributes.put(name, value); + } + + /** + * Retrieves a generic attribute value from the state for this request. + * @param name the name of the attribute + * @return the value of the attribute if previously set (and null otherwise) + */ + public Object getAttribute(String name) { + return attributes.get(name); } } diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java index 6dfa8145e3a07..536889ac73eeb 100644 --- a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java @@ -1383,9 +1383,9 @@ public void testExtraParameterInProcessorConfig() { private static class FakeStatefulRequestProcessor extends AbstractProcessor implements StatefulSearchRequestProcessor { private final String type; - private final Consumer> stateConsumer; + private final Consumer stateConsumer; - public FakeStatefulRequestProcessor(String type, Consumer> stateConsumer) { + public FakeStatefulRequestProcessor(String type, Consumer stateConsumer) { super(null, null, false); this.type = type; this.stateConsumer = stateConsumer; @@ -1398,16 +1398,16 @@ public String getType() { @Override public SearchRequest processRequest(SearchRequest request, PipelinedRequestContext requestContext) throws Exception { - stateConsumer.accept(requestContext.getGenericRequestContext()); + stateConsumer.accept(requestContext); return request; } } private static class FakeStatefulResponseProcessor extends AbstractProcessor implements StatefulSearchResponseProcessor { private final String type; - private final Consumer> stateConsumer; + private final Consumer stateConsumer; - public FakeStatefulResponseProcessor(String type, Consumer> stateConsumer) { + public FakeStatefulResponseProcessor(String type, Consumer stateConsumer) { super(null, null, false); this.type = type; this.stateConsumer = stateConsumer; @@ -1421,7 +1421,7 @@ public String getType() { @Override public SearchResponse processResponse(SearchRequest request, SearchResponse response, PipelinedRequestContext requestContext) throws Exception { - stateConsumer.accept(requestContext.getGenericRequestContext()); + stateConsumer.accept(requestContext); return response; } } @@ -1429,12 +1429,15 @@ public SearchResponse processResponse(SearchRequest request, SearchResponse resp public void testStatefulProcessors() throws Exception { AtomicReference contextHolder = new AtomicReference<>(); SearchPipelineService searchPipelineService = createWithProcessors( - Map.of("write_context", (pf, t, d, igf, cfg, ctx) -> new FakeStatefulRequestProcessor("write_context", (c) -> c.put("a", "b"))), + Map.of( + "write_context", + (pf, t, d, igf, cfg, ctx) -> new FakeStatefulRequestProcessor("write_context", (c) -> c.setAttribute("a", "b")) + ), Map.of( "read_context", (pf, t, d, igf, cfg, ctx) -> new FakeStatefulResponseProcessor( "read_context", - (c) -> contextHolder.set((String) c.get("a")) + (c) -> contextHolder.set((String) c.getAttribute("a")) ) ), Collections.emptyMap()