diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index f8f2750267..5aee418b2d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -2817,6 +2817,18 @@ private VertexState setupVertex() { return VertexState.INITED; } + private boolean canSkipInitializingParents() { + // Both cases use RootInputVertexManager. RootInputVertexManager can start tasks even though + // any parents are not fully initialized. + if (vertexPlan.hasVertexManagerPlugin()) { + final VertexManagerPluginDescriptor pluginDesc = DagTypeConverters + .convertVertexManagerPluginDescriptorFromDAGPlan(vertexPlan.getVertexManagerPlugin()); + return pluginDesc.getClassName().equals(RootInputVertexManager.class.getName()); + } else { + return inputsWithInitializers != null; + } + } + private boolean isVertexInitSkippedInParentVertices() { for (Map.Entry entry : sourceVertices.entrySet()) { if(!(((VertexImpl) entry.getKey()).isVertexInitSkipped())) { @@ -2843,7 +2855,7 @@ private void assignVertexManager() throws TezException { if (recoveryData != null && recoveryData.shouldSkipInit() && (recoveryData.isVertexTasksStarted() || recoveryData.getVertexConfigurationDoneEvent().isSetParallelismCalled()) - && isVertexInitSkippedInParentVertices()) { + && (canSkipInitializingParents() || isVertexInitSkippedInParentVertices())) { // Replace the original VertexManager with NoOpVertexManager if the reconfiguration is done in the last AM attempt VertexConfigurationDoneEvent reconfigureDoneEvent = recoveryData.getVertexConfigurationDoneEvent(); if (LOG.isInfoEnabled()) {