Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDFS-17651.[ARR] Async handler executor isolation #7244

Merged
merged 10 commits into from
Jan 20, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
public final class AsyncRpcProtocolPBUtil {
public static final Logger LOG = LoggerFactory.getLogger(AsyncRpcProtocolPBUtil.class);
/** The executor used for handling responses asynchronously. */
private static Executor worker;
private static Executor asyncResponderExecutor;

private AsyncRpcProtocolPBUtil() {}

Expand Down Expand Up @@ -97,7 +97,7 @@ public static <T, R> R asyncIpcClient(
} catch (Exception ex) {
throw warpCompletionException(ex);
}
}, worker));
}, asyncResponderExecutor));
return asyncReturn(clazz);
}

Expand Down Expand Up @@ -144,10 +144,10 @@ public static <T> void asyncRouterServer(ServerReq<T> req, ServerRes<T> res) {
* Sets the executor used for handling responses asynchronously within
* the utility class.
*
* @param worker The executor to be used for handling responses asynchronously.
* @param asyncResponderExecutor The executor to be used for handling responses asynchronously.
*/
public static void setWorker(Executor worker) {
AsyncRpcProtocolPBUtil.worker = worker;
public static void setAsyncResponderExecutor(Executor asyncResponderExecutor) {
AsyncRpcProtocolPBUtil.asyncResponderExecutor = asyncResponderExecutor;
}

@FunctionalInterface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,22 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
public static final String DFS_ROUTER_RPC_ENABLE =
FEDERATION_ROUTER_PREFIX + "rpc.enable";
public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true;
public static final String DFS_ROUTER_RPC_ENABLE_ASYNC =
FEDERATION_ROUTER_PREFIX + "rpc.async.enable";
public static final boolean DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT = false;
public static final String DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT =
FEDERATION_ROUTER_PREFIX + "rpc.async.handler.count";
public static final int DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT = 2;
public static final String DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT =
FEDERATION_ROUTER_PREFIX + "rpc.async.responder.count";
public static final int DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT = 10;
// HDFS Router Asynchronous RPC
public static final String DFS_ROUTER_ASYNC_RPC_ENABLE_KEY =
FEDERATION_ROUTER_PREFIX + "async.rpc.enable";
public static final boolean DFS_ROUTER_ASYNC_RPC_ENABLE_DEFAULT = false;
public static final String FEDERATION_ROUTER_ASYNC_RPC_PREFIX =
FEDERATION_ROUTER_PREFIX + "async.rpc.";
// Example: ns1:count1,ns2:count2,ns3:count3
public static final String DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY =
FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "ns.handler.count";
public static final String DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT = "";
public static final String DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY =
FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "handler.count";
public static final int DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT = 10;
public static final String DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY =
FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "responder.count";
public static final int DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT = 10;

public static final String DFS_ROUTER_METRICS_ENABLE =
FEDERATION_ROUTER_PREFIX + "metrics.enable";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@
package org.apache.hadoop.hdfs.server.federation.router;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT;
Expand All @@ -26,16 +36,8 @@
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE_MS_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename.RouterRenameOption;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch;
Expand All @@ -56,22 +58,24 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
Expand Down Expand Up @@ -209,6 +213,7 @@
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
import org.apache.hadoop.util.ReflectionUtils;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -228,8 +233,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,

private static final Logger LOG =
LoggerFactory.getLogger(RouterRpcServer.class);
private ExecutorService asyncRouterHandler;
private ExecutorService asyncRouterResponder;

/** Name service keyword to identify fan-out calls. */
public static final String CONCURRENT_NS = "concurrent";

/** Configuration for the RPC server. */
private Configuration conf;
Expand Down Expand Up @@ -287,6 +293,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
/** Schedule the router federation rename jobs. */
private BalanceProcedureScheduler fedRenameScheduler;
private boolean enableAsync;
private Map<String, Integer> nsAsyncHandlerCount = new ConcurrentHashMap<>();
private Map<String, ExecutorService> asyncRouterHandlerExecutors = new ConcurrentHashMap<>();
private ExecutorService routerAsyncResponderExecutor;
private ExecutorService routerDefaultAsyncHandlerExecutor;

/**
* Construct a router RPC server.
Expand Down Expand Up @@ -318,11 +328,11 @@ public RouterRpcServer(Configuration conf, Router router,
int handlerQueueSize = this.conf.getInt(DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY,
DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT);

this.enableAsync = conf.getBoolean(DFS_ROUTER_RPC_ENABLE_ASYNC,
DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT);
LOG.info("Router enable async {}", this.enableAsync);
this.enableAsync = conf.getBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY,
DFS_ROUTER_ASYNC_RPC_ENABLE_DEFAULT);
LOG.info("Router enable async rpc: {}", this.enableAsync);
if (this.enableAsync) {
initAsyncThreadPool();
initAsyncThreadPools(conf);
}
// Override Hadoop Common IPC setting
int readerQueueSize = this.conf.getInt(DFS_ROUTER_READER_QUEUE_SIZE_KEY,
Expand Down Expand Up @@ -446,8 +456,7 @@ public RouterRpcServer(Configuration conf, Router router,
// Create the client
if (this.enableAsync) {
this.rpcClient = new RouterAsyncRpcClient(this.conf, this.router,
this.namenodeResolver, this.rpcMonitor,
routerStateIdContext, asyncRouterHandler);
this.namenodeResolver, this.rpcMonitor, routerStateIdContext);
this.clientProto = new RouterAsyncClientProtocol(conf, this);
this.nnProto = new RouterAsyncNamenodeProtocol(this);
this.routerProto = new RouterAsyncUserProtocol(this);
Expand Down Expand Up @@ -491,23 +500,77 @@ public RouterRpcServer(Configuration conf, Router router,

/**
* Init router async handlers and router async responders.
* @param configuration the configuration.
*/
public void initAsyncThreadPool() {
int asyncHandlerCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT,
DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT);
int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT,
DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT);
if (asyncRouterHandler == null) {
LOG.info("init router async handler count: {}", asyncHandlerCount);
asyncRouterHandler = Executors.newFixedThreadPool(
asyncHandlerCount, new AsyncThreadFactory("router async handler "));
public void initAsyncThreadPools(Configuration configuration) {
LOG.info("Begin initialize asynchronous handler and responder thread pool.");
initNsAsyncHandlerCount();
Set<String> allConfiguredNS = FederationUtil.getAllConfiguredNS(configuration);
Set<String> unassignedNS = new HashSet<>();
allConfiguredNS.add(CONCURRENT_NS);

for (String nsId : allConfiguredNS) {
int dedicatedHandlers = nsAsyncHandlerCount.getOrDefault(nsId, 0);
LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
if (dedicatedHandlers > 0) {
initAsyncHandlerThreadPools4Ns(nsId, dedicatedHandlers);
LOG.info("Assigned {} async handlers to nsId {} ", dedicatedHandlers, nsId);
} else {
unassignedNS.add(nsId);
}
}

int asyncHandlerCountDefault = configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);

if (!unassignedNS.isEmpty()) {
LOG.warn("Async handler unassigned ns: {}", unassignedNS);
LOG.info("Use default async handler count {} for unassigned ns.", asyncHandlerCountDefault);
for (String nsId : unassignedNS) {
initAsyncHandlerThreadPools4Ns(nsId, asyncHandlerCountDefault);
}
}
if (asyncRouterResponder == null) {
LOG.info("init router async responder count: {}", asyncResponderCount);
asyncRouterResponder = Executors.newFixedThreadPool(
asyncResponderCount, new AsyncThreadFactory("router async responder "));

int asyncResponderCount = configuration.getInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY,
DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT);
if (routerAsyncResponderExecutor == null) {
LOG.info("Initialize router async responder count: {}", asyncResponderCount);
routerAsyncResponderExecutor = Executors.newFixedThreadPool(
asyncResponderCount, new AsyncThreadFactory("Router Async Responder #"));
}
AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(routerAsyncResponderExecutor);

if (routerDefaultAsyncHandlerExecutor == null) {
LOG.info("init router async default executor handler count: {}", asyncHandlerCountDefault);
routerDefaultAsyncHandlerExecutor = Executors.newFixedThreadPool(
asyncHandlerCountDefault, new AsyncThreadFactory("Router Async Default Handler #"));
}
}

private void initNsAsyncHandlerCount() {
String configNsHandler = conf.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY,
DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't prefer the format of this value, but no better idea now. ^_^

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Sir, yes. Actually I don't like this config too, but if we do not use this format , we may introduce more configs. like:
dfs.federation.router.async.rpc.hdfs1.handler.count
dfs.federation.router.async.rpc.hdfs2.handler.count
dfs.federation.router.async.rpc.hdfs3.handler.count

if (StringUtils.isEmpty(configNsHandler)) {
LOG.error(
"The value of config key: {} is empty. Will use default conf.",
DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY);
}
AsyncRpcProtocolPBUtil.setWorker(asyncRouterResponder);
String[] nsHandlers = configNsHandler.split(",");
for (String nsHandlerInfo : nsHandlers) {
String[] nsHandlerItems = nsHandlerInfo.split(":");
if (nsHandlerItems.length != 2 || StringUtils.isBlank(nsHandlerItems[0]) ||
!StringUtils.isNumeric(nsHandlerItems[1])) {
LOG.error("The config key: {} is incorrect! The value is {}.",
DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY, nsHandlerInfo);
continue;
}
nsAsyncHandlerCount.put(nsHandlerItems[0], Integer.parseInt(nsHandlerItems[1]));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it involve some unexpected action if nsHandlerItems[0] is not valid namespace name? Is it better to check first?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sir, it won't. If nsHandlerItems[0] is not a valid namespace, it just exists in CHM and we use default async handler count to initialize unassignedNS.
image

}
}

private void initAsyncHandlerThreadPools4Ns(String nsId, int dedicatedHandlers) {
asyncRouterHandlerExecutors.computeIfAbsent(nsId, id -> Executors.newFixedThreadPool(
dedicatedHandlers, new AsyncThreadFactory("Router Async Handler for " + id + " #")));
}

/**
Expand Down Expand Up @@ -2426,8 +2489,12 @@ public boolean isAsync() {
return this.enableAsync;
}

public Executor getAsyncRouterHandler() {
return asyncRouterHandler;
public Map<String, ExecutorService> getAsyncRouterHandlerExecutors() {
return asyncRouterHandlerExecutors;
}

public ExecutorService getRouterAsyncHandlerDefaultExecutor() {
return routerDefaultAsyncHandlerExecutor;
}

private static class AsyncThreadFactory implements ThreadFactory {
Expand All @@ -2439,8 +2506,10 @@ private static class AsyncThreadFactory implements ThreadFactory {
}

@Override
public Thread newThread(Runnable r) {
return new Thread(r, namePrefix + threadNumber.getAndIncrement());
public Thread newThread(@NonNull Runnable r) {
Thread thread = new Thread(r, namePrefix + threadNumber.getAndIncrement());
thread.setDaemon(true);
return thread;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
Expand Down Expand Up @@ -98,7 +97,6 @@ public class RouterAsyncRpcClient extends RouterRpcClient{
private final ActiveNamenodeResolver namenodeResolver;
/** Optional perf monitor. */
private final RouterRpcMonitor rpcMonitor;
private final Executor asyncRouterHandler;

/**
* Create a router async RPC client to manage remote procedure calls to NNs.
Expand All @@ -108,17 +106,15 @@ public class RouterAsyncRpcClient extends RouterRpcClient{
* @param resolver A NN resolver to determine the currently active NN in HA.
* @param monitor Optional performance monitor.
* @param routerStateIdContext the router state context object to hold the state ids for all
* @param asyncRouterHandler async router handler
* namespaces.
*/
public RouterAsyncRpcClient(Configuration conf,
Router router, ActiveNamenodeResolver resolver, RouterRpcMonitor monitor,
RouterStateIdContext routerStateIdContext, Executor asyncRouterHandler) {
RouterStateIdContext routerStateIdContext) {
super(conf, router, resolver, monitor, routerStateIdContext);
this.router = router;
this.namenodeResolver = resolver;
this.rpcMonitor = monitor;
this.asyncRouterHandler = asyncRouterHandler;
}

/**
Expand Down Expand Up @@ -172,6 +168,7 @@ public Object invokeMethod(
" with params " + Arrays.deepToString(params) + " from "
+ router.getRouterId());
}
String nsid = namenodes.get(0).getNameserviceId();
// transfer threadLocalContext to worker threads of executor.
ThreadLocalContext threadLocalContext = new ThreadLocalContext();
asyncComplete(null);
Expand All @@ -183,7 +180,8 @@ public Object invokeMethod(
threadLocalContext.transfer();
invokeMethodAsync(ugi, (List<FederationNamenodeContext>) namenodes,
useObserver, protocol, method, params);
}, asyncRouterHandler);
}, router.getRpcServer().getAsyncRouterHandlerExecutors().getOrDefault(nsid,
router.getRpcServer().getRouterAsyncHandlerDefaultExecutor()));
return null;
}

Expand Down
Loading
Loading