diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java index 6f51566f75..75b6a662e0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java @@ -1272,7 +1272,7 @@ boolean preemptIfNeeded() { + numHighestPriRequests + " pending requests at pri: " + highestPriRequest.getPriority()); } - + int newContainersReleased = 0; for (int i=0; i mockRMClient = spy( + new AMRMClientAsyncForTest(new AMRMClientForTest(), 100)); + + Configuration conf = new Configuration(); + conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_PERCENTAGE, 50); + + TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL, + false, null, null, new PreemptionMatcher(), conf); + final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp); + + final TaskSchedulerWithDrainableContext scheduler = + new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient); + scheduler.initialize(); + scheduler.start(); + int initialRmCapacity = 4; + int lowPriorityTasks = 5; + int highPriorityTasks = 6; + Resource taskAsk = Resource.newInstance(1000, 1); + + Resource totalResource = Resource.newInstance(4000, 4); + when(mockRMClient.getAvailableResources()).thenReturn(totalResource); + + // Add lower priority tasks + Priority lowPriority = Priority.newInstance(74); + for (int i = 0; i < lowPriorityTasks; i++) { + Object low = new Object(); + TaskAttempt ta = mock(TaskAttempt.class); + scheduler.allocateTask(ta, taskAsk, null, null, lowPriority, low, null); + } + + scheduler.getProgress(); // Will update the highest priority + drainableAppCallback.drain(); + // 5 containers requested for lower priority tasks + verify(mockRMClient, times(5)).addContainerRequest(any(CookieContainerRequest.class)); + + // Allocate requested containers + List lowPriorityContainers = new ArrayList<>(); + for (int i = 0; i < initialRmCapacity; i++) { + ContainerId containerId = ContainerId.newContainerId( + ApplicationAttemptId.newInstance(ApplicationId.newInstance(1L, 1), 1), i); + NodeId nodeId = NodeId.newInstance("host-" + i, 8041); + Container container = Container.newInstance(containerId, nodeId, "host-" + i, taskAsk, lowPriority, null); + lowPriorityContainers.add(container); + } + + totalResource = Resource.newInstance(0, 0); + when(mockRMClient.getAvailableResources()).thenReturn(totalResource); + + // We don't want containers to be assigned to a task by delayedContainerManager as it invokes another preemption flow + // Delayed thread first takes lock on delayedContainerManager instance to check if there are any containers + // We block the thread, ensure all delayed containers have schedule time beyond test's runtime to avoid assignment. + synchronized (scheduler.delayedContainerManager) { + scheduler.onContainersAllocated(lowPriorityContainers); + drainableAppCallback.drain(); + for (HeldContainer container : scheduler.delayedContainerManager.delayedContainers) { + // Set next schedule beyond this test's time to avoid any assignment + container.setNextScheduleTime(System.currentTimeMillis() + 10000); + // No preemption if assignment attempt of new container < 3 + container.incrementAssignmentAttempts(); + container.incrementAssignmentAttempts(); + container.incrementAssignmentAttempts(); + } + } + + // No releases so far + verify(mockRMClient, times(0)).releaseAssignedContainer(any()); + + // Add higher priority task + Priority highPriority = Priority.newInstance(71); + for (int i = 0; i < highPriorityTasks; i++) { + Object high = new Object(); + TaskAttempt ta = mock(TaskAttempt.class); + scheduler.allocateTask(ta, taskAsk, null, null, highPriority, high, null); + } + + drainableAppCallback.drain(); + // low priority tasks + high priority tasks + verify(mockRMClient, times(11)).addContainerRequest(any(CookieContainerRequest.class)); + + // Trigger preemption to release containers as 50% of pending high priority requests + scheduler.getProgress(); + drainableAppCallback.drain(); + + // 50% of 6 high priority requests = 3, 4 containers were held - hence 3 will be released + verify(mockRMClient, times(3)).releaseAssignedContainer(any()); + + // Trigger another preemption cycle + scheduler.getProgress(); + drainableAppCallback.drain(); + // 50% of 6 high priority requests = 3, but only 1 container is held - which will be released, + // incrementing total to 4 + verify(mockRMClient, times(4)).releaseAssignedContainer(any()); + AppFinalStatus finalStatus = + new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "", DEFAULT_APP_URL); + when(mockApp.getFinalAppStatus()).thenReturn(finalStatus); + scheduler.shutdown(); + drainableAppCallback.drain(); + } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Test (timeout=5000) public void testTaskSchedulerPreemption() throws Exception {