From 56426d52fc586f8eb83fdee38eab30d07ce20406 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Mon, 23 Dec 2024 13:22:02 +0530 Subject: [PATCH] TEZ-4537: Fix several time comparison issues. (#332). (Ayush Saxena, reviewed by Laszlo Bodor) --- .../main/java/org/apache/tez/common/AsyncDispatcher.java | 2 +- .../main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java | 2 +- .../org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java | 6 +++--- .../java/org/apache/tez/runtime/task/ContainerReporter.java | 2 +- .../apache/tez/runtime/library/common/shuffle/Fetcher.java | 2 +- .../shuffle/orderedgrouped/FetcherOrderedGrouped.java | 2 +- 6 files changed, 8 insertions(+), 8 deletions(-) diff --git a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java index 14f7121837..af4c97a781 100644 --- a/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java +++ b/tez-common/src/main/java/org/apache/tez/common/AsyncDispatcher.java @@ -153,7 +153,7 @@ protected void serviceStop() throws Exception { TezConfiguration.TEZ_AM_DISPATCHER_DRAIN_EVENTS_TIMEOUT_DEFAULT); synchronized (waitForDrained) { - while (!eventQueue.isEmpty() && eventHandlingThread.isAlive() && System.currentTimeMillis() < endTime) { + while (!eventQueue.isEmpty() && eventHandlingThread.isAlive() && (System.currentTimeMillis() - endTime < 0)) { waitForDrained.wait(1000); LOG.info( "Waiting for AsyncDispatcher to drain. Current queue size: {}, handler thread state: {}", diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 3075807417..57a100f566 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -747,7 +747,7 @@ public TezCounters getCachedCounters() { try { // FIXME a better lightweight approach for counters is needed if (fullCounters == null && cachedCounters != null - && ((cachedCountersTimestamp+10000) > System.currentTimeMillis())) { + && ((cachedCountersTimestamp + 10000) - System.currentTimeMillis() > 0)) { LOG.info("Asked for counters" + ", cachedCountersTimestamp=" + cachedCountersTimestamp + ", currentTime=" + System.currentTimeMillis()); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java index 888b0381be..fae19f29e6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java @@ -642,7 +642,7 @@ long getHeldContainerExpireTime(long startTime) { long currentTime = System.currentTimeMillis(); boolean releaseContainer = false; - if (isNew || (heldContainer.getContainerExpiryTime() <= currentTime + if (isNew || (heldContainer.getContainerExpiryTime() - currentTime <= 0 && idleContainerTimeoutMin != -1)) { // container idle timeout has expired or is a new unused container. // new container is possibly a spurious race condition allocation. @@ -775,7 +775,7 @@ long getHeldContainerExpireTime(long startTime) { // if we are not being able to assign containers to pending tasks then // we cannot avoid releasing containers. Or else we may not be able to // get new containers from YARN to match the pending request - if (!isNew && heldContainer.getContainerExpiryTime() <= currentTime + if (!isNew && heldContainer.getContainerExpiryTime() - currentTime <= 0 && idleContainerTimeoutMin != -1) { LOG.info("Container's idle timeout expired. Releasing container" + ", containerId=" + heldContainer.container.getId() @@ -2025,7 +2025,7 @@ private void mainLoop() { LOG.debug("Considering HeldContainer: {} for assignment", delayedContainer); long currentTs = System.currentTimeMillis(); long nextScheduleTs = delayedContainer.getNextScheduleTime(); - if (currentTs >= nextScheduleTs) { + if (currentTs - nextScheduleTs >= 0) { Map assignedContainers = null; synchronized(YarnTaskSchedulerService.this) { // Remove the container and try scheduling it. diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java index 8d73fd3e3b..828e948fed 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/ContainerReporter.java @@ -77,7 +77,7 @@ protected ContainerTask callInternal() throws Exception { private void maybeLogSleepMessage(long sleepTimeMilliSecs) { long currentTime = System.currentTimeMillis(); - if (sleepTimeMilliSecs + currentTime > nextGetTaskPrintTime) { + if ((sleepTimeMilliSecs + currentTime) - nextGetTaskPrintTime > 0) { LOG.info("Sleeping for " + sleepTimeMilliSecs + "ms before retrying getTask again. Got null now. " + "Next getTask sleep message after " + LOG_INTERVAL + "ms"); diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java index fc7ad3823b..6094e6bdb9 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java @@ -1075,7 +1075,7 @@ private boolean shouldRetry(InputAttemptIdentifier srcAttemptId, Throwable ioe) retryStartTime = currentTime; } - if (currentTime - retryStartTime < httpConnectionParams.getReadTimeout()) { + if ((currentTime - retryStartTime) - httpConnectionParams.getReadTimeout() < 0) { LOG.warn("Shuffle output from " + srcAttemptId + " failed (to "+ localHostname +"), retry it."); //retry connecting to the host diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index 9e2706e280..a4fad0c416 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@ -640,7 +640,7 @@ private boolean shouldRetry(MapHost host, Throwable ioe) { retryStartTime = currentTime; } - if (currentTime - retryStartTime < httpConnectionParams.getReadTimeout()) { + if ((currentTime - retryStartTime) - httpConnectionParams.getReadTimeout() < 0) { LOG.warn("Shuffle output from " + host.getHostIdentifier() + " failed, retry it."); //retry connecting to the host