From e7e53b7b23a8a79277b8df11613a73351accee55 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Thu, 25 Jan 2024 08:56:26 +0530 Subject: [PATCH 1/3] TEZ-4451: ThreadLevel IO Stats Support for TEZ. --- .../org/apache/tez/runtime/task/TaskRunner2Callable.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java index 810a806228..7d9df70193 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java @@ -18,6 +18,8 @@ import java.security.PrivilegedExceptionAction; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.security.UserGroupInformation; import org.apache.tez.common.CallableWithNdc; import org.apache.tez.common.TezCommonUtils; @@ -75,6 +77,7 @@ public TaskRunner2CallableResult run() throws Exception { LOG.info("Initializing task" + ", taskAttemptId={}", task.getTaskAttemptID()); TezUtilsInternal.setHadoopCallerContext(task.getHadoopShim(), task.getTaskAttemptID()); TezCommonUtils.logCredentials(LOG, ugi.getCredentials(), "taskInit"); + IOStatisticsContext.getCurrentIOStatisticsContext().reset(); task.initialize(); if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) { @@ -116,6 +119,10 @@ public TaskRunner2CallableResult run() throws Exception { // For a successful task, however, this should be almost no delay since close has already happened. 
maybeFixInterruptStatus(); LOG.info("Cleaning up task {}, stopRequested={}", task.getTaskAttemptID(), stopRequested.get()); + String ioStats = IOStatisticsContext.getCurrentIOStatisticsContext().snapshot().toString(); + if (StringUtils.isNotEmpty(ioStats)) { + LOG.info("TaskAttemptId={}, IOStatistics={}", task.getTaskAttemptID(), ioStats); + } task.getOutputContexts().forEach(outputContext -> outputContext.trapEvents(new TezTrapEventHandler(outputContext, this.tezUmbilical))); From 6e96ff4505391f9c918c28232a3e6f0fdf57e141 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Mon, 29 Jan 2024 00:51:11 +0530 Subject: [PATCH 2/3] Change to use IOStatisticsLogging. --- .../apache/tez/runtime/task/TaskRunner2Callable.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java index 7d9df70193..30f340620b 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java @@ -19,7 +19,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.fs.statistics.IOStatisticsContext; +import org.apache.hadoop.fs.statistics.IOStatisticsLogging; import org.apache.hadoop.security.UserGroupInformation; import org.apache.tez.common.CallableWithNdc; import org.apache.tez.common.TezCommonUtils; @@ -30,6 +30,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.fs.statistics.IOStatisticsContext.getCurrentIOStatisticsContext; + /** * This class is responsible for running a {@link LogicalIOProcessorRuntimeTask}. * It does not worry about reporting errors, heartbeats etc. 
@@ -77,7 +79,7 @@ public TaskRunner2CallableResult run() throws Exception { LOG.info("Initializing task" + ", taskAttemptId={}", task.getTaskAttemptID()); TezUtilsInternal.setHadoopCallerContext(task.getHadoopShim(), task.getTaskAttemptID()); TezCommonUtils.logCredentials(LOG, ugi.getCredentials(), "taskInit"); - IOStatisticsContext.getCurrentIOStatisticsContext().reset(); + getCurrentIOStatisticsContext().reset(); task.initialize(); if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) { @@ -119,9 +121,10 @@ public TaskRunner2CallableResult run() throws Exception { // For a successful task, however, this should be almost no delay since close has already happened. maybeFixInterruptStatus(); LOG.info("Cleaning up task {}, stopRequested={}", task.getTaskAttemptID(), stopRequested.get()); - String ioStats = IOStatisticsContext.getCurrentIOStatisticsContext().snapshot().toString(); + String ioStats = + IOStatisticsLogging.ioStatisticsToPrettyString(getCurrentIOStatisticsContext().getIOStatistics()); if (StringUtils.isNotEmpty(ioStats)) { - LOG.info("TaskAttemptId={}, IOStatistics={}", task.getTaskAttemptID(), ioStats); + LOG.info("TaskAttemptId={}, {}", task.getTaskAttemptID(), ioStats); } task.getOutputContexts().forEach(outputContext -> outputContext.trapEvents(new TezTrapEventHandler(outputContext, From 29f71afcc85cd4c700245a5b040e165e7d60d6dd Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Mon, 5 Feb 2024 21:36:55 +0530 Subject: [PATCH 3/3] Replace the static import of getCurrentIOStatisticsContext with a regular IOStatisticsContext import.
--- .../org/apache/tez/runtime/task/TaskRunner2Callable.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java index 30f340620b..e6a74321f1 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TaskRunner2Callable.java @@ -19,6 +19,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.fs.statistics.IOStatisticsLogging; import org.apache.hadoop.security.UserGroupInformation; import org.apache.tez.common.CallableWithNdc; @@ -30,8 +31,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.hadoop.fs.statistics.IOStatisticsContext.getCurrentIOStatisticsContext; - /** * This class is responsible for running a {@link LogicalIOProcessorRuntimeTask}. * It does not worry about reporting errors, heartbeats etc. @@ -79,7 +78,7 @@ public TaskRunner2CallableResult run() throws Exception { LOG.info("Initializing task" + ", taskAttemptId={}", task.getTaskAttemptID()); TezUtilsInternal.setHadoopCallerContext(task.getHadoopShim(), task.getTaskAttemptID()); TezCommonUtils.logCredentials(LOG, ugi.getCredentials(), "taskInit"); - getCurrentIOStatisticsContext().reset(); + IOStatisticsContext.getCurrentIOStatisticsContext().reset(); task.initialize(); if (!stopRequested.get() && !Thread.currentThread().isInterrupted()) { @@ -121,8 +120,8 @@ public TaskRunner2CallableResult run() throws Exception { // For a successful task, however, this should be almost no delay since close has already happened. 
maybeFixInterruptStatus(); LOG.info("Cleaning up task {}, stopRequested={}", task.getTaskAttemptID(), stopRequested.get()); - String ioStats = - IOStatisticsLogging.ioStatisticsToPrettyString(getCurrentIOStatisticsContext().getIOStatistics()); + String ioStats = IOStatisticsLogging.ioStatisticsToPrettyString( + IOStatisticsContext.getCurrentIOStatisticsContext().getIOStatistics()); if (StringUtils.isNotEmpty(ioStats)) { LOG.info("TaskAttemptId={}, {}", task.getTaskAttemptID(), ioStats); }