From 7340feb9a84471cf931d13fb39a46aadadf421e8 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Mon, 9 May 2022 12:25:34 +0530 Subject: [PATCH 1/2] Changes to include rest high level client tests and addressing comments Signed-off-by: Bharathwaj G --- .../opensearch/client/RequestConverters.java | 22 ++- .../client/RestHighLevelClient.java | 42 +++++ .../java/org/opensearch/client/PitIT.java | 61 ++++++++ .../client/RequestConvertersTests.java | 23 +++ .../java/org/opensearch/client/SearchIT.java | 35 +++++ .../rest-api-spec/api/create_pit.json | 43 ++++++ .../search/slice/SearchSliceIT.java | 7 +- .../action/search/CreatePitAction.java | 2 +- .../action/search/CreatePitController.java | 146 ++++++++++-------- .../action/search/CreatePitRequest.java | 21 ++- .../action/search/CreatePitResponse.java | 120 ++++++++++++-- .../search/TransportCreatePitAction.java | 23 +-- .../common/settings/ClusterSettings.java | 2 +- .../common/settings/IndexScopedSettings.java | 1 + .../org/opensearch/index/IndexSettings.java | 28 ++++ .../action/search/RestCreatePitAction.java | 7 +- .../search/DefaultSearchContext.java | 18 +++ .../org/opensearch/search/SearchService.java | 5 +- .../search/CreatePitControllerTests.java | 17 +- .../search/DefaultSearchContextTests.java | 45 ++++++ .../opensearch/search/PitSingleNodeTests.java | 2 +- 21 files changed, 555 insertions(+), 115 deletions(-) create mode 100644 client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/create_pit.json diff --git a/client/rest-high-level/src/main/java/org/opensearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/opensearch/client/RequestConverters.java index afecdc3eea1a3..969f6e34b8e51 100644 --- a/client/rest-high-level/src/main/java/org/opensearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/opensearch/client/RequestConverters.java @@ -54,6 +54,7 @@ import org.opensearch.action.get.MultiGetRequest; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.search.ClearScrollRequest; +import org.opensearch.action.search.CreatePitRequest; import org.opensearch.action.search.MultiSearchRequest; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchScrollRequest; @@ -92,6 +93,7 @@ import org.opensearch.index.reindex.ReindexRequest; import org.opensearch.index.reindex.UpdateByQueryRequest; import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.rest.action.search.RestCreatePitAction; import org.opensearch.rest.action.search.RestSearchAction; import org.opensearch.script.mustache.MultiSearchTemplateRequest; import org.opensearch.script.mustache.SearchTemplateRequest; @@ -433,9 +435,13 @@ static void addSearchRequestParams(Params params, SearchRequest searchRequest) { params.putParam(RestSearchAction.TYPED_KEYS_PARAM, "true"); params.withRouting(searchRequest.routing()); params.withPreference(searchRequest.preference()); - params.withIndicesOptions(searchRequest.indicesOptions()); + if (searchRequest.pointInTimeBuilder() == null) { + params.withIndicesOptions(searchRequest.indicesOptions()); + } params.withSearchType(searchRequest.searchType().name().toLowerCase(Locale.ROOT)); - params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips())); + if (searchRequest.pointInTimeBuilder() == null) { + params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips())); + } if (searchRequest.getPreFilterShardSize() != null) { params.putParam("pre_filter_shard_size", Integer.toString(searchRequest.getPreFilterShardSize())); } @@ -458,6 +464,18 @@ static Request searchScroll(SearchScrollRequest searchScrollRequest) throws IOEx return request; } + static Request createPit(CreatePitRequest createPitRequest) throws IOException { + Params params = new Params(); + + params.putParam(RestCreatePitAction.ALLOW_PARTIAL_PIT_CREATION, "true"); + params.putParam(RestCreatePitAction.KEEP_ALIVE, "1d"); + params.withIndicesOptions(createPitRequest.indicesOptions()); + Request request = new Request(HttpPost.METHOD_NAME, endpoint(createPitRequest.indices(), "_search/point_in_time")); + request.addParameters(params.asMap()); + request.setEntity(createEntity(createPitRequest, REQUEST_BODY_CONTENT_TYPE)); + return request; + } + static Request clearScroll(ClearScrollRequest clearScrollRequest) throws IOException { Request request = new Request(HttpDelete.METHOD_NAME, "/_search/scroll"); request.setEntity(createEntity(clearScrollRequest, REQUEST_BODY_CONTENT_TYPE)); diff --git a/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java index d293b979debb5..f3360630a26b7 100644 --- a/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java @@ -59,6 +59,8 @@ import org.opensearch.action.index.IndexResponse; import org.opensearch.action.search.ClearScrollRequest; import org.opensearch.action.search.ClearScrollResponse; +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.action.search.CreatePitResponse; import org.opensearch.action.search.MultiSearchRequest; import org.opensearch.action.search.MultiSearchResponse; import org.opensearch.action.search.SearchRequest; @@ -1256,6 +1258,46 @@ public final Cancellable scrollAsync( ); } + /** + * Create PIT context using create PIT API + * + * @param createPitRequest the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + */ + public final CreatePitResponse createPit(CreatePitRequest createPitRequest, RequestOptions options) throws IOException { + return performRequestAndParseEntity( + createPitRequest, + RequestConverters::createPit, + options, + CreatePitResponse::fromXContent, + emptySet() + ); + } + + /** + * Asynchronously Create PIT context using create PIT API + * + * @param createPitRequest the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener the listener to be notified upon request completion + * @return the response + */ + public final Cancellable createPitAsync( + CreatePitRequest createPitRequest, + RequestOptions options, + ActionListener listener + ) { + return performRequestAsyncAndParseEntity( + createPitRequest, + RequestConverters::createPit, + options, + CreatePitResponse::fromXContent, + listener, + emptySet() + ); + } + /** * Clears one or more scroll ids using the Clear Scroll API. * diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java b/client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java new file mode 100644 index 0000000000000..13845d029b9a0 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client; + +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; +import org.junit.Before; +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.action.search.CreatePitResponse; +import org.opensearch.common.unit.TimeValue; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * Tests point in time API with rest high level client + */ +public class PitIT extends OpenSearchRestHighLevelClientTestCase { + + @Before + public void indexDocuments() throws IOException { + { + Request doc1 = new Request(HttpPut.METHOD_NAME, "/index/_doc/1"); + doc1.setJsonEntity("{\"type\":\"type1\", \"id\":1, \"num\":10, \"num2\":50}"); + client().performRequest(doc1); + Request doc2 = new Request(HttpPut.METHOD_NAME, "/index/_doc/2"); + doc2.setJsonEntity("{\"type\":\"type1\", \"id\":2, \"num\":20, \"num2\":40}"); + client().performRequest(doc2); + Request doc3 = new Request(HttpPut.METHOD_NAME, "/index/_doc/3"); + doc3.setJsonEntity("{\"type\":\"type1\", \"id\":3, \"num\":50, \"num2\":35}"); + client().performRequest(doc3); + Request doc4 = new Request(HttpPut.METHOD_NAME, "/index/_doc/4"); + doc4.setJsonEntity("{\"type\":\"type2\", \"id\":4, \"num\":100, \"num2\":10}"); + client().performRequest(doc4); + Request doc5 = new Request(HttpPut.METHOD_NAME, "/index/_doc/5"); + doc5.setJsonEntity("{\"type\":\"type2\", \"id\":5, \"num\":100, \"num2\":10}"); + client().performRequest(doc5); + } + + client().performRequest(new Request(HttpPost.METHOD_NAME, "/_refresh")); + } + + public void testCreatePit() throws IOException { + CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index"); + CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync); + assertTrue(pitResponse.getId() != null); + assertEquals(1, pitResponse.getTotalShards()); + assertEquals(1, pitResponse.getSuccessfulShards()); + assertEquals(0, pitResponse.getFailedShards()); + assertEquals(0, pitResponse.getSkippedShards()); + } + /** + * Todo: add deletion logic and test cluster settings + */ +} diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/opensearch/client/RequestConvertersTests.java index 0415b864ba35e..4f0b2ac0d88a1 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/RequestConvertersTests.java @@ -53,6 +53,7 @@ import org.opensearch.action.get.MultiGetRequest; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.search.ClearScrollRequest; +import org.opensearch.action.search.CreatePitRequest; import org.opensearch.action.search.MultiSearchRequest; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchScrollRequest; @@ -131,6 +132,7 @@ import java.util.Locale; import java.util.Map; import java.util.StringJoiner; +import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -1303,6 +1305,27 @@ public void testClearScroll() throws IOException { assertEquals(REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue()); } + public void testCreatePit() throws IOException { + String[] indices = randomIndicesNames(0, 5); + Map expectedParams = new HashMap<>(); + expectedParams.put("keep_alive", "1d"); + expectedParams.put("allow_partial_pit_creation", "true"); + CreatePitRequest createPitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, indices); + setRandomIndicesOptions(createPitRequest::indicesOptions, createPitRequest::indicesOptions, expectedParams); + Request request = RequestConverters.createPit(createPitRequest); + StringJoiner endpoint = new StringJoiner("/", "/", ""); + String index = String.join(",", indices); + if (Strings.hasLength(index)) { + endpoint.add(index); + } + endpoint.add("_search/point_in_time"); + assertEquals(HttpPost.METHOD_NAME, request.getMethod()); + assertEquals(endpoint.toString(), request.getEndpoint()); + assertEquals(expectedParams, request.getParameters()); + assertToXContentBody(createPitRequest, request.getEntity()); + assertEquals(REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue()); + } + public void testSearchTemplate() throws Exception { // Create a random request. String[] indices = randomIndicesNames(0, 5); diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/SearchIT.java b/client/rest-high-level/src/test/java/org/opensearch/client/SearchIT.java index 19e287fb91be5..01a7f892c80a1 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/SearchIT.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/SearchIT.java @@ -43,6 +43,8 @@ import org.opensearch.action.fieldcaps.FieldCapabilitiesResponse; import org.opensearch.action.search.ClearScrollRequest; import org.opensearch.action.search.ClearScrollResponse; +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.action.search.CreatePitResponse; import org.opensearch.action.search.MultiSearchRequest; import org.opensearch.action.search.MultiSearchResponse; import org.opensearch.action.search.SearchRequest; @@ -89,6 +91,7 @@ import org.opensearch.search.aggregations.metrics.WeightedAvgAggregationBuilder; import org.opensearch.search.aggregations.support.MultiValuesSourceFieldConfig; import org.opensearch.search.aggregations.support.ValueType; +import org.opensearch.search.builder.PointInTimeBuilder; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.fetch.subphase.FetchSourceContext; import org.opensearch.search.fetch.subphase.highlight.HighlightBuilder; @@ -105,6 +108,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertToXContentEquivalent; @@ -762,6 +766,37 @@ public void testSearchScroll() throws Exception { } } + public void testSearchWithPit() throws Exception { + for (int i = 0; i < 100; i++) { + XContentBuilder builder = jsonBuilder().startObject().field("field", i).endObject(); + Request doc = new Request(HttpPut.METHOD_NAME, "/test/_doc/" + Integer.toString(i)); + doc.setJsonEntity(Strings.toString(builder)); + client().performRequest(doc); + } + client().performRequest(new Request(HttpPost.METHOD_NAME, "/test/_refresh")); + + CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "test"); + CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync); + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(35) + .sort("field", SortOrder.ASC) + .pointInTimeBuilder(new PointInTimeBuilder(pitResponse.getId())); + SearchRequest searchRequest = new SearchRequest().source(searchSourceBuilder); + SearchResponse searchResponse = execute(searchRequest, highLevelClient()::search, highLevelClient()::searchAsync); + + try { + long counter = 0; + assertSearchHeader(searchResponse); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo(100L)); + assertThat(searchResponse.getHits().getHits().length, equalTo(35)); + for (SearchHit hit : searchResponse.getHits()) { + assertThat(((Number) hit.getSortValues()[0]).longValue(), equalTo(counter++)); + } + } finally { + // TODO : Delete PIT + } + } + public void testMultiSearch() throws Exception { MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); SearchRequest searchRequest1 = new SearchRequest("index1"); diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/create_pit.json b/rest-api-spec/src/main/resources/rest-api-spec/api/create_pit.json new file mode 100644 index 0000000000000..eb5f4977b99d9 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/create_pit.json @@ -0,0 +1,43 @@ +{ + "create_pit":{ + "documentation":{ + "url":"https://opensearch.org/docs/latest/opensearch/rest-api/point_in_time/", + "description":"Creates point in time context." + }, + "stability":"stable", + "url":{ + "paths":[ + { + "path":"/{index}/_search/point_in_time", + "methods":[ + "POST" + ], + "parts":{ + "index":{ + "type":"list", + "description":"A comma-separated list of index names to search; use `_all` or empty string to perform the operation on all indices" + } + } + } + ] + }, + "params":{ + "allow_partial_pit_creation":{ + "type":"boolean", + "description":"Allow if point in time can be created with partial failures" + }, + "keep_alive":{ + "type":"string", + "description":"Specify the keep alive for point in time" + }, + "preference":{ + "type":"string", + "description":"Specify the node or shard the operation should be performed on (default: random)" + }, + "routing":{ + "type":"list", + "description":"A comma-separated list of specific routing values" + } + } + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/search/slice/SearchSliceIT.java b/server/src/internalClusterTest/java/org/opensearch/search/slice/SearchSliceIT.java index c0dad0eba4c1e..eacbcc42a8157 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/slice/SearchSliceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/slice/SearchSliceIT.java @@ -91,7 +91,12 @@ private void setupIndex(int numDocs, int numberOfShards) throws IOException, Exe client().admin() .indices() .prepareCreate("test") - .setSettings(Settings.builder().put("number_of_shards", numberOfShards).put("index.max_slices_per_scroll", 10000)) + .setSettings( + Settings.builder() + .put("number_of_shards", numberOfShards) + .put("index.max_slices_per_scroll", 10000) + .put("index.max_slices_per_pit", 10000) + ) .setMapping(mapping) ); ensureGreen(); diff --git a/server/src/main/java/org/opensearch/action/search/CreatePitAction.java b/server/src/main/java/org/opensearch/action/search/CreatePitAction.java index a9bf6762ae1a3..73788bee88542 100644 --- a/server/src/main/java/org/opensearch/action/search/CreatePitAction.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePitAction.java @@ -15,7 +15,7 @@ */ public class CreatePitAction extends ActionType { public static final CreatePitAction INSTANCE = new CreatePitAction(); - public static final String NAME = "indices:data/write/pit"; + public static final String NAME = "indices:data/read/pit"; private CreatePitAction() { super(NAME, CreatePitResponse::new); diff --git a/server/src/main/java/org/opensearch/action/search/CreatePitController.java b/server/src/main/java/org/opensearch/action/search/CreatePitController.java index aac6b4910b32f..68f465c377154 100644 --- a/server/src/main/java/org/opensearch/action/search/CreatePitController.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePitController.java @@ -28,8 +28,8 @@ import org.opensearch.tasks.Task; import org.opensearch.transport.Transport; +import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.function.BiFunction; @@ -41,10 +41,10 @@ * Controller for creating PIT reader context * Phase 1 of create PIT request : Create PIT reader contexts in the associated shards with a temporary keep alive * Phase 2 of create PIT : Update PIT reader context with PIT ID and keep alive from request and - * fail user request if any of the updates in this phase are failed - we clean up PITs in case of such failures + * fail user request if any of the updates in this phase are failed - we clean up PITs in case of such failures. + * This two phase approach is used to save PIT ID as part of context which is later used for other use cases like list PIT etc. */ -public class CreatePitController implements Runnable { - private final Runnable runner; +public class CreatePitController { private final SearchTransportService searchTransportService; private final ClusterService clusterService; private final TransportSearchAction transportSearchAction; @@ -53,8 +53,8 @@ public class CreatePitController implements Runnable { private final ActionListener listener; private final CreatePitRequest request; private static final Logger logger = LogManager.getLogger(CreatePitController.class); - public static final Setting CREATE_PIT_TEMPORARY_KEEPALIVE_SETTING = Setting.positiveTimeSetting( - "pit.temporary.keep_alive_interval", + public static final Setting PIT_CREATE_PHASE_KEEP_ALIVE = Setting.positiveTimeSetting( + "pit.create.phase.keep_alive", timeValueSeconds(30), Setting.Property.NodeScope ); @@ -75,20 +75,25 @@ public CreatePitController( this.task = task; this.listener = listener; this.request = request; - runner = this::executeCreatePit; } private TimeValue getCreatePitTemporaryKeepAlive() { - return CREATE_PIT_TEMPORARY_KEEPALIVE_SETTING.get(clusterService.getSettings()); + return PIT_CREATE_PHASE_KEEP_ALIVE.get(clusterService.getSettings()); + } + + public void execute() { + final StepListener createPitListener = new StepListener<>(); + final ActionListener updatePitIdListener = ActionListener.wrap(r -> listener.onResponse(r), e -> { + logger.error("PIT creation failed while updating PIT ID", e); + listener.onFailure(e); + }); + executeCreatePit(createPitListener, updatePitIdListener); } /** - * Method for creating PIT reader context - * Phase 1 of create PIT request : Create PIT reader contexts in the associated shards with a temporary keep alive - * Phase 2 of create PIT : Update PIT reader context with PIT ID and keep alive from request and - * fail user request if any of the updates in this phase are failed - we clean up PITs in case of such failures + * This method creates PIT reader context */ - public void executeCreatePit() { + public void executeCreatePit(StepListener createPitListener, ActionListener updatePitIdListener) { SearchRequest searchRequest = new SearchRequest(request.getIndices()); searchRequest.preference(request.getPreference()); searchRequest.routing(request.getRouting()); @@ -101,15 +106,8 @@ public void executeCreatePit() { task.getAction(), () -> task.getDescription(), task.getParentTaskId(), - new HashMap<>() + null ); - - final StepListener createPitListener = new StepListener<>(); - - final ActionListener updatePitIdListener = ActionListener.wrap(r -> listener.onResponse(r), e -> { - logger.error("PIT creation failed while updating PIT ID", e); - listener.onFailure(e); - }); /** * Phase 1 of create PIT */ @@ -118,14 +116,17 @@ public void executeCreatePit() { /** * Phase 2 of create PIT where we update pit id in pit contexts */ - executeUpdatePitId(request, createPitListener, updatePitIdListener); + createPitListener.whenComplete( + searchResponse -> { executeUpdatePitId(request, searchResponse, updatePitIdListener); }, + updatePitIdListener::onFailure + ); } /** * Creates PIT reader context with temporary keep alive */ void executeCreatePit(Task task, SearchRequest searchRequest, StepListener createPitListener) { - logger.debug("Creating PIT context"); + logger.debug("Executing creation of PIT context for indices [{}]", Arrays.toString(searchRequest.indices())); transportSearchAction.executeRequest( task, searchRequest, @@ -156,50 +157,66 @@ public void executeOnShardTarget( */ void executeUpdatePitId( CreatePitRequest request, - StepListener createPitListener, + SearchResponse searchResponse, ActionListener updatePitIdListener ) { - createPitListener.whenComplete(searchResponse -> { - logger.debug("Updating PIT context with PIT ID, creation time and keep alive"); - /** - * store the create time ( same create time for all PIT contexts across shards ) to be used - * for list PIT api - */ - final long creationTime = System.currentTimeMillis(); - CreatePitResponse createPITResponse = new CreatePitResponse(searchResponse, creationTime); - SearchContextId contextId = SearchContextId.decode(namedWriteableRegistry, createPITResponse.getId()); - final StepListener> lookupListener = getConnectionLookupListener(contextId); - lookupListener.whenComplete(nodelookup -> { - final ActionListener groupedActionListener = getGroupedListener( - updatePitIdListener, - createPITResponse, - contextId.shards().size(), - contextId.shards().values() - ); - for (Map.Entry entry : contextId.shards().entrySet()) { - DiscoveryNode node = nodelookup.apply(entry.getValue().getClusterAlias(), entry.getValue().getNode()); - try { - final Transport.Connection connection = searchTransportService.getConnection( - entry.getValue().getClusterAlias(), + logger.debug("Updating PIT context with PIT ID [{}], creation time and keep alive", searchResponse.pointInTimeId()); + /** + * store the create time ( same create time for all PIT contexts across shards ) to be used + * for list PIT api + */ + final long creationTime = System.currentTimeMillis(); + CreatePitResponse createPITResponse = new CreatePitResponse( + searchResponse.pointInTimeId(), + creationTime, + searchResponse.getTotalShards(), + searchResponse.getSuccessfulShards(), + searchResponse.getSkippedShards(), + searchResponse.getFailedShards(), + searchResponse.getShardFailures() + ); + SearchContextId contextId = SearchContextId.decode(namedWriteableRegistry, createPITResponse.getId()); + final StepListener> lookupListener = getConnectionLookupListener(contextId); + lookupListener.whenComplete(nodelookup -> { + final ActionListener groupedActionListener = getGroupedListener( + updatePitIdListener, + createPITResponse, + contextId.shards().size(), + contextId.shards().values() + ); + for (Map.Entry entry : contextId.shards().entrySet()) { + DiscoveryNode node = nodelookup.apply(entry.getValue().getClusterAlias(), entry.getValue().getNode()); + try { + final Transport.Connection connection = searchTransportService.getConnection(entry.getValue().getClusterAlias(), node); + searchTransportService.updatePitContext( + connection, + new UpdatePitContextRequest( + entry.getValue().getSearchContextId(), + createPITResponse.getId(), + request.getKeepAlive().millis(), + creationTime + ), + groupedActionListener + ); + } catch (Exception e) { + logger.error( + () -> new ParameterizedMessage( + "Create pit update phase failed for PIT ID [{}] on node [{}]", + searchResponse.pointInTimeId(), node - ); - searchTransportService.updatePitContext( - connection, - new UpdatePitContextRequest( - entry.getValue().getSearchContextId(), - createPITResponse.getId(), - request.getKeepAlive().millis(), - creationTime - ), - groupedActionListener - ); - } catch (Exception e) { - logger.error(() -> new ParameterizedMessage("Create pit update phase failed on node [{}]", node), e); - groupedActionListener.onFailure(new OpenSearchException("Create pit failed on node[" + node + "]", e)); - } + ), + e + ); + groupedActionListener.onFailure( + new OpenSearchException( + "Create pit update phase for PIT ID [" + searchResponse.pointInTimeId() + "] failed on node[" + node + "]", + e + ) + ); } - }, updatePitIdListener::onFailure); + } }, updatePitIdListener::onFailure); + } private StepListener> getConnectionLookupListener(SearchContextId contextId) { @@ -260,9 +277,4 @@ public void onFailure(Exception e) { }; ClearScrollController.closeContexts(clusterService.state().getNodes(), searchTransportService, contexts, deleteListener); } - - @Override - public void run() { - runner.run(); - } } diff --git a/server/src/main/java/org/opensearch/action/search/CreatePitRequest.java b/server/src/main/java/org/opensearch/action/search/CreatePitRequest.java index b44a257f80071..45d6d9e2c9f54 100644 --- a/server/src/main/java/org/opensearch/action/search/CreatePitRequest.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePitRequest.java @@ -17,6 +17,8 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.ToXContent; +import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskId; @@ -29,7 +31,7 @@ /** * A request to make create point in time against one or more indices. */ -public class CreatePitRequest extends ActionRequest implements IndicesRequest.Replaceable { +public class CreatePitRequest extends ActionRequest implements IndicesRequest.Replaceable, ToXContent { // keep alive for pit reader context private TimeValue keepAlive; @@ -173,4 +175,21 @@ public CreatePitRequest indices(String... indices) { this.indices = indices; return this; } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field("keep_alive", keepAlive); + builder.field("allow_partial_pit_creation", allowPartialPitCreation); + if (indices != null) { + builder.startArray("indices"); + for (String index : indices) { + builder.value(index); + } + builder.endArray(); + } + if (indicesOptions != null) { + indicesOptions.toXContent(builder, params); + } + return builder; + } } diff --git a/server/src/main/java/org/opensearch/action/search/CreatePitResponse.java b/server/src/main/java/org/opensearch/action/search/CreatePitResponse.java index cb5063fb529ce..25eb9aff9e3d7 100644 --- a/server/src/main/java/org/opensearch/action/search/CreatePitResponse.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePitResponse.java @@ -9,19 +9,28 @@ package org.opensearch.action.search; import org.opensearch.action.ActionResponse; +import org.opensearch.common.ParseField; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.xcontent.StatusToXContentObject; import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentParser; import org.opensearch.rest.RestStatus; import org.opensearch.rest.action.RestActions; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; /** * Create point in time response with point in time id and shard success / failures */ public class CreatePitResponse extends ActionResponse implements StatusToXContentObject { + private static final ParseField ID = new ParseField("id"); + private static final ParseField CREATION_TIME = new ParseField("creation_time"); + // point in time id private final String id; private final int totalShards; @@ -31,19 +40,6 @@ public class CreatePitResponse extends ActionResponse implements StatusToXConten private final ShardSearchFailure[] shardFailures; private final long creationTime; - public CreatePitResponse(SearchResponse searchResponse, long creationTime) { - if (searchResponse.pointInTimeId() == null || searchResponse.pointInTimeId().isEmpty()) { - throw new IllegalArgumentException("Point in time ID is empty"); - } - this.id = searchResponse.pointInTimeId(); - this.totalShards = searchResponse.getTotalShards(); - this.successfulShards = searchResponse.getSuccessfulShards(); - this.failedShards = searchResponse.getFailedShards(); - this.skippedShards = searchResponse.getSkippedShards(); - this.shardFailures = searchResponse.getShardFailures(); - this.creationTime = creationTime; - } - public CreatePitResponse(StreamInput in) throws IOException { super(in); id = in.readString(); @@ -63,10 +59,28 @@ public CreatePitResponse(StreamInput in) throws IOException { } } + public CreatePitResponse( + String id, + long creationTime, + int totalShards, + int successfulShards, + int skippedShards, + int failedShards, + ShardSearchFailure[] shardFailures + ) { + this.id = id; + this.creationTime = creationTime; + this.totalShards = totalShards; + this.successfulShards = successfulShards; + this.skippedShards = skippedShards; + this.failedShards = failedShards; + this.shardFailures = shardFailures; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field("id", id); + builder.field(ID.getPreferredName(), id); RestActions.buildBroadcastShardsHeader( builder, params, @@ -76,11 +90,87 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws getFailedShards(), getShardFailures() ); - builder.field("creationTime", creationTime); + builder.field(CREATION_TIME.getPreferredName(), creationTime); builder.endObject(); return builder; } + /** + * Parse the create PIT response body into a new {@link CreatePitResponse} object + */ + public static CreatePitResponse fromXContent(XContentParser parser) throws IOException { + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + parser.nextToken(); + return innerFromXContent(parser); + } + + public static CreatePitResponse innerFromXContent(XContentParser parser) throws IOException { + ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.currentToken(), parser); + String currentFieldName = parser.currentName(); + int successfulShards = -1; + int totalShards = -1; + int skippedShards = 0; + int failedShards = 0; + String id = null; + long creationTime = 0; + List failures = new ArrayList<>(); + for (XContentParser.Token token = parser.nextToken(); token != XContentParser.Token.END_OBJECT; token = parser.nextToken()) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token.isValue()) { + if (CREATION_TIME.match(currentFieldName, parser.getDeprecationHandler())) { + creationTime = parser.longValue(); + } else if (ID.match(currentFieldName, parser.getDeprecationHandler())) { + id = parser.text(); + } else { + parser.skipChildren(); + } + } else if (token == XContentParser.Token.START_OBJECT) { + if (RestActions._SHARDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + currentFieldName = parser.currentName(); + } else if (token.isValue()) { + if (RestActions.FAILED_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { + failedShards = parser.intValue(); // we don't need it but need to consume it + } else if (RestActions.SUCCESSFUL_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { + successfulShards = parser.intValue(); + } else if (RestActions.TOTAL_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { + totalShards = parser.intValue(); + } else if (RestActions.SKIPPED_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { + skippedShards = parser.intValue(); + } else { + parser.skipChildren(); + } + } else if (token == XContentParser.Token.START_ARRAY) { + if (RestActions.FAILURES_FIELD.match(currentFieldName, parser.getDeprecationHandler())) { + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + failures.add(ShardSearchFailure.fromXContent(parser)); + } + } else { + parser.skipChildren(); + } + } else { + parser.skipChildren(); + } + } + } else { + parser.skipChildren(); + } + } + } + + return new CreatePitResponse( + id, + creationTime, + totalShards, + successfulShards, + skippedShards, + failedShards, + failures.toArray(ShardSearchFailure.EMPTY_ARRAY) + ); + } + public long getCreationTime() { return creationTime; } diff --git a/server/src/main/java/org/opensearch/action/search/TransportCreatePitAction.java b/server/src/main/java/org/opensearch/action/search/TransportCreatePitAction.java index 068b8cd46a9f9..1d15e9d018211 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportCreatePitAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportCreatePitAction.java @@ -57,16 +57,19 @@ public TransportCreatePitAction( @Override protected void doExecute(Task task, CreatePitRequest request, ActionListener listener) { - Runnable runnable = new CreatePitController( - request, - searchTransportService, - clusterService, - transportSearchAction, - namedWriteableRegistry, - task, - listener - ); - runnable.run(); + Thread t = new Thread(() -> { + CreatePitController controller = new CreatePitController( + request, + searchTransportService, + clusterService, + transportSearchAction, + namedWriteableRegistry, + task, + listener + ); + controller.execute(); + }); + t.start(); } public static class CreateReaderContextRequest extends TransportRequest { diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 2103b8b52fff2..5e007e01139ce 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -465,7 +465,7 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.ALLOW_EXPENSIVE_QUERIES, SearchService.MAX_OPEN_PIT_CONTEXT, SearchService.MAX_PIT_KEEPALIVE_SETTING, - CreatePitController.CREATE_PIT_TEMPORARY_KEEPALIVE_SETTING, + CreatePitController.PIT_CREATE_PHASE_KEEP_ALIVE, MultiBucketConsumerService.MAX_BUCKET_SETTING, SearchService.LOW_LEVEL_CANCELLATION_SETTING, SearchService.MAX_OPEN_SCROLL_CONTEXT, diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 68e1b5b598d40..22a5a88a3daf3 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -147,6 +147,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.INDEX_CHECK_ON_STARTUP, IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD, IndexSettings.MAX_SLICES_PER_SCROLL, + IndexSettings.MAX_SLICES_PER_PIT, IndexSettings.MAX_REGEX_LENGTH_SETTING, ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING, IndexSettings.INDEX_GC_DELETES_SETTING, diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 8ba9c47902115..d637b355d7b98 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -449,6 +449,17 @@ public final class IndexSettings { Property.IndexScope ); + /** + * The maximum number of slices allowed in a search request with PIT + */ + public static final Setting MAX_SLICES_PER_PIT = Setting.intSetting( + "index.max_slices_per_pit", + 1024, + 1, + Property.Dynamic, + Property.IndexScope + ); + /** * The maximum length of regex string allowed in a regexp query. */ @@ -601,6 +612,10 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) { * The maximum number of slices allowed in a scroll request. */ private volatile int maxSlicesPerScroll; + /** + * The maximum number of slices allowed in a PIT request. + */ + private volatile int maxSlicesPerPit; /** * The maximum length of regex string allowed in a regexp query. @@ -715,6 +730,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti maxShingleDiff = scopedSettings.get(MAX_SHINGLE_DIFF_SETTING); maxRefreshListeners = scopedSettings.get(MAX_REFRESH_LISTENERS_PER_SHARD); maxSlicesPerScroll = scopedSettings.get(MAX_SLICES_PER_SCROLL); + maxSlicesPerPit = scopedSettings.get(MAX_SLICES_PER_PIT); maxAnalyzedOffset = scopedSettings.get(MAX_ANALYZED_OFFSET_SETTING); maxTermsCount = scopedSettings.get(MAX_TERMS_COUNT_SETTING); maxRegexLength = scopedSettings.get(MAX_REGEX_LENGTH_SETTING); @@ -787,6 +803,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer(MAX_ANALYZED_OFFSET_SETTING, this::setHighlightMaxAnalyzedOffset); scopedSettings.addSettingsUpdateConsumer(MAX_TERMS_COUNT_SETTING, this::setMaxTermsCount); scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll); + scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_PIT, this::setMaxSlicesPerPit); scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields); scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter); scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength); @@ -1247,6 +1264,17 @@ public int getMaxSlicesPerScroll() { return maxSlicesPerScroll; } + /** + * The maximum number of slices allowed in a PIT request. + */ + public int getMaxSlicesPerPit() { + return maxSlicesPerPit; + } + + private void setMaxSlicesPerPit(int value) { + this.maxSlicesPerPit = value; + } + private void setMaxSlicesPerScroll(int value) { this.maxSlicesPerScroll = value; } diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestCreatePitAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestCreatePitAction.java index bb7f323f76033..9439670880015 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestCreatePitAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestCreatePitAction.java @@ -28,6 +28,9 @@ * Rest action for creating PIT context */ public class RestCreatePitAction extends BaseRestHandler { + public static String ALLOW_PARTIAL_PIT_CREATION = "allow_partial_pit_creation"; + public static String KEEP_ALIVE = "keep_alive"; + @Override public String getName() { return "create_pit_action"; @@ -35,9 +38,9 @@ public String getName() { @Override public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { - boolean allowPartialPitCreation = request.paramAsBoolean("allow_partial_pit_creation", true); + boolean allowPartialPitCreation = request.paramAsBoolean(ALLOW_PARTIAL_PIT_CREATION, true); String[] indices = Strings.splitStringByCommaToArray(request.param("index")); - TimeValue keepAlive = request.paramAsTime("keep_alive", null); + TimeValue keepAlive = request.paramAsTime(KEEP_ALIVE, null); CreatePitRequest createPitRequest = new CreatePitRequest(keepAlive, allowPartialPitCreation, indices); createPitRequest.setIndicesOptions(IndicesOptions.fromRequest(request, createPitRequest.indicesOptions())); createPitRequest.setPreference(request.param("preference")); diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java index 9b2d2166835d0..9a0397d3f6511 100644 --- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java @@ -75,6 +75,7 @@ import org.opensearch.search.fetch.subphase.ScriptFieldsContext; import org.opensearch.search.fetch.subphase.highlight.SearchHighlightContext; import org.opensearch.search.internal.ContextIndexSearcher; +import org.opensearch.search.internal.PitReaderContext; import org.opensearch.search.internal.ReaderContext; import org.opensearch.search.internal.ScrollContext; import org.opensearch.search.internal.SearchContext; @@ -299,6 +300,23 @@ public void preProcess(boolean rewrite) { } } + if (sliceBuilder != null && readerContext != null && readerContext instanceof PitReaderContext) { + int sliceLimit = indexService.getIndexSettings().getMaxSlicesPerPit(); + int numSlices = sliceBuilder.getMax(); + if (numSlices > sliceLimit) { + throw new IllegalArgumentException( + "The number of slices [" + + numSlices + + "] is too large. It must " + + "be less than [" + + sliceLimit + + "]. This limit can be set by changing the [" + + IndexSettings.MAX_SLICES_PER_PIT.getKey() + + "] index level setting." + ); + } + } + // initialize the filtering alias based on the provided filters try { final QueryBuilder queryBuilder = request.getAliasFilter().getQueryBuilder(); diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 660774a986194..fdf06d75db45c 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -227,11 +227,12 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv /** * This setting defines the maximum number of active PIT reader contexts in the node , since each PIT context - * has a resource cost attached to it + * has a resource cost attached to it. This setting is less than scroll since users are + * encouraged to share the PIT details. */ public static final Setting MAX_OPEN_PIT_CONTEXT = Setting.intSetting( "search.max_open_pit_context", - 500, + 300, 0, Property.Dynamic, Property.NodeScope diff --git a/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java index b9dec52e812c9..5411d6c72686d 100644 --- a/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java +++ b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java @@ -126,9 +126,7 @@ public void onFailure(Exception e) { clusterServiceMock = mock(ClusterService.class); ClusterState state = mock(ClusterState.class); - final Settings keepAliveSettings = Settings.builder() - .put(CreatePitController.CREATE_PIT_TEMPORARY_KEEPALIVE_SETTING.getKey(), 30000) - .build(); + final Settings keepAliveSettings = Settings.builder().put(CreatePitController.PIT_CREATE_PHASE_KEEP_ALIVE.getKey(), 30000).build(); when(clusterServiceMock.getSettings()).thenReturn(keepAliveSettings); when(state.getMetadata()).thenReturn(Metadata.EMPTY_METADATA); @@ -190,8 +188,6 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod createPitListener ); - CreatePitResponse createPITResponse = new CreatePitResponse(searchResponse, System.currentTimeMillis()); - ActionListener updatelistener = new LatchedActionListener<>(new ActionListener() { @Override public void onResponse(CreatePitResponse createPITResponse) { @@ -205,8 +201,7 @@ public void onFailure(Exception e) { }, latch); StepListener createListener = new StepListener<>(); - - controller.executeUpdatePitId(request, createListener, updatelistener); + controller.executeCreatePit(createListener, updatelistener); createListener.onResponse(searchResponse); latch.await(); assertEquals(3, updateNodesInvoked.size()); @@ -277,7 +272,7 @@ public void onFailure(Exception e) { StepListener createListener = new StepListener<>(); - controller.executeUpdatePitId(request, createListener, updatelistener); + controller.executeCreatePit(createListener, updatelistener); createListener.onFailure(new Exception("Exception occurred in phase 1")); latch.await(); assertEquals(0, updateNodesInvoked.size()); @@ -339,7 +334,6 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod createPitListener ); - CreatePitResponse createPITResponse = new CreatePitResponse(searchResponse, System.currentTimeMillis()); CountDownLatch latch = new CountDownLatch(1); ActionListener updatelistener = new LatchedActionListener<>(new ActionListener() { @@ -355,7 +349,7 @@ public void onFailure(Exception e) { }, latch); StepListener createListener = new StepListener<>(); - controller.executeUpdatePitId(request, createListener, updatelistener); + controller.executeCreatePit(createListener, updatelistener); createListener.onResponse(searchResponse); latch.await(); assertEquals(3, updateNodesInvoked.size()); @@ -408,7 +402,6 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod createPitListener ); - CreatePitResponse createPITResponse = new CreatePitResponse(searchResponse, System.currentTimeMillis()); CountDownLatch latch = new CountDownLatch(1); ActionListener updatelistener = new LatchedActionListener<>(new ActionListener() { @@ -424,7 +417,7 @@ public void onFailure(Exception e) { }, latch); StepListener createListener = new StepListener<>(); - controller.executeUpdatePitId(request, createListener, updatelistener); + controller.executeCreatePit(createListener, updatelistener); createListener.onResponse(searchResponse); latch.await(); assertEquals(3, updateNodesInvoked.size()); diff --git a/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java index 06afb956ab4a6..3c83f899dd1b5 100644 --- a/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java @@ -67,6 +67,7 @@ import org.opensearch.indices.breaker.NoneCircuitBreakerService; import org.opensearch.search.internal.AliasFilter; import org.opensearch.search.internal.LegacyReaderContext; +import org.opensearch.search.internal.PitReaderContext; import org.opensearch.search.internal.ReaderContext; import org.opensearch.search.internal.ShardSearchContextId; import org.opensearch.search.internal.ShardSearchRequest; @@ -134,10 +135,12 @@ public void testPreProcess() throws Exception { int maxResultWindow = randomIntBetween(50, 100); int maxRescoreWindow = randomIntBetween(50, 100); int maxSlicesPerScroll = randomIntBetween(50, 100); + int maxSlicesPerPit = randomIntBetween(50, 100); Settings settings = Settings.builder() .put("index.max_result_window", maxResultWindow) .put("index.max_slices_per_scroll", maxSlicesPerScroll) .put("index.max_rescore_window", maxRescoreWindow) + .put("index.max_slices_per_pit", maxSlicesPerPit) .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) @@ -400,6 +403,48 @@ protected Engine.Searcher acquireSearcherInternal(String source) { assertTrue(query1 instanceof MatchNoDocsQuery || query2 instanceof MatchNoDocsQuery); readerContext.close(); + + ReaderContext pitReaderContext = new PitReaderContext( + newContextId(), + indexService, + indexShard, + searcherSupplier.get(), + 1000, + true + ); + DefaultSearchContext context5 = new DefaultSearchContext( + pitReaderContext, + shardSearchRequest, + target, + null, + bigArrays, + null, + timeout, + null, + false, + Version.CURRENT, + false, + executor + ); + int numSlicesForPit = maxSlicesPerPit + randomIntBetween(1, 100); + when(sliceBuilder.getMax()).thenReturn(numSlicesForPit); + context5.sliceBuilder(sliceBuilder); + + exception = expectThrows(IllegalArgumentException.class, () -> context5.preProcess(false)); + assertThat( + exception.getMessage(), + equalTo( + "The number of slices [" + + numSlicesForPit + + "] is too large. It must " + + "be less than [" + + maxSlicesPerPit + + "]. This limit can be set by changing the [" + + IndexSettings.MAX_SLICES_PER_PIT.getKey() + + "] index level setting." + ) + ); + pitReaderContext.close(); threadPool.shutdown(); } } diff --git a/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java b/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java index 08bf66c472abd..53ab3e0de401b 100644 --- a/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java @@ -46,7 +46,7 @@ protected Settings nodeSettings() { return Settings.builder() .put(super.nodeSettings()) .put(SearchService.KEEPALIVE_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(1)) - .put(CreatePitController.CREATE_PIT_TEMPORARY_KEEPALIVE_SETTING.getKey(), TimeValue.timeValueSeconds(1)) + .put(CreatePitController.PIT_CREATE_PHASE_KEEP_ALIVE.getKey(), TimeValue.timeValueSeconds(1)) .build(); } From 648402e2a7d16131b918111098fae099811db83d Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Tue, 10 May 2022 23:04:32 +0530 Subject: [PATCH 2/2] Addressing comments Signed-off-by: Bharathwaj G --- .../action/search/CreatePitController.java | 11 +++++++++-- .../opensearch/search/internal/PitReaderContext.java | 6 +++--- .../opensearch/search/internal/ReaderContext.java | 12 ++++++++++-- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/search/CreatePitController.java b/server/src/main/java/org/opensearch/action/search/CreatePitController.java index 68f465c377154..1e74ab1922d8c 100644 --- a/server/src/main/java/org/opensearch/action/search/CreatePitController.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePitController.java @@ -126,7 +126,9 @@ public void executeCreatePit(StepListener createPitListener, Act * Creates PIT reader context with temporary keep alive */ void executeCreatePit(Task task, SearchRequest searchRequest, StepListener createPitListener) { - logger.debug("Executing creation of PIT context for indices [{}]", Arrays.toString(searchRequest.indices())); + logger.debug( + () -> new ParameterizedMessage("Executing creation of PIT context for indices [{}]", Arrays.toString(searchRequest.indices())) + ); transportSearchAction.executeRequest( task, searchRequest, @@ -160,7 +162,12 @@ void executeUpdatePitId( SearchResponse searchResponse, ActionListener updatePitIdListener ) { - logger.debug("Updating PIT context with PIT ID [{}], creation time and keep alive", searchResponse.pointInTimeId()); + logger.debug( + () -> new ParameterizedMessage( + "Updating PIT context with PIT ID [{}], creation time and keep alive", + searchResponse.pointInTimeId() + ) + ); /** * store the create time ( same create time for all PIT contexts across shards ) to be used * for list PIT api diff --git a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java index 5c19d54327fea..43ca7e0ebd823 100644 --- a/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java +++ b/server/src/main/java/org/opensearch/search/internal/PitReaderContext.java @@ -50,13 +50,13 @@ public void setPitId(final String pitId) { * keepAliveInMillis. */ public Releasable updatePitIdAndKeepAlive(long keepAliveInMillis, String pitId, long createTime) { - refCounted.incRef(); + getRefCounted().incRef(); tryUpdateKeepAlive(keepAliveInMillis); setPitId(pitId); setCreationTime(createTime); return Releasables.releaseOnce(() -> { - this.lastAccessTime.updateAndGet(curr -> Math.max(curr, nowInMillis())); - refCounted.decRef(); + getLastAccessTime().updateAndGet(curr -> Math.max(curr, nowInMillis())); + getRefCounted().decRef(); }); } diff --git a/server/src/main/java/org/opensearch/search/internal/ReaderContext.java b/server/src/main/java/org/opensearch/search/internal/ReaderContext.java index 6342051ce4405..7a7ee27d36d56 100644 --- a/server/src/main/java/org/opensearch/search/internal/ReaderContext.java +++ b/server/src/main/java/org/opensearch/search/internal/ReaderContext.java @@ -66,9 +66,9 @@ public class ReaderContext implements Releasable { private final boolean singleSession; private final AtomicLong keepAlive; - protected final AtomicLong lastAccessTime; + private final AtomicLong lastAccessTime; // For reference why we use RefCounted here see https://github.com/elastic/elasticsearch/pull/20095. - protected final AbstractRefCounted refCounted; + private final AbstractRefCounted refCounted; private final List onCloses = new CopyOnWriteArrayList<>(); @@ -107,6 +107,14 @@ protected long nowInMillis() { return indexShard.getThreadPool().relativeTimeInMillis(); } + protected AbstractRefCounted getRefCounted() { + return refCounted; + } + + protected AtomicLong getLastAccessTime() { + return lastAccessTime; + } + @Override public final void close() { if (closed.compareAndSet(false, true)) {