From c28762ea5903d9d04dc043d07d4358d6d9b3fc69 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Tue, 17 May 2022 13:23:26 +0530 Subject: [PATCH] fixing broken tests Signed-off-by: Bharathwaj G --- .../search/CreatePitControllerTests.java | 621 +++++++++++------- 1 file changed, 367 insertions(+), 254 deletions(-) diff --git a/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java index a1521cca03640..f07bbe5975535 100644 --- a/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java +++ b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java @@ -39,6 +39,10 @@ import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskId; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.RemoteClusterConnectionTests; import org.opensearch.transport.Transport; import java.util.Arrays; import java.util.Collections; @@ -47,6 +51,8 @@ import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -68,6 +74,27 @@ public class CreatePitControllerTests extends OpenSearchTestCase { ActionListener createPitListener = null; ClusterService clusterServiceMock = null; + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + + private MockTransportService startTransport(String id, List knownNodes, Version version) { + return startTransport(id, knownNodes, version, Settings.EMPTY); + } + + private MockTransportService startTransport( + final String id, + final List knownNodes, + final Version version, + final Settings settings + ) { + return RemoteClusterConnectionTests.startTransport(id, knownNodes, version, threadPool, settings); + } + @Before public void setupData() { node1 = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); @@ -141,70 +168,91 @@ public void onFailure(Exception e) { public void testUpdatePitAfterCreatePitSuccess() throws InterruptedException { List updateNodesInvoked = new CopyOnWriteArrayList<>(); List deleteNodesInvoked = new CopyOnWriteArrayList<>(); - SearchTransportService searchTransportService = new SearchTransportService(null, null) { - @Override - public void updatePitContext( - Transport.Connection connection, - UpdatePitContextRequest request, - ActionListener listener - ) { - updateNodesInvoked.add(connection.getNode()); - Thread t = new Thread(() -> listener.onResponse(new UpdatePitContextResponse("pitid", 500000, 500000))); - t.start(); - } - - /** - * Test if cleanup request is called - */ - @Override - public void sendFreeContext( - Transport.Connection connection, - ShardSearchContextId contextId, - ActionListener listener + List knownNodes = new CopyOnWriteArrayList<>(); + try ( + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + ) { + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try ( + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) ) { - deleteNodesInvoked.add(connection.getNode()); - Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); - t.start(); - } - - @Override - public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { - return new SearchAsyncActionTests.MockConnection(node); - } - }; - - CountDownLatch latch = new CountDownLatch(1); - - CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); - request.setIndices(new String[] { "index" }); - CreatePitController controller = new CreatePitController( - request, - searchTransportService, - clusterServiceMock, - transportSearchAction, - namedWriteableRegistry, - task, - createPitListener - ); - - ActionListener updatelistener = new LatchedActionListener<>(new ActionListener() { - @Override - public void onResponse(CreatePitResponse createPITResponse) { - assertEquals(3, createPITResponse.getTotalShards()); + transportService.start(); + transportService.acceptIncomingRequests(); + SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { + @Override + public void updatePitContext( + Transport.Connection connection, + UpdatePitContextRequest request, + ActionListener listener + ) { + updateNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onResponse(new UpdatePitContextResponse("pitid", 500000, 500000))); + t.start(); + } + + /** + * Test if cleanup request is called + */ + @Override + public void sendFreeContext( + Transport.Connection connection, + ShardSearchContextId contextId, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); + t.start(); + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + + CountDownLatch latch = new CountDownLatch(1); + + CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[] { "index" }); + CreatePitController controller = new CreatePitController( + request, + searchTransportService, + clusterServiceMock, + transportSearchAction, + namedWriteableRegistry, + task, + createPitListener + ); + + ActionListener updatelistener = new LatchedActionListener<>(new ActionListener() { + @Override + public void onResponse(CreatePitResponse createPITResponse) { + assertEquals(3, createPITResponse.getTotalShards()); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + }, latch); + + StepListener createListener = new StepListener<>(); + controller.executeCreatePit(createListener, updatelistener); + createListener.onResponse(searchResponse); + latch.await(); + assertEquals(3, updateNodesInvoked.size()); + assertEquals(0, deleteNodesInvoked.size()); } - - @Override - public void onFailure(Exception e) { - throw new AssertionError(e); - } - }, latch); - - StepListener createListener = new StepListener<>(); - controller.executeCreatePit(createListener, updatelistener); - createListener.onResponse(searchResponse); - latch.await(); - assertEquals(3, updateNodesInvoked.size()); - assertEquals(0, deleteNodesInvoked.size()); + } } /** @@ -213,72 +261,93 @@ public void onFailure(Exception e) { public void testUpdatePitAfterCreatePitFailure() throws InterruptedException { List updateNodesInvoked = new CopyOnWriteArrayList<>(); List deleteNodesInvoked = new CopyOnWriteArrayList<>(); - SearchTransportService searchTransportService = new SearchTransportService(null, null) { - @Override - public void updatePitContext( - Transport.Connection connection, - UpdatePitContextRequest request, - ActionListener listener + List knownNodes = new CopyOnWriteArrayList<>(); + try ( + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + ) { + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try ( + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) ) { - updateNodesInvoked.add(connection.getNode()); - Thread t = new Thread(() -> listener.onResponse(new UpdatePitContextResponse("pitid", 500000, 500000))); - t.start(); - } - - @Override - public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { - return new SearchAsyncActionTests.MockConnection(node); - } - - @Override - public void sendFreeContext( - Transport.Connection connection, - ShardSearchContextId contextId, - ActionListener listener - ) { - deleteNodesInvoked.add(connection.getNode()); - Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); - t.start(); - } - }; - - CountDownLatch latch = new CountDownLatch(1); - - CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); - request.setIndices(new String[] { "index" }); - - CreatePitController controller = new CreatePitController( - request, - searchTransportService, - clusterServiceMock, - transportSearchAction, - namedWriteableRegistry, - task, - createPitListener - ); - - ActionListener updatelistener = new LatchedActionListener<>(new ActionListener() { - @Override - public void onResponse(CreatePitResponse createPITResponse) { - throw new AssertionError("on response is called"); + transportService.start(); + transportService.acceptIncomingRequests(); + SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { + @Override + public void updatePitContext( + Transport.Connection connection, + UpdatePitContextRequest request, + ActionListener listener + ) { + updateNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onResponse(new UpdatePitContextResponse("pitid", 500000, 500000))); + t.start(); + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + + @Override + public void sendFreeContext( + Transport.Connection connection, + ShardSearchContextId contextId, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); + t.start(); + } + }; + + CountDownLatch latch = new CountDownLatch(1); + + CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[] { "index" }); + + CreatePitController controller = new CreatePitController( + request, + searchTransportService, + clusterServiceMock, + transportSearchAction, + namedWriteableRegistry, + task, + createPitListener + ); + + ActionListener updatelistener = new LatchedActionListener<>(new ActionListener() { + @Override + public void onResponse(CreatePitResponse createPITResponse) { + throw new AssertionError("on response is called"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e.getCause().getMessage().contains("Exception occurred in phase 1")); + } + }, latch); + + StepListener createListener = new StepListener<>(); + + controller.executeCreatePit(createListener, updatelistener); + createListener.onFailure(new Exception("Exception occurred in phase 1")); + latch.await(); + assertEquals(0, updateNodesInvoked.size()); + /** + * cleanup is not called on create pit phase one failure + */ + assertEquals(0, deleteNodesInvoked.size()); } - - @Override - public void onFailure(Exception e) { - assertTrue(e.getCause().getMessage().contains("Exception occurred in phase 1")); - } - }, latch); - - StepListener createListener = new StepListener<>(); - - controller.executeCreatePit(createListener, updatelistener); - createListener.onFailure(new Exception("Exception occurred in phase 1")); - latch.await(); - assertEquals(0, updateNodesInvoked.size()); - /** - * cleanup is not called on create pit phase one failure - */ - assertEquals(0, deleteNodesInvoked.size()); + } } /** @@ -287,143 +356,187 @@ public void onFailure(Exception e) { public void testUpdatePitFailureForNodeDrop() throws InterruptedException { List updateNodesInvoked = new CopyOnWriteArrayList<>(); List deleteNodesInvoked = new CopyOnWriteArrayList<>(); - SearchTransportService searchTransportService = new SearchTransportService(null, null) { - @Override - public void updatePitContext( - Transport.Connection connection, - UpdatePitContextRequest request, - ActionListener listener + List knownNodes = new CopyOnWriteArrayList<>(); + try ( + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + ) { + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try ( + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) ) { - - updateNodesInvoked.add(connection.getNode()); - if (connection.getNode().getId() == "node_3") { - Thread t = new Thread(() -> listener.onFailure(new Exception("node 3 down"))); - t.start(); - } else { - Thread t = new Thread(() -> listener.onResponse(new UpdatePitContextResponse("pitid", 500000, 500000))); - t.start(); - } - } - - @Override - public void sendFreeContext( - Transport.Connection connection, - ShardSearchContextId contextId, - ActionListener listener - ) { - deleteNodesInvoked.add(connection.getNode()); - Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); - t.start(); - } - - @Override - public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { - return new SearchAsyncActionTests.MockConnection(node); - } - }; - CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); - request.setIndices(new String[] { "index" }); - CreatePitController controller = new CreatePitController( - request, - searchTransportService, - clusterServiceMock, - transportSearchAction, - namedWriteableRegistry, - task, - createPitListener - ); - - CountDownLatch latch = new CountDownLatch(1); - - ActionListener updatelistener = new LatchedActionListener<>(new ActionListener() { - @Override - public void onResponse(CreatePitResponse createPITResponse) { - throw new AssertionError("response is called"); - } - - @Override - public void onFailure(Exception e) { - assertTrue(e.getMessage().contains("node 3 down")); + transportService.start(); + transportService.acceptIncomingRequests(); + + SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { + @Override + public void updatePitContext( + Transport.Connection connection, + UpdatePitContextRequest request, + ActionListener listener + ) { + + updateNodesInvoked.add(connection.getNode()); + if (connection.getNode().getId() == "node_3") { + Thread t = new Thread(() -> listener.onFailure(new Exception("node 3 down"))); + t.start(); + } else { + Thread t = new Thread(() -> listener.onResponse(new UpdatePitContextResponse("pitid", 500000, 500000))); + t.start(); + } + } + + @Override + public void sendFreeContext( + Transport.Connection connection, + ShardSearchContextId contextId, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); + t.start(); + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + + CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[] { "index" }); + CreatePitController controller = new CreatePitController( + request, + searchTransportService, + clusterServiceMock, + transportSearchAction, + namedWriteableRegistry, + task, + createPitListener + ); + + CountDownLatch latch = new CountDownLatch(1); + + ActionListener updatelistener = new LatchedActionListener<>(new ActionListener() { + @Override + public void onResponse(CreatePitResponse createPITResponse) { + throw new AssertionError("response is called"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e.getMessage().contains("node 3 down")); + } + }, latch); + + StepListener createListener = new StepListener<>(); + controller.executeCreatePit(createListener, updatelistener); + createListener.onResponse(searchResponse); + latch.await(); + assertEquals(3, updateNodesInvoked.size()); + /** + * check if cleanup is called for all nodes in case of update pit failure + */ + assertEquals(3, deleteNodesInvoked.size()); } - }, latch); - - StepListener createListener = new StepListener<>(); - controller.executeCreatePit(createListener, updatelistener); - createListener.onResponse(searchResponse); - latch.await(); - assertEquals(3, updateNodesInvoked.size()); - /** - * check if cleanup is called for all nodes in case of update pit failure - */ - assertEquals(3, deleteNodesInvoked.size()); + } } public void testUpdatePitFailureWhereAllNodesDown() throws InterruptedException { List updateNodesInvoked = new CopyOnWriteArrayList<>(); List deleteNodesInvoked = new CopyOnWriteArrayList<>(); - SearchTransportService searchTransportService = new SearchTransportService(null, null) { - @Override - public void updatePitContext( - Transport.Connection connection, - UpdatePitContextRequest request, - ActionListener listener - ) { - updateNodesInvoked.add(connection.getNode()); - Thread t = new Thread(() -> listener.onFailure(new Exception("node down"))); - t.start(); - } - - @Override - public void sendFreeContext( - Transport.Connection connection, - ShardSearchContextId contextId, - ActionListener listener + List knownNodes = new CopyOnWriteArrayList<>(); + try ( + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + ) { + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try ( + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) ) { - deleteNodesInvoked.add(connection.getNode()); - Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); - t.start(); - } - - @Override - public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { - return new SearchAsyncActionTests.MockConnection(node); + transportService.start(); + transportService.acceptIncomingRequests(); + SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { + @Override + public void updatePitContext( + Transport.Connection connection, + UpdatePitContextRequest request, + ActionListener listener + ) { + updateNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onFailure(new Exception("node down"))); + t.start(); + } + + @Override + public void sendFreeContext( + Transport.Connection connection, + ShardSearchContextId contextId, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); + t.start(); + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[] { "index" }); + CreatePitController controller = new CreatePitController( + request, + searchTransportService, + clusterServiceMock, + transportSearchAction, + namedWriteableRegistry, + task, + createPitListener + ); + + CountDownLatch latch = new CountDownLatch(1); + + ActionListener updatelistener = new LatchedActionListener<>(new ActionListener() { + @Override + public void onResponse(CreatePitResponse createPITResponse) { + throw new AssertionError("response is called"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e.getMessage().contains("node down")); + } + }, latch); + + StepListener createListener = new StepListener<>(); + controller.executeCreatePit(createListener, updatelistener); + createListener.onResponse(searchResponse); + latch.await(); + assertEquals(3, updateNodesInvoked.size()); + /** + * check if cleanup is called for all nodes in case of update pit failure + */ + assertEquals(3, deleteNodesInvoked.size()); } - }; - CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); - request.setIndices(new String[] { "index" }); - CreatePitController controller = new CreatePitController( - request, - searchTransportService, - clusterServiceMock, - transportSearchAction, - namedWriteableRegistry, - task, - createPitListener - ); - - CountDownLatch latch = new CountDownLatch(1); - - ActionListener updatelistener = new LatchedActionListener<>(new ActionListener() { - @Override - public void onResponse(CreatePitResponse createPITResponse) { - throw new AssertionError("response is called"); - } - - @Override - public void onFailure(Exception e) { - assertTrue(e.getMessage().contains("node down")); - } - }, latch); - - StepListener createListener = new StepListener<>(); - controller.executeCreatePit(createListener, updatelistener); - createListener.onResponse(searchResponse); - latch.await(); - assertEquals(3, updateNodesInvoked.size()); - /** - * check if cleanup is called for all nodes in case of update pit failure - */ - assertEquals(3, deleteNodesInvoked.size()); + } }