diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java index 268522c1b1921..b7c1462a60d56 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java @@ -130,7 +130,7 @@ public void init(String userName) { ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("RPC Router RMAdminClient-" + userName + "-%d ").build(); - BlockingQueue workQueue = new LinkedBlockingQueue(); + BlockingQueue workQueue = new LinkedBlockingQueue<>(); this.executorService = new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory); @@ -1032,7 +1032,7 @@ public QueryFederationQueuePoliciesResponse listFederationQueuePolicies( } try { - QueryFederationQueuePoliciesResponse response = null; + QueryFederationQueuePoliciesResponse response; long startTime = clock.getTime(); String queue = request.getQueue(); @@ -1056,9 +1056,15 @@ public QueryFederationQueuePoliciesResponse listFederationQueuePolicies( // We filter by pagination. response = filterPoliciesConfigurationsByQueues(queues, policiesConfigurations, pageSize, currentPage); + } else { + // If we don't have any filtering criteria, we should also support paginating the results. + response = filterPoliciesConfigurations(policiesConfigurations, pageSize, currentPage); } long stopTime = clock.getTime(); routerMetrics.succeededListFederationQueuePoliciesRetrieved(stopTime - startTime); + if (response == null) { + response = QueryFederationQueuePoliciesResponse.newInstance(); + } return response; } catch (Exception e) { routerMetrics.incrListFederationQueuePoliciesFailedRetrieved(); @@ -1137,12 +1143,75 @@ private QueryFederationQueuePoliciesResponse filterPoliciesConfigurationsByQueue } } + // Step3. To paginate the returned results. + return queryFederationQueuePoliciesPagination(federationQueueWeights, pageSize, currentPage); + } + + /** + * Filter PoliciesConfigurations, and we paginate Policies within this method. + * + * @param policiesConfigurations policy configurations. + * @param pageSize Items per page. + * @param currentPage The number of pages to be queried. + * @return federation queue policies response. + * @throws YarnException indicates exceptions from yarn servers. + */ + private QueryFederationQueuePoliciesResponse filterPoliciesConfigurations( + Map policiesConfigurations, + int pageSize, int currentPage) throws YarnException { + + // Step1. Check the parameters, if the policy list is empty, return empty directly. + if (MapUtils.isEmpty(policiesConfigurations)) { + return null; + } + + // Step2. Traverse policiesConfigurations and obtain the FederationQueueWeight list. + List federationQueueWeights = new ArrayList<>(); + for (Map.Entry entry : + policiesConfigurations.entrySet()) { + String queue = entry.getKey(); + SubClusterPolicyConfiguration policyConf = entry.getValue(); + if (policyConf == null) { + continue; + } + FederationQueueWeight federationQueueWeight = parseFederationQueueWeight(queue, policyConf); + if (federationQueueWeight != null) { + federationQueueWeights.add(federationQueueWeight); + } + } + + // Step3. To paginate the returned results. + return queryFederationQueuePoliciesPagination(federationQueueWeights, pageSize, currentPage); + } + + /** + * Pagination for FederationQueuePolicies. + * + * @param queueWeights List Of FederationQueueWeight. + * @param pageSize Items per page. + * @param currentPage The number of pages to be queried. + * @return federation queue policies response. + * @throws YarnException indicates exceptions from yarn servers. + */ + private QueryFederationQueuePoliciesResponse queryFederationQueuePoliciesPagination( + List queueWeights, int pageSize, int currentPage) + throws YarnException { + if (CollectionUtils.isEmpty(queueWeights)) { + return null; + } + int startIndex = (currentPage - 1) * pageSize; - int endIndex = Math.min(startIndex + pageSize, federationQueueWeights.size()); + int endIndex = Math.min(startIndex + pageSize, queueWeights.size()); + + if (startIndex > endIndex) { + throw new YarnException("The index of the records to be retrieved " + + "has exceeded the maximum index."); + } + List subFederationQueueWeights = - federationQueueWeights.subList(startIndex, endIndex); + queueWeights.subList(startIndex, endIndex); - int totalSize = federationQueueWeights.size(); + int totalSize = queueWeights.size(); int totalPage = (totalSize % pageSize == 0) ? totalSize / pageSize : (totalSize / pageSize) + 1; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java index 8439b01222e2c..520c25d22cb09 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/TestFederationRMAdminInterceptor.java @@ -965,5 +965,28 @@ public void testFilterPoliciesConfigurationsByQueues() throws Exception { List federationQueueWeights6 = response6.getFederationQueueWeights(); assertNotNull(federationQueueWeights6); assertEquals(1, federationQueueWeights6.size()); + + // Queue7: We design such a test case, we do not set any filter conditions, + // but we need to get the return results + QueryFederationQueuePoliciesRequest request7 = + QueryFederationQueuePoliciesRequest.newInstance(10, 1, null, null); + QueryFederationQueuePoliciesResponse response7 = + interceptor.listFederationQueuePolicies(request7); + assertNotNull(response7); + assertEquals(1, response7.getCurrentPage()); + assertEquals(10, response7.getPageSize()); + assertEquals(3, response7.getTotalPage()); + assertEquals(26, response7.getTotalSize()); + List federationQueueWeights7 = response7.getFederationQueueWeights(); + assertNotNull(federationQueueWeights7); + assertEquals(10, federationQueueWeights7.size()); + + // Queue8: We are designing a unit test where the number of records + // we need to retrieve exceeds the maximum number of Policies available. + QueryFederationQueuePoliciesRequest request8 = + QueryFederationQueuePoliciesRequest.newInstance(10, 10, null, null); + LambdaTestUtils.intercept(YarnException.class, + "The index of the records to be retrieved has exceeded the maximum index.", + () -> interceptor.listFederationQueuePolicies(request8)); } }