Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Kiran Prakash <[email protected]>
  • Loading branch information
kiranprakash154 committed Aug 30, 2024
1 parent 5c82af3 commit 36c349a
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 36 deletions.
81 changes: 45 additions & 36 deletions server/src/main/java/org/opensearch/wlm/QueryGroupService.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateApplier;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -38,16 +40,14 @@
/**
* Main service which will run periodically to track and cancel resource constraint violating tasks in QueryGroups
*/
public class QueryGroupService extends AbstractLifecycleComponent {
public class QueryGroupService extends AbstractLifecycleComponent implements ClusterStateApplier {
private static final Logger logger = LogManager.getLogger(QueryGroupService.class);

private final QueryGroupResourceUsageTrackerService queryGroupUsageTracker;
private final TimeValue queryGroupServiceRunIntervalInMillis;
private volatile Scheduler.Cancellable scheduledFuture;
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final ClusterSettings clusterSettings;
private final Settings settings;
private final WorkloadManagementSettings workloadManagementSettings;
private Set<QueryGroup> activeQueryGroups = new HashSet<>();
private Set<QueryGroup> deletedQueryGroups = new HashSet<>();

Expand All @@ -62,17 +62,12 @@ public QueryGroupService(
QueryGroupResourceUsageTrackerService queryGroupUsageTracker,
ClusterService clusterService,
ThreadPool threadPool,
ClusterSettings clusterSettings,
Settings settings
WorkloadManagementSettings workloadManagementSettings
) {
this.queryGroupUsageTracker = queryGroupUsageTracker;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.clusterSettings = clusterSettings;
this.settings = settings;
this.queryGroupServiceRunIntervalInMillis = new TimeValue(
WorkloadManagementSettings.QUERYGROUP_SERVICE_RUN_INTERVAL_SETTING.get(settings)
);
this.workloadManagementSettings = workloadManagementSettings;
}

/**
Expand All @@ -87,7 +82,7 @@ protected void doRun() {
queryGroupLevelResourceUsageViews,
activeQueryGroups,
deletedQueryGroups,
getNodeDuressTracker()::isNodeInDuress
workloadManagementSettings.getNodeDuressTrackers()::isNodeInDuress
);
defaultTaskCancellation.cancelTasks();
}
Expand All @@ -97,18 +92,23 @@ protected void doRun() {
*/
@Override
protected void doStart() {
if(!workloadManagementSettings.queryGroupServiceEnabled()) {
return;
}
scheduledFuture = threadPool.scheduleWithFixedDelay(() -> {
try {
doRun();
} catch (Exception e) {
logger.debug("Exception occurred in Query Sandbox service", e);
}
}, this.queryGroupServiceRunIntervalInMillis, ThreadPool.Names.GENERIC);
queryGroupUsageTracker.constructQueryGroupLevelUsageViews();
}, this.workloadManagementSettings.getQueryGroupServiceRunInterval(), ThreadPool.Names.GENERIC);
}

