diff --git a/tez-api/src/main/java/org/apache/tez/common/RPCUtil.java b/tez-api/src/main/java/org/apache/tez/common/RPCUtil.java index 232ad6a7cd..ab265f6ea8 100644 --- a/tez-api/src/main/java/org/apache/tez/common/RPCUtil.java +++ b/tez-api/src/main/java/org/apache/tez/common/RPCUtil.java @@ -24,6 +24,7 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.tez.dag.api.DAGNotRunningException; +import org.apache.tez.dag.api.NoCurrentDAGException; import org.apache.tez.dag.api.SessionNotRunning; import org.apache.tez.dag.api.TezException; @@ -112,6 +113,9 @@ public static Void unwrapAndThrowException(ServiceException se) } else if (DAGNotRunningException.class.isAssignableFrom(realClass)) { throw instantiateTezException( realClass.asSubclass(DAGNotRunningException.class), re); + } else if (NoCurrentDAGException.class.isAssignableFrom(realClass)) { + throw instantiateTezException( + realClass.asSubclass(NoCurrentDAGException.class), re); } else if (TezException.class.isAssignableFrom(realClass)) { throw instantiateTezException( realClass.asSubclass(TezException.class), re); diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAGNotRunningException.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAGNotRunningException.java index cbc93a9647..93f5d71b53 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DAGNotRunningException.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAGNotRunningException.java @@ -21,7 +21,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; /** - * Checked Exception thrown upon error + * Thrown by the AM when the DAG for which the status was queried + * is not running anymore: client can decide further action in this case. */ @Private public class DAGNotRunningException extends TezException { diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/NoCurrentDAGException.java b/tez-api/src/main/java/org/apache/tez/dag/api/NoCurrentDAGException.java new file mode 100644 index 0000000000..26ef89abab --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/dag/api/NoCurrentDAGException.java @@ -0,0 +1,39 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.dag.api; + +import org.apache.hadoop.classification.InterfaceAudience.Private; + +/** + * Fatal exception: thrown by the AM if there is no DAG running when + * a DAG's status is queried. This is different from {@link org.apache.tez.dag.api.DAGNotRunningException} + * in a sense that this exception is fatal, in which scenario the client might consider the DAG failed, because + * it tries to ask a status from an AM which is not currently running a DAG. This scenario is possible in case + * an AM is restarted and the DagClient fails to realize it's asking the status of a possibly lost DAG. + */ +@Private +public class NoCurrentDAGException extends TezException { + private static final long serialVersionUID = 6337442733802964448L; + + public static final String MESSAGE_PREFIX = "No running DAG at present"; + + public NoCurrentDAGException(String dagId) { + super(MESSAGE_PREFIX + ": " + dagId); + } +} diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java index 95dd85f388..727719eccc 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClientImpl.java @@ -46,6 +46,7 @@ import org.apache.tez.client.FrameworkClient; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DAGNotRunningException; +import org.apache.tez.dag.api.NoCurrentDAGException; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; @@ -408,6 +409,9 @@ private DAGStatus getDAGStatusViaAM(@Nullable Set statusOptions, } catch (ApplicationNotFoundException e) { LOG.info("DAG is no longer running - application not found by YARN", e); dagCompleted = true; + } catch (NoCurrentDAGException e) { + LOG.info("Got NoCurrentDAGException from AM, returning a failed DAG", e); + return dagLost(); } catch (TezException e) { // can be either due to a n/w issue or due to AM completed. LOG.info("Cannot retrieve DAG Status due to TezException: {}", e.getMessage()); @@ -423,6 +427,14 @@ private DAGStatus getDAGStatusViaAM(@Nullable Set statusOptions, return dagStatus; } + private DAGStatus dagLost() { + DAGProtos.DAGStatusProto.Builder builder = DAGProtos.DAGStatusProto.newBuilder(); + DAGStatus dagStatus = new DAGStatus(builder, DagStatusSource.AM); + builder.setState(DAGProtos.DAGStatusStateProto.DAG_FAILED); + builder.addAllDiagnostics(Collections.singleton(NoCurrentDAGException.MESSAGE_PREFIX)); + return dagStatus; + } + private VertexStatus getVertexStatusViaAM(String vertexName, Set statusOptions) throws IOException { VertexStatus vertexStatus = null; @@ -435,9 +447,11 @@ private VertexStatus getVertexStatusViaAM(String vertexName, Set LOG.info("DAG is no longer running - application not found by YARN", e); dagCompleted = true; } catch (TezException e) { - // can be either due to a n/w issue of due to AM completed. + // can be either due to a n/w issue or due to AM completed. + LOG.info("Cannot retrieve Vertex Status due to TezException: {}", e.getMessage()); } catch (IOException e) { // can be either due to a n/w issue of due to AM completed. + LOG.info("Cannot retrieve Vertex Status due to IOException: {}", e.getMessage()); } if (vertexStatus == null && !dagCompleted) { @@ -455,10 +469,11 @@ private VertexStatus getVertexStatusViaAM(String vertexName, Set */ @VisibleForTesting protected DAGStatus getDAGStatusViaRM() throws TezException, IOException { - LOG.debug("GetDAGStatus via AM for app: {} dag:{}", appId, dagId); + LOG.debug("Get DAG status via framework client for app: {} dag: {}", appId, dagId); ApplicationReport appReport; try { appReport = frameworkClient.getApplicationReport(appId); + LOG.debug("Got appReport from framework client: {}", appReport); } catch (ApplicationNotFoundException e) { LOG.info("DAG is no longer running - application not found by YARN", e); throw new DAGNotRunningException(e); diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java index 0ea5d1a26d..662de982fd 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/client/rpc/TestDAGClient.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.tez.client.FrameworkClient; import org.apache.tez.common.CachedEntity; +import org.apache.tez.dag.api.NoCurrentDAGException; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.client.DAGClient; @@ -435,7 +436,7 @@ private static void testAtsEnabled(ApplicationId appId, String dagIdStr, boolean } private static class DAGClientRPCImplForTest extends DAGClientRPCImpl { - private AtomicReference faultAMInjectedRef; + private AtomicReference faultAMInjectedRef; int numGetStatusViaAmInvocations = 0; public DAGClientRPCImplForTest(ApplicationId appId, String dagId, @@ -472,7 +473,7 @@ DAGStatus getDAGStatusViaAM(Set statusOptions, long timeout) return super.getDAGStatusViaAM(statusOptions, timeout); } - void injectAMFault(IOException exception) { + void injectAMFault(TezException exception) { faultAMInjectedRef.set(exception); } } @@ -667,7 +668,7 @@ public void testGetDagStatusWithCachedStatusExpiration() throws Exception { // When AM proxy throws an exception, the cachedDAGStatus should be returned dagClientImpl.resetCounters(); dagClientRpc.resetCounters(); - dagClientRpc.injectAMFault(new IOException("injected Fault")); + dagClientRpc.injectAMFault(new TezException("injected Fault")); dagStatus = dagClientImpl.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class)); // get the Status from the cache assertEquals(0, dagClientImpl.numGetStatusViaRmInvocations); @@ -678,7 +679,7 @@ public void testGetDagStatusWithCachedStatusExpiration() throws Exception { // test that RM is invoked when the cacheExpires and the AM fails. dagClientRpc.setAMProxy(createMockProxy(DAGStatusStateProto.DAG_SUCCEEDED, 1000L)); - dagClientRpc.injectAMFault(new IOException("injected AM Fault")); + dagClientRpc.injectAMFault(new TezException("injected AM Fault")); dagClientImpl.resetCounters(); dagClientRpc.resetCounters(); dagClientImpl.enforceExpirationCachedDAGStatus(); @@ -710,4 +711,23 @@ public void testGetDagStatusWithCachedStatusExpiration() throws Exception { } } } + + @Test + public void testDagClientReturnsFailedDAGOnNoCurrentDAGException() throws Exception { + TezConfiguration tezConf = new TezConfiguration(); + + try (DAGClientImplForTest dagClientImpl = new DAGClientImplForTest(mockAppId, dagIdStr, tezConf, null)) { + + DAGClientRPCImplForTest dagClientRpc = new DAGClientRPCImplForTest(mockAppId, dagIdStr, tezConf, null); + dagClientImpl.setRealClient(dagClientRpc); + + DAGClientAMProtocolBlockingPB mock = mock(DAGClientAMProtocolBlockingPB.class); + dagClientRpc.setAMProxy(mock); + dagClientRpc.injectAMFault(new NoCurrentDAGException("dag_0_0_0")); + + DAGStatus dagStatus = dagClientImpl.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 2000L); + assertEquals(DAGStatus.State.FAILED, dagStatus.getState()); + assertEquals(NoCurrentDAGException.MESSAGE_PREFIX, dagStatus.getDiagnostics().get(0)); + } + } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java index 1de62012e7..4ed9d86a34 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.tez.client.TezAppMasterStatus; import org.apache.tez.dag.api.DAGNotRunningException; +import org.apache.tez.dag.api.NoCurrentDAGException; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.app.DAGAppMaster; @@ -94,7 +95,7 @@ DAG getDAG(String dagIdStr) throws TezException { DAG currentDAG = getCurrentDAG(); if (currentDAG == null) { - throw new TezException("No running dag at present"); + throw new NoCurrentDAGException(dagIdStr); } final String currentDAGIdStr = currentDAG.getID().toString(); diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java index c7daebf02d..f594df2d9c 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestDAGClientHandler.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tez.client.TezAppMasterStatus; +import org.apache.tez.dag.api.NoCurrentDAGException; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.app.AppContext; @@ -58,12 +59,9 @@ public void testDAGClientHandler() throws TezException { when(mockDAG.getVertexStatus(anyString(), anySet())) .thenReturn(mockVertexStatusBuilder); - DAGAppMaster mockDagAM = mock(DAGAppMaster.class); - when(mockDagAM.getState()).thenReturn(DAGAppMasterState.RUNNING); - AppContext mockAppContext = mock(AppContext.class); - when(mockDagAM.getContext()).thenReturn(mockAppContext); + DAGAppMaster mockDagAM = getMockAm(); + when(mockDagAM.getContext().getCurrentDAG()).thenReturn(mockDAG); - when(mockAppContext.getClock()).thenReturn(new SystemClock()); DAGClientHandler dagClientHandler = new DAGClientHandler(mockDagAM); @@ -130,5 +128,44 @@ public void testDAGClientHandler() throws TezException { dagClientHandler.shutdownAM(); verify(mockDagAM).shutdownTezAM(contains("Received message to shutdown AM from")); } - + + @Test + public void testCurrentDAGFound() throws TezException { + TezDAGID mockTezDAGId = mock(TezDAGID.class); + when(mockTezDAGId.getId()).thenReturn(1); + when(mockTezDAGId.toString()).thenReturn("dag_9999_0001_1"); + + DAG mockDAG = mock(DAG.class); + when(mockDAG.getID()).thenReturn(mockTezDAGId); + + DAGAppMaster mockDagAM = getMockAm(); + + // make the DAGAppMaster return the mockDAG as current DAG + when(mockDagAM.getContext().getCurrentDAG()).thenReturn(mockDAG); + + DAGClientHandler dagClientHandler = new DAGClientHandler(mockDagAM); + assertEquals("dag_9999_0001_1", dagClientHandler.getDAG("dag_9999_0001_1").getID().toString()); + } + + @Test(expected = NoCurrentDAGException.class) + public void testNoCurrentDAGException() throws TezException { + DAGAppMaster mockDagAM = getMockAm(); + + // make the DAGAppMaster return null as current DAG + when(mockDagAM.getContext().getCurrentDAG()).thenReturn(null); + + // so this should throw NoCurrentDAGException + new DAGClientHandler(mockDagAM).getDAG("dag_0000_0000_0"); + } + + private DAGAppMaster getMockAm() { + DAGAppMaster mockDagAM = mock(DAGAppMaster.class); + when(mockDagAM.getState()).thenReturn(DAGAppMasterState.RUNNING); + + AppContext mockAppContext = mock(AppContext.class); + when(mockDagAM.getContext()).thenReturn(mockAppContext); + when(mockAppContext.getClock()).thenReturn(new SystemClock()); + + return mockDagAM; + } }