Skip to content

Commit

Permalink
TEZ-4543: Throw a special exception to DagClient when there is no cur…
Browse files Browse the repository at this point in the history
…rent DAG (#338). (Laszlo Bodor, reviewed by Ayush Saxena)
  • Loading branch information
abstractdog authored Feb 27, 2024
1 parent f8c2e11 commit 34bb628
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 14 deletions.
4 changes: 4 additions & 0 deletions tez-api/src/main/java/org/apache/tez/common/RPCUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.apache.hadoop.ipc.RemoteException;
import org.apache.tez.dag.api.DAGNotRunningException;
import org.apache.tez.dag.api.NoCurrentDAGException;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezException;

Expand Down Expand Up @@ -112,6 +113,9 @@ public static Void unwrapAndThrowException(ServiceException se)
} else if (DAGNotRunningException.class.isAssignableFrom(realClass)) {
throw instantiateTezException(
realClass.asSubclass(DAGNotRunningException.class), re);
} else if (NoCurrentDAGException.class.isAssignableFrom(realClass)) {
throw instantiateTezException(
realClass.asSubclass(NoCurrentDAGException.class), re);
} else if (TezException.class.isAssignableFrom(realClass)) {
throw instantiateTezException(
realClass.asSubclass(TezException.class), re);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;

/**
* Checked Exception thrown upon error
* Thrown by the AM when the DAG for which the status was queried
* is not running anymore: client can decide further action in this case.
*/
@Private
public class DAGNotRunningException extends TezException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.dag.api;

import org.apache.hadoop.classification.InterfaceAudience.Private;

/**
* Fatal exception: thrown by the AM if there is no DAG running when
* a DAG's status is queried. This is different from {@link org.apache.tez.dag.api.DAGNotRunningException}
* in a sense that this exception is fatal, in which scenario the client might consider the DAG failed, because
* it tries to ask a status from an AM which is not currently running a DAG. This scenario is possible in case
* an AM is restarted and the DagClient fails to realize it's asking the status of a possibly lost DAG.
*/
@Private
public class NoCurrentDAGException extends TezException {
private static final long serialVersionUID = 6337442733802964448L;

public static final String MESSAGE_PREFIX = "No running DAG at present";

public NoCurrentDAGException(String dagId) {
super(MESSAGE_PREFIX + ": " + dagId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.tez.client.FrameworkClient;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAGNotRunningException;
import org.apache.tez.dag.api.NoCurrentDAGException;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
Expand Down Expand Up @@ -408,6 +409,9 @@ private DAGStatus getDAGStatusViaAM(@Nullable Set<StatusGetOpts> statusOptions,
} catch (ApplicationNotFoundException e) {
LOG.info("DAG is no longer running - application not found by YARN", e);
dagCompleted = true;
} catch (NoCurrentDAGException e) {
LOG.info("Got NoCurrentDAGException from AM, returning a failed DAG", e);
return dagLost();
} catch (TezException e) {
// can be either due to a n/w issue or due to AM completed.
LOG.info("Cannot retrieve DAG Status due to TezException: {}", e.getMessage());
Expand All @@ -423,6 +427,14 @@ private DAGStatus getDAGStatusViaAM(@Nullable Set<StatusGetOpts> statusOptions,
return dagStatus;
}

private DAGStatus dagLost() {
DAGProtos.DAGStatusProto.Builder builder = DAGProtos.DAGStatusProto.newBuilder();
DAGStatus dagStatus = new DAGStatus(builder, DagStatusSource.AM);
builder.setState(DAGProtos.DAGStatusStateProto.DAG_FAILED);
builder.addAllDiagnostics(Collections.singleton(NoCurrentDAGException.MESSAGE_PREFIX));
return dagStatus;
}

private VertexStatus getVertexStatusViaAM(String vertexName, Set<StatusGetOpts> statusOptions) throws
IOException {
VertexStatus vertexStatus = null;
Expand All @@ -435,9 +447,11 @@ private VertexStatus getVertexStatusViaAM(String vertexName, Set<StatusGetOpts>
LOG.info("DAG is no longer running - application not found by YARN", e);
dagCompleted = true;
} catch (TezException e) {
// can be either due to a n/w issue of due to AM completed.
// can be either due to a n/w issue or due to AM completed.
LOG.info("Cannot retrieve Vertex Status due to TezException: {}", e.getMessage());
} catch (IOException e) {
// can be either due to a n/w issue of due to AM completed.
LOG.info("Cannot retrieve Vertex Status due to IOException: {}", e.getMessage());
}

if (vertexStatus == null && !dagCompleted) {
Expand All @@ -455,10 +469,11 @@ private VertexStatus getVertexStatusViaAM(String vertexName, Set<StatusGetOpts>
*/
@VisibleForTesting
protected DAGStatus getDAGStatusViaRM() throws TezException, IOException {
LOG.debug("GetDAGStatus via AM for app: {} dag:{}", appId, dagId);
LOG.debug("Get DAG status via framework client for app: {} dag: {}", appId, dagId);
ApplicationReport appReport;
try {
appReport = frameworkClient.getApplicationReport(appId);
LOG.debug("Got appReport from framework client: {}", appReport);
} catch (ApplicationNotFoundException e) {
LOG.info("DAG is no longer running - application not found by YARN", e);
throw new DAGNotRunningException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.tez.client.FrameworkClient;
import org.apache.tez.common.CachedEntity;
import org.apache.tez.dag.api.NoCurrentDAGException;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
Expand Down Expand Up @@ -435,7 +436,7 @@ private static void testAtsEnabled(ApplicationId appId, String dagIdStr, boolean
}

private static class DAGClientRPCImplForTest extends DAGClientRPCImpl {
private AtomicReference<IOException> faultAMInjectedRef;
private AtomicReference<TezException> faultAMInjectedRef;
int numGetStatusViaAmInvocations = 0;

public DAGClientRPCImplForTest(ApplicationId appId, String dagId,
Expand Down Expand Up @@ -472,7 +473,7 @@ DAGStatus getDAGStatusViaAM(Set<StatusGetOpts> statusOptions, long timeout)
return super.getDAGStatusViaAM(statusOptions, timeout);
}

void injectAMFault(IOException exception) {
void injectAMFault(TezException exception) {
faultAMInjectedRef.set(exception);
}
}
Expand Down Expand Up @@ -667,7 +668,7 @@ public void testGetDagStatusWithCachedStatusExpiration() throws Exception {
// When AM proxy throws an exception, the cachedDAGStatus should be returned
dagClientImpl.resetCounters();
dagClientRpc.resetCounters();
dagClientRpc.injectAMFault(new IOException("injected Fault"));
dagClientRpc.injectAMFault(new TezException("injected Fault"));
dagStatus = dagClientImpl.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class));
// get the Status from the cache
assertEquals(0, dagClientImpl.numGetStatusViaRmInvocations);
Expand All @@ -678,7 +679,7 @@ public void testGetDagStatusWithCachedStatusExpiration() throws Exception {

// test that RM is invoked when the cacheExpires and the AM fails.
dagClientRpc.setAMProxy(createMockProxy(DAGStatusStateProto.DAG_SUCCEEDED, 1000L));
dagClientRpc.injectAMFault(new IOException("injected AM Fault"));
dagClientRpc.injectAMFault(new TezException("injected AM Fault"));
dagClientImpl.resetCounters();
dagClientRpc.resetCounters();
dagClientImpl.enforceExpirationCachedDAGStatus();
Expand Down Expand Up @@ -710,4 +711,23 @@ public void testGetDagStatusWithCachedStatusExpiration() throws Exception {
}
}
}

@Test
public void testDagClientReturnsFailedDAGOnNoCurrentDAGException() throws Exception {
TezConfiguration tezConf = new TezConfiguration();

try (DAGClientImplForTest dagClientImpl = new DAGClientImplForTest(mockAppId, dagIdStr, tezConf, null)) {

DAGClientRPCImplForTest dagClientRpc = new DAGClientRPCImplForTest(mockAppId, dagIdStr, tezConf, null);
dagClientImpl.setRealClient(dagClientRpc);

DAGClientAMProtocolBlockingPB mock = mock(DAGClientAMProtocolBlockingPB.class);
dagClientRpc.setAMProxy(mock);
dagClientRpc.injectAMFault(new NoCurrentDAGException("dag_0_0_0"));

DAGStatus dagStatus = dagClientImpl.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 2000L);
assertEquals(DAGStatus.State.FAILED, dagStatus.getState());
assertEquals(NoCurrentDAGException.MESSAGE_PREFIX, dagStatus.getDiagnostics().get(0));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.client.TezAppMasterStatus;
import org.apache.tez.dag.api.DAGNotRunningException;
import org.apache.tez.dag.api.NoCurrentDAGException;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.app.DAGAppMaster;
Expand Down Expand Up @@ -94,7 +95,7 @@ DAG getDAG(String dagIdStr) throws TezException {

DAG currentDAG = getCurrentDAG();
if (currentDAG == null) {
throw new TezException("No running dag at present");
throw new NoCurrentDAGException(dagIdStr);
}

final String currentDAGIdStr = currentDAG.getID().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.client.TezAppMasterStatus;
import org.apache.tez.dag.api.NoCurrentDAGException;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.app.AppContext;
Expand Down Expand Up @@ -58,12 +59,9 @@ public void testDAGClientHandler() throws TezException {
when(mockDAG.getVertexStatus(anyString(), anySet()))
.thenReturn(mockVertexStatusBuilder);

DAGAppMaster mockDagAM = mock(DAGAppMaster.class);
when(mockDagAM.getState()).thenReturn(DAGAppMasterState.RUNNING);
AppContext mockAppContext = mock(AppContext.class);
when(mockDagAM.getContext()).thenReturn(mockAppContext);
DAGAppMaster mockDagAM = getMockAm();

when(mockDagAM.getContext().getCurrentDAG()).thenReturn(mockDAG);
when(mockAppContext.getClock()).thenReturn(new SystemClock());

DAGClientHandler dagClientHandler = new DAGClientHandler(mockDagAM);

Expand Down Expand Up @@ -130,5 +128,44 @@ public void testDAGClientHandler() throws TezException {
dagClientHandler.shutdownAM();
verify(mockDagAM).shutdownTezAM(contains("Received message to shutdown AM from"));
}


@Test
public void testCurrentDAGFound() throws TezException {
TezDAGID mockTezDAGId = mock(TezDAGID.class);
when(mockTezDAGId.getId()).thenReturn(1);
when(mockTezDAGId.toString()).thenReturn("dag_9999_0001_1");

DAG mockDAG = mock(DAG.class);
when(mockDAG.getID()).thenReturn(mockTezDAGId);

DAGAppMaster mockDagAM = getMockAm();

// make the DAGAppMaster return the mockDAG as current DAG
when(mockDagAM.getContext().getCurrentDAG()).thenReturn(mockDAG);

DAGClientHandler dagClientHandler = new DAGClientHandler(mockDagAM);
assertEquals("dag_9999_0001_1", dagClientHandler.getDAG("dag_9999_0001_1").getID().toString());
}

@Test(expected = NoCurrentDAGException.class)
public void testNoCurrentDAGException() throws TezException {
DAGAppMaster mockDagAM = getMockAm();

// make the DAGAppMaster return null as current DAG
when(mockDagAM.getContext().getCurrentDAG()).thenReturn(null);

// so this should throw NoCurrentDAGException
new DAGClientHandler(mockDagAM).getDAG("dag_0000_0000_0");
}

private DAGAppMaster getMockAm() {
DAGAppMaster mockDagAM = mock(DAGAppMaster.class);
when(mockDagAM.getState()).thenReturn(DAGAppMasterState.RUNNING);

AppContext mockAppContext = mock(AppContext.class);
when(mockDagAM.getContext()).thenReturn(mockAppContext);
when(mockAppContext.getClock()).thenReturn(new SystemClock());

return mockDagAM;
}
}

0 comments on commit 34bb628

Please sign in to comment.