Skip to content

Commit

Permalink
YARN-11537. [Addendum][Federation] Router CLI Supports List SubCluste…
Browse files Browse the repository at this point in the history
…rPolicyConfiguration Of Queues. (#6121) Contributed by Shilun Fan.

Reviewed-by: Inigo Goiri <[email protected]>
Signed-off-by: Shilun Fan <[email protected]>
  • Loading branch information
slfan1989 authored Sep 29, 2023
1 parent 8931393 commit 5f47f09
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void init(String userName) {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("RPC Router RMAdminClient-" + userName + "-%d ").build();

BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);

Expand Down Expand Up @@ -1032,7 +1032,7 @@ public QueryFederationQueuePoliciesResponse listFederationQueuePolicies(
}

try {
QueryFederationQueuePoliciesResponse response = null;
QueryFederationQueuePoliciesResponse response;

long startTime = clock.getTime();
String queue = request.getQueue();
Expand All @@ -1056,9 +1056,15 @@ public QueryFederationQueuePoliciesResponse listFederationQueuePolicies(
// We filter by pagination.
response = filterPoliciesConfigurationsByQueues(queues, policiesConfigurations,
pageSize, currentPage);
} else {
// If we don't have any filtering criteria, we should also support paginating the results.
response = filterPoliciesConfigurations(policiesConfigurations, pageSize, currentPage);
}
long stopTime = clock.getTime();
routerMetrics.succeededListFederationQueuePoliciesRetrieved(stopTime - startTime);
if (response == null) {
response = QueryFederationQueuePoliciesResponse.newInstance();
}
return response;
} catch (Exception e) {
routerMetrics.incrListFederationQueuePoliciesFailedRetrieved();
Expand Down Expand Up @@ -1137,12 +1143,75 @@ private QueryFederationQueuePoliciesResponse filterPoliciesConfigurationsByQueue
}
}

// Step3. To paginate the returned results.
return queryFederationQueuePoliciesPagination(federationQueueWeights, pageSize, currentPage);
}

/**
* Filter PoliciesConfigurations, and we paginate Policies within this method.
*
* @param policiesConfigurations policy configurations.
* @param pageSize Items per page.
* @param currentPage The number of pages to be queried.
* @return federation queue policies response.
* @throws YarnException indicates exceptions from yarn servers.
*/
private QueryFederationQueuePoliciesResponse filterPoliciesConfigurations(
Map<String, SubClusterPolicyConfiguration> policiesConfigurations,
int pageSize, int currentPage) throws YarnException {

// Step1. Check the parameters, if the policy list is empty, return empty directly.
if (MapUtils.isEmpty(policiesConfigurations)) {
return null;
}

// Step2. Traverse policiesConfigurations and obtain the FederationQueueWeight list.
List<FederationQueueWeight> federationQueueWeights = new ArrayList<>();
for (Map.Entry<String, SubClusterPolicyConfiguration> entry :
policiesConfigurations.entrySet()) {
String queue = entry.getKey();
SubClusterPolicyConfiguration policyConf = entry.getValue();
if (policyConf == null) {
continue;
}
FederationQueueWeight federationQueueWeight = parseFederationQueueWeight(queue, policyConf);
if (federationQueueWeight != null) {
federationQueueWeights.add(federationQueueWeight);
}
}

// Step3. To paginate the returned results.
return queryFederationQueuePoliciesPagination(federationQueueWeights, pageSize, currentPage);
}

/**
* Pagination for FederationQueuePolicies.
*
* @param queueWeights List Of FederationQueueWeight.
* @param pageSize Items per page.
* @param currentPage The number of pages to be queried.
* @return federation queue policies response.
* @throws YarnException indicates exceptions from yarn servers.
*/
private QueryFederationQueuePoliciesResponse queryFederationQueuePoliciesPagination(
List<FederationQueueWeight> queueWeights, int pageSize, int currentPage)
throws YarnException {
if (CollectionUtils.isEmpty(queueWeights)) {
return null;
}

int startIndex = (currentPage - 1) * pageSize;
int endIndex = Math.min(startIndex + pageSize, federationQueueWeights.size());
int endIndex = Math.min(startIndex + pageSize, queueWeights.size());

if (startIndex > endIndex) {
throw new YarnException("The index of the records to be retrieved " +
"has exceeded the maximum index.");
}

List<FederationQueueWeight> subFederationQueueWeights =
federationQueueWeights.subList(startIndex, endIndex);
queueWeights.subList(startIndex, endIndex);

int totalSize = federationQueueWeights.size();
int totalSize = queueWeights.size();
int totalPage =
(totalSize % pageSize == 0) ? totalSize / pageSize : (totalSize / pageSize) + 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -965,5 +965,28 @@ public void testFilterPoliciesConfigurationsByQueues() throws Exception {
List<FederationQueueWeight> federationQueueWeights6 = response6.getFederationQueueWeights();
assertNotNull(federationQueueWeights6);
assertEquals(1, federationQueueWeights6.size());

// Queue7: We design such a test case, we do not set any filter conditions,
// but we need to get the return results
QueryFederationQueuePoliciesRequest request7 =
QueryFederationQueuePoliciesRequest.newInstance(10, 1, null, null);
QueryFederationQueuePoliciesResponse response7 =
interceptor.listFederationQueuePolicies(request7);
assertNotNull(response7);
assertEquals(1, response7.getCurrentPage());
assertEquals(10, response7.getPageSize());
assertEquals(3, response7.getTotalPage());
assertEquals(26, response7.getTotalSize());
List<FederationQueueWeight> federationQueueWeights7 = response7.getFederationQueueWeights();
assertNotNull(federationQueueWeights7);
assertEquals(10, federationQueueWeights7.size());

// Queue8: We are designing a unit test where the number of records
// we need to retrieve exceeds the maximum number of Policies available.
QueryFederationQueuePoliciesRequest request8 =
QueryFederationQueuePoliciesRequest.newInstance(10, 10, null, null);
LambdaTestUtils.intercept(YarnException.class,
"The index of the records to be retrieved has exceeded the maximum index.",
() -> interceptor.listFederationQueuePolicies(request8));
}
}

0 comments on commit 5f47f09

Please sign in to comment.