Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Kiran Prakash <[email protected]>
  • Loading branch information
kiranprakash154 committed Sep 2, 2024
1 parent 193dd49 commit 06994fb
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 18 deletions.
22 changes: 20 additions & 2 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,10 @@
import org.opensearch.usage.UsageService;
import org.opensearch.watcher.ResourceWatcherService;
import org.opensearch.wlm.QueryGroupService;
import org.opensearch.wlm.WorkloadManagementSettings;
import org.opensearch.wlm.WorkloadManagementTransportInterceptor;
import org.opensearch.wlm.listeners.QueryGroupRequestOperationListener;
import org.opensearch.wlm.tracker.QueryGroupResourceUsageTrackerService;

import javax.net.ssl.SNIHostName;

Expand Down Expand Up @@ -1020,7 +1022,17 @@ protected Node(
List<IdentityAwarePlugin> identityAwarePlugins = pluginsService.filterPlugins(IdentityAwarePlugin.class);
identityService.initializeIdentityAwarePlugins(identityAwarePlugins);

final QueryGroupService queryGroupService = new QueryGroupService(); // We will need to replace this with actual instance of the
QueryGroupResourceUsageTrackerService queryGroupResourceUsageTrackerService = new QueryGroupResourceUsageTrackerService(
taskResourceTrackingService
);
WorkloadManagementSettings workloadManagementSettings = new WorkloadManagementSettings(settings, settingsModule.getClusterSettings());
final QueryGroupService queryGroupService = new QueryGroupService(
queryGroupResourceUsageTrackerService,
clusterService,
threadPool,
workloadManagementSettings,
new HashMap<>()
); // We will need to replace this with actual instance of the
// queryGroupService
final QueryGroupRequestOperationListener queryGroupRequestOperationListener = new QueryGroupRequestOperationListener(
queryGroupService,
Expand Down Expand Up @@ -1087,7 +1099,13 @@ protected Node(

WorkloadManagementTransportInterceptor workloadManagementTransportInterceptor = new WorkloadManagementTransportInterceptor(
threadPool,
new QueryGroupService() // We will need to replace this with actual implementation
new QueryGroupService(
queryGroupResourceUsageTrackerService,
clusterService,
threadPool,
workloadManagementSettings,
new HashMap<>()
) // We will need to replace this with actual implementation
);

final Collection<SecureSettingsFactory> secureSettingsFactories = pluginsService.filterPlugins(Plugin.class)
Expand Down
17 changes: 12 additions & 5 deletions server/src/main/java/org/opensearch/wlm/QueryGroupService.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ public class QueryGroupService extends AbstractLifecycleComponent implements Clu
private Set<QueryGroup> activeQueryGroups = new HashSet<>();
private Set<QueryGroup> deletedQueryGroups = new HashSet<>();

protected Set<QueryGroup> getDeletedQueryGroups() {
return deletedQueryGroups;
}

protected Set<QueryGroup> getActiveQueryGroups() {
return activeQueryGroups;
}

/**
* Guice managed constructor
*
Expand Down Expand Up @@ -77,14 +85,13 @@ public QueryGroupService(
protected void doRun() {
Map<String, QueryGroupLevelResourceUsageView> queryGroupLevelResourceUsageViews = queryGroupUsageTracker
.constructQueryGroupLevelUsageViews();
this.activeQueryGroups = getActiveQueryGroups();
this.activeQueryGroups = getActiveQueryGroupsFromClusterState();
DefaultTaskCancellation defaultTaskCancellation = new DefaultTaskCancellation(
workloadManagementSettings,
new DefaultTaskSelectionStrategy(),
queryGroupLevelResourceUsageViews,
activeQueryGroups,
deletedQueryGroups,
workloadManagementSettings.getNodeDuressTrackers()::isNodeInDuress
deletedQueryGroups
);
defaultTaskCancellation.cancelTasks();
}
Expand Down Expand Up @@ -119,7 +126,7 @@ protected void doStop() {
@Override
protected void doClose() throws IOException {}

protected Set<QueryGroup> getActiveQueryGroups() {
protected Set<QueryGroup> getActiveQueryGroupsFromClusterState() {
Map<String, QueryGroup> queryGroups = Metadata.builder(clusterService.state().metadata()).getQueryGroups();
return new HashSet<>(queryGroups.values());
}
Expand Down Expand Up @@ -153,7 +160,7 @@ public void applyClusterState(ClusterChangedEvent event) {
this.deletedQueryGroups.add(deletedQueryGroup);
}
}
}
} // tested

/**
* updates the failure stats for the query group
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,19 @@ public class DefaultTaskCancellation {
protected final Map<String, QueryGroupLevelResourceUsageView> queryGroupLevelResourceUsageViews;
protected final Collection<QueryGroup> activeQueryGroups;
protected final Collection<QueryGroup> deletedQueryGroups;
protected BooleanSupplier isNodeInDuress;

public DefaultTaskCancellation(
WorkloadManagementSettings workloadManagementSettings,
DefaultTaskSelectionStrategy defaultTaskSelectionStrategy,
Map<String, QueryGroupLevelResourceUsageView> queryGroupLevelResourceUsageViews,
Collection<QueryGroup> activeQueryGroups,
Collection<QueryGroup> deletedQueryGroups,
BooleanSupplier isNodeInDuress
Collection<QueryGroup> deletedQueryGroups
) {
this.workloadManagementSettings = workloadManagementSettings;
this.defaultTaskSelectionStrategy = defaultTaskSelectionStrategy;
this.queryGroupLevelResourceUsageViews = queryGroupLevelResourceUsageViews;
this.activeQueryGroups = activeQueryGroups;
this.deletedQueryGroups = deletedQueryGroups;
this.isNodeInDuress = isNodeInDuress;
}

/**
Expand All @@ -82,8 +79,12 @@ public final void cancelTasks() {
handleNodeDuress();
}

protected boolean isNodeInDuress() {
return workloadManagementSettings.getNodeDuressTrackers().isNodeInDuress();
}

private void handleNodeDuress() {
if (!isNodeInDuress.getAsBoolean()) {
if (!isNodeInDuress()) {
return;
}
// List of tasks to be executed in order if the node is in duress
Expand All @@ -93,7 +94,7 @@ private void handleNodeDuress() {
);

for (Consumer<Void> duressAction : duressActions) {
if (!isNodeInDuress.getAsBoolean()) {
if (!isNodeInDuress()) {
break;
}
duressAction.accept(null);
Expand Down
134 changes: 134 additions & 0 deletions server/src/test/java/org/opensearch/wlm/QueryGroupServiceTests.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.wlm;

import org.junit.After;
import org.junit.Before;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.wlm.stats.QueryGroupState;
import org.opensearch.wlm.tracker.QueryGroupResourceUsageTrackerService;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class QueryGroupServiceTests extends OpenSearchTestCase {
private QueryGroupService queryGroupService;
private QueryGroupResourceUsageTrackerService mockQueryGroupUsageTracker;
private ClusterService mockClusterService;
private ThreadPool mockThreadPool;
private WorkloadManagementSettings mockWorkloadManagementSettings;
private Scheduler.Cancellable mockScheduledFuture;
private Map<String, QueryGroupState> mockQueryGroupStateMap;

@Before
public void setup() {
mockQueryGroupUsageTracker = mock(QueryGroupResourceUsageTrackerService.class);
mockClusterService = mock(ClusterService.class);
mockThreadPool = mock(ThreadPool.class);
mockScheduledFuture = mock(Scheduler.Cancellable.class);
mockWorkloadManagementSettings = mock(WorkloadManagementSettings.class);
mockQueryGroupStateMap = new HashMap<>();

queryGroupService = new QueryGroupService(
mockQueryGroupUsageTracker,
mockClusterService,
mockThreadPool,
mockWorkloadManagementSettings,
mockQueryGroupStateMap
);
}

public void testApplyClusterState() {
ClusterChangedEvent mockClusterChangedEvent = mock(ClusterChangedEvent.class);
ClusterState mockPreviousClusterState = mock(ClusterState.class);
ClusterState mockClusterState = mock(ClusterState.class);
Metadata mockPreviousMetadata = mock(Metadata.class);
Metadata mockMetadata = mock(Metadata.class);
QueryGroup addedQueryGroup = new QueryGroup(
"addedQueryGroup",
"4242",
QueryGroup.ResiliencyMode.ENFORCED,
Map.of(ResourceType.MEMORY, 0.5),
1L
);
QueryGroup deletedQueryGroup = new QueryGroup(
"deletedQueryGroup",
"4241",
QueryGroup.ResiliencyMode.ENFORCED,
Map.of(ResourceType.MEMORY, 0.5),
1L
);
Map<String, QueryGroup> previousQueryGroups = new HashMap<>();
previousQueryGroups.put("4242", addedQueryGroup);
Map<String, QueryGroup> currentQueryGroups = new HashMap<>();
currentQueryGroups.put("4241", deletedQueryGroup);

when(mockClusterChangedEvent.previousState()).thenReturn(mockPreviousClusterState);
when(mockClusterChangedEvent.state()).thenReturn(mockClusterState);
when(mockPreviousClusterState.metadata()).thenReturn(mockPreviousMetadata);
when(mockClusterState.metadata()).thenReturn(mockMetadata);
when(mockPreviousMetadata.queryGroups()).thenReturn(previousQueryGroups);
when(mockMetadata.queryGroups()).thenReturn(currentQueryGroups);
queryGroupService.applyClusterState(mockClusterChangedEvent);

Set<QueryGroup> currentQueryGroupsExpected = Set.of(currentQueryGroups.get("4241"));
Set<QueryGroup> previousQueryGroupsExpected = Set.of(previousQueryGroups.get("4242"));

assertEquals(currentQueryGroupsExpected, queryGroupService.getActiveQueryGroups());
assertEquals(previousQueryGroupsExpected, queryGroupService.getDeletedQueryGroups());
}

public void testDoStart_SchedulesTask() {
when(mockWorkloadManagementSettings.queryGroupServiceEnabled()).thenReturn(true);
when(mockWorkloadManagementSettings.getQueryGroupServiceRunInterval()).thenReturn(TimeValue.timeValueSeconds(1));
queryGroupService.doStart();
verify(mockThreadPool).scheduleWithFixedDelay(any(Runnable.class), any(TimeValue.class), eq(ThreadPool.Names.GENERIC));
}

public void testDoStart_DoesNOTScheduleTask_WhenQueryGroupServiceDisabled() {
when(mockWorkloadManagementSettings.queryGroupServiceEnabled()).thenReturn(false);
when(mockWorkloadManagementSettings.getQueryGroupServiceRunInterval()).thenReturn(TimeValue.timeValueSeconds(1));
queryGroupService.doStart();
verify(mockThreadPool, never()).scheduleWithFixedDelay(any(Runnable.class), any(TimeValue.class), eq(ThreadPool.Names.GENERIC));
}

public void testDoStop_CancelsScheduledTask() {
when(mockWorkloadManagementSettings.queryGroupServiceEnabled()).thenReturn(true);
when(mockThreadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(mockScheduledFuture);
queryGroupService.doStart();
when(mockWorkloadManagementSettings.queryGroupServiceEnabled()).thenReturn(false);
queryGroupService.doStop();
verify(mockScheduledFuture).cancel();
}

public void testDoStop_DoesNOTCancelsScheduledTask_WhenQueryGroupServiceDisabled() {
when(mockWorkloadManagementSettings.queryGroupServiceEnabled()).thenReturn(true);
when(mockThreadPool.scheduleWithFixedDelay(any(), any(), any())).thenReturn(mockScheduledFuture);
queryGroupService.doStart();
when(mockWorkloadManagementSettings.queryGroupServiceEnabled()).thenReturn(true);
queryGroupService.doStop();
verify(mockScheduledFuture, never()).cancel();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,53 @@

package org.opensearch.wlm;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestHandler;
import org.opensearch.wlm.WorkloadManagementTransportInterceptor.RequestHandler;
import org.opensearch.wlm.stats.QueryGroupState;
import org.opensearch.wlm.tracker.QueryGroupResourceUsageTrackerService;

import java.util.HashMap;
import java.util.Map;

import static org.mockito.Mockito.mock;
import static org.opensearch.threadpool.ThreadPool.Names.SAME;

public class WorkloadManagementTransportInterceptorTests extends OpenSearchTestCase {

private ThreadPool threadPool;
private WorkloadManagementTransportInterceptor sut;
private QueryGroupService queryGroupService;
private QueryGroupResourceUsageTrackerService mockQueryGroupUsageTracker;
private ClusterService mockClusterService;
private ThreadPool mockThreadPool;
private WorkloadManagementSettings mockWorkloadManagementSettings;
private Scheduler.Cancellable mockScheduledFuture;
private Map<String, QueryGroupState> mockQueryGroupStateMap;

public void setUp() throws Exception {
super.setUp();
mockQueryGroupUsageTracker = mock(QueryGroupResourceUsageTrackerService.class);
mockClusterService = mock(ClusterService.class);
mockThreadPool = mock(ThreadPool.class);
mockScheduledFuture = mock(Scheduler.Cancellable.class);
mockWorkloadManagementSettings = mock(WorkloadManagementSettings.class);
mockQueryGroupStateMap = new HashMap<>();
threadPool = new TestThreadPool(getTestName());
sut = new WorkloadManagementTransportInterceptor(threadPool, new QueryGroupService());
sut = new WorkloadManagementTransportInterceptor(threadPool,
new QueryGroupService(
mockQueryGroupUsageTracker,
mockClusterService,
mockThreadPool,
mockWorkloadManagementSettings,
new HashMap<>()
)
);
}

public void tearDown() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ public TestTaskCancellationImpl(
defaultTaskSelectionStrategy,
queryGroupLevelViews,
activeQueryGroups,
deletedQueryGroups,
isNodeInDuress
deletedQueryGroups
);
}
}
Expand Down
Loading

0 comments on commit 06994fb

Please sign in to comment.