Skip to content

Commit

Permalink
Renamed SearchPhaseInjectorProcessor to SearchPhaseResultsProcessor a…
Browse files Browse the repository at this point in the history
…nd fixed the comments

Signed-off-by: Navneet Verma <[email protected]>
  • Loading branch information
navneet1v authored and msfroh committed Jun 28, 2023
1 parent b4bbec6 commit 87ab14d
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
) {
// We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests
super(
"can_match",
SearchPhaseName.CAN_MATCH.getName(),
logger,
searchTransportService,
nodeIdToConnection,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public enum SearchPhaseName {
QUERY("query"),
FETCH("fetch"),
DFS_QUERY("dfs_query"),
EXPAND("expand");
EXPAND("expand"),
CAN_MATCH("can_match");

private final String name;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
package org.opensearch.plugins;

import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchPhaseInjectorProcessor;
import org.opensearch.search.pipeline.SearchPhaseResultsProcessor;
import org.opensearch.search.pipeline.SearchRequestProcessor;
import org.opensearch.search.pipeline.SearchResponseProcessor;

Expand Down Expand Up @@ -51,7 +51,7 @@ default Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProce
* in pipeline configurations, and the value is a {@link org.opensearch.search.pipeline.Processor.Factory}
* to create the processor from a given pipeline configuration.
*/
default Map<String, Processor.Factory<SearchPhaseInjectorProcessor>> getPhaseInjectorProcessors(Processor.Parameters parameters) {
default Map<String, Processor.Factory<SearchPhaseResultsProcessor>> getPhaseResultsProcessors(Processor.Parameters parameters) {
return Collections.emptyMap();
}
}
20 changes: 10 additions & 10 deletions server/src/main/java/org/opensearch/search/pipeline/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class Pipeline {

public static final String REQUEST_PROCESSORS_KEY = "request_processors";
public static final String RESPONSE_PROCESSORS_KEY = "response_processors";
public static final String PHASE_PROCESSORS_KEY = "phase_injector_processors";
public static final String PHASE_PROCESSORS_KEY = "phase_results_processors";
private final String id;
private final String description;
private final Integer version;
Expand All @@ -41,7 +41,7 @@ class Pipeline {
private final List<SearchRequestProcessor> searchRequestProcessors;
private final List<SearchResponseProcessor> searchResponseProcessors;
private final NamedWriteableRegistry namedWriteableRegistry;
private final List<SearchPhaseInjectorProcessor> searchPhaseInjectorProcessors;
private final List<SearchPhaseResultsProcessor> searchPhaseResultsProcessors;
private final LongSupplier relativeTimeSupplier;

Pipeline(
Expand All @@ -50,7 +50,7 @@ class Pipeline {
@Nullable Integer version,
List<SearchRequestProcessor> requestProcessors,
List<SearchResponseProcessor> responseProcessors,
List<SearchPhaseInjectorProcessor> phaseInjectorProcessors,
List<SearchPhaseResultsProcessor> phaseResultsProcessors,
NamedWriteableRegistry namedWriteableRegistry,
LongSupplier relativeTimeSupplier
) {
Expand All @@ -59,7 +59,7 @@ class Pipeline {
this.version = version;
this.searchRequestProcessors = requestProcessors;
this.searchResponseProcessors = responseProcessors;
this.searchPhaseInjectorProcessors = phaseInjectorProcessors;
this.searchPhaseResultsProcessors = phaseResultsProcessors;
this.namedWriteableRegistry = namedWriteableRegistry;
this.relativeTimeSupplier = relativeTimeSupplier;
}
Expand All @@ -84,8 +84,8 @@ List<SearchResponseProcessor> getSearchResponseProcessors() {
return searchResponseProcessors;
}

List<SearchPhaseInjectorProcessor> getSearchPhaseInjectorProcessors() {
return searchPhaseInjectorProcessors;
List<SearchPhaseResultsProcessor> getSearchPhaseResultsProcessors() {
return searchPhaseResultsProcessors;
}

protected void beforeTransformRequest() {}
Expand Down Expand Up @@ -186,10 +186,10 @@ <Result extends SearchPhaseResult> SearchPhaseResults<Result> runSearchPhaseTran
) throws SearchPipelineProcessingException {

try {
for (SearchPhaseInjectorProcessor searchPhaseInjectorProcessor : searchPhaseInjectorProcessors) {
if (currentPhase.equals(searchPhaseInjectorProcessor.getBeforePhase().getName())
&& nextPhase.equals(searchPhaseInjectorProcessor.getAfterPhase().getName())) {
searchPhaseResult = searchPhaseInjectorProcessor.execute(searchPhaseResult, context);
for (SearchPhaseResultsProcessor searchPhaseResultsProcessor : searchPhaseResultsProcessors) {
if (currentPhase.equals(searchPhaseResultsProcessor.getBeforePhase().getName())
&& nextPhase.equals(searchPhaseResultsProcessor.getAfterPhase().getName())) {
searchPhaseResult = searchPhaseResultsProcessor.process(searchPhaseResult, context);
}
}
return searchPhaseResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
/**
* Creates a processor that runs between Phases of the Search.
*/
public interface SearchPhaseInjectorProcessor extends Processor {
<Result extends SearchPhaseResult> SearchPhaseResults<Result> execute(
public interface SearchPhaseResultsProcessor extends Processor {
<Result extends SearchPhaseResult> SearchPhaseResults<Result> process(
final SearchPhaseResults<Result> searchPhaseResult,
final SearchPhaseContext searchPhaseContext
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class SearchPipelineService implements ClusterStateApplier, ReportingServ
private final Map<String, Processor.Factory<SearchRequestProcessor>> requestProcessorFactories;
private final Map<String, Processor.Factory<SearchResponseProcessor>> responseProcessorFactories;

private final Map<String, Processor.Factory<SearchPhaseInjectorProcessor>> phaseInjectorProcessorFactories;
private final Map<String, Processor.Factory<SearchPhaseResultsProcessor>> phaseInjectorProcessorFactories;
private volatile Map<String, PipelineHolder> pipelines = Collections.emptyMap();
private final ThreadPool threadPool;
private final List<Consumer<ClusterState>> searchPipelineClusterStateListeners = new CopyOnWriteArrayList<>();
Expand Down Expand Up @@ -118,7 +118,7 @@ public SearchPipelineService(
);
this.requestProcessorFactories = processorFactories(searchPipelinePlugins, p -> p.getRequestProcessors(parameters));
this.responseProcessorFactories = processorFactories(searchPipelinePlugins, p -> p.getResponseProcessors(parameters));
this.phaseInjectorProcessorFactories = processorFactories(searchPipelinePlugins, p -> p.getPhaseInjectorProcessors(parameters));
this.phaseInjectorProcessorFactories = processorFactories(searchPipelinePlugins, p -> p.getPhaseResultsProcessors(parameters));
putPipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_SEARCH_PIPELINE_KEY, true);
deletePipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_SEARCH_PIPELINE_KEY, true);
this.isEnabled = isEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProces
}

@Override
public Map<String, Processor.Factory<SearchPhaseInjectorProcessor>> getPhaseInjectorProcessors(Processor.Parameters parameters) {
public Map<String, Processor.Factory<SearchPhaseResultsProcessor>> getPhaseResultsProcessors(Processor.Parameters parameters) {
return Map.of("zoe", (factories, tag, description, config) -> null);
}
};
Expand Down Expand Up @@ -264,10 +264,10 @@ public SearchResponse processResponse(SearchRequest request, SearchResponse resp
}
}

private static class FakeSearchPhaseInjectorProcessor extends FakeProcessor implements SearchPhaseInjectorProcessor {
private static class FakeSearchPhaseResultsProcessor extends FakeProcessor implements SearchPhaseResultsProcessor {
private Consumer<SearchPhaseResult> querySearchResultConsumer;

public FakeSearchPhaseInjectorProcessor(
public FakeSearchPhaseResultsProcessor(
String type,
String tag,
String description,
Expand All @@ -278,7 +278,7 @@ public FakeSearchPhaseInjectorProcessor(
}

@Override
public <Result extends SearchPhaseResult> SearchPhaseResults<Result> execute(
public <Result extends SearchPhaseResult> SearchPhaseResults<Result> process(
SearchPhaseResults<Result> searchPhaseResult,
SearchPhaseContext searchPhaseContext
) {
Expand Down Expand Up @@ -316,11 +316,11 @@ private SearchPipelineService createWithProcessors() {
return new FakeResponseProcessor("fixed_score", tag, description, rsp -> rsp.getHits().forEach(h -> h.score(score)));
});

Map<String, Processor.Factory<SearchPhaseInjectorProcessor>> searchPhaseProcessors = new HashMap<>();
Map<String, Processor.Factory<SearchPhaseResultsProcessor>> searchPhaseProcessors = new HashMap<>();
searchPhaseProcessors.put("max_score", (processorFactories, tag, description, config) -> {
final float finalScore = config.containsKey("score") ? ((Number) config.remove("score")).floatValue() : 100f;
final Consumer<SearchPhaseResult> querySearchResultConsumer = (result) -> result.queryResult().topDocs().maxScore = finalScore;
return new FakeSearchPhaseInjectorProcessor("max_score", tag, description, querySearchResultConsumer);
return new FakeSearchPhaseResultsProcessor("max_score", tag, description, querySearchResultConsumer);
});

return createWithProcessors(requestProcessors, responseProcessors, searchPhaseProcessors);
Expand All @@ -335,7 +335,7 @@ protected NamedWriteableRegistry writableRegistry() {
private SearchPipelineService createWithProcessors(
Map<String, Processor.Factory<SearchRequestProcessor>> requestProcessors,
Map<String, Processor.Factory<SearchResponseProcessor>> responseProcessors,
Map<String, Processor.Factory<SearchPhaseInjectorProcessor>> phaseProcessors
Map<String, Processor.Factory<SearchPhaseResultsProcessor>> phaseProcessors
) {
Client client = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class);
Expand All @@ -362,7 +362,7 @@ public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProces
}

@Override
public Map<String, Processor.Factory<SearchPhaseInjectorProcessor>> getPhaseInjectorProcessors(
public Map<String, Processor.Factory<SearchPhaseResultsProcessor>> getPhaseResultsProcessors(
Processor.Parameters parameters
) {
return phaseProcessors;
Expand All @@ -387,7 +387,7 @@ public void testUpdatePipelines() {
"{ "
+ "\"request_processors\" : [ { \"scale_request_size\": { \"scale\" : 2 } } ], "
+ "\"response_processors\" : [ { \"fixed_score\" : { \"score\" : 1.0 } } ],"
+ "\"phase_injector_processors\" : [ { \"max_score\" : { \"score\": 100 } } ]"
+ "\"phase_results_processors\" : [ { \"max_score\" : { \"score\": 100 } } ]"
+ "}"
),
XContentType.JSON
Expand All @@ -405,10 +405,10 @@ public void testUpdatePipelines() {
"scale_request_size",
searchPipelineService.getPipelines().get("_id").pipeline.getSearchRequestProcessors().get(0).getType()
);
assertEquals(1, searchPipelineService.getPipelines().get("_id").pipeline.getSearchPhaseInjectorProcessors().size());
assertEquals(1, searchPipelineService.getPipelines().get("_id").pipeline.getSearchPhaseResultsProcessors().size());
assertEquals(
"max_score",
searchPipelineService.getPipelines().get("_id").pipeline.getSearchPhaseInjectorProcessors().get(0).getType()
searchPipelineService.getPipelines().get("_id").pipeline.getSearchPhaseResultsProcessors().get(0).getType()
);
assertEquals(1, searchPipelineService.getPipelines().get("_id").pipeline.getSearchResponseProcessors().size());
assertEquals(
Expand Down Expand Up @@ -447,7 +447,7 @@ public void testPutPipeline() {
assertEquals("empty pipeline", pipeline.pipeline.getDescription());
assertEquals(0, pipeline.pipeline.getSearchRequestProcessors().size());
assertEquals(0, pipeline.pipeline.getSearchResponseProcessors().size());
assertEquals(0, pipeline.pipeline.getSearchPhaseInjectorProcessors().size());
assertEquals(0, pipeline.pipeline.getSearchPhaseResultsProcessors().size());
}

public void testPutInvalidPipeline() throws IllegalAccessException {
Expand Down Expand Up @@ -651,7 +651,7 @@ public void testTransformSearchPhase() {
"p1",
new PipelineConfiguration(
"p1",
new BytesArray("{\"phase_injector_processors\" : [ { \"max_score\" : { } } ]}"),
new BytesArray("{\"phase_results_processors\" : [ { \"max_score\" : { } } ]}"),
XContentType.JSON
)
)
Expand Down Expand Up @@ -746,7 +746,7 @@ public void testGetPipelines() {
"p3",
new PipelineConfiguration(
"p3",
new BytesArray("{\"phase_injector_processors\" : [ { \"max_score\" : { } } ]}"),
new BytesArray("{\"phase_results_processors\" : [ { \"max_score\" : { } } ]}"),
XContentType.JSON
)
)
Expand Down Expand Up @@ -803,7 +803,7 @@ public void testValidatePipeline() throws Exception {
"{"
+ "\"request_processors\": [{ \"scale_request_size\": { \"scale\" : 2 } }],"
+ "\"response_processors\": [{ \"fixed_score\": { \"score\" : 2 } }],"
+ "\"phase_injector_processors\" : [ { \"max_score\" : { } } ]"
+ "\"phase_results_processors\" : [ { \"max_score\" : { } } ]"
+ "}"
),
XContentType.JSON
Expand Down

0 comments on commit 87ab14d

Please sign in to comment.