Skip to content

Commit

Permalink
YARN-11702: Fix Yarn over allocating containers (#6990) Contributed b…
Browse files Browse the repository at this point in the history
…y Syed Shameerur Rahman.

Reviewed-by: Akira Ajisaka <[email protected]>
Signed-off-by: Shilun Fan <[email protected]>
  • Loading branch information
shameersss1 authored Sep 25, 2024
1 parent e602c60 commit 21ec686
Show file tree
Hide file tree
Showing 8 changed files with 597 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1537,6 +1537,17 @@ public static boolean isAclEnabled(Configuration conf) {
public static final int DEFAULT_RM_MAX_LOG_AGGREGATION_DIAGNOSTICS_IN_MEMORY =
10;

/**
* The configuration key for enabling or disabling the auto-correction of container allocation.
*/
public static final String RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION = RM_PREFIX
+ "scheduler.autocorrect.container.allocation";

/**
* Default value: {@value}.
*/
public static final boolean DEFAULT_RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION = false;

/** Whether to enable log aggregation */
public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
+ "log-aggregation-enable";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,21 @@
<name>yarn.resourcemanager.principal</name>
</property>

<property>
<description>
This configuration key enables or disables the auto-correction of container allocation in
YARN. Due to the asynchronous nature of container request and allocation, YARN may sometimes
over-allocate more containers than requested. The auto-correction feature addresses this by
automatically adjusting the number of requested containers based on those already allocated,
preventing extra containers from being allocated.
While the extra allocated containers will be released by the client within a few seconds,
this may not be a concern in normal circumstances. However, if the user is worried about
resource contention due to over-allocation, enabling this feature can help avoid such cases.
</description>
<name>yarn.resourcemanager.scheduler.autocorrect.container.allocation</name>
<value>false</value>
</property>

<property>
<description>The address of the scheduler interface.</description>
<name>yarn.resourcemanager.scheduler.address</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -34,6 +36,10 @@

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -151,6 +157,7 @@ public abstract class AbstractYarnScheduler
Thread updateThread;
private final Object updateThreadMonitor = new Object();
private Timer releaseCache;
private boolean autoCorrectContainerAllocation;

/*
* All schedulers which are inheriting AbstractYarnScheduler should use
Expand Down Expand Up @@ -212,6 +219,9 @@ public void serviceInit(Configuration conf) throws Exception {
conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
skipNodeInterval = YarnConfiguration.getSkipNodeInterval(conf);
autoCorrectContainerAllocation =
conf.getBoolean(YarnConfiguration.RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION,
YarnConfiguration.DEFAULT_RM_SCHEDULER_AUTOCORRECT_CONTAINER_ALLOCATION);
long configuredMaximumAllocationWaitTime =
conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
Expand Down Expand Up @@ -624,6 +634,106 @@ public void recoverContainersOnNode(List<NMContainerStatus> containerReports,
}
}

/**
* Autocorrect container resourceRequests by decrementing the number of newly allocated containers
* from the current container request. This also updates the newlyAllocatedContainers to be within
* the limits of the current container resourceRequests.
* ResourceRequests locality/resourceName is not considered while autocorrecting the container
* request, hence when there are two types of resourceRequest which is same except for the
* locality/resourceName, it is counted as same {@link ContainerObjectType} and the container
* ask and number of newly allocated container is decremented accordingly.
* For example when a client requests for 4 containers with locality/resourceName
* as "node1", AMRMClientaugments the resourceRequest into two
* where R1(numContainer=4,locality=*) and R2(numContainer=4,locality=node1),
* if Yarn allocated 6 containers previously, it will release 2 containers as well as
* update the container ask to 0.
*
* If there is a client which directly calls Yarn (without AMRMClient) with
* two where R1(numContainer=4,locality=*) and R2(numContainer=4,locality=node1)
* the autocorrection may not work as expected. The use case of such client is very rare.
*
* <p>
* This method is called from {@link AbstractYarnScheduler#allocate} method. It is package private
* to be used within the scheduler package only.
* @param resourceRequests List of resources to be allocated
* @param application ApplicationAttempt
*/
@VisibleForTesting
protected void autoCorrectContainerAllocation(List<ResourceRequest> resourceRequests,
SchedulerApplicationAttempt application) {

// if there is no resourceRequests for containers or no newly allocated container from
// the previous request there is nothing to do.
if (!autoCorrectContainerAllocation || resourceRequests.isEmpty() ||
application.newlyAllocatedContainers.isEmpty()) {
return;
}

// iterate newlyAllocatedContainers and form a mapping of container type
// and number of its occurrence.
Map<ContainerObjectType, List<RMContainer>> allocatedContainerMap = new HashMap<>();
for (RMContainer rmContainer : application.newlyAllocatedContainers) {
Container container = rmContainer.getContainer();
ContainerObjectType containerObjectType = new ContainerObjectType(
container.getAllocationRequestId(), container.getPriority(),
container.getExecutionType(), container.getResource());
allocatedContainerMap.computeIfAbsent(containerObjectType,
k -> new ArrayList<>()).add(rmContainer);
}

Map<ContainerObjectType, Integer> extraContainerAllocatedMap = new HashMap<>();
// iterate through resourceRequests and update the request by
// decrementing the already allocated containers.
for (ResourceRequest request : resourceRequests) {
ContainerObjectType containerObjectType =
new ContainerObjectType(request.getAllocationRequestId(),
request.getPriority(), request.getExecutionTypeRequest().getExecutionType(),
request.getCapability());
int numContainerAllocated = allocatedContainerMap.getOrDefault(containerObjectType,
Collections.emptyList()).size();
if (numContainerAllocated > 0) {
int numContainerAsk = request.getNumContainers();
int updatedContainerRequest = numContainerAsk - numContainerAllocated;
if (updatedContainerRequest < 0) {
// add an entry to extra allocated map
extraContainerAllocatedMap.put(containerObjectType, Math.abs(updatedContainerRequest));
LOG.debug("{} container of the resource type: {} will be released",
Math.abs(updatedContainerRequest), request);
// if newlyAllocatedContainer count is more than the current container
// resourceRequests, reset it to 0.
updatedContainerRequest = 0;
}

// update the request
LOG.debug("Updating container resourceRequests from {} to {} for the resource type: {}",
numContainerAsk, updatedContainerRequest, request);
request.setNumContainers(updatedContainerRequest);
}
}

// Iterate over the entries in extraContainerAllocatedMap
for (Map.Entry<ContainerObjectType, Integer> entry : extraContainerAllocatedMap.entrySet()) {
ContainerObjectType containerObjectType = entry.getKey();
int extraContainers = entry.getValue();

// Get the list of allocated containers for the current ContainerObjectType
List<RMContainer> allocatedContainers = allocatedContainerMap.get(containerObjectType);
if (allocatedContainers != null) {
for (RMContainer rmContainer : allocatedContainers) {
if (extraContainers > 0) {
// Change the state of the container from ALLOCATED to EXPIRED since it is not required.
LOG.debug("Removing extra container:{}", rmContainer.getContainer());
completedContainer(rmContainer, SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(), SchedulerUtils.EXPIRED_CONTAINER),
RMContainerEventType.EXPIRE);
application.newlyAllocatedContainers.remove(rmContainer);
extraContainers--;
}
}
}
}
}

private RMContainer recoverAndCreateContainer(NMContainerStatus status,
RMNode node, String queueName) {
Container container =
Expand Down Expand Up @@ -658,6 +768,14 @@ private void recoverResourceRequestForContainer(RMContainer rmContainer) {
return;
}

// when auto correct container allocation is enabled, there can be a case when extra containers
// go to expired state from allocated state. When such scenario happens do not re-attempt the
// container request since this is expected.
if (autoCorrectContainerAllocation &&
RMContainerState.EXPIRED.equals(rmContainer.getState())) {
return;
}

// Add resource request back to Scheduler ApplicationAttempt.

// We lookup the application-attempt here again using
Expand Down Expand Up @@ -1678,4 +1796,77 @@ private List<ApplicationAttemptId> getAppsFromQueue(String queueName)
}
return apps;
}

