From 06994fbc3bdb2c51b826f082e7cd19af9a9ee43b Mon Sep 17 00:00:00 2001 From: Kiran Prakash Date: Mon, 2 Sep 2024 20:05:04 +0530 Subject: [PATCH] refactor Signed-off-by: Kiran Prakash --- .../main/java/org/opensearch/node/Node.java | 22 ++- .../org/opensearch/wlm/QueryGroupService.java | 17 ++- .../cancellation/DefaultTaskCancellation.java | 13 +- .../wlm/QueryGroupServiceTests.java | 134 ++++++++++++++++++ ...adManagementTransportInterceptorTests.java | 31 +++- .../DefaultTaskCancellationTests.java | 3 +- ...eryGroupRequestOperationListenerTests.java | 32 ++++- 7 files changed, 234 insertions(+), 18 deletions(-) create mode 100644 server/src/test/java/org/opensearch/wlm/QueryGroupServiceTests.java diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 6ce7e2678d6d0..c93d94f42b69a 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -269,8 +269,10 @@ import org.opensearch.usage.UsageService; import org.opensearch.watcher.ResourceWatcherService; import org.opensearch.wlm.QueryGroupService; +import org.opensearch.wlm.WorkloadManagementSettings; import org.opensearch.wlm.WorkloadManagementTransportInterceptor; import org.opensearch.wlm.listeners.QueryGroupRequestOperationListener; +import org.opensearch.wlm.tracker.QueryGroupResourceUsageTrackerService; import javax.net.ssl.SNIHostName; @@ -1020,7 +1022,17 @@ protected Node( List identityAwarePlugins = pluginsService.filterPlugins(IdentityAwarePlugin.class); identityService.initializeIdentityAwarePlugins(identityAwarePlugins); - final QueryGroupService queryGroupService = new QueryGroupService(); // We will need to replace this with actual instance of the + QueryGroupResourceUsageTrackerService queryGroupResourceUsageTrackerService = new QueryGroupResourceUsageTrackerService( + taskResourceTrackingService + ); + WorkloadManagementSettings workloadManagementSettings = new WorkloadManagementSettings(settings, settingsModule.getClusterSettings()); + final QueryGroupService queryGroupService = new QueryGroupService( + queryGroupResourceUsageTrackerService, + clusterService, + threadPool, + workloadManagementSettings, + new HashMap<>() + ); // We will need to replace this with actual instance of the // queryGroupService final QueryGroupRequestOperationListener queryGroupRequestOperationListener = new QueryGroupRequestOperationListener( queryGroupService, @@ -1087,7 +1099,13 @@ protected Node( WorkloadManagementTransportInterceptor workloadManagementTransportInterceptor = new WorkloadManagementTransportInterceptor( threadPool, - new QueryGroupService() // We will need to replace this with actual implementation + new QueryGroupService( + queryGroupResourceUsageTrackerService, + clusterService, + threadPool, + workloadManagementSettings, + new HashMap<>() + ) // We will need to replace this with actual implementation ); final Collection secureSettingsFactories = pluginsService.filterPlugins(Plugin.class) diff --git a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java index 6215855361f1e..dfaf8bf8280f0 100644 --- a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java +++ b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java @@ -50,6 +50,14 @@ public class QueryGroupService extends AbstractLifecycleComponent implements Clu private Set activeQueryGroups = new HashSet<>(); private Set deletedQueryGroups = new HashSet<>(); + protected Set getDeletedQueryGroups() { + return deletedQueryGroups; + } + + protected Set getActiveQueryGroups() { + return activeQueryGroups; + } + /** * Guice managed constructor * @@ -77,14 +85,13 @@ public QueryGroupService( protected void doRun() { Map queryGroupLevelResourceUsageViews = queryGroupUsageTracker .constructQueryGroupLevelUsageViews(); - this.activeQueryGroups = getActiveQueryGroups(); + this.activeQueryGroups = getActiveQueryGroupsFromClusterState(); DefaultTaskCancellation defaultTaskCancellation = new DefaultTaskCancellation( workloadManagementSettings, new DefaultTaskSelectionStrategy(), queryGroupLevelResourceUsageViews, activeQueryGroups, - deletedQueryGroups, - workloadManagementSettings.getNodeDuressTrackers()::isNodeInDuress + deletedQueryGroups ); defaultTaskCancellation.cancelTasks(); } @@ -119,7 +126,7 @@ protected void doStop() { @Override protected void doClose() throws IOException {} - protected Set getActiveQueryGroups() { + protected Set getActiveQueryGroupsFromClusterState() { Map queryGroups = Metadata.builder(clusterService.state().metadata()).getQueryGroups(); return new HashSet<>(queryGroups.values()); } @@ -153,7 +160,7 @@ public void applyClusterState(ClusterChangedEvent event) { this.deletedQueryGroups.add(deletedQueryGroup); } } - } + } // tested /** * updates the failure stats for the query group diff --git a/server/src/main/java/org/opensearch/wlm/cancellation/DefaultTaskCancellation.java b/server/src/main/java/org/opensearch/wlm/cancellation/DefaultTaskCancellation.java index 0b6666bb56ddc..0b068abc7bde9 100644 --- a/server/src/main/java/org/opensearch/wlm/cancellation/DefaultTaskCancellation.java +++ b/server/src/main/java/org/opensearch/wlm/cancellation/DefaultTaskCancellation.java @@ -54,22 +54,19 @@ public class DefaultTaskCancellation { protected final Map queryGroupLevelResourceUsageViews; protected final Collection activeQueryGroups; protected final Collection deletedQueryGroups; - protected BooleanSupplier isNodeInDuress; public DefaultTaskCancellation( WorkloadManagementSettings workloadManagementSettings, DefaultTaskSelectionStrategy defaultTaskSelectionStrategy, Map queryGroupLevelResourceUsageViews, Collection activeQueryGroups, - Collection deletedQueryGroups, - BooleanSupplier isNodeInDuress + Collection deletedQueryGroups ) { this.workloadManagementSettings = workloadManagementSettings; this.defaultTaskSelectionStrategy = defaultTaskSelectionStrategy; this.queryGroupLevelResourceUsageViews = queryGroupLevelResourceUsageViews; this.activeQueryGroups = activeQueryGroups; this.deletedQueryGroups = deletedQueryGroups; - this.isNodeInDuress = isNodeInDuress; } /** @@ -82,8 +79,12 @@ public final void cancelTasks() { handleNodeDuress(); } + protected boolean isNodeInDuress() { + return workloadManagementSettings.getNodeDuressTrackers().isNodeInDuress(); + } + private void handleNodeDuress() { - if (!isNodeInDuress.getAsBoolean()) { + if (!isNodeInDuress()) { return; } // List of tasks to be executed in order if the node is in duress @@ -93,7 +94,7 @@ private void handleNodeDuress() { ); for (Consumer duressAction : duressActions) { - if (!isNodeInDuress.getAsBoolean()) { + if (!isNodeInDuress()) { break; } duressAction.accept(null); diff --git a/server/src/test/java/org/opensearch/wlm/QueryGroupServiceTests.java b/server/src/test/java/org/opensearch/wlm/QueryGroupServiceTests.java new file mode 100644 index 0000000000000..9ee9897516260 --- /dev/null +++ b/server/src/test/java/org/opensearch/wlm/QueryGroupServiceTests.java @@ -0,0 +1,134 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.wlm; + +import org.junit.After; +import org.junit.Before; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.Scheduler; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.wlm.stats.QueryGroupState; +import org.opensearch.wlm.tracker.QueryGroupResourceUsageTrackerService; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class QueryGroupServiceTests extends OpenSearchTestCase { + private QueryGroupService queryGroupService; + private QueryGroupResourceUsageTrackerService mockQueryGroupUsageTracker; + private ClusterService mockClusterService; + private ThreadPool mockThreadPool; + private WorkloadManagementSettings mockWorkloadManagementSettings; + private Scheduler.Cancellable mockScheduledFuture; + private Map mockQueryGroupStateMap; + + @Before + public void setup() { + mockQueryGroupUsageTracker = mock(QueryGroupResourceUsageTrackerService.class); + mockClusterService = mock(ClusterService.class); + mockThreadPool = mock(ThreadPool.class); + mockScheduledFuture = mock(Scheduler.Cancellable.class); + mockWorkloadManagementSettings = mock(WorkloadManagementSettings.class); + mockQueryGroupStateMap = new HashMap<>(); + + queryGroupService = new QueryGroupService( + mockQueryGroupUsageTracker, + mockClusterService, + mockThreadPool, + mockWorkloadManagementSettings, + mockQueryGroupStateMap + ); + } + + public void testApplyClusterState() { + ClusterChangedEvent mockClusterChangedEvent = mock(ClusterChangedEvent.class); + ClusterState mockPreviousClusterState = mock(ClusterState.class); + ClusterState mockClusterState = mock(ClusterState.class); + Metadata mockPreviousMetadata = mock(Metadata.class); + Metadata mockMetadata = mock(Metadata.class); + QueryGroup addedQueryGroup = new QueryGroup( + "addedQueryGroup", + "4242", + QueryGroup.ResiliencyMode.ENFORCED, + Map.of(ResourceType.MEMORY, 0.5), + 1L + ); + QueryGroup deletedQueryGroup = new QueryGroup( + "deletedQueryGroup", + "4241", + QueryGroup.ResiliencyMode.ENFORCED, + Map.of(ResourceType.MEMORY, 0.5), + 1L + ); + Map previousQueryGroups = new HashMap<>(); + previousQueryGroups.put("4242", addedQueryGroup); + Map currentQueryGroups = new HashMap<>(); + currentQueryGroups.put("4241", deletedQueryGroup); + + when(mockClusterChangedEvent.previousState()).thenReturn(mockPreviousClusterState); + when(mockClusterChangedEvent.state()).thenReturn(mockClusterState); + when(mockPreviousClusterState.metadata()).thenReturn(mockPreviousMetadata); + when(mockClusterState.metadata()).thenReturn(mockMetadata); + when(mockPreviousMetadata.queryGroups()).thenReturn(previousQueryGroups); + when(mockMetadata.queryGroups()).thenReturn(currentQueryGroups); + queryGroupService.applyClusterState(mockClusterChangedEvent); + + Set currentQueryGroupsExpected = Set.of(currentQueryGroups.get("4241")); + Set previousQueryGroupsExpected = Set.of(previousQueryGroups.get("4242")); + + assertEquals(currentQueryGroupsExpected, queryGroupService.getActiveQueryGroups()); + assertEquals(previousQueryGroupsExpected, queryGroupService.getDeletedQueryGroups()); + } + + public void testDoStart_SchedulesTask() { + when(mockWorkloadManagementSettings.queryGroupServiceEnabled()).thenReturn(true); + when(mockWorkloadManagementSettings.getQueryGroupServiceRunInterval()).thenReturn(TimeValue.timeValueSeconds(1)); + queryGroupService.doStart(); + verify(mockThreadPool).scheduleWithFixedDelay(any(Runnable.class), any(TimeValue.class), eq(ThreadPool.Names.GENERIC)); + } + + public void testDoStart_DoesNOTScheduleTask_WhenQueryGroupServiceDisabled() { + when(mockWorkloadManagementSettings.queryGroupServiceEnabled()).thenReturn(false); + when(mockWorkloadManagementSettings.getQueryGroupServiceRunInterval()).thenReturn(TimeValue.timeValueSeconds(1)); + queryGroupService.doStart(); + verify(mockThreadPool, never()).scheduleWithFixedDelay(any(Runnable.class), any(TimeValue.class), eq(ThreadPool.Names.GENERIC)); + } + + public void testDoStop_CancelsScheduledTask() { + when(mockWorkloadManagementSettings.queryGroupServiceEnabled()).thenReturn(true); + when(mockThreadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(mockScheduledFuture); + queryGroupService.doStart(); + when(mockWorkloadManagementSettings.queryGroupServiceEnabled()).thenReturn(false); + queryGroupService.doStop(); + verify(mockScheduledFuture).cancel(); + } + + public void testDoStop_DoesNOTCancelsScheduledTask_WhenQueryGroupServiceDisabled() { + when(mockWorkloadManagementSettings.queryGroupServiceEnabled()).thenReturn(true); + when(mockThreadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(mockScheduledFuture); + queryGroupService.doStart(); + when(mockWorkloadManagementSettings.queryGroupServiceEnabled()).thenReturn(true); + queryGroupService.doStop(); + verify(mockScheduledFuture, never()).cancel(); + } +} \ No newline at end of file diff --git a/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportInterceptorTests.java b/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportInterceptorTests.java index 4668b845150a9..c4accc32af6cc 100644 --- a/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportInterceptorTests.java +++ b/server/src/test/java/org/opensearch/wlm/WorkloadManagementTransportInterceptorTests.java @@ -8,24 +8,53 @@ package org.opensearch.wlm; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportRequestHandler; import org.opensearch.wlm.WorkloadManagementTransportInterceptor.RequestHandler; +import org.opensearch.wlm.stats.QueryGroupState; +import org.opensearch.wlm.tracker.QueryGroupResourceUsageTrackerService; +import java.util.HashMap; +import java.util.Map; + +import static org.mockito.Mockito.mock; import static org.opensearch.threadpool.ThreadPool.Names.SAME; public class WorkloadManagementTransportInterceptorTests extends OpenSearchTestCase { private ThreadPool threadPool; private WorkloadManagementTransportInterceptor sut; + private QueryGroupService queryGroupService; + private QueryGroupResourceUsageTrackerService mockQueryGroupUsageTracker; + private ClusterService mockClusterService; + private ThreadPool mockThreadPool; + private WorkloadManagementSettings mockWorkloadManagementSettings; + private Scheduler.Cancellable mockScheduledFuture; + private Map mockQueryGroupStateMap; public void setUp() throws Exception { super.setUp(); + mockQueryGroupUsageTracker = mock(QueryGroupResourceUsageTrackerService.class); + mockClusterService = mock(ClusterService.class); + mockThreadPool = mock(ThreadPool.class); + mockScheduledFuture = mock(Scheduler.Cancellable.class); + mockWorkloadManagementSettings = mock(WorkloadManagementSettings.class); + mockQueryGroupStateMap = new HashMap<>(); threadPool = new TestThreadPool(getTestName()); - sut = new WorkloadManagementTransportInterceptor(threadPool, new QueryGroupService()); + sut = new WorkloadManagementTransportInterceptor(threadPool, + new QueryGroupService( + mockQueryGroupUsageTracker, + mockClusterService, + mockThreadPool, + mockWorkloadManagementSettings, + new HashMap<>() + ) + ); } public void tearDown() throws Exception { diff --git a/server/src/test/java/org/opensearch/wlm/cancellation/DefaultTaskCancellationTests.java b/server/src/test/java/org/opensearch/wlm/cancellation/DefaultTaskCancellationTests.java index 5ab32c1086b8b..5b8c1beb1a9a2 100644 --- a/server/src/test/java/org/opensearch/wlm/cancellation/DefaultTaskCancellationTests.java +++ b/server/src/test/java/org/opensearch/wlm/cancellation/DefaultTaskCancellationTests.java @@ -52,8 +52,7 @@ public TestTaskCancellationImpl( defaultTaskSelectionStrategy, queryGroupLevelViews, activeQueryGroups, - deletedQueryGroups, - isNodeInDuress + deletedQueryGroups ); } } diff --git a/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestOperationListenerTests.java b/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestOperationListenerTests.java index 0307ff623c408..e80d7841a59d2 100644 --- a/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestOperationListenerTests.java +++ b/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestOperationListenerTests.java @@ -8,16 +8,20 @@ package org.opensearch.wlm.listeners; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.wlm.QueryGroupService; import org.opensearch.wlm.QueryGroupTask; import org.opensearch.wlm.ResourceType; +import org.opensearch.wlm.WorkloadManagementSettings; import org.opensearch.wlm.stats.QueryGroupState; import org.opensearch.wlm.stats.QueryGroupStats; +import org.opensearch.wlm.tracker.QueryGroupResourceUsageTrackerService; import java.io.IOException; import java.util.ArrayList; @@ -33,6 +37,12 @@ public class QueryGroupRequestOperationListenerTests extends OpenSearchTestCase public static final int ITERATIONS = 20; ThreadPool testThreadPool; QueryGroupService queryGroupService; + private QueryGroupResourceUsageTrackerService mockQueryGroupUsageTracker; + private ClusterService mockClusterService; + private ThreadPool mockThreadPool; + private WorkloadManagementSettings mockWorkloadManagementSettings; + private Scheduler.Cancellable mockScheduledFuture; + private Map mockQueryGroupStateMap; Map queryGroupStateMap; String testQueryGroupId; @@ -40,6 +50,12 @@ public class QueryGroupRequestOperationListenerTests extends OpenSearchTestCase public void setUp() throws Exception { super.setUp(); + mockQueryGroupUsageTracker = mock(QueryGroupResourceUsageTrackerService.class); + mockClusterService = mock(ClusterService.class); + mockThreadPool = mock(ThreadPool.class); + mockScheduledFuture = mock(Scheduler.Cancellable.class); + mockWorkloadManagementSettings = mock(WorkloadManagementSettings.class); + mockQueryGroupStateMap = new HashMap<>(); queryGroupStateMap = new HashMap<>(); testQueryGroupId = "safjgagnakg-3r3fads"; testThreadPool = new TestThreadPool("RejectionTestThreadPool"); @@ -94,7 +110,13 @@ public void testMultiThreadedValidQueryGroupRequestFailures() { queryGroupStateMap.put(testQueryGroupId, new QueryGroupState()); - queryGroupService = new QueryGroupService(queryGroupStateMap); + queryGroupService = new QueryGroupService( + mockQueryGroupUsageTracker, + mockClusterService, + mockThreadPool, + mockWorkloadManagementSettings, + new HashMap<>() + ); sut = new QueryGroupRequestOperationListener(queryGroupService, testThreadPool); @@ -174,7 +196,13 @@ private void assertSuccess( testThreadPool.getThreadContext().putHeader(QueryGroupTask.QUERY_GROUP_ID_HEADER, threadContextQG_Id); queryGroupStateMap.put(testQueryGroupId, new QueryGroupState()); - queryGroupService = new QueryGroupService(queryGroupStateMap); + queryGroupService = new QueryGroupService( + mockQueryGroupUsageTracker, + mockClusterService, + mockThreadPool, + mockWorkloadManagementSettings, + new HashMap<>() + ); sut = new QueryGroupRequestOperationListener(queryGroupService, testThreadPool); sut.onRequestFailure(null, null);