diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index fb478c4860e67..4ceaeab8dfd7c 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -53,7 +53,6 @@ import org.opensearch.index.shard.ShardId; import org.opensearch.search.SearchPhaseResult; import org.opensearch.search.SearchShardTarget; -import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.internal.AliasFilter; import org.opensearch.search.internal.InternalSearchResponse; import org.opensearch.search.internal.SearchContext; @@ -90,7 +89,7 @@ abstract class AbstractSearchAsyncAction exten private final SearchTransportService searchTransportService; private final Executor executor; private final ActionListener listener; - private final PipelinedRequest request; + private final SearchRequest request; /** * Used by subclasses to resolve node ids to DiscoveryNodes. **/ @@ -128,7 +127,7 @@ abstract class AbstractSearchAsyncAction exten Map concreteIndexBoosts, Map> indexRoutings, Executor executor, - PipelinedRequest request, + SearchRequest request, ActionListener listener, GroupShardsIterator shardsIts, TransportSearchAction.SearchTimeProvider timeProvider, @@ -197,10 +196,9 @@ public final void start() { if (getNumShards() == 0) { // no search shards to search on, bail with empty response // (it happens with search across _all with no indices around and consistent with broadcast operations) - SearchSourceBuilder source = request.transformedRequest().source(); - int trackTotalHitsUpTo = source == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO - : source.trackTotalHitsUpTo() == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO - : source.trackTotalHitsUpTo(); + int trackTotalHitsUpTo = request.source() == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO + : request.source().trackTotalHitsUpTo() == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO + : request.source().trackTotalHitsUpTo(); // total hits is null in the response if the tracking of total hits is disabled boolean withTotalHits = trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_DISABLED; listener.onResponse( @@ -227,10 +225,9 @@ public final void run() { assert iterator.skip(); skipShard(iterator); } - SearchRequest searchRequest = request.transformedRequest(); if (shardsIts.size() > 0) { - assert searchRequest.allowPartialSearchResults() != null : "SearchRequest missing setting for allowPartialSearchResults"; - if (searchRequest.allowPartialSearchResults() == false) { + assert request.allowPartialSearchResults() != null : "SearchRequest missing setting for allowPartialSearchResults"; + if (request.allowPartialSearchResults() == false) { final StringBuilder missingShards = new StringBuilder(); // Fail-fast verification of all shards being available for (int index = 0; index < shardsIts.size(); index++) { @@ -375,7 +372,7 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha logger.debug(() -> new ParameterizedMessage("All shards failed for phase: [{}]", getName()), cause); onPhaseFailure(currentPhase, "all shards failed", cause); } else { - Boolean allowPartialResults = request.transformedRequest().allowPartialSearchResults(); + Boolean allowPartialResults = request.allowPartialSearchResults(); assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults"; if (allowPartialResults == false && successfulOps.get() != getNumShards()) { // check if there are actual failures in the atomic array since @@ -611,7 +608,7 @@ public final SearchTask getTask() { @Override public final SearchRequest getRequest() { - return request.transformedRequest(); + return request; } protected final SearchResponse buildSearchResponse( @@ -642,22 +639,19 @@ boolean buildPointInTimeFromSearchResults() { @Override public void sendSearchResponse(InternalSearchResponse internalSearchResponse, AtomicArray queryResults) { ShardSearchFailure[] failures = buildShardFailures(); - Boolean allowPartialResults = request.transformedRequest().allowPartialSearchResults(); + Boolean allowPartialResults = request.allowPartialSearchResults(); assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults"; if (allowPartialResults == false && failures.length > 0) { raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures)); } else { final Version minNodeVersion = clusterState.nodes().getMinNodeVersion(); - final String scrollId = request.transformedRequest().scroll() != null - ? TransportSearchHelper.buildScrollId(queryResults, minNodeVersion) - : null; + final String scrollId = request.scroll() != null ? TransportSearchHelper.buildScrollId(queryResults, minNodeVersion) : null; final String searchContextId; if (buildPointInTimeFromSearchResults()) { searchContextId = SearchContextId.encode(queryResults.asList(), aliasFilter, minNodeVersion); } else { - SearchSourceBuilder source = request.transformedRequest().source(); - if (source != null && source.pointInTimeBuilder() != null) { - searchContextId = source.pointInTimeBuilder().getId(); + if (request.source() != null && request.source().pointInTimeBuilder() != null) { + searchContextId = request.source().pointInTimeBuilder().getId(); } else { searchContextId = null; } @@ -679,7 +673,7 @@ public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) */ private void raisePhaseFailure(SearchPhaseExecutionException exception) { // we don't release persistent readers (point in time). - if (request.transformedRequest().pointInTimeBuilder() == null) { + if (request.pointInTimeBuilder() == null) { results.getSuccessfulResults().forEach((entry) -> { if (entry.getContextId() != null) { try { @@ -704,10 +698,12 @@ private void raisePhaseFailure(SearchPhaseExecutionException exception) { */ final void onPhaseDone() { // as a tribute to @kimchy aka. finishHim() final SearchPhase nextPhase = getNextPhase(results, this); - // From src files the next phase is never null, but from tests this is a possibility. Hence, making sure that - // tests pass, we need to do null check on next phase. - if (nextPhase != null) { - request.transformSearchPhase(results, this, this.getName(), nextPhase.getName()); + if (request instanceof PipelinedRequest && nextPhase != null) { + // From src files the next phase is never null, but from tests this is a possibility. Hence, making sure that + // tests pass, we need to do null check on next phase. + if (nextPhase != null) { + ((PipelinedRequest) request).transformSearchPhase(results, this, this.getName(), nextPhase.getName()); + } } executeNextPhase(this, nextPhase); } @@ -741,7 +737,7 @@ public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shar final String[] routings = indexRoutings.getOrDefault(indexName, Collections.emptySet()).toArray(new String[0]); ShardSearchRequest shardRequest = new ShardSearchRequest( shardIt.getOriginalIndices(), - request.transformedRequest(), + request, shardIt.shardId(), getNumShards(), filter, diff --git a/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java index cb1dbaa3ff9d5..c026c72f77f00 100644 --- a/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java @@ -41,7 +41,6 @@ import org.opensearch.search.SearchShardTarget; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.internal.AliasFilter; -import org.opensearch.search.pipeline.PipelinedRequest; import org.opensearch.search.sort.FieldSortBuilder; import org.opensearch.search.sort.MinAndMax; import org.opensearch.search.sort.SortOrder; @@ -84,7 +83,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction concreteIndexBoosts, Map> indexRoutings, Executor executor, - PipelinedRequest request, + SearchRequest request, ActionListener listener, GroupShardsIterator shardsIts, TransportSearchAction.SearchTimeProvider timeProvider, diff --git a/server/src/main/java/org/opensearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index 14dc898d5d999..71a986c0e15f7 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -41,7 +41,6 @@ import org.opensearch.search.dfs.AggregatedDfs; import org.opensearch.search.dfs.DfsSearchResult; import org.opensearch.search.internal.AliasFilter; -import org.opensearch.search.pipeline.PipelinedRequest; import org.opensearch.transport.Transport; import java.util.List; @@ -71,7 +70,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction final SearchPhaseController searchPhaseController, final Executor executor, final QueryPhaseResultConsumer queryPhaseResultConsumer, - final PipelinedRequest request, + final SearchRequest request, final ActionListener listener, final GroupShardsIterator shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider, @@ -95,13 +94,13 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction clusterState, task, new ArraySearchPhaseResults<>(shardsIts.size()), - request.transformedRequest().getMaxConcurrentShardRequests(), + request.getMaxConcurrentShardRequests(), clusters ); this.queryPhaseResultConsumer = queryPhaseResultConsumer; this.searchPhaseController = searchPhaseController; SearchProgressListener progressListener = task.getProgressListener(); - SearchSourceBuilder sourceBuilder = request.transformedRequest().source(); + SearchSourceBuilder sourceBuilder = request.source(); progressListener.notifyListShards( SearchProgressListener.buildSearchShards(this.shardsIts), SearchProgressListener.buildSearchShards(toSkipShardsIts), diff --git a/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java index 6e72dabfb439b..1ead14aac6b51 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -39,11 +39,9 @@ import org.opensearch.cluster.routing.GroupShardsIterator; import org.opensearch.search.SearchPhaseResult; import org.opensearch.search.SearchShardTarget; -import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.internal.AliasFilter; import org.opensearch.search.internal.SearchContext; import org.opensearch.search.internal.ShardSearchRequest; -import org.opensearch.search.pipeline.PipelinedRequest; import org.opensearch.search.query.QuerySearchResult; import org.opensearch.transport.Transport; @@ -77,7 +75,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction listener, final GroupShardsIterator shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider, @@ -101,11 +99,11 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction 0; + boolean hasFetchPhase = request.source() == null ? true : request.source().size() > 0; progressListener.notifyListShards( SearchProgressListener.buildSearchShards(this.shardsIts), SearchProgressListener.buildSearchShards(toSkipShardsIts), diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index ef83f0450b21a..df2170cbe2af1 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -315,7 +315,7 @@ public void executeRequest( @Override public AbstractSearchAsyncAction asyncSearchAction( SearchTask task, - PipelinedRequest pipelinedRequest, + SearchRequest searchRequest, Executor executor, GroupShardsIterator shardsIts, SearchTimeProvider timeProvider, @@ -338,14 +338,14 @@ public AbstractSearchAsyncAction asyncSearchAction( concreteIndexBoosts, indexRoutings, executor, - pipelinedRequest, + searchRequest, listener, shardsIts, timeProvider, clusterState, task, new ArraySearchPhaseResults<>(shardsIts.size()), - pipelinedRequest.transformedRequest().getMaxConcurrentShardRequests(), + searchRequest.getMaxConcurrentShardRequests(), clusters ) { @Override @@ -390,19 +390,18 @@ private void executeRequest( relativeStartNanos, System::nanoTime ); - PipelinedRequest pipelinedRequest; + PipelinedRequest searchRequest; ActionListener listener; try { - pipelinedRequest = searchPipelineService.resolvePipeline(originalSearchRequest); + searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest); listener = ActionListener.wrap( - r -> originalListener.onResponse(pipelinedRequest.transformResponse(r)), + r -> originalListener.onResponse(searchRequest.transformResponse(r)), originalListener::onFailure ); } catch (Exception e) { originalListener.onFailure(e); return; } - SearchRequest searchRequest = pipelinedRequest.transformedRequest(); ActionListener rewriteListener = ActionListener.wrap(source -> { if (source != searchRequest.source()) { @@ -429,7 +428,7 @@ private void executeRequest( executeLocalSearch( task, timeProvider, - pipelinedRequest, + searchRequest, localIndices, clusterState, listener, @@ -439,7 +438,7 @@ private void executeRequest( } else { if (shouldMinimizeRoundtrips(searchRequest)) { ccsRemoteReduce( - pipelinedRequest, + searchRequest, localIndices, remoteClusterIndices, timeProvider, @@ -496,7 +495,7 @@ private void executeRequest( executeSearch( (SearchTask) task, timeProvider, - pipelinedRequest, + searchRequest, localIndices, remoteShardIterators, clusterNodeLookup, @@ -544,7 +543,7 @@ static boolean shouldMinimizeRoundtrips(SearchRequest searchRequest) { } static void ccsRemoteReduce( - PipelinedRequest pipelinedRequest, + SearchRequest searchRequest, OriginalIndices localIndices, Map remoteIndices, SearchTimeProvider timeProvider, @@ -552,7 +551,7 @@ static void ccsRemoteReduce( RemoteClusterService remoteClusterService, ThreadPool threadPool, ActionListener listener, - BiConsumer> localSearchConsumer + BiConsumer> localSearchConsumer ) { if (localIndices == null && remoteIndices.size() == 1) { @@ -563,7 +562,7 @@ static void ccsRemoteReduce( boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias); OriginalIndices indices = entry.getValue(); SearchRequest ccsSearchRequest = SearchRequest.subSearchRequest( - pipelinedRequest.transformedRequest(), + searchRequest, indices.indices(), clusterAlias, timeProvider.getAbsoluteStartMillis(), @@ -612,7 +611,7 @@ public void onFailure(Exception e) { }); } else { SearchResponseMerger searchResponseMerger = createSearchResponseMerger( - pipelinedRequest.transformedRequest().source(), + searchRequest.source(), timeProvider, aggReduceContextBuilder ); @@ -625,7 +624,7 @@ public void onFailure(Exception e) { boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias); OriginalIndices indices = entry.getValue(); SearchRequest ccsSearchRequest = SearchRequest.subSearchRequest( - pipelinedRequest.transformedRequest(), + searchRequest, indices.indices(), clusterAlias, timeProvider.getAbsoluteStartMillis(), @@ -656,14 +655,13 @@ public void onFailure(Exception e) { listener ); SearchRequest ccsLocalSearchRequest = SearchRequest.subSearchRequest( - pipelinedRequest.transformedRequest(), + searchRequest, localIndices.indices(), RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false ); - - localSearchConsumer.accept(pipelinedRequest.replaceRequest(ccsLocalSearchRequest), ccsListener); + localSearchConsumer.accept(ccsLocalSearchRequest, ccsListener); } } } @@ -779,7 +777,7 @@ SearchResponse createFinalResponse() { private void executeLocalSearch( Task task, SearchTimeProvider timeProvider, - PipelinedRequest pipelinedRequest, + SearchRequest searchRequest, OriginalIndices localIndices, ClusterState clusterState, ActionListener listener, @@ -789,7 +787,7 @@ private void executeLocalSearch( executeSearch( (SearchTask) task, timeProvider, - pipelinedRequest, + searchRequest, localIndices, Collections.emptyList(), (clusterName, nodeId) -> null, @@ -907,7 +905,7 @@ private Index[] resolveLocalIndices(OriginalIndices localIndices, ClusterState c private void executeSearch( SearchTask task, SearchTimeProvider timeProvider, - PipelinedRequest pipelinedRequest, + SearchRequest searchRequest, OriginalIndices localIndices, List remoteShardIterators, BiFunction remoteConnections, @@ -929,7 +927,6 @@ private void executeSearch( final Map> indexRoutings; final String[] concreteLocalIndices; - final SearchRequest searchRequest = pipelinedRequest.transformedRequest(); if (searchContext != null) { assert searchRequest.pointInTimeBuilder() != null; aliasFilter = searchContext.aliasFilter(); @@ -1011,7 +1008,7 @@ private void executeSearch( ); searchAsyncActionProvider.asyncSearchAction( task, - pipelinedRequest, + searchRequest, asyncSearchExecutor, shardIterators, timeProvider, @@ -1094,7 +1091,7 @@ static GroupShardsIterator mergeShardsIterators( interface SearchAsyncActionProvider { AbstractSearchAsyncAction asyncSearchAction( SearchTask task, - PipelinedRequest searchRequest, + SearchRequest searchRequest, Executor executor, GroupShardsIterator shardIterators, SearchTimeProvider timeProvider, @@ -1112,7 +1109,7 @@ AbstractSearchAsyncAction asyncSearchAction( private AbstractSearchAsyncAction searchAsyncAction( SearchTask task, - PipelinedRequest pipelinedRequest, + SearchRequest searchRequest, Executor executor, GroupShardsIterator shardIterators, SearchTimeProvider timeProvider, @@ -1135,7 +1132,7 @@ private AbstractSearchAsyncAction searchAsyncAction concreteIndexBoosts, indexRoutings, executor, - pipelinedRequest, + searchRequest, listener, shardIterators, timeProvider, @@ -1144,7 +1141,7 @@ private AbstractSearchAsyncAction searchAsyncAction (iter) -> { AbstractSearchAsyncAction action = searchAsyncAction( task, - pipelinedRequest, + searchRequest, executor, iter, timeProvider, @@ -1168,7 +1165,6 @@ public void run() { clusters ); } else { - final SearchRequest searchRequest = pipelinedRequest.transformedRequest(); final QueryPhaseResultConsumer queryResultConsumer = searchPhaseController.newSearchPhaseResults( executor, circuitBreaker, @@ -1190,7 +1186,7 @@ public void run() { searchPhaseController, executor, queryResultConsumer, - pipelinedRequest, + searchRequest, listener, shardIterators, timeProvider, @@ -1210,7 +1206,7 @@ public void run() { searchPhaseController, executor, queryResultConsumer, - pipelinedRequest, + searchRequest, listener, shardIterators, timeProvider, diff --git a/server/src/main/java/org/opensearch/search/pipeline/PipelinedRequest.java b/server/src/main/java/org/opensearch/search/pipeline/PipelinedRequest.java index 966d6ba5a3e9b..eb5fd4b6c4c26 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/PipelinedRequest.java +++ b/server/src/main/java/org/opensearch/search/pipeline/PipelinedRequest.java @@ -19,21 +19,16 @@ * * @opensearch.internal */ -public final class PipelinedRequest { +public final class PipelinedRequest extends SearchRequest { private final Pipeline pipeline; - private final SearchRequest transformedRequest; PipelinedRequest(Pipeline pipeline, SearchRequest transformedRequest) { + super(transformedRequest); this.pipeline = pipeline; - this.transformedRequest = transformedRequest; } public SearchResponse transformResponse(SearchResponse response) { - return pipeline.transformResponse(transformedRequest, response); - } - - public SearchRequest transformedRequest() { - return transformedRequest; + return pipeline.transformResponse(this, response); } public SearchPhaseResults transformSearchPhase( @@ -49,21 +44,4 @@ public SearchPhaseResults transformSe Pipeline getPipeline() { return pipeline; } - - /** - * Wraps a search request with a no-op pipeline. Useful for testing. - * - * @param searchRequest the original search request - * @return a search request associated with a pipeline that does nothing - */ - public static PipelinedRequest wrapSearchRequest(SearchRequest searchRequest) { - return new PipelinedRequest(Pipeline.NO_OP_PIPELINE, searchRequest); - } - - /** - * Wraps the given search request with this request's pipeline. - */ - public PipelinedRequest replaceRequest(SearchRequest searchRequest) { - return new PipelinedRequest(pipeline, searchRequest); - } } diff --git a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java index 206b8a571bb5b..ad2657517df9a 100644 --- a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java @@ -52,7 +52,6 @@ import org.opensearch.search.internal.InternalSearchResponse; import org.opensearch.search.internal.ShardSearchContextId; import org.opensearch.search.internal.ShardSearchRequest; -import org.opensearch.search.pipeline.PipelinedRequest; import org.opensearch.search.query.QuerySearchResult; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.transport.Transport; @@ -154,7 +153,7 @@ private AbstractSearchAsyncAction createAction( Collections.singletonMap("foo", 2.0f), Collections.singletonMap("name", Sets.newHashSet("bar", "baz")), executor, - PipelinedRequest.wrapSearchRequest(request), + request, listener, new GroupShardsIterator<>(Arrays.asList(shards)), timeProvider, diff --git a/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java index 1a743716025d6..2e3ff166a6a53 100644 --- a/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/server/src/test/java/org/opensearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -47,7 +47,6 @@ import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.internal.AliasFilter; import org.opensearch.search.internal.ShardSearchRequest; -import org.opensearch.search.pipeline.PipelinedRequest; import org.opensearch.search.sort.MinAndMax; import org.opensearch.search.sort.SortBuilders; import org.opensearch.search.sort.SortOrder; @@ -124,7 +123,7 @@ public void sendCanMatch( Collections.emptyMap(), Collections.emptyMap(), OpenSearchExecutors.newDirectExecutorService(), - PipelinedRequest.wrapSearchRequest(searchRequest), + searchRequest, null, shardsIter, timeProvider, @@ -215,7 +214,7 @@ public void sendCanMatch( Collections.emptyMap(), Collections.emptyMap(), OpenSearchExecutors.newDirectExecutorService(), - PipelinedRequest.wrapSearchRequest(searchRequest), + searchRequest, null, shardsIter, timeProvider, @@ -296,7 +295,7 @@ public void sendCanMatch( Collections.emptyMap(), Collections.emptyMap(), OpenSearchExecutors.newDirectExecutorService(), - PipelinedRequest.wrapSearchRequest(searchRequest), + searchRequest, null, shardsIter, timeProvider, @@ -310,7 +309,7 @@ public void sendCanMatch( Collections.emptyMap(), Collections.emptyMap(), executor, - PipelinedRequest.wrapSearchRequest(searchRequest), + searchRequest, responseListener, iter, new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0), @@ -416,7 +415,7 @@ public void sendCanMatch( Collections.emptyMap(), Collections.emptyMap(), OpenSearchExecutors.newDirectExecutorService(), - PipelinedRequest.wrapSearchRequest(searchRequest), + searchRequest, null, shardsIter, timeProvider, @@ -515,7 +514,7 @@ public void sendCanMatch( Collections.emptyMap(), Collections.emptyMap(), OpenSearchExecutors.newDirectExecutorService(), - PipelinedRequest.wrapSearchRequest(searchRequest), + searchRequest, null, shardsIter, timeProvider, diff --git a/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java index 53131d884a60a..cf838682aa717 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchAsyncActionTests.java @@ -31,7 +31,6 @@ package org.opensearch.action.search; -import org.mockito.Mockito; import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.OriginalIndices; @@ -51,8 +50,6 @@ import org.opensearch.search.internal.AliasFilter; import org.opensearch.search.internal.InternalSearchResponse; import org.opensearch.search.internal.ShardSearchContextId; -import org.opensearch.search.pipeline.PipelinedRequest; -import org.opensearch.search.pipeline.SearchPipelineService; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.transport.Transport; import org.opensearch.transport.TransportException; @@ -81,8 +78,6 @@ public class SearchAsyncActionTests extends OpenSearchTestCase { - private SearchPipelineService searchPipelineService = Mockito.mock(SearchPipelineService.class); - public void testSkipSearchShards() throws InterruptedException { SearchRequest request = new SearchRequest(); request.allowPartialSearchResults(true); @@ -132,7 +127,7 @@ public void testSkipSearchShards() throws InterruptedException { Collections.emptyMap(), Collections.emptyMap(), null, - PipelinedRequest.wrapSearchRequest(request), + request, responseListener, shardsIter, new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0), @@ -250,7 +245,7 @@ public void testLimitConcurrentShardRequests() throws InterruptedException { Collections.emptyMap(), Collections.emptyMap(), null, - PipelinedRequest.wrapSearchRequest(request), + request, responseListener, shardsIter, new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0), @@ -367,7 +362,7 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI Collections.emptyMap(), Collections.emptyMap(), executor, - PipelinedRequest.wrapSearchRequest(request), + request, responseListener, shardsIter, new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0), @@ -489,7 +484,7 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI Collections.emptyMap(), Collections.emptyMap(), executor, - PipelinedRequest.wrapSearchRequest(request), + request, responseListener, shardsIter, new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0), @@ -602,7 +597,7 @@ public void testAllowPartialResults() throws InterruptedException { Collections.emptyMap(), Collections.emptyMap(), null, - PipelinedRequest.wrapSearchRequest(request), + request, responseListener, shardsIter, new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0), diff --git a/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java index e1bf9244b3a6b..4e351e1424cd0 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -56,7 +56,6 @@ import org.opensearch.search.internal.AliasFilter; import org.opensearch.search.internal.ShardSearchContextId; import org.opensearch.search.internal.ShardSearchRequest; -import org.opensearch.search.pipeline.PipelinedRequest; import org.opensearch.search.query.QuerySearchResult; import org.opensearch.search.sort.SortBuilders; import org.opensearch.test.OpenSearchTestCase; @@ -76,7 +75,6 @@ import static org.hamcrest.Matchers.instanceOf; public class SearchQueryThenFetchAsyncActionTests extends OpenSearchTestCase { - public void testBottomFieldSort() throws Exception { testCase(false, false); } @@ -210,7 +208,7 @@ public void sendExecuteQuery( controller, executor, resultConsumer, - PipelinedRequest.wrapSearchRequest(searchRequest), + searchRequest, null, shardsIter, timeProvider, diff --git a/server/src/test/java/org/opensearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/opensearch/action/search/TransportSearchActionTests.java index 96ffb016604f9..51d9a06c9ac43 100644 --- a/server/src/test/java/org/opensearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/TransportSearchActionTests.java @@ -74,7 +74,6 @@ import org.opensearch.search.internal.AliasFilter; import org.opensearch.search.internal.InternalSearchResponse; import org.opensearch.search.internal.SearchContext; -import org.opensearch.search.pipeline.PipelinedRequest; import org.opensearch.search.sort.SortBuilders; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.MockTransportService; @@ -459,14 +458,14 @@ public void testCCSRemoteReduceMergeFails() throws Exception { SearchRequest searchRequest = new SearchRequest(); searchRequest.preference("null_target"); final CountDownLatch latch = new CountDownLatch(1); - SetOnce>> setOnce = new SetOnce<>(); + SetOnce>> setOnce = new SetOnce<>(); AtomicReference failure = new AtomicReference<>(); LatchedActionListener listener = new LatchedActionListener<>( ActionListener.wrap(r -> fail("no response expected"), failure::set), latch ); TransportSearchAction.ccsRemoteReduce( - PipelinedRequest.wrapSearchRequest(searchRequest), + searchRequest, localIndices, remoteIndicesByCluster, timeProvider, @@ -479,8 +478,8 @@ public void testCCSRemoteReduceMergeFails() throws Exception { if (localIndices == null) { assertNull(setOnce.get()); } else { - Tuple> tuple = setOnce.get(); - assertEquals("", tuple.v1().transformedRequest().getLocalClusterAlias()); + Tuple> tuple = setOnce.get(); + assertEquals("", tuple.v1().getLocalClusterAlias()); assertThat(tuple.v2(), instanceOf(TransportSearchAction.CCSActionListener.class)); tuple.v2().onResponse(emptySearchResponse()); } @@ -515,14 +514,14 @@ public void testCCSRemoteReduce() throws Exception { { SearchRequest searchRequest = new SearchRequest(); final CountDownLatch latch = new CountDownLatch(1); - SetOnce>> setOnce = new SetOnce<>(); + SetOnce>> setOnce = new SetOnce<>(); AtomicReference response = new AtomicReference<>(); LatchedActionListener listener = new LatchedActionListener<>( ActionListener.wrap(response::set, e -> fail("no failures expected")), latch ); TransportSearchAction.ccsRemoteReduce( - PipelinedRequest.wrapSearchRequest(searchRequest), + searchRequest, localIndices, remoteIndicesByCluster, timeProvider, @@ -535,8 +534,8 @@ public void testCCSRemoteReduce() throws Exception { if (localIndices == null) { assertNull(setOnce.get()); } else { - Tuple> tuple = setOnce.get(); - assertEquals("", tuple.v1().transformedRequest().getLocalClusterAlias()); + Tuple> tuple = setOnce.get(); + assertEquals("", tuple.v1().getLocalClusterAlias()); assertThat(tuple.v2(), instanceOf(TransportSearchAction.CCSActionListener.class)); tuple.v2().onResponse(emptySearchResponse()); } @@ -552,14 +551,14 @@ public void testCCSRemoteReduce() throws Exception { SearchRequest searchRequest = new SearchRequest(); searchRequest.preference("index_not_found"); final CountDownLatch latch = new CountDownLatch(1); - SetOnce>> setOnce = new SetOnce<>(); + SetOnce>> setOnce = new SetOnce<>(); AtomicReference failure = new AtomicReference<>(); LatchedActionListener listener = new LatchedActionListener<>( ActionListener.wrap(r -> fail("no response expected"), failure::set), latch ); TransportSearchAction.ccsRemoteReduce( - PipelinedRequest.wrapSearchRequest(searchRequest), + searchRequest, localIndices, remoteIndicesByCluster, timeProvider, @@ -572,8 +571,8 @@ public void testCCSRemoteReduce() throws Exception { if (localIndices == null) { assertNull(setOnce.get()); } else { - Tuple> tuple = setOnce.get(); - assertEquals("", tuple.v1().transformedRequest().getLocalClusterAlias()); + Tuple> tuple = setOnce.get(); + assertEquals("", tuple.v1().getLocalClusterAlias()); assertThat(tuple.v2(), instanceOf(TransportSearchAction.CCSActionListener.class)); tuple.v2().onResponse(emptySearchResponse()); } @@ -610,14 +609,14 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti { SearchRequest searchRequest = new SearchRequest(); final CountDownLatch latch = new CountDownLatch(1); - SetOnce>> setOnce = new SetOnce<>(); + SetOnce>> setOnce = new SetOnce<>(); AtomicReference failure = new AtomicReference<>(); LatchedActionListener listener = new LatchedActionListener<>( ActionListener.wrap(r -> fail("no response expected"), failure::set), latch ); TransportSearchAction.ccsRemoteReduce( - PipelinedRequest.wrapSearchRequest(searchRequest), + searchRequest, localIndices, remoteIndicesByCluster, timeProvider, @@ -630,8 +629,8 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti if (localIndices == null) { assertNull(setOnce.get()); } else { - Tuple> tuple = setOnce.get(); - assertEquals("", tuple.v1().transformedRequest().getLocalClusterAlias()); + Tuple> tuple = setOnce.get(); + assertEquals("", tuple.v1().getLocalClusterAlias()); assertThat(tuple.v2(), instanceOf(TransportSearchAction.CCSActionListener.class)); tuple.v2().onResponse(emptySearchResponse()); } @@ -650,14 +649,14 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti { SearchRequest searchRequest = new SearchRequest(); final CountDownLatch latch = new CountDownLatch(1); - SetOnce>> setOnce = new SetOnce<>(); + SetOnce>> setOnce = new SetOnce<>(); AtomicReference response = new AtomicReference<>(); LatchedActionListener listener = new LatchedActionListener<>( ActionListener.wrap(response::set, e -> fail("no failures expected")), latch ); TransportSearchAction.ccsRemoteReduce( - PipelinedRequest.wrapSearchRequest(searchRequest), + searchRequest, localIndices, remoteIndicesByCluster, timeProvider, @@ -670,8 +669,8 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti if (localIndices == null) { assertNull(setOnce.get()); } else { - Tuple> tuple = setOnce.get(); - assertEquals("", tuple.v1().transformedRequest().getLocalClusterAlias()); + Tuple> tuple = setOnce.get(); + assertEquals("", tuple.v1().getLocalClusterAlias()); assertThat(tuple.v2(), instanceOf(TransportSearchAction.CCSActionListener.class)); tuple.v2().onResponse(emptySearchResponse()); } @@ -701,14 +700,14 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti { SearchRequest searchRequest = new SearchRequest(); final CountDownLatch latch = new CountDownLatch(1); - SetOnce>> setOnce = new SetOnce<>(); + SetOnce>> setOnce = new SetOnce<>(); AtomicReference response = new AtomicReference<>(); LatchedActionListener listener = new LatchedActionListener<>( ActionListener.wrap(response::set, e -> fail("no failures expected")), latch ); TransportSearchAction.ccsRemoteReduce( - PipelinedRequest.wrapSearchRequest(searchRequest), + searchRequest, localIndices, remoteIndicesByCluster, timeProvider, @@ -721,8 +720,8 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti if (localIndices == null) { assertNull(setOnce.get()); } else { - Tuple> tuple = setOnce.get(); - assertEquals("", tuple.v1().transformedRequest().getLocalClusterAlias()); + Tuple> tuple = setOnce.get(); + assertEquals("", tuple.v1().getLocalClusterAlias()); assertThat(tuple.v2(), instanceOf(TransportSearchAction.CCSActionListener.class)); tuple.v2().onResponse(emptySearchResponse()); } diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java index 05bfa80a88b72..416983a19e9f1 100644 --- a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java +++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java @@ -198,13 +198,13 @@ public void testResolveIndexDefaultPipeline() throws Exception { SearchRequest searchRequest = new SearchRequest("my_index").source(SearchSourceBuilder.searchSource().size(5)); PipelinedRequest pipelinedRequest = service.resolvePipeline(searchRequest); assertEquals("p1", pipelinedRequest.getPipeline().getId()); - assertEquals(10, pipelinedRequest.transformedRequest().source().size()); + assertEquals(10, pipelinedRequest.source().size()); // Bypass the default pipeline searchRequest.pipeline("_none"); pipelinedRequest = service.resolvePipeline(searchRequest); assertEquals("_none", pipelinedRequest.getPipeline().getId()); - assertEquals(5, pipelinedRequest.transformedRequest().source().size()); + assertEquals(5, pipelinedRequest.source().size()); } private static abstract class FakeProcessor implements Processor { @@ -585,17 +585,14 @@ public void testTransformRequest() throws Exception { SearchRequest request = new SearchRequest("_index").source(sourceBuilder).pipeline("p1"); PipelinedRequest pipelinedRequest = searchPipelineService.resolvePipeline(request); - SearchRequest transformedRequest = pipelinedRequest.transformedRequest(); - assertEquals(2 * size, transformedRequest.source().size()); + assertEquals(2 * size, pipelinedRequest.source().size()); assertEquals(size, request.source().size()); // This request doesn't specify a pipeline, it doesn't get transformed. request = new SearchRequest("_index").source(sourceBuilder); pipelinedRequest = searchPipelineService.resolvePipeline(request); - SearchRequest notTransformedRequest = pipelinedRequest.transformedRequest(); - assertEquals(size, notTransformedRequest.source().size()); - assertSame(request, notTransformedRequest); + assertEquals(size, pipelinedRequest.source().size()); } public void testTransformResponse() throws Exception { @@ -870,8 +867,7 @@ public void testInlinePipeline() throws Exception { assertEquals(1, pipeline.getSearchResponseProcessors().size()); // Verify that pipeline transforms request - SearchRequest transformedRequest = pipelinedRequest.transformedRequest(); - assertEquals(200, transformedRequest.source().size()); + assertEquals(200, pipelinedRequest.source().size()); int size = 10; SearchHit[] hits = new SearchHit[size];