Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YARN-11577. Improve FederationInterceptorREST Method Result. #6190

Merged
merged 5 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,9 @@ public static void logAndThrowException(Throwable t, String errMsgFormat, Object
throws YarnException {
String msg = String.format(errMsgFormat, args);
if (t != null) {
LOG.error(msg, t);
throw new YarnException(msg, t);
String newErrMsg = getErrorMsg(msg, t);
LOG.error(newErrMsg, t);
throw new YarnException(newErrMsg, t);
} else {
LOG.error(msg);
throw new YarnException(msg);
Expand Down Expand Up @@ -234,8 +235,9 @@ private static List<String> getInterceptorClassNames(Configuration conf,
public static void logAndThrowIOException(String errMsg, Throwable t)
throws IOException {
if (t != null) {
LOG.error(errMsg, t);
throw new IOException(errMsg, t);
goiri marked this conversation as resolved.
Show resolved Hide resolved
String newErrMsg = getErrorMsg(errMsg, t);
LOG.error(newErrMsg, t);
throw new IOException(newErrMsg, t);
} else {
LOG.error(errMsg);
throw new IOException(errMsg);
Expand All @@ -256,8 +258,9 @@ public static void logAndThrowIOException(Throwable t, String errMsgFormat, Obje
throws IOException {
String msg = String.format(errMsgFormat, args);
if (t != null) {
LOG.error(msg, t);
throw new IOException(msg, t);
String newErrMsg = getErrorMsg(msg, t);
LOG.error(newErrMsg, t);
throw new IOException(newErrMsg, t);
} else {
LOG.error(msg);
throw new IOException(msg);
Expand All @@ -276,8 +279,9 @@ public static void logAndThrowIOException(Throwable t, String errMsgFormat, Obje
public static void logAndThrowRunTimeException(String errMsg, Throwable t)
throws RuntimeException {
if (t != null) {
LOG.error(errMsg, t);
throw new RuntimeException(errMsg, t);
String newErrMsg = getErrorMsg(errMsg, t);
LOG.error(newErrMsg, t);
throw new RuntimeException(newErrMsg, t);
} else {
LOG.error(errMsg);
throw new RuntimeException(errMsg);
Expand All @@ -298,8 +302,9 @@ public static void logAndThrowRunTimeException(Throwable t, String errMsgFormat,
throws RuntimeException {
String msg = String.format(errMsgFormat, args);
if (t != null) {
LOG.error(msg, t);
throw new RuntimeException(msg, t);
String newErrMsg = getErrorMsg(msg, t);
LOG.error(newErrMsg, t);
throw new RuntimeException(newErrMsg, t);
} else {
LOG.error(msg);
throw new RuntimeException(msg);
Expand All @@ -320,8 +325,9 @@ public static RuntimeException logAndReturnRunTimeException(
Throwable t, String errMsgFormat, Object... args) {
String msg = String.format(errMsgFormat, args);
if (t != null) {
LOG.error(msg, t);
return new RuntimeException(msg, t);
String newErrMsg = getErrorMsg(msg, t);
LOG.error(newErrMsg, t);
return new RuntimeException(newErrMsg, t);
} else {
LOG.error(msg);
return new RuntimeException(msg);
Expand Down Expand Up @@ -356,8 +362,9 @@ public static YarnRuntimeException logAndReturnYarnRunTimeException(
Throwable t, String errMsgFormat, Object... args) {
String msg = String.format(errMsgFormat, args);
if (t != null) {
LOG.error(msg, t);
return new YarnRuntimeException(msg, t);
String newErrMsg = getErrorMsg(msg, t);
LOG.error(newErrMsg, t);
return new YarnRuntimeException(newErrMsg, t);
} else {
LOG.error(msg);
return new YarnRuntimeException(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ protected DefaultRequestInterceptorREST getOrCreateInterceptorByAppId(String app

// Get homeSubCluster By appId
SubClusterInfo subClusterInfo = getHomeSubClusterInfoByAppId(appId);
LOG.info("appId = {} : subClusterInfo = {}.", appId, subClusterInfo.getSubClusterId());
return getOrCreateInterceptorForSubCluster(subClusterInfo);
}

Expand Down Expand Up @@ -827,7 +828,7 @@ public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
});

if (apps.getApps().isEmpty()) {
return null;
return new AppsInfo();
goiri marked this conversation as resolved.
Show resolved Hide resolved
}

// Merge all the application reports got from all the available YARN RMs
Expand Down Expand Up @@ -1135,7 +1136,7 @@ public AppState getAppState(HttpServletRequest hsr, String appId)
} catch (YarnException | IllegalArgumentException e) {
LOG.error("getHomeSubClusterInfoByAppId error, applicationId = {}.", appId, e);
}
return null;
return new AppState();
}

@Override
Expand Down Expand Up @@ -3371,17 +3372,19 @@ private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> c
}

Exception exception = result.getException();

// If allowPartialResult=false, it means that if an exception occurs in a subCluster,
// an exception will be thrown directly.
if (!allowPartialResult && exception != null) {
if (exception != null) {
throw exception;
}
} catch (Throwable e) {
String subClusterId = subClusterInfo != null ?
subClusterInfo.getSubClusterId().getId() : "UNKNOWN";
LOG.error("SubCluster {} failed to {} report.", subClusterId, request.getMethodName(), e);
throw new YarnRuntimeException(e.getCause().getMessage(), e);
// If allowPartialResult=false, it means that if an exception occurs in a subCluster,
goiri marked this conversation as resolved.
Show resolved Hide resolved
// an exception will be thrown directly.
if (!allowPartialResult) {
throw new YarnException("SubCluster " + subClusterId +
" failed to " + request.getMethodName() + " report.", e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@
* main difference with AMRMProxyService is the protocol they implement.
**/
@Singleton
@Path("/ws/v1/cluster")
goiri marked this conversation as resolved.
Show resolved Hide resolved
@Path(RMWSConsts.RM_WEB_SERVICE_PATH)
public class RouterWebServices implements RMWebServiceProtocol {

private static final Logger LOG =
Expand Down Expand Up @@ -424,7 +424,7 @@ public BulkActivitiesInfo getBulkActivities(
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr,
@QueryParam(RMWSConsts.APP_ID) String appId,
@PathParam(RMWSConsts.APPID) String appId,
goiri marked this conversation as resolved.
Show resolved Hide resolved
@QueryParam(RMWSConsts.MAX_TIME) String time,
@QueryParam(RMWSConsts.REQUEST_PRIORITIES) Set<String> requestPriorities,
@QueryParam(RMWSConsts.ALLOCATION_REQUEST_IDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,21 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewReservation;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
import org.apache.hadoop.yarn.server.router.webapp.HTTPMethods;
import org.apache.hadoop.yarn.server.router.webapp.JavaProcess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,11 +56,20 @@
import java.security.PrivilegedExceptionAction;
import java.util.LinkedList;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static javax.servlet.http.HttpServletResponse.SC_OK;
import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
import static javax.ws.rs.core.MediaType.APPLICATION_XML;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RM_WEB_SERVICE_PATH;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.NODES;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS_NEW_APPLICATION;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.APPS;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.RESERVATION_NEW;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts.ADD_NODE_LABELS;
import static org.apache.hadoop.yarn.server.router.webapp.TestRouterWebServicesREST.waitWebAppRunning;
import static org.junit.Assert.assertEquals;

Expand Down Expand Up @@ -190,6 +207,8 @@ public static <T> T performGetCalls(final String routerAddress, final String pat
final String queryValue) throws IOException, InterruptedException {

Client clientToRouter = Client.create();
clientToRouter.setReadTimeout(5000);
clientToRouter.setConnectTimeout(5000);
WebResource toRouter = clientToRouter.resource(routerAddress).path(path);

final WebResource.Builder toRouterBuilder;
Expand All @@ -207,4 +226,120 @@ public static <T> T performGetCalls(final String routerAddress, final String pat
return response.getEntity(returnType);
});
}

public static ClientResponse performCall(final String routerAddress, final String webAddress,
final String queryKey, final String queryValue, final Object context,
final HTTPMethods method) throws IOException, InterruptedException {

return UserGroupInformation.createRemoteUser(userName).doAs(
(PrivilegedExceptionAction<ClientResponse>) () -> {
Client clientToRouter = Client.create();
WebResource toRouter = clientToRouter.resource(routerAddress).path(webAddress);

WebResource toRouterWR = toRouter;
if (queryKey != null && queryValue != null) {
toRouterWR = toRouterWR.queryParam(queryKey, queryValue);
}

WebResource.Builder builder;
if (context != null) {
builder = toRouterWR.entity(context, APPLICATION_JSON);
builder = builder.accept(APPLICATION_JSON);
} else {
builder = toRouterWR.accept(APPLICATION_JSON);
}

ClientResponse response = null;

switch (method) {
case DELETE:
response = builder.delete(ClientResponse.class);
break;
case POST:
response = builder.post(ClientResponse.class);
break;
case PUT:
response = builder.put(ClientResponse.class);
break;
default:
break;
}

return response;
});
}

public String getNodeId(String rmAddress) {
Client clientToRM = Client.create();
clientToRM.setConnectTimeout(3000);
clientToRM.setReadTimeout(3000);
WebResource toRM = clientToRM.resource(rmAddress).path(RM_WEB_SERVICE_PATH + NODES);
ClientResponse response =
toRM.accept(APPLICATION_XML).get(ClientResponse.class);
NodesInfo ci = response.getEntity(NodesInfo.class);
List<NodeInfo> nodes = ci.getNodes();
if (nodes.isEmpty()) {
return null;
}
clientToRM.destroy();
return nodes.get(0).getNodeId();
}

public NewApplication getNewApplicationId(String routerAddress) {
Client clientToRM = Client.create();
clientToRM.setConnectTimeout(3000);
clientToRM.setReadTimeout(3000);
WebResource toRM = clientToRM.resource(routerAddress).path(
RM_WEB_SERVICE_PATH + APPS_NEW_APPLICATION);
ClientResponse response = toRM.accept(APPLICATION_XML).post(ClientResponse.class);
clientToRM.destroy();
return response.getEntity(NewApplication.class);
}

public String submitApplication(String routerAddress) {
ApplicationSubmissionContextInfo context = new ApplicationSubmissionContextInfo();
String appId = getNewApplicationId(routerAddress).getApplicationId();
context.setApplicationId(appId);
Client clientToRouter = Client.create();
clientToRouter.setConnectTimeout(3000);
clientToRouter.setReadTimeout(3000);
WebResource toRM = clientToRouter.resource(routerAddress).path(
RM_WEB_SERVICE_PATH + APPS);
toRM.entity(context, APPLICATION_XML).accept(APPLICATION_XML).post(ClientResponse.class);
clientToRouter.destroy();
return appId;
}

public NewReservation getNewReservationId(String routerAddress) {
Client clientToRM = Client.create();
clientToRM.setConnectTimeout(3000);
clientToRM.setReadTimeout(3000);
WebResource toRM = clientToRM.resource(routerAddress).
path(RM_WEB_SERVICE_PATH + RESERVATION_NEW);
ClientResponse response = toRM.accept(APPLICATION_XML).post(ClientResponse.class);
return response.getEntity(NewReservation.class);
}

public String addNodeLabel(String routerAddress) {
Client clientToRM = Client.create();
clientToRM.setConnectTimeout(3000);
clientToRM.setReadTimeout(3000);
WebResource toRM = clientToRM.resource(routerAddress)
.path(RM_WEB_SERVICE_PATH + ADD_NODE_LABELS);
List<NodeLabel> nodeLabels = new ArrayList<>();
nodeLabels.add(NodeLabel.newInstance("default"));
NodeLabelsInfo context = new NodeLabelsInfo(nodeLabels);
ClientResponse response = toRM
.entity(context, APPLICATION_XML)
.accept(APPLICATION_XML)
.post(ClientResponse.class);
return response.getEntity(String.class);
}

public static String format(String format, Object... args) {
Pattern p = Pattern.compile("\\{.*?}");
Matcher m = p.matcher(format);
String newFormat = m.replaceAll("%s");
return String.format(newFormat, args);
}
}
Loading