diff --git a/server/src/internalClusterTest/java/org/opensearch/search/slice/SearchSliceIT.java b/server/src/internalClusterTest/java/org/opensearch/search/slice/SearchSliceIT.java index 9c735c42052e3..dce60f52fd23a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/slice/SearchSliceIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/slice/SearchSliceIT.java @@ -32,9 +32,13 @@ package org.opensearch.search.slice; +import org.opensearch.action.ActionFuture; import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest; import org.opensearch.action.index.IndexRequestBuilder; +import org.opensearch.action.search.CreatePITAction; +import org.opensearch.action.search.CreatePITRequest; +import org.opensearch.action.search.CreatePITResponse; import org.opensearch.action.search.SearchPhaseExecutionException; import org.opensearch.action.search.SearchRequestBuilder; import org.opensearch.action.search.SearchResponse; @@ -46,6 +50,7 @@ import org.opensearch.search.Scroll; import org.opensearch.search.SearchException; import org.opensearch.search.SearchHit; +import org.opensearch.search.builder.PointInTimeBuilder; import org.opensearch.search.sort.SortBuilders; import org.opensearch.test.OpenSearchIntegTestCase; @@ -129,6 +134,77 @@ public void testSearchSort() throws Exception { } } + public void testSearchSortWithoutPitOrScroll() throws Exception { + int numShards = randomIntBetween(1, 7); + int numDocs = randomIntBetween(100, 1000); + setupIndex(numDocs, numShards); + int fetchSize = randomIntBetween(10, 100); + SearchRequestBuilder request = client().prepareSearch("test") + .setQuery(matchAllQuery()) + .setSize(fetchSize) + .addSort(SortBuilders.fieldSort("_doc")); + SliceBuilder sliceBuilder = new SliceBuilder("_id", 0, 4); + SearchPhaseExecutionException ex = expectThrows(SearchPhaseExecutionException.class, () -> request.slice(sliceBuilder).get()); + assertTrue(ex.getMessage().contains("all shards failed")); + } + + public void testSearchSortWithPIT() throws Exception { + int numShards = randomIntBetween(1, 7); + int numDocs = randomIntBetween(100, 1000); + setupIndex(numDocs, numShards); + int max = randomIntBetween(2, numShards * 3); + CreatePITRequest pitRequest = new CreatePITRequest(TimeValue.timeValueDays(1), true); + pitRequest.setIndices(new String[] { "test" }); + ActionFuture execute = client().execute(CreatePITAction.INSTANCE, pitRequest); + CreatePITResponse pitResponse = execute.get(); + for (String field : new String[] { "_id", "random_int", "static_int" }) { + int fetchSize = randomIntBetween(10, 100); + + // test _doc sort + SearchRequestBuilder request = client().prepareSearch("test") + .setQuery(matchAllQuery()) + .setPointInTime(new PointInTimeBuilder(pitResponse.getId())) + .setSize(fetchSize) + .addSort(SortBuilders.fieldSort("_doc")); + assertSearchSlicesWithPIT(request, field, max, numDocs); + + // test numeric sort + request = client().prepareSearch("test") + .setQuery(matchAllQuery()) + .setPointInTime(new PointInTimeBuilder(pitResponse.getId())) + .setSize(fetchSize) + .addSort(SortBuilders.fieldSort("random_int")); + assertSearchSlicesWithPIT(request, field, max, numDocs); + } + } + + private void assertSearchSlicesWithPIT(SearchRequestBuilder request, String field, int numSlice, int numDocs) { + int totalResults = 0; + List keys = new ArrayList<>(); + for (int id = 0; id < numSlice; id++) { + SliceBuilder sliceBuilder = new SliceBuilder(field, id, numSlice); + SearchResponse searchResponse = request.slice(sliceBuilder).setFrom(0).get(); + totalResults += searchResponse.getHits().getHits().length; + int expectedSliceResults = (int) searchResponse.getHits().getTotalHits().value; + int numSliceResults = searchResponse.getHits().getHits().length; + for (SearchHit hit : searchResponse.getHits().getHits()) { + assertTrue(keys.add(hit.getId())); + } + while (searchResponse.getHits().getHits().length > 0) { + searchResponse = request.setFrom(numSliceResults).slice(sliceBuilder).get(); + totalResults += searchResponse.getHits().getHits().length; + numSliceResults += searchResponse.getHits().getHits().length; + for (SearchHit hit : searchResponse.getHits().getHits()) { + assertTrue(keys.add(hit.getId())); + } + } + assertThat(numSliceResults, equalTo(expectedSliceResults)); + } + assertThat(totalResults, equalTo(numDocs)); + assertThat(keys.size(), equalTo(numDocs)); + assertThat(new HashSet(keys).size(), equalTo(numDocs)); + } + public void testWithPreferenceAndRoutings() throws Exception { int numShards = 10; int totalDocs = randomIntBetween(100, 1000); diff --git a/server/src/main/java/org/opensearch/action/search/PITController.java b/server/src/main/java/org/opensearch/action/search/CreatePITController.java similarity index 89% rename from server/src/main/java/org/opensearch/action/search/PITController.java rename to server/src/main/java/org/opensearch/action/search/CreatePITController.java index c61484bb6b348..92efd9ebb1836 100644 --- a/server/src/main/java/org/opensearch/action/search/PITController.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePITController.java @@ -29,6 +29,7 @@ import org.opensearch.transport.Transport; import java.util.Collection; +import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.function.BiFunction; @@ -42,7 +43,7 @@ * Phase 2 of create PIT : Update PIT reader context with PIT ID and keep alive from request and * fail user request if any of the updates in this phase are failed - we clean up PITs in case of such failures */ -public class PITController implements Runnable { +public class CreatePITController implements Runnable { private final Runnable runner; private final SearchTransportService searchTransportService; private final ClusterService clusterService; @@ -51,14 +52,14 @@ public class PITController implements Runnable { private final Task task; private final ActionListener listener; private final CreatePITRequest request; - private static final Logger logger = LogManager.getLogger(PITController.class); + private static final Logger logger = LogManager.getLogger(CreatePITController.class); public static final Setting CREATE_PIT_TEMPORARY_KEEPALIVE_SETTING = Setting.positiveTimeSetting( "pit.temporary.keep_alive_interval", timeValueSeconds(30), Setting.Property.NodeScope ); - public PITController( + public CreatePITController( CreatePITRequest request, SearchTransportService searchTransportService, ClusterService clusterService, @@ -81,6 +82,12 @@ private TimeValue getCreatePitTemporaryKeepAlive() { return CREATE_PIT_TEMPORARY_KEEPALIVE_SETTING.get(clusterService.getSettings()); } + /** + * Method for creating PIT reader context + * Phase 1 of create PIT request : Create PIT reader contexts in the associated shards with a temporary keep alive + * Phase 2 of create PIT : Update PIT reader context with PIT ID and keep alive from request and + * fail user request if any of the updates in this phase are failed - we clean up PITs in case of such failures + */ public void executeCreatePit() { SearchRequest searchRequest = new SearchRequest(request.getIndices()); searchRequest.preference(request.getPreference()); @@ -94,7 +101,7 @@ public void executeCreatePit() { task.getAction(), () -> task.getDescription(), task.getParentTaskId(), - task.getHeaders() + new HashMap<>() ); final StepListener createPitListener = new StepListener<>(); @@ -118,6 +125,7 @@ public void executeCreatePit() { * Creates PIT reader context with temporary keep alive */ void executeCreatePit(Task task, SearchRequest searchRequest, StepListener createPitListener) { + logger.debug("Creating PIT context"); transportSearchAction.executeRequest( task, searchRequest, @@ -152,7 +160,13 @@ void executeUpdatePitId( ActionListener updatePitIdListener ) { createPitListener.whenComplete(searchResponse -> { - CreatePITResponse createPITResponse = new CreatePITResponse(searchResponse); + logger.debug("Updating PIT context with PIT ID, creation time and keep alive"); + /** + * store the create time ( same create time for all PIT contexts across shards ) to be used + * for list PIT api + */ + final long creationTime = System.currentTimeMillis(); + CreatePITResponse createPITResponse = new CreatePITResponse(searchResponse, creationTime); SearchContextId contextId = SearchContextId.decode(namedWriteableRegistry, createPITResponse.getId()); final StepListener> lookupListener = getConnectionLookupListener(contextId); lookupListener.whenComplete(nodelookup -> { @@ -162,11 +176,6 @@ void executeUpdatePitId( contextId.shards().size(), contextId.shards().values() ); - /** - * store the create time ( same create time for all PIT contexts across shards ) to be used - * for list PIT api - */ - long createTime = System.currentTimeMillis(); for (Map.Entry entry : contextId.shards().entrySet()) { DiscoveryNode node = nodelookup.apply(entry.getValue().getClusterAlias(), entry.getValue().getNode()); try { @@ -180,7 +189,7 @@ void executeUpdatePitId( entry.getValue().getSearchContextId(), createPITResponse.getId(), request.getKeepAlive().millis(), - createTime + creationTime ), groupedActionListener ); @@ -205,10 +214,10 @@ private StepListener> getConnectionLoo final StepListener> lookupListener = new StepListener<>(); - if (clusters.isEmpty() == false) { - searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener); - } else { + if (clusters.isEmpty()) { lookupListener.onResponse((cluster, nodeId) -> state.getNodes().get(nodeId)); + } else { + searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener); } return lookupListener; } @@ -246,7 +255,7 @@ public void onResponse(Integer freed) { @Override public void onFailure(Exception e) { - logger.debug("Cleaning up PIT contexts failed ", e); + logger.error("Cleaning up PIT contexts failed ", e); } }; ClearScrollController.closeContexts(clusterService.state().getNodes(), searchTransportService, contexts, deleteListener); diff --git a/server/src/main/java/org/opensearch/action/search/CreatePITRequest.java b/server/src/main/java/org/opensearch/action/search/CreatePITRequest.java index 51d263d7ea856..0b7efa73c7e57 100644 --- a/server/src/main/java/org/opensearch/action/search/CreatePITRequest.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePITRequest.java @@ -152,6 +152,7 @@ public final String buildDescription() { Strings.arrayToDelimitedString(indices, ",", sb); sb.append("], "); sb.append("pointintime[").append(keepAlive).append("], "); + sb.append("allowPartialPitCreation[").append(allowPartialPitCreation).append("], "); return sb.toString(); } diff --git a/server/src/main/java/org/opensearch/action/search/CreatePITResponse.java b/server/src/main/java/org/opensearch/action/search/CreatePITResponse.java index 3812cb42b22b3..2ddfd7c20a5fc 100644 --- a/server/src/main/java/org/opensearch/action/search/CreatePITResponse.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePITResponse.java @@ -29,14 +29,19 @@ public class CreatePITResponse extends ActionResponse implements StatusToXConten private final int failedShards; private final int skippedShards; private final ShardSearchFailure[] shardFailures; + private final long creationTime; - public CreatePITResponse(SearchResponse searchResponse) { + public CreatePITResponse(SearchResponse searchResponse, long creationTime) { + if (searchResponse.pointInTimeId() == null || searchResponse.pointInTimeId().isEmpty()) { + throw new IllegalArgumentException("Point in time ID is empty"); + } this.id = searchResponse.pointInTimeId(); this.totalShards = searchResponse.getTotalShards(); this.successfulShards = searchResponse.getSuccessfulShards(); this.failedShards = searchResponse.getFailedShards(); this.skippedShards = searchResponse.getSkippedShards(); this.shardFailures = searchResponse.getShardFailures(); + this.creationTime = creationTime; } public CreatePITResponse(StreamInput in) throws IOException { @@ -46,6 +51,7 @@ public CreatePITResponse(StreamInput in) throws IOException { successfulShards = in.readVInt(); failedShards = in.readVInt(); skippedShards = in.readVInt(); + creationTime = in.readLong(); int size = in.readVInt(); if (size == 0) { shardFailures = ShardSearchFailure.EMPTY_ARRAY; @@ -70,10 +76,15 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws getFailedShards(), getShardFailures() ); + builder.field("creationTime", creationTime); builder.endObject(); return builder; } + public long getCreationTime() { + return creationTime; + } + /** * The failed number of shards the search was executed on. */ @@ -97,6 +108,7 @@ public void writeTo(StreamOutput out) throws IOException { shardSearchFailure.writeTo(out); } out.writeString(id); + out.writeLong(creationTime); } public String getId() { diff --git a/server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java b/server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java index f69d51f066cc8..225973211c406 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportCreatePITAction.java @@ -57,7 +57,7 @@ public TransportCreatePITAction( @Override protected void doExecute(Task task, CreatePITRequest request, ActionListener listener) { - Runnable runnable = new PITController( + Runnable runnable = new CreatePITController( request, searchTransportService, clusterService, diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index cfb22b0518f7e..c67d299be3435 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -32,8 +32,7 @@ package org.opensearch.common.settings; import org.apache.logging.log4j.LogManager; -import org.opensearch.action.main.TransportMainAction; -import org.opensearch.action.search.PITController; +import org.opensearch.action.search.CreatePITController; import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexSettings; @@ -467,7 +466,7 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.ALLOW_EXPENSIVE_QUERIES, SearchService.MAX_OPEN_PIT_CONTEXT, SearchService.MAX_PIT_KEEPALIVE_SETTING, - PITController.CREATE_PIT_TEMPORARY_KEEPALIVE_SETTING, + CreatePITController.CREATE_PIT_TEMPORARY_KEEPALIVE_SETTING, MultiBucketConsumerService.MAX_BUCKET_SETTING, SearchService.LOW_LEVEL_CANCELLATION_SETTING, SearchService.MAX_OPEN_SCROLL_CONTEXT, diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java index 6fd78b834344d..9b2d2166835d0 100644 --- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java @@ -282,7 +282,7 @@ public void preProcess(boolean rewrite) { } } - if (sliceBuilder != null) { + if (sliceBuilder != null && scrollContext() != null) { int sliceLimit = indexService.getIndexSettings().getMaxSlicesPerScroll(); int numSlices = sliceBuilder.getMax(); if (numSlices > sliceLimit) { diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 823bcbc6376ab..8a0a08ad5d703 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -1297,8 +1297,8 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc } if (source.slice() != null) { - if (context.scrollContext() == null) { - throw new SearchException(shardTarget, "`slice` cannot be used outside of a scroll context"); + if (context.scrollContext() == null && !(context.readerContext() instanceof PitReaderContext)) { + throw new SearchException(shardTarget, "`slice` cannot be used outside of a scroll context or PIT context"); } context.sliceBuilder(source.slice()); } diff --git a/server/src/main/java/org/opensearch/tasks/Task.java b/server/src/main/java/org/opensearch/tasks/Task.java index d2e91d6ee3418..a51af17ae8ea2 100644 --- a/server/src/main/java/org/opensearch/tasks/Task.java +++ b/server/src/main/java/org/opensearch/tasks/Task.java @@ -364,10 +364,6 @@ public String getHeader(String header) { return headers.get(header); } - public Map getHeaders() { - return headers; - } - public TaskResult result(DiscoveryNode node, Exception error) throws IOException { return new TaskResult(taskInfo(node.getId(), true, true), error); } diff --git a/server/src/test/java/org/opensearch/action/search/PitControllerTests.java b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java similarity index 97% rename from server/src/test/java/org/opensearch/action/search/PitControllerTests.java rename to server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java index c7c8b22a9a179..9742a58346e73 100644 --- a/server/src/test/java/org/opensearch/action/search/PitControllerTests.java +++ b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java @@ -50,7 +50,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class PitControllerTests extends OpenSearchTestCase { +public class CreatePitControllerTests extends OpenSearchTestCase { DiscoveryNode node1 = null; DiscoveryNode node2 = null; @@ -123,7 +123,7 @@ public void onFailure(Exception e) { ClusterState state = mock(ClusterState.class); final Settings keepAliveSettings = Settings.builder() - .put(PITController.CREATE_PIT_TEMPORARY_KEEPALIVE_SETTING.getKey(), 30000) + .put(CreatePITController.CREATE_PIT_TEMPORARY_KEEPALIVE_SETTING.getKey(), 30000) .build(); when(clusterServiceMock.getSettings()).thenReturn(keepAliveSettings); @@ -176,7 +176,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); - PITController controller = new PITController( + CreatePITController controller = new CreatePITController( request, searchTransportService, clusterServiceMock, @@ -186,7 +186,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod createPitListener ); - CreatePITResponse createPITResponse = new CreatePITResponse(searchResponse); + CreatePITResponse createPITResponse = new CreatePITResponse(searchResponse, System.currentTimeMillis()); ActionListener updatelistener = new LatchedActionListener<>(new ActionListener() { @Override @@ -249,7 +249,7 @@ public void sendFreeContext( CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); - PITController controller = new PITController( + CreatePITController controller = new CreatePITController( request, searchTransportService, clusterServiceMock, @@ -325,7 +325,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod }; CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); - PITController controller = new PITController( + CreatePITController controller = new CreatePITController( request, searchTransportService, clusterServiceMock, @@ -335,7 +335,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod createPitListener ); - CreatePITResponse createPITResponse = new CreatePITResponse(searchResponse); + CreatePITResponse createPITResponse = new CreatePITResponse(searchResponse, System.currentTimeMillis()); CountDownLatch latch = new CountDownLatch(1); ActionListener updatelistener = new LatchedActionListener<>(new ActionListener() { @@ -394,7 +394,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod }; CreatePITRequest request = new CreatePITRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); - PITController controller = new PITController( + CreatePITController controller = new CreatePITController( request, searchTransportService, clusterServiceMock, @@ -404,7 +404,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod createPitListener ); - CreatePITResponse createPITResponse = new CreatePITResponse(searchResponse); + CreatePITResponse createPITResponse = new CreatePITResponse(searchResponse, System.currentTimeMillis()); CountDownLatch latch = new CountDownLatch(1); ActionListener updatelistener = new LatchedActionListener<>(new ActionListener() { diff --git a/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java index 32a2e7b21eb6f..3efdcadb8e09f 100644 --- a/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/PitMultiNodeTests.java @@ -104,6 +104,8 @@ public Settings onNodeStopped(String nodeName) throws Exception { .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1))) .get(); assertEquals(1, searchResponse.getSuccessfulShards()); + assertEquals(1, searchResponse.getFailedShards()); + assertEquals(0, searchResponse.getSkippedShards()); assertEquals(2, searchResponse.getTotalShards()); return super.onNodeStopped(nodeName); } diff --git a/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java b/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java index e5970bee6df10..9bad643a4f0ac 100644 --- a/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/search/PitSingleNodeTests.java @@ -43,7 +43,7 @@ protected Settings nodeSettings() { return Settings.builder() .put(super.nodeSettings()) .put(SearchService.KEEPALIVE_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(1)) - .put(PITController.CREATE_PIT_TEMPORARY_KEEPALIVE_SETTING.getKey(), TimeValue.timeValueSeconds(1)) + .put(CreatePITController.CREATE_PIT_TEMPORARY_KEEPALIVE_SETTING.getKey(), TimeValue.timeValueSeconds(1)) .build(); }