Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEZ-4589: Counter for the overall duration of succeeded/failed/killed task attempts #382

Merged
merged 2 commits into from
Nov 23, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,28 @@ public enum DAGCounter {
NUM_KILLED_TASKS,
NUM_SUCCEEDED_TASKS,
TOTAL_LAUNCHED_TASKS,

/* The durations of task attempts are categorized based on their final states. The duration of successful tasks
can serve as a reference when analyzing the durations of failed or killed tasks. This is because solely examining
failed or killed task durations may be misleading, as these durations are measured from the submission time,
which does not always correspond to the actual start time of the task attempt on executor nodes
(e.g., in scenarios involving Hive LLAP).
These counters align with the duration metrics used for WALL_CLOCK_MILLIS.
As such, the following relationship applies:
WALL_CLOCK_MILLIS = DURATION_FAILED_TASKS_MILLIS + DURATION_KILLED_TASKS_MILLIS + DURATION_SUCCEEDED_TASKS_MILLIS
*/

// Total amount of time spent on running FAILED task attempts. This can be blamed for performance degradation, as a
// DAG can still finish successfully in the presence of failed attempts.
DURATION_FAILED_TASKS_MILLIS,

// Total amount of time spent on running KILLED task attempts.
DURATION_KILLED_TASKS_MILLIS,

// Total amount of time spent on running SUCCEEDED task attempts, which can be a reference together with the same for
// FAILED and KILLED attempts.
DURATION_SUCCEEDED_TASKS_MILLIS,

OTHER_LOCAL_TASKS,
DATA_LOCAL_TASKS,
RACK_LOCAL_TASKS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class DAGEventCounterUpdate extends DAGEvent {

public DAGEventCounterUpdate(TezDAGID dagId) {
super(dagId, DAGEventType.DAG_COUNTER_UPDATE);
counterUpdates = new ArrayList<DAGEventCounterUpdate.CounterIncrementalUpdate>();
counterUpdates = new ArrayList<>();
}

public void addCounterUpdate(Enum<?> key, long incrValue) {
Expand All @@ -56,5 +56,9 @@ public Enum<?> getCounterKey() {
public long getIncrementValue() {
return incrValue;
}

public String toString(){
abstractdog marked this conversation as resolved.
Show resolved Hide resolved
return String.format("DAGEventCounterUpdate.CounterIncrementalUpdate(key=%s, incrValue=%d)", key, incrValue);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -967,23 +967,26 @@ private static DAGEventCounterUpdate createDAGCounterUpdateEventTALaunched(
return dagCounterEvent;
}

private static DAGEventCounterUpdate createDAGCounterUpdateEventTAFinished(
@VisibleForTesting
static DAGEventCounterUpdate createDAGCounterUpdateEventTAFinished(
TaskAttemptImpl taskAttempt, TaskAttemptState taState) {
DAGEventCounterUpdate jce =
new DAGEventCounterUpdate(taskAttempt.getDAGID());

long amSideWallClockTimeMs = TimeUnit.NANOSECONDS.toMillis(taskAttempt.getDurationNs());
jce.addCounterUpdate(DAGCounter.WALL_CLOCK_MILLIS, amSideWallClockTimeMs);

if (taState == TaskAttemptState.FAILED) {
jce.addCounterUpdate(DAGCounter.NUM_FAILED_TASKS, 1);
jce.addCounterUpdate(DAGCounter.DURATION_FAILED_TASKS_MILLIS, amSideWallClockTimeMs);
} else if (taState == TaskAttemptState.KILLED) {
jce.addCounterUpdate(DAGCounter.NUM_KILLED_TASKS, 1);
jce.addCounterUpdate(DAGCounter.DURATION_KILLED_TASKS_MILLIS, amSideWallClockTimeMs);
} else if (taState == TaskAttemptState.SUCCEEDED ) {
jce.addCounterUpdate(DAGCounter.NUM_SUCCEEDED_TASKS, 1);
jce.addCounterUpdate(DAGCounter.DURATION_SUCCEEDED_TASKS_MILLIS, amSideWallClockTimeMs);
}

long amSideWallClockTimeMs = TimeUnit.NANOSECONDS.toMillis(
taskAttempt.getDurationNs());
jce.addCounterUpdate(DAGCounter.WALL_CLOCK_MILLIS, amSideWallClockTimeMs);

return jce;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.tez.dag.app.dag.impl;

import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.dag.app.MockClock;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -2261,6 +2263,85 @@ public void testMapTaskIsBlamedByDownstreamAttemptsFromDifferentHosts() {
Assert.assertEquals(TaskAttemptStateInternal.FAILED, resultState2);
}

@Test
public void testDAGCounterUpdateEvent(){
TaskAttemptImpl taImpl = getMockTaskAttempt();

DAGEventCounterUpdate counterUpdateSucceeded = TaskAttemptImpl.createDAGCounterUpdateEventTAFinished(taImpl,
TaskAttemptState.SUCCEEDED);
List<DAGEventCounterUpdate.CounterIncrementalUpdate> succeededUpdates = counterUpdateSucceeded.getCounterUpdates();
// SUCCEEDED task related counters are updated (+ WALL_CLOCK_MILLIS)
assertCounterIncrementalUpdate(succeededUpdates, DAGCounter.NUM_SUCCEEDED_TASKS, 1);
assertCounterIncrementalUpdate(succeededUpdates, DAGCounter.DURATION_SUCCEEDED_TASKS_MILLIS, 1000);
assertCounterIncrementalUpdate(succeededUpdates, DAGCounter.WALL_CLOCK_MILLIS, 1000);
// other counters are not updated (no FAILED, no KILLED)
assertCounterIncrementalUpdateNotFound(succeededUpdates, DAGCounter.NUM_FAILED_TASKS);
assertCounterIncrementalUpdateNotFound(succeededUpdates, DAGCounter.NUM_KILLED_TASKS);
assertCounterIncrementalUpdateNotFound(succeededUpdates, DAGCounter.DURATION_FAILED_TASKS_MILLIS);
assertCounterIncrementalUpdateNotFound(succeededUpdates, DAGCounter.DURATION_KILLED_TASKS_MILLIS);

DAGEventCounterUpdate counterUpdateFailed = TaskAttemptImpl.createDAGCounterUpdateEventTAFinished(taImpl,
TaskAttemptState.FAILED);
List<DAGEventCounterUpdate.CounterIncrementalUpdate> failedUpdates = counterUpdateFailed.getCounterUpdates();
// FAILED task related counters are updated (+ WALL_CLOCK_MILLIS)
assertCounterIncrementalUpdate(failedUpdates, DAGCounter.NUM_FAILED_TASKS, 1);
assertCounterIncrementalUpdate(failedUpdates, DAGCounter.DURATION_FAILED_TASKS_MILLIS, 1000);
assertCounterIncrementalUpdate(failedUpdates, DAGCounter.WALL_CLOCK_MILLIS, 1000);
// other counters are not updated (no SUCCEEDED, no KILLED)
assertCounterIncrementalUpdateNotFound(failedUpdates, DAGCounter.NUM_SUCCEEDED_TASKS);
assertCounterIncrementalUpdateNotFound(failedUpdates, DAGCounter.NUM_KILLED_TASKS);
assertCounterIncrementalUpdateNotFound(failedUpdates, DAGCounter.DURATION_KILLED_TASKS_MILLIS);
assertCounterIncrementalUpdateNotFound(failedUpdates, DAGCounter.DURATION_SUCCEEDED_TASKS_MILLIS);

DAGEventCounterUpdate counterUpdateKilled = TaskAttemptImpl.createDAGCounterUpdateEventTAFinished(taImpl,
TaskAttemptState.KILLED);
List<DAGEventCounterUpdate.CounterIncrementalUpdate> killedUpdates = counterUpdateKilled.getCounterUpdates();
// KILLED task related counters are updated (+ WALL_CLOCK_MILLIS)
assertCounterIncrementalUpdate(killedUpdates, DAGCounter.NUM_KILLED_TASKS, 1);
assertCounterIncrementalUpdate(killedUpdates, DAGCounter.DURATION_KILLED_TASKS_MILLIS, 1000);
assertCounterIncrementalUpdate(killedUpdates, DAGCounter.WALL_CLOCK_MILLIS, 1000);
// other counters are not updated (no SUCCEEDED, no FAILED)
assertCounterIncrementalUpdateNotFound(killedUpdates, DAGCounter.NUM_SUCCEEDED_TASKS);
assertCounterIncrementalUpdateNotFound(killedUpdates, DAGCounter.NUM_FAILED_TASKS);
assertCounterIncrementalUpdateNotFound(killedUpdates, DAGCounter.DURATION_FAILED_TASKS_MILLIS);
assertCounterIncrementalUpdateNotFound(failedUpdates, DAGCounter.DURATION_SUCCEEDED_TASKS_MILLIS);
}

private TaskAttemptImpl getMockTaskAttempt() {
ApplicationId appId = ApplicationId.newInstance(1, 2);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
appId, 0);
TezDAGID dagID = TezDAGID.getInstance(appId, 1);
TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);

return new MockTaskAttemptImpl(taskID, 1, mock(EventHandler.class),
mock(TaskCommunicatorManagerInterface.class), new Configuration(), new MonotonicClock(),
mock(TaskHeartbeatHandler.class), mock(AppContext.class), false,
mock(Resource.class), mock(ContainerContext.class), false);
}

private void assertCounterIncrementalUpdate(List<DAGEventCounterUpdate.CounterIncrementalUpdate> counterUpdates,
DAGCounter counter, int expectedValue) {
for (DAGEventCounterUpdate.CounterIncrementalUpdate update : counterUpdates) {
if (update.getCounterKey().equals(counter) && update.getIncrementValue() == expectedValue) {
return;
}
}
Assert.fail(
String.format("Haven't found counter update %s=%d, instead seen: %s", counter, expectedValue, counterUpdates));
}

private void assertCounterIncrementalUpdateNotFound(
List<DAGEventCounterUpdate.CounterIncrementalUpdate> counterUpdates, DAGCounter counter) {
for (DAGEventCounterUpdate.CounterIncrementalUpdate update : counterUpdates) {
if (update.getCounterKey().equals(counter)) {
Assert.fail(
String.format("Found counter update %s=%d, which is not expected", counter, update.getIncrementValue()));
}
}
}

private Event verifyEventType(List<Event> events,
Class<? extends Event> eventClass, int expectedOccurences) {
int count = 0;
Expand Down Expand Up @@ -2344,6 +2425,11 @@ protected void logJobHistoryAttemptUnsuccesfulCompletion(
protected void sendInputFailedToConsumers() {
inputFailedReported = true;
}

@Override
public long getDurationNs(){
return 1000000000L; // 1000000000ns = 1000ms
}
}

private static ContainerContext createFakeContainerContext() {
Expand Down
Loading