Skip to content

Commit

Permalink
Encapsulate map in PipelinedRequestContext
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Froh <[email protected]>
  • Loading branch information
msfroh committed Nov 9, 2023
1 parent 68b9e72 commit 2af7753
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 279 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public SearchRequest processRequest(SearchRequest request, PipelinedRequestConte
if (originalSize == -1) {
originalSize = SearchService.DEFAULT_SIZE;
}
requestContext.getGenericRequestContext().put(applyContextPrefix(contextPrefix, ORIGINAL_SIZE), originalSize);
requestContext.setAttribute(applyContextPrefix(contextPrefix, ORIGINAL_SIZE), originalSize);
int newSize = (int) Math.ceil(originalSize * sampleFactor);
request.source().size(newSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.StatefulSearchRequestProcessor;
import org.opensearch.search.pipeline.common.helpers.BasicMap;
import org.opensearch.search.pipeline.common.helpers.SearchRequestMap;

import java.io.InputStream;
Expand Down Expand Up @@ -88,12 +89,33 @@ public SearchRequest processRequest(SearchRequest request, PipelinedRequestConte
searchScript = precompiledSearchScript;
}
// execute the script with the search request in context
searchScript.execute(
Map.of("_source", new SearchRequestMap(request), "request_context", requestContext.getGenericRequestContext())
);
searchScript.execute(Map.of("_source", new SearchRequestMap(request), "request_context", new RequestContextMap(requestContext)));
return request;
}

private static class RequestContextMap extends BasicMap {
private final PipelinedRequestContext pipelinedRequestContext;

private RequestContextMap(PipelinedRequestContext pipelinedRequestContext) {
this.pipelinedRequestContext = pipelinedRequestContext;
}

@Override
public Object get(Object key) {
if (key instanceof String) {
return pipelinedRequestContext.getAttribute(key.toString());
}
return null;
}

@Override
public Object put(String key, Object value) {
Object originalValue = get(key);
pipelinedRequestContext.setAttribute(key, value);
return originalValue;
}
}

/**
* Returns the type of the processor.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public SearchResponse processResponse(SearchRequest request, SearchResponse resp
int size;
if (targetSize < 0) { // No value specified in processor config. Use context value instead.
String key = applyContextPrefix(contextPrefix, OversampleRequestProcessor.ORIGINAL_SIZE);
Object o = requestContext.getGenericRequestContext().get(key);
Object o = requestContext.getAttribute(key);
if (o == null) {
throw new IllegalStateException("Must specify " + TARGET_SIZE + " unless an earlier processor set " + key);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline.common.helpers;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
* Helper for map abstractions passed to scripting processors. Throws {@link UnsupportedOperationException} for almost
* all methods. Subclasses just need to implement get and put.
*/
public abstract class BasicMap implements Map<String, Object> {

/**
* No-args constructor.
*/
protected BasicMap() {}

private static final String UNSUPPORTED_OP_ERR = " Method not supported in Search pipeline script";

@Override
public boolean isEmpty() {
throw new UnsupportedOperationException("isEmpty" + UNSUPPORTED_OP_ERR);
}

public int size() {
throw new UnsupportedOperationException("size" + UNSUPPORTED_OP_ERR);
}

public boolean containsKey(Object key) {
return get(key) != null;
}

public boolean containsValue(Object value) {
throw new UnsupportedOperationException("containsValue" + UNSUPPORTED_OP_ERR);
}

public Object remove(Object key) {
throw new UnsupportedOperationException("remove" + UNSUPPORTED_OP_ERR);
}

public void putAll(Map<? extends String, ?> m) {
throw new UnsupportedOperationException("putAll" + UNSUPPORTED_OP_ERR);
}

public void clear() {
throw new UnsupportedOperationException("clear" + UNSUPPORTED_OP_ERR);
}

public Set<String> keySet() {
throw new UnsupportedOperationException("keySet" + UNSUPPORTED_OP_ERR);
}

public Collection<Object> values() {
throw new UnsupportedOperationException("values" + UNSUPPORTED_OP_ERR);
}

public Set<Map.Entry<String, Object>> entrySet() {
throw new UnsupportedOperationException("entrySet" + UNSUPPORTED_OP_ERR);
}

@Override
public Object getOrDefault(Object key, Object defaultValue) {
throw new UnsupportedOperationException("getOrDefault" + UNSUPPORTED_OP_ERR);
}

@Override
public void forEach(BiConsumer<? super String, ? super Object> action) {
throw new UnsupportedOperationException("forEach" + UNSUPPORTED_OP_ERR);
}

@Override
public void replaceAll(BiFunction<? super String, ? super Object, ?> function) {
throw new UnsupportedOperationException("replaceAll" + UNSUPPORTED_OP_ERR);
}

@Override
public Object putIfAbsent(String key, Object value) {
throw new UnsupportedOperationException("putIfAbsent" + UNSUPPORTED_OP_ERR);
}

@Override
public boolean remove(Object key, Object value) {
throw new UnsupportedOperationException("remove" + UNSUPPORTED_OP_ERR);
}

@Override
public boolean replace(String key, Object oldValue, Object newValue) {
throw new UnsupportedOperationException("replace" + UNSUPPORTED_OP_ERR);
}

@Override
public Object replace(String key, Object value) {
throw new UnsupportedOperationException("replace" + UNSUPPORTED_OP_ERR);
}

@Override
public Object computeIfAbsent(String key, Function<? super String, ?> mappingFunction) {
throw new UnsupportedOperationException("computeIfAbsent" + UNSUPPORTED_OP_ERR);
}

@Override
public Object computeIfPresent(String key, BiFunction<? super String, ? super Object, ?> remappingFunction) {
throw new UnsupportedOperationException("computeIfPresent" + UNSUPPORTED_OP_ERR);
}

@Override
public Object compute(String key, BiFunction<? super String, ? super Object, ?> remappingFunction) {
throw new UnsupportedOperationException("compute" + UNSUPPORTED_OP_ERR);
}

@Override
public Object merge(String key, Object value, BiFunction<? super Object, ? super Object, ?> remappingFunction) {
throw new UnsupportedOperationException("merge" + UNSUPPORTED_OP_ERR);
}
}
Loading

0 comments on commit 2af7753

Please sign in to comment.