Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEZ-4569: SCATTER_GATHER + BROADCAST hangs on DAG Recovery #361

Merged
merged 5 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2750,7 +2750,7 @@ private VertexState setupVertex() {
: rootInputDescriptors.values()) {
if (input.getControllerDescriptor() != null &&
input.getControllerDescriptor().getClassName() != null) {
if (inputsWithInitializers == null) {
if (!hasInputInitializers()) {
inputsWithInitializers = Sets.newHashSet();
}
inputsWithInitializers.add(input.getName());
Expand All @@ -2771,7 +2771,7 @@ private VertexState setupVertex() {
}
}

if (hasBipartite && inputsWithInitializers != null) {
if (hasBipartite && hasInputInitializers()) {
LOG.error("A vertex with an Initial Input and a Shuffle Input are not supported at the moment");
return finished(VertexState.FAILED);
}
Expand Down Expand Up @@ -2819,6 +2819,22 @@ private VertexState setupVertex() {
return VertexState.INITED;
}

private boolean hasInputInitializers() {
return inputsWithInitializers != null;
}

private boolean usesRootInputVertexManager() {
// RootInputVertexManager can start tasks even though any parents are not fully initialized.
if (vertexPlan.hasVertexManagerPlugin()) {
final VertexManagerPluginDescriptor pluginDesc = DagTypeConverters
.convertVertexManagerPluginDescriptorFromDAGPlan(vertexPlan.getVertexManagerPlugin());
return pluginDesc.getClassName().equals(RootInputVertexManager.class.getName());
} else {
// This case implicitly uses RootInputVertexManager. See VertexImpl#assignVertexManager
return hasInputInitializers();
}
}

private boolean isVertexInitSkippedInParentVertices() {
for (Map.Entry<Vertex, Edge> entry : sourceVertices.entrySet()) {
if(!(((VertexImpl) entry.getKey()).isVertexInitSkipped())) {
Expand All @@ -2828,24 +2844,36 @@ private boolean isVertexInitSkippedInParentVertices() {
return true;
}

private void assignVertexManager() throws TezException {
private boolean canSkipInitialization() {
// condition for skip initializing stage
// - VertexInputInitializerEvent is seen
// - VertexReconfigureDoneEvent is seen
// - Reason to check whether VertexManager has complete its responsibility
// - VertexInitializedEvent is seen
// - VertexConfigurationDoneEvent is seen
// - Reason to check whether VertexManager has completed its responsibility
// VertexManager actually is involved in the InputInitializer (InputInitializer generate events
// and send them to VertexManager which do some processing and send back to Vertex), so that means
// Input initializer will affect on the VertexManager and we couldn't skip the initializing step if
// Input initializer will affect on the VertexManager and we couldn't skip the initializing step if
// VertexManager has not completed its responsibility.
// - Why using VertexReconfigureDoneEvent
// - VertexReconfigureDoneEvent represent the case that user use API reconfigureVertex
// VertexReconfigureDoneEvent will be logged
// - Why using VertexConfigurationDoneEvent
// - VertexConfigurationDoneEvent represent the case that user use API reconfigureVertex
// VertexConfigurationDoneEvent will be logged
// - TaskStartEvent is seen in that vertex or setVertexParallelism is called
// - All the parent vertices have skipped initializing stage while recovering
if (recoveryData != null && recoveryData.shouldSkipInit()
&& (recoveryData.isVertexTasksStarted() ||
recoveryData.getVertexConfigurationDoneEvent().isSetParallelismCalled())
&& isVertexInitSkippedInParentVertices()) {
// - Or RootInputVertexManager is used, which can start without waiting for parent vertices
if (recoveryData == null) {
return false;
}
if (!recoveryData.shouldSkipInit()) {
return false;
}
if (!recoveryData.isVertexTasksStarted()
&& !recoveryData.getVertexConfigurationDoneEvent().isSetParallelismCalled()) {
return false;
}
return isVertexInitSkippedInParentVertices() || usesRootInputVertexManager();
}

private void assignVertexManager() throws TezException {
if (canSkipInitialization()) {
// Replace the original VertexManager with NoOpVertexManager if the reconfiguration is done in the last AM attempt
VertexConfigurationDoneEvent reconfigureDoneEvent = recoveryData.getVertexConfigurationDoneEvent();
if (LOG.isInfoEnabled()) {
Expand Down Expand Up @@ -2909,7 +2937,7 @@ && isVertexInitSkippedInParentVertices()) {
// If there is a one to one edge then we use the InputReadyVertexManager
// If there is a scatter-gather edge then we use the ShuffleVertexManager
// Else we use the default ImmediateStartVertexManager
if (inputsWithInitializers != null) {
if (hasInputInitializers()) {
LOG.info("Setting vertexManager to RootInputVertexManager for "
+ logIdentifier);
vertexManager = new VertexManager(RootInputVertexManager
Expand Down Expand Up @@ -3084,7 +3112,7 @@ private VertexState handleInitEvent(VertexImpl vertex) {
LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers/1-1 split"
+ " to set #tasks for the vertex " + vertex.getLogIdentifier());

if (vertex.inputsWithInitializers != null) {
if (vertex.hasInputInitializers()) {
if (vertex.recoveryData == null || !vertex.recoveryData.shouldSkipInit()) {
LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier);
try {
Expand Down Expand Up @@ -3123,8 +3151,7 @@ private VertexState handleInitEvent(VertexImpl vertex) {
LOG.info("Creating " + vertex.numTasks + " tasks for vertex: " + vertex.logIdentifier);
vertex.createTasks();
// this block may return VertexState.INITIALIZING
if (vertex.inputsWithInitializers != null &&
(vertex.recoveryData == null || !vertex.recoveryData.shouldSkipInit())) {
if (vertex.hasInputInitializers() && (vertex.recoveryData == null || !vertex.recoveryData.shouldSkipInit())) {
LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier);
try {
vertex.setupInputInitializerManager();
Expand Down
Loading
Loading