diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 8e31aa23d88cf..082f9ae688b92 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -235,14 +235,7 @@ import org.opensearch.action.ingest.SimulatePipelineTransportAction; import org.opensearch.action.main.MainAction; import org.opensearch.action.main.TransportMainAction; -import org.opensearch.action.search.ClearScrollAction; -import org.opensearch.action.search.MultiSearchAction; -import org.opensearch.action.search.SearchAction; -import org.opensearch.action.search.SearchScrollAction; -import org.opensearch.action.search.TransportClearScrollAction; -import org.opensearch.action.search.TransportMultiSearchAction; -import org.opensearch.action.search.TransportSearchAction; -import org.opensearch.action.search.TransportSearchScrollAction; +import org.opensearch.action.search.*; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.AutoCreateIndex; import org.opensearch.action.support.DestructiveOperations; @@ -398,12 +391,7 @@ import org.opensearch.rest.action.ingest.RestGetPipelineAction; import org.opensearch.rest.action.ingest.RestPutPipelineAction; import org.opensearch.rest.action.ingest.RestSimulatePipelineAction; -import org.opensearch.rest.action.search.RestClearScrollAction; -import org.opensearch.rest.action.search.RestCountAction; -import org.opensearch.rest.action.search.RestExplainAction; -import org.opensearch.rest.action.search.RestMultiSearchAction; -import org.opensearch.rest.action.search.RestSearchAction; -import org.opensearch.rest.action.search.RestSearchScrollAction; +import org.opensearch.rest.action.search.*; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import org.opensearch.usage.UsageService; @@ -660,6 +648,7 @@ public void reg actions.register(ImportDanglingIndexAction.INSTANCE, TransportImportDanglingIndexAction.class); actions.register(DeleteDanglingIndexAction.INSTANCE, TransportDeleteDanglingIndexAction.class); actions.register(FindDanglingIndexAction.INSTANCE, TransportFindDanglingIndexAction.class); + actions.register(CreatePITAction.INSTANCE, TransportCreatePITAction.class); return unmodifiableMap(actions.getRegistry()); } @@ -832,6 +821,10 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestRepositoriesAction()); registerHandler.accept(new RestSnapshotAction()); registerHandler.accept(new RestTemplatesAction()); + + // Point in time API + registerHandler.accept(new RestCreatePITAction()); + for (ActionPlugin plugin : actionPlugins) { for (RestHandler handler : plugin.getRestHandlers( settings, diff --git a/server/src/main/java/org/opensearch/action/search/CreatePITAction.java b/server/src/main/java/org/opensearch/action/search/CreatePITAction.java new file mode 100644 index 0000000000000..69b7b4bb2d61f --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/CreatePITAction.java @@ -0,0 +1,20 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.ActionType; + +public class CreatePITAction extends ActionType { + public static final CreatePITAction INSTANCE = new CreatePITAction(); + public static final String NAME = "indices:data/read/pit"; + + private CreatePITAction() { + super(NAME, SearchResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/CreatePITRequest.java b/server/src/main/java/org/opensearch/action/search/CreatePITRequest.java new file mode 100644 index 0000000000000..eb077969fc0d7 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/CreatePITRequest.java @@ -0,0 +1,139 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.IndicesRequest; +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.common.Nullable; +import org.opensearch.common.Strings; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskId; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +public class CreatePITRequest extends ActionRequest implements IndicesRequest.Replaceable { + + private TimeValue keepAlive; + private final boolean allowPartialPitCreation; + @Nullable + private String routing = null; + @Nullable + private String preference = null; + private String[] indices = Strings.EMPTY_ARRAY; + private IndicesOptions indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS; + + + public CreatePITRequest(TimeValue keepAlive, boolean allowPartialPitCreation) { + this.keepAlive = keepAlive; + this.allowPartialPitCreation = allowPartialPitCreation; + } + + public CreatePITRequest(StreamInput in) throws IOException { + super(in); + indices = in.readStringArray(); + indicesOptions = IndicesOptions.readIndicesOptions(in); + routing = in.readOptionalString(); + preference = in.readOptionalString(); + keepAlive = in.readTimeValue(); + routing = in.readOptionalString(); + allowPartialPitCreation = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(indices); + indicesOptions.writeIndicesOptions(out); + out.writeOptionalString(preference); + out.writeTimeValue(keepAlive); + out.writeOptionalString(routing); + out.writeBoolean(allowPartialPitCreation); + } + + public String getRouting() { + return routing; + } + + public String getPreference() { + return preference; + } + + public String[] getIndices() { + return indices; + } + + public IndicesOptions getIndicesOptions() { + return indicesOptions; + } + + public TimeValue getKeepAlive() { + return keepAlive; + } + + public boolean isAllowPartialPitCreation() { + return allowPartialPitCreation; + } + + public void setRouting(String routing) { + this.routing = routing; + } + + public void setPreference(String preference) { + this.preference = preference; + } + + public void setIndices(String[] indices) { + this.indices = indices; + } + + public void setIndicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = Objects.requireNonNull(indicesOptions, "indicesOptions must not be null"); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public String[] indices() { + return indices; + } + + @Override + public IndicesOptions indicesOptions() { + return indicesOptions; + } + + public void setKeepAlive(TimeValue keepAlive) { + this.keepAlive = keepAlive; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new SearchTask(id, type, action, () -> "desc", parentTaskId, headers); + } + + /** + * Sets the indices the search will be executed on. + */ + @Override + public CreatePITRequest indices(String... indices) { + SearchRequest.validateIndices(indices); + this.indices = indices; + return this; + } +} diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequest.java b/server/src/main/java/org/opensearch/action/search/SearchRequest.java index c4b97c35bc405..b816c926daacc 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequest.java @@ -379,7 +379,7 @@ public SearchRequest indices(String... indices) { return this; } - private static void validateIndices(String... indices) { + protected static void validateIndices(String... indices) { Objects.requireNonNull(indices, "indices must not be null"); for (String index : indices) { Objects.requireNonNull(index, "index must not be null"); diff --git a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java index 121de6d1c80da..ddadf25f75d14 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java @@ -93,6 +93,8 @@ public class SearchTransportService { public static final String FETCH_ID_SCROLL_ACTION_NAME = "indices:data/read/search[phase/fetch/id/scroll]"; public static final String FETCH_ID_ACTION_NAME = "indices:data/read/search[phase/fetch/id]"; public static final String QUERY_CAN_MATCH_NAME = "indices:data/read/search[can_match]"; + public static final String CREATE_READER_CONTEXT_ACTION_NAME = "indices:data/read/search[create_context]"; + public static final String UPDATE_READER_CONTEXT_ACTION_NAME = "indices:data/read/search[update_context]"; private final TransportService transportService; private final BiFunction responseWrapper; @@ -140,6 +142,20 @@ public void sendFreeContext( ); } + public void updatePitContext( + Transport.Connection connection, + TransportCreatePITAction.UpdatePITReaderRequest request, + ActionListener actionListener) { + transportService.sendRequest( + connection, + UPDATE_READER_CONTEXT_ACTION_NAME, + request, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler(actionListener, + TransportCreatePITAction.UpdatePitContextResponse::new) + ); + } + public void sendCanMatch( Transport.Connection connection, final ShardSearchRequest request, @@ -307,7 +323,7 @@ public Map getPendingSearchRequests() { return new HashMap<>(clientConnections); } - static class ScrollFreeContextRequest extends TransportRequest { + static class ScrollFreeContextRequest extends TransportRequest { private ShardSearchContextId contextId; ScrollFreeContextRequest(ShardSearchContextId contextId) { @@ -545,6 +561,50 @@ public static void registerRequestHandler(TransportService transportService, Sea } ); TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME, SearchService.CanMatchResponse::new); + transportService.registerRequestHandler( + CREATE_READER_CONTEXT_ACTION_NAME, + ThreadPool.Names.SAME, + TransportCreatePITAction.CreateReaderContextRequest::new, + (request, channel, task) -> { + ChannelActionListener< + TransportCreatePITAction.CreateReaderContextResponse, + TransportCreatePITAction.CreateReaderContextRequest> + listener = + new ChannelActionListener<>(channel, CREATE_READER_CONTEXT_ACTION_NAME, request); + searchService.openReaderContext( + request.getShardId(), + request.getKeepAlive(), + ActionListener.wrap( + r -> + listener.onResponse( + new TransportCreatePITAction.CreateReaderContextResponse(r)), + listener::onFailure)); + }); + TransportActionProxy.registerProxyAction( + transportService, + CREATE_READER_CONTEXT_ACTION_NAME, + TransportCreatePITAction.CreateReaderContextResponse::new); + + transportService.registerRequestHandler( + UPDATE_READER_CONTEXT_ACTION_NAME, + ThreadPool.Names.SAME, + TransportCreatePITAction.UpdatePITReaderRequest::new, + (request, channel, task) -> { + ChannelActionListener listener = + new ChannelActionListener<>(channel, UPDATE_READER_CONTEXT_ACTION_NAME, request); + searchService.updatePitIdAndKeepAlive(request, + ActionListener.wrap( + r -> listener.onResponse(r), + listener::onFailure + )); + } + ); + TransportActionProxy.registerProxyAction( + transportService, + UPDATE_READER_CONTEXT_ACTION_NAME, + TransportCreatePITAction.UpdatePitContextResponse::new); + } /** diff --git a/server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java b/server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java new file mode 100644 index 0000000000000..01b4104cb5ab9 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java @@ -0,0 +1,305 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.StepListener; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.GroupedActionListener; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Strings; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.NamedWriteableRegistry; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.shard.ShardId; +import org.opensearch.search.SearchPhaseResult; +import org.opensearch.search.SearchService; +import org.opensearch.search.SearchShardTarget; +import org.opensearch.search.internal.ShardSearchContextId; +import org.opensearch.tasks.Task; +import org.opensearch.transport.*; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +/** + * Transport action for creating PIT reader context + */ +public class TransportCreatePITAction extends HandledTransportAction { + + public static final String CREATE_PIT = "create_pit"; + private final TimeValue CREATE_PIT_TEMPORARY_KEEP_ALIVE = new TimeValue(30, TimeUnit.SECONDS); + private SearchService searchService; + private final TransportService transportService; + private final SearchTransportService searchTransportService; + private final ClusterService clusterService; + private TransportSearchAction transportSearchAction; + private final NamedWriteableRegistry namedWriteableRegistry; + + @Inject + public TransportCreatePITAction( + SearchService searchService, + TransportService transportService, + ActionFilters actionFilters, + SearchTransportService searchTransportService, + ClusterService clusterService, + TransportSearchAction transportSearchAction, + NamedWriteableRegistry namedWriteableRegistry) { + super(CreatePITAction.NAME, transportService, actionFilters, in -> new CreatePITRequest(in)); + this.searchService = searchService; + this.transportService = transportService; + this.searchTransportService = searchTransportService; + this.clusterService = clusterService; + this.transportSearchAction = transportSearchAction; + this.namedWriteableRegistry = namedWriteableRegistry; + } + + @Override + protected void doExecute(Task task, CreatePITRequest request, ActionListener listener) { + SearchRequest searchRequest = new SearchRequest(request.getIndices()); + searchRequest.preference(request.getPreference()); + searchRequest.routing(request.getRouting()); + searchRequest.indicesOptions(request.getIndicesOptions()); + searchRequest.allowPartialSearchResults(request.isAllowPartialPitCreation()); + + final StepListener createPitListener = new StepListener(); + + final ActionListener updatePitIdListener = + new ActionListener() { + @Override + public void onResponse(SearchResponse searchResponse) { + listener.onResponse(searchResponse); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }; + + /** + * Phase 1 of create PIT request : Create PIT reader contexts in the associated shards with a + * temporary keep alive + */ + transportSearchAction.executeRequest( + task, + searchRequest, + CREATE_PIT, + true, + new TransportSearchAction.SinglePhaseSearchAction() { + @Override + public void executeOnShardTarget( + SearchTask searchTask, + SearchShardTarget target, + Transport.Connection connection, + ActionListener searchPhaseResultActionListener) { + transportService.sendChildRequest( + connection, + SearchTransportService.CREATE_READER_CONTEXT_ACTION_NAME, + new CreateReaderContextRequest(target.getShardId(), CREATE_PIT_TEMPORARY_KEEP_ALIVE), + searchTask, + new TransportResponseHandler() { + @Override + public CreateReaderContextResponse read(StreamInput in) throws IOException { + return new CreateReaderContextResponse(in); + } + + @Override + public void handleResponse(CreateReaderContextResponse response) { + searchPhaseResultActionListener.onResponse(response); + } + + @Override + public void handleException(TransportException exp) { + searchPhaseResultActionListener.onFailure(exp); + } + + @Override + public String executor() { + return "generic"; // TODO + } + }); + } + }, + createPitListener); + + /** + * Phase 2 of create PIT : Update PIT reader context with PIT ID and keep alive from request + * Fail create pit operation if any of the updates in this phase are failed + */ + createPitListener.whenComplete( + searchResponse -> { + SearchContextId contextId = + SearchContextId.decode(namedWriteableRegistry, searchResponse.pointInTimeId()); + final StepListener> lookupListener = + getConnectionLookupListener(contextId); + lookupListener.whenComplete( + nodelookup -> { + final ActionListener + groupedActionListener = + getGroupedListener( + updatePitIdListener, searchResponse, contextId.shards().size()); + for (Map.Entry entry : + contextId.shards().entrySet()) { + DiscoveryNode node = + nodelookup.apply( + entry.getValue().getClusterAlias(), entry.getValue().getNode()); + try { + final Transport.Connection connection = + searchTransportService.getConnection( + entry.getValue().getClusterAlias(), node); + searchTransportService.updatePitContext( + connection, + new UpdatePITReaderRequest( + entry.getValue().getSearchContextId(), + searchResponse.pointInTimeId(), + request.getKeepAlive().millis()), + groupedActionListener); + } catch (Exception e) { + groupedActionListener.onFailure(e); + } + } + }, + updatePitIdListener::onFailure); + }, + updatePitIdListener::onFailure); + } + + private StepListener> getConnectionLookupListener( + SearchContextId contextId) { + ClusterState state = clusterService.state(); + + final Set clusters = + contextId.shards().values().stream() + .filter(ctx -> Strings.isEmpty(ctx.getClusterAlias()) == false) + .map(SearchContextIdForNode::getClusterAlias) + .collect(Collectors.toSet()); + + final StepListener> lookupListener = + new StepListener<>(); + + if (clusters.isEmpty() == false) { + searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener); + } else { + lookupListener.onResponse((cluster, nodeId) -> state.getNodes().get(nodeId)); + } + return lookupListener; + } + + private ActionListener getGroupedListener( + ActionListener updatePitIdListener, SearchResponse searchResponse, int size) { + return new GroupedActionListener( + new ActionListener>() { + @Override + public void onResponse(final Collection responses) { + updatePitIdListener.onResponse(searchResponse); + } + @Override + public void onFailure(final Exception e) { + updatePitIdListener.onFailure(e); + } + }, + size); + } + + public static class CreateReaderContextRequest extends TransportRequest { + private final ShardId shardId; + private final TimeValue keepAlive; + public CreateReaderContextRequest(ShardId shardId, TimeValue keepAlive) { + this.shardId = shardId; + this.keepAlive = keepAlive; + } + public ShardId getShardId() { + return shardId; + } + public TimeValue getKeepAlive() { + return keepAlive; + } + public CreateReaderContextRequest(StreamInput in) throws IOException { + super(in); + this.shardId = new ShardId(in); + this.keepAlive = in.readTimeValue(); + } + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + shardId.writeTo(out); + out.writeTimeValue(keepAlive); + } + } + + public static class CreateReaderContextResponse extends SearchPhaseResult { + public CreateReaderContextResponse(ShardSearchContextId shardSearchContextId) { + this.contextId = shardSearchContextId; + } + public CreateReaderContextResponse(StreamInput in) throws IOException { + super(in); + contextId = new ShardSearchContextId(in); + } + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + contextId.writeTo(out); + } + } + + public static class UpdatePITReaderRequest extends TransportRequest { + private String PitId; + private long keepAlive; + private ShardSearchContextId searchContextId; + UpdatePITReaderRequest(ShardSearchContextId searchContextId, String PitId, long keepAlive) { + this.PitId = PitId; + this.searchContextId = searchContextId; + this.keepAlive = keepAlive; + } + UpdatePITReaderRequest(StreamInput in) throws IOException { + super(in); + } + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } + public ShardSearchContextId getSearchContextId() { + return searchContextId; + } + public String getPitId() { + return PitId; + } + public String id() { + return this.getPitId(); + } + } + + public static class UpdatePitContextResponse extends TransportResponse { + private String pitId; + UpdatePitContextResponse(StreamInput in) throws IOException { + pitId = in.readString(); + } + public UpdatePitContextResponse(String pitId) { + this.pitId = pitId; + } + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(pitId); + } + public String getPitId() { + return pitId; + } + } +} diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index c758b7d2918e7..0dab08db426fe 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -463,6 +463,7 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.KEEPALIVE_INTERVAL_SETTING, SearchService.MAX_KEEPALIVE_SETTING, SearchService.ALLOW_EXPENSIVE_QUERIES, + SearchService.MAX_OPEN_PIT_CONTEXT, MultiBucketConsumerService.MAX_BUCKET_SETTING, SearchService.LOW_LEVEL_CANCELLATION_SETTING, SearchService.MAX_OPEN_SCROLL_CONTEXT, diff --git a/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java b/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java index da4b3a141dfc5..30ed4041f6b39 100644 --- a/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java +++ b/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java @@ -129,6 +129,19 @@ default void onFreeScrollContext(ReaderContext readerContext) {} */ default void validateReaderContext(ReaderContext readerContext, TransportRequest transportRequest) {} + /** + * Executed when a new Point-In-Time {@link ReaderContext} was created + * @param readerContext the created reader context + */ + default void onNewPitContext(ReaderContext readerContext) {} + + /** + * Executed when a Point-In-Time search {@link SearchContext} is freed. + * This happens on deleteion of a Point-In-Time or on it's keep-alive expiring. + * @param readerContext the freed search context + */ + default void onFreePitContext(ReaderContext readerContext) {} + /** * A Composite listener that multiplexes calls to each of the listeners methods. */ @@ -263,5 +276,27 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest } ExceptionsHelper.reThrowIfNotNull(exception); } + + @Override + public void onNewPitContext(ReaderContext readerContext) { + for (SearchOperationListener listener : listeners) { + try { + listener.onNewPitContext(readerContext); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onNewPitContext listener [{}] failed", listener), e); + } + } + } + + @Override + public void onFreePitContext(ReaderContext readerContext) { + for (SearchOperationListener listener : listeners) { + try { + listener.onNewPitContext(readerContext); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onFreePitContext listener [{}] failed", listener), e); + } + } + } } } diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java new file mode 100644 index 0000000000000..198ad06d7fbff --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java @@ -0,0 +1,56 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.search; + +import org.opensearch.action.search.CreatePITAction; +import org.opensearch.action.search.CreatePITRequest; +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.Strings; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestCancellableNodeClient; +import org.opensearch.rest.action.RestStatusToXContentListener; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static java.util.Collections.unmodifiableList; +import static org.opensearch.rest.RestRequest.Method.POST; + +public class RestCreatePITAction extends BaseRestHandler { + @Override + public String getName() { + return "create_pit_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + boolean allowPartialPitCreation = request.paramAsBoolean("allow_partial_pit_creation", false); + + CreatePITRequest createPitRequest = + new CreatePITRequest(request.paramAsTime("keep_alive", null), allowPartialPitCreation); + createPitRequest.setIndicesOptions(IndicesOptions.fromRequest(request, createPitRequest.indicesOptions())); + createPitRequest.setPreference(request.param("preference")); + createPitRequest.setRouting(request.param("routing")); + createPitRequest.setIndices(Strings.splitStringByCommaToArray(request.param("index"))); + return channel -> { + RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel()); + cancelClient.execute(CreatePITAction.INSTANCE, createPitRequest, new RestStatusToXContentListener<>(channel)); + }; + } + + @Override + public List routes() { + return unmodifiableList(Collections.singletonList( + new Route(POST, "/{index}/_pit"))); + } + +} diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 5a5f223d33e60..729cd2801d899 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -41,9 +41,7 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; import org.opensearch.action.OriginalIndices; -import org.opensearch.action.search.SearchRequest; -import org.opensearch.action.search.SearchShardTask; -import org.opensearch.action.search.SearchType; +import org.opensearch.action.search.*; import org.opensearch.action.support.TransportActions; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.service.ClusterService; @@ -71,6 +69,7 @@ import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.Engine; +import org.opensearch.index.engine.Segment; import org.opensearch.index.query.InnerHitContextBuilder; import org.opensearch.index.query.MatchAllQueryBuilder; import org.opensearch.index.query.MatchNoneQueryBuilder; @@ -108,13 +107,7 @@ import org.opensearch.search.fetch.subphase.FetchFieldsContext; import org.opensearch.search.fetch.subphase.ScriptFieldsContext.ScriptField; import org.opensearch.search.fetch.subphase.highlight.HighlightBuilder; -import org.opensearch.search.internal.AliasFilter; -import org.opensearch.search.internal.InternalScrollSearchRequest; -import org.opensearch.search.internal.LegacyReaderContext; -import org.opensearch.search.internal.ReaderContext; -import org.opensearch.search.internal.SearchContext; -import org.opensearch.search.internal.ShardSearchContextId; -import org.opensearch.search.internal.ShardSearchRequest; +import org.opensearch.search.internal.*; import org.opensearch.search.lookup.SearchLookup; import org.opensearch.search.profile.Profilers; import org.opensearch.search.query.QueryPhase; @@ -213,6 +206,10 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv Property.NodeScope ); + public static final Setting MAX_OPEN_PIT_CONTEXT = + Setting.intSetting( + "search.max_open_pit_context", 500, 0, Property.Dynamic, Property.NodeScope); + public static final int DEFAULT_SIZE = 10; public static final int DEFAULT_FROM = 0; @@ -246,6 +243,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private volatile int maxOpenScrollContext; + private volatile int maxOpenPitContext; + private final Cancellable keepAliveReaper; private final AtomicLong idGenerator = new AtomicLong(); @@ -254,6 +253,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private final MultiBucketConsumerService multiBucketConsumerService; + private final AtomicInteger openPitContexts = new AtomicInteger(); private final AtomicInteger openScrollContexts = new AtomicInteger(); private final String sessionId = UUIDs.randomBase64UUID(); private final Executor indexSearcherExecutor; @@ -304,6 +304,9 @@ public SearchService( maxOpenScrollContext = MAX_OPEN_SCROLL_CONTEXT.get(settings); clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_OPEN_SCROLL_CONTEXT, this::setMaxOpenScrollContext); + maxOpenPitContext = MAX_OPEN_PIT_CONTEXT.get(settings); + clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_OPEN_PIT_CONTEXT, this::setMaxOpenPitContext); + lowLevelCancellation = LOW_LEVEL_CANCELLATION_SETTING.get(settings); clusterService.getClusterSettings().addSettingsUpdateConsumer(LOW_LEVEL_CANCELLATION_SETTING, this::setLowLevelCancellation); } @@ -348,6 +351,10 @@ private void setMaxOpenScrollContext(int maxOpenScrollContext) { this.maxOpenScrollContext = maxOpenScrollContext; } + private void setMaxOpenPitContext(int maxOpenPitContext) { + this.maxOpenPitContext = maxOpenPitContext; + } + private void setLowLevelCancellation(Boolean lowLevelCancellation) { this.lowLevelCancellation = lowLevelCancellation; } @@ -794,15 +801,36 @@ public void openReaderContext(ShardId shardId, TimeValue keepAlive, ActionListen final IndexShard shard = indexService.getShard(shardId.id()); final SearchOperationListener searchOperationListener = shard.getSearchOperationListener(); shard.awaitShardSearchActive(ignored -> { + Releasable decreasePitContexts = null; Engine.SearcherSupplier searcherSupplier = null; ReaderContext readerContext = null; try { + decreasePitContexts = openPitContexts::decrementAndGet; + if (openPitContexts.incrementAndGet() > maxOpenPitContext) { + throw new OpenSearchRejectedExecutionException( + "Trying to create too many Point In Time contexts. Must be less than or equal to: [" + + maxOpenPitContext + "]. " + "This limit can be set by changing the [" + + MAX_OPEN_PIT_CONTEXT.getKey() + "] setting."); + } searcherSupplier = shard.acquireSearcherSupplier(); + List nonVerboseSegments = shard.segments(false); + shard.routingEntry(); final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet()); - readerContext = new ReaderContext(id, indexService, shard, searcherSupplier, keepAlive.millis(), false); + readerContext = new PitReaderContext(id, indexService, shard, searcherSupplier, keepAlive.millis(), false, + shard.routingEntry(),nonVerboseSegments); final ReaderContext finalReaderContext = readerContext; searcherSupplier = null; // transfer ownership to reader context + searchOperationListener.onNewReaderContext(readerContext); + + searchOperationListener.onNewPitContext(finalReaderContext); + readerContext.addOnClose(decreasePitContexts); + decreasePitContexts = null; + + readerContext.addOnClose(() -> { + searchOperationListener.onFreeReaderContext(finalReaderContext); + searchOperationListener.onFreePitContext(finalReaderContext); + }); readerContext.addOnClose(() -> searchOperationListener.onFreeReaderContext(finalReaderContext)); putReaderContext(readerContext); readerContext = null; @@ -810,6 +838,8 @@ public void openReaderContext(ShardId shardId, TimeValue keepAlive, ActionListen } catch (Exception exc) { Releasables.closeWhileHandlingException(searcherSupplier, readerContext); listener.onFailure(exc); + } finally{ + Releasables.close(decreasePitContexts); } }); } @@ -927,6 +957,23 @@ public boolean freeReaderContext(ShardSearchContextId contextId) { return false; } + public void updatePitIdAndKeepAlive(TransportCreatePITAction.UpdatePITReaderRequest request, + ActionListener listener) { + if (getReaderContext(request.getSearchContextId()) != null) { + try { + PitReaderContext readerContext = getPitReaderContext(request.getSearchContextId()); + if(readerContext == null) { + throw new SearchContextMissingException(request.getSearchContextId()); + } + readerContext.updatePitId(request.getPitId()); + readerContext.tryUpdateKeepAlive(maxKeepAlive); + listener.onResponse(new TransportCreatePITAction.UpdatePitContextResponse(request.getPitId())); + } catch(Exception e) { + listener.onFailure(e); + } + } + } + public void freeAllScrollContexts() { for (ReaderContext readerContext : activeReaders.values()) { if (readerContext.scrollContext() != null) { @@ -1256,6 +1303,16 @@ public ResponseCollectorService getResponseCollectorService() { return this.responseCollectorService; } + public PitReaderContext getPitReaderContext(ShardSearchContextId id) { + for(Map.Entry context : activeReaders.entrySet()) { + if(context.getValue() instanceof PitReaderContext) { + if(context.getKey() == id.getId()) return (PitReaderContext) context.getValue(); + } + } + + return null; + } + class Reaper implements Runnable { @Override public void run() { diff --git a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java new file mode 100644 index 0000000000000..e7a29f8c0b68b --- /dev/null +++ b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java @@ -0,0 +1,46 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.internal; + +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.index.IndexService; +import org.opensearch.index.engine.Engine; +import org.opensearch.index.engine.Segment; +import org.opensearch.index.shard.IndexShard; + +import java.util.List; + +public class PitReaderContext extends ReaderContext { + + public ShardRouting getShardRouting() { + return shardRouting; + } + + private final ShardRouting shardRouting; + private final List segments; + private String pitId; + + public PitReaderContext(ShardSearchContextId id, IndexService indexService, + IndexShard indexShard, Engine.SearcherSupplier searcherSupplier, + long keepAliveInMillis, boolean singleSession, + ShardRouting shardRouting, List nonVerboseSegments) { + super(id, indexService, indexShard, searcherSupplier, keepAliveInMillis, singleSession); + this.shardRouting = shardRouting; + segments = nonVerboseSegments; + } + + public List getSegments() { + return segments; + } + + public void updatePitId(final String pitId) { + this.pitId = pitId; + } + +} diff --git a/server/src/main/java/org/opensearch/search/internal/ReaderContext.java b/server/src/main/java/org/opensearch/search/internal/ReaderContext.java index 31d7a44d9bbd2..ca4a047d4adee 100644 --- a/server/src/main/java/org/opensearch/search/internal/ReaderContext.java +++ b/server/src/main/java/org/opensearch/search/internal/ReaderContext.java @@ -138,7 +138,7 @@ public Engine.Searcher acquireSearcher(String source) { return searcherSupplier.acquireSearcher(source); } - private void tryUpdateKeepAlive(long keepAlive) { + public void tryUpdateKeepAlive(long keepAlive) { this.keepAlive.updateAndGet(curr -> Math.max(curr, keepAlive)); } diff --git a/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java b/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java new file mode 100644 index 0000000000000..3f458865e5351 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java @@ -0,0 +1,180 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search; + +import org.opensearch.action.ActionFuture; +import org.opensearch.action.search.CreatePITAction; +import org.opensearch.action.search.CreatePITRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.IndexNotFoundException; +import org.opensearch.search.builder.PointInTimeBuilder; +import org.opensearch.test.OpenSearchSingleNodeTestCase; + + +import java.util.concurrent.ExecutionException; +import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; + +public class PitSingleNodeTests extends OpenSearchSingleNodeTestCase { + @Override + protected boolean resetNodeAfterTest() { + return true; + } + + public void testCreatePIT() throws ExecutionException, InterruptedException { + createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[]{"index"}); + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + SearchResponse pitResponse = execute.get(); + client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + SearchResponse searchResponse = client().prepareSearch("index") + .setSize(2).setPointInTime(new PointInTimeBuilder(pitResponse.pointInTimeId()).setKeepAlive(TimeValue.timeValueDays(1))).get(); + assertHitCount(searchResponse, 1); + + SearchService service = getInstanceFromNode(SearchService.class); + assertEquals(2, service.getActiveContexts()); + service.doClose(); // this kills the keep-alive reaper we have to reset the node after this test + } + + public void testCreatePITWithShardReplicas() throws ExecutionException, InterruptedException { + createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build()); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[]{"index"}); + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + SearchResponse pitResponse = execute.get(); + + client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + SearchResponse searchResponse = client().prepareSearch("index") + .setSize(2).setPointInTime(new PointInTimeBuilder(pitResponse.pointInTimeId()).setKeepAlive(TimeValue.timeValueDays(1))).get(); + assertHitCount(searchResponse, 1); + + SearchService service = getInstanceFromNode(SearchService.class); + assertEquals(2, service.getActiveContexts()); + service.doClose(); + } + + public void testCreatePITOnValidIndexAndThenDeleteIndex() throws ExecutionException, InterruptedException { + createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[]{"index"}); + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + SearchResponse pitResponse = execute.get(); + client().admin().indices().prepareDelete("index").get(); + + IndexNotFoundException ex = expectThrows( + IndexNotFoundException.class, + () -> { + SearchResponse searchResponse = client().prepareSearch("index") + .setSize(2).setPointInTime(new PointInTimeBuilder(pitResponse.pointInTimeId()).setKeepAlive(TimeValue.timeValueDays(1))).get(); + } + ); + assertTrue(ex.getMessage().contains("no such index [index]")); + SearchService service = getInstanceFromNode(SearchService.class); + assertEquals(0, service.getActiveContexts()); + service.doClose(); + } + + public void testCreatePITOnValidIndexAndThenCloseIndex() throws ExecutionException, InterruptedException { + createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + client().admin().indices().prepareClose("index").get(); + + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[]{"index"}); + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + + ExecutionException ex = expectThrows( + ExecutionException.class, + () -> { + SearchResponse response = execute.get(); + }); + + assertTrue(ex.getMessage().contains("IndexClosedException")); + + SearchService service = getInstanceFromNode(SearchService.class); + assertEquals(0, service.getActiveContexts()); + service.doClose(); + } + + public void testCreatePITWithMultipleIndices() throws ExecutionException, InterruptedException { + createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + createIndex("index1", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); + client().prepareIndex("index1").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[]{"index", "index1"}); + SearchService service = getInstanceFromNode(SearchService.class); + + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + SearchResponse response = execute.get(); + assertEquals(4,response.getSuccessfulShards()); + assertEquals(4, service.getActiveContexts()); + service.doClose(); + } + + public void testCreatePITWithNonExistentIndex() { + createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[]{"index", "index1"}); + SearchService service = getInstanceFromNode(SearchService.class); + + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + + ExecutionException ex = expectThrows( + ExecutionException.class, + () -> { + SearchResponse response = execute.get(); + }); + + assertTrue(ex.getMessage().contains("no such index [index1]")); + assertEquals(0, service.getActiveContexts()); + service.doClose(); + } + + public void testMaxOpenPitContexts() throws Exception { + createIndex("index"); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + + + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[]{"index"}); + SearchService service = getInstanceFromNode(SearchService.class); + + SearchResponse pitResponse = null; + for (int i = 0; i < SearchService.MAX_OPEN_PIT_CONTEXT.get(Settings.EMPTY); i++) { + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + pitResponse = execute.get(); + logger.info(SearchService.MAX_OPEN_PIT_CONTEXT.get(Settings.EMPTY) + " -- " + i); + } + ExecutionException ex = expectThrows( + ExecutionException.class, + () -> { + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + SearchResponse response = execute.get(); + client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + }); + + assertTrue(ex.getMessage().contains("Trying to create too many Point In Time contexts. " + + "Must be less than or equal to: [" + SearchService.MAX_OPEN_PIT_CONTEXT.get(Settings.EMPTY) + "]. " + + "This limit can be set by changing the [search.max_open_pit_context] setting.")); + + service.doClose(); + } +}