diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java index 096d8dac0a76c..130503065d6b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java @@ -125,8 +125,9 @@ public static void logAndThrowException(Throwable t, String errMsgFormat, Object public static void logAndThrowException(String errMsg, Throwable t) throws YarnException { if (t != null) { - LOG.error(errMsg, t); - throw new YarnException(errMsg, t); + String newErrMsg = getErrorMsg(errMsg, t); + LOG.error(newErrMsg, t); + throw new YarnException(newErrMsg, t); } else { LOG.error(errMsg); throw new YarnException(errMsg); @@ -146,6 +147,13 @@ public static void logAndThrowException(String errMsg) throws YarnException { throw new YarnException(errMsg); } + private static String getErrorMsg(String errMsg, Throwable t) { + if (t.getMessage() != null) { + return errMsg + " " + t.getMessage(); + } + return errMsg; + } + public static R createRequestInterceptorChain(Configuration conf, String pipeLineClassName, String interceptorClassName, Class clazz) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java index 35b3e6eeb2bd5..f9f08583ba596 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java @@ -24,13 +24,13 @@ import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; import java.lang.reflect.Method; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Random; import java.util.TreeMap; -import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -842,13 +842,27 @@ Collection invokeConcurrent(ClientMethod request, Class clazz) // Generate parallel Callable tasks for (SubClusterId subClusterId : subClusterIds) { callables.add(() -> { - ApplicationClientProtocol protocol = getClientRMProxyForSubCluster(subClusterId); - String methodName = request.getMethodName(); - Class[] types = request.getTypes(); - Object[] params = request.getParams(); - Method method = ApplicationClientProtocol.class.getMethod(methodName, types); - Object result = method.invoke(protocol, params); - return Pair.of(subClusterId, result); + try { + ApplicationClientProtocol protocol = getClientRMProxyForSubCluster(subClusterId); + String methodName = request.getMethodName(); + Class[] types = request.getTypes(); + Object[] params = request.getParams(); + Method method = ApplicationClientProtocol.class.getMethod(methodName, types); + Object result = method.invoke(protocol, params); + return Pair.of(subClusterId, result); + } catch (Exception e) { + Throwable cause = e.getCause(); + // We use Callable. 
If the exception thrown here is InvocationTargetException, + // it is a wrapped exception. We need to get the real cause of the error. + if (cause instanceof InvocationTargetException) { + cause = cause.getCause(); + } + String errMsg = (cause != null && cause.getMessage() != null) ? cause.getMessage() : "UNKNOWN"; + YarnException yarnException = + new YarnException(String.format("subClusterId %s exec %s error %s.", + subClusterId, request.getMethodName(), errMsg), e); + return Pair.of(subClusterId, yarnException); + } }); } @@ -862,8 +876,11 @@ Collection invokeConcurrent(ClientMethod request, Class clazz) Pair pair = future.get(); subClusterId = pair.getKey(); Object result = pair.getValue(); + if (result instanceof YarnException) { + throw YarnException.class.cast(result); + } results.put(subClusterId, clazz.cast(result)); - } catch (InterruptedException | ExecutionException e) { + } catch (InterruptedException | ExecutionException | YarnException e) { Throwable cause = e.getCause(); LOG.error("Cannot execute {} on {} : {}", request.getMethodName(), subClusterId.getId(), cause.getMessage()); @@ -877,9 +894,8 @@ Collection invokeConcurrent(ClientMethod request, Class clazz) // All sub-clusters return results to be considered successful, // otherwise an exception will be thrown. 
if (exceptions != null && !exceptions.isEmpty()) { - Set subClusterIdSets = exceptions.keySet(); - throw new YarnException("invokeConcurrent Failed, An exception occurred in subClusterIds = " + - StringUtils.join(subClusterIdSets, ",")); + throw new YarnException("invokeConcurrent Failed = " + + StringUtils.join(exceptions.values(), ",")); } // return result diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java index 25ca03353e1c7..bf7ef7d17913d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; @@ -353,4 +354,60 @@ private void checkSubmitSubCluster(ApplicationId appId, SubClusterId expectSubCl SubClusterId respSubClusterId = responseHomeSubCluster.getHomeSubCluster(); Assert.assertEquals(expectSubCluster, respSubClusterId); } + + @Test + public void testSubmitApplicationTwoBadNodeWithRealError() throws Exception { + 
LOG.info("Test submitApplication with two bad SubClusters."); + setupCluster(Arrays.asList(bad1, bad2)); + interceptor.setNumSubmitRetries(1); + + final ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 5); + + final SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + + LambdaTestUtils.intercept(YarnException.class, "RM is stopped", + () -> interceptor.submitApplication(request)); + } + + @Test + public void testSubmitApplicationOneBadNodeWithRealError() throws Exception { + LOG.info("Test submitApplication with one bad SubClusters."); + setupCluster(Arrays.asList(bad1)); + interceptor.setNumSubmitRetries(0); + + final ApplicationId appId = + ApplicationId.newInstance(System.currentTimeMillis(), 6); + + final SubmitApplicationRequest request = mockSubmitApplicationRequest(appId); + + LambdaTestUtils.intercept(YarnException.class, "RM is stopped", + () -> interceptor.submitApplication(request)); + } + + @Test + public void testGetClusterMetricsTwoBadNodeWithRealError() throws Exception { + LOG.info("Test getClusterMetrics with two bad SubClusters."); + setupCluster(Arrays.asList(bad1, bad2)); + GetClusterMetricsRequest request = GetClusterMetricsRequest.newInstance(); + + LambdaTestUtils.intercept(YarnException.class, + "subClusterId 1 exec getClusterMetrics error RM is stopped.", + () -> interceptor.getClusterMetrics(request)); + + LambdaTestUtils.intercept(YarnException.class, + "subClusterId 2 exec getClusterMetrics error RM is stopped.", + () -> interceptor.getClusterMetrics(request)); + } + + @Test + public void testGetClusterMetricsOneBadNodeWithRealError() throws Exception { + LOG.info("Test getClusterMetrics with one bad SubClusters."); + setupCluster(Arrays.asList(bad1)); + GetClusterMetricsRequest request = GetClusterMetricsRequest.newInstance(); + + LambdaTestUtils.intercept(YarnException.class, + "subClusterId 1 exec getClusterMetrics error RM is stopped.", + () -> interceptor.getClusterMetrics(request)); 
+ } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java index 8e80bf2c0adb8..9d6b87e8cc990 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java @@ -37,6 +37,8 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.api.records.NodeAttributeType; import org.apache.hadoop.yarn.api.records.Resource; @@ -126,6 +128,11 @@ public SubmitApplicationResponse submitApplication( throw new ConnectException("RM is stopped"); } + @Override + public GetClusterMetricsResponse getClusterMetrics(GetClusterMetricsRequest request) + throws YarnException { + throw new YarnException("RM is stopped"); + } } /**