YARN-11750. Initialize queues in parallel #7264

Open · wants to merge 3 commits into trunk
@@ -633,6 +633,11 @@
<Method name="run" />
<Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractParentQueue"/>
<Method name="reinitialize" />
<Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH" />
</Match>

<Match>
<Class name="org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor" />
@@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -196,7 +197,7 @@ public QueueMetrics(MetricsSystem ms, String queueName, Queue parent,

this.parent = parent != null ? parent.getMetrics() : null;
this.parentQueue = parent;
this.users = enableUserMetrics ? new HashMap<String, QueueMetrics>() : null;
this.users = enableUserMetrics ? new ConcurrentHashMap<String, QueueMetrics>() : null;
this.enableUserMetrics = enableUserMetrics;

metricsSystem = ms;
@@ -253,7 +254,7 @@ public synchronized static void clearQueueMetrics() {
* Simple metrics cache to help prevent re-registrations.
*/
private static final Map<String, QueueMetrics> QUEUE_METRICS =
new HashMap<String, QueueMetrics>();
new ConcurrentHashMap<String, QueueMetrics>();

/**
* Returns the metrics cache to help prevent re-registrations.
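
Presumably the HashMap-to-ConcurrentHashMap switches above are what allow the per-user metrics map and the static QUEUE_METRICS cache to be touched from the parallel reinitialization threads introduced by this patch. A minimal illustration of the thread-safe get-or-register idiom (hypothetical names, not the QueueMetrics API):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentRegistrationSketch {
  // Safe under concurrent registration from several worker threads,
  // unlike a plain HashMap, which can corrupt its internal structure.
  private static final Map<String, String> CACHE = new ConcurrentHashMap<>();

  static String register(String queueName) {
    // Atomic get-or-create; the mapping function runs at most once per key.
    return CACHE.computeIfAbsent(queueName, k -> "metrics-for-" + k);
  }

  public static void main(String[] args) throws InterruptedException {
    Thread t1 = new Thread(() -> register("root.a"));
    Thread t2 = new Thread(() -> register("root.b"));
    t1.start(); t2.start();
    t1.join(); t2.join();
    System.out.println(CACHE);
  }
}
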
@@ -26,6 +26,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

@@ -40,6 +42,7 @@
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -557,6 +560,7 @@ public boolean isEligibleForAutoQueueCreation() {
public void reinitialize(CSQueue newlyParsedQueue,
Resource clusterResource) throws IOException {
writeLock.lock();
ExecutorService executor = null;
try {
// We skip reinitialize for dynamic queues, when this is called, and
// new queue is different from this queue, we will make this queue to be
@@ -573,6 +577,12 @@
+ newlyParsedQueue.getQueuePath());
}

CapacitySchedulerConfiguration conf = queueContext.getConfiguration();
boolean initializeQueuesParallel = conf.getBoolean(
CapacitySchedulerConfiguration.INITIALIZE_QUEUES_PARALLEL_ENABLE,
CapacitySchedulerConfiguration.DEFAULT_INITIALIZE_QUEUES_PARALLEL_ENABLE
);

AbstractParentQueue newlyParsedParentQueue = (AbstractParentQueue) newlyParsedQueue;

// Set new configs
@@ -595,46 +605,35 @@
}
}

for (Map.Entry<String, CSQueue> e : newChildQueues.entrySet()) {
String newChildQueueName = e.getKey();
CSQueue newChildQueue = e.getValue();

CSQueue childQueue = currentChildQueues.get(newChildQueueName);

// Check if the child-queue already exists
if (childQueue != null) {
// Check if the child-queue has been converted into parent queue or
// parent Queue has been converted to child queue. The CS has already
// checked to ensure that this child-queue is in STOPPED state if
// Child queue has been converted to ParentQueue.
if ((childQueue instanceof AbstractLeafQueue
&& newChildQueue instanceof AbstractParentQueue)
|| (childQueue instanceof AbstractParentQueue
&& newChildQueue instanceof AbstractLeafQueue)) {
// We would convert this LeafQueue to ParentQueue, or vice versa.
// consider this as the combination of DELETE then ADD.
newChildQueue.setParent(this);
currentChildQueues.put(newChildQueueName, newChildQueue);
// inform CapacitySchedulerQueueManager
CapacitySchedulerQueueManager queueManager =
queueContext.getQueueManager();
queueManager.addQueue(newChildQueueName, newChildQueue);
continue;
}
// Re-init existing queues
childQueue.reinitialize(newChildQueue, clusterResource);
LOG.info(getQueuePath() + ": re-configured queue: " + childQueue);
} else{
// New child queue, do not re-init

// Set parent to 'this'
newChildQueue.setParent(this);

// Save in list of current child queues
currentChildQueues.put(newChildQueueName, newChildQueue);

LOG.info(
getQueuePath() + ": added new child queue: " + newChildQueue);
if (initializeQueuesParallel) {
LOG.info("Reinitialize queues in parallel");
int initializeQueuesParallelism = conf.getInt(
CapacitySchedulerConfiguration.INITIALIZE_QUEUES_PARALLEL_MAXIMUM_THREAD,
CapacitySchedulerConfiguration.DEFAULT_INITIALIZE_QUEUES_PARALLEL_MAXIMUM_THREAD);
initializeQueuesParallelism = Math.max(initializeQueuesParallelism, 1);

executor = HadoopExecutors.newFixedThreadPool(initializeQueuesParallelism);
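// One latch count per child queue; each submitted task counts down in its finally block.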
CountDownLatch allDone = new CountDownLatch(newChildQueues.size());
for (Map.Entry<String, CSQueue> e : newChildQueues.entrySet()) {
executor.submit(() -> {
try {
reinitializeChildQueues(e, currentChildQueues, clusterResource);
} catch (IOException ex) {
throw new RuntimeException(ex);
} finally {
allDone.countDown();
}
});
}
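// Block until every child queue has been processed before continuing under the write lock.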
try {
allDone.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} else {
LOG.info("Reinitialize queues serially");
for (Map.Entry<String, CSQueue> e : newChildQueues.entrySet()) {
reinitializeChildQueues(e, currentChildQueues, clusterResource);
}
}

@@ -658,10 +657,58 @@ public void reinitialize(CSQueue newlyParsedQueue,
// Make sure we notify QueueOrderingPolicy
queueOrderingPolicy.setQueues(childQueues);
} finally {
if (executor != null) {
executor.shutdownNow();
}
writeLock.unlock();
}
}

private void reinitializeChildQueues(Map.Entry<String, CSQueue> e,
Map<String, CSQueue> currentChildQueues,
Resource clusterResource) throws IOException {
String newChildQueueName = e.getKey();
CSQueue newChildQueue = e.getValue();

CSQueue childQueue = currentChildQueues.get(newChildQueueName);

// Check if the child-queue already exists
if (childQueue != null) {
// Check if the child-queue has been converted into parent queue or
// parent Queue has been converted to child queue. The CS has already
// checked to ensure that this child-queue is in STOPPED state if
// Child queue has been converted to ParentQueue.
if ((childQueue instanceof AbstractLeafQueue
&& newChildQueue instanceof AbstractParentQueue)
|| (childQueue instanceof AbstractParentQueue
&& newChildQueue instanceof AbstractLeafQueue)) {
// We would convert this LeafQueue to ParentQueue, or vice versa.
// consider this as the combination of DELETE then ADD.
newChildQueue.setParent(this);
currentChildQueues.put(newChildQueueName, newChildQueue);
// inform CapacitySchedulerQueueManager
CapacitySchedulerQueueManager queueManager =
queueContext.getQueueManager();
queueManager.addQueue(newChildQueueName, newChildQueue);
return;
}
// Re-init existing queues
childQueue.reinitialize(newChildQueue, clusterResource);
LOG.info(getQueuePath() + ": re-configured queue: " + childQueue);
} else {
// New child queue, do not re-init

// Set parent to 'this'
newChildQueue.setParent(this);

// Save in list of current child queues
currentChildQueues.put(newChildQueueName, newChildQueue);

LOG.info(
getQueuePath() + ": added new child queue: " + newChildQueue);
}
}

private Map<String, CSQueue> getQueuesMap(List<CSQueue> queues) {
Map<String, CSQueue> queuesMap = new HashMap<String, CSQueue>();
for (CSQueue queue : queues) {
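
For readers unfamiliar with the fan-out pattern used above: the change submits one reinitialization task per child queue to a fixed-size pool and blocks on a CountDownLatch until all tasks finish. A minimal, self-contained sketch of that structure (illustrative only; the class and queue names below are hypothetical, not part of this patch):

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelReinitSketch {

  // Run one task per queue on a bounded pool and wait for all of them,
  // mirroring the ExecutorService + CountDownLatch structure in the patch.
  static void reinitAll(List<String> queueNames, int maxThreads) throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(Math.max(maxThreads, 1));
    CountDownLatch allDone = new CountDownLatch(queueNames.size());
    try {
      for (String name : queueNames) {
        executor.submit(() -> {
          try {
            // Stand-in for the real per-queue work (childQueue.reinitialize(...)).
            System.out.println("reinitialized " + name);
          } finally {
            allDone.countDown();
          }
        });
      }
      allDone.await();
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    reinitAll(Arrays.asList("root.a", "root.b", "root.c"), 2);
  }
}

The sketch relies on the latch alone for completion; exceptions raised inside a task are captured by the Future that submit() returns and would have to be collected separately if they need to propagate to the caller.
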
@@ -301,6 +301,25 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
@Private
public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = true;

@Private
public static final String INITIALIZE_QUEUES_PARALLEL_PREFIX =
PREFIX + "initialize-queues-parallel";

@Private
public static final String INITIALIZE_QUEUES_PARALLEL_ENABLE =
INITIALIZE_QUEUES_PARALLEL_PREFIX + ".enable";

@Private
public static final boolean DEFAULT_INITIALIZE_QUEUES_PARALLEL_ENABLE = false;

@Private
public static final String INITIALIZE_QUEUES_PARALLEL_MAXIMUM_THREAD =
INITIALIZE_QUEUES_PARALLEL_PREFIX + ".maximum-threads";

@Private
public static final Integer
DEFAULT_INITIALIZE_QUEUES_PARALLEL_MAXIMUM_THREAD = 1;

@Private
public static final String QUEUE_MAPPING = PREFIX + "queue-mappings";

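
A hedged usage sketch for the new switches (illustrative only, e.g. from a test): CapacitySchedulerConfiguration inherits setBoolean/setInt from Configuration, so the feature added here could be enabled roughly as follows; the thread count of 4 is just an example value.

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;

public class EnableParallelInitSketch {
  static CapacitySchedulerConfiguration parallelInitConf() {
    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    // Turn on parallel queue initialization and allow up to 4 worker threads.
    conf.setBoolean(
        CapacitySchedulerConfiguration.INITIALIZE_QUEUES_PARALLEL_ENABLE, true);
    conf.setInt(
        CapacitySchedulerConfiguration.INITIALIZE_QUEUES_PARALLEL_MAXIMUM_THREAD, 4);
    return conf;
  }
}

In capacity-scheduler.xml the corresponding keys should resolve to yarn.scheduler.capacity.initialize-queues-parallel.enable and yarn.scheduler.capacity.initialize-queues-parallel.maximum-threads, assuming PREFIX is the usual yarn.scheduler.capacity. prefix.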