Skip to content

Commit

Permalink
HDFS-17651.[ARR] Async handler executor isolation
Browse files Browse the repository at this point in the history
  • Loading branch information
hfutatzhanghb committed Dec 23, 2024
1 parent be06adc commit 7f0b75f
Show file tree
Hide file tree
Showing 10 changed files with 152 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
public final class AsyncRpcProtocolPBUtil {
public static final Logger LOG = LoggerFactory.getLogger(AsyncRpcProtocolPBUtil.class);
/** The executor used for handling responses asynchronously. */
private static Executor worker;
private static Executor asyncResponderExecutor;

private AsyncRpcProtocolPBUtil() {}

Expand Down Expand Up @@ -92,17 +92,17 @@ public static <T, R> R asyncIpcClient(
} catch (Exception ex) {
throw warpCompletionException(ex);
}
}, worker));
}, asyncResponderExecutor));
return asyncReturn(clazz);
}

/**
* Sets the executor used for handling responses asynchronously within
* the utility class.
*
* @param worker The executor to be used for handling responses asynchronously.
* @param asyncResponderExecutor The executor to be used for handling responses asynchronously.
*/
public static void setWorker(Executor worker) {
AsyncRpcProtocolPBUtil.worker = worker;
public static void setAsyncResponderExecutor(Executor asyncResponderExecutor) {
AsyncRpcProtocolPBUtil.asyncResponderExecutor = asyncResponderExecutor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,23 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
public static final String DFS_ROUTER_RPC_ENABLE =
FEDERATION_ROUTER_PREFIX + "rpc.enable";
public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true;
public static final String DFS_ROUTER_RPC_ENABLE_ASYNC =
FEDERATION_ROUTER_PREFIX + "rpc.async.enable";
public static final boolean DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT = false;
public static final String DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT =
FEDERATION_ROUTER_PREFIX + "rpc.async.handler.count";
public static final int DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT = 2;
public static final String DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT =
FEDERATION_ROUTER_PREFIX + "rpc.async.responder.count";
public static final int DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT = 10;
// HDFS Router Asynchronous RPC
public static final String DFS_ROUTER_ASYNC_RPC_ENABLE_KEY =
FEDERATION_ROUTER_PREFIX + "async.rpc.enable";
public static final boolean DFS_ROUTER_ASYNC_RPC_ENABLE_DEFAULT = false;
public static final String FEDERATION_ROUTER_ASYNC_RPC_PREFIX =
FEDERATION_ROUTER_PREFIX + "async.rpc.";
// Example: ns1:count1,ns2:count2,ns3:count3
public static final String DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY =
FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "ns.handler.count";
public static final String DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT =
"nsPlaceholder1:0,nsPlaceholder2:0";
public static final String DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY =
FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "handler.count";
public static final int DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT = 10;
public static final String DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY =
FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "responder.count";
public static final int DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT = 10;

public static final String DFS_ROUTER_METRICS_ENABLE =
FEDERATION_ROUTER_PREFIX + "metrics.enable";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,7 @@
package org.apache.hadoop.hdfs.server.federation.router;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE_MS_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.*;
import static org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename.RouterRenameOption;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch;
Expand All @@ -52,25 +35,13 @@
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
Expand Down Expand Up @@ -219,8 +190,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,

private static final Logger LOG =
LoggerFactory.getLogger(RouterRpcServer.class);
private ExecutorService asyncRouterHandler;
private ExecutorService asyncRouterResponder;

/** Name service keyword to identify fan-out calls. */
public static final String CONCURRENT_NS = "concurrent";

/** Configuration for the RPC server. */
private Configuration conf;
Expand Down Expand Up @@ -278,6 +250,13 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
/** Schedule the router federation rename jobs. */
private BalanceProcedureScheduler fedRenameScheduler;
private boolean enableAsync;
private Map<String, Integer> nsAsyncHandlerCount = new ConcurrentHashMap<>();
private static Map<String, ExecutorService> routerAsyncHandlerExecutors = new ConcurrentHashMap<>();
private ExecutorService routerAsyncResponderExecutor;
private static ExecutorService routerDefaultAsyncHandlerExecutor;
private ExecutorService asyncRouterHandler;
private ExecutorService asyncRouterResponder;


/**
* Construct a router RPC server.
Expand Down Expand Up @@ -308,11 +287,11 @@ public RouterRpcServer(Configuration conf, Router router,
int handlerQueueSize = this.conf.getInt(DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY,
DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT);

this.enableAsync = conf.getBoolean(DFS_ROUTER_RPC_ENABLE_ASYNC,
DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT);
LOG.info("Router enable async {}", this.enableAsync);
this.enableAsync = conf.getBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY,
DFS_ROUTER_ASYNC_RPC_ENABLE_DEFAULT);
LOG.info("Router enable async rpc: {}", this.enableAsync);
if (this.enableAsync) {
initAsyncThreadPool();
initAsyncThreadPool(conf);
}
// Override Hadoop Common IPC setting
int readerQueueSize = this.conf.getInt(DFS_ROUTER_READER_QUEUE_SIZE_KEY,
Expand Down Expand Up @@ -463,22 +442,76 @@ public RouterRpcServer(Configuration conf, Router router,
/**
* Init router async handlers and router async responders.
*/
public void initAsyncThreadPool() {
int asyncHandlerCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT,
DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT);
int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT,
DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT);
if (asyncRouterHandler == null) {
LOG.info("init router async handler count: {}", asyncHandlerCount);
asyncRouterHandler = Executors.newFixedThreadPool(
asyncHandlerCount, new AsyncThreadFactory("router async handler "));
public void initAsyncThreadPool(Configuration conf) {
LOG.info("Begin initialize asynchronous handler and responder thread pool.");
initNsAsyncHandlerCount();
Set<String> allConfiguredNS = FederationUtil.getAllConfiguredNS(conf);
Set<String> unassignedNS = new HashSet<>();
allConfiguredNS.add(CONCURRENT_NS);

for (String nsId : allConfiguredNS) {
int dedicatedHandlers = nsAsyncHandlerCount.getOrDefault(nsId, 0);
LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
if (dedicatedHandlers > 0) {
initAsyncHandlerThreadPools4Ns(nsId, dedicatedHandlers);
LOG.info("Assigned {} async handlers to nsId {} ", dedicatedHandlers, nsId);
} else {
unassignedNS.add(nsId);
}
}

int asyncHandlerCountDefault = conf.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);

if (!unassignedNS.isEmpty()) {
LOG.warn("Async handler unassigned ns: {}", unassignedNS);
LOG.info("Use default async handler count {} for unassigned ns.", asyncHandlerCountDefault);
for (String nsId : unassignedNS) {
initAsyncHandlerThreadPools4Ns(nsId, asyncHandlerCountDefault);
}
}

if (routerDefaultAsyncHandlerExecutor == null) {
LOG.info("init router async default executor handler count: {}", asyncHandlerCountDefault);
routerAsyncResponderExecutor = Executors.newFixedThreadPool(
asyncHandlerCountDefault, new AsyncThreadFactory("Router Async Default Handler #"));
}
if (asyncRouterResponder == null) {
LOG.info("init router async responder count: {}", asyncResponderCount);
asyncRouterResponder = Executors.newFixedThreadPool(
asyncResponderCount, new AsyncThreadFactory("router async responder "));

int asyncResponderCount = conf.getInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY,
DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT);
if (routerAsyncResponderExecutor == null) {
LOG.info("Initialize router async responder count: {}", asyncResponderCount);
routerAsyncResponderExecutor = Executors.newFixedThreadPool(
asyncResponderCount, new AsyncThreadFactory("Router Async Responder #"));
}
AsyncRpcProtocolPBUtil.setWorker(asyncRouterResponder);
AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(routerAsyncResponderExecutor);
}

private void initNsAsyncHandlerCount() {
String configNsHandler = conf.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY,
DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT);
if (StringUtils.isEmpty(configNsHandler)) {
LOG.error(
"The config key: {} is incorrect! The value is empty.",
DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY);
configNsHandler = DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT;
}
String[] nsHandlers = configNsHandler.split(",");
for (String nsHandlerInfo : nsHandlers) {
String[] nsHandlerItems = nsHandlerInfo.split(":");
if (nsHandlerItems.length != 2 || StringUtils.isBlank(nsHandlerItems[0]) ||
!StringUtils.isNumeric(nsHandlerItems[1])) {
LOG.error("The config key: {} is incorrect! The value is {}.",
DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY, nsHandlerInfo);
continue;
}
nsAsyncHandlerCount.put(nsHandlerItems[0], Integer.parseInt(nsHandlerItems[1]));
}
}

