Skip to content

Commit

Permalink
YARN-11588. [Federation] [Addendum] Fix uncleaned threads in yarn rou…
Browse files Browse the repository at this point in the history
…ter thread pool executor. (#6222)
  • Loading branch information
slfan1989 authored Oct 26, 2023
1 parent 821ed83 commit 6529085
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4369,6 +4369,22 @@ public static boolean isAclEnabled(Configuration conf) {
public static final long DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME =
TimeUnit.SECONDS.toMillis(0); // 0s

/**
* This method configures the policy for core threads regarding termination
* when no tasks arrive within the keep-alive time.
* When set to false, core threads are never terminated due to a lack of tasks.
* When set to true, the same keep-alive policy
* that applies to non-core threads also applies to core threads.
* To prevent constant thread replacement,
* ensure that the keep-alive time is greater than zero when setting it to true.
* It's advisable to call this method before the pool becomes actively used.
*/
public static final String ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT =
ROUTER_PREFIX + "interceptor.user-thread-pool.allow-core-thread-time-out";

public static final boolean DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT =
false;

/** The address of the Router web application. */
public static final String ROUTER_WEBAPP_ADDRESS =
ROUTER_WEBAPP_PREFIX + "address";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5139,6 +5139,23 @@
</description>
</property>

<property>
<name>yarn.router.interceptor.user-thread-pool.allow-core-thread-time-out</name>
<value>false</value>
<description>
This method configures the policy for core threads regarding termination
when no tasks arrive within the keep-alive time.
When set to false, core threads are never terminated due to a lack of tasks.
When set to true, the same keep-alive policy
that applies to non-core threads also applies to core threads.
To prevent constant thread replacement,
ensure that the keep-alive time is greater than zero when setting it to true.
It's advisable to call this method before the pool becomes actively used.
We need to ensure that
yarn.router.interceptor.user-thread-pool.keep-alive-time is greater than 0.
</description>
</property>

<property>
<name>yarn.router.submit.interval.time</name>
<value>10ms</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,13 @@ public void init(String userName) {
keepAliveTime, TimeUnit.MILLISECONDS, workQueue, threadFactory);

// Adding this line so that unused user threads will exit and be cleaned up if idle for too long
this.executorService.allowCoreThreadTimeOut(true);
boolean allowCoreThreadTimeOut = getConf().getBoolean(
YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT);

if (keepAliveTime > 0 && allowCoreThreadTimeOut) {
this.executorService.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
}

final Configuration conf = this.getConf();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,21 @@ public void init(String userName) {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("RPC Router RMAdminClient-" + userName + "-%d ").build();

long keepAliveTime = getConf().getTimeDuration(
YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME,
YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_KEEP_ALIVE_TIME, TimeUnit.SECONDS);

BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
keepAliveTime, TimeUnit.MILLISECONDS, workQueue, threadFactory);

boolean allowCoreThreadTimeOut = getConf().getBoolean(
YarnConfiguration.ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT);

if (keepAliveTime > 0 && allowCoreThreadTimeOut) {
this.executorService.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
}

federationFacade = FederationStateStoreFacade.getInstance(this.getConf());
this.conf = this.getConf();
Expand Down

0 comments on commit 6529085

Please sign in to comment.