From b00d605832eaf7f05f52681cd44f552b398d8825 Mon Sep 17 00:00:00 2001 From: slfan1989 <55643692+slfan1989@users.noreply.github.com> Date: Tue, 10 Oct 2023 05:06:41 +0800 Subject: [PATCH] YARN-9048. Add znode hierarchy in Federation ZK State Store. (#6016) --- .../impl/ZookeeperFederationStateStore.java | 412 ++++++++++++++++-- .../src/test/resources/yarn-site.xml | 22 + 2 files changed, 395 insertions(+), 39 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/yarn-site.xml diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java index 63a8aa62f0030..8c59b2ad8cf4f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java @@ -26,6 +26,8 @@ import java.util.ArrayList; import java.util.Calendar; import java.util.List; +import java.util.Map; +import java.util.HashMap; import java.util.TimeZone; import java.util.Comparator; import java.util.stream.Collectors; @@ -111,6 +113,7 @@ import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.ACL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -131,8 +134,16 @@ * | |----- SC1 * | |----- SC2 * |--- APPLICATION - * | |----- APP1 - * | |----- APP2 + * | |----- HIERARCHIES 
+ * | | |----- 1 + * | | | |----- (#ApplicationId barring last character) + * | | | | | |----- APP Data + * | | | .... + * | | | + * | | |----- 2 + * | | | |----- (#ApplicationId barring last 2 characters) + * | | | | |----- (#Last 2 characters of ApplicationId) + * | | | | | |----- APP Data * |--- POLICY * | |----- QUEUE1 * | |----- QUEUE1 @@ -194,12 +205,19 @@ public class ZookeeperFederationStateStore implements FederationStateStore { private int maxAppsInStateStore; /** Directory to store the delegation token data. **/ + private Map routerAppRootHierarchies; private String routerRMDTSecretManagerRoot; private String routerRMDTMasterKeysRootPath; private String routerRMDelegationTokensRootPath; private String routerRMSequenceNumberPath; private String routerRMMasterKeyIdPath; + private int appIdNodeSplitIndex = 0; + private final static int HIERARCHIES_LEVEL = 4; + + @VisibleForTesting + public static final String ROUTER_APP_ROOT_HIERARCHIES = "HIERARCHIES"; + private volatile Clock clock = SystemClock.getInstance(); protected static final Version CURRENT_VERSION_INFO = Version.newInstance(1, 1); @@ -208,6 +226,27 @@ public class ZookeeperFederationStateStore implements FederationStateStore { private ZKFederationStateStoreOpDurations opDurations = ZKFederationStateStoreOpDurations.getInstance(); + /* + * Indicates different app attempt state store operations. + */ + private enum AppAttemptOp { + STORE, + UPDATE, + REMOVE + }; + + /** + * Encapsulates full app node path and corresponding split index. 
+ */ + private final static class AppNodeSplitInfo { + private final String path; + private final int splitIndex; + AppNodeSplitInfo(String path, int splitIndex) { + this.path = path; + this.splitIndex = splitIndex; + } + } + @Override public void init(Configuration conf) throws YarnException { @@ -234,6 +273,23 @@ public void init(Configuration conf) throws YarnException { reservationsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_RESERVATION); versionNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_VERSION); + String hierarchiesPath = getNodePath(appsZNode, ROUTER_APP_ROOT_HIERARCHIES); + routerAppRootHierarchies = new HashMap<>(); + routerAppRootHierarchies.put(0, appsZNode); + for (int splitIndex = 1; splitIndex <= HIERARCHIES_LEVEL; splitIndex++) { + routerAppRootHierarchies.put(splitIndex, + getNodePath(hierarchiesPath, Integer.toString(splitIndex))); + } + + appIdNodeSplitIndex = conf.getInt(YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX, + YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX); + if (appIdNodeSplitIndex < 1 || appIdNodeSplitIndex > HIERARCHIES_LEVEL) { + LOG.info("Invalid value {} for config {} specified. 
Resetting it to {}", + appIdNodeSplitIndex, YarnConfiguration.ZK_APPID_NODE_SPLIT_INDEX, + YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX); + appIdNodeSplitIndex = YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX; + } + // delegation token znodes routerRMDTSecretManagerRoot = getNodePath(baseZNode, ROUTER_RM_DT_SECRET_MANAGER_ROOT); routerRMDTMasterKeysRootPath = getNodePath(routerRMDTSecretManagerRoot, @@ -250,6 +306,12 @@ public void init(Configuration conf) throws YarnException { List zkAcl = ZKCuratorManager.getZKAcls(conf); zkManager.createRootDirRecursively(membershipZNode, zkAcl); zkManager.createRootDirRecursively(appsZNode, zkAcl); + zkManager.createRootDirRecursively( + getNodePath(appsZNode, ROUTER_APP_ROOT_HIERARCHIES)); + for (int splitIndex = 1; splitIndex <= HIERARCHIES_LEVEL; splitIndex++) { + zkManager.createRootDirRecursively( + routerAppRootHierarchies.get(splitIndex)); + } zkManager.createRootDirRecursively(policiesZNode, zkAcl); zkManager.createRootDirRecursively(reservationsZNode, zkAcl); zkManager.createRootDirRecursively(routerRMDTSecretManagerRoot, zkAcl); @@ -320,6 +382,21 @@ public void close() throws Exception { } } + /** + * Register the home {@code SubClusterId} of the newly submitted + * {@code ApplicationId}. Currently response is empty if the operation was + * successful, if not an exception reporting reason for a failure. If a + * mapping for the application already existed, the {@code SubClusterId} in + * this response will return the existing mapping which might be different + * from that in the {@code AddApplicationHomeSubClusterRequest}. + * + * @param request the request to register a new application with its home sub-cluster. + * @return upon successful registration of the application in the StateStore, + * {@code AddApplicationHomeSubClusterRequest} containing the home + * sub-cluster of the application. Otherwise, an exception reporting + * reason for a failure. 
+ * @throws YarnException indicates exceptions from yarn servers. + */ @Override public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( AddApplicationHomeSubClusterRequest request) throws YarnException { @@ -367,6 +444,17 @@ public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster( throw new YarnException("Cannot addApplicationHomeSubCluster by request"); } + /** + * Update the home {@code SubClusterId} of a previously submitted + * {@code ApplicationId}. Currently response is empty if the operation was + * successful, if not an exception reporting reason for a failure. + * + * @param request the request to update the home sub-cluster of an + * application. + * @return empty on successful update of the application in the StateStore, if + * not an exception reporting reason for a failure + * @throws YarnException indicates exceptions from yarn servers. + */ @Override public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster( UpdateApplicationHomeSubClusterRequest request) throws YarnException { @@ -402,6 +490,15 @@ public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubCluster( return UpdateApplicationHomeSubClusterResponse.newInstance(); } + /** + * Get information about the application identified by the input + * {@code ApplicationId}. + * + * @param request contains the application queried + * @return {@code ApplicationHomeSubCluster} containing the application's home + * subcluster + * @throws YarnException indicates exceptions from yarn servers. + */ @Override public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( GetApplicationHomeSubClusterRequest request) throws YarnException { @@ -437,6 +534,14 @@ public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster( subClusterId, createTime); } + /** + * Get the {@code ApplicationHomeSubCluster} list representing the mapping of + * all submitted applications to their home sub-clusters. 
+ * + * @param request empty representing all applications + * @return the mapping of all submitted applications to their home sub-clusters + * @throws YarnException indicates exceptions from yarn servers. + */ @Override public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( GetApplicationsHomeSubClusterRequest request) throws YarnException { @@ -448,9 +553,7 @@ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( try { long start = clock.getTime(); SubClusterId requestSC = request.getSubClusterId(); - List children = zkManager.getChildren(appsZNode); - List result = children.stream() - .map(child -> generateAppHomeSC(child)) + List result = loadRouterApplications().stream() .sorted(Comparator.comparing(ApplicationHomeSubCluster::getCreateTime).reversed()) .filter(appHomeSC -> filterHomeSubCluster(requestSC, appHomeSC.getHomeSubCluster())) .limit(maxAppsInStateStore) @@ -467,48 +570,51 @@ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster( throw new YarnException("Cannot get app by request"); } - private ApplicationHomeSubCluster generateAppHomeSC(String appId) { - try { - // Parse ApplicationHomeSubCluster - ApplicationId applicationId = ApplicationId.fromString(appId); - ApplicationHomeSubCluster zkStoreApplicationHomeSubCluster = - getApplicationHomeSubCluster(applicationId); - - // Prepare to return data - SubClusterId subClusterId = zkStoreApplicationHomeSubCluster.getHomeSubCluster(); - ApplicationHomeSubCluster resultApplicationHomeSubCluster = - ApplicationHomeSubCluster.newInstance(applicationId, subClusterId); - return resultApplicationHomeSubCluster; - } catch (Exception ex) { - LOG.error("get homeSubCluster by appId = {}.", appId, ex); - } - return null; - } - + /** + * Delete the mapping of home {@code SubClusterId} of a previously submitted + * {@code ApplicationId}. Currently response is empty if the operation was + * successful, if not an exception reporting reason for a failure. 
+ * + * @param request the request to delete the home sub-cluster of an + * application. + * @return empty on successful deletion of the application in the StateStore, if + * not an exception reporting reason for a failure + * @throws YarnException if the request is invalid/fails + */ @Override - public DeleteApplicationHomeSubClusterResponse - deleteApplicationHomeSubCluster( - DeleteApplicationHomeSubClusterRequest request) - throws YarnException { + public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubCluster( + DeleteApplicationHomeSubClusterRequest request) throws YarnException { long start = clock.getTime(); FederationApplicationHomeSubClusterStoreInputValidator.validate(request); ApplicationId appId = request.getApplicationId(); - String appZNode = getNodePath(appsZNode, appId.toString()); + String appIdRemovePath = getLeafAppIdNodePath(appId.toString(), false); + int splitIndex = appIdNodeSplitIndex; - boolean exists = false; + boolean exists = true; try { - exists = zkManager.exists(appZNode); + if (!exists(appIdRemovePath)) { + AppNodeSplitInfo alternatePathInfo = getAlternatePath(appId.toString()); + if (alternatePathInfo != null) { + appIdRemovePath = alternatePathInfo.path; + splitIndex = alternatePathInfo.splitIndex; + } else { + exists = false; + } + } } catch (Exception e) { String errMsg = "Cannot check app: " + e.getMessage(); FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } + if (!exists) { String errMsg = "Application " + appId + " does not exist"; FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); } try { - zkManager.delete(appZNode); + zkManager.delete(appIdRemovePath); + // Check if we should remove the parent app node as well. 
+ checkRemoveParentAppNode(appIdRemovePath, splitIndex); } catch (Exception e) { String errMsg = "Cannot delete app: " + e.getMessage(); FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); @@ -730,11 +836,12 @@ public void storeVersion() throws Exception { * * @param appId Application identifier. * @return ApplicationHomeSubCluster identifier. - * @throws Exception If it cannot contact ZooKeeper. + * @throws YarnException If it cannot contact ZooKeeper. */ private ApplicationHomeSubCluster getApplicationHomeSubCluster( - final ApplicationId appId) throws YarnException { - String appZNode = getNodePath(appsZNode, appId.toString()); + final ApplicationId appId) throws YarnException{ + + String appZNode = getLeafAppIdNodePath(appId.toString(), false); ApplicationHomeSubCluster appHomeSubCluster = null; byte[] data = get(appZNode); @@ -761,11 +868,44 @@ private ApplicationHomeSubCluster getApplicationHomeSubCluster( private void storeOrUpdateApplicationHomeSubCluster(final ApplicationId applicationId, final ApplicationHomeSubCluster applicationHomeSubCluster, boolean update) throws YarnException { - String appZNode = getNodePath(appsZNode, applicationId.toString()); - ApplicationHomeSubClusterProto proto = - ((ApplicationHomeSubClusterPBImpl) applicationHomeSubCluster).getProto(); - byte[] data = proto.toByteArray(); - put(appZNode, data, update); + try { + ApplicationHomeSubClusterProto proto = + ((ApplicationHomeSubClusterPBImpl) applicationHomeSubCluster).getProto(); + byte[] data = proto.toByteArray(); + if (update) { + updateApplicationStateInternal(applicationId, data); + } else { + storeApplicationStateInternal(applicationId, data); + } + } catch (Exception e) { + throw new YarnException(e); + } + } + + protected void storeApplicationStateInternal(final ApplicationId applicationId, byte[] data) + throws Exception { + String nodeCreatePath = getLeafAppIdNodePath(applicationId.toString(), true); + LOG.debug("Storing info for app: {} at: {}.", 
applicationId, nodeCreatePath); + put(nodeCreatePath, data, false); + } + + protected void updateApplicationStateInternal(final ApplicationId applicationId, byte[] data) + throws Exception { + String nodeUpdatePath = getLeafAppIdNodePath(applicationId.toString(), false); + if (!exists(nodeUpdatePath)) { + AppNodeSplitInfo alternatePathInfo = getAlternatePath(applicationId.toString()); + if (alternatePathInfo != null) { + nodeUpdatePath = alternatePathInfo.path; + } else if (appIdNodeSplitIndex != 0) { + // No alternate path exists. Create path as per configured split index. + String rootNode = getSplitAppNodeParent(nodeUpdatePath, appIdNodeSplitIndex); + if (!exists(rootNode)) { + zkManager.create(rootNode); + } + } + } + LOG.debug("Storing final state info for app: {} at: {}.", applicationId, nodeUpdatePath); + put(nodeUpdatePath, data, true); } /** @@ -1674,4 +1814,198 @@ public int incrementCurrentKeyId() { } return keyIdSeqCounter.getCount(); } + + /** + * Get parent app node path based on full path and split index supplied. + * @param appIdPath App id path for which parent needs to be returned. + * @param splitIndex split index. + * @return parent app node path. + */ + private String getSplitAppNodeParent(String appIdPath, int splitIndex) { + // Calculated as string up to index (appIdPath length - split index - 1). We + // deduct 1 to exclude path separator. + return appIdPath.substring(0, appIdPath.length() - splitIndex - 1); + } + + /** + * Checks if parent app node has no leaf nodes and if it does not have, + * removes it. Called while removing application. + * + * @param appIdPath path of app id to be removed. + * @param splitIndex split index. + * @throws Exception if any problem occurs while performing ZK operation. 
+ */ + private void checkRemoveParentAppNode(String appIdPath, int splitIndex) + throws Exception { + if (splitIndex == 0) { + return; + } + + String parentAppNode = getSplitAppNodeParent(appIdPath, splitIndex); + List children; + try { + children = getChildren(parentAppNode); + } catch (KeeperException.NoNodeException ke) { + // It should be fine to swallow this exception as the parent app node we + // intend to delete is already deleted. + LOG.debug("Unable to remove app parent node {} as it does not exist.", + parentAppNode); + return; + } + + // If children==null or children is not empty, we cannot delete the parent path. + if (children == null || !children.isEmpty()) { + return; + } + + // No apps stored under parent path. + try { + zkManager.delete(parentAppNode); + LOG.debug("No leaf app node exists. Removing parent node {}.", parentAppNode); + } catch (KeeperException.NotEmptyException ke) { + // It should be fine to swallow this exception as the parent app node + // has to be deleted only if it has no children. And this node has. + LOG.debug("Unable to remove app parent node {} as it has children.", + parentAppNode); + } + } + + List getChildren(final String path) throws Exception { + return zkManager.getChildren(path); + } + + /** + * Get alternate path for app id if path according to configured split index + * does not exist. We look for path based on all possible split indices. + * @param appId application id. + * @return a {@link AppNodeSplitInfo} object containing the path and split + * index if it exists, null otherwise. + * @throws Exception if any problem occurs while performing ZK operation. 
+ */ + private AppNodeSplitInfo getAlternatePath(String appId) throws Exception { + for (Map.Entry entry : routerAppRootHierarchies.entrySet()) { + // Look for other paths + int splitIndex = entry.getKey(); + if (splitIndex != appIdNodeSplitIndex) { + String alternatePath = + getLeafAppIdNodePath(appId, entry.getValue(), splitIndex, false); + if (exists(alternatePath)) { + return new AppNodeSplitInfo(alternatePath, splitIndex); + } + } + } + return null; + } + + /** + * Returns leaf app node path based on app id and passed split index. If the + * passed flag createParentIfNotExists is true, also creates the parent app + * node if it does not exist. + * @param appId application id. + * @param rootNode app root node based on split index. + * @param appIdNodeSplitIdx split index. + * @param createParentIfNotExists flag which determines if parent app node + * needs to be created(as per split) if it does not exist. + * @return leaf app node path. + * @throws Exception if any problem occurs while performing ZK operation. + */ + private String getLeafAppIdNodePath(String appId, String rootNode, + int appIdNodeSplitIdx, boolean createParentIfNotExists) throws Exception { + if (appIdNodeSplitIdx == 0) { + return getNodePath(rootNode, appId); + } + String nodeName = appId; + int splitIdx = nodeName.length() - appIdNodeSplitIdx; + String rootNodePath = getNodePath(rootNode, nodeName.substring(0, splitIdx)); + if (createParentIfNotExists && !exists(rootNodePath)) { + try { + zkManager.create(rootNodePath); + } catch (KeeperException.NodeExistsException e) { + LOG.debug("Unable to create app parent node {} as it already exists.", rootNodePath); + } + } + return getNodePath(rootNodePath, nodeName.substring(splitIdx)); + } + + /** + * Returns leaf app node path based on app id and configured split index. If + * the passed flag createParentIfNotExists is true, also creates the parent + * app node if it does not exist. + * @param appId application id. 
+ * @param createParentIfNotExists flag which determines if parent app node + * needs to be created(as per split) if it does not exist. + * @return leaf app node path. + * @throws YarnException if any problem occurs while performing ZK operation. + */ + private String getLeafAppIdNodePath(String appId, + boolean createParentIfNotExists) throws YarnException { + try { + String rootNode = routerAppRootHierarchies.get(appIdNodeSplitIndex); + return getLeafAppIdNodePath(appId, rootNode, appIdNodeSplitIndex, createParentIfNotExists); + } catch (Exception e) { + throw new YarnException(e); + } + } + + private ApplicationHomeSubCluster loadRouterAppStateFromAppNode(String appNodePath) + throws Exception { + byte[] data = get(appNodePath); + LOG.debug("Loading application from znode: {}", appNodePath); + ApplicationHomeSubCluster appHomeSubCluster = null; + + if (data == null) { + return appHomeSubCluster; + } + + try { + appHomeSubCluster = new ApplicationHomeSubClusterPBImpl( + ApplicationHomeSubClusterProto.parseFrom(data)); + } catch (InvalidProtocolBufferException e) { + String errMsg = "Cannot parse application at " + appNodePath; + FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg); + } + + return appHomeSubCluster; + } + + private List loadRouterApplications() throws Exception { + List applicationHomeSubClusters = new ArrayList<>(); + for (int splitIndex = 0; splitIndex <= 4; splitIndex++) { + String appRoot = routerAppRootHierarchies.get(splitIndex); + if (appRoot == null) { + continue; + } + List childNodes = getChildren(appRoot); + boolean appNodeFound = false; + for (String childNodeName : childNodes) { + if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) { + appNodeFound = true; + if (splitIndex == 0) { + ApplicationHomeSubCluster applicationHomeSubCluster = + loadRouterAppStateFromAppNode(getNodePath(appRoot, childNodeName)); + applicationHomeSubClusters.add(applicationHomeSubCluster); + } else { + // If AppId Node is partitioned. 
+ String parentNodePath = getNodePath(appRoot, childNodeName); + List leafNodes = getChildren(parentNodePath); + for (String leafNodeName : leafNodes) { + ApplicationHomeSubCluster applicationHomeSubCluster = + loadRouterAppStateFromAppNode(getNodePath(parentNodePath, leafNodeName)); + applicationHomeSubClusters.add(applicationHomeSubCluster); + } + } + } else if (!childNodeName.equals(ROUTER_APP_ROOT_HIERARCHIES)){ + LOG.debug("Unknown child node with name {} under {}.", childNodeName, appRoot); + } + } + if (splitIndex != appIdNodeSplitIndex && !appNodeFound) { + // If no loaded app exists for a particular split index and the split + // index for which apps are being loaded is not the one configured, then + // we do not need to keep track of this hierarchy for storing/updating/ + // removing app/app attempt znodes. + routerAppRootHierarchies.remove(splitIndex); + } + } + return applicationHomeSubClusters; + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/yarn-site.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/yarn-site.xml new file mode 100644 index 0000000000000..1f283cf259aff --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/resources/yarn-site.xml @@ -0,0 +1,22 @@ + + + + + + + yarn.resourcemanager.zk-appid-node.split-index + 1 + +