/**
* ContainerObjectType is a container object with the following properties.
* Namely allocationId, priority, executionType and resourceType.
*/
protected class ContainerObjectType extends Object {
private final long allocationId;
private final Priority priority;
private final ExecutionType executionType;
private final Resource resource;

public ContainerObjectType(long allocationId, Priority priority,
ExecutionType executionType, Resource resource) {
this.allocationId = allocationId;
this.priority = priority;
this.executionType = executionType;
this.resource = resource;
}

public long getAllocationId() {
return allocationId;
}

public Priority getPriority() {
return priority;
}

public ExecutionType getExecutionType() {
return executionType;
}

public Resource getResource() {
return resource;
}

@Override
public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(allocationId)
.append(priority)
.append(executionType)
.append(resource)
.toHashCode();
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj.getClass() != this.getClass()) {
return false;
}

ContainerObjectType other = (ContainerObjectType) obj;
return new EqualsBuilder()
.append(allocationId, other.getAllocationId())
.append(priority, other.getPriority())
.append(executionType, other.getExecutionType())
.append(resource, other.getResource())
.isEquals();
}

@Override
public String toString() {
return "{ContainerObjectType: "
+ ", Priority: " + getPriority()
+ ", Allocation Id: " + getAllocationId()
+ ", Execution Type: " + getExecutionType()
+ ", Resource: " + getResource()
+ "}";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,8 @@ protected synchronized void addToUpdateContainerErrors(
updateContainerErrors.add(error);
}

protected synchronized void addToNewlyAllocatedContainers(
@VisibleForTesting
public synchronized void addToNewlyAllocatedContainers(
SchedulerNode node, RMContainer rmContainer) {
ContainerId matchedContainerId =
getUpdateContext().matchContainerToOutstandingIncreaseReq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1363,6 +1363,10 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId,
application.showRequests();
}

// update the current container ask by considering the already allocated
// containers from previous allocation request and return updatedNewlyAllocatedContainers.
autoCorrectContainerAllocation(ask, application);

// Update application requests
if (application.updateResourceRequests(ask) || application
.updateSchedulingRequests(schedulingRequests)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,11 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
}
application.showRequests();

// update the current container ask by considering the already allocated containers
// from previous allocation request as well as populate the updatedNewlyAllocatedContainers
// list according the to the current ask.
autoCorrectContainerAllocation(ask, application);

// Update application requests
application.updateResourceRequests(ask);

Expand Down
Loading

0 comments on commit 21ec686

Please sign in to comment.