From c8c18fcb49de1d0cadd368ed8db73750f9d154f7 Mon Sep 17 00:00:00 2001 From: okumin Date: Mon, 23 Dec 2024 20:16:55 +0900 Subject: [PATCH] Refactor complex conditions --- .../tez/dag/app/dag/impl/VertexImpl.java | 60 ++++++++++++------- 1 file changed, 37 insertions(+), 23 deletions(-) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 979fa57807..c79e40a809 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -2750,7 +2750,7 @@ private VertexState setupVertex() { : rootInputDescriptors.values()) { if (input.getControllerDescriptor() != null && input.getControllerDescriptor().getClassName() != null) { - if (inputsWithInitializers == null) { + if (!hasInputInitializers()) { inputsWithInitializers = Sets.newHashSet(); } inputsWithInitializers.add(input.getName()); @@ -2771,7 +2771,7 @@ private VertexState setupVertex() { } } - if (hasBipartite && inputsWithInitializers != null) { + if (hasBipartite && hasInputInitializers()) { LOG.error("A vertex with an Initial Input and a Shuffle Input are not supported at the moment"); return finished(VertexState.FAILED); } @@ -2819,15 +2819,19 @@ private VertexState setupVertex() { return VertexState.INITED; } - private boolean canSkipInitializingParents() { - // Both cases use RootInputVertexManager. RootInputVertexManager can start tasks even though - // any parents are not fully initialized. + private boolean hasInputInitializers() { + return inputsWithInitializers != null; + } + + private boolean usesRootInputVertexManager() { + // RootInputVertexManager can start tasks even though any parents are not fully initialized. 
if (vertexPlan.hasVertexManagerPlugin()) { final VertexManagerPluginDescriptor pluginDesc = DagTypeConverters .convertVertexManagerPluginDescriptorFromDAGPlan(vertexPlan.getVertexManagerPlugin()); return pluginDesc.getClassName().equals(RootInputVertexManager.class.getName()); } else { - return inputsWithInitializers != null; + // This case implicitly uses RootInputVertexManager. See VertexImpl#assignVertexManager + return hasInputInitializers(); } } @@ -2840,24 +2844,35 @@ private boolean isVertexInitSkippedInParentVertices() { return true; } - private void assignVertexManager() throws TezException { + private boolean canSkipInitialization() { // condition for skip initializing stage - // - VertexInputInitializerEvent is seen - // - VertexReconfigureDoneEvent is seen - // - Reason to check whether VertexManager has complete its responsibility + // - VertexInitializedEvent is seen + // - VertexConfigurationDoneEvent is seen + // - Reason to check whether VertexManager has completed its responsibility // VertexManager actually is involved in the InputInitializer (InputInitializer generate events // and send them to VertexManager which do some processing and send back to Vertex), so that means - // Input initializer will affect on the VertexManager and we couldn't skip the initializing step if + // Input initializer will affect the VertexManager and we cannot skip the initializing step if // VertexManager has not completed its responsibility. 
- // - Why using VertexReconfigureDoneEvent - // - VertexReconfigureDoneEvent represent the case that user use API reconfigureVertex - // VertexReconfigureDoneEvent will be logged - // - TaskStartEvent is seen in that vertex or setVertexParallelism is called + // - Why VertexConfigurationDoneEvent is used + // - VertexConfigurationDoneEvent represents the case where the user calls the reconfigureVertex API; + // in that case VertexConfigurationDoneEvent will be logged + // - VertexStartedEvent is seen in that vertex or setVertexParallelism is called // - All the parent vertices have skipped initializing stage while recovering - if (recoveryData != null && recoveryData.shouldSkipInit() - && (recoveryData.isVertexTasksStarted() || - recoveryData.getVertexConfigurationDoneEvent().isSetParallelismCalled()) - && (canSkipInitializingParents() || isVertexInitSkippedInParentVertices())) { + // - Or RootInputVertexManager is used, which can start without waiting for parent vertices + if (recoveryData == null) { + return false; + } + if (!recoveryData.shouldSkipInit()) { + return false; + } + if (!recoveryData.isVertexStarted() && !recoveryData.getVertexConfigurationDoneEvent().isSetParallelismCalled()) { + return false; + } + return isVertexInitSkippedInParentVertices() || usesRootInputVertexManager(); + } + + private void assignVertexManager() throws TezException { + if (canSkipInitialization()) { // Replace the original VertexManager with NoOpVertexManager if the reconfiguration is done in the last AM attempt VertexConfigurationDoneEvent reconfigureDoneEvent = recoveryData.getVertexConfigurationDoneEvent(); if (LOG.isInfoEnabled()) { @@ -2921,7 +2936,7 @@ private void assignVertexManager() throws TezException { // If there is a one to one edge then we use the InputReadyVertexManager // If there is a scatter-gather edge then we use the ShuffleVertexManager // Else we use the default ImmediateStartVertexManager - if (inputsWithInitializers != null) { + if (hasInputInitializers()) { LOG.info("Setting 
vertexManager to RootInputVertexManager for " + logIdentifier); vertexManager = new VertexManager(RootInputVertexManager @@ -3096,7 +3111,7 @@ private VertexState handleInitEvent(VertexImpl vertex) { LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers/1-1 split" + " to set #tasks for the vertex " + vertex.getLogIdentifier()); - if (vertex.inputsWithInitializers != null) { + if (vertex.hasInputInitializers()) { if (vertex.recoveryData == null || !vertex.recoveryData.shouldSkipInit()) { LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier); try { @@ -3135,8 +3150,7 @@ private VertexState handleInitEvent(VertexImpl vertex) { LOG.info("Creating " + vertex.numTasks + " tasks for vertex: " + vertex.logIdentifier); vertex.createTasks(); // this block may return VertexState.INITIALIZING - if (vertex.inputsWithInitializers != null && - (vertex.recoveryData == null || !vertex.recoveryData.shouldSkipInit())) { + if (vertex.hasInputInitializers() && (vertex.recoveryData == null || !vertex.recoveryData.shouldSkipInit())) { LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier); try { vertex.setupInputInitializerManager();