private void initAsyncHandlerThreadPools4Ns(String nsId, int dedicatedHandlers) {
routerAsyncHandlerExecutors.computeIfAbsent(nsId, id -> Executors.newFixedThreadPool(
dedicatedHandlers, new AsyncThreadFactory("Router Async Handler for " + id + " #")));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,38 @@
</property>

<property>
<name>dfs.federation.router.rpc.async.enable</name>
<name>dfs.federation.router.async.rpc.enable</name>
<value>false</value>
<description>
If true, router will process the RPC request asynchronously.
</description>
</property>

<property>
<name>dfs.federation.router.async.rpc.ns.handler.count</name>
<value>nsPlaceholder1:0,nsPlaceholder2:0</value>
<description>

</description>
</property>

<property>
<name>dfs.federation.router.async.rpc.responder.count</name>
<value>10</value>
<description>
For those nameservices not in dfs.federation.router.async.rpc.ns.handler.count configuration entry,
use this value as the asynchronous handler thread counts.
</description>
</property>

<property>
<name>dfs.federation.router.async.rpc.responder.count</name>
<value>10</value>
<description>
The thread counts of async responder executor.
</description>
</property>

<property>
<name>dfs.federation.router.rpc-address</name>
<value>0.0.0.0:8888</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class TestAsyncRpcProtocolPBUtil {

@Before
public void setUp() throws IOException {
AsyncRpcProtocolPBUtil.setWorker(ForkJoinPool.commonPool());
AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(ForkJoinPool.commonPool());
Configuration conf = new Configuration();
RPC.setProtocolEngine(conf, TestRpcBase.TestRpcService.class,
ProtobufRpcEngine2.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public class TestRouterClientSideTranslatorPB {

@BeforeClass
public static void setUp() throws Exception {
AsyncRpcProtocolPBUtil.setWorker(ForkJoinPool.commonPool());
AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(ForkJoinPool.commonPool());
conf = new HdfsConfiguration();
cluster = (new MiniDFSCluster.Builder(conf))
.numDataNodes(1).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@

import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.*;
import static org.junit.Assert.assertTrue;

/**
Expand Down Expand Up @@ -79,8 +78,8 @@ public static void setUpCluster() throws Exception {

// Reduce the number of RPC clients threads to overload the Router easy
routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
routerConf.setInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, 1);
routerConf.setInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY, 1);
// We decrease the DN cache times to make the test faster
routerConf.setTimeDuration(
RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
Expand All @@ -107,7 +106,7 @@ public void setUp() throws IOException {
router = cluster.getRandomRouter();
routerFs = router.getFileSystem();
routerRpcServer = router.getRouterRpcServer();
routerRpcServer.initAsyncThreadPool();
routerRpcServer.initAsyncThreadPool(routerConf);
RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
routerRpcServer.getRPCMonitor(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@

import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.*;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -94,8 +93,8 @@ public static void setUpCluster() throws Exception {

// Reduce the number of RPC clients threads to overload the Router easy
routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 1);
routerConf.setInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT, 1);
routerConf.setInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT, 1);
routerConf.setInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, 1);
routerConf.setInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY, 1);
// We decrease the DN cache times to make the test faster
routerConf.setTimeDuration(
RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
Expand All @@ -122,7 +121,7 @@ public void setUp() throws IOException {
router = cluster.getRandomRouter();
routerFs = router.getFileSystem();
routerRpcServer = router.getRouterRpcServer();
routerRpcServer.initAsyncThreadPool();
routerRpcServer.initAsyncThreadPool(routerConf);
RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient(
routerConf, router.getRouter(), routerRpcServer.getNamenodeResolver(),
routerRpcServer.getRPCMonitor(),
Expand Down
Loading

0 comments on commit 7f0b75f

Please sign in to comment.