Skip to content

Commit

Permalink
TEZ-4506: Report the node of a task attempt failure better. (#307) (A…
Browse files Browse the repository at this point in the history
…yush Saxena reviewed by Laszlo Bodor)
  • Loading branch information
ayushtkn authored Sep 1, 2023
1 parent b643f9b commit 7855c1f
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -401,9 +402,10 @@ private boolean taskTerminated(TezTaskAttemptID taskAttemptID, boolean isKilled,
if (!finalEventQueued.getAndSet(true)) {
List<TezEvent> tezEvents = new ArrayList<TezEvent>();
if (diagnostics == null) {
diagnostics = ExceptionUtils.getStackTrace(t);
diagnostics = "Node: " + InetAddress.getLocalHost() + " : " + ExceptionUtils.getStackTrace(t);
} else {
diagnostics = diagnostics + ":" + ExceptionUtils.getStackTrace(t);
diagnostics =
"Node: " + InetAddress.getLocalHost() + " : " + diagnostics + ":" + ExceptionUtils.getStackTrace(t);
}
if (isKilled) {
tezEvents.add(new TezEvent(new TaskAttemptKilledEvent(diagnostics),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;

import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -396,19 +398,20 @@ public void verifyTaskFailedEvent(String diagStart, String diagContains, TaskFai
for (TezEvent event : requestEvents) {
if (event.getEvent() instanceof TaskAttemptFailedEvent) {
TaskAttemptFailedEvent failedEvent = (TaskAttemptFailedEvent) event.getEvent();
if (failedEvent.getDiagnostics().startsWith(diagStart)) {
String diagnostics = getDiagnosticsWithoutNodeIp(failedEvent.getDiagnostics());
if (diagnostics.startsWith(diagStart)) {
if (diagContains != null) {
if (failedEvent.getDiagnostics().contains(diagContains)) {
if (diagnostics.contains(diagContains)) {
assertEquals(taskFailureType, failedEvent.getTaskFailureType());
return;
} else {
fail("Diagnostic message does not contain expected message. Found [" +
failedEvent.getDiagnostics() + "], Expected: [" + diagContains + "]");
diagnostics + "], Expected: [" + diagContains + "]");
}
}
} else {
fail("Diagnostic message does not start with expected message. Found [" +
failedEvent.getDiagnostics() + "], Expected: [" + diagStart + "]");
diagnostics + "], Expected: [" + diagStart + "]");
}
}
}
Expand All @@ -425,18 +428,19 @@ public void verifyTaskKilledEvent(String diagStart, String diagContains) {
if (event.getEvent() instanceof TaskAttemptKilledEvent) {
TaskAttemptKilledEvent killedEvent =
(TaskAttemptKilledEvent) event.getEvent();
if (killedEvent.getDiagnostics().startsWith(diagStart)) {
String diagnostics = getDiagnosticsWithoutNodeIp(killedEvent.getDiagnostics());
if (diagnostics.startsWith(diagStart)) {
if (diagContains != null) {
if (killedEvent.getDiagnostics().contains(diagContains)) {
if (diagnostics.contains(diagContains)) {
return;
} else {
fail("Diagnostic message does not contain expected message. Found [" +
killedEvent.getDiagnostics() + "], Expected: [" + diagContains + "]");
diagnostics + "], Expected: [" + diagContains + "]");
}
}
} else {
fail("Diagnostic message does not start with expected message. Found [" +
killedEvent.getDiagnostics() + "], Expected: [" + diagStart + "]");
diagnostics + "], Expected: [" + diagStart + "]");
}
}
}
Expand Down Expand Up @@ -518,6 +522,17 @@ public int getTaskInvocations() {
}
}

private static String getDiagnosticsWithoutNodeIp(String diagnostics) {
String diagnosticsWithoutIP = diagnostics;
if (diagnostics != null && diagnostics.startsWith("Node:")) {
diagnosticsWithoutIP = diagnostics.substring(diagnostics.indexOf(" : ") + 3);
String nodeIp = diagnostics.substring(5, diagnostics.indexOf(" : "));
Assert.assertFalse(nodeIp.isEmpty());
}

return diagnosticsWithoutIP;
}

@SuppressWarnings("deprecation")
public static ContainerId createContainerId(ApplicationId appId) {
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
Expand Down

0 comments on commit 7855c1f

Please sign in to comment.