Skip to content

Commit

Permalink
TEZ-4589: Counter for the overall duration of succeeded/failed/killed…
Browse files Browse the repository at this point in the history
… task attempts
  • Loading branch information
abstractdog committed Nov 22, 2024
1 parent 7cd6480 commit c0ce8f3
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,28 @@ public enum DAGCounter {
NUM_KILLED_TASKS,
NUM_SUCCEEDED_TASKS,
TOTAL_LAUNCHED_TASKS,

/* The durations of task attempts are categorized based on their final states. The duration of successful tasks
can serve as a reference when analyzing the durations of failed or killed tasks. This is because solely examining
failed or killed task durations may be misleading, as these durations are measured from the submission time,
which does not always correspond to the actual start time of the task attempt on executor nodes
(e.g., in scenarios involving Hive LLAP).
These counters align with the duration metrics used for WALL_CLOCK_MILLIS.
As such, the following relationship applies:
WALL_CLOCK_MILLIS = DURATION_FAILED_TASKS_MILLIS + DURATION_KILLED_TASKS_MILLIS + DURATION_SUCCEEDED_TASKS_MILLIS
*/

// Total amount of time spent on running FAILED task attempts. This can be blamed for performance degradation, as a
// DAG can still finish successfully in the presence of failed attempts.
DURATION_FAILED_TASKS_MILLIS,

// Total amount of time spent on running KILLED task attempts.
DURATION_KILLED_TASKS_MILLIS,

// Total amount of time spent on running SUCCEEDED task attempts. This serves as a baseline when evaluating the
// durations of FAILED and KILLED task attempts.
DURATION_SUCCEEDED_TASKS_MILLIS,

OTHER_LOCAL_TASKS,
DATA_LOCAL_TASKS,
RACK_LOCAL_TASKS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class DAGEventCounterUpdate extends DAGEvent {

/**
 * Creates a counter-update event with an initially empty list of counter updates.
 *
 * @param dagId id of the DAG, forwarded to the DAGEvent superclass constructor together with
 *              the DAG_COUNTER_UPDATE event type
 */
public DAGEventCounterUpdate(TezDAGID dagId) {
  super(dagId, DAGEventType.DAG_COUNTER_UPDATE);
  // Assigned exactly once; the element type is inferred from the field declaration.
  counterUpdates = new ArrayList<>();
}

public void addCounterUpdate(Enum<?> key, long incrValue) {
Expand All @@ -56,5 +56,9 @@ public Enum<?> getCounterKey() {
/**
 * Returns the increment value held by this counter update.
 *
 * @return the stored {@code incrValue}
 */
public long getIncrementValue() {
return incrValue;
}

/**
 * Returns a human-readable representation showing the counter key and the increment value.
 *
 * @return a string of the form
 *         {@code DAGEventCounterUpdate.CounterIncrementalUpdate(key=..., incrValue=...)}
 */
@Override
public String toString() {
  return String.format("DAGEventCounterUpdate.CounterIncrementalUpdate(key=%s, incrValue=%d)", key, incrValue);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -967,23 +967,26 @@ private static DAGEventCounterUpdate createDAGCounterUpdateEventTALaunched(
return dagCounterEvent;
}

private static DAGEventCounterUpdate createDAGCounterUpdateEventTAFinished(
@VisibleForTesting
static DAGEventCounterUpdate createDAGCounterUpdateEventTAFinished(
TaskAttemptImpl taskAttempt, TaskAttemptState taState) {
DAGEventCounterUpdate jce =
new DAGEventCounterUpdate(taskAttempt.getDAGID());

long amSideWallClockTimeMs = TimeUnit.NANOSECONDS.toMillis(taskAttempt.getDurationNs());
jce.addCounterUpdate(DAGCounter.WALL_CLOCK_MILLIS, amSideWallClockTimeMs);

if (taState == TaskAttemptState.FAILED) {
jce.addCounterUpdate(DAGCounter.NUM_FAILED_TASKS, 1);
jce.addCounterUpdate(DAGCounter.DURATION_FAILED_TASKS_MILLIS, amSideWallClockTimeMs);
} else if (taState == TaskAttemptState.KILLED) {
jce.addCounterUpdate(DAGCounter.NUM_KILLED_TASKS, 1);
jce.addCounterUpdate(DAGCounter.DURATION_KILLED_TASKS_MILLIS, amSideWallClockTimeMs);
} else if (taState == TaskAttemptState.SUCCEEDED ) {
jce.addCounterUpdate(DAGCounter.NUM_SUCCEEDED_TASKS, 1);
jce.addCounterUpdate(DAGCounter.DURATION_SUCCEEDED_TASKS_MILLIS, amSideWallClockTimeMs);
}

long amSideWallClockTimeMs = TimeUnit.NANOSECONDS.toMillis(
taskAttempt.getDurationNs());
jce.addCounterUpdate(DAGCounter.WALL_CLOCK_MILLIS, amSideWallClockTimeMs);

return jce;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.tez.dag.app.dag.impl;

import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.dag.app.MockClock;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -2261,6 +2263,85 @@ public void testMapTaskIsBlamedByDownstreamAttemptsFromDifferentHosts() {
Assert.assertEquals(TaskAttemptStateInternal.FAILED, resultState2);
}

/**
 * Verifies that the counter update event created for a finished task attempt carries the NUM_* and
 * DURATION_* counters of the attempt's final state only, plus WALL_CLOCK_MILLIS in every case.
 * The mock attempt reports a fixed duration of 1000 ms.
 */
@Test
public void testDAGCounterUpdateEvent() {
  TaskAttemptImpl taImpl = getMockTaskAttempt();

  DAGEventCounterUpdate counterUpdateSucceeded = TaskAttemptImpl.createDAGCounterUpdateEventTAFinished(taImpl,
      TaskAttemptState.SUCCEEDED);
  List<DAGEventCounterUpdate.CounterIncrementalUpdate> succeededUpdates = counterUpdateSucceeded.getCounterUpdates();
  // SUCCEEDED task related counters are updated (+ WALL_CLOCK_MILLIS)
  assertCounterIncrementalUpdate(succeededUpdates, DAGCounter.NUM_SUCCEEDED_TASKS, 1);
  assertCounterIncrementalUpdate(succeededUpdates, DAGCounter.DURATION_SUCCEEDED_TASKS_MILLIS, 1000);
  assertCounterIncrementalUpdate(succeededUpdates, DAGCounter.WALL_CLOCK_MILLIS, 1000);
  // other counters are not updated (no FAILED, no KILLED)
  assertCounterIncrementalUpdateNotFound(succeededUpdates, DAGCounter.NUM_FAILED_TASKS);
  assertCounterIncrementalUpdateNotFound(succeededUpdates, DAGCounter.NUM_KILLED_TASKS);
  assertCounterIncrementalUpdateNotFound(succeededUpdates, DAGCounter.DURATION_FAILED_TASKS_MILLIS);
  assertCounterIncrementalUpdateNotFound(succeededUpdates, DAGCounter.DURATION_KILLED_TASKS_MILLIS);

  DAGEventCounterUpdate counterUpdateFailed = TaskAttemptImpl.createDAGCounterUpdateEventTAFinished(taImpl,
      TaskAttemptState.FAILED);
  List<DAGEventCounterUpdate.CounterIncrementalUpdate> failedUpdates = counterUpdateFailed.getCounterUpdates();
  // FAILED task related counters are updated (+ WALL_CLOCK_MILLIS)
  assertCounterIncrementalUpdate(failedUpdates, DAGCounter.NUM_FAILED_TASKS, 1);
  assertCounterIncrementalUpdate(failedUpdates, DAGCounter.DURATION_FAILED_TASKS_MILLIS, 1000);
  assertCounterIncrementalUpdate(failedUpdates, DAGCounter.WALL_CLOCK_MILLIS, 1000);
  // other counters are not updated (no SUCCEEDED, no KILLED)
  assertCounterIncrementalUpdateNotFound(failedUpdates, DAGCounter.NUM_SUCCEEDED_TASKS);
  assertCounterIncrementalUpdateNotFound(failedUpdates, DAGCounter.NUM_KILLED_TASKS);
  assertCounterIncrementalUpdateNotFound(failedUpdates, DAGCounter.DURATION_KILLED_TASKS_MILLIS);
  assertCounterIncrementalUpdateNotFound(failedUpdates, DAGCounter.DURATION_SUCCEEDED_TASKS_MILLIS);

  DAGEventCounterUpdate counterUpdateKilled = TaskAttemptImpl.createDAGCounterUpdateEventTAFinished(taImpl,
      TaskAttemptState.KILLED);
  List<DAGEventCounterUpdate.CounterIncrementalUpdate> killedUpdates = counterUpdateKilled.getCounterUpdates();
  // KILLED task related counters are updated (+ WALL_CLOCK_MILLIS)
  assertCounterIncrementalUpdate(killedUpdates, DAGCounter.NUM_KILLED_TASKS, 1);
  assertCounterIncrementalUpdate(killedUpdates, DAGCounter.DURATION_KILLED_TASKS_MILLIS, 1000);
  assertCounterIncrementalUpdate(killedUpdates, DAGCounter.WALL_CLOCK_MILLIS, 1000);
  // other counters are not updated (no SUCCEEDED, no FAILED)
  assertCounterIncrementalUpdateNotFound(killedUpdates, DAGCounter.NUM_SUCCEEDED_TASKS);
  assertCounterIncrementalUpdateNotFound(killedUpdates, DAGCounter.NUM_FAILED_TASKS);
  assertCounterIncrementalUpdateNotFound(killedUpdates, DAGCounter.DURATION_FAILED_TASKS_MILLIS);
  // Must inspect the KILLED updates here; checking failedUpdates again would leave this case unverified.
  assertCounterIncrementalUpdateNotFound(killedUpdates, DAGCounter.DURATION_SUCCEEDED_TASKS_MILLIS);
}

/**
 * Creates a MockTaskAttemptImpl for a task whose id is derived from application (1, 2), DAG 1, vertex 1
 * and task 1. Collaborators are mocks, the configuration is a fresh Configuration and the clock is a
 * MonotonicClock.
 *
 * @return the task attempt to be used by counter-related tests
 */
private TaskAttemptImpl getMockTaskAttempt() {
  ApplicationId appId = ApplicationId.newInstance(1, 2);
  TezDAGID dagID = TezDAGID.getInstance(appId, 1);
  TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
  TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);

  return new MockTaskAttemptImpl(taskID, 1, mock(EventHandler.class),
      mock(TaskCommunicatorManagerInterface.class), new Configuration(), new MonotonicClock(),
      mock(TaskHeartbeatHandler.class), mock(AppContext.class), false,
      mock(Resource.class), mock(ContainerContext.class), false);
}

/**
 * Fails the test unless the given updates contain an entry for the given counter whose increment value
 * equals the expected value.
 *
 * @param counterUpdates the updates to search
 * @param counter the counter key to look for
 * @param expectedValue the increment value the matching entry must carry
 */
private void assertCounterIncrementalUpdate(List<DAGEventCounterUpdate.CounterIncrementalUpdate> counterUpdates,
    DAGCounter counter, int expectedValue) {
  boolean found = false;
  for (DAGEventCounterUpdate.CounterIncrementalUpdate candidate : counterUpdates) {
    boolean sameKey = candidate.getCounterKey().equals(counter);
    if (sameKey && candidate.getIncrementValue() == expectedValue) {
      found = true;
      break;
    }
  }
  if (!found) {
    String message = String.format(
        "Haven't found counter update %s=%d, instead seen: %s", counter, expectedValue, counterUpdates);
    Assert.fail(message);
  }
}

/**
 * Fails the test if the given updates contain any entry for the given counter, regardless of its value.
 *
 * @param counterUpdates the updates to search
 * @param counter the counter key that must be absent
 */
private void assertCounterIncrementalUpdateNotFound(
    List<DAGEventCounterUpdate.CounterIncrementalUpdate> counterUpdates, DAGCounter counter) {
  for (DAGEventCounterUpdate.CounterIncrementalUpdate candidate : counterUpdates) {
    if (!candidate.getCounterKey().equals(counter)) {
      continue;
    }
    String message = String.format(
        "Found counter update %s=%d, which is not expected", counter, candidate.getIncrementValue());
    Assert.fail(message);
  }
}

private Event verifyEventType(List<Event> events,
Class<? extends Event> eventClass, int expectedOccurences) {
int count = 0;
Expand Down Expand Up @@ -2344,6 +2425,11 @@ protected void logJobHistoryAttemptUnsuccesfulCompletion(
// Records that this method was invoked by setting the inputFailedReported flag to true.
protected void sendInputFailedToConsumers() {
inputFailedReported = true;
}

/**
 * Reports a fixed duration of one second, expressed in nanoseconds.
 *
 * @return 1,000,000,000 ns, i.e. 1000 ms
 */
@Override
public long getDurationNs() {
  final long oneSecondInNanos = 1_000_000_000L;
  return oneSecondInNanos;
}
}

private static ContainerContext createFakeContainerContext() {
Expand Down

0 comments on commit c0ce8f3

Please sign in to comment.