Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEZ-4543: Throw a special exception to DagClient when there is no current DAG #338

Merged
merged 2 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions tez-api/src/main/java/org/apache/tez/common/RPCUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.apache.hadoop.ipc.RemoteException;
import org.apache.tez.dag.api.DAGNotRunningException;
import org.apache.tez.dag.api.NoCurrentDAGException;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezException;

Expand Down Expand Up @@ -112,6 +113,9 @@ public static Void unwrapAndThrowException(ServiceException se)
} else if (DAGNotRunningException.class.isAssignableFrom(realClass)) {
throw instantiateTezException(
realClass.asSubclass(DAGNotRunningException.class), re);
} else if (NoCurrentDAGException.class.isAssignableFrom(realClass)) {
throw instantiateTezException(
realClass.asSubclass(NoCurrentDAGException.class), re);
} else if (TezException.class.isAssignableFrom(realClass)) {
throw instantiateTezException(
realClass.asSubclass(TezException.class), re);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;

/**
* Checked Exception thrown upon error
* Thrown by the AM when the DAG for which the status was queried
* is not running anymore: client can decide further action in this case.
*/
@Private
public class DAGNotRunningException extends TezException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.dag.api;

import org.apache.hadoop.classification.InterfaceAudience.Private;

/**
* Fatal exception: thrown by the AM if there is no DAG running when
* when a DAG's status is queried. This is different from {@link org.apache.tez.dag.api.DAGNotRunningException}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit
two times when, once in first line & then in second

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:) fixing it

* in a sense that this exception is fatal, in which scenario the client might consider the DAG failed, because
* it tries to ask a status from an AM which is not currently running a DAG. This scenario is possible in case
* an AM is restarted and the DagClient fails to realize it's asking the status of a possibly lost DAG.
*/
@Private
public class NoCurrentDAGException extends TezException {
private static final long serialVersionUID = 6337442733802964448L;

public static final String MESSAGE_PREFIX = "No running DAG at present";

public NoCurrentDAGException(String dagId) {
super(MESSAGE_PREFIX + ": " + dagId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.tez.client.FrameworkClient;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAGNotRunningException;
import org.apache.tez.dag.api.NoCurrentDAGException;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
Expand Down Expand Up @@ -408,6 +409,8 @@ private DAGStatus getDAGStatusViaAM(@Nullable Set<StatusGetOpts> statusOptions,
} catch (ApplicationNotFoundException e) {
LOG.info("DAG is no longer running - application not found by YARN", e);
dagCompleted = true;
} catch (NoCurrentDAGException e) {
return dagLost(e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the exception e isn't used in the method dagLost, we can remove that param., moreover I think we can explore having a info log with the exception before returning

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed

} catch (TezException e) {
// can be either due to a n/w issue or due to AM completed.
LOG.info("Cannot retrieve DAG Status due to TezException: {}", e.getMessage());
Expand All @@ -423,6 +426,14 @@ private DAGStatus getDAGStatusViaAM(@Nullable Set<StatusGetOpts> statusOptions,
return dagStatus;
}

private DAGStatus dagLost(Exception e) {
DAGProtos.DAGStatusProto.Builder builder = DAGProtos.DAGStatusProto.newBuilder();
DAGStatus dagStatus = new DAGStatus(builder, DagStatusSource.AM);
builder.setState(DAGProtos.DAGStatusStateProto.DAG_FAILED);
builder.addAllDiagnostics(Collections.singleton(NoCurrentDAGException.MESSAGE_PREFIX));
return dagStatus;
}

private VertexStatus getVertexStatusViaAM(String vertexName, Set<StatusGetOpts> statusOptions) throws
IOException {
VertexStatus vertexStatus = null;
Expand All @@ -435,9 +446,11 @@ private VertexStatus getVertexStatusViaAM(String vertexName, Set<StatusGetOpts>
LOG.info("DAG is no longer running - application not found by YARN", e);
dagCompleted = true;
} catch (TezException e) {
// can be either due to a n/w issue of due to AM completed.
// can be either due to a n/w issue or due to AM completed.
LOG.info("Cannot retrieve Vertex Status due to TezException: {}", e.getMessage());
} catch (IOException e) {
// can be either due to a n/w issue of due to AM completed.
LOG.info("Cannot retrieve Vertex Status due to IOException: {}", e.getMessage());
}

if (vertexStatus == null && !dagCompleted) {
Expand All @@ -455,10 +468,11 @@ private VertexStatus getVertexStatusViaAM(String vertexName, Set<StatusGetOpts>
*/
@VisibleForTesting
protected DAGStatus getDAGStatusViaRM() throws TezException, IOException {
LOG.debug("GetDAGStatus via AM for app: {} dag:{}", appId, dagId);
LOG.debug("Get DAG status via framework client for app: {} dag: {}", appId, dagId);
ApplicationReport appReport;
try {
appReport = frameworkClient.getApplicationReport(appId);
LOG.debug("Got appReport from framework client: {}", appReport);
} catch (ApplicationNotFoundException e) {
LOG.info("DAG is no longer running - application not found by YARN", e);
throw new DAGNotRunningException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.tez.client.FrameworkClient;
import org.apache.tez.common.CachedEntity;
import org.apache.tez.dag.api.NoCurrentDAGException;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
Expand Down Expand Up @@ -435,7 +436,7 @@ private static void testAtsEnabled(ApplicationId appId, String dagIdStr, boolean
}

private static class DAGClientRPCImplForTest extends DAGClientRPCImpl {
private AtomicReference<IOException> faultAMInjectedRef;
private AtomicReference<TezException> faultAMInjectedRef;
int numGetStatusViaAmInvocations = 0;

public DAGClientRPCImplForTest(ApplicationId appId, String dagId,
Expand Down Expand Up @@ -472,7 +473,7 @@ DAGStatus getDAGStatusViaAM(Set<StatusGetOpts> statusOptions, long timeout)
return super.getDAGStatusViaAM(statusOptions, timeout);
}

void injectAMFault(IOException exception) {
void injectAMFault(TezException exception) {
faultAMInjectedRef.set(exception);
}
}
Expand Down Expand Up @@ -667,7 +668,7 @@ public void testGetDagStatusWithCachedStatusExpiration() throws Exception {
// When AM proxy throws an exception, the cachedDAGStatus should be returned
dagClientImpl.resetCounters();
dagClientRpc.resetCounters();
dagClientRpc.injectAMFault(new IOException("injected Fault"));
dagClientRpc.injectAMFault(new TezException("injected Fault"));
dagStatus = dagClientImpl.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class));
// get the Status from the cache
assertEquals(0, dagClientImpl.numGetStatusViaRmInvocations);
Expand All @@ -678,7 +679,7 @@ public void testGetDagStatusWithCachedStatusExpiration() throws Exception {

// test that RM is invoked when the cacheExpires and the AM fails.
dagClientRpc.setAMProxy(createMockProxy(DAGStatusStateProto.DAG_SUCCEEDED, 1000L));
dagClientRpc.injectAMFault(new IOException("injected AM Fault"));
dagClientRpc.injectAMFault(new TezException("injected AM Fault"));
dagClientImpl.resetCounters();
dagClientRpc.resetCounters();
dagClientImpl.enforceExpirationCachedDAGStatus();
Expand Down Expand Up @@ -710,4 +711,23 @@ public void testGetDagStatusWithCachedStatusExpiration() throws Exception {
}
}
}

@Test
public void testDagClientReturnsFailedDAGOnNoCurrentDAGException() throws Exception {
TezConfiguration tezConf = new TezConfiguration();

try (DAGClientImplForTest dagClientImpl = new DAGClientImplForTest(mockAppId, dagIdStr, tezConf, null)) {

DAGClientRPCImplForTest dagClientRpc = new DAGClientRPCImplForTest(mockAppId, dagIdStr, tezConf, null);
dagClientImpl.setRealClient(dagClientRpc);

DAGClientAMProtocolBlockingPB mock = mock(DAGClientAMProtocolBlockingPB.class);
dagClientRpc.setAMProxy(mock);
dagClientRpc.injectAMFault(new NoCurrentDAGException("dag_0_0_0"));

DAGStatus dagStatus = dagClientImpl.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 2000L);
assertEquals(DAGStatus.State.FAILED, dagStatus.getState());
assertEquals(NoCurrentDAGException.MESSAGE_PREFIX, dagStatus.getDiagnostics().get(0));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.client.TezAppMasterStatus;
import org.apache.tez.dag.api.DAGNotRunningException;
import org.apache.tez.dag.api.NoCurrentDAGException;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.app.DAGAppMaster;
Expand Down Expand Up @@ -94,7 +95,7 @@ DAG getDAG(String dagIdStr) throws TezException {

DAG currentDAG = getCurrentDAG();
if (currentDAG == null) {
throw new TezException("No running dag at present");
throw new NoCurrentDAGException(dagIdStr);
}

final String currentDAGIdStr = currentDAG.getID().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.client.TezAppMasterStatus;
import org.apache.tez.dag.api.NoCurrentDAGException;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.app.AppContext;
Expand Down Expand Up @@ -58,12 +59,9 @@ public void testDAGClientHandler() throws TezException {
when(mockDAG.getVertexStatus(anyString(), anySet()))
.thenReturn(mockVertexStatusBuilder);

DAGAppMaster mockDagAM = mock(DAGAppMaster.class);
when(mockDagAM.getState()).thenReturn(DAGAppMasterState.RUNNING);
AppContext mockAppContext = mock(AppContext.class);
when(mockDagAM.getContext()).thenReturn(mockAppContext);
DAGAppMaster mockDagAM = getMockAm();

when(mockDagAM.getContext().getCurrentDAG()).thenReturn(mockDAG);
when(mockAppContext.getClock()).thenReturn(new SystemClock());

DAGClientHandler dagClientHandler = new DAGClientHandler(mockDagAM);

Expand Down Expand Up @@ -130,5 +128,44 @@ public void testDAGClientHandler() throws TezException {
dagClientHandler.shutdownAM();
verify(mockDagAM).shutdownTezAM(contains("Received message to shutdown AM from"));
}


@Test
public void testCurrentDAGFound() throws TezException {
TezDAGID mockTezDAGId = mock(TezDAGID.class);
when(mockTezDAGId.getId()).thenReturn(1);
when(mockTezDAGId.toString()).thenReturn("dag_9999_0001_1");

DAG mockDAG = mock(DAG.class);
when(mockDAG.getID()).thenReturn(mockTezDAGId);

DAGAppMaster mockDagAM = getMockAm();

// make the DAGAppMaster return the mockDAG as current DAG
when(mockDagAM.getContext().getCurrentDAG()).thenReturn(mockDAG);

DAGClientHandler dagClientHandler = new DAGClientHandler(mockDagAM);
assertEquals("dag_9999_0001_1", dagClientHandler.getDAG("dag_9999_0001_1").getID().toString());
}

@Test(expected = NoCurrentDAGException.class)
public void testNoCurrentDAGException() throws TezException {
DAGAppMaster mockDagAM = getMockAm();

// make the DAGAppMaster return null as current DAG
when(mockDagAM.getContext().getCurrentDAG()).thenReturn(null);

// so this should throw NoCurrentDAGException
new DAGClientHandler(mockDagAM).getDAG("dag_0000_0000_0");
}

private DAGAppMaster getMockAm() {
DAGAppMaster mockDagAM = mock(DAGAppMaster.class);
when(mockDagAM.getState()).thenReturn(DAGAppMasterState.RUNNING);

AppContext mockAppContext = mock(AppContext.class);
when(mockDagAM.getContext()).thenReturn(mockAppContext);
when(mockAppContext.getClock()).thenReturn(new SystemClock());

return mockDagAM;
}
}
Loading