Skip to content

Commit

Permalink
YARN-11577. Improve FederationInterceptorREST Method Result.
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 committed Oct 31, 2023
1 parent 254dbab commit e3d80d0
Show file tree
Hide file tree
Showing 7 changed files with 1,260 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public static void logAndThrowIOException(String errMsg, Throwable t)
throws IOException {
if (t != null) {
LOG.error(errMsg, t);
throw new IOException(errMsg, t);
throw new IOException(errMsg + " " + t.getMessage(), t);
} else {
LOG.error(errMsg);
throw new IOException(errMsg);
Expand All @@ -249,7 +249,7 @@ public static void logAndThrowIOException(Throwable t, String errMsgFormat, Obje
String msg = String.format(errMsgFormat, args);
if (t != null) {
LOG.error(msg, t);
throw new IOException(msg, t);
throw new IOException(msg + " " + t.getMessage(), t);
} else {
LOG.error(msg);
throw new IOException(msg);
Expand All @@ -269,7 +269,7 @@ public static void logAndThrowRunTimeException(String errMsg, Throwable t)
throws RuntimeException {
if (t != null) {
LOG.error(errMsg, t);
throw new RuntimeException(errMsg, t);
throw new RuntimeException(errMsg + " " + t.getMessage(), t);
} else {
LOG.error(errMsg);
throw new RuntimeException(errMsg);
Expand All @@ -291,7 +291,7 @@ public static void logAndThrowRunTimeException(Throwable t, String errMsgFormat,
String msg = String.format(errMsgFormat, args);
if (t != null) {
LOG.error(msg, t);
throw new RuntimeException(msg, t);
throw new RuntimeException(msg + " " + t.getMessage(), t);
} else {
LOG.error(msg);
throw new RuntimeException(msg);
Expand All @@ -313,7 +313,7 @@ public static RuntimeException logAndReturnRunTimeException(
String msg = String.format(errMsgFormat, args);
if (t != null) {
LOG.error(msg, t);
return new RuntimeException(msg, t);
return new RuntimeException(msg + " " + t.getMessage(), t);
} else {
LOG.error(msg);
return new RuntimeException(msg);
Expand Down Expand Up @@ -349,7 +349,7 @@ public static YarnRuntimeException logAndReturnYarnRunTimeException(
String msg = String.format(errMsgFormat, args);
if (t != null) {
LOG.error(msg, t);
return new YarnRuntimeException(msg, t);
return new YarnRuntimeException(msg + " " + t.getMessage(), t);
} else {
LOG.error(msg);
return new YarnRuntimeException(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ protected DefaultRequestInterceptorREST getOrCreateInterceptorByAppId(String app

// Get homeSubCluster By appId
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
LOG.info("appId = {} : subClusterInfo = {}.", appId, subClusterInfo.getSubClusterId());
return getOrCreateInterceptorForSubCluster(subClusterInfo);
}

Expand Down Expand Up @@ -827,7 +828,7 @@ public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
});

if (apps.getApps().isEmpty()) {
return null;
return new AppsInfo();
}

// Merge all the application reports got from all the available YARN RMs
Expand Down Expand Up @@ -1135,7 +1136,7 @@ public AppState getAppState(HttpServletRequest hsr, String appId)
} catch (YarnException | IllegalArgumentException e) {
LOG.error("getHomeSubClusterInfoByAppId error, applicationId = {}.", appId, e);
}
return null;
return new AppState();
}

@Override
Expand Down Expand Up @@ -3371,17 +3372,19 @@ private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> c
}

Exception exception = result.getException();

// If allowPartialResult=false, it means that if an exception occurs in a subCluster,
// an exception will be thrown directly.
if (!allowPartialResult && exception != null) {
if (exception != null) {
throw exception;
}
} catch (Throwable e) {
String subClusterId = subClusterInfo != null ?
subClusterInfo.getSubClusterId().getId() : "UNKNOWN";
LOG.error("SubCluster {} failed to {} report.", subClusterId, request.getMethodName(), e);
throw new YarnRuntimeException(e.getCause().getMessage(), e);
// If allowPartialResult=false, it means that if an exception occurs in a subCluster,
// an exception will be thrown directly.
if (!allowPartialResult) {
throw new YarnException("SubCluster " + subClusterId +
" failed to " + request.getMethodName() + " report.", e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@
* main difference with AMRMProxyService is the protocol they implement.
**/
@Singleton
@Path("/ws/v1/cluster")
@Path(RMWSConsts.RM_WEB_SERVICE_PATH)
public class RouterWebServices implements RMWebServiceProtocol {

private static final Logger LOG =
Expand Down Expand Up @@ -424,7 +424,7 @@ public BulkActivitiesInfo getBulkActivities(
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr,
@QueryParam(RMWSConsts.APP_ID) String appId,
@PathParam(RMWSConsts.APPID) String appId,
@QueryParam(RMWSConsts.MAX_TIME) String time,
@QueryParam(RMWSConsts.REQUEST_PRIORITIES) Set<String> requestPriorities,
@QueryParam(RMWSConsts.ALLOCATION_REQUEST_IDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,21 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
import org.apache.hadoop.yarn.server.router.webapp.HTTPMethods;
import org.apache.hadoop.yarn.server.router.webapp.JavaProcess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,11 +56,20 @@
import java.security.PrivilegedExceptionAction;
import java.util.LinkedList;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static javax.servlet.http.HttpServletResponse.SC_OK;
import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
import static javax.ws.rs.core.MediaType.APPLICATION_XML;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RM_WEB_SERVICE_PATH;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_NEW_APPLICATION;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_NEW;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.ADD_NODE_LABELS;
import static org.apache.hadoop.yarn.server.router.webapp.TestRouterWebServicesREST.waitWebAppRunning;
import static org.junit.Assert.assertEquals;

Expand Down Expand Up @@ -190,6 +207,8 @@ public static <T> T performGetCalls(final String routerAddress, final String pat
final String queryValue) throws IOException, InterruptedException {

Client clientToRouter = Client.create();
clientToRouter.setReadTimeout(5000);
clientToRouter.setConnectTimeout(5000);
WebResource toRouter = clientToRouter.resource(routerAddress).path(path);

final WebResource.Builder toRouterBuilder;
Expand All @@ -207,4 +226,120 @@ public static <T> T performGetCalls(final String routerAddress, final String pat
return response.getEntity(returnType);
});
}

public static ClientResponse performCall(final String routerAddress, final String webAddress,
final String queryKey, final String queryValue, final Object context,
final HTTPMethods method) throws IOException, InterruptedException {

return UserGroupInformation.createRemoteUser(userName).doAs(
(PrivilegedExceptionAction<ClientResponse>) () -> {
Client clientToRouter = Client.create();
WebResource toRouter = clientToRouter.resource(routerAddress).path(webAddress);

WebResource toRouterWR = toRouter;
if (queryKey != null && queryValue != null) {
toRouterWR = toRouterWR.queryParam(queryKey, queryValue);
}

WebResource.Builder builder;
if (context != null) {
builder = toRouterWR.entity(context, APPLICATION_JSON);
builder = builder.accept(APPLICATION_JSON);
} else {
builder = toRouterWR.accept(APPLICATION_JSON);
}

ClientResponse response = null;

switch (method) {
case DELETE:
response = builder.delete(ClientResponse.class);
break;
case POST:
response = builder.post(ClientResponse.class);
break;
case PUT:
response = builder.put(ClientResponse.class);
break;
default:
break;
}

return response;
});
}

public String getNodeId(String rmAddress) {
Client clientToRM = Client.create();
clientToRM.setConnectTimeout(3000);
clientToRM.setReadTimeout(3000);
WebResource toRM = clientToRM.resource(rmAddress).path(RM_WEB_SERVICE_PATH + NODES);
ClientResponse response =
toRM.accept(APPLICATION_XML).get(ClientResponse.class);
NodesInfo ci = response.getEntity(NodesInfo.class);
List<NodeInfo> nodes = ci.getNodes();
if (nodes.isEmpty()) {
return null;
}
clientToRM.destroy();
return nodes.get(0).getNodeId();
}

public NewApplication getNewApplicationId(String routerAddress) {
Client clientToRM = Client.create();
clientToRM.setConnectTimeout(3000);
clientToRM.setReadTimeout(3000);
WebResource toRM = clientToRM.resource(routerAddress).path(
RM_WEB_SERVICE_PATH + APPS_NEW_APPLICATION);
ClientResponse response = toRM.accept(APPLICATION_XML).post(ClientResponse.class);
clientToRM.destroy();
return response.getEntity(NewApplication.class);
}

public String submitApplication(String routerAddress) {
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
String appId = getNewApplicationId(routerAddress).getApplicationId();
context.setApplicationId(appId);
Client clientToRouter = Client.create();
clientToRouter.setConnectTimeout(3000);
clientToRouter.setReadTimeout(3000);
WebResource toRM = clientToRouter.resource(routerAddress).path(
RM_WEB_SERVICE_PATH + APPS);
toRM.entity(context, APPLICATION_XML).accept(APPLICATION_XML).post(ClientResponse.class);
clientToRouter.destroy();
return appId;
}

public NewReservation getNewReservationId(String routerAddress) {
Client clientToRM = Client.create();
clientToRM.setConnectTimeout(3000);
clientToRM.setReadTimeout(3000);
WebResource toRM = clientToRM.resource(routerAddress).
path(RM_WEB_SERVICE_PATH + RESERVATION_NEW);
ClientResponse response = toRM.accept(APPLICATION_XML).post(ClientResponse.class);
return response.getEntity(NewReservation.class);
}

public String addNodeLabel(String routerAddress) {
Client clientToRM = Client.create();
clientToRM.setConnectTimeout(3000);
clientToRM.setReadTimeout(3000);
WebResource toRM = clientToRM.resource(routerAddress)
.path(RM_WEB_SERVICE_PATH + ADD_NODE_LABELS);
List<NodeLabel> nodeLabels = new ArrayList<>();
nodeLabels.add(NodeLabel.newInstance("default"));
NodeLabelsInfo context = new NodeLabelsInfo(nodeLabels);
ClientResponse response = toRM
.entity(context, APPLICATION_XML)
.accept(APPLICATION_XML)
.post(ClientResponse.class);
return response.getEntity(String.class);
}

public static String format(String format, Object... args) {
Pattern p = Pattern.compile("\\{.*?}");
Matcher m = p.matcher(format);
String newFormat = m.replaceAll("%s");
return String.format(newFormat, args);
}
}
Loading

0 comments on commit e3d80d0

Please sign in to comment.