diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 17a826e7e7..9aab846c9e 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -844,6 +844,17 @@ public TezConfiguration(boolean loadDefaults) { public static final String TEZ_AM_YARN_SCHEDULER_CLASS_DEFAULT = "org.apache.tez.dag.app.rm.YarnTaskSchedulerService"; + /** + * Int value. The AM waits this amount of time when the first DAG is submitted but not all the services are ready. + * This can happen when the client RPC handler is up and able to accept DAGs but e.g. task scheduler + * manager is not ready (e.g. a task scheduler is waiting for external resources). + * A value equal or less than 0 is not supported and leads to an exception. + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty + public static final String TEZ_AM_READY_FOR_SUBMIT_TIMEOUT_MS = TEZ_AM_PREFIX + "ready.for.submit.timeout.ms"; + public static final int TEZ_AM_READY_FOR_SUBMIT_TIMEOUT_MS_DEFAULT = 30000; + /** Int value. The amount of memory in MB to be used by the AppMaster */ @ConfigurationScope(Scope.AM) @ConfigurationProperty(type="integer") diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 1352b68f26..9c5fd93585 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -264,6 +264,7 @@ public class DAGAppMaster extends AbstractService { private DagEventDispatcher dagEventDispatcher; private VertexEventDispatcher vertexEventDispatcher; private TaskSchedulerManager taskSchedulerManager; + private DAGAppMasterReadinessService appMasterReadinessService; private WebUIService webUIService; private HistoryEventHandler historyEventHandler; private final Map amResources = new HashMap(); @@ -589,6 +590,8 @@ public synchronized void serviceInit(final Configuration conf) throws Exception taskSchedulerManager); addIfServiceDependency(taskSchedulerManager, clientRpcServer); + appMasterReadinessService = createAppMasterReadinessService(); + this.containerLauncherManager = createContainerLauncherManager(containerLauncherDescriptors, isLocal); addIfService(containerLauncherManager, true); @@ -664,6 +667,15 @@ protected TaskSchedulerManager createTaskSchedulerManager( taskSchedulerDescriptors, isLocal, hadoopShim); } + @VisibleForTesting + protected DAGAppMasterReadinessService createAppMasterReadinessService() { + DAGAppMasterReadinessService service = + new DAGAppMasterReadinessService(DAGAppMasterReadinessService.class.getName()); + addIfService(service, false); + addIfServiceDependency(service, taskSchedulerManager); + return service; + } + @VisibleForTesting protected ContainerSignatureMatcher createContainerSignatureMatcher() { return new ContainerContextMatcher(); @@ -1300,6 +1312,8 @@ public Void run() throws Exception { public String submitDAGToAppMaster(DAGPlan dagPlan, Map additionalResources) throws TezException { + appMasterReadinessService.waitToBeReady(); + if (sessionStopped.get()) { throw new SessionNotRunning("AM unable to accept new DAG submissions." + " In the process of shutting down"); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMasterReadinessService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMasterReadinessService.java new file mode 100644 index 0000000000..cd7dff0f6b --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMasterReadinessService.java @@ -0,0 +1,83 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tez.dag.app; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is an artifical service to be used in DAGAppMaster, + * which can be added to have dependencies that are crucial in order to be + * able to run DAGs. + * + */ +public class DAGAppMasterReadinessService extends AbstractService { + private static final Logger LOG = LoggerFactory.getLogger(DAGAppMasterReadinessService.class); + + private AtomicBoolean ready = new AtomicBoolean(false); + private int timeoutMs; + + public DAGAppMasterReadinessService(String name) { + super(name); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + timeoutMs = getConfig().getInt(TezConfiguration.TEZ_AM_READY_FOR_SUBMIT_TIMEOUT_MS, + TezConfiguration.TEZ_AM_READY_FOR_SUBMIT_TIMEOUT_MS_DEFAULT); + if (timeoutMs <= 0) { + throw new TezException( + "timeout <= 0 is not supported for " + TezConfiguration.TEZ_AM_READY_FOR_SUBMIT_TIMEOUT_MS); + } + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + ready.set(true); + } + + /** + * The waitToBeReady waits until this service really starts. When the serviceStart + * is called and this service is ready, we can make sure that the dependency services + * has already been started too. + * @throws TezException + */ + public void waitToBeReady() throws TezException { + long start = System.currentTimeMillis(); + while (!ready.get()) { + if (System.currentTimeMillis() - start > timeoutMs) { + throw new TezException("App Master is not ready within the configured time period (" + timeoutMs + "ms). " + + "Please check logs for AM service states."); + } + try { + LOG.info("App is not ready yet, waiting 100ms"); + Thread.sleep(100); + } catch (InterruptedException e) { + throw new TezException(e); + } + } + } +} diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java index d8167dbcc4..89103f6fa5 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestDAGAppMaster.java @@ -630,6 +630,7 @@ public TestTokenIdentifier createIdentifier() { public static class DAGAppMasterForTest extends DAGAppMaster { private DAGAppMasterShutdownHandler mockShutdown; private TaskSchedulerManager mockScheduler = mock(TaskSchedulerManager.class); + private DAGAppMasterReadinessService mockAppMasterReadinessService = mock(DAGAppMasterReadinessService.class); public DAGAppMasterForTest(ApplicationAttemptId attemptId, boolean isSession) { super(attemptId, ContainerId.newContainerId(attemptId, 1), "hostname", 12345, 12346, @@ -674,5 +675,10 @@ protected TaskSchedulerManager createTaskSchedulerManager( List taskSchedulerDescriptors) { return mockScheduler; } + + @Override + protected DAGAppMasterReadinessService createAppMasterReadinessService() { + return mockAppMasterReadinessService; + } } }