Skip to content

Commit

Permalink
ODP-2643: TEZ-4488: TaskSchedulerManager might not be initialized whe…
Browse files Browse the repository at this point in the history
…n the first DAG comes (apache#280) (Laszlo Bodor reviewed by Rajesh Balamohan) (#23)

(cherry picked from commit 249e017)
(cherry picked from commit 41343dc)
(cherry picked from commit 6d81766)

Co-authored-by: Bodor Laszlo <[email protected]>
  • Loading branch information
prabhjyotsingh and abstractdog authored Nov 20, 2024
1 parent ab1cb2e commit 19a1c74
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 0 deletions.
11 changes: 11 additions & 0 deletions tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,17 @@ public TezConfiguration(boolean loadDefaults) {
public static final String TEZ_AM_YARN_SCHEDULER_CLASS_DEFAULT =
"org.apache.tez.dag.app.rm.YarnTaskSchedulerService";

/**
 * Int value. Time in milliseconds the AM waits when the first DAG is submitted but not all the
 * services are ready. This can happen when the client RPC handler is up and able to accept DAGs
 * but e.g. the task scheduler manager is not ready (e.g. a task scheduler is waiting for external
 * resources).
 * A value less than or equal to 0 is not supported and leads to an exception.
 */
@ConfigurationScope(Scope.AM)
@ConfigurationProperty(type="integer")
public static final String TEZ_AM_READY_FOR_SUBMIT_TIMEOUT_MS = TEZ_AM_PREFIX + "ready.for.submit.timeout.ms";
public static final int TEZ_AM_READY_FOR_SUBMIT_TIMEOUT_MS_DEFAULT = 30000;

/** Int value. The amount of memory in MB to be used by the AppMaster */
@ConfigurationScope(Scope.AM)
@ConfigurationProperty(type="integer")
Expand Down
14 changes: 14 additions & 0 deletions tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ public class DAGAppMaster extends AbstractService {
private DagEventDispatcher dagEventDispatcher;
private VertexEventDispatcher vertexEventDispatcher;
private TaskSchedulerManager taskSchedulerManager;
private DAGAppMasterReadinessService appMasterReadinessService;
private WebUIService webUIService;
private HistoryEventHandler historyEventHandler;
private final Map<String, LocalResource> amResources = new HashMap<String, LocalResource>();
Expand Down Expand Up @@ -589,6 +590,8 @@ public synchronized void serviceInit(final Configuration conf) throws Exception
taskSchedulerManager);
addIfServiceDependency(taskSchedulerManager, clientRpcServer);

appMasterReadinessService = createAppMasterReadinessService();

this.containerLauncherManager = createContainerLauncherManager(containerLauncherDescriptors,
isLocal);
addIfService(containerLauncherManager, true);
Expand Down Expand Up @@ -664,6 +667,15 @@ protected TaskSchedulerManager createTaskSchedulerManager(
taskSchedulerDescriptors, isLocal, hadoopShim);
}

/**
 * Creates the readiness service of this app master, registers it through
 * {@code addIfService} and declares the task scheduler manager as the service it
 * depends on.
 * Overridable so that tests can supply their own instance.
 *
 * @return the newly created and registered readiness service
 */
@VisibleForTesting
protected DAGAppMasterReadinessService createAppMasterReadinessService() {
  final String serviceName = DAGAppMasterReadinessService.class.getName();
  final DAGAppMasterReadinessService readinessService =
      new DAGAppMasterReadinessService(serviceName);

  // Register first, then wire the dependency on the task scheduler manager.
  addIfService(readinessService, false);
  addIfServiceDependency(readinessService, taskSchedulerManager);

  return readinessService;
}

@VisibleForTesting
protected ContainerSignatureMatcher createContainerSignatureMatcher() {
return new ContainerContextMatcher();
Expand Down Expand Up @@ -1300,6 +1312,8 @@ public Void run() throws Exception {

public String submitDAGToAppMaster(DAGPlan dagPlan,
Map<String, LocalResource> additionalResources) throws TezException {
appMasterReadinessService.waitToBeReady();

if (sessionStopped.get()) {
throw new SessionNotRunning("AM unable to accept new DAG submissions."
+ " In the process of shutting down");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app;

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This is an artificial service to be used in DAGAppMaster,
* which can be added to have dependencies that are crucial in order to be
* able to run DAGs.
*
*/
public class DAGAppMasterReadinessService extends AbstractService {
  private static final Logger LOG = LoggerFactory.getLogger(DAGAppMasterReadinessService.class);

  /** Sleep time between two readiness checks in {@link #waitToBeReady()}. */
  private static final long POLL_INTERVAL_MS = 100L;

  /** Number of nanoseconds in one millisecond, used to convert monotonic clock readings. */
  private static final long NANOS_PER_MILLI = 1_000_000L;

  /** Flipped to {@code true} once {@link #serviceStart()} has run. */
  private final AtomicBoolean ready = new AtomicBoolean(false);

  /** Maximum time {@link #waitToBeReady()} blocks, read from the configuration in serviceInit. */
  private int timeoutMs;

  /**
   * @param name the service name passed on to {@link AbstractService}
   */
  public DAGAppMasterReadinessService(String name) {
    super(name);
  }

  /**
   * Reads {@link TezConfiguration#TEZ_AM_READY_FOR_SUBMIT_TIMEOUT_MS} and validates it.
   *
   * @param conf the configuration this service is initialized with
   * @throws TezException if the configured timeout is less than or equal to 0
   */
  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    super.serviceInit(conf);
    timeoutMs = getConfig().getInt(TezConfiguration.TEZ_AM_READY_FOR_SUBMIT_TIMEOUT_MS,
        TezConfiguration.TEZ_AM_READY_FOR_SUBMIT_TIMEOUT_MS_DEFAULT);
    if (timeoutMs <= 0) {
      throw new TezException(
          "timeout <= 0 is not supported for " + TezConfiguration.TEZ_AM_READY_FOR_SUBMIT_TIMEOUT_MS);
    }
  }

  /**
   * Marks this service as ready, which releases any caller blocked in {@link #waitToBeReady()}.
   */
  @Override
  protected void serviceStart() throws Exception {
    super.serviceStart();
    ready.set(true);
  }

  /**
   * Waits until this service has really started. When serviceStart has been called and this
   * service is ready, we can be sure that the dependency services have already been started too.
   *
   * @throws TezException if the service does not become ready within the configured timeout,
   *           or if the waiting thread is interrupted (the interrupt flag is restored in that case)
   */
  public void waitToBeReady() throws TezException {
    // Use the monotonic clock so that wall-clock adjustments cannot shorten or extend the wait.
    final long startNanos = System.nanoTime();
    while (!ready.get()) {
      long elapsedMs = (System.nanoTime() - startNanos) / NANOS_PER_MILLI;
      if (elapsedMs > timeoutMs) {
        throw new TezException("App Master is not ready within the configured time period (" + timeoutMs + "ms). "
            + "Please check logs for AM service states.");
      }
      try {
        LOG.info("App is not ready yet, waiting {}ms", POLL_INTERVAL_MS);
        Thread.sleep(POLL_INTERVAL_MS);
      } catch (InterruptedException e) {
        // Restore the interrupt status so callers further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw new TezException(e);
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ public TestTokenIdentifier createIdentifier() {
public static class DAGAppMasterForTest extends DAGAppMaster {
private DAGAppMasterShutdownHandler mockShutdown;
private TaskSchedulerManager mockScheduler = mock(TaskSchedulerManager.class);
private DAGAppMasterReadinessService mockAppMasterReadinessService = mock(DAGAppMasterReadinessService.class);

public DAGAppMasterForTest(ApplicationAttemptId attemptId, boolean isSession) {
super(attemptId, ContainerId.newContainerId(attemptId, 1), "hostname", 12345, 12346,
Expand Down Expand Up @@ -674,5 +675,10 @@ protected TaskSchedulerManager createTaskSchedulerManager(
List<NamedEntityDescriptor> taskSchedulerDescriptors) {
return mockScheduler;
}

/**
 * Test override: hands back the pre-built mockAppMasterReadinessService field
 * instead of creating a real readiness service.
 */
@Override
protected DAGAppMasterReadinessService createAppMasterReadinessService() {
return mockAppMasterReadinessService;
}
}
}

0 comments on commit 19a1c74

Please sign in to comment.