@Override
protected void doStop() {
if(workloadManagementSettings.queryGroupServiceEnabled()) {
return;
}
if (scheduledFuture != null) {
scheduledFuture.cancel();
}
Expand All @@ -117,30 +117,39 @@ protected void doStop() {
@Override
protected void doClose() throws IOException {}

private NodeDuressTrackers getNodeDuressTracker() {
NodeDuressSettings nodeDuressSettings = new NodeDuressSettings(settings, clusterSettings);
return new NodeDuressTrackers(new EnumMap<>(ResourceType.class) {
{
put(
ResourceType.CPU,
new NodeDuressTrackers.NodeDuressTracker(
() -> ProcessProbe.getInstance().getProcessCpuPercent() / 100.0 >= nodeDuressSettings.getCpuThreshold(),
nodeDuressSettings::getNumSuccessiveBreaches
)
);
put(
ResourceType.MEMORY,
new NodeDuressTrackers.NodeDuressTracker(
() -> JvmStats.jvmStats().getMem().getHeapUsedPercent() / 100.0 >= nodeDuressSettings.getHeapThreshold(),
nodeDuressSettings::getNumSuccessiveBreaches
)
);
}
});
}

protected Set<QueryGroup> getActiveQueryGroups() {
Map<String, QueryGroup> queryGroups = Metadata.builder(clusterService.state().metadata()).getQueryGroups();
return new HashSet<>(queryGroups.values());
}

@Override
public void applyClusterState(ClusterChangedEvent event) {
// Retrieve the current and previous cluster states
Metadata previousMetadata = event.previousState().metadata();
Metadata currentMetadata = event.state().metadata();

// Extract the query groups from both the current and previous cluster states
Map<String, QueryGroup> previousQueryGroups = previousMetadata.queryGroups();
Map<String, QueryGroup> currentQueryGroups = currentMetadata.queryGroups();

// Detect new query groups added in the current cluster state
for (String queryGroupName : currentQueryGroups.keySet()) {
if (!previousQueryGroups.containsKey(queryGroupName)) {
// New query group detected
QueryGroup newQueryGroup = currentQueryGroups.get(queryGroupName);
// Perform any necessary actions with the new query group
this.activeQueryGroups.add(newQueryGroup);
}
}

// Detect query groups deleted in the current cluster state
for (String queryGroupName : previousQueryGroups.keySet()) {
if (!currentQueryGroups.containsKey(queryGroupName)) {
// Query group deleted
QueryGroup deletedQueryGroup = previousQueryGroups.get(queryGroupName);
// Perform any necessary actions with the deleted query group
this.deletedQueryGroups.add(deletedQueryGroup);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.monitor.jvm.JvmStats;
import org.opensearch.monitor.process.ProcessProbe;
import org.opensearch.search.ResourceType;
import org.opensearch.search.backpressure.settings.NodeDuressSettings;
import org.opensearch.search.backpressure.trackers.NodeDuressTrackers;

import java.util.EnumMap;

/**
* Main class to declare Workload Management related settings
Expand All @@ -31,6 +39,9 @@ public class WorkloadManagementSettings {
private Double nodeLevelMemoryRejectionThreshold;
private Double nodeLevelCpuCancellationThreshold;
private Double nodeLevelCpuRejectionThreshold;
private TimeValue queryGroupServiceRunIntervalInMillis;
private NodeDuressTrackers nodeDuressTrackers;
private Boolean queryGroupServiceEnabled;

/**
* Setting name for node level memory based rejection threshold for QueryGroup service
Expand Down Expand Up @@ -84,6 +95,14 @@ public class WorkloadManagementSettings {
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
public static final String QUERYGROUP_SERVICE_ENABLED_SETTING_NAME = "wlm.query_group.service.enabled";

public static final Setting<Boolean> QUERYGROUP_SERVICE_ENABLED_SETTING = Setting.boolSetting(
QUERYGROUP_SERVICE_ENABLED_SETTING_NAME,
false,
Setting.Property.Dynamic,
Setting.Property.IndexScope
);
/**
* Setting name for Query Group Service run interval
*/
Expand All @@ -108,6 +127,9 @@ public WorkloadManagementSettings(Settings settings, ClusterSettings clusterSett
nodeLevelMemoryRejectionThreshold = NODE_LEVEL_MEMORY_REJECTION_THRESHOLD.get(settings);
nodeLevelCpuCancellationThreshold = NODE_LEVEL_CPU_CANCELLATION_THRESHOLD.get(settings);
nodeLevelCpuRejectionThreshold = NODE_LEVEL_CPU_REJECTION_THRESHOLD.get(settings);
queryGroupServiceRunIntervalInMillis = TimeValue.timeValueMillis(QUERYGROUP_SERVICE_RUN_INTERVAL_SETTING.get(settings));
nodeDuressTrackers = setupNodeDuressTracker(settings, clusterSettings);
queryGroupServiceEnabled = QUERYGROUP_SERVICE_ENABLED_SETTING.get(settings);

ensureRejectionThresholdIsLessThanCancellation(
nodeLevelMemoryRejectionThreshold,
Expand All @@ -128,6 +150,50 @@ public WorkloadManagementSettings(Settings settings, ClusterSettings clusterSett
clusterSettings.addSettingsUpdateConsumer(NODE_LEVEL_CPU_REJECTION_THRESHOLD, this::setNodeLevelCpuRejectionThreshold);
}

/**
* Gets the interval at which the Query Group Service runs.
*
* @return the interval as a \`TimeValue\` object.
*/
public TimeValue getQueryGroupServiceRunInterval() {
return queryGroupServiceRunIntervalInMillis;
}

/**
* Gets the \`NodeDuressTrackers\` instance which tracks the node duress state.
*
* @return the \`NodeDuressTrackers\` instance.
*/
public NodeDuressTrackers getNodeDuressTrackers() {
return nodeDuressTrackers;
}

public Boolean queryGroupServiceEnabled() {
return queryGroupServiceEnabled;
}

private NodeDuressTrackers setupNodeDuressTracker(Settings settings, ClusterSettings clusterSettings) {
NodeDuressSettings nodeDuressSettings = new NodeDuressSettings(settings, clusterSettings);
return new NodeDuressTrackers(new EnumMap<>(ResourceType.class) {
{
put(
ResourceType.CPU,
new NodeDuressTrackers.NodeDuressTracker(
() -> ProcessProbe.getInstance().getProcessCpuPercent() / 100.0 >= nodeDuressSettings.getCpuThreshold(),
nodeDuressSettings::getNumSuccessiveBreaches
)
);
put(
ResourceType.MEMORY,
new NodeDuressTrackers.NodeDuressTracker(
() -> JvmStats.jvmStats().getMem().getHeapUsedPercent() / 100.0 >= nodeDuressSettings.getHeapThreshold(),
nodeDuressSettings::getNumSuccessiveBreaches
)
);
}
});
}

/**
* Method to get the node level memory based cancellation threshold
* @return current node level memory based cancellation threshold
Expand Down

0 comments on commit 36c349a

Please sign in to comment.