Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YARN-11041. Replace all occurences of queuePath with the new QueuePath class - followup #5332

Merged
merged 29 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
c9f33c9
YARN-11041. Replace all occurences of queuePath with the new QueuePat…
p-szucs Jan 17, 2023
3f6bfee
YARN-11041. Replace all occurences of queuePath with the new QueuePat…
p-szucs Jan 30, 2023
21ca3f0
YARN-11041. Replace all occurences of queuePath with the new QueuePat…
p-szucs Feb 7, 2023
8cd7931
YARN-11041. Refactored template config settings
p-szucs Feb 13, 2023
0e5d499
YARN-11041. Refactored template config settings
p-szucs Feb 13, 2023
ca58278
YARN-11041. Refactored getQueues method in CapacitySchedulerConfigura…
p-szucs Feb 13, 2023
7978640
YARN-11041. Fixed code formatting
p-szucs Feb 13, 2023
9fc1a84
Merge branch 'trunk' into YARN-11041
p-szucs Feb 24, 2023
8d102fc
YARN-11041. Fixed conflicts
p-szucs Feb 24, 2023
3e4f39b
YARN-11041. Unit test fixes
p-szucs Mar 1, 2023
b0d36f0
YARN-11041. Checkstyle fixes
p-szucs Mar 1, 2023
f789293
YARN-11041. Unit test fixes
p-szucs Mar 3, 2023
e2e24f2
YARN-11041. Refactored ACQ template prefix generation
p-szucs Mar 6, 2023
419d358
Merge branch 'trunk' into YARN-11041
p-szucs Mar 6, 2023
afb669e
YARN-11041. Fixed imports
p-szucs Mar 6, 2023
1f73332
Merge branch 'trunk' into YARN-11041
p-szucs Apr 26, 2023
54fe2f4
Merge branch 'trunk' into YARN-11041
p-szucs May 19, 2023
5a6251d
YARN-11041. Resolved conflicts
p-szucs May 19, 2023
2482125
YARN-11041. Review fixes
p-szucs Jun 12, 2023
72f7aa6
YARN-11041. Review fixes - continued replacing
p-szucs Jun 13, 2023
035350e
YARN-11041. Fixed SpotBugs and unit test issues
p-szucs Jun 14, 2023
88cce3d
YARN-11041. Review fixes - added unit tests for AutoCreatedQueueTempl…
p-szucs Jun 15, 2023
06c378d
Merge branch 'apache:trunk' into YARN-11041
p-szucs Jun 15, 2023
f51618b
Merge branch 'trunk' into YARN-11041
p-szucs Aug 14, 2023
37dd8ff
YARN-11041. Aligning the changes to trunk
p-szucs Aug 16, 2023
219cf4b
YARN-11041. Fixed blanks and checkstyle
p-szucs Aug 17, 2023
895ecad
Merge branch 'trunk' into YARN-11041
p-szucs Oct 20, 2023
e8b8b94
YARN-11041. Fixed compilation error after merged trunk
p-szucs Oct 27, 2023
c93bad2
Merge branch 'trunk' into YARN-11041
p-szucs Dec 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
Expand Down Expand Up @@ -180,9 +181,9 @@ public String getName() {

private static void setupQueueConfiguration(
CapacitySchedulerConfiguration config) {
config.setQueues(CapacitySchedulerConfiguration.ROOT,
config.setQueues(new QueuePath(CapacitySchedulerConfiguration.ROOT),
new String[]{"testqueue"});
String a = CapacitySchedulerConfiguration.ROOT + ".testqueue";
QueuePath a = new QueuePath(CapacitySchedulerConfiguration.ROOT + ".testqueue");
config.setCapacity(a, 100f);
config.setMaximumCapacity(a, 100f);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
Expand Down Expand Up @@ -250,13 +251,13 @@ public void testCallRM() {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();

final String a = CapacitySchedulerConfiguration.ROOT + ".a";
final String b = CapacitySchedulerConfiguration.ROOT + ".b";
final String a1 = a + ".a1";
final String a2 = a + ".a2";
final String b1 = b + ".b1";
final String b2 = b + ".b2";
final String b3 = b + ".b3";
final QueuePath a = new QueuePath(CapacitySchedulerConfiguration.ROOT + ".a");
final QueuePath b = new QueuePath(CapacitySchedulerConfiguration.ROOT + ".b");
final QueuePath a1 = new QueuePath(a + ".a1");
final QueuePath a2 = new QueuePath(a + ".a2");
final QueuePath b1 = new QueuePath(b + ".b1");
final QueuePath b2 = new QueuePath(b + ".b2");
final QueuePath b3 = new QueuePath(b + ".b3");
float aCapacity = 10.5f;
float bCapacity = 89.5f;
float a1Capacity = 30;
Expand All @@ -266,7 +267,7 @@ public void testCallRM() {
float b3Capacity = 20;

// Define top-level queues
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
csConf.setQueues(new QueuePath(CapacitySchedulerConfiguration.ROOT),
new String[] {"a", "b"});

csConf.setCapacity(a, aCapacity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.CapacityReservationsACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.FairReservationsACLsManager;
Expand Down Expand Up @@ -430,7 +431,7 @@ protected Plan initializePlan(String planQueueName) throws YarnException {
Plan plan = new InMemoryPlan(getRootQueueMetrics(), adPolicy,
getAgent(planQueuePath), totCap, planStepSize, rescCalc, minAllocation,
maxAllocation, planQueueName, getReplanner(planQueuePath),
getReservationSchedulerConfiguration().getMoveOnExpiry(planQueuePath),
getReservationSchedulerConfiguration().getMoveOnExpiry(new QueuePath(planQueuePath)),
maxPeriodicity, rmContext);
LOG.info("Initialized plan {} based on reservable queue {}",
plan.toString(), planQueueName);
Expand All @@ -440,7 +441,7 @@ maxAllocation, planQueueName, getReplanner(planQueuePath),
protected Planner getReplanner(String planQueueName) {
ReservationSchedulerConfiguration reservationConfig =
getReservationSchedulerConfiguration();
String plannerClassName = reservationConfig.getReplanner(planQueueName);
String plannerClassName = reservationConfig.getReplanner(new QueuePath(planQueueName));
LOG.info("Using Replanner: " + plannerClassName + " for queue: "
+ planQueueName);
try {
Expand All @@ -463,7 +464,7 @@ protected Planner getReplanner(String planQueueName) {
protected ReservationAgent getAgent(String queueName) {
ReservationSchedulerConfiguration reservationConfig =
getReservationSchedulerConfiguration();
String agentClassName = reservationConfig.getReservationAgent(queueName);
String agentClassName = reservationConfig.getReservationAgent(new QueuePath(queueName));
LOG.info("Using Agent: " + agentClassName + " for queue: " + queueName);
try {
Class<?> agentClazz = conf.getClassByName(agentClassName);
Expand All @@ -487,7 +488,7 @@ protected SharingPolicy getAdmissionPolicy(String queueName) {
ReservationSchedulerConfiguration reservationConfig =
getReservationSchedulerConfiguration();
String admissionPolicyClassName =
reservationConfig.getReservationAdmissionPolicy(queueName);
reservationConfig.getReservationAdmissionPolicy(new QueuePath(queueName));
LOG.info("Using AdmissionPolicy: " + admissionPolicyClassName
+ " for queue: " + queueName);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningQuotaException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.apache.hadoop.yarn.util.resource.Resources;

import java.util.Map;
Expand Down Expand Up @@ -57,9 +58,10 @@ public class CapacityOverTimePolicy extends NoOverCommitPolicy {
private float maxAvg;

@Override
public void init(String reservationQueuePath,
public void init(String reservationQueue,
ReservationSchedulerConfiguration conf) {
this.conf = conf;
QueuePath reservationQueuePath = new QueuePath(reservationQueue);
validWindow = this.conf.getReservationWindow(reservationQueuePath);
maxInst = this.conf.getInstantaneousMaxCapacity(reservationQueuePath) / 100;
maxAvg = this.conf.getAverageCapacity(reservationQueuePath) / 100;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ReservationACL;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;

import java.util.Map;

Expand Down Expand Up @@ -69,7 +70,7 @@ public ReservationSchedulerConfiguration(
* @param queue name of the queue
* @return true if the queue participates in reservation based scheduling
*/
public abstract boolean isReservable(String queue);
public abstract boolean isReservable(QueuePath queue);

/**
* Gets a map containing the {@link AccessControlList} of users for each
Expand All @@ -80,7 +81,7 @@ public ReservationSchedulerConfiguration(
* which contains a list of users that have the specified permission level.
*/
public abstract Map<ReservationACL, AccessControlList> getReservationAcls(
String queue);
QueuePath queue);

/**
* Gets the length of time in milliseconds for which the {@link SharingPolicy}
Expand All @@ -89,7 +90,7 @@ public abstract Map<ReservationACL, AccessControlList> getReservationAcls(
* @return length in time in milliseconds for which to check the
* {@link SharingPolicy}
*/
public long getReservationWindow(String queue) {
public long getReservationWindow(QueuePath queue) {
return DEFAULT_RESERVATION_WINDOW;
}

Expand All @@ -100,7 +101,7 @@ public long getReservationWindow(String queue) {
* @param queue name of the queue
* @return average capacity allowed by the {@link SharingPolicy}
*/
public float getAverageCapacity(String queue) {
public float getAverageCapacity(QueuePath queue) {
return DEFAULT_CAPACITY_OVER_TIME_MULTIPLIER;
}

Expand All @@ -109,7 +110,7 @@ public float getAverageCapacity(String queue) {
* @param queue name of the queue
* @return maximum allowed capacity at any time
*/
public float getInstantaneousMaxCapacity(String queue) {
public float getInstantaneousMaxCapacity(QueuePath queue) {
return DEFAULT_CAPACITY_OVER_TIME_MULTIPLIER;
}

Expand All @@ -118,7 +119,7 @@ public float getInstantaneousMaxCapacity(String queue) {
* @param queue name of the queue
* @return the class name of the {@link SharingPolicy}
*/
public String getReservationAdmissionPolicy(String queue) {
public String getReservationAdmissionPolicy(QueuePath queue) {
return DEFAULT_RESERVATION_ADMISSION_POLICY;
}

Expand All @@ -128,7 +129,7 @@ public String getReservationAdmissionPolicy(String queue) {
* @param queue name of the queue
* @return the class name of the {@code ReservationAgent}
*/
public String getReservationAgent(String queue) {
public String getReservationAgent(QueuePath queue) {
return DEFAULT_RESERVATION_AGENT_NAME;
}

Expand All @@ -137,7 +138,7 @@ public String getReservationAgent(String queue) {
* @param queuePath name of the queue
* @return true if reservation queues should be visible
*/
public boolean getShowReservationAsQueues(String queuePath) {
public boolean getShowReservationAsQueues(QueuePath queuePath) {
return DEFAULT_SHOW_RESERVATIONS_AS_QUEUES;
}

Expand All @@ -147,7 +148,7 @@ public boolean getShowReservationAsQueues(String queuePath) {
* @param queue name of the queue
* @return the class name of the {@code Planner}
*/
public String getReplanner(String queue) {
public String getReplanner(QueuePath queue) {
return DEFAULT_RESERVATION_PLANNER_NAME;
}

Expand All @@ -158,7 +159,7 @@ public String getReplanner(String queue) {
* @return true if application should be moved, false if they need to be
* killed
*/
public boolean getMoveOnExpiry(String queue) {
public boolean getMoveOnExpiry(QueuePath queue) {
return DEFAULT_RESERVATION_MOVE_ON_EXPIRY;
}

Expand All @@ -168,7 +169,7 @@ public boolean getMoveOnExpiry(String queue) {
* @param queue name of the queue
* @return the time in milliseconds for which to check constraints
*/
public long getEnforcementWindow(String queue) {
public long getEnforcementWindow(QueuePath queue) {
return DEFAULT_RESERVATION_ENFORCEMENT_WINDOW;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
Expand Down Expand Up @@ -72,7 +73,7 @@ public SimpleCapacityReplanner() {
@Override
public void init(String planQueueName,
ReservationSchedulerConfiguration conf) {
this.lengthOfCheckZone = conf.getEnforcementWindow(planQueueName);
this.lengthOfCheckZone = conf.getEnforcementWindow(new QueuePath(planQueueName));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,26 +61,26 @@ public void setEntitlement(QueueEntitlement entitlement)
}

@Override
protected Resource getMinimumAbsoluteResource(String queuePath,
protected Resource getMinimumAbsoluteResource(QueuePath queuePath,
String label) {
return super.getMinimumAbsoluteResource(queueContext.getConfiguration()
.getAutoCreatedQueueTemplateConfPrefix(this.getParent().getQueuePath()),
return super.getMinimumAbsoluteResource(QueuePrefixes
.getAutoCreatedQueueObjectTemplateConfPrefix(this.getParent().getQueuePathObject()),
label);
}

@Override
protected Resource getMaximumAbsoluteResource(String queuePath,
protected Resource getMaximumAbsoluteResource(QueuePath queuePath,
String label) {
return super.getMaximumAbsoluteResource(queueContext.getConfiguration()
.getAutoCreatedQueueTemplateConfPrefix(this.getParent().getQueuePath()),
return super.getMaximumAbsoluteResource(QueuePrefixes
.getAutoCreatedQueueObjectTemplateConfPrefix(this.getParent().getQueuePathObject()),
label);
}

@Override
protected boolean checkConfigTypeIsAbsoluteResource(String queuePath,
protected boolean checkConfigTypeIsAbsoluteResource(QueuePath queuePath,
String label) {
return super.checkConfigTypeIsAbsoluteResource(queueContext.getConfiguration()
.getAutoCreatedQueueTemplateConfPrefix(this.getParent().getQueuePath()),
return super.checkConfigTypeIsAbsoluteResource(QueuePrefixes
.getAutoCreatedQueueObjectTemplateConfPrefix(this.getParent().getQueuePathObject()),
label);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ protected void setupQueueConfigs(Resource clusterResource) throws
writeLock.lock();
try {
CapacitySchedulerConfiguration configuration = queueContext.getConfiguration();
this.acls = configuration.getAcls(getQueuePath());
this.acls = configuration.getAcls(getQueuePathObject());

if (isDynamicQueue() || this instanceof AbstractAutoCreatedLeafQueue) {
parseAndSetDynamicTemplates();
Expand All @@ -367,7 +367,7 @@ protected void setupQueueConfigs(Resource clusterResource) throws

// Setup queue's maximumAllocation respecting the global
// and the queue settings
this.queueAllocationSettings.setupMaximumAllocation(configuration, getQueuePath(),
this.queueAllocationSettings.setupMaximumAllocation(configuration, getQueuePathObject(),
parent);

// Initialize the queue state based on previous state, configured state
Expand All @@ -382,10 +382,10 @@ protected void setupQueueConfigs(Resource clusterResource) throws
configuration.getReservationContinueLook();

this.configuredCapacityVectors = configuration
.parseConfiguredResourceVector(queuePath.getFullPath(),
.parseConfiguredResourceVector(queuePath,
this.queueNodeLabelsSettings.getConfiguredNodeLabels());
this.configuredMaxCapacityVectors = configuration
.parseConfiguredMaximumCapacityVector(queuePath.getFullPath(),
.parseConfiguredMaximumCapacityVector(queuePath,
this.queueNodeLabelsSettings.getConfiguredNodeLabels(),
QueueCapacityVector.newInstance());

Expand Down Expand Up @@ -420,11 +420,11 @@ protected void setupQueueConfigs(Resource clusterResource) throws
// Store preemption settings
this.preemptionSettings = new CSQueuePreemptionSettings(this, configuration);
this.priority = configuration.getQueuePriority(
getQueuePath());
getQueuePathObject());

// Update multi-node sorting algorithm for scheduling as configured.
setMultiNodeSortingPolicyName(
configuration.getMultiNodesSortingAlgorithmPolicy(getQueuePath()));
configuration.getMultiNodesSortingAlgorithmPolicy(getQueuePathObject()));

// Setup application related limits
this.queueAppLifetimeSettings = new QueueAppLifetimeAndLimitSettings(configuration,
Expand All @@ -440,7 +440,7 @@ protected void setupQueueConfigs(Resource clusterResource) throws
protected void parseAndSetDynamicTemplates() {
// Set the template properties from the parent to the queuepath of the child
((AbstractParentQueue) parent).getAutoCreatedQueueTemplate()
.setTemplateEntriesForChild(queueContext.getConfiguration(), getQueuePath(),
.setTemplateEntriesForChild(queueContext.getConfiguration(), getQueuePathObject(),
this instanceof AbstractLeafQueue);

String parentTemplate = String.format("%s.%s", parent.getQueuePath(),
Expand Down Expand Up @@ -488,21 +488,21 @@ private UserWeights getUserWeightsFromHierarchy() {
// Insert this queue's userWeights, overriding parent's userWeights if
// there is an overlap.
unionInheritedWeights.addFrom(
queueContext.getConfiguration().getAllUserWeightsForQueue(getQueuePath()));
queueContext.getConfiguration().getAllUserWeightsForQueue(getQueuePathObject()));
return unionInheritedWeights;
}

protected Resource getMinimumAbsoluteResource(String queuePath, String label) {
protected Resource getMinimumAbsoluteResource(QueuePath queuePath, String label) {
return queueContext.getConfiguration()
.getMinimumResourceRequirement(label, queuePath, resourceTypes);
}

protected Resource getMaximumAbsoluteResource(String queuePath, String label) {
protected Resource getMaximumAbsoluteResource(QueuePath queuePath, String label) {
return queueContext.getConfiguration()
.getMaximumResourceRequirement(label, queuePath, resourceTypes);
}

protected boolean checkConfigTypeIsAbsoluteResource(String queuePath,
protected boolean checkConfigTypeIsAbsoluteResource(QueuePath queuePath,
String label) {
return queueContext.getConfiguration().checkConfigTypeIsAbsoluteResource(label,
queuePath, resourceTypes);
Expand All @@ -518,7 +518,7 @@ protected void updateCapacityConfigType() {

if (queueContext.getConfiguration().isLegacyQueueMode()) {
localType = checkConfigTypeIsAbsoluteResource(
getQueuePath(), label) ? CapacityConfigType.ABSOLUTE_RESOURCE
getQueuePathObject(), label) ? CapacityConfigType.ABSOLUTE_RESOURCE
: CapacityConfigType.PERCENTAGE;
} else {
// TODO: revisit this later
Expand Down Expand Up @@ -556,8 +556,8 @@ protected void updateCapacityConfigType() {
*/
protected void updateConfigurableResourceLimits(Resource clusterResource) {
for (String label : queueNodeLabelsSettings.getConfiguredNodeLabels()) {
final Resource minResource = getMinimumAbsoluteResource(getQueuePath(), label);
Resource maxResource = getMaximumAbsoluteResource(getQueuePath(), label);
final Resource minResource = getMinimumAbsoluteResource(getQueuePathObject(), label);
Resource maxResource = getMaximumAbsoluteResource(getQueuePathObject(), label);

if (parent != null) {
final Resource parentMax = parent.getQueueResourceQuotas()
Expand Down
Loading