Skip to content

Commit

Permalink
TEZ-4589: Counter for the overall duration of failed/killed task atte…
Browse files Browse the repository at this point in the history
…mpts
  • Loading branch information
abstractdog committed Nov 21, 2024
1 parent 7cd6480 commit 411e095
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ public enum DAGCounter {
NUM_KILLED_TASKS,
NUM_SUCCEEDED_TASKS,
TOTAL_LAUNCHED_TASKS,

// Total amount of time spent on running KILLED task attempts. This can be blamed for performance degradation.
KILLED_TASK_ATTEMPTS_DURATION_MS,

// Total amount of time spent on running FAILED task attempts. This can be blamed for performance degradation, as a
// DAG can still finish successfully in the presence of failed attempts.
FAILED_TASK_ATTEMPTS_DURATION_MS,

OTHER_LOCAL_TASKS,
DATA_LOCAL_TASKS,
RACK_LOCAL_TASKS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class DAGEventCounterUpdate extends DAGEvent {

public DAGEventCounterUpdate(TezDAGID dagId) {
super(dagId, DAGEventType.DAG_COUNTER_UPDATE);
counterUpdates = new ArrayList<DAGEventCounterUpdate.CounterIncrementalUpdate>();
counterUpdates = new ArrayList<>();
}

public void addCounterUpdate(Enum<?> key, long incrValue) {
Expand All @@ -56,5 +56,9 @@ public Enum<?> getCounterKey() {
public long getIncrementValue() {
return incrValue;
}

public String toString(){
return String.format("DAGEventCounterUpdate.CounterIncrementalUpdate(key=%s, incrValue=%d)", key, incrValue);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,8 @@ private static DAGEventCounterUpdate createDAGCounterUpdateEventTALaunched(
return dagCounterEvent;
}

private static DAGEventCounterUpdate createDAGCounterUpdateEventTAFinished(
@VisibleForTesting
static DAGEventCounterUpdate createDAGCounterUpdateEventTAFinished(
TaskAttemptImpl taskAttempt, TaskAttemptState taState) {
DAGEventCounterUpdate jce =
new DAGEventCounterUpdate(taskAttempt.getDAGID());
Expand All @@ -980,10 +981,15 @@ private static DAGEventCounterUpdate createDAGCounterUpdateEventTAFinished(
jce.addCounterUpdate(DAGCounter.NUM_SUCCEEDED_TASKS, 1);
}

long amSideWallClockTimeMs = TimeUnit.NANOSECONDS.toMillis(
taskAttempt.getDurationNs());
long amSideWallClockTimeMs = TimeUnit.NANOSECONDS.toMillis(taskAttempt.getDurationNs());
jce.addCounterUpdate(DAGCounter.WALL_CLOCK_MILLIS, amSideWallClockTimeMs);

if (taState == TaskAttemptState.FAILED){
jce.addCounterUpdate(DAGCounter.FAILED_TASK_ATTEMPTS_DURATION_MS, amSideWallClockTimeMs);
}
if (taState == TaskAttemptState.KILLED){
jce.addCounterUpdate(DAGCounter.KILLED_TASK_ATTEMPTS_DURATION_MS, amSideWallClockTimeMs);
}
return jce;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.tez.dag.app.dag.impl;

import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.dag.app.DAGAppMaster;
import org.apache.tez.dag.app.MockClock;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -2261,6 +2264,70 @@ public void testMapTaskIsBlamedByDownstreamAttemptsFromDifferentHosts() {
Assert.assertEquals(TaskAttemptStateInternal.FAILED, resultState2);
}

@Test
public void testDAGCounterUpdateEvent(){
TaskAttemptImpl taImpl = getMockTaskAttempt();

DAGEventCounterUpdate counterUpdateFailed = TaskAttemptImpl.createDAGCounterUpdateEventTAFinished(taImpl,
TaskAttemptState.FAILED);
List<DAGEventCounterUpdate.CounterIncrementalUpdate> failedUpdates = counterUpdateFailed.getCounterUpdates();
// FAILED task related counters are updated (+ WALL_CLOCK_MILLIS)
assertCounterIncrementalUpdate(failedUpdates, DAGCounter.NUM_FAILED_TASKS, 1);
assertCounterIncrementalUpdate(failedUpdates, DAGCounter.FAILED_TASK_ATTEMPTS_DURATION_MS, 1000);
assertCounterIncrementalUpdate(failedUpdates, DAGCounter.WALL_CLOCK_MILLIS, 1000);
// other counters are not updated (no SUCCEEDED, no KILLED)
assertCounterIncrementalUpdateNotFound(failedUpdates, DAGCounter.NUM_SUCCEEDED_TASKS);
assertCounterIncrementalUpdateNotFound(failedUpdates, DAGCounter.NUM_KILLED_TASKS);
assertCounterIncrementalUpdateNotFound(failedUpdates, DAGCounter.KILLED_TASK_ATTEMPTS_DURATION_MS);

DAGEventCounterUpdate counterUpdateKilled = TaskAttemptImpl.createDAGCounterUpdateEventTAFinished(taImpl,
TaskAttemptState.KILLED);
List<DAGEventCounterUpdate.CounterIncrementalUpdate> killedUpdates = counterUpdateKilled.getCounterUpdates();
// KILLED task related counters are updated (+ WALL_CLOCK_MILLIS)
assertCounterIncrementalUpdate(killedUpdates, DAGCounter.NUM_KILLED_TASKS, 1);
assertCounterIncrementalUpdate(killedUpdates, DAGCounter.KILLED_TASK_ATTEMPTS_DURATION_MS, 1000);
assertCounterIncrementalUpdate(killedUpdates, DAGCounter.WALL_CLOCK_MILLIS, 1000);
// other counters are not updated (no SUCCEEDED, no FAILED)
assertCounterIncrementalUpdateNotFound(killedUpdates, DAGCounter.NUM_SUCCEEDED_TASKS);
assertCounterIncrementalUpdateNotFound(killedUpdates, DAGCounter.NUM_FAILED_TASKS);
assertCounterIncrementalUpdateNotFound(killedUpdates, DAGCounter.FAILED_TASK_ATTEMPTS_DURATION_MS);
}

private TaskAttemptImpl getMockTaskAttempt() {
ApplicationId appId = ApplicationId.newInstance(1, 2);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
appId, 0);
TezDAGID dagID = TezDAGID.getInstance(appId, 1);
TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);

return new MockTaskAttemptImpl(taskID, 1, mock(EventHandler.class),
mock(TaskCommunicatorManagerInterface.class), new Configuration(), new MonotonicClock(),
mock(TaskHeartbeatHandler.class), mock(AppContext.class), false,
mock(Resource.class), mock(ContainerContext.class), false);
}

private void assertCounterIncrementalUpdate(List<DAGEventCounterUpdate.CounterIncrementalUpdate> counterUpdates,
DAGCounter counter, int expectedValue) {
for (DAGEventCounterUpdate.CounterIncrementalUpdate update : counterUpdates) {
if (update.getCounterKey().equals(counter) && update.getIncrementValue() == expectedValue) {
return;
}
}
Assert.fail(
String.format("Haven't found counter update %s=%d, instead seen: %s", counter, expectedValue, counterUpdates));
}

private void assertCounterIncrementalUpdateNotFound(
List<DAGEventCounterUpdate.CounterIncrementalUpdate> counterUpdates, DAGCounter counter) {
for (DAGEventCounterUpdate.CounterIncrementalUpdate update : counterUpdates) {
if (update.getCounterKey().equals(counter)) {
Assert.fail(
String.format("Found counter update %s=%d, which is not expected", counter, update.getIncrementValue()));
}
}
}

private Event verifyEventType(List<Event> events,
Class<? extends Event> eventClass, int expectedOccurences) {
int count = 0;
Expand Down Expand Up @@ -2344,6 +2411,11 @@ protected void logJobHistoryAttemptUnsuccesfulCompletion(
protected void sendInputFailedToConsumers() {
inputFailedReported = true;
}

@Override
public long getDurationNs(){
return 1000000000L; // 1000000000ns = 1000ms
}
}

private static ContainerContext createFakeContainerContext() {
Expand Down

0 comments on commit 411e095

Please sign in to comment.