diff --git a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java
index 2bbec5f8132a6..bb9659e60551f 100644
--- a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java
+++ b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java
@@ -10,6 +10,8 @@
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.opensearch.cluster.ClusterChangedEvent;
+import org.opensearch.cluster.ClusterStateApplier;
 import org.opensearch.cluster.metadata.Metadata;
 import org.opensearch.cluster.metadata.QueryGroup;
 import org.opensearch.cluster.service.ClusterService;
@@ -38,16 +40,14 @@
 /**
  * Main service which will run periodically to track and cancel resource constraint violating tasks in QueryGroups
  */
-public class QueryGroupService extends AbstractLifecycleComponent {
+public class QueryGroupService extends AbstractLifecycleComponent implements ClusterStateApplier {
 
     private static final Logger logger = LogManager.getLogger(QueryGroupService.class);
     private final QueryGroupResourceUsageTrackerService queryGroupUsageTracker;
-    private final TimeValue queryGroupServiceRunIntervalInMillis;
     private volatile Scheduler.Cancellable scheduledFuture;
     private final ThreadPool threadPool;
     private final ClusterService clusterService;
-    private final ClusterSettings clusterSettings;
-    private final Settings settings;
+    private final WorkloadManagementSettings workloadManagementSettings;
 
     private Set<QueryGroup> activeQueryGroups = new HashSet<>();
     private Set<QueryGroup> deletedQueryGroups = new HashSet<>();
@@ -62,17 +62,12 @@ public QueryGroupService(
         QueryGroupResourceUsageTrackerService queryGroupUsageTracker,
         ClusterService clusterService,
         ThreadPool threadPool,
-        ClusterSettings clusterSettings,
-        Settings settings
+        WorkloadManagementSettings workloadManagementSettings
     ) {
         this.queryGroupUsageTracker = queryGroupUsageTracker;
         this.clusterService = clusterService;
         this.threadPool = threadPool;
-        this.clusterSettings = clusterSettings;
-        this.settings = settings;
-        this.queryGroupServiceRunIntervalInMillis = new TimeValue(
-            WorkloadManagementSettings.QUERYGROUP_SERVICE_RUN_INTERVAL_SETTING.get(settings)
-        );
+        this.workloadManagementSettings = workloadManagementSettings;
     }
 
     /**
@@ -87,7 +82,7 @@ protected void doRun() {
             queryGroupLevelResourceUsageViews,
             activeQueryGroups,
             deletedQueryGroups,
-            getNodeDuressTracker()::isNodeInDuress
+            workloadManagementSettings.getNodeDuressTrackers()::isNodeInDuress
         );
         defaultTaskCancellation.cancelTasks();
     }
@@ -97,18 +92,25 @@ protected void doRun() {
      */
     @Override
     protected void doStart() {
+        // Do not schedule the periodic run while the service is disabled
+        if (!workloadManagementSettings.queryGroupServiceEnabled()) {
+            return;
+        }
         scheduledFuture = threadPool.scheduleWithFixedDelay(() -> {
             try {
                 doRun();
             } catch (Exception e) {
                 logger.debug("Exception occurred in Query Sandbox service", e);
             }
-        }, this.queryGroupServiceRunIntervalInMillis, ThreadPool.Names.GENERIC);
-        queryGroupUsageTracker.constructQueryGroupLevelUsageViews();
+        }, this.workloadManagementSettings.getQueryGroupServiceRunInterval(), ThreadPool.Names.GENERIC);
     }
 
     @Override
     protected void doStop() {
+        // Nothing was scheduled by doStart() when the service is disabled
+        if (!workloadManagementSettings.queryGroupServiceEnabled()) {
+            return;
+        }
         if (scheduledFuture != null) {
             scheduledFuture.cancel();
         }
@@ -117,30 +119,47 @@ protected void doStop() {
     @Override
     protected void doClose() throws IOException {}
 
-    private NodeDuressTrackers getNodeDuressTracker() {
-        NodeDuressSettings nodeDuressSettings = new NodeDuressSettings(settings, clusterSettings);
-        return new NodeDuressTrackers(new EnumMap<>(ResourceType.class) {
-            {
-                put(
-                    ResourceType.CPU,
-                    new NodeDuressTrackers.NodeDuressTracker(
-                        () -> ProcessProbe.getInstance().getProcessCpuPercent() / 100.0 >= nodeDuressSettings.getCpuThreshold(),
-                        nodeDuressSettings::getNumSuccessiveBreaches
-                    )
-                );
-                put(
-                    ResourceType.MEMORY,
-                    new NodeDuressTrackers.NodeDuressTracker(
-                        () -> JvmStats.jvmStats().getMem().getHeapUsedPercent() / 100.0 >= nodeDuressSettings.getHeapThreshold(),
-                        nodeDuressSettings::getNumSuccessiveBreaches
-                    )
-                );
-            }
-        });
-    }
-
     protected Set<QueryGroup> getActiveQueryGroups() {
         Map<String, QueryGroup> queryGroups = Metadata.builder(clusterService.state().metadata()).getQueryGroups();
         return new HashSet<>(queryGroups.values());
     }
+
+    /**
+     * Reconciles the tracked query groups with a newly applied cluster state by comparing the
+     * query group metadata of the previous and the current state.
+     *
+     * @param event cluster state change exposing the previous and the current state
+     */
+    @Override
+    public void applyClusterState(ClusterChangedEvent event) {
+        // Retrieve the current and previous cluster states
+        Metadata previousMetadata = event.previousState().metadata();
+        Metadata currentMetadata = event.state().metadata();
+
+        // Extract the query groups from both the current and previous cluster states
+        Map<String, QueryGroup> previousQueryGroups = previousMetadata.queryGroups();
+        Map<String, QueryGroup> currentQueryGroups = currentMetadata.queryGroups();
+
+        // Detect new query groups added in the current cluster state
+        for (String queryGroupName : currentQueryGroups.keySet()) {
+            if (!previousQueryGroups.containsKey(queryGroupName)) {
+                // New query group detected
+                QueryGroup newQueryGroup = currentQueryGroups.get(queryGroupName);
+                // Perform any necessary actions with the new query group
+                this.activeQueryGroups.add(newQueryGroup);
+            }
+        }
+
+        // Detect query groups deleted in the current cluster state
+        for (String queryGroupName : previousQueryGroups.keySet()) {
+            if (!currentQueryGroups.containsKey(queryGroupName)) {
+                // Query group deleted
+                QueryGroup deletedQueryGroup = previousQueryGroups.get(queryGroupName);
+                // Drop it from the active set before recording the deletion
+                // (assumes QueryGroup#equals matches the instance added earlier; TODO confirm)
+                this.activeQueryGroups.remove(deletedQueryGroup);
+                this.deletedQueryGroups.add(deletedQueryGroup);
+            }
+        }
+    }
 }
diff --git a/server/src/main/java/org/opensearch/wlm/WorkloadManagementSettings.java b/server/src/main/java/org/opensearch/wlm/WorkloadManagementSettings.java
index f1c64fb4f0d90..762fd2d8be60d 100644
--- a/server/src/main/java/org/opensearch/wlm/WorkloadManagementSettings.java
+++ b/server/src/main/java/org/opensearch/wlm/WorkloadManagementSettings.java
@@ -11,6 +11,14 @@
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Setting;
 import org.opensearch.common.settings.Settings;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.monitor.jvm.JvmStats;
+import org.opensearch.monitor.process.ProcessProbe;
+import org.opensearch.search.ResourceType;
+import org.opensearch.search.backpressure.settings.NodeDuressSettings;
+import org.opensearch.search.backpressure.trackers.NodeDuressTrackers;
+
+import java.util.EnumMap;
 
 /**
  * Main class to declare Workload Management related settings
@@ -31,6 +39,9 @@ public class WorkloadManagementSettings {
     private Double nodeLevelMemoryRejectionThreshold;
     private Double nodeLevelCpuCancellationThreshold;
     private Double nodeLevelCpuRejectionThreshold;
+    private TimeValue queryGroupServiceRunIntervalInMillis;
+    private NodeDuressTrackers nodeDuressTrackers;
+    private Boolean queryGroupServiceEnabled;
 
     /**
      * Setting name for node level memory based rejection threshold for QueryGroup service
@@ -84,6 +95,24 @@ public class WorkloadManagementSettings {
         Setting.Property.Dynamic,
         Setting.Property.NodeScope
     );
+
+    /**
+     * Setting name for enabling or disabling the QueryGroup service
+     */
+    public static final String QUERYGROUP_SERVICE_ENABLED_SETTING_NAME = "wlm.query_group.service.enabled";
+
+    /**
+     * Setting to control whether the QueryGroup service schedules its periodic run; defaults to {@code false}.
+     * NOTE(review): declared dynamic, but this patch registers no settings update consumer for it,
+     * so the value is only read once in the constructor.
+     */
+    public static final Setting<Boolean> QUERYGROUP_SERVICE_ENABLED_SETTING = Setting.boolSetting(
+        QUERYGROUP_SERVICE_ENABLED_SETTING_NAME,
+        false,
+        Setting.Property.Dynamic,
+        Setting.Property.NodeScope
+    );
+
     /**
      * Setting name for Query Group Service run interval
      */
@@ -108,6 +137,9 @@ public WorkloadManagementSettings(Settings settings, ClusterSettings clusterSett
         nodeLevelMemoryRejectionThreshold = NODE_LEVEL_MEMORY_REJECTION_THRESHOLD.get(settings);
         nodeLevelCpuCancellationThreshold = NODE_LEVEL_CPU_CANCELLATION_THRESHOLD.get(settings);
         nodeLevelCpuRejectionThreshold = NODE_LEVEL_CPU_REJECTION_THRESHOLD.get(settings);
+        queryGroupServiceRunIntervalInMillis = TimeValue.timeValueMillis(QUERYGROUP_SERVICE_RUN_INTERVAL_SETTING.get(settings));
+        nodeDuressTrackers = setupNodeDuressTracker(settings, clusterSettings);
+        queryGroupServiceEnabled = QUERYGROUP_SERVICE_ENABLED_SETTING.get(settings);
 
         ensureRejectionThresholdIsLessThanCancellation(
             nodeLevelMemoryRejectionThreshold,
@@ -128,6 +160,62 @@ public WorkloadManagementSettings(Settings settings, ClusterSettings clusterSett
         clusterSettings.addSettingsUpdateConsumer(NODE_LEVEL_CPU_REJECTION_THRESHOLD, this::setNodeLevelCpuRejectionThreshold);
     }
 
+    /**
+     * Gets the interval at which the Query Group Service runs.
+     *
+     * @return the interval as a {@code TimeValue} object.
+     */
+    public TimeValue getQueryGroupServiceRunInterval() {
+        return queryGroupServiceRunIntervalInMillis;
+    }
+
+    /**
+     * Gets the {@code NodeDuressTrackers} instance which tracks the node duress state.
+     *
+     * @return the {@code NodeDuressTrackers} instance.
+     */
+    public NodeDuressTrackers getNodeDuressTrackers() {
+        return nodeDuressTrackers;
+    }
+
+    /**
+     * Tells whether the Query Group Service is enabled.
+     *
+     * @return the value of {@code wlm.query_group.service.enabled} as read in the constructor
+     */
+    public Boolean queryGroupServiceEnabled() {
+        return queryGroupServiceEnabled;
+    }
+
+    /**
+     * Builds the CPU and memory node duress trackers from the node duress settings.
+     *
+     * @param settings settings used to create the {@code NodeDuressSettings}
+     * @param clusterSettings cluster settings used to create the {@code NodeDuressSettings}
+     * @return trackers covering {@code ResourceType.CPU} and {@code ResourceType.MEMORY}
+     */
+    private NodeDuressTrackers setupNodeDuressTracker(Settings settings, ClusterSettings clusterSettings) {
+        NodeDuressSettings nodeDuressSettings = new NodeDuressSettings(settings, clusterSettings);
+        return new NodeDuressTrackers(new EnumMap<>(ResourceType.class) {
+            {
+                put(
+                    ResourceType.CPU,
+                    new NodeDuressTrackers.NodeDuressTracker(
+                        () -> ProcessProbe.getInstance().getProcessCpuPercent() / 100.0 >= nodeDuressSettings.getCpuThreshold(),
+                        nodeDuressSettings::getNumSuccessiveBreaches
+                    )
+                );
+                put(
+                    ResourceType.MEMORY,
+                    new NodeDuressTrackers.NodeDuressTracker(
+                        () -> JvmStats.jvmStats().getMem().getHeapUsedPercent() / 100.0 >= nodeDuressSettings.getHeapThreshold(),
+                        nodeDuressSettings::getNumSuccessiveBreaches
+                    )
+                );
+            }
+        });
+    }
+
     /**
      * Method to get the node level memory based cancellation threshold
      * @return current node level memory based cancellation threshold