diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java index 23c1978430..ca575d4dfa 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/DAGCounter.java @@ -30,6 +30,28 @@ public enum DAGCounter { NUM_KILLED_TASKS, NUM_SUCCEEDED_TASKS, TOTAL_LAUNCHED_TASKS, + + /* The durations of task attempts are categorized based on their final states. The duration of successful tasks + can serve as a reference when analyzing the durations of failed or killed tasks. This is because solely examining + failed or killed task durations may be misleading, as these durations are measured from the submission time, + which does not always correspond to the actual start time of the task attempt on executor nodes + (e.g., in scenarios involving Hive LLAP). + These counters align with the duration metrics used for WALL_CLOCK_MILLIS. + As such, the following relationship applies: + WALL_CLOCK_MILLIS = DURATION_FAILED_TASKS_MILLIS + DURATION_KILLED_TASKS_MILLIS + DURATION_SUCCEEDED_TASKS_MILLIS + */ + + // Total amount of time spent on running FAILED task attempts. This can be blamed for performance degradation, as a + // DAG can still finish successfully in the presence of failed attempts. + DURATION_FAILED_TASKS_MILLIS, + + // Total amount of time spent on running KILLED task attempts. + DURATION_KILLED_TASKS_MILLIS, + + // Total amount of time spent on running SUCCEEDED task attempts, which can be a reference together with the same for + // FAILED and KILLED attempts. + DURATION_SUCCEEDED_TASKS_MILLIS, + OTHER_LOCAL_TASKS, DATA_LOCAL_TASKS, RACK_LOCAL_TASKS, diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCounterUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCounterUpdate.java index da0724dd20..3683a4951b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCounterUpdate.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventCounterUpdate.java @@ -29,7 +29,7 @@ public class DAGEventCounterUpdate extends DAGEvent { public DAGEventCounterUpdate(TezDAGID dagId) { super(dagId, DAGEventType.DAG_COUNTER_UPDATE); - counterUpdates = new ArrayList(); + counterUpdates = new ArrayList<>(); } public void addCounterUpdate(Enum key, long incrValue) { @@ -56,5 +56,10 @@ public Enum getCounterKey() { public long getIncrementValue() { return incrValue; } + + @Override + public String toString(){ + return String.format("DAGEventCounterUpdate.CounterIncrementalUpdate(key=%s, incrValue=%d)", key, incrValue); + } } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index fb8aed267b..13769db839 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -967,23 +967,26 @@ private static DAGEventCounterUpdate createDAGCounterUpdateEventTALaunched( return dagCounterEvent; } - private static DAGEventCounterUpdate createDAGCounterUpdateEventTAFinished( + @VisibleForTesting + static DAGEventCounterUpdate createDAGCounterUpdateEventTAFinished( TaskAttemptImpl taskAttempt, TaskAttemptState taState) { DAGEventCounterUpdate jce = new DAGEventCounterUpdate(taskAttempt.getDAGID()); + long amSideWallClockTimeMs = TimeUnit.NANOSECONDS.toMillis(taskAttempt.getDurationNs()); + jce.addCounterUpdate(DAGCounter.WALL_CLOCK_MILLIS, amSideWallClockTimeMs); + if (taState == TaskAttemptState.FAILED) { jce.addCounterUpdate(DAGCounter.NUM_FAILED_TASKS, 1); + jce.addCounterUpdate(DAGCounter.DURATION_FAILED_TASKS_MILLIS, amSideWallClockTimeMs); } else if (taState == TaskAttemptState.KILLED) { jce.addCounterUpdate(DAGCounter.NUM_KILLED_TASKS, 1); + jce.addCounterUpdate(DAGCounter.DURATION_KILLED_TASKS_MILLIS, amSideWallClockTimeMs); } else if (taState == TaskAttemptState.SUCCEEDED ) { jce.addCounterUpdate(DAGCounter.NUM_SUCCEEDED_TASKS, 1); + jce.addCounterUpdate(DAGCounter.DURATION_SUCCEEDED_TASKS_MILLIS, amSideWallClockTimeMs); } - long amSideWallClockTimeMs = TimeUnit.NANOSECONDS.toMillis( - taskAttempt.getDurationNs()); - jce.addCounterUpdate(DAGCounter.WALL_CLOCK_MILLIS, amSideWallClockTimeMs); - return jce; } diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index ee8ec67cfd..eeb0cb6ab8 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -18,6 +18,8 @@ package org.apache.tez.dag.app.dag.impl; +import org.apache.hadoop.yarn.util.MonotonicClock; +import org.apache.tez.common.counters.DAGCounter; import org.apache.tez.dag.app.MockClock; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -2261,6 +2263,85 @@ public void testMapTaskIsBlamedByDownstreamAttemptsFromDifferentHosts() { Assert.assertEquals(TaskAttemptStateInternal.FAILED, resultState2); } + @Test + public void testDAGCounterUpdateEvent(){ + TaskAttemptImpl taImpl = getMockTaskAttempt(); + + DAGEventCounterUpdate counterUpdateSucceeded = TaskAttemptImpl.createDAGCounterUpdateEventTAFinished(taImpl, + TaskAttemptState.SUCCEEDED); + List succeededUpdates = counterUpdateSucceeded.getCounterUpdates(); + // SUCCEEDED task related counters are updated (+ WALL_CLOCK_MILLIS) + assertCounterIncrementalUpdate(succeededUpdates, DAGCounter.NUM_SUCCEEDED_TASKS, 1); + assertCounterIncrementalUpdate(succeededUpdates, DAGCounter.DURATION_SUCCEEDED_TASKS_MILLIS, 1000); + assertCounterIncrementalUpdate(succeededUpdates, DAGCounter.WALL_CLOCK_MILLIS, 1000); + // other counters are not updated (no FAILED, no KILLED) + assertCounterIncrementalUpdateNotFound(succeededUpdates, DAGCounter.NUM_FAILED_TASKS); + assertCounterIncrementalUpdateNotFound(succeededUpdates, DAGCounter.NUM_KILLED_TASKS); + assertCounterIncrementalUpdateNotFound(succeededUpdates, DAGCounter.DURATION_FAILED_TASKS_MILLIS); + assertCounterIncrementalUpdateNotFound(succeededUpdates, DAGCounter.DURATION_KILLED_TASKS_MILLIS); + + DAGEventCounterUpdate counterUpdateFailed = TaskAttemptImpl.createDAGCounterUpdateEventTAFinished(taImpl, + TaskAttemptState.FAILED); + List failedUpdates = counterUpdateFailed.getCounterUpdates(); + // FAILED task related counters are updated (+ WALL_CLOCK_MILLIS) + assertCounterIncrementalUpdate(failedUpdates, DAGCounter.NUM_FAILED_TASKS, 1); + assertCounterIncrementalUpdate(failedUpdates, DAGCounter.DURATION_FAILED_TASKS_MILLIS, 1000); + assertCounterIncrementalUpdate(failedUpdates, DAGCounter.WALL_CLOCK_MILLIS, 1000); + // other counters are not updated (no SUCCEEDED, no KILLED) + assertCounterIncrementalUpdateNotFound(failedUpdates, DAGCounter.NUM_SUCCEEDED_TASKS); + assertCounterIncrementalUpdateNotFound(failedUpdates, DAGCounter.NUM_KILLED_TASKS); + assertCounterIncrementalUpdateNotFound(failedUpdates, DAGCounter.DURATION_KILLED_TASKS_MILLIS); + assertCounterIncrementalUpdateNotFound(failedUpdates, DAGCounter.DURATION_SUCCEEDED_TASKS_MILLIS); + + DAGEventCounterUpdate counterUpdateKilled = TaskAttemptImpl.createDAGCounterUpdateEventTAFinished(taImpl, + TaskAttemptState.KILLED); + List killedUpdates = counterUpdateKilled.getCounterUpdates(); + // KILLED task related counters are updated (+ WALL_CLOCK_MILLIS) + assertCounterIncrementalUpdate(killedUpdates, DAGCounter.NUM_KILLED_TASKS, 1); + assertCounterIncrementalUpdate(killedUpdates, DAGCounter.DURATION_KILLED_TASKS_MILLIS, 1000); + assertCounterIncrementalUpdate(killedUpdates, DAGCounter.WALL_CLOCK_MILLIS, 1000); + // other counters are not updated (no SUCCEEDED, no FAILED) + assertCounterIncrementalUpdateNotFound(killedUpdates, DAGCounter.NUM_SUCCEEDED_TASKS); + assertCounterIncrementalUpdateNotFound(killedUpdates, DAGCounter.NUM_FAILED_TASKS); + assertCounterIncrementalUpdateNotFound(killedUpdates, DAGCounter.DURATION_FAILED_TASKS_MILLIS); + assertCounterIncrementalUpdateNotFound(failedUpdates, DAGCounter.DURATION_SUCCEEDED_TASKS_MILLIS); + } + + private TaskAttemptImpl getMockTaskAttempt() { + ApplicationId appId = ApplicationId.newInstance(1, 2); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( + appId, 0); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); + TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); + + return new MockTaskAttemptImpl(taskID, 1, mock(EventHandler.class), + mock(TaskCommunicatorManagerInterface.class), new Configuration(), new MonotonicClock(), + mock(TaskHeartbeatHandler.class), mock(AppContext.class), false, + mock(Resource.class), mock(ContainerContext.class), false); + } + + private void assertCounterIncrementalUpdate(List counterUpdates, + DAGCounter counter, int expectedValue) { + for (DAGEventCounterUpdate.CounterIncrementalUpdate update : counterUpdates) { + if (update.getCounterKey().equals(counter) && update.getIncrementValue() == expectedValue) { + return; + } + } + Assert.fail( + String.format("Haven't found counter update %s=%d, instead seen: %s", counter, expectedValue, counterUpdates)); + } + + private void assertCounterIncrementalUpdateNotFound( + List counterUpdates, DAGCounter counter) { + for (DAGEventCounterUpdate.CounterIncrementalUpdate update : counterUpdates) { + if (update.getCounterKey().equals(counter)) { + Assert.fail( + String.format("Found counter update %s=%d, which is not expected", counter, update.getIncrementValue())); + } + } + } + private Event verifyEventType(List events, Class eventClass, int expectedOccurences) { int count = 0; @@ -2344,6 +2425,11 @@ protected void logJobHistoryAttemptUnsuccesfulCompletion( protected void sendInputFailedToConsumers() { inputFailedReported = true; } + + @Override + public long getDurationNs(){ + return 1000000000L; // 1000000000ns = 1000ms + } } private static ContainerContext createFakeContainerContext() {