From 469738f10265906c51eae514024f21d7952b0b7c Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Thu, 24 Mar 2022 08:23:55 +0530 Subject: [PATCH 1/2] create pit changes Signed-off-by: Bharathwaj G --- .../action/search/CreatePITAction.java | 20 +++ .../opensearch/action/search/PITRequest.java | 136 ++++++++++++++++ .../opensearch/action/search/PITResponse.java | 57 +++++++ .../action/search/SearchRequest.java | 2 +- .../action/search/SearchTransportService.java | 7 + .../search/TransportCreatePITAction.java | 147 ++++++++++++++++++ .../common/settings/ClusterSettings.java | 1 + .../index/shard/SearchOperationListener.java | 35 +++++ .../action/search/RestCreatePITAction.java | 53 +++++++ .../org/opensearch/search/SearchService.java | 52 ++++++- .../search/internal/PitReaderContext.java | 42 +++++ 11 files changed, 544 insertions(+), 8 deletions(-) create mode 100644 server/src/main/java/org/opensearch/action/search/CreatePITAction.java create mode 100644 server/src/main/java/org/opensearch/action/search/PITRequest.java create mode 100644 server/src/main/java/org/opensearch/action/search/PITResponse.java create mode 100644 server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java create mode 100644 server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java create mode 100644 server/src/main/java/org/opensearch/search/internal/PitReaderContext.java diff --git a/server/src/main/java/org/opensearch/action/search/CreatePITAction.java b/server/src/main/java/org/opensearch/action/search/CreatePITAction.java new file mode 100644 index 0000000000000..22dfbafdf0b08 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/CreatePITAction.java @@ -0,0 +1,20 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.ActionType; + +public class CreatePITAction extends ActionType { + public static final CreatePITAction INSTANCE = new CreatePITAction(); + public static final String NAME = "indices:data/read/pit"; + + private CreatePITAction() { + super(NAME, PITResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/PITRequest.java b/server/src/main/java/org/opensearch/action/search/PITRequest.java new file mode 100644 index 0000000000000..c188dd3b41400 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/PITRequest.java @@ -0,0 +1,136 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.IndicesRequest; +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.common.Nullable; +import org.opensearch.common.Strings; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskId; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +public class PITRequest extends ActionRequest implements IndicesRequest.Replaceable { + public PITRequest(TimeValue keepAlive) { + this.keepAlive = keepAlive; + } + + public String getRouting() { + return routing; + } + + public String getPreference() { + return preference; + } + + public String[] getIndices() { + return indices; + } + + public IndicesOptions getIndicesOptions() { + return indicesOptions; + } + + public TimeValue getKeepAlive() { + return keepAlive; + } + + private TimeValue keepAlive; + + public PITRequest(StreamInput in) throws IOException { + super(in); + indices = in.readStringArray(); + indicesOptions = IndicesOptions.readIndicesOptions(in); + routing = in.readOptionalString(); + preference = in.readOptionalString(); + keepAlive = in.readTimeValue(); + routing = in.readOptionalString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(indices); + indicesOptions.writeIndicesOptions(out); + out.writeOptionalString(preference); + out.writeTimeValue(keepAlive); + out.writeOptionalString(routing); + } + + public void setRouting(String routing) { + this.routing = routing; + } + + @Nullable + private String routing = null; + + public void setPreference(String preference) { + this.preference = preference; + } + + @Nullable + private String preference = null; + + public void setIndices(String[] indices) { + this.indices = indices; + } + + private String[] indices = Strings.EMPTY_ARRAY; + + public void setIndicesOptions(IndicesOptions indicesOptions) { + this.indicesOptions = Objects.requireNonNull(indicesOptions, "indicesOptions must not be null"); + } + + private IndicesOptions indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS; + + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public String[] indices() { + return indices; + } + + @Override + public IndicesOptions indicesOptions() { + return indicesOptions; + } + + + + public void setKeepAlive(TimeValue keepAlive) { + this.keepAlive = keepAlive; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new SearchTask(id, type, action, () -> "desc", parentTaskId, headers); + } + + /** + * Sets the indices the search will be executed on. + */ + @Override + public PITRequest indices(String... indices) { + SearchRequest.validateIndices(indices); + this.indices = indices; + return this; + } +} diff --git a/server/src/main/java/org/opensearch/action/search/PITResponse.java b/server/src/main/java/org/opensearch/action/search/PITResponse.java new file mode 100644 index 0000000000000..33df87bcb5112 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/PITResponse.java @@ -0,0 +1,57 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.ActionResponse; +import org.opensearch.common.ParseField; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.StatusToXContentObject; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.rest.RestStatus; + +import java.io.IOException; + +public class PITResponse extends ActionResponse implements StatusToXContentObject { + + private static final ParseField ID = new ParseField("id"); + + public String getId() { + return id; + } + + private final String id; + + PITResponse(String id) { + this.id = id; + } + + public PITResponse(StreamInput streamInput) throws IOException { + id = streamInput.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(id); + + } + + @Override + public RestStatus status() { + return RestStatus.OK; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(ID.getPreferredName(), id); + builder.endObject(); + return builder; + } +} diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequest.java b/server/src/main/java/org/opensearch/action/search/SearchRequest.java index c4b97c35bc405..b816c926daacc 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequest.java @@ -379,7 +379,7 @@ public SearchRequest indices(String... indices) { return this; } - private static void validateIndices(String... indices) { + protected static void validateIndices(String... indices) { Objects.requireNonNull(indices, "indices must not be null"); for (String index : indices) { Objects.requireNonNull(index, "index must not be null"); diff --git a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java index 121de6d1c80da..6205bf0a05129 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java @@ -93,6 +93,7 @@ public class SearchTransportService { public static final String FETCH_ID_SCROLL_ACTION_NAME = "indices:data/read/search[phase/fetch/id/scroll]"; public static final String FETCH_ID_ACTION_NAME = "indices:data/read/search[phase/fetch/id]"; public static final String QUERY_CAN_MATCH_NAME = "indices:data/read/search[can_match]"; + public static final String CREATE_READER_CONTEXT_ACTION_NAME = "indices:data/read/search[create_context]"; private final TransportService transportService; private final BiFunction responseWrapper; @@ -545,6 +546,12 @@ public static void registerRequestHandler(TransportService transportService, Sea } ); TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME, SearchService.CanMatchResponse::new); + transportService.registerRequestHandler(CREATE_READER_CONTEXT_ACTION_NAME, ThreadPool.Names.SAME, TransportCreatePITAction.CreateReaderContextRequest::new, + (request, channel, task) -> { + ChannelActionListener listener = new ChannelActionListener<>(channel, CREATE_READER_CONTEXT_ACTION_NAME, request); + searchService.openReaderContext(request.getShardId(), request.getKeepAlive(), ActionListener.wrap(r -> listener.onResponse(new TransportCreatePITAction.CreateReaderContextResponse(r)), listener::onFailure)); + }); + TransportActionProxy.registerProxyAction(transportService, CREATE_READER_CONTEXT_ACTION_NAME, TransportCreatePITAction.CreateReaderContextResponse::new); } /** diff --git a/server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java b/server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java new file mode 100644 index 0000000000000..991e4719fb56d --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java @@ -0,0 +1,147 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.shard.ShardId; +import org.opensearch.search.SearchPhaseResult; +import org.opensearch.search.SearchService; +import org.opensearch.search.internal.ShardSearchContextId; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportResponseHandler; +import org.opensearch.transport.TransportService; + +import java.io.IOException; + +public class TransportCreatePITAction extends HandledTransportAction { + + public static final String CREATE_PIT = "create_pit"; + private SearchService searchService; + private final TransportService transportService; + private TransportSearchAction transportSearchAction; + + @Inject + public TransportCreatePITAction(SearchService searchService, + TransportService transportService, + ActionFilters actionFilters, + TransportSearchAction transportSearchAction) { + super(CreatePITAction.NAME, transportService, actionFilters, in -> new PITRequest(in)); + this.searchService = searchService; + this.transportService = transportService; + this.transportSearchAction = transportSearchAction; + } + + + @Override + protected void doExecute(Task task, PITRequest request, ActionListener listener) { + SearchRequest sr = new SearchRequest(request.getIndices()); + sr.preference(request.getPreference()); + sr.routing(request.getRouting()); + sr.indicesOptions(request.getIndicesOptions()); + transportSearchAction.executeRequest(task, sr, CREATE_PIT, true, + (searchTask, target, connection, searchPhaseResultActionListener) -> + /*TODO set a timeout based on "awaitActive"*/ + transportService.sendChildRequest(connection, SearchTransportService.CREATE_READER_CONTEXT_ACTION_NAME, + + new CreateReaderContextRequest(target.getShardId(), request.getKeepAlive()), searchTask, + + new TransportResponseHandler() { + @Override + public CreateReaderContextResponse read(StreamInput in) throws IOException { + return new CreateReaderContextResponse(in); + } + + @Override + public void handleResponse(CreateReaderContextResponse response) { + searchPhaseResultActionListener.onResponse(response); + } + + @Override + public void handleException(TransportException exp) { + + } + + @Override + public String executor() { + return "generic"; //TODO + } + }), + + new ActionListener() { + @Override + public void onResponse(SearchResponse searchResponse) { + listener.onResponse(new PITResponse(searchResponse.pointInTimeId())); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + + } + + public static class CreateReaderContextRequest extends TransportRequest { + private final ShardId shardId; + private final TimeValue keepAlive; + + public CreateReaderContextRequest(ShardId shardId, TimeValue keepAlive) { + this.shardId = shardId; + this.keepAlive = keepAlive; + } + + public ShardId getShardId() { + return shardId; + } + + public TimeValue getKeepAlive() { + return keepAlive; + } + + public CreateReaderContextRequest(StreamInput in) throws IOException { + super(in); + this.shardId = new ShardId(in); + this.keepAlive = in.readTimeValue(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + shardId.writeTo(out); + out.writeTimeValue(keepAlive); + } + } + + public static class CreateReaderContextResponse extends SearchPhaseResult { + + public CreateReaderContextResponse(ShardSearchContextId shardSearchContextId) { + this.contextId = shardSearchContextId; + } + + public CreateReaderContextResponse(StreamInput in) throws IOException { + super(in); + contextId = new ShardSearchContextId(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + contextId.writeTo(out); + + } + } +} diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 308ab13a8a785..39897b62ee3ae 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -461,6 +461,7 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.KEEPALIVE_INTERVAL_SETTING, SearchService.MAX_KEEPALIVE_SETTING, SearchService.ALLOW_EXPENSIVE_QUERIES, + SearchService.MAX_OPEN_PIT_CONTEXT, MultiBucketConsumerService.MAX_BUCKET_SETTING, SearchService.LOW_LEVEL_CANCELLATION_SETTING, SearchService.MAX_OPEN_SCROLL_CONTEXT, diff --git a/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java b/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java index da4b3a141dfc5..30ed4041f6b39 100644 --- a/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java +++ b/server/src/main/java/org/opensearch/index/shard/SearchOperationListener.java @@ -129,6 +129,19 @@ default void onFreeScrollContext(ReaderContext readerContext) {} */ default void validateReaderContext(ReaderContext readerContext, TransportRequest transportRequest) {} + /** + * Executed when a new Point-In-Time {@link ReaderContext} was created + * @param readerContext the created reader context + */ + default void onNewPitContext(ReaderContext readerContext) {} + + /** + * Executed when a Point-In-Time search {@link SearchContext} is freed. + * This happens on deleteion of a Point-In-Time or on it's keep-alive expiring. + * @param readerContext the freed search context + */ + default void onFreePitContext(ReaderContext readerContext) {} + /** * A Composite listener that multiplexes calls to each of the listeners methods. */ @@ -263,5 +276,27 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest } ExceptionsHelper.reThrowIfNotNull(exception); } + + @Override + public void onNewPitContext(ReaderContext readerContext) { + for (SearchOperationListener listener : listeners) { + try { + listener.onNewPitContext(readerContext); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onNewPitContext listener [{}] failed", listener), e); + } + } + } + + @Override + public void onFreePitContext(ReaderContext readerContext) { + for (SearchOperationListener listener : listeners) { + try { + listener.onNewPitContext(readerContext); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("onFreePitContext listener [{}] failed", listener), e); + } + } + } } } diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java new file mode 100644 index 0000000000000..86b6e1887d75a --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.search; + +import org.opensearch.action.search.CreatePITAction; +import org.opensearch.action.search.PITRequest; +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.Strings; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestCancellableNodeClient; +import org.opensearch.rest.action.RestStatusToXContentListener; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static java.util.Collections.unmodifiableList; +import static org.opensearch.rest.RestRequest.Method.POST; + +public class RestCreatePITAction extends BaseRestHandler { + @Override + public String getName() { + return "create_pit_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + PITRequest pitRequest = new PITRequest(request.paramAsTime("keep_alive", null)); + pitRequest.setIndicesOptions(IndicesOptions.fromRequest(request, pitRequest.indicesOptions())); + pitRequest.setPreference(request.param("preference")); + pitRequest.setRouting(request.param("routing")); + pitRequest.setIndices(Strings.splitStringByCommaToArray(request.param("index"))); + return channel -> { + RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel()); + cancelClient.execute(CreatePITAction.INSTANCE, pitRequest, new RestStatusToXContentListener<>(channel)); + }; + } + + @Override + public List routes() { + return unmodifiableList(Collections.singletonList( + new Route(POST, "/{index}/_pit"))); + } + +} diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index eda9153381046..cc135e273e74d 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -71,6 +71,7 @@ import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.Engine; +import org.opensearch.index.engine.Segment; import org.opensearch.index.query.InnerHitContextBuilder; import org.opensearch.index.query.MatchAllQueryBuilder; import org.opensearch.index.query.MatchNoneQueryBuilder; @@ -108,13 +109,7 @@ import org.opensearch.search.fetch.subphase.FetchFieldsContext; import org.opensearch.search.fetch.subphase.ScriptFieldsContext.ScriptField; import org.opensearch.search.fetch.subphase.highlight.HighlightBuilder; -import org.opensearch.search.internal.AliasFilter; -import org.opensearch.search.internal.InternalScrollSearchRequest; -import org.opensearch.search.internal.LegacyReaderContext; -import org.opensearch.search.internal.ReaderContext; -import org.opensearch.search.internal.SearchContext; -import org.opensearch.search.internal.ShardSearchContextId; -import org.opensearch.search.internal.ShardSearchRequest; +import org.opensearch.search.internal.*; import org.opensearch.search.lookup.SearchLookup; import org.opensearch.search.profile.Profilers; import org.opensearch.search.query.QueryPhase; @@ -213,6 +208,10 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv Property.NodeScope ); + public static final Setting MAX_OPEN_PIT_CONTEXT = + Setting.intSetting( + "search.max_open_pit_context", 500, 0, Property.Dynamic, Property.NodeScope); + public static final int DEFAULT_SIZE = 10; public static final int DEFAULT_FROM = 0; @@ -246,6 +245,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private volatile int maxOpenScrollContext; + private volatile int maxOpenPitContext; + private final Cancellable keepAliveReaper; private final AtomicLong idGenerator = new AtomicLong(); @@ -254,6 +255,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private final MultiBucketConsumerService multiBucketConsumerService; + private final AtomicInteger openPitContexts = new AtomicInteger(); private final AtomicInteger openScrollContexts = new AtomicInteger(); private final String sessionId = UUIDs.randomBase64UUID(); @@ -300,6 +302,9 @@ public SearchService( maxOpenScrollContext = MAX_OPEN_SCROLL_CONTEXT.get(settings); clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_OPEN_SCROLL_CONTEXT, this::setMaxOpenScrollContext); + maxOpenPitContext = MAX_OPEN_PIT_CONTEXT.get(settings); + clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_OPEN_PIT_CONTEXT, this::setMaxOpenPitContext); + lowLevelCancellation = LOW_LEVEL_CANCELLATION_SETTING.get(settings); clusterService.getClusterSettings().addSettingsUpdateConsumer(LOW_LEVEL_CANCELLATION_SETTING, this::setLowLevelCancellation); } @@ -344,6 +349,10 @@ private void setMaxOpenScrollContext(int maxOpenScrollContext) { this.maxOpenScrollContext = maxOpenScrollContext; } + private void setMaxOpenPitContext(int maxOpenPitContext) { + this.maxOpenPitContext = maxOpenPitContext; + } + private void setLowLevelCancellation(Boolean lowLevelCancellation) { this.lowLevelCancellation = lowLevelCancellation; } @@ -790,15 +799,37 @@ public void openReaderContext(ShardId shardId, TimeValue keepAlive, ActionListen final IndexShard shard = indexService.getShard(shardId.id()); final SearchOperationListener searchOperationListener = shard.getSearchOperationListener(); shard.awaitShardSearchActive(ignored -> { + Releasable decreasePitContexts = null; Engine.SearcherSupplier searcherSupplier = null; ReaderContext readerContext = null; try { + decreasePitContexts = openPitContexts::decrementAndGet; + if (openPitContexts.incrementAndGet() > maxOpenPitContext) { + throw new OpenSearchRejectedExecutionException( + "Trying to create too many Point In Time contexts. Must be less than or equal to: [" + + maxOpenPitContext + "]. " + "This limit can be set by changing the [" + + MAX_OPEN_PIT_CONTEXT.getKey() + "] setting."); + } searcherSupplier = shard.acquireSearcherSupplier(); + List nonVerboseSegments = shard.segments(false); + shard.routingEntry(); final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet()); + readerContext = new PitReaderContext(id, indexService, shard, searcherSupplier, keepAlive.millis(), false, + shard.routingEntry(),nonVerboseSegments); readerContext = new ReaderContext(id, indexService, shard, searcherSupplier, keepAlive.millis(), false); final ReaderContext finalReaderContext = readerContext; searcherSupplier = null; // transfer ownership to reader context + searchOperationListener.onNewReaderContext(readerContext); + + searchOperationListener.onNewPitContext(finalReaderContext); + readerContext.addOnClose(decreasePitContexts); + decreasePitContexts = null; + + readerContext.addOnClose(() -> { + searchOperationListener.onFreeReaderContext(finalReaderContext); + searchOperationListener.onFreePitContext(finalReaderContext); + }); readerContext.addOnClose(() -> searchOperationListener.onFreeReaderContext(finalReaderContext)); putReaderContext(readerContext); readerContext = null; @@ -806,6 +837,8 @@ public void openReaderContext(ShardId shardId, TimeValue keepAlive, ActionListen } catch (Exception exc) { Releasables.closeWhileHandlingException(searcherSupplier, readerContext); listener.onFailure(exc); + } finally{ + Releasables.close(decreasePitContexts); } }); } @@ -1238,6 +1271,11 @@ public ResponseCollectorService getResponseCollectorService() { return this.responseCollectorService; } + public PitReaderContext getPitReaderContext(ShardSearchContextId id) { + ReaderContext readerContext = getReaderContext(id); + return (PitReaderContext) readerContext; + } + class Reaper implements Runnable { @Override public void run() { diff --git a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java new file mode 100644 index 0000000000000..103d5104a84c0 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.internal; + +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.index.IndexService; +import org.opensearch.index.engine.Engine; +import org.opensearch.index.engine.Segment; +import org.opensearch.index.shard.IndexShard; + +import java.util.List; + +public class PitReaderContext extends ReaderContext { + + public ShardRouting getShardRouting() { + return shardRouting; + } + + private final ShardRouting shardRouting; + private final List segments; + + public PitReaderContext(ShardSearchContextId id, IndexService indexService, + IndexShard indexShard, Engine.SearcherSupplier searcherSupplier, + long keepAliveInMillis, boolean singleSession, + ShardRouting shardRouting, List nonVerboseSegments) { + super(id, indexService, indexShard, searcherSupplier, keepAliveInMillis, singleSession); + this.shardRouting = shardRouting; + segments = nonVerboseSegments; + } + + public List getSegments() { + return segments; + } + + +} From 72bd39b99be17682c41f7712111fd94e5b979091 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Wed, 30 Mar 2022 15:01:40 +0530 Subject: [PATCH 2/2] two phase create pit Signed-off-by: Bharathwaj G --- .../org/opensearch/action/ActionModule.java | 21 +- .../action/search/CreatePITAction.java | 4 +- ...{PITRequest.java => CreatePITRequest.java} | 75 ++--- .../opensearch/action/search/PITResponse.java | 57 ---- .../action/search/SearchTransportService.java | 63 ++++- .../search/TransportCreatePITAction.java | 262 ++++++++++++++---- .../action/search/RestCreatePITAction.java | 17 +- .../org/opensearch/search/SearchService.java | 31 ++- .../search/internal/PitReaderContext.java | 4 + .../search/internal/ReaderContext.java | 2 +- .../opensearch/search/PitSingleNodeTests.java | 180 ++++++++++++ 11 files changed, 536 insertions(+), 180 deletions(-) rename server/src/main/java/org/opensearch/action/search/{PITRequest.java => CreatePITRequest.java} (84%) delete mode 100644 server/src/main/java/org/opensearch/action/search/PITResponse.java create mode 100644 server/src/test/java/org/opensearch/search/PitSingleNodeTests.java diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 8e31aa23d88cf..082f9ae688b92 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -235,14 +235,7 @@ import org.opensearch.action.ingest.SimulatePipelineTransportAction; import org.opensearch.action.main.MainAction; import org.opensearch.action.main.TransportMainAction; -import org.opensearch.action.search.ClearScrollAction; -import org.opensearch.action.search.MultiSearchAction; -import org.opensearch.action.search.SearchAction; -import org.opensearch.action.search.SearchScrollAction; -import org.opensearch.action.search.TransportClearScrollAction; -import org.opensearch.action.search.TransportMultiSearchAction; -import org.opensearch.action.search.TransportSearchAction; -import org.opensearch.action.search.TransportSearchScrollAction; +import org.opensearch.action.search.*; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.AutoCreateIndex; import org.opensearch.action.support.DestructiveOperations; @@ -398,12 +391,7 @@ import org.opensearch.rest.action.ingest.RestGetPipelineAction; import org.opensearch.rest.action.ingest.RestPutPipelineAction; import org.opensearch.rest.action.ingest.RestSimulatePipelineAction; -import org.opensearch.rest.action.search.RestClearScrollAction; -import org.opensearch.rest.action.search.RestCountAction; -import org.opensearch.rest.action.search.RestExplainAction; -import org.opensearch.rest.action.search.RestMultiSearchAction; -import org.opensearch.rest.action.search.RestSearchAction; -import org.opensearch.rest.action.search.RestSearchScrollAction; +import org.opensearch.rest.action.search.*; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import org.opensearch.usage.UsageService; @@ -660,6 +648,7 @@ public void reg actions.register(ImportDanglingIndexAction.INSTANCE, TransportImportDanglingIndexAction.class); actions.register(DeleteDanglingIndexAction.INSTANCE, TransportDeleteDanglingIndexAction.class); actions.register(FindDanglingIndexAction.INSTANCE, TransportFindDanglingIndexAction.class); + actions.register(CreatePITAction.INSTANCE, TransportCreatePITAction.class); return unmodifiableMap(actions.getRegistry()); } @@ -832,6 +821,10 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestRepositoriesAction()); registerHandler.accept(new RestSnapshotAction()); registerHandler.accept(new RestTemplatesAction()); + + // Point in time API + registerHandler.accept(new RestCreatePITAction()); + for (ActionPlugin plugin : actionPlugins) { for (RestHandler handler : plugin.getRestHandlers( settings, diff --git a/server/src/main/java/org/opensearch/action/search/CreatePITAction.java b/server/src/main/java/org/opensearch/action/search/CreatePITAction.java index 22dfbafdf0b08..69b7b4bb2d61f 100644 --- a/server/src/main/java/org/opensearch/action/search/CreatePITAction.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePITAction.java @@ -10,11 +10,11 @@ import org.opensearch.action.ActionType; -public class CreatePITAction extends ActionType { +public class CreatePITAction extends ActionType { public static final CreatePITAction INSTANCE = new CreatePITAction(); public static final String NAME = "indices:data/read/pit"; private CreatePITAction() { - super(NAME, PITResponse::new); + super(NAME, SearchResponse::new); } } diff --git a/server/src/main/java/org/opensearch/action/search/PITRequest.java b/server/src/main/java/org/opensearch/action/search/CreatePITRequest.java similarity index 84% rename from server/src/main/java/org/opensearch/action/search/PITRequest.java rename to server/src/main/java/org/opensearch/action/search/CreatePITRequest.java index c188dd3b41400..eb077969fc0d7 100644 --- a/server/src/main/java/org/opensearch/action/search/PITRequest.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePITRequest.java @@ -24,9 +24,43 @@ import java.util.Map; import java.util.Objects; -public class PITRequest extends ActionRequest implements IndicesRequest.Replaceable { - public PITRequest(TimeValue keepAlive) { +public class CreatePITRequest extends ActionRequest implements IndicesRequest.Replaceable { + + private TimeValue keepAlive; + private final boolean allowPartialPitCreation; + @Nullable + private String routing = null; + @Nullable + private String preference = null; + private String[] indices = Strings.EMPTY_ARRAY; + private IndicesOptions indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS; + + + public CreatePITRequest(TimeValue keepAlive, boolean allowPartialPitCreation) { this.keepAlive = keepAlive; + this.allowPartialPitCreation = allowPartialPitCreation; + } + + public CreatePITRequest(StreamInput in) throws IOException { + super(in); + indices = in.readStringArray(); + indicesOptions = IndicesOptions.readIndicesOptions(in); + routing = in.readOptionalString(); + preference = in.readOptionalString(); + keepAlive = in.readTimeValue(); + routing = in.readOptionalString(); + allowPartialPitCreation = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(indices); + indicesOptions.writeIndicesOptions(out); + out.writeOptionalString(preference); + out.writeTimeValue(keepAlive); + out.writeOptionalString(routing); + out.writeBoolean(allowPartialPitCreation); } public String getRouting() { @@ -49,55 +83,26 @@ public TimeValue getKeepAlive() { return keepAlive; } - private TimeValue keepAlive; - - public PITRequest(StreamInput in) throws IOException { - super(in); - indices = in.readStringArray(); - indicesOptions = IndicesOptions.readIndicesOptions(in); - routing = in.readOptionalString(); - preference = in.readOptionalString(); - keepAlive = in.readTimeValue(); - routing = in.readOptionalString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeStringArray(indices); - indicesOptions.writeIndicesOptions(out); - out.writeOptionalString(preference); - out.writeTimeValue(keepAlive); - out.writeOptionalString(routing); + public boolean isAllowPartialPitCreation() { + return allowPartialPitCreation; } public void setRouting(String routing) { this.routing = routing; } - @Nullable - private String routing = null; - public void setPreference(String preference) { this.preference = preference; } - @Nullable - private String preference = null; - public void setIndices(String[] indices) { this.indices = indices; } - private String[] indices = Strings.EMPTY_ARRAY; - public void setIndicesOptions(IndicesOptions indicesOptions) { this.indicesOptions = Objects.requireNonNull(indicesOptions, "indicesOptions must not be null"); } - private IndicesOptions indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS; - - @Override public ActionRequestValidationException validate() { return null; @@ -113,8 +118,6 @@ public IndicesOptions indicesOptions() { return indicesOptions; } - - public void setKeepAlive(TimeValue keepAlive) { this.keepAlive = keepAlive; } @@ -128,7 +131,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId, * Sets the indices the search will be executed on. */ @Override - public PITRequest indices(String... indices) { + public CreatePITRequest indices(String... indices) { SearchRequest.validateIndices(indices); this.indices = indices; return this; diff --git a/server/src/main/java/org/opensearch/action/search/PITResponse.java b/server/src/main/java/org/opensearch/action/search/PITResponse.java deleted file mode 100644 index 33df87bcb5112..0000000000000 --- a/server/src/main/java/org/opensearch/action/search/PITResponse.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.action.search; - -import org.opensearch.action.ActionResponse; -import org.opensearch.common.ParseField; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.common.xcontent.StatusToXContentObject; -import org.opensearch.common.xcontent.XContentBuilder; -import org.opensearch.rest.RestStatus; - -import java.io.IOException; - -public class PITResponse extends ActionResponse implements StatusToXContentObject { - - private static final ParseField ID = new ParseField("id"); - - public String getId() { - return id; - } - - private final String id; - - PITResponse(String id) { - this.id = id; - } - - public PITResponse(StreamInput streamInput) throws IOException { - id = streamInput.readString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(id); - - } - - @Override - public RestStatus status() { - return RestStatus.OK; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field(ID.getPreferredName(), id); - builder.endObject(); - return builder; - } -} diff --git a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java index 6205bf0a05129..ddadf25f75d14 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java @@ -94,6 +94,7 @@ public class SearchTransportService { public static final String FETCH_ID_ACTION_NAME = "indices:data/read/search[phase/fetch/id]"; public static final String QUERY_CAN_MATCH_NAME = "indices:data/read/search[can_match]"; public static final String CREATE_READER_CONTEXT_ACTION_NAME = "indices:data/read/search[create_context]"; + public static final String UPDATE_READER_CONTEXT_ACTION_NAME = "indices:data/read/search[update_context]"; private final TransportService transportService; private final BiFunction responseWrapper; @@ -141,6 +142,20 @@ public void sendFreeContext( ); } + public void updatePitContext( + Transport.Connection connection, + TransportCreatePITAction.UpdatePITReaderRequest request, + ActionListener actionListener) { + transportService.sendRequest( + connection, + UPDATE_READER_CONTEXT_ACTION_NAME, + request, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler(actionListener, + TransportCreatePITAction.UpdatePitContextResponse::new) + ); + } + public void sendCanMatch( Transport.Connection connection, final ShardSearchRequest request, @@ -308,7 +323,7 @@ public Map getPendingSearchRequests() { return new HashMap<>(clientConnections); } - static class ScrollFreeContextRequest extends TransportRequest { + static class ScrollFreeContextRequest extends TransportRequest { private ShardSearchContextId contextId; ScrollFreeContextRequest(ShardSearchContextId contextId) { @@ -546,12 +561,50 @@ public static void registerRequestHandler(TransportService transportService, Sea } ); TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME, SearchService.CanMatchResponse::new); - transportService.registerRequestHandler(CREATE_READER_CONTEXT_ACTION_NAME, ThreadPool.Names.SAME, TransportCreatePITAction.CreateReaderContextRequest::new, + transportService.registerRequestHandler( + CREATE_READER_CONTEXT_ACTION_NAME, + ThreadPool.Names.SAME, + TransportCreatePITAction.CreateReaderContextRequest::new, (request, channel, task) -> { - ChannelActionListener listener = new ChannelActionListener<>(channel, CREATE_READER_CONTEXT_ACTION_NAME, request); - searchService.openReaderContext(request.getShardId(), request.getKeepAlive(), ActionListener.wrap(r -> listener.onResponse(new TransportCreatePITAction.CreateReaderContextResponse(r)), listener::onFailure)); + ChannelActionListener< + TransportCreatePITAction.CreateReaderContextResponse, + TransportCreatePITAction.CreateReaderContextRequest> + listener = + new ChannelActionListener<>(channel, CREATE_READER_CONTEXT_ACTION_NAME, request); + searchService.openReaderContext( + request.getShardId(), + request.getKeepAlive(), + ActionListener.wrap( + r -> + listener.onResponse( + new TransportCreatePITAction.CreateReaderContextResponse(r)), + listener::onFailure)); }); - TransportActionProxy.registerProxyAction(transportService, CREATE_READER_CONTEXT_ACTION_NAME, TransportCreatePITAction.CreateReaderContextResponse::new); + TransportActionProxy.registerProxyAction( + transportService, + CREATE_READER_CONTEXT_ACTION_NAME, + TransportCreatePITAction.CreateReaderContextResponse::new); + + transportService.registerRequestHandler( + UPDATE_READER_CONTEXT_ACTION_NAME, + ThreadPool.Names.SAME, + TransportCreatePITAction.UpdatePITReaderRequest::new, + (request, channel, task) -> { + ChannelActionListener listener = + new ChannelActionListener<>(channel, UPDATE_READER_CONTEXT_ACTION_NAME, request); + searchService.updatePitIdAndKeepAlive(request, + ActionListener.wrap( + r -> listener.onResponse(r), + listener::onFailure + )); + } + ); + TransportActionProxy.registerProxyAction( + transportService, + UPDATE_READER_CONTEXT_ACTION_NAME, + TransportCreatePITAction.UpdatePitContextResponse::new); + } /** diff --git a/server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java b/server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java index 991e4719fb56d..01b4104cb5ab9 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java @@ -9,115 +9,233 @@ package org.opensearch.action.search; import org.opensearch.action.ActionListener; +import org.opensearch.action.StepListener; import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.GroupedActionListener; import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Strings; import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.shard.ShardId; import org.opensearch.search.SearchPhaseResult; import org.opensearch.search.SearchService; +import org.opensearch.search.SearchShardTarget; import org.opensearch.search.internal.ShardSearchContextId; import org.opensearch.tasks.Task; -import org.opensearch.transport.TransportException; -import org.opensearch.transport.TransportRequest; -import org.opensearch.transport.TransportResponseHandler; -import org.opensearch.transport.TransportService; +import org.opensearch.transport.*; import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.stream.Collectors; -public class TransportCreatePITAction extends HandledTransportAction { +/** + * Transport action for creating PIT reader context + */ +public class TransportCreatePITAction extends HandledTransportAction { public static final String CREATE_PIT = "create_pit"; + private final TimeValue CREATE_PIT_TEMPORARY_KEEP_ALIVE = new TimeValue(30, TimeUnit.SECONDS); private SearchService searchService; private final TransportService transportService; + private final SearchTransportService searchTransportService; + private final ClusterService clusterService; private TransportSearchAction transportSearchAction; + private final NamedWriteableRegistry namedWriteableRegistry; @Inject - public TransportCreatePITAction(SearchService searchService, - TransportService transportService, - ActionFilters actionFilters, - TransportSearchAction transportSearchAction) { - super(CreatePITAction.NAME, transportService, actionFilters, in -> new PITRequest(in)); + public TransportCreatePITAction( + SearchService searchService, + TransportService transportService, + ActionFilters actionFilters, + SearchTransportService searchTransportService, + ClusterService clusterService, + TransportSearchAction transportSearchAction, + NamedWriteableRegistry namedWriteableRegistry) { + super(CreatePITAction.NAME, transportService, actionFilters, in -> new CreatePITRequest(in)); this.searchService = searchService; this.transportService = transportService; + this.searchTransportService = searchTransportService; + this.clusterService = clusterService; this.transportSearchAction = transportSearchAction; + this.namedWriteableRegistry = namedWriteableRegistry; } - @Override - protected void doExecute(Task task, PITRequest request, ActionListener listener) { - SearchRequest sr = new SearchRequest(request.getIndices()); - sr.preference(request.getPreference()); - sr.routing(request.getRouting()); - sr.indicesOptions(request.getIndicesOptions()); - transportSearchAction.executeRequest(task, sr, CREATE_PIT, true, - (searchTask, target, connection, searchPhaseResultActionListener) -> - /*TODO set a timeout based on "awaitActive"*/ - transportService.sendChildRequest(connection, SearchTransportService.CREATE_READER_CONTEXT_ACTION_NAME, - - new CreateReaderContextRequest(target.getShardId(), request.getKeepAlive()), searchTask, - - new TransportResponseHandler() { + protected void doExecute(Task task, CreatePITRequest request, ActionListener listener) { + SearchRequest searchRequest = new SearchRequest(request.getIndices()); + searchRequest.preference(request.getPreference()); + searchRequest.routing(request.getRouting()); + searchRequest.indicesOptions(request.getIndicesOptions()); + searchRequest.allowPartialSearchResults(request.isAllowPartialPitCreation()); + + final StepListener createPitListener = new StepListener(); + + final ActionListener updatePitIdListener = + new ActionListener() { @Override - public CreateReaderContextResponse read(StreamInput in) throws IOException { - return new CreateReaderContextResponse(in); + public void onResponse(SearchResponse searchResponse) { + listener.onResponse(searchResponse); } @Override - public void handleResponse(CreateReaderContextResponse response) { - searchPhaseResultActionListener.onResponse(response); + public void onFailure(Exception e) { + listener.onFailure(e); } + }; + /** + * Phase 1 of create PIT request : Create PIT reader contexts in the associated shards with a + * temporary keep alive + */ + transportSearchAction.executeRequest( + task, + searchRequest, + CREATE_PIT, + true, + new TransportSearchAction.SinglePhaseSearchAction() { @Override - public void handleException(TransportException exp) { + public void executeOnShardTarget( + SearchTask searchTask, + SearchShardTarget target, + Transport.Connection connection, + ActionListener searchPhaseResultActionListener) { + transportService.sendChildRequest( + connection, + SearchTransportService.CREATE_READER_CONTEXT_ACTION_NAME, + new CreateReaderContextRequest(target.getShardId(), CREATE_PIT_TEMPORARY_KEEP_ALIVE), + searchTask, + new TransportResponseHandler() { + @Override + public CreateReaderContextResponse read(StreamInput in) throws IOException { + return new CreateReaderContextResponse(in); + } - } + @Override + public void handleResponse(CreateReaderContextResponse response) { + searchPhaseResultActionListener.onResponse(response); + } - @Override - public String executor() { - return "generic"; //TODO + @Override + public void handleException(TransportException exp) { + searchPhaseResultActionListener.onFailure(exp); + } + + @Override + public String executor() { + return "generic"; // TODO + } + }); } - }), + }, + createPitListener); + + /** + * Phase 2 of create PIT : Update PIT reader context with PIT ID and keep alive from request + * Fail create pit operation if any of the updates in this phase are failed + */ + createPitListener.whenComplete( + searchResponse -> { + SearchContextId contextId = + SearchContextId.decode(namedWriteableRegistry, searchResponse.pointInTimeId()); + final StepListener> lookupListener = + getConnectionLookupListener(contextId); + lookupListener.whenComplete( + nodelookup -> { + final ActionListener + groupedActionListener = + getGroupedListener( + updatePitIdListener, searchResponse, contextId.shards().size()); + for (Map.Entry entry : + contextId.shards().entrySet()) { + DiscoveryNode node = + nodelookup.apply( + entry.getValue().getClusterAlias(), entry.getValue().getNode()); + try { + final Transport.Connection connection = + searchTransportService.getConnection( + entry.getValue().getClusterAlias(), node); + searchTransportService.updatePitContext( + connection, + new UpdatePITReaderRequest( + entry.getValue().getSearchContextId(), + searchResponse.pointInTimeId(), + request.getKeepAlive().millis()), + groupedActionListener); + } catch (Exception e) { + groupedActionListener.onFailure(e); + } + } + }, + updatePitIdListener::onFailure); + }, + updatePitIdListener::onFailure); + } - new ActionListener() { - @Override - public void onResponse(SearchResponse searchResponse) { - listener.onResponse(new PITResponse(searchResponse.pointInTimeId())); - } + private StepListener> getConnectionLookupListener( + SearchContextId contextId) { + ClusterState state = clusterService.state(); - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + final Set clusters = + contextId.shards().values().stream() + .filter(ctx -> Strings.isEmpty(ctx.getClusterAlias()) == false) + .map(SearchContextIdForNode::getClusterAlias) + .collect(Collectors.toSet()); + final StepListener> lookupListener = + new StepListener<>(); + + if (clusters.isEmpty() == false) { + searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener); + } else { + lookupListener.onResponse((cluster, nodeId) -> state.getNodes().get(nodeId)); + } + return lookupListener; + } + + private ActionListener getGroupedListener( + ActionListener updatePitIdListener, SearchResponse searchResponse, int size) { + return new GroupedActionListener( + new ActionListener>() { + @Override + public void onResponse(final Collection responses) { + updatePitIdListener.onResponse(searchResponse); + } + @Override + public void onFailure(final Exception e) { + updatePitIdListener.onFailure(e); + } + }, + size); } public static class CreateReaderContextRequest extends TransportRequest { private final ShardId shardId; private final TimeValue keepAlive; - public CreateReaderContextRequest(ShardId shardId, TimeValue keepAlive) { this.shardId = shardId; this.keepAlive = keepAlive; } - public ShardId getShardId() { return shardId; } - public TimeValue getKeepAlive() { return keepAlive; } - public CreateReaderContextRequest(StreamInput in) throws IOException { super(in); this.shardId = new ShardId(in); this.keepAlive = in.readTimeValue(); } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -127,21 +245,61 @@ public void writeTo(StreamOutput out) throws IOException { } public static class CreateReaderContextResponse extends SearchPhaseResult { - public CreateReaderContextResponse(ShardSearchContextId shardSearchContextId) { this.contextId = shardSearchContextId; } - public CreateReaderContextResponse(StreamInput in) throws IOException { super(in); contextId = new ShardSearchContextId(in); } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); contextId.writeTo(out); + } + } + + public static class UpdatePITReaderRequest extends TransportRequest { + private String PitId; + private long keepAlive; + private ShardSearchContextId searchContextId; + UpdatePITReaderRequest(ShardSearchContextId searchContextId, String PitId, long keepAlive) { + this.PitId = PitId; + this.searchContextId = searchContextId; + this.keepAlive = keepAlive; + } + UpdatePITReaderRequest(StreamInput in) throws IOException { + super(in); + } + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } + public ShardSearchContextId getSearchContextId() { + return searchContextId; + } + public String getPitId() { + return PitId; + } + public String id() { + return this.getPitId(); + } + } + public static class UpdatePitContextResponse extends TransportResponse { + private String pitId; + UpdatePitContextResponse(StreamInput in) throws IOException { + pitId = in.readString(); + } + public UpdatePitContextResponse(String pitId) { + this.pitId = pitId; + } + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(pitId); + } + public String getPitId() { + return pitId; } } } diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java index 86b6e1887d75a..198ad06d7fbff 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestCreatePITAction.java @@ -9,7 +9,7 @@ package org.opensearch.rest.action.search; import org.opensearch.action.search.CreatePITAction; -import org.opensearch.action.search.PITRequest; +import org.opensearch.action.search.CreatePITRequest; import org.opensearch.action.support.IndicesOptions; import org.opensearch.client.node.NodeClient; import org.opensearch.common.Strings; @@ -33,14 +33,17 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { - PITRequest pitRequest = new PITRequest(request.paramAsTime("keep_alive", null)); - pitRequest.setIndicesOptions(IndicesOptions.fromRequest(request, pitRequest.indicesOptions())); - pitRequest.setPreference(request.param("preference")); - pitRequest.setRouting(request.param("routing")); - pitRequest.setIndices(Strings.splitStringByCommaToArray(request.param("index"))); + boolean allowPartialPitCreation = request.paramAsBoolean("allow_partial_pit_creation", false); + + CreatePITRequest createPitRequest = + new CreatePITRequest(request.paramAsTime("keep_alive", null), allowPartialPitCreation); + createPitRequest.setIndicesOptions(IndicesOptions.fromRequest(request, createPitRequest.indicesOptions())); + createPitRequest.setPreference(request.param("preference")); + createPitRequest.setRouting(request.param("routing")); + createPitRequest.setIndices(Strings.splitStringByCommaToArray(request.param("index"))); return channel -> { RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel()); - cancelClient.execute(CreatePITAction.INSTANCE, pitRequest, new RestStatusToXContentListener<>(channel)); + cancelClient.execute(CreatePITAction.INSTANCE, createPitRequest, new RestStatusToXContentListener<>(channel)); }; } diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 221a2c9a1a1d7..729cd2801d899 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -41,9 +41,7 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; import org.opensearch.action.OriginalIndices; -import org.opensearch.action.search.SearchRequest; -import org.opensearch.action.search.SearchShardTask; -import org.opensearch.action.search.SearchType; +import org.opensearch.action.search.*; import org.opensearch.action.support.TransportActions; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.service.ClusterService; @@ -820,7 +818,6 @@ public void openReaderContext(ShardId shardId, TimeValue keepAlive, ActionListen final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet()); readerContext = new PitReaderContext(id, indexService, shard, searcherSupplier, keepAlive.millis(), false, shard.routingEntry(),nonVerboseSegments); - readerContext = new ReaderContext(id, indexService, shard, searcherSupplier, keepAlive.millis(), false); final ReaderContext finalReaderContext = readerContext; searcherSupplier = null; // transfer ownership to reader context @@ -960,6 +957,23 @@ public boolean freeReaderContext(ShardSearchContextId contextId) { return false; } + public void updatePitIdAndKeepAlive(TransportCreatePITAction.UpdatePITReaderRequest request, + ActionListener listener) { + if (getReaderContext(request.getSearchContextId()) != null) { + try { + PitReaderContext readerContext = getPitReaderContext(request.getSearchContextId()); + if(readerContext == null) { + throw new SearchContextMissingException(request.getSearchContextId()); + } + readerContext.updatePitId(request.getPitId()); + readerContext.tryUpdateKeepAlive(maxKeepAlive); + listener.onResponse(new TransportCreatePITAction.UpdatePitContextResponse(request.getPitId())); + } catch(Exception e) { + listener.onFailure(e); + } + } + } + public void freeAllScrollContexts() { for (ReaderContext readerContext : activeReaders.values()) { if (readerContext.scrollContext() != null) { @@ -1290,8 +1304,13 @@ public ResponseCollectorService getResponseCollectorService() { } public PitReaderContext getPitReaderContext(ShardSearchContextId id) { - ReaderContext readerContext = getReaderContext(id); - return (PitReaderContext) readerContext; + for(Map.Entry context : activeReaders.entrySet()) { + if(context.getValue() instanceof PitReaderContext) { + if(context.getKey() == id.getId()) return (PitReaderContext) context.getValue(); + } + } + + return null; } class Reaper implements Runnable { diff --git a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java index 103d5104a84c0..e7a29f8c0b68b 100644 --- a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java +++ b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java @@ -24,6 +24,7 @@ public ShardRouting getShardRouting() { private final ShardRouting shardRouting; private final List segments; + private String pitId; public PitReaderContext(ShardSearchContextId id, IndexService indexService, IndexShard indexShard, Engine.SearcherSupplier searcherSupplier, @@ -38,5 +39,8 @@ public List getSegments() { return segments; } + public void updatePitId(final String pitId) { + this.pitId = pitId; + } } diff --git a/server/src/main/java/org/opensearch/search/internal/ReaderContext.java b/server/src/main/java/org/opensearch/search/internal/ReaderContext.java index 31d7a44d9bbd2..ca4a047d4adee 100644 --- a/server/src/main/java/org/opensearch/search/internal/ReaderContext.java +++ b/server/src/main/java/org/opensearch/search/internal/ReaderContext.java @@ -138,7 +138,7 @@ public Engine.Searcher acquireSearcher(String source) { return searcherSupplier.acquireSearcher(source); } - private void tryUpdateKeepAlive(long keepAlive) { + public void tryUpdateKeepAlive(long keepAlive) { this.keepAlive.updateAndGet(curr -> Math.max(curr, keepAlive)); } diff --git a/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java b/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java new file mode 100644 index 0000000000000..3f458865e5351 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java @@ -0,0 +1,180 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search; + +import org.opensearch.action.ActionFuture; +import org.opensearch.action.search.CreatePITAction; +import org.opensearch.action.search.CreatePITRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.IndexNotFoundException; +import org.opensearch.search.builder.PointInTimeBuilder; +import org.opensearch.test.OpenSearchSingleNodeTestCase; + + +import java.util.concurrent.ExecutionException; +import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; + +public class PitSingleNodeTests extends OpenSearchSingleNodeTestCase { + @Override + protected boolean resetNodeAfterTest() { + return true; + } + + public void testCreatePIT() throws ExecutionException, InterruptedException { + createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[]{"index"}); + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + SearchResponse pitResponse = execute.get(); + client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + SearchResponse searchResponse = client().prepareSearch("index") + .setSize(2).setPointInTime(new PointInTimeBuilder(pitResponse.pointInTimeId()).setKeepAlive(TimeValue.timeValueDays(1))).get(); + assertHitCount(searchResponse, 1); + + SearchService service = getInstanceFromNode(SearchService.class); + assertEquals(2, service.getActiveContexts()); + service.doClose(); // this kills the keep-alive reaper we have to reset the node after this test + } + + public void testCreatePITWithShardReplicas() throws ExecutionException, InterruptedException { + createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build()); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[]{"index"}); + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + SearchResponse pitResponse = execute.get(); + + client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + SearchResponse searchResponse = client().prepareSearch("index") + .setSize(2).setPointInTime(new PointInTimeBuilder(pitResponse.pointInTimeId()).setKeepAlive(TimeValue.timeValueDays(1))).get(); + assertHitCount(searchResponse, 1); + + SearchService service = getInstanceFromNode(SearchService.class); + assertEquals(2, service.getActiveContexts()); + service.doClose(); + } + + public void testCreatePITOnValidIndexAndThenDeleteIndex() throws ExecutionException, InterruptedException { + createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[]{"index"}); + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + SearchResponse pitResponse = execute.get(); + client().admin().indices().prepareDelete("index").get(); + + IndexNotFoundException ex = expectThrows( + IndexNotFoundException.class, + () -> { + SearchResponse searchResponse = client().prepareSearch("index") + .setSize(2).setPointInTime(new PointInTimeBuilder(pitResponse.pointInTimeId()).setKeepAlive(TimeValue.timeValueDays(1))).get(); + } + ); + assertTrue(ex.getMessage().contains("no such index [index]")); + SearchService service = getInstanceFromNode(SearchService.class); + assertEquals(0, service.getActiveContexts()); + service.doClose(); + } + + public void testCreatePITOnValidIndexAndThenCloseIndex() throws ExecutionException, InterruptedException { + createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + client().admin().indices().prepareClose("index").get(); + + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[]{"index"}); + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + + ExecutionException ex = expectThrows( + ExecutionException.class, + () -> { + SearchResponse response = execute.get(); + }); + + assertTrue(ex.getMessage().contains("IndexClosedException")); + + SearchService service = getInstanceFromNode(SearchService.class); + assertEquals(0, service.getActiveContexts()); + service.doClose(); + } + + public void testCreatePITWithMultipleIndices() throws ExecutionException, InterruptedException { + createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + createIndex("index1", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); + client().prepareIndex("index1").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[]{"index", "index1"}); + SearchService service = getInstanceFromNode(SearchService.class); + + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + SearchResponse response = execute.get(); + assertEquals(4,response.getSuccessfulShards()); + assertEquals(4, service.getActiveContexts()); + service.doClose(); + } + + public void testCreatePITWithNonExistentIndex() { + createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build()); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[]{"index", "index1"}); + SearchService service = getInstanceFromNode(SearchService.class); + + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + + ExecutionException ex = expectThrows( + ExecutionException.class, + () -> { + SearchResponse response = execute.get(); + }); + + assertTrue(ex.getMessage().contains("no such index [index1]")); + assertEquals(0, service.getActiveContexts()); + service.doClose(); + } + + public void testMaxOpenPitContexts() throws Exception { + createIndex("index"); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + + + CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[]{"index"}); + SearchService service = getInstanceFromNode(SearchService.class); + + SearchResponse pitResponse = null; + for (int i = 0; i < SearchService.MAX_OPEN_PIT_CONTEXT.get(Settings.EMPTY); i++) { + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + pitResponse = execute.get(); + logger.info(SearchService.MAX_OPEN_PIT_CONTEXT.get(Settings.EMPTY) + " -- " + i); + } + ExecutionException ex = expectThrows( + ExecutionException.class, + () -> { + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, request); + SearchResponse response = execute.get(); + client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + }); + + assertTrue(ex.getMessage().contains("Trying to create too many Point In Time contexts. " + + "Must be less than or equal to: [" + SearchService.MAX_OPEN_PIT_CONTEXT.get(Settings.EMPTY) + "]. " + + "This limit can be set by changing the [search.max_open_pit_context] setting.")); + + service.doClose(); + } +}