diff --git a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java index b8abb6f96e574..b664d19597c9d 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java @@ -83,10 +83,10 @@ public class SearchTransportService { public static final String FREE_CONTEXT_SCROLL_ACTION_NAME = "indices:data/read/search[free_context/scroll]"; - public static final String FREE_CONTEXT_PIT_ACTION_NAME = "indices:data/read/search[free_context/pit]"; + public static final String FREE_PIT_CONTEXT_ACTION_NAME = "indices:data/read/search[free_context/pit]"; public static final String FREE_CONTEXT_ACTION_NAME = "indices:data/read/search[free_context]"; public static final String CLEAR_SCROLL_CONTEXTS_ACTION_NAME = "indices:data/read/search[clear_scroll_contexts]"; - public static final String DELETE_ALL_PIT_CONTEXTS_ACTION_NAME = "indices:data/read/search[delete_pit_contexts]"; + public static final String FREE_ALL_PIT_CONTEXTS_ACTION_NAME = "indices:data/read/search[delete_pit_contexts]"; public static final String DFS_ACTION_NAME = "indices:data/read/search[phase/dfs]"; public static final String QUERY_ACTION_NAME = "indices:data/read/search[phase/query]"; public static final String QUERY_ID_ACTION_NAME = "indices:data/read/search[phase/query/id]"; @@ -144,20 +144,6 @@ public void sendFreeContext( ); } - public void sendPitFreeContext( - Transport.Connection connection, - ShardSearchContextId contextId, - ActionListener listener - ) { - transportService.sendRequest( - connection, - FREE_CONTEXT_PIT_ACTION_NAME, - new ScrollFreeContextRequest(contextId), - TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>(listener, SearchFreeContextResponse::new) - ); - } - public void updatePitContext( Transport.Connection connection, UpdatePITContextRequest request, @@ -214,13 +200,27 @@ public void sendClearAllScrollContexts(Transport.Connection connection, final Ac ); } - public void sendDeleteAllPitContexts(Transport.Connection connection, final ActionListener listener) { + public void sendFreePITContext( + Transport.Connection connection, + ShardSearchContextId contextId, + ActionListener listener + ) { + transportService.sendRequest( + connection, + FREE_PIT_CONTEXT_ACTION_NAME, + new PITFreeContextRequest(contextId), + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>(listener, SearchFreeContextResponse::new) + ); + } + + public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { transportService.sendRequest( connection, - DELETE_ALL_PIT_CONTEXTS_ACTION_NAME, + FREE_ALL_PIT_CONTEXTS_ACTION_NAME, TransportRequest.Empty.INSTANCE, TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>(listener, (in) -> TransportResponse.Empty.INSTANCE) + new ActionListenerResponseHandler<>(listener, SearchFreeContextResponse::new) ); } @@ -389,6 +389,30 @@ public ShardSearchContextId id() { } + static class PITFreeContextRequest extends TransportRequest { + private ShardSearchContextId contextId; + + PITFreeContextRequest(ShardSearchContextId contextId) { + this.contextId = Objects.requireNonNull(contextId); + } + + PITFreeContextRequest(StreamInput in) throws IOException { + super(in); + contextId = new ShardSearchContextId(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + contextId.writeTo(out); + } + + public ShardSearchContextId id() { + return this.contextId; + } + + } + static class SearchFreeContextRequest extends ScrollFreeContextRequest implements IndicesRequest { private OriginalIndices originalIndices; @@ -465,15 +489,30 @@ public static void registerRequestHandler(TransportService transportService, Sea TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, SearchFreeContextResponse::new); transportService.registerRequestHandler( - FREE_CONTEXT_PIT_ACTION_NAME, + FREE_PIT_CONTEXT_ACTION_NAME, ThreadPool.Names.SAME, - ScrollFreeContextRequest::new, + PITFreeContextRequest::new, (request, channel, task) -> { boolean freed = searchService.freeReaderContextIfFound(request.id()); channel.sendResponse(new SearchFreeContextResponse(freed)); } ); - TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_PIT_ACTION_NAME, SearchFreeContextResponse::new); + TransportActionProxy.registerProxyAction(transportService, FREE_PIT_CONTEXT_ACTION_NAME, SearchFreeContextResponse::new); + + transportService.registerRequestHandler( + FREE_ALL_PIT_CONTEXTS_ACTION_NAME, + ThreadPool.Names.SAME, + TransportRequest.Empty::new, + (request, channel, task) -> { + boolean freed = searchService.freeAllPitContexts(); + channel.sendResponse(new SearchFreeContextResponse(freed)); + } + ); + TransportActionProxy.registerProxyAction( + transportService, + FREE_ALL_PIT_CONTEXTS_ACTION_NAME, + (in) -> TransportResponse.Empty.INSTANCE + ); transportService.registerRequestHandler( FREE_CONTEXT_ACTION_NAME, @@ -658,21 +697,6 @@ public static void registerRequestHandler(TransportService transportService, Sea ); TransportActionProxy.registerProxyAction(transportService, UPDATE_READER_CONTEXT_ACTION_NAME, UpdatePitContextResponse::new); - transportService.registerRequestHandler( - DELETE_ALL_PIT_CONTEXTS_ACTION_NAME, - ThreadPool.Names.SAME, - TransportRequest.Empty::new, - (request, channel, task) -> { - searchService.freeAllPitContexts(); - channel.sendResponse(TransportResponse.Empty.INSTANCE); - } - ); - TransportActionProxy.registerProxyAction( - transportService, - DELETE_ALL_PIT_CONTEXTS_ACTION_NAME, - (in) -> TransportResponse.Empty.INSTANCE - ); - } /** diff --git a/server/src/main/java/org/opensearch/action/search/TransportDeletePITAction.java b/server/src/main/java/org/opensearch/action/search/TransportDeletePITAction.java index 918b5a791c9f0..5785148913337 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportDeletePITAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportDeletePITAction.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.util.SetOnce; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; @@ -22,7 +23,6 @@ import org.opensearch.common.Strings; import org.opensearch.common.inject.Inject; import org.opensearch.common.io.stream.NamedWriteableRegistry; -import org.opensearch.search.SearchService; import org.opensearch.tasks.Task; import org.opensearch.transport.Transport; import org.opensearch.transport.TransportService; @@ -38,7 +38,6 @@ * Transport action for deleting pit reader context - supports deleting list and all pit contexts */ public class TransportDeletePITAction extends HandledTransportAction { - private SearchService searchService; private final NamedWriteableRegistry namedWriteableRegistry; private TransportSearchAction transportSearchAction; private final ClusterService clusterService; @@ -47,7 +46,6 @@ public class TransportDeletePITAction extends HandledTransportAction listener) { int size = clusterService.state().getNodes().getSize(); - ActionListener groupedActionListener = getGroupedListener(listener, size); + ActionListener groupedActionListener = new GroupedActionListener( + new ActionListener<>() { + @Override + public void onResponse(final Collection responses) { + final SetOnce succeeded = new SetOnce<>(); + for (SearchTransportService.SearchFreeContextResponse response : responses) { + if (!response.isFreed()) { + succeeded.set(false); + break; + } + } + succeeded.trySet(true); + listener.onResponse(new DeletePITResponse(succeeded.get())); + } + + @Override + public void onFailure(final Exception e) { + logger.debug("Delete all PITs failed ", e); + listener.onResponse(new DeletePITResponse(false)); + } + }, + size + ); for (final DiscoveryNode node : clusterService.state().getNodes()) { try { Transport.Connection connection = searchTransportService.getConnection(null, node); - searchTransportService.sendDeleteAllPitContexts(connection, groupedActionListener); + searchTransportService.sendFreeAllPitContexts(connection, groupedActionListener); } catch (Exception e) { groupedActionListener.onFailure(e); } @@ -123,11 +142,11 @@ void deletePits(List contexts, ActionListener l for (SearchContextIdForNode contextId : contexts) { final DiscoveryNode node = nodeLookup.apply(contextId.getClusterAlias(), contextId.getNode()); if (node == null) { - groupedListener.onFailure(new OpenSearchException("node not connected")); + groupedListener.onFailure(new OpenSearchException("node not found")); } else { try { final Transport.Connection connection = searchTransportService.getConnection(contextId.getClusterAlias(), node); - searchTransportService.sendPitFreeContext( + searchTransportService.sendFreePITContext( connection, contextId.getSearchContextId(), ActionListener.wrap(r -> groupedListener.onResponse(r.isFreed()), e -> groupedListener.onResponse(false)) @@ -154,19 +173,4 @@ private StepListener> getLookupListene } return lookupListener; } - - private ActionListener getGroupedListener(ActionListener deletePitListener, int size) { - return new GroupedActionListener<>(new ActionListener<>() { - @Override - public void onResponse(final Collection responses) { - deletePitListener.onResponse(new DeletePITResponse(true)); - } - - @Override - public void onFailure(final Exception e) { - logger.debug("Delete all PITs failed ", e); - deletePitListener.onResponse(new DeletePITResponse(false)); - } - }, size); - } } diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 77fa1fe50a83c..c196f68e28baa 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.TopDocs; +import org.apache.lucene.util.SetOnce; import org.opensearch.LegacyESVersion; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; @@ -1034,16 +1035,21 @@ public boolean freeReaderContextIfFound(ShardSearchContextId contextId) { return true; } - /** * Free all active pit contexts */ - public void freeAllPitContexts() { + public boolean freeAllPitContexts() { + final SetOnce isFreed = new SetOnce<>(); for (ReaderContext readerContext : activeReaders.values()) { if (readerContext instanceof PitReaderContext) { - freeReaderContextIfFound(readerContext.id()); + final boolean succeeded = freeReaderContextIfFound(readerContext.id()); + if (!succeeded) { + isFreed.trySet(false); + } } } + isFreed.trySet(true); + return isFreed.get(); } /** diff --git a/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java index 9742a58346e73..9f9b22a3b9d7d 100644 --- a/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java +++ b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java @@ -69,7 +69,7 @@ public void setupData() { node1 = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); node2 = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); node3 = new DiscoveryNode("node_3", buildNewFakeTransportAddress(), Version.CURRENT); - setPitId(); + pitId = getPitId(); namedWriteableRegistry = new NamedWriteableRegistry( Arrays.asList( new NamedWriteableRegistry.Entry(QueryBuilder.class, TermQueryBuilder.NAME, TermQueryBuilder::new), @@ -431,7 +431,7 @@ public void onFailure(Exception e) { } - QueryBuilder randomQueryBuilder() { + public static QueryBuilder randomQueryBuilder() { if (randomBoolean()) { return new TermQueryBuilder(randomAlphaOfLength(10), randomAlphaOfLength(10)); } else if (randomBoolean()) { @@ -441,21 +441,21 @@ QueryBuilder randomQueryBuilder() { } } - private void setPitId() { + public static String getPitId() { AtomicArray array = new AtomicArray<>(3); SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult1 = new SearchAsyncActionTests.TestSearchPhaseResult( new ShardSearchContextId("a", 1), - node1 + null ); testSearchPhaseResult1.setSearchShardTarget(new SearchShardTarget("node_1", new ShardId("idx", "uuid1", 2), null, null)); SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult2 = new SearchAsyncActionTests.TestSearchPhaseResult( new ShardSearchContextId("b", 12), - node2 + null ); testSearchPhaseResult2.setSearchShardTarget(new SearchShardTarget("node_2", new ShardId("idy", "uuid2", 42), null, null)); SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult3 = new SearchAsyncActionTests.TestSearchPhaseResult( new ShardSearchContextId("c", 42), - node3 + null ); testSearchPhaseResult3.setSearchShardTarget(new SearchShardTarget("node_3", new ShardId("idy", "uuid2", 43), null, null)); array.setOnce(0, testSearchPhaseResult1); @@ -477,7 +477,7 @@ private void setPitId() { aliasFilters.put(result.getSearchShardTarget().getShardId().getIndex().getUUID(), aliasFilter); } } - pitId = SearchContextId.encode(array.asList(), aliasFilters, version); + return SearchContextId.encode(array.asList(), aliasFilters, version); } } diff --git a/server/src/test/java/org/opensearch/action/search/TransportDeletePITActionTests.java b/server/src/test/java/org/opensearch/action/search/TransportDeletePITActionTests.java new file mode 100644 index 0000000000000..347c5a11630de --- /dev/null +++ b/server/src/test/java/org/opensearch/action/search/TransportDeletePITActionTests.java @@ -0,0 +1,584 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.apache.lucene.search.TotalHits; +import org.junit.Before; +import org.opensearch.Version; +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.ActionFilter; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.UUIDs; +import org.opensearch.common.io.stream.NamedWriteableRegistry; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.query.IdsQueryBuilder; +import org.opensearch.index.query.MatchAllQueryBuilder; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.search.SearchHit; +import org.opensearch.search.SearchHits; +import org.opensearch.search.aggregations.InternalAggregations; +import org.opensearch.search.internal.InternalSearchResponse; +import org.opensearch.search.internal.ShardSearchContextId; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskId; +import org.opensearch.tasks.TaskManager; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.Transport; +import org.opensearch.transport.TransportResponse; +import org.opensearch.transport.TransportService; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.opensearch.action.support.PlainActionFuture.newFuture; + +public class TransportDeletePITActionTests extends OpenSearchTestCase { + + DiscoveryNode node1 = null; + DiscoveryNode node2 = null; + DiscoveryNode node3 = null; + String pitId = null; + TransportSearchAction transportSearchAction = null; + Task task = null; + DiscoveryNodes nodes = null; + NamedWriteableRegistry namedWriteableRegistry = null; + ClusterService clusterServiceMock = null; + + @Before + public void setupData() { + node1 = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); + node2 = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); + node3 = new DiscoveryNode("node_3", buildNewFakeTransportAddress(), Version.CURRENT); + pitId = CreatePitControllerTests.getPitId(); + namedWriteableRegistry = new NamedWriteableRegistry( + Arrays.asList( + new NamedWriteableRegistry.Entry(QueryBuilder.class, TermQueryBuilder.NAME, TermQueryBuilder::new), + new NamedWriteableRegistry.Entry(QueryBuilder.class, MatchAllQueryBuilder.NAME, MatchAllQueryBuilder::new), + new NamedWriteableRegistry.Entry(QueryBuilder.class, IdsQueryBuilder.NAME, IdsQueryBuilder::new) + ) + ); + nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build(); + transportSearchAction = mock(TransportSearchAction.class); + task = new Task( + randomLong(), + "transport", + SearchAction.NAME, + "description", + new TaskId(randomLong() + ":" + randomLong()), + Collections.emptyMap() + ); + InternalSearchResponse response = new InternalSearchResponse( + new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN), + InternalAggregations.EMPTY, + null, + null, + false, + null, + 1 + ); + + clusterServiceMock = mock(ClusterService.class); + ClusterState state = mock(ClusterState.class); + + final Settings keepAliveSettings = Settings.builder() + .put(CreatePITController.CREATE_PIT_TEMPORARY_KEEPALIVE_SETTING.getKey(), 30000) + .build(); + when(clusterServiceMock.getSettings()).thenReturn(keepAliveSettings); + + when(state.getMetadata()).thenReturn(Metadata.EMPTY_METADATA); + when(state.metadata()).thenReturn(Metadata.EMPTY_METADATA); + when(clusterServiceMock.state()).thenReturn(state); + when(state.getNodes()).thenReturn(nodes); + } + + /** + * Test if transport call for update pit is made to all nodes present as part of PIT ID returned from phase one of create pit + */ + public void testDeletePitSuccess() throws InterruptedException, ExecutionException { + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); + Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); + ActionFilters actionFilters = mock(ActionFilters.class); + when(actionFilters.filters()).thenReturn(new ActionFilter[0]); + ThreadPool threadPool = new ThreadPool(settings); + try { + SearchTransportService searchTransportService = new SearchTransportService(null, null) { + + @Override + public void sendFreePITContext( + Transport.Connection connection, + ShardSearchContextId contextId, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); + t.start(); + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + + TransportService transportService = new TransportService( + Settings.EMPTY, + mock(Transport.class), + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), + null, + Collections.emptySet() + ) { + @Override + public TaskManager getTaskManager() { + return taskManager; + } + }; + TransportDeletePITAction action = new TransportDeletePITAction( + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService + ); + DeletePITRequest deletePITRequest = new DeletePITRequest(pitId); + PlainActionFuture future = newFuture(); + action.execute(task, deletePITRequest, future); + DeletePITResponse dr = future.get(); + assertEquals(true, dr.isSucceeded()); + assertEquals(3, deleteNodesInvoked.size()); + } finally { + assertTrue(OpenSearchTestCase.terminate(threadPool)); + } + } + + public void testDeleteAllPITSuccess() throws InterruptedException, ExecutionException { + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); + Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); + ActionFilters actionFilters = mock(ActionFilters.class); + when(actionFilters.filters()).thenReturn(new ActionFilter[0]); + ThreadPool threadPool = new ThreadPool(settings); + try { + SearchTransportService searchTransportService = new SearchTransportService(null, null) { + public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); + t.start(); + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + + TransportService transportService = new TransportService( + Settings.EMPTY, + mock(Transport.class), + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), + null, + Collections.emptySet() + ) { + @Override + public TaskManager getTaskManager() { + return taskManager; + } + }; + TransportDeletePITAction action = new TransportDeletePITAction( + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService + ); + DeletePITRequest deletePITRequest = new DeletePITRequest("_all"); + PlainActionFuture future = newFuture(); + action.execute(task, deletePITRequest, future); + DeletePITResponse dr = future.get(); + assertEquals(true, dr.isSucceeded()); + assertEquals(3, deleteNodesInvoked.size()); + } finally { + assertTrue(OpenSearchTestCase.terminate(threadPool)); + } + } + + public void testDeletePitWhenNodeIsDown() throws InterruptedException, ExecutionException { + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); + Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); + ActionFilters actionFilters = mock(ActionFilters.class); + when(actionFilters.filters()).thenReturn(new ActionFilter[0]); + ThreadPool threadPool = new ThreadPool(settings); + try { + SearchTransportService searchTransportService = new SearchTransportService(null, null) { + + @Override + public void sendFreePITContext( + Transport.Connection connection, + ShardSearchContextId contextId, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + + if (connection.getNode().getId() == "node_3") { + Thread t = new Thread(() -> listener.onFailure(new Exception("node 3 down"))); + t.start(); + } else { + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); + t.start(); + } + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + TransportService transportService = new TransportService( + Settings.EMPTY, + mock(Transport.class), + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), + null, + Collections.emptySet() + ) { + @Override + public TaskManager getTaskManager() { + return taskManager; + } + }; + TransportDeletePITAction action = new TransportDeletePITAction( + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService + ); + DeletePITRequest deletePITRequest = new DeletePITRequest(pitId); + PlainActionFuture future = newFuture(); + action.execute(task, deletePITRequest, future); + DeletePITResponse dr = future.get(); + assertEquals(false, dr.isSucceeded()); + assertEquals(3, deleteNodesInvoked.size()); + } finally { + assertTrue(OpenSearchTestCase.terminate(threadPool)); + } + } + + public void testDeletePitWhenAllNodesAreDown() throws InterruptedException, ExecutionException { + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); + Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); + ActionFilters actionFilters = mock(ActionFilters.class); + when(actionFilters.filters()).thenReturn(new ActionFilter[0]); + ThreadPool threadPool = new ThreadPool(settings); + try { + SearchTransportService searchTransportService = new SearchTransportService(null, null) { + + @Override + public void sendFreePITContext( + Transport.Connection connection, + ShardSearchContextId contextId, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onFailure(new Exception("node 3 down"))); + t.start(); + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + TransportService transportService = new TransportService( + Settings.EMPTY, + mock(Transport.class), + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), + null, + Collections.emptySet() + ) { + @Override + public TaskManager getTaskManager() { + return taskManager; + } + }; + TransportDeletePITAction action = new TransportDeletePITAction( + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService + ); + DeletePITRequest deletePITRequest = new DeletePITRequest(pitId); + PlainActionFuture future = newFuture(); + action.execute(task, deletePITRequest, future); + DeletePITResponse dr = future.get(); + assertEquals(false, dr.isSucceeded()); + assertEquals(3, deleteNodesInvoked.size()); + } finally { + assertTrue(OpenSearchTestCase.terminate(threadPool)); + } + } + + public void testDeletePitFailure() throws InterruptedException, ExecutionException { + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); + Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); + ActionFilters actionFilters = mock(ActionFilters.class); + when(actionFilters.filters()).thenReturn(new ActionFilter[0]); + ThreadPool threadPool = new ThreadPool(settings); + try { + SearchTransportService searchTransportService = new SearchTransportService(null, null) { + + @Override + public void sendFreePITContext( + Transport.Connection connection, + ShardSearchContextId contextId, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + + if (connection.getNode().getId() == "node_3") { + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(false))); + t.start(); + } else { + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); + t.start(); + } + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + TransportService transportService = new TransportService( + Settings.EMPTY, + mock(Transport.class), + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), + null, + Collections.emptySet() + ) { + @Override + public TaskManager getTaskManager() { + return taskManager; + } + }; + TransportDeletePITAction action = new TransportDeletePITAction( + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService + ); + DeletePITRequest deletePITRequest = new DeletePITRequest(pitId); + PlainActionFuture future = newFuture(); + action.execute(task, deletePITRequest, future); + DeletePITResponse dr = future.get(); + assertEquals(false, dr.isSucceeded()); + assertEquals(3, deleteNodesInvoked.size()); + } finally { + assertTrue(OpenSearchTestCase.terminate(threadPool)); + } + } + + public void testDeleteAllPitWhenNodeIsDown() throws InterruptedException, ExecutionException { + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); + Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); + ActionFilters actionFilters = mock(ActionFilters.class); + when(actionFilters.filters()).thenReturn(new ActionFilter[0]); + ThreadPool threadPool = new ThreadPool(settings); + try { + SearchTransportService searchTransportService = new SearchTransportService(null, null) { + @Override + public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { + deleteNodesInvoked.add(connection.getNode()); + if (connection.getNode().getId() == "node_3") { + Thread t = new Thread(() -> listener.onFailure(new Exception("node 3 down"))); + t.start(); + } else { + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); + t.start(); + } + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + TransportService transportService = new TransportService( + Settings.EMPTY, + mock(Transport.class), + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), + null, + Collections.emptySet() + ) { + @Override + public TaskManager getTaskManager() { + return taskManager; + } + }; + TransportDeletePITAction action = new TransportDeletePITAction( + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService + ); + DeletePITRequest deletePITRequest = new DeletePITRequest("_all"); + PlainActionFuture future = newFuture(); + action.execute(task, deletePITRequest, future); + DeletePITResponse dr = future.get(); + assertEquals(false, dr.isSucceeded()); + assertEquals(3, deleteNodesInvoked.size()); + } finally { + assertTrue(OpenSearchTestCase.terminate(threadPool)); + } + } + + public void testDeleteAllPitWhenAllNodesAreDown() throws InterruptedException, ExecutionException { + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); + Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); + ActionFilters actionFilters = mock(ActionFilters.class); + when(actionFilters.filters()).thenReturn(new ActionFilter[0]); + ThreadPool threadPool = new ThreadPool(settings); + try { + SearchTransportService searchTransportService = new SearchTransportService(null, null) { + + @Override + public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onFailure(new Exception("node down"))); + t.start(); + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + TransportService transportService = new TransportService( + Settings.EMPTY, + mock(Transport.class), + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), + null, + Collections.emptySet() + ) { + @Override + public TaskManager getTaskManager() { + return taskManager; + } + }; + TransportDeletePITAction action = new TransportDeletePITAction( + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService + ); + DeletePITRequest deletePITRequest = new DeletePITRequest("_all"); + PlainActionFuture future = newFuture(); + action.execute(task, deletePITRequest, future); + DeletePITResponse dr = future.get(); + assertEquals(false, dr.isSucceeded()); + assertEquals(3, deleteNodesInvoked.size()); + } finally { + assertTrue(OpenSearchTestCase.terminate(threadPool)); + } + } + + public void testDeleteAllPitFailure() throws InterruptedException, ExecutionException { + List deleteNodesInvoked = new CopyOnWriteArrayList<>(); + Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); + ActionFilters actionFilters = mock(ActionFilters.class); + when(actionFilters.filters()).thenReturn(new ActionFilter[0]); + ThreadPool threadPool = new ThreadPool(settings); + try { + SearchTransportService searchTransportService = new SearchTransportService(null, null) { + + public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { + deleteNodesInvoked.add(connection.getNode()); + if (connection.getNode().getId() == "node_3") { + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(false))); + t.start(); + } else { + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); + t.start(); + } + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + TransportService transportService = new TransportService( + Settings.EMPTY, + mock(Transport.class), + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), + null, + Collections.emptySet() + ) { + @Override + public TaskManager getTaskManager() { + return taskManager; + } + }; + TransportDeletePITAction action = new TransportDeletePITAction( + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService + ); + DeletePITRequest deletePITRequest = new DeletePITRequest("_all"); + PlainActionFuture future = newFuture(); + action.execute(task, deletePITRequest, future); + DeletePITResponse dr = future.get(); + assertEquals(false, dr.isSucceeded()); + assertEquals(3, deleteNodesInvoked.size()); + } finally { + assertTrue(OpenSearchTestCase.terminate(threadPool)); + } + } + +} diff --git a/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java index a117e891c6e6d..9bfb3a76220bc 100644 --- a/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java @@ -76,6 +76,24 @@ public void testDeletePit() throws Exception { } + public void testDeleteAllPits() throws Exception { + createPitOnIndex("index"); + createIndex("index1", Settings.builder().put("index.number_of_shards", 5).put("index.number_of_replicas", 1).build()); + client().prepareIndex("index1").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).execute().get(); + ensureGreen(); + createPitOnIndex("index1"); + DeletePITRequest deletePITRequest = new DeletePITRequest("_all"); + + /** + * When we invoke delete again, returns success after clearing the remaining readers. Asserting reader context + * not found exceptions don't result in failures ( as deletion in one node is successful ) + */ + ActionFuture execute = client().execute(DeletePITAction.INSTANCE, deletePITRequest); + DeletePITResponse deletePITResponse = execute.get(); + assertTrue(deletePITResponse.isSucceeded()); + client().admin().indices().prepareDelete("index1").get(); + } + public void testDeletePitWhileNodeDrop() throws Exception { CreatePITResponse pitResponse = createPitOnIndex("index"); createIndex("index1", Settings.builder().put("index.number_of_shards", 5).put("index.number_of_replicas", 1).build());