Skip to content

Commit

Permalink
Add cancellation framework changes in wlm (#15651)
Browse files Browse the repository at this point in the history
* cancellation related

Signed-off-by: Kiran Prakash <[email protected]>

* Update CHANGELOG.md

Signed-off-by: Kiran Prakash <[email protected]>

* add better cancellation reason

Signed-off-by: Kiran Prakash <[email protected]>

* Update DefaultTaskCancellationTests.java

Signed-off-by: Kiran Prakash <[email protected]>

* refactor

Signed-off-by: Kiran Prakash <[email protected]>

* refactor

Signed-off-by: Kiran Prakash <[email protected]>

* Update DefaultTaskCancellation.java

Signed-off-by: Kiran Prakash <[email protected]>

* Update DefaultTaskCancellation.java

Signed-off-by: Kiran Prakash <[email protected]>

* Update DefaultTaskCancellation.java

Signed-off-by: Kiran Prakash <[email protected]>

* Update DefaultTaskSelectionStrategy.java

Signed-off-by: Kiran Prakash <[email protected]>

* refactor

Signed-off-by: Kiran Prakash <[email protected]>

* refactor node level threshold

Signed-off-by: Kiran Prakash <[email protected]>

* use query group task

Signed-off-by: Kaushal Kumar <[email protected]>

* code clean up and refactorings

Signed-off-by: Kaushal Kumar <[email protected]>

* add unit tests and fix existing ones

Signed-off-by: Kaushal Kumar <[email protected]>

* uncomment the test case

Signed-off-by: Kaushal Kumar <[email protected]>

* update CHANGELOG

Signed-off-by: Kaushal Kumar <[email protected]>

* fix imports

Signed-off-by: Kaushal Kumar <[email protected]>

* refactor and add UTs for new constructs

Signed-off-by: Kaushal Kumar <[email protected]>

* fix javadocs

Signed-off-by: Kaushal Kumar <[email protected]>

* remove code clutter

Signed-off-by: Kaushal Kumar <[email protected]>

* change annotation version and task selection strategy

Signed-off-by: Kaushal Kumar <[email protected]>

* rename a util class

Signed-off-by: Kaushal Kumar <[email protected]>

* remove wrappers from resource type

Signed-off-by: Kaushal Kumar <[email protected]>

* apply spotless

Signed-off-by: Kaushal Kumar <[email protected]>

* address comments

Signed-off-by: Kaushal Kumar <[email protected]>

* add rename changes

Signed-off-by: Kaushal Kumar <[email protected]>

* address comments

Signed-off-by: Kaushal Kumar <[email protected]>

* refactor changes and logical bug fix

Signed-off-by: Kaushal Kumar <[email protected]>

* address comments

Signed-off-by: Kaushal Kumar <[email protected]>

---------

Signed-off-by: Kiran Prakash <[email protected]>
Signed-off-by: Kaushal Kumar <[email protected]>
Signed-off-by: Ankit Jain <[email protected]>
Co-authored-by: Kiran Prakash <[email protected]>
Co-authored-by: Ankit Jain <[email protected]>
  • Loading branch information
3 people committed Sep 11, 2024
1 parent fa6de0d commit cb40de8
Show file tree
Hide file tree
Showing 20 changed files with 1,297 additions and 98 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 2.x]
### Added
- Add path prefix support to hashed prefix snapshots ([#15664](https://github.com/opensearch-project/OpenSearch/pull/15664))
- [Workload Management] QueryGroup resource cancellation framework changes ([#15651](https://github.com/opensearch-project/OpenSearch/pull/15651))

### Dependencies
- Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.cluster.Diff;
import org.opensearch.common.UUIDs;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.ToXContentObject;
Expand Down Expand Up @@ -41,7 +42,7 @@
* "updated_at": 4513232415
* }
*/
@ExperimentalApi
@PublicApi(since = "2.18.0")
public class QueryGroup extends AbstractDiffable<QueryGroup> implements ToXContentObject {

public static final String _ID_STRING = "_id";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.opensearch.wlm;

import org.opensearch.tasks.Task;

import java.util.List;
import java.util.Map;

Expand All @@ -20,11 +18,11 @@
*/
public class QueryGroupLevelResourceUsageView {
// resourceUsage holds the resource usage data for a QueryGroup at a point in time
private final Map<ResourceType, Long> resourceUsage;
private final Map<ResourceType, Double> resourceUsage;
// activeTasks holds the list of active tasks for a QueryGroup at a point in time
private final List<Task> activeTasks;
private final List<QueryGroupTask> activeTasks;

public QueryGroupLevelResourceUsageView(Map<ResourceType, Long> resourceUsage, List<Task> activeTasks) {
public QueryGroupLevelResourceUsageView(Map<ResourceType, Double> resourceUsage, List<QueryGroupTask> activeTasks) {
this.resourceUsage = resourceUsage;
this.activeTasks = activeTasks;
}
Expand All @@ -34,7 +32,7 @@ public QueryGroupLevelResourceUsageView(Map<ResourceType, Long> resourceUsage, L
*
* @return The map of resource usage data
*/
public Map<ResourceType, Long> getResourceUsageData() {
public Map<ResourceType, Double> getResourceUsageData() {
return resourceUsage;
}

Expand All @@ -43,7 +41,7 @@ public Map<ResourceType, Long> getResourceUsageData() {
*
* @return The list of active tasks
*/
public List<Task> getActiveTasks() {
public List<QueryGroupTask> getActiveTasks() {
return activeTasks;
}
}
24 changes: 23 additions & 1 deletion server/src/main/java/org/opensearch/wlm/QueryGroupTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,33 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.tasks.CancellableTask;

import java.util.Map;
import java.util.Optional;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

import static org.opensearch.search.SearchService.NO_TIMEOUT;

/**
* Base class to define QueryGroup tasks
*/
@PublicApi(since = "2.18.0")
public class QueryGroupTask extends CancellableTask {

private static final Logger logger = LogManager.getLogger(QueryGroupTask.class);
public static final String QUERY_GROUP_ID_HEADER = "queryGroupId";
public static final Supplier<String> DEFAULT_QUERY_GROUP_ID_SUPPLIER = () -> "DEFAULT_QUERY_GROUP";
private final LongSupplier nanoTimeSupplier;
private String queryGroupId;

public QueryGroupTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
this(id, type, action, description, parentTaskId, headers, NO_TIMEOUT);
this(id, type, action, description, parentTaskId, headers, NO_TIMEOUT, System::nanoTime);
}

public QueryGroupTask(
Expand All @@ -43,8 +47,22 @@ public QueryGroupTask(
TaskId parentTaskId,
Map<String, String> headers,
TimeValue cancelAfterTimeInterval
) {
this(id, type, action, description, parentTaskId, headers, cancelAfterTimeInterval, System::nanoTime);
}

public QueryGroupTask(
long id,
String type,
String action,
String description,
TaskId parentTaskId,
Map<String, String> headers,
TimeValue cancelAfterTimeInterval,
LongSupplier nanoTimeSupplier
) {
super(id, type, action, description, parentTaskId, headers, cancelAfterTimeInterval);
this.nanoTimeSupplier = nanoTimeSupplier;
}

/**
Expand All @@ -69,6 +87,10 @@ public final void setQueryGroupId(final ThreadContext threadContext) {
.orElse(DEFAULT_QUERY_GROUP_ID_SUPPLIER.get());
}

public long getElapsedTime() {
return nanoTimeSupplier.getAsLong() - getStartTimeNanos();
}

@Override
public boolean shouldCancelChildrenOnCancellation() {
return false;
Expand Down
41 changes: 23 additions & 18 deletions server/src/main/java/org/opensearch/wlm/ResourceType.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@

import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.tasks.resourcetracker.ResourceStats;
import org.opensearch.tasks.Task;
import org.opensearch.wlm.tracker.CpuUsageCalculator;
import org.opensearch.wlm.tracker.MemoryUsageCalculator;
import org.opensearch.wlm.tracker.ResourceUsageCalculator;

import java.io.IOException;
import java.util.List;
Expand All @@ -25,19 +26,25 @@
*/
@PublicApi(since = "2.17.0")
public enum ResourceType {
CPU("cpu", task -> task.getTotalResourceUtilization(ResourceStats.CPU), true),
MEMORY("memory", task -> task.getTotalResourceUtilization(ResourceStats.MEMORY), true);
CPU("cpu", true, CpuUsageCalculator.INSTANCE, WorkloadManagementSettings::getNodeLevelCpuCancellationThreshold),
MEMORY("memory", true, MemoryUsageCalculator.INSTANCE, WorkloadManagementSettings::getNodeLevelMemoryCancellationThreshold);

private final String name;
private final Function<Task, Long> getResourceUsage;
private final boolean statsEnabled;

private final ResourceUsageCalculator resourceUsageCalculator;
private final Function<WorkloadManagementSettings, Double> nodeLevelThresholdSupplier;
private static List<ResourceType> sortedValues = List.of(CPU, MEMORY);

ResourceType(String name, Function<Task, Long> getResourceUsage, boolean statsEnabled) {
ResourceType(
String name,
boolean statsEnabled,
ResourceUsageCalculator resourceUsageCalculator,
Function<WorkloadManagementSettings, Double> nodeLevelThresholdSupplier
) {
this.name = name;
this.getResourceUsage = getResourceUsage;
this.statsEnabled = statsEnabled;
this.resourceUsageCalculator = resourceUsageCalculator;
this.nodeLevelThresholdSupplier = nodeLevelThresholdSupplier;
}

/**
Expand All @@ -62,20 +69,18 @@ public String getName() {
return name;
}

/**
* Gets the resource usage for a given resource type and task.
*
* @param task the task for which to calculate resource usage
* @return the resource usage
*/
public long getResourceUsage(Task task) {
return getResourceUsage.apply(task);
}

public boolean hasStatsEnabled() {
return statsEnabled;
}

public ResourceUsageCalculator getResourceUsageCalculator() {
return resourceUsageCalculator;
}

public double getNodeLevelThreshold(WorkloadManagementSettings settings) {
return nodeLevelThresholdSupplier.apply(settings);
}

public static List<ResourceType> getSortedValues() {
return sortedValues;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@

package org.opensearch.wlm;

import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;

/**
* Main class to declare Workload Management related settings
*/
@PublicApi(since = "2.18.0")
public class WorkloadManagementSettings {
private static final Double DEFAULT_NODE_LEVEL_MEMORY_REJECTION_THRESHOLD = 0.8;
private static final Double DEFAULT_NODE_LEVEL_MEMORY_CANCELLATION_THRESHOLD = 0.9;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.wlm.cancellation;

import org.opensearch.wlm.QueryGroupTask;
import org.opensearch.wlm.ResourceType;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

import static org.opensearch.wlm.cancellation.QueryGroupTaskCancellationService.MIN_VALUE;

/**
* Represents the highest resource consuming task first selection strategy.
*/
public class MaximumResourceTaskSelectionStrategy implements TaskSelectionStrategy {

public MaximumResourceTaskSelectionStrategy() {}

/**
* Returns a comparator that defines the sorting condition for tasks.
* This is the default implementation since the most resource consuming tasks are the likely to regress the performance.
* from resiliency point of view it makes sense to cancel them first
*
* @return The comparator
*/
private Comparator<QueryGroupTask> sortingCondition(ResourceType resourceType) {
return Comparator.comparingDouble(task -> resourceType.getResourceUsageCalculator().calculateTaskResourceUsage(task));
}

/**
* Selects tasks for cancellation based on the provided limit and resource type.
* The tasks are sorted based on the sorting condition and then selected until the accumulated resource usage reaches the limit.
*
* @param tasks The list of tasks from which to select
* @param limit The limit on the accumulated resource usage
* @param resourceType The type of resource to consider
* @return The list of selected tasks
* @throws IllegalArgumentException If the limit is less than zero
*/
public List<QueryGroupTask> selectTasksForCancellation(List<QueryGroupTask> tasks, double limit, ResourceType resourceType) {
if (limit < 0) {
throw new IllegalArgumentException("limit has to be greater than zero");
}
if (limit < MIN_VALUE) {
return Collections.emptyList();
}

List<QueryGroupTask> sortedTasks = tasks.stream().sorted(sortingCondition(resourceType).reversed()).collect(Collectors.toList());

List<QueryGroupTask> selectedTasks = new ArrayList<>();
double accumulated = 0;
for (QueryGroupTask task : sortedTasks) {
selectedTasks.add(task);
accumulated += resourceType.getResourceUsageCalculator().calculateTaskResourceUsage(task);
if ((accumulated - limit) > MIN_VALUE) {
break;
}
}
return selectedTasks;
}
}
Loading

0 comments on commit cb40de8

Please sign in to